延遲隊(duì)列在實(shí)際項(xiàng)目中有非常多的應(yīng)用場景,最常見的比如訂單未支付,超時(shí)取消訂單,在創(chuàng)建訂單的時(shí)候發(fā)送一條延遲消息,達(dá)到延遲時(shí)間之后消費(fèi)者收到消息,如果訂單沒有支付的話,那么就取消訂單。
那么,今天我們需要來談的問題就是RabbitMQ、RocketMQ、Kafka中分別是怎么實(shí)現(xiàn)延時(shí)隊(duì)列的,以及他們對(duì)應(yīng)的實(shí)現(xiàn)原理是什么?
RabbitMQ本身并不存在延遲隊(duì)列的概念,在 RabbitMQ 中是通過 DLX 死信交換機(jī)和 TTL 消息過期來實(shí)現(xiàn)延遲隊(duì)列的。
有兩種方式可以設(shè)置 TTL。
1. 通過隊(duì)列屬性設(shè)置,這樣的話隊(duì)列中的所有消息都會(huì)擁有相同的過期時(shí)間
2. 對(duì)消息單獨(dú)設(shè)置過期時(shí)間,這樣每條消息的過期時(shí)間都可以不同
那么如果同時(shí)設(shè)置呢?這樣將會(huì)以兩個(gè)時(shí)間中較小的值為準(zhǔn)。
針對(duì)隊(duì)列的方式通過參數(shù)x-message-ttl
來設(shè)置。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
針對(duì)消息的方式通過setExpiration
來設(shè)置。
AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());
一個(gè)消息要成為死信消息有 3 種情況:
1. 消息被拒絕,比如調(diào)用reject
方法,并且需要設(shè)置requeue
為false
2. 消息過期
3. 隊(duì)列達(dá)到最大長度
可以通過參數(shù)dead-letter-exchange
設(shè)置死信交換機(jī),也可以通過參數(shù)dead-letter- exchange
指定 RoutingKey(未指定則使用原隊(duì)列的 RoutingKey)。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
當(dāng)我們對(duì)消息設(shè)置了 TTL 和 DLX 之后,當(dāng)消息正常發(fā)送,通過 Exchange 到達(dá) Queue 之后,由于設(shè)置了 TTL 過期時(shí)間,并且消息沒有被消費(fèi)(訂閱的是死信隊(duì)列),達(dá)到過期時(shí)間之后,消息就轉(zhuǎn)移到與之綁定的 DLX 死信隊(duì)列之中。
這樣的話,就相當(dāng)于通過 DLX 和 TTL 間接實(shí)現(xiàn)了延遲消息的功能,實(shí)際使用中我們可以根據(jù)不同的延遲級(jí)別綁定設(shè)置不同延遲時(shí)間的隊(duì)列來達(dá)到實(shí)現(xiàn)不同延遲時(shí)間的效果。
RocketMQ 和 RabbitMQ 不同,它本身就有延遲隊(duì)列的功能,但是開源版本只能支持固定延遲時(shí)間的消息,不支持任意時(shí)間精度的消息(這個(gè)好像只有阿里云版本的可以)。
他的默認(rèn)時(shí)間間隔分為 18 個(gè)級(jí)別,基本上也能滿足大部分場景的需要了。
默認(rèn)延遲級(jí)別:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。
使用起來也非常的簡單,直接通過setDelayTimeLevel
設(shè)置延遲級(jí)別即可。
setDelayTimeLevel(level)
實(shí)現(xiàn)原理說起來比較簡單,Broker 會(huì)根據(jù)不同的延遲級(jí)別創(chuàng)建出多個(gè)不同級(jí)別的隊(duì)列,當(dāng)我們發(fā)送延遲消息的時(shí)候,根據(jù)不同的延遲級(jí)別發(fā)送到不同的隊(duì)列中,同時(shí)在 Broker 內(nèi)部通過一個(gè)定時(shí)器去輪詢這些隊(duì)列(RocketMQ 會(huì)為每個(gè)延遲級(jí)別分別創(chuàng)建一個(gè)定時(shí)任務(wù)),如果消息達(dá)到發(fā)送時(shí)間,那么就直接把消息發(fā)送到指 topic 隊(duì)列中。
RocketMQ 這種實(shí)現(xiàn)方式是放在服務(wù)端去做的,同時(shí)有個(gè)好處就是相同延遲時(shí)間的消息是可以保證有序性的。
談到這里就順便提一下關(guān)于消息消費(fèi)重試的原理,這個(gè)本質(zhì)上來說其實(shí)是一樣的,對(duì)于消費(fèi)失敗需要重試的消息實(shí)際上都會(huì)被丟到延遲隊(duì)列的 topic 里,到期后再轉(zhuǎn)發(fā)到真正的 topic 中。
對(duì)于 Kafka 來說,原生并不支持延遲隊(duì)列的功能,需要我們手動(dòng)去實(shí)現(xiàn),這里我根據(jù) RocketMQ 的設(shè)計(jì)提供一個(gè)實(shí)現(xiàn)思路。
這個(gè)設(shè)計(jì),我們也不支持任意時(shí)間精度的延遲消息,只支持固定級(jí)別的延遲,因?yàn)閷?duì)于大部分延遲消息的場景來說足夠使用了。
只創(chuàng)建一個(gè) topic,但是針對(duì)該 topic 創(chuàng)建 18 個(gè) partition,每個(gè) partition 對(duì)應(yīng)不同的延遲級(jí)別,這樣做和 RocketMQ 一樣有個(gè)好處就是能達(dá)到相同延遲時(shí)間的消息達(dá)到有序性。
· 首先創(chuàng)建一個(gè)單獨(dú)針對(duì)延遲隊(duì)列的 topic,同時(shí)創(chuàng)建 18 個(gè) partition 針對(duì)不同的延遲級(jí)別
· 發(fā)送消息的時(shí)候根據(jù)延遲參數(shù)發(fā)送到延遲 topic 對(duì)應(yīng)的 partition,對(duì)應(yīng)的key
為延遲時(shí)間,同時(shí)把原 topic 保存到 header 中
ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));
· 內(nèi)嵌的consumer
單獨(dú)設(shè)置一個(gè)ConsumerGroup
去消費(fèi)延遲 topic 消息,消費(fèi)到消息之后如果沒有達(dá)到延遲時(shí)間那么就進(jìn)行pause
,然后seek
到當(dāng)前ConsumerRecord
的offset
位置,同時(shí)使用定時(shí)器去輪詢延遲的TopicPartition
,達(dá)到延遲時(shí)間之后進(jìn)行resume
· 如果達(dá)到了延遲時(shí)間,那么就獲取到header
中的真實(shí) topic ,直接轉(zhuǎn)發(fā)
這里為什么要進(jìn)行pause
和resume
呢?因?yàn)槿绻贿@樣的話,如果超時(shí)未消費(fèi)達(dá)到max.poll.interval.ms
最大時(shí)間(默認(rèn)300s),那么將會(huì)觸發(fā) Rebalance。
聯(lián)系客服