11package com .github .cadecode .uniboot .common .plugin .mq .util ;
22
33import cn .hutool .core .util .IdUtil ;
4+ import cn .hutool .core .util .ObjectUtil ;
45import lombok .RequiredArgsConstructor ;
56import org .springframework .amqp .core .MessagePostProcessor ;
67import org .springframework .amqp .rabbit .connection .CorrelationData ;
@@ -27,31 +28,50 @@ public static RabbitTemplate template() {
2728 }
2829
2930 public static String send (String exchangeName , String routingKey , Object message ) {
30- return send (exchangeName , routingKey , message , msg -> msg );
31+ return send (exchangeName , routingKey , message , msg -> msg , null );
3132 }
3233
3334 public static String send (String exchangeName , String routingKey , Object message , MessagePostProcessor postProcessor ) {
34- CorrelationData correlationData = geneCorrelationData (message );
35+ return send (exchangeName , routingKey , message , postProcessor , null );
36+ }
37+
38+ public static String send (String exchangeName , String routingKey , Object message , MessagePostProcessor postProcessor ,
39+ CorrelationData correlationData ) {
40+ // 不存在消息 id 时构造新的 correlationData
41+ if (ObjectUtil .isNull (correlationData ) || ObjectUtil .isNull (correlationData .getId ())) {
42+ correlationData = geneCorrelationData ();
43+ }
3544 RABBIT_TEMPLATE .convertAndSend (exchangeName , routingKey , message , postProcessor , correlationData );
3645 return correlationData .getId ();
3746 }
3847
3948 public static String sendDelay (String exchangeName , String routingKey , Object message , Integer delayTime ) {
40- return sendDelay (exchangeName , routingKey , message , delayTime , msg -> msg );
49+ return sendDelay (exchangeName , routingKey , message , delayTime , msg -> msg , null );
4150 }
4251
43- public static String sendDelay (String exchangeName , String routingKey , Object message , Integer delayTime , MessagePostProcessor postProcessor ) {
44- CorrelationData correlationData = geneCorrelationData (message );
52+ public static String sendDelay (String exchangeName , String routingKey , Object message , Integer delayTime ,
53+ MessagePostProcessor postProcessor ) {
54+ return sendDelay (exchangeName , routingKey , message , delayTime , postProcessor , null );
55+ }
56+
57+ public static String sendDelay (String exchangeName , String routingKey , Object message , Integer delayTime ,
58+ MessagePostProcessor postProcessor , CorrelationData correlationData ) {
59+ CorrelationData currCorrelationData ;
60+ if (ObjectUtil .isNull (correlationData ) || ObjectUtil .isNull (correlationData .getId ())) {
61+ currCorrelationData = geneCorrelationData ();
62+ } else {
63+ currCorrelationData = correlationData ;
64+ }
4565 RABBIT_TEMPLATE .convertAndSend (exchangeName , routingKey , message , msg -> {
4666 // 注入 delay 时间
4767 msg .getMessageProperties ().setDelay (delayTime );
48- postProcessor .postProcessMessage (msg , correlationData );
68+ postProcessor .postProcessMessage (msg , currCorrelationData );
4969 return msg ;
50- }, correlationData );
51- return correlationData .getId ();
70+ }, currCorrelationData );
71+ return currCorrelationData .getId ();
5272 }
5373
54- public static CorrelationData geneCorrelationData (Object message ) {
74+ public static CorrelationData geneCorrelationData () {
5575 // 有 hutool 雪花算法生成 id
5676 String msgId = String .valueOf (IdUtil .getSnowflakeNextIdStr ());
5777 return new CorrelationData (msgId );
0 commit comments