Skip to content

Commit 440e860

Browse files
committed
feat: 添加事务消息相关
1 parent 007c569 commit 440e860

8 files changed

Lines changed: 349 additions & 0 deletions

File tree

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.config;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
6+
import org.springframework.context.annotation.Configuration;
7+
8+
/**
9+
* 事务消息自动配置
10+
*
11+
* @author Cade Li
12+
* @since 2023/8/20
13+
*/
14+
@Slf4j
15+
@RequiredArgsConstructor
16+
@EnableConfigurationProperties(TxMsgProperties.class)
17+
@Configuration
18+
public class TxMsgAutoConfig {
19+
20+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.config;
2+
3+
import cn.hutool.core.bean.BeanUtil;
4+
import cn.hutool.core.bean.copier.CopyOptions;
5+
import cn.hutool.core.util.ObjectUtil;
6+
import lombok.AllArgsConstructor;
7+
import lombok.Builder;
8+
import lombok.Data;
9+
import lombok.NoArgsConstructor;
10+
import org.springframework.beans.factory.InitializingBean;
11+
import org.springframework.boot.context.properties.ConfigurationProperties;
12+
13+
/**
14+
* TxMsg 配置
15+
*
16+
* @author Cade Li
17+
* @since 2023/8/19
18+
*/
19+
@Data
20+
@ConfigurationProperties(TxMsgProperties.ENV_CONF_TX_MSG_PREFIX)
21+
public class TxMsgProperties implements InitializingBean {
22+
23+
public static final String ENV_CONF_TX_MSG_PREFIX = "uni-boot.mq.tx-msg";
24+
25+
public static final long DEFAULT_RETRY_FIX_DELAY = 30 * 1000;
26+
27+
public static final long DEFAULT_CLEAR_FIX_DELAY = 2 * 60 * 60 * 1000;
28+
29+
public static final long DEFAULT_AUTO_CLEAR_INTERVAL = 60 * 60 * 1000;
30+
31+
// 重试默认属性
32+
33+
public static final int DEFAULT_MAX_RETRY_TIMES = 3;
34+
35+
public static final long DEFAULT_BACKOFF_INIT_INTERVAL = 10 * 1000;
36+
37+
public static final double DEFAULT_BACKOFF_MULTIPLIER = 2.0;
38+
39+
public static final long DEFAULT_BACKOFF_MAX_INTERVAL = 30 * 60 * 1000;
40+
41+
// 重试默认属性 --end
42+
43+
/**
44+
* 定时重试间隔时间
45+
*/
46+
private Long retryFixDelay = DEFAULT_RETRY_FIX_DELAY;
47+
48+
/**
49+
* 定时清理记录间隔时间
50+
*/
51+
private Long clearFixDelay = DEFAULT_CLEAR_FIX_DELAY;
52+
53+
/**
54+
* 是否开启自动清理
55+
*/
56+
private Boolean autoClear = true;
57+
58+
/**
59+
* 自动清理多久之前的记录
60+
*/
61+
private Long autoClearInterval = DEFAULT_AUTO_CLEAR_INTERVAL;
62+
63+
/**
64+
* 事务消息配置
65+
*/
66+
private MsgOption defaultMsgOption;
67+
68+
public MsgOption createMsgOption(MsgOption msgOption) {
69+
if (ObjectUtil.isNull(msgOption)) {
70+
msgOption = new MsgOption();
71+
}
72+
BeanUtil.copyProperties(defaultMsgOption, msgOption, CopyOptions.create().setOverride(false));
73+
return msgOption;
74+
}
75+
76+
@Override
77+
public void afterPropertiesSet() {
78+
MsgOption allDefaultOption = MsgOption.builder()
79+
.maxRetryTimes(DEFAULT_MAX_RETRY_TIMES)
80+
.backoffInitInterval(DEFAULT_BACKOFF_INIT_INTERVAL)
81+
.backoffMultiplier(DEFAULT_BACKOFF_MULTIPLIER)
82+
.backoffMaxInterval(DEFAULT_BACKOFF_MAX_INTERVAL)
83+
.build();
84+
if (ObjectUtil.isNull(defaultMsgOption)) {
85+
defaultMsgOption = allDefaultOption;
86+
} else {
87+
BeanUtil.copyProperties(allDefaultOption, defaultMsgOption, CopyOptions.create().setOverride(false));
88+
}
89+
}
90+
91+
@Data
92+
@AllArgsConstructor
93+
@NoArgsConstructor
94+
@Builder
95+
public static class MsgOption {
96+
97+
// 重试相关属性
98+
99+
private Integer maxRetryTimes;
100+
101+
private Long backoffInitInterval;
102+
103+
private Double backoffMultiplier;
104+
105+
private Long backoffMaxInterval;
106+
107+
// 重试相关属性 -- end
108+
}
109+
110+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.exception;
2+
3+
/**
4+
* 事务消息异常
5+
*
6+
* @author Cade Li
7+
* @since 2023/8/20
8+
*/
9+
public class TxMsgException extends RuntimeException {
10+
public TxMsgException() {
11+
super();
12+
}
13+
14+
public TxMsgException(String message) {
15+
super(message);
16+
}
17+
18+
public TxMsgException(String message, Throwable cause) {
19+
super(message, cause);
20+
}
21+
22+
public TxMsgException(Throwable cause) {
23+
super(cause);
24+
}
25+
26+
protected TxMsgException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
27+
super(message, cause, enableSuppression, writableStackTrace);
28+
}
29+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.handler;
2+
3+
import com.github.cadecode.uniboot.common.plugin.mq.config.TxMsgProperties.MsgOption;
4+
import com.github.cadecode.uniboot.common.plugin.mq.model.BaseTxMsg;
5+
import org.springframework.amqp.core.ReturnedMessage;
6+
import org.springframework.amqp.rabbit.connection.CorrelationData;
7+
8+
/**
9+
* 事务消息处理器
10+
* 1. 入库/更新
11+
* 2. 重试/清理记录
12+
*
13+
* @author Cade Li
14+
* @since 2023/8/20
15+
*/
16+
public abstract class AbstractTxMsgHandler {
17+
18+
public abstract void doRetry();
19+
20+
public abstract void doClear(Long autoClearInterval);
21+
22+
public abstract void sendNoTransaction(BaseTxMsg txMsg, MsgOption msgOption);
23+
24+
public abstract void sendNotCommit(BaseTxMsg txMsg, MsgOption msgOption);
25+
26+
public abstract void saveBeforeRegister(BaseTxMsg txMsg, MsgOption msgOption);
27+
28+
public abstract void sendCommitted(BaseTxMsg txMsg, MsgOption msgOption);
29+
30+
public abstract void handleConfirm(CorrelationData correlationData, boolean ack, String cause);
31+
32+
public abstract void handleReturned(ReturnedMessage returned);
33+
34+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.model;
2+
3+
/**
4+
* 事务消息基本信息抽象
5+
*
6+
* @author Cade Li
7+
* @since 2023/8/19
8+
*/
9+
public interface BaseTxMsg {
10+
11+
String getId();
12+
13+
String getBizType();
14+
15+
String getBizKey();
16+
17+
String getExchange();
18+
19+
String getRoutingKey();
20+
21+
String getMessage();
22+
23+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.model;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
/**
9+
* 事务消息基本信息
10+
*
11+
* @author Cade Li
12+
* @since 2023/8/18
13+
*/
14+
@Data
15+
@AllArgsConstructor
16+
@NoArgsConstructor
17+
@Builder
18+
public class TxMsg implements BaseTxMsg {
19+
20+
private String id;
21+
22+
/**
23+
* 业务类型
24+
*/
25+
private String bizType;
26+
27+
/**
28+
* 业务键
29+
*/
30+
private String bizKey;
31+
32+
private String exchange;
33+
34+
private String routingKey;
35+
36+
private String message;
37+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.task;
2+
3+
import cn.hutool.core.util.ObjectUtil;
4+
import com.github.cadecode.uniboot.common.plugin.mq.config.TxMsgProperties;
5+
import com.github.cadecode.uniboot.common.plugin.mq.handler.AbstractTxMsgHandler;
6+
import lombok.RequiredArgsConstructor;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.boot.context.event.ApplicationStartedEvent;
9+
import org.springframework.context.event.EventListener;
10+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
11+
import org.springframework.stereotype.Component;
12+
13+
/**
14+
* 事务消息定时任务
15+
*
16+
* @author Cade Li
17+
* @since 2023/8/19
18+
*/
19+
@Slf4j
20+
@RequiredArgsConstructor
21+
@Component
22+
public class TxMsgTask {
23+
24+
private final ThreadPoolTaskScheduler taskScheduler;
25+
26+
private final TxMsgProperties txMsgProperties;
27+
28+
private final AbstractTxMsgHandler txMsgTaskHandler;
29+
30+
@EventListener(ApplicationStartedEvent.class)
31+
public void onApplicationStartedEvent() {
32+
log.info("TxMsg task do retry started");
33+
taskScheduler.scheduleWithFixedDelay(txMsgTaskHandler::doRetry, txMsgProperties.getRetryFixDelay());
34+
log.info("TxMsg task do clear start, {}", txMsgProperties.getAutoClear());
35+
if (ObjectUtil.equal(txMsgProperties.getAutoClear(), true)) {
36+
taskScheduler.scheduleWithFixedDelay(() -> txMsgTaskHandler.doClear(txMsgProperties.getAutoClearInterval()),
37+
txMsgProperties.getClearFixDelay());
38+
}
39+
}
40+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.util;
2+
3+
import com.github.cadecode.uniboot.common.plugin.mq.config.TxMsgProperties;
4+
import com.github.cadecode.uniboot.common.plugin.mq.config.TxMsgProperties.MsgOption;
5+
import com.github.cadecode.uniboot.common.plugin.mq.handler.AbstractTxMsgHandler;
6+
import com.github.cadecode.uniboot.common.plugin.mq.model.BaseTxMsg;
7+
import lombok.RequiredArgsConstructor;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.springframework.stereotype.Component;
10+
import org.springframework.transaction.support.TransactionSynchronization;
11+
import org.springframework.transaction.support.TransactionSynchronizationManager;
12+
13+
/**
14+
* 事务消息工具类
15+
*
16+
* @author Cade Li
17+
* @since 2023/8/19
18+
*/
19+
@Slf4j
20+
@RequiredArgsConstructor
21+
@Component
22+
public class TxMsgKit {
23+
24+
private final TxMsgProperties txMsgProperties;
25+
26+
private final AbstractTxMsgHandler txMsgTaskHandler;
27+
28+
public void sendTx(BaseTxMsg txMsg) {
29+
sendTx(txMsg, null);
30+
}
31+
32+
public void sendTx(BaseTxMsg txMsg, MsgOption msgOption) {
33+
MsgOption currOption = txMsgProperties.createMsgOption(msgOption);
34+
// 若没有事务
35+
if (!TransactionSynchronizationManager.isActualTransactionActive()) {
36+
log.error("TxMsg send need transaction, txMsg:{}, biz:{}_{}", txMsg.getId(), txMsg.getBizType(), txMsg.getBizKey());
37+
txMsgTaskHandler.sendNoTransaction(txMsg, currOption);
38+
return;
39+
}
40+
// 持久化
41+
txMsgTaskHandler.saveBeforeRegister(txMsg, msgOption);
42+
// 注册到事务管理器
43+
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
44+
@Override
45+
public void afterCompletion(int status) {
46+
// 判断状态
47+
if (status != TransactionSynchronization.STATUS_COMMITTED) {
48+
log.error("TxMsg send not committed, txMsg:{}, biz:{}_{}", txMsg.getId(), txMsg.getBizType(), txMsg.getBizKey());
49+
txMsgTaskHandler.sendNotCommit(txMsg, currOption);
50+
return;
51+
}
52+
txMsgTaskHandler.sendCommitted(txMsg, currOption);
53+
}
54+
});
55+
}
56+
}

0 commit comments

Comments
 (0)