消息队列为啥会断线
做后台开发时,消息队列几乎是标配。RabbitMQ、Kafka、RocketMQ 用得多了,总会遇到一个让人头疼的问题——网络抖动导致连接断开。别小看这几十秒的断网,如果处理不好,消息丢了、任务卡住,半夜报警电话就来了。
其实断线原因挺常见:服务器重启、网络波动、防火墙策略调整,甚至云服务商机房维护。这时候如果程序不会自动重连,那等于把系统稳定性交给运气。
自动重连不是默认开启的
很多人以为,客户端连上消息队列之后就能一劳永逸。实际上,大多数客户端默认并不会无限重试。比如 RabbitMQ 的 Java 客户端,虽然支持自动恢复,但需要手动开启。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
Connection conn = factory.newConnection();这段代码里 setAutomaticRecoveryEnabled(true) 才是关键。不加这句,一旦断开就彻底掉线,得靠人手动重启服务。
Kafka 的消费者重连机制
Kafka 稍微聪明点,消费者本身就有重平衡和重试机制。但前提是你的消费逻辑不能阻塞太久,否则会被踢出消费者组。
常见陷阱是:消费消息时调了某个慢接口,超时时间又没控制好,触发了 session.timeout.ms,结果被误判为宕机。这时候即使网络恢复了,也得重新加入组,中间可能重复消费。
props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "600000");
props.put("enable.auto.commit", "false");特别是 max.poll.interval.ms,如果你处理一条消息可能花十几秒,这个值就得相应调大,不然自动重连根本没机会发挥作用。
自己写个简单的重连逻辑
有些轻量级场景用不上完整客户端,或者你用的是私有协议。这时候可以自己实现一个带指数退避的重连机制。
while (!connected) {
try {
connection = createMQConnection();
connected = true;
} catch (Exception e) {
Thread.sleep(retryInterval);
retryInterval = Math.min(retryInterval * 2, 30000);
}
}第一次失败等 1 秒,第二次 2 秒,再往后 4、8、16,最多等 30 秒。这样既不会疯狂刷日志,又能快速响应恢复。
别忘了监听连接状态
光重连还不够,你还得知道什么时候断了、什么时候恢复了。很多客户端提供回调接口,比如 RabbitMQ 的 ShutdownListener。
connection.addShutdownListener(cause -> {
log.warn("连接关闭,原因:{}", cause.getReason());
});加上日志记录,出问题时翻一下就知道是不是频繁断线,方便后续优化。
实际项目中见过有人用定时任务每隔 5 秒发一次心跳消息来判断队列是否可用。虽然土,但在没有完善监控的情况下也能应急。
测试你的重连机制
写了代码不测等于白搭。最简单的办法就是运行程序后,手动关掉 MQ 服务,等半分钟再开。看应用能不能继续收消息,有没有丢数据。
更狠一点,在 Docker 里跑 MQ,直接 docker stop 模拟宕机。这种真实断线场景下,才能看出重连逻辑靠不靠谱。
别等到线上出事才想起来验证,那时候损失已经造成了。