Skip to content

Commit c6c8769

Browse files
committed
feat: 添加 RabbitMQ 消息确认和退回回调
1 parent e0c6349 commit c6c8769

1 file changed

Lines changed: 83 additions & 0 deletions

File tree

  • common/plugin/mq/src/main/java/com/github/cadecode/uniboot/common/plugin/mq/rabbit
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.github.cadecode.uniboot.common.plugin.mq.rabbit;
2+
3+
import cn.hutool.core.util.ObjectUtil;
4+
import com.github.cadecode.uniboot.common.plugin.mq.consts.RabbitConst;
5+
import lombok.RequiredArgsConstructor;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.amqp.core.Exchange;
8+
import org.springframework.amqp.core.ReturnedMessage;
9+
import org.springframework.amqp.rabbit.connection.CorrelationData;
10+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
11+
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
12+
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnsCallback;
13+
import org.springframework.beans.factory.InitializingBean;
14+
import org.springframework.stereotype.Component;
15+
16+
import java.util.List;
17+
import java.util.Map;
18+
import java.util.stream.Collectors;
19+
20+
/**
21+
* RabbitMQ 可靠性回调
22+
*
23+
* @author Cade Li
24+
* @since 2023/8/18
25+
*/
26+
@Slf4j
27+
@RequiredArgsConstructor
28+
@Component
29+
public class RabbitCallback implements ConfirmCallback, ReturnsCallback, InitializingBean {
30+
31+
/**
32+
* 交换机名称映射
33+
*/
34+
private Map<String, Exchange> exchangeNameMap;
35+
36+
private final List<Exchange> exchanges;
37+
38+
private final RabbitTemplate rabbitTemplate;
39+
40+
/**
41+
* 消息是否成功到达交换机
42+
*/
43+
@Override
44+
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
45+
// 获取 correlationData id
46+
String correlationId = ObjectUtil.defaultIfNull(correlationData, CorrelationData::getId, "");
47+
if (!ack) {
48+
log.error("Rabbit message send fail, correlationId:{}, cause:{}", correlationId, cause);
49+
}
50+
}
51+
52+
/**
53+
* 消息是否成功投递到队列
54+
*/
55+
@Override
56+
public void returnedMessage(ReturnedMessage returned) {
57+
// 由于 delay 交换机会默认执行退回回调,不需要处理
58+
if (isExchangeDelayed(returned.getExchange())) {
59+
return;
60+
}
61+
log.error("Rabbit message is returned, message:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}",
62+
returned.getMessage(), returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
63+
}
64+
65+
/**
66+
* 判断是否是 delay 交换机
67+
*/
68+
private boolean isExchangeDelayed(String exchangeName) {
69+
if (ObjectUtil.isEmpty(exchangeNameMap)) {
70+
exchangeNameMap = exchanges.stream().collect(Collectors.toMap(Exchange::getName, o -> o));
71+
}
72+
// 若是 delay 交换机
73+
Exchange exchange = exchangeNameMap.get(exchangeName);
74+
return exchange.isDelayed() || RabbitConst.EXC_TYPE_DELAYED.equals(exchange.getType());
75+
}
76+
77+
@Override
78+
public void afterPropertiesSet() {
79+
// 设置回调
80+
rabbitTemplate.setConfirmCallback(this);
81+
rabbitTemplate.setReturnsCallback(this);
82+
}
83+
}

0 commit comments

Comments
 (0)