RocketMQ如何保证消息不丢失
RocketMQ的消息想要确保不丢失,需要生产者、消费者以及Broker的共同努力,缺一不可。
首先在生产者端,消息的发送分为同步、异步两种和单向发送(单向发送不保证成功,不建议使用),在同步发送消息的情况下,消息的发送会同步阻塞等待 Broker 返回结果,在 Broker 确认收到消息之后,生产者才会拿到 SendResult。如果这个过程中发生异常,那么就说明消息发送可能失败了,就需要生产者进行重新发送消息。
try { |
异步发送的时候,会有成功和失败的回调,这还是需要在失败回调中处理重试确保成功
// 异步发送消息, 发送结果通过callback返回给客户端。 |
但是 Broker 其实并不会立即把消息存储到磁盘上,而是先存储到内存中,内存存储成功之后,就返回给确认结果给生产者了。然后再通过异步刷盘的方式将内存中的数据存储到磁盘上。但是这个过程中,如果机器挂了,那么就可能会导致数据丢失。
如果想要保证消息不丢失,可以将消息保存机制修改为同步刷盘,这样,Broker 会在同步请求中把数据保存在磁盘上,确保保存成功后再返回确认结果给生产者。
## 默认情况为 ASYNC_FLUSH |
除了同步发送消息,还有异步发送,异步发送的话就需要生产者重写 SendCallback 的 onSuccess 和 onException 方法,用于给 Broker 进行回调。在方法中实现消息的确认或者重新发送。
为了保证消息不丢失,RocketMQ 肯定要通过集群方式进行部署,Broker 通常采用一主多从部署方式,并且采用主从同步的方式做数据复制。
当主 Broker 宕机时,从 Broker 会接管主 Broker 的工作,保证消息不丢失。同时,RocketMQ 的 Broker 还可以配置多个实例,消息会在多个 Broker 之间进行冗余备份,从而保证数据的可靠性。
默认方式下,Broker 在接收消息后,写入 master 成功,就可以返回确认响应给生产者了,接着消息将会异步复制到 slave 节点。但是如果这个过程中,Master 的磁盘损坏了。那就会导致数据丢失了。
如果想要解决这个问题,可以配置同步复制的方式,即 Master 在将数据同步到 Slave 节点后,再返回给生产者确认结果。
## 默认为 ASYNC_MASTER |
在消费者端,需要确保在消息拉取并消费成功之后再给 Broker 返回 ACK,就可以保证消息不丢失了,如果这个过程中 Broker 一直没收到 ACK,那么就可以重试。
所以,在消费者的代码中,一定要在业务逻辑的最后一步 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 当然,也可以先把数据保存在数据库中,就返回,然后自己再慢慢处理。
