九色国产,午夜在线视频,新黄色网址,九九色综合,天天做夜夜做久久做狠狠,天天躁夜夜躁狠狠躁2021a,久久不卡一区二区三区

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
RabbitMQ、RocketMQ、Kafka延遲隊(duì)列實(shí)現(xiàn)

延遲隊(duì)列在實(shí)際項(xiàng)目中有非常多的應(yīng)用場景,最常見的比如訂單未支付,超時(shí)取消訂單,在創(chuàng)建訂單的時(shí)候發(fā)送一條延遲消息,達(dá)到延遲時(shí)間之后消費(fèi)者收到消息,如果訂單沒有支付的話,那么就取消訂單。

那么,今天我們需要來談的問題就是RabbitMQ、RocketMQ、Kafka中分別是怎么實(shí)現(xiàn)延時(shí)隊(duì)列的,以及他們對(duì)應(yīng)的實(shí)現(xiàn)原理是什么?

RabbitMQ

RabbitMQ本身并不存在延遲隊(duì)列的概念,在 RabbitMQ 中是通過 DLX 死信交換機(jī)和 TTL 消息過期來實(shí)現(xiàn)延遲隊(duì)列的。

TTL(Time to Live)過期時(shí)間

有兩種方式可以設(shè)置 TTL。

  1. 1. 通過隊(duì)列屬性設(shè)置,這樣的話隊(duì)列中的所有消息都會(huì)擁有相同的過期時(shí)間

  2. 2. 對(duì)消息單獨(dú)設(shè)置過期時(shí)間,這樣每條消息的過期時(shí)間都可以不同

那么如果同時(shí)設(shè)置呢?這樣將會(huì)以兩個(gè)時(shí)間中較小的值為準(zhǔn)。

針對(duì)隊(duì)列的方式通過參數(shù)x-message-ttl來設(shè)置。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl"6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

針對(duì)消息的方式通過setExpiration來設(shè)置。

AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());

DLX(Dead Letter Exchange)死信交換機(jī)

一個(gè)消息要成為死信消息有 3 種情況:

  1. 1. 消息被拒絕,比如調(diào)用reject方法,并且需要設(shè)置requeuefalse

  2. 2. 消息過期

  3. 3. 隊(duì)列達(dá)到最大長度

可以通過參數(shù)dead-letter-exchange設(shè)置死信交換機(jī),也可以通過參數(shù)dead-letter- exchange指定 RoutingKey(未指定則使用原隊(duì)列的 RoutingKey)。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange""exchange.dlx");
args.put("x-dead-letter-routing-key""routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

原理

當(dāng)我們對(duì)消息設(shè)置了 TTL 和 DLX 之后,當(dāng)消息正常發(fā)送,通過 Exchange 到達(dá) Queue 之后,由于設(shè)置了 TTL 過期時(shí)間,并且消息沒有被消費(fèi)(訂閱的是死信隊(duì)列),達(dá)到過期時(shí)間之后,消息就轉(zhuǎn)移到與之綁定的 DLX 死信隊(duì)列之中。

這樣的話,就相當(dāng)于通過 DLX 和 TTL 間接實(shí)現(xiàn)了延遲消息的功能,實(shí)際使用中我們可以根據(jù)不同的延遲級(jí)別綁定設(shè)置不同延遲時(shí)間的隊(duì)列來達(dá)到實(shí)現(xiàn)不同延遲時(shí)間的效果。

RocketMQ

RocketMQ 和 RabbitMQ 不同,它本身就有延遲隊(duì)列的功能,但是開源版本只能支持固定延遲時(shí)間的消息,不支持任意時(shí)間精度的消息(這個(gè)好像只有阿里云版本的可以)。

他的默認(rèn)時(shí)間間隔分為 18 個(gè)級(jí)別,基本上也能滿足大部分場景的需要了。

默認(rèn)延遲級(jí)別:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

使用起來也非常的簡單,直接通過setDelayTimeLevel設(shè)置延遲級(jí)別即可。

setDelayTimeLevel(level)

原理

實(shí)現(xiàn)原理說起來比較簡單,Broker 會(huì)根據(jù)不同的延遲級(jí)別創(chuàng)建出多個(gè)不同級(jí)別的隊(duì)列,當(dāng)我們發(fā)送延遲消息的時(shí)候,根據(jù)不同的延遲級(jí)別發(fā)送到不同的隊(duì)列中,同時(shí)在 Broker 內(nèi)部通過一個(gè)定時(shí)器去輪詢這些隊(duì)列(RocketMQ 會(huì)為每個(gè)延遲級(jí)別分別創(chuàng)建一個(gè)定時(shí)任務(wù)),如果消息達(dá)到發(fā)送時(shí)間,那么就直接把消息發(fā)送到指 topic 隊(duì)列中。

RocketMQ 這種實(shí)現(xiàn)方式是放在服務(wù)端去做的,同時(shí)有個(gè)好處就是相同延遲時(shí)間的消息是可以保證有序性的。

談到這里就順便提一下關(guān)于消息消費(fèi)重試的原理,這個(gè)本質(zhì)上來說其實(shí)是一樣的,對(duì)于消費(fèi)失敗需要重試的消息實(shí)際上都會(huì)被丟到延遲隊(duì)列的 topic 里,到期后再轉(zhuǎn)發(fā)到真正的 topic 中。

Kafka

對(duì)于 Kafka 來說,原生并不支持延遲隊(duì)列的功能,需要我們手動(dòng)去實(shí)現(xiàn),這里我根據(jù) RocketMQ 的設(shè)計(jì)提供一個(gè)實(shí)現(xiàn)思路。

這個(gè)設(shè)計(jì),我們也不支持任意時(shí)間精度的延遲消息,只支持固定級(jí)別的延遲,因?yàn)閷?duì)于大部分延遲消息的場景來說足夠使用了。

只創(chuàng)建一個(gè) topic,但是針對(duì)該 topic 創(chuàng)建 18 個(gè) partition,每個(gè) partition 對(duì)應(yīng)不同的延遲級(jí)別,這樣做和 RocketMQ 一樣有個(gè)好處就是能達(dá)到相同延遲時(shí)間的消息達(dá)到有序性。

原理

  • · 首先創(chuàng)建一個(gè)單獨(dú)針對(duì)延遲隊(duì)列的 topic,同時(shí)創(chuàng)建 18 個(gè) partition 針對(duì)不同的延遲級(jí)別

  • · 發(fā)送消息的時(shí)候根據(jù)延遲參數(shù)發(fā)送到延遲 topic 對(duì)應(yīng)的 partition,對(duì)應(yīng)的key為延遲時(shí)間,同時(shí)把原 topic 保存到 header 中

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));
  • · 內(nèi)嵌的consumer單獨(dú)設(shè)置一個(gè)ConsumerGroup去消費(fèi)延遲 topic 消息,消費(fèi)到消息之后如果沒有達(dá)到延遲時(shí)間那么就進(jìn)行pause,然后seek到當(dāng)前ConsumerRecordoffset位置,同時(shí)使用定時(shí)器去輪詢延遲的TopicPartition,達(dá)到延遲時(shí)間之后進(jìn)行resume

  • · 如果達(dá)到了延遲時(shí)間,那么就獲取到header中的真實(shí) topic ,直接轉(zhuǎn)發(fā)

這里為什么要進(jìn)行pauseresume呢?因?yàn)槿绻贿@樣的話,如果超時(shí)未消費(fèi)達(dá)到max.poll.interval.ms 最大時(shí)間(默認(rèn)300s),那么將會(huì)觸發(fā) Rebalance。

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
干貨:RabbitMQ核心概念及工作原理
RabbitMQ 實(shí)現(xiàn)延時(shí)隊(duì)列
RabbitMQ如何實(shí)現(xiàn)延遲隊(duì)列
面試官:說說RabbitMQ 消費(fèi)端限流、TTL、死信隊(duì)列
10分鐘搞懂!消息隊(duì)列選型全方位對(duì)比
RabbitMQ 入門教程(四)
更多類似文章 >>
生活服務(wù)
熱點(diǎn)新聞
分享 收藏 導(dǎo)長圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服