22
33import cn .hutool .core .util .ObjectUtil ;
44import com .github .cadecode .uniboot .common .plugin .mq .consts .RabbitConst ;
5+ import com .github .cadecode .uniboot .common .plugin .mq .handler .AbstractTxMsgHandler ;
56import lombok .RequiredArgsConstructor ;
67import lombok .extern .slf4j .Slf4j ;
78import org .springframework .amqp .core .Exchange ;
@@ -37,6 +38,8 @@ public class RabbitCallback implements ConfirmCallback, ReturnsCallback, Initial
3738
3839 private final RabbitTemplate rabbitTemplate ;
3940
41+ private final AbstractTxMsgHandler txMsgTaskHandler ;
42+
4043 /**
4144 * 消息是否成功到达交换机
4245 */
@@ -46,9 +49,11 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
4649 String correlationId = ObjectUtil .defaultIfNull (correlationData , CorrelationData ::getId , "" );
4750 if (ack ) {
4851 log .debug ("Rabbit message send ok, id:{}" , correlationId );
52+ txMsgTaskHandler .handleConfirm (correlationData , true , cause );
4953 return ;
5054 }
5155 log .error ("Rabbit message send fail, correlationId:{}, cause:{}" , correlationId , cause );
56+ txMsgTaskHandler .handleConfirm (correlationData , false , cause );
5257 }
5358
5459 /**
@@ -62,6 +67,7 @@ public void returnedMessage(ReturnedMessage returned) {
6267 }
6368 log .error ("Rabbit message is returned, message:{}, replyCode:{}. replyText:{}, exchange:{}, routingKey :{}" ,
6469 returned .getMessage (), returned .getReplyCode (), returned .getReplyText (), returned .getExchange (), returned .getRoutingKey ());
70+ txMsgTaskHandler .handleReturned (returned );
6571 }
6672
6773 /**
0 commit comments