Skip to content

Commit b21d09c

Browse files
committed
feat: TxMsg 重试支持指定 connectionName
1 parent f4c650d commit b21d09c

5 files changed

Lines changed: 35 additions & 12 deletions

File tree

common/plugin/mq/src/main/java/com/github/cadecode/uniboot/common/plugin/mq/model/BaseTxMsg.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,6 @@ public interface BaseTxMsg {
2020

2121
String getMessage();
2222

23+
String getConnectionName();
24+
2325
}

common/plugin/mq/src/main/java/com/github/cadecode/uniboot/common/plugin/mq/model/TxMsg.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,7 @@ public class TxMsg implements BaseTxMsg {
3434
private String routingKey;
3535

3636
private String message;
37+
38+
private String connectionName;
39+
3740
}

framework/framework_base/src/main/java/com/github/cadecode/uniboot/framework/base/plugin/bean/po/PlgMqMsg.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public class PlgMqMsg {
3636

3737
private String message;
3838

39+
private String connectionName;
40+
3941
private SendStateEnum sendState;
4042

4143
private ConsumeStateEnum consumeState;

framework/framework_base/src/main/java/com/github/cadecode/uniboot/framework/base/plugin/bean/vo/PlgMqMsgVo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public static class PlgMqMsgPageResVo {
3434
private String bizKey;
3535
private String exchange;
3636
private String routingKey;
37+
private String connectionName;
3738
private String message;
3839
private SendStateEnum sendState;
3940
private ConsumeStateEnum consumeState;

framework/framework_base/src/main/java/com/github/cadecode/uniboot/framework/base/plugin/handler/MqTxMsgHandler.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.amqp.core.Message;
2626
import org.springframework.amqp.core.ReturnedMessage;
2727
import org.springframework.amqp.rabbit.connection.CorrelationData;
28+
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
2829
import org.springframework.stereotype.Component;
2930

3031
import java.util.Date;
@@ -143,6 +144,11 @@ public void checkBeforeSend(BaseTxMsg txMsg, MsgOption msgOption) {
143144
if (ObjUtil.isEmpty(txMsg.getId())) {
144145
((TxMsg) txMsg).setId(String.valueOf(IdWorker.getId(txMsg)));
145146
}
147+
// 填充 connectionName
148+
if (ObjUtil.isEmpty(txMsg.getConnectionName())) {
149+
String connectionName = (String) SimpleResourceHolder.get(RabbitUtil.template().getConnectionFactory());
150+
((TxMsg) txMsg).setConnectionName(connectionName);
151+
}
146152
}
147153

148154
@Override
@@ -190,18 +196,27 @@ public void sendCommitted(BaseTxMsg txMsg, MsgOption msgOption) {
190196
}
191197

192198
private void doSendCommittedOrRetry(BaseTxMsg txMsg) {
193-
// 放入 txMsg id
194-
CorrelationData correlationData = new CorrelationData(txMsg.getId());
195-
RabbitUtil.send(txMsg.getExchange(), txMsg.getRoutingKey(), txMsg.getMessage(), msg -> {
196-
// 设置事务消息 head 标记
197-
msg.getMessageProperties().getHeaders().put(HEAD_TX_MSG_ID, txMsg.getId());
198-
msg.getMessageProperties().getHeaders().put(HEAD_TX_MSG_BIZ_TYPE, txMsg.getBizType());
199-
msg.getMessageProperties().getHeaders().put(HEAD_TX_MSG_BIZ_KEY, txMsg.getBizKey());
200-
// 设置 ReturnedMessage,用于 callback 中获取消息
201-
ReturnedMessage defaultReturnedMessage = new ReturnedMessage(msg, 0, DEFAULT_TX_MSG_REPLY_TEXT, null, null);
202-
correlationData.setReturned(defaultReturnedMessage);
203-
return msg;
204-
}, correlationData);
199+
try {
200+
if (ObjUtil.isNotNull(txMsg.getConnectionName())) {
201+
SimpleResourceHolder.bind(RabbitUtil.template().getConnectionFactory(), txMsg.getConnectionName());
202+
}
203+
// 放入 txMsg id
204+
CorrelationData correlationData = new CorrelationData(txMsg.getId());
205+
RabbitUtil.send(txMsg.getExchange(), txMsg.getRoutingKey(), txMsg.getMessage(), msg -> {
206+
// 设置事务消息 head 标记
207+
msg.getMessageProperties().getHeaders().put(HEAD_TX_MSG_ID, txMsg.getId());
208+
msg.getMessageProperties().getHeaders().put(HEAD_TX_MSG_BIZ_TYPE, txMsg.getBizType());
209+
msg.getMessageProperties().getHeaders().put(HEAD_TX_MSG_BIZ_KEY, txMsg.getBizKey());
210+
// 设置 ReturnedMessage,用于 callback 中获取消息
211+
ReturnedMessage defaultReturnedMessage = new ReturnedMessage(msg, 0, DEFAULT_TX_MSG_REPLY_TEXT, null, null);
212+
correlationData.setReturned(defaultReturnedMessage);
213+
return msg;
214+
}, correlationData);
215+
} finally {
216+
if (ObjUtil.isNotNull(txMsg.getConnectionName())) {
217+
SimpleResourceHolder.unbindIfPossible(RabbitUtil.template().getConnectionFactory());
218+
}
219+
}
205220
}
206221

207222
@Override

0 commit comments

Comments
 (0)