消息发送到被消费的流程:
JAVA的生产端的发送数据—–>Broker(消息服务器)——–>达到Exchange交换机————->通过路由键到达Queue<——JAVA消费端监听并消费
这里P->B、E->Q属于生产端确认,Q->C是属于消费端确认,
生产端消息确认机制
1.yml配置
rabbitmq:
host: xxxxxxxx
virtual-host: /
port: xxx
#开启发送端确认p->b
publisher-confirms: true
#开启消息抵达队列的确认 e->q
publisher-returns: true
#只要消息到达队列,以异步的方式优先回调我们这个returnconfirm
template:
mandatory: true
2.生产端消息确认机制自定义配置代码
下面这些配置代码属于回调!
/**
* @description: Rabbit核心配置
* @author TAO
* @date 2020/8/4 20:48
*/
@Configuration
public class MyRabbitConfig {
//讲对象序列化为JSON
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Autowired
RabbitTemplate rabbitTemplate;
//定制RabbitTemplate
@PostConstruct//MyRabbitConfig对象创建完成后执行这个初始化方法
public void initRabbitTemplate(){
//设置发送消息确认回调p->b
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
/**
* @param correlationData 当前消息的唯一关联数据 ,这个是消息的唯一id
* @param ack 消息是否成功收到
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(“confirm…correlationData=>”+correlationData+”———-ack==>”+ack+”——–cause ==>”+cause);
}
});
//设置消息抵达队列的失败回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息没有投递给指定的队列,就会触发这个失败回调
* @param message 投递失败的消息
* @param replyCode 回复的状态码
* @param replyText 回复的文本内容
* @param exchange 消息发送的交换机
* @param routingKey 消息走的路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(“returnedMessage–>”+message+”\nreplyCode–>”+replyCode+”\nexchange–>”+exchange+”\nroutingKey–>”+routingKey);
}
});
}
}
3.编写发送消息请求
方便测试
@RestController
public class SendMQController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping(“/send”)
public String send(){
//new CorrelationData(UUID.randomUUID().toString())指定消息的唯一id
//项目实际情况还会将消息的唯一id存入数据库中,用作后期队列中的消息消费情况做对比
String id=UUID.randomUUID().toString();
System.out.println(“消息发送中…消息唯一id”+id);
rabbitTemplate.convertAndSend(“hello-java-exchange”, “hello.jasva”, “我是一条消息”,new CorrelationData(id));
return “ok”;
}
}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
4.查看效果
消费端确认机制
消费端是默认自动确认的,只要消息接收到,客户端就会自动确认消息,RabbitMQ就会移除这个消息
1.编写yml配置文件
rabbitmq:
host: xxx
virtual-host: /
port: xx
#开启发送端确认p->b
publisher-confirms: true
#开启消息抵达队列的确认 e->q
publisher-returns: true
#只要消息到达队列,以异步的方式优先回调我们这个returnconfirm
template:
mandatory: true
#手动签收消息
listener:
simple:
acknowledge-mode: manual
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
消息监听处代码编写
@Service
@RabbitListener(queues={“hello-java-queue”})//标注这个类可以监听这个队列中的所有消息
public class RabbitImpl {
//我们收到很多消息,自动回复给服务器ack,只有一个消息处理了,然后服务器宕机了,发生消息丢失
//这时我们就需要手动确认模式
@RabbitHandler
public void recieveMessage(Message message, Channel channel){
long deliveryTag=message.getMessageProperties().getDeliveryTag();
try {
//deliveryTag签收的消息标签
//multiple 是否批量签收
channel.basicAck(deliveryTag,false);//签收
//deliveryTag签收的消息标签
//multiple 是否批量签收
//是否从新入队 (将这条拒收的消息又重新存放带队列中)
channel.basicNack(deliveryTag,false,false);//拒收-支持批量
//deliveryTag签收的消息标签
//multiple 是否批量签收
channel.basicReject(deliveryTag,false);//拒收-不支持批量
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(“接收到消息=>”+message);
}
}
消费者获取到消息,成功处理,可 回复Ack给Broker
-basic.ack用于肯定确认;broker将移除此消息
-basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
-basic.reject用于否定确认;同上,但不能批量
默认自动ack,消息被消费者收到,就会从broker的queue中移除”queue无消费者,消息依然会被存储,直到消费者消费,消费者收到消息,默认会自动ack,但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
-消息处理成功, ack(),接受下一个消息,此消息broker就会移除
-消息处理失败, nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
-消息一直没有调用ack/nack方法, broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人