- 保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制
- publisher confirmCallback 确认模式
- publisher returnCalllback 未投递到queue退回模式
- consumer ack机制
可靠抵达
ConfirmCallback(生产端确认)
1,开启发送端确认
spring.rabbitmq.publisher-confirms=true
#上面的是过时的配置
spring.rabbitmq.publisher-confirm-type=correlated
2,设置确认回调
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 使用JSON序列化机制,进行消息转换
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
*
* @PostConstruct MyRabbitConfig对象创建完成以后,执行这个方法
*/
@PostConstruct
public void initRabbitTemplate(){
//设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @Author pms
* @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
* @param b 消息是否成功收到
* @param s 失败的原因
* @Return void
* @Date 2021/7/19
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm...correlationData:"+correlationData+",ack:"+b+",cause:"+s);
}
});
}
}
ReturnCallback(生产端确认)
1,修改配置
#开启发送端消息抵达队列的确认
spring.rabbitmq.publisher-returns=true
#只要抵达队列,已异步发送优先回调我们这个returnconfirm
spring.rabbitmq.template.mandatory=true
2,设置消息抵达队列的确认回调
//设置消息抵达队列的确认回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有陶笛给给指定的队列,就触发这个失败回调
* @Author pms
* @param message 投递失败的消息详细信息
* @param i 回复的状态码
* @param s 回复的文本内容
* @param s1 当时这个消息发给哪个交换机
* @param s2 当时这个消息用那个路由键
* @Return void
* @Date 2021/7/19
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("message:"+message+"==>replyCode:"+i+"==>replyText:"+s+"==>exchange:"+s1+"==>routingKey:"+s2);
}
});
消费端确认机制
消费端是默认自动确认的,只要消息接收到,客户端就会自动确认消息,RabbitMQ就会移除这个消息
1,修改配置,手动签收
spring.jms.listener.acknowledge-mode=manual
2,消息监听处签收或拒签
@Service
@RabbitListener(queues={"hello-java-queue"})//标注这个类可以监听这个队列中的所有消息
public class RabbitImpl {
//我们收到很多消息,自动回复给服务器ack,只有一个消息处理了,然后服务器宕机了,发生消息丢失
//这时我们就需要手动确认模式
@RabbitHandler
public void recieveMessage(Message message, Channel channel){
long deliveryTag=message.getMessageProperties().getDeliveryTag();
try {
//deliveryTag签收的消息标签
//multiple 是否批量签收
channel.basicAck(deliveryTag,false);//签收
//deliveryTag签收的消息标签
//multiple 是否批量签收
//是否从新入队 (将这条拒收的消息又重新存放带队列中)
channel.basicNack(deliveryTag,false,false);//拒收-支持批量
//deliveryTag签收的消息标签
//multiple 是否批量签收
channel.basicReject(deliveryTag,false);//拒收-不支持批量
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("接收到消息=>"+message);
}
}