MQ的作用解耦、異步、削峰填谷。
未使用MQ的情況
mysql并發(fā)寫大部分情況下維持在600-800之間,并發(fā)讀1200-1500之間,所以消費端在消費消息的時候需控制在并發(fā)小于1000,從而達到限流的效果。
使用MQ的情況
mq做個緩沖,消息放到磁盤,幾個G或上t都可以存儲,消息丟失的可能性比較小。
可用性降低
多了MQ,對外部依賴增加,但通過try-catch兜底,mq消息發(fā)送失敗,則插入數(shù)據(jù)庫。
提高復(fù)雜度
需要搭建高可用的Kafka集群或RocketMQ集群。
消息重復(fù)
通過消費端的冪等性實現(xiàn)。
中間生產(chǎn)消息的時候,有可能會發(fā)生網(wǎng)絡(luò)的波動,業(yè)務(wù)程序認為消息沒有發(fā)送成功,其實消息已經(jīng)寫入了一條,應(yīng)用端超時timeout,此時會進行消息的重發(fā)即2個id為1的都會寫入到mq中,后端應(yīng)用消費的時候,就會消費到2條消息。
消息順序
比如下單、支付、發(fā)送物流通知,這3個業(yè)務(wù)消息并發(fā)的產(chǎn)生且后端多線程消費的情況下,需要考慮消息消費順序的問題。
解決的方式是單個消費者、單個生產(chǎn)者、單個隊列可以保證消息有序的消費。
一個主題,多個隊列的情況下需要通過負載均衡的方式路由到不同的隊列中來。
有多個消費者不能確保消息消費的順序。
一致性問題
A、B、C三個系統(tǒng),A和B兩個寫入數(shù)據(jù)庫成功了,C系統(tǒng)寫庫失敗,這種情況可以用分布式事務(wù)解決,可以使用RocketMQ提供的分布式事務(wù)或阿里開源的Seta。
RabbitMQ
支持并發(fā)1.2W。
RabbitMQ集群很弱,主要確保高可用,不能拓展性能。
想性能更高,得搭建多主多從,比如3主3從、4主4從,第一個可以確保高可用,第二個可以提高整個的性能,但RabbitMQ集群不可以這樣拓展性能。
Kafka
支持并發(fā)100W。
RocketMQ
支持并發(fā)10W。
Kafka、RocketMQ天生支持分布式,支持動態(tài)擴展、動態(tài)擴縮容。
RocketMQ相對來說功能也比較豐富,支持死信消息、延遲(基于死信消息可以實現(xiàn)延遲消息)消息、消息的回溯、消息的過濾。
Kafka不支持死信消息。
消息端消費成功,發(fā)起ACK確認,作為RabbitMQ來說,可以直接把這個消息刪掉。
Kafka或RocketMQ會記錄消息者的偏移量,保證下一次消費的時候不會消費同一條消息。
死信消息
如果消費很多次還沒有成功,比如10次、20次都不能消費成功,mq中的這個消息就不能被確認 ,這個時候就引入了死信消息,進入一個單獨的死信隊列進行保存,后續(xù)進行手工處理或額外處理,比如用消息補償機制,實在消費不了的則異步通知生產(chǎn)者。
RabbitMQ在ack確認很多次都沒有成功返回的時候,則會設(shè)置一個標識,就會認為這個消息是死信消息,就會把這個消息寫入DCL隊列中。
RocketMQ也有這樣的死信消息,如果消息重試的次數(shù)超過16次,作為RocketMQ也會把這個消息寫入專門的死信隊列中去。
補償機制要根據(jù)業(yè)務(wù)來,比如微信沖電話費,在微信應(yīng)用里面,通過異步的方式來通知成功或失敗,如果說失敗了,失敗的補償機制就是退費;如果這條消息反正也消費不了,不知道出于什么原因,也有可能加入了失信名單 或超過了消費的額度,這個時候就消費不了,多次嘗試之后,在微信的后端就認為是死信消息,而退費就是一種補償機制。
延遲消息
一般情況下,消息只要發(fā)到mq,消費者就會里立馬消費掉,但是有的業(yè)務(wù)場景需要在這個消息上加一個延遲的時間,比如延遲10分鐘再被消費。
應(yīng)用場景比如買電影票-線上電影票的購票流程:
1、選座位,對這個座位進行鎖定,防止再被其他人鎖定
2、必須在10分鐘之內(nèi)支付
異常情況:選了座位,不支付。
對于后端系統(tǒng)來說,只要鎖定過期且沒有支付,就需要把座位釋放掉。
這種情況可以采用定時任務(wù)來處理,不斷的去輪循數(shù)據(jù)庫,但會出現(xiàn)新的問題,1要查詢數(shù)據(jù)庫,2每個人選定的時間不一樣,若定時10分鐘跑一次,就會出現(xiàn)釋放座位不及時的情況,若定時1秒跑一次,系統(tǒng)性能開銷比較大。
最優(yōu)的方案是采用延時消息,每一次選座位的時候,就寫一個延時10分鐘的消息,在消費的時候,必須等10分鐘之后,消費者再處理,不需要輪詢數(shù)據(jù)庫。
不同MQ為什么性能差別這么大?
主要依賴于Rabbitmq、Kafka持久化的底層機制:將消息寫入磁盤的零拷貝技術(shù)。
Netty、Nginx都有用到該技術(shù)。
零拷貝包括MMAP的零拷貝、Sendfile的零拷貝。
作為消費者要拉取消息進行消費,站在IO的角度去看,為了確保消息的高可用,往往把消息放到磁盤,一旦數(shù)據(jù)沒有寫入磁盤就會有丟失數(shù)據(jù)的可能性,所以消息會先寫入磁盤。
把數(shù)據(jù)從磁盤讀出,再通過網(wǎng)絡(luò)發(fā)送給消費者。
應(yīng)用發(fā)送數(shù)據(jù)要先發(fā)送給操作系統(tǒng)的網(wǎng)卡,最終通過網(wǎng)卡發(fā)送數(shù)據(jù)給消費者。
站在磁盤的角度來看,數(shù)據(jù)首先要經(jīng)過第一個拷貝,這里叫DMA拷貝到文件讀取緩沖區(qū),偽代碼為buffer=file.read ,寫完之后,發(fā)給消費者,創(chuàng)建一個socket即建立一個TCP網(wǎng)絡(luò)通信,通過socket調(diào)用send方法,把讀到的buffer進行發(fā)送。
站在io的角度來看,經(jīng)過了幾次拷貝?
第一次:數(shù)據(jù)從磁盤拷貝到內(nèi)核的文件讀取緩沖區(qū),這個過程稱為DMA拷貝,
然后數(shù)據(jù)經(jīng)過第二次拷貝:CPU拷貝,拷貝的數(shù)據(jù)放入應(yīng)用緩沖區(qū)即就是剛才定義的buffer字節(jié)流。
應(yīng)用程序并不能直接操作網(wǎng)卡,底層調(diào)用socket,通過socket調(diào)用操作系統(tǒng)的網(wǎng)卡,但是操作系統(tǒng)網(wǎng)卡會有一個問題 :不能直接讀到應(yīng)用的內(nèi)存,所以又需要經(jīng)過一次CPU拷貝到套接字發(fā)送緩沖區(qū),最后再經(jīng)過一次DMA拷貝(直接內(nèi)存讀取 Direct Memory Access)。
內(nèi)核或操作系統(tǒng)的驅(qū)動允許不同速度的硬件進行溝通的時候才會有DMA拷貝。
如果沒有DMA,就需要通過CPU的大量中斷來進行負載。
什么叫中斷?
在計算機里面,啟動一個線程,讓CPU來跑,CPU在跑的時候,你給我發(fā)了一個消息,我的電腦怎么知道我的網(wǎng)卡里面進來一條消息呢?這個就需要網(wǎng)卡在硬件級別叫下CPU:cpu等一等,現(xiàn)在我要打斷你一下。
如果通過CPU負載的話,效率很低,因為CPU干很多事情,CPU做大量中斷負載的話,比如200M的數(shù)據(jù),如果通過CPU拷貝,大概需要200ms,而通過DMA拷貝,速度只需要2毫秒。
計算機里面,越底層的東西就越快,通過CPU拷貝到話,效率往往很低,因為這個時候還需要向CPU請求負載, 這里會涉及到很多的中斷負載的切換。
在不考慮MQ應(yīng)用程序運轉(zhuǎn)多少時間的情況下,傳統(tǒng)的拷貝大概需要404毫秒。
RocketMQ MMAP零拷貝技術(shù)
在RocketMQ中采用一種MMAP的零拷貝技術(shù),本身是做內(nèi)存映射,當內(nèi)存的應(yīng)用緩沖區(qū)調(diào)用操作系統(tǒng)的mmap函數(shù),可以做一個內(nèi)存映射。
拿到能夠操作文件的通道到一個高級類FileChannel,這個高級類實際上是對文件進行操作。
底層會調(diào)用操作系統(tǒng)的mmap函數(shù)來完成映射,映射的意思是內(nèi)存即磁盤,磁盤即內(nèi)存,如果完成映射之后,這個文件和內(nèi)存的這個buffer(ByteBuffer)就一致了。
mmap是內(nèi)存文件通過FileChannel調(diào)用map方法間接調(diào)用的,設(shè)置讀寫模式,文件映射到底可以讀還是可以寫,內(nèi)存映射的位置即從哪里開始,0表示從頭開始,內(nèi)存映射大小為1024即這個文件可以映射1kb左右,拿到這個buffer之后,就可以進行寫入,這個ByteBuffer和Hashmap是一樣的方式,直接put把字符串轉(zhuǎn)換成byte數(shù)組進行寫入,寫入完之后,再去調(diào)用flip方法進行刷盤,這個數(shù)據(jù)就可以同步到磁盤了,當然刷完盤之后,還可以拿出來,通過mmap.get把里面的前5個數(shù)據(jù)讀取出來,讀取之后還可以打印,
文件中這么多NULL,剛好長度是1024。
通過mmap創(chuàng)建的,因為它進行內(nèi)存映射,所以這個文件必須要有空格,通過NULL值進行表示,讀的時候,通過偏移量+長度,指定了5個長度,就可以讀取到lijin這個字符串數(shù)據(jù)。
傳統(tǒng)的方式
Server端(服務(wù)端)啟動,模擬一個消費者即專門啟動一個Server Socket監(jiān)聽,接受到數(shù)據(jù),把數(shù)據(jù)讀出來就可以了。
這個是傳統(tǒng)的客戶端讀一個文件發(fā)送到網(wǎng)絡(luò)的過程,這段代碼跟
這個的業(yè)務(wù)場景是一樣的。
創(chuàng)建一個socket,因為要發(fā)給對應(yīng)的消費者,先建立一個網(wǎng)絡(luò)連接。
inpuStream.read()會進行2次拷貝,一個是DMA拷貝,一次是CPU拷貝。
而這種方式只是一次拷貝,因為是內(nèi)存映射。
map方法在系統(tǒng)啟動的時候就被調(diào)用了。
傳統(tǒng)的方式,每次都要new一個FileInputStream,這里涉及到了2次拷貝(每一次讀取出來,讀到buffer中,涉及到2次拷貝:一次DMA拷貝、一次CPU拷貝),耗時202毫秒,因為要發(fā)送網(wǎng)絡(luò),通過連接本機的8081端口,發(fā)送給它,還要創(chuàng)建一個對應(yīng)的輸出流拿取結(jié)果。
傳統(tǒng)的方式本質(zhì)上和文件讀取是一樣的,這是通過流的方式讀取,while true不斷的讀并且累加,讀完之后,拿到了buffer,再寫網(wǎng)絡(luò),網(wǎng)絡(luò)就通過socket創(chuàng)建的getOutputStream(文件的輸出流、socket的輸出流)轉(zhuǎn)到DataOutputStream。
創(chuàng)建的socket就是一個連接,應(yīng)用要跟消費者建立一個TCP的連接,這個TCP的連接在底層表示都是socket,不單單只是數(shù)據(jù)連接,還包含了數(shù)據(jù)通道,這里new一個socket就相當于跟另外一個消費者8081這樣的socket通道建立了鏈接,通過socket通道里面的dataOutputStream.write方法輸出數(shù)據(jù),這里又會涉及到一次DMA拷貝,一次CPU拷貝。
首先做一次CPU拷貝,相當于把buffer的數(shù)據(jù)首先要發(fā)到套接字緩沖區(qū)(socket里面的緩沖區(qū)),這個socket要通過網(wǎng)卡發(fā)給消費者最終要把應(yīng)用內(nèi)存發(fā)送給網(wǎng)卡里面的內(nèi)存,網(wǎng)卡是一個外設(shè),網(wǎng)卡通過一個USB都可以去接,所以就需要做一個DMA拷貝。
這種方式共有4次拷貝,耗時為422毫秒,這是RabbitMQ的情況,而RocketMQ的mmap發(fā)送只有204毫秒,DMA拷貝速度一般是CPU的百倍。
Kafka的sendfile零拷貝技術(shù)
Kafka不會涉及到cpu拷貝,只是進行文件描述符的傳遞,這點消耗的時間可以忽略。
文件描述符類似一個指針,在linux上面所有東西都是文件描述符。
把數(shù)據(jù)放到文件數(shù)據(jù)讀取緩沖區(qū),這里就會有一個文件描述符,類似于網(wǎng)盤的地址,比如百度云網(wǎng)盤的分享鏈接,而真實的數(shù)據(jù)放百度網(wǎng)盤,這種開銷可以忽略,既然數(shù)據(jù)已經(jīng)放到了文件緩沖區(qū),只要拿到文件緩沖區(qū)的指針,指針在應(yīng)用程序里面內(nèi)存的大小就可以忽略不計了。
在現(xiàn)代新的操作系統(tǒng)里面,既然都屬于內(nèi)核操作系統(tǒng)的進程,文件讀取緩沖區(qū)的內(nèi)存和套接字的內(nèi)存可以共享。
文件描述符(offset=1024,size=9721823),比如要讀取的文件,偏移量是1024,讀取9721823這個大小的數(shù)據(jù)。
把文件描述符傳給應(yīng)用,這個速度和時間可以忽略不計,調(diào)用socket,相當于告訴socket你要去文件讀取緩沖區(qū)內(nèi)存找我要發(fā)送的數(shù)據(jù),因為我已經(jīng)告訴你偏移量和大小了。
通過sendfile的方式,只剩下2次DMA拷貝了,數(shù)據(jù)的傳輸基本上在內(nèi)核就完成了。
第一步new出一個SocketChannel,使用8081的服務(wù)器地址,SocketChannel是套接字發(fā)送緩沖區(qū)的一個通道,F(xiàn)ileChannel是針對磁盤文件的通道,2個通道通過transferTo進行共享,
共享的位置是從0開始,長度是文件大小,這里沒有使用多文件,只讀了一個文件。
fileName可以通過FileChannel傳過去,傳過去就已經(jīng)網(wǎng)絡(luò)傳輸了,只要調(diào)用transferTo方法就會完成網(wǎng)絡(luò)發(fā)送,這種方式的耗時只需要16毫秒。
傳統(tǒng)的傳輸要轉(zhuǎn)換成InputStream、FileInputStream、DataOutputStream,所以開銷會大些,同樣的一張圖片,轉(zhuǎn)換出來的字節(jié)流會多一點。
通過NIO轉(zhuǎn)換transferTo直接就這么轉(zhuǎn)了,如果把中間的CPU拷貝的時間忽略,相當于2+2+12,傳輸文件描述符的話,還是會占據(jù)一點點時間。
不同的序列化方式即轉(zhuǎn)換的流不一樣,傳輸?shù)淖止?jié)數(shù)大小也不一樣。
為什么Kafka不用mmap
既然sendfile零拷貝技術(shù)效率更高,RocketMQ早期版本也是基于Kakfa java版本重寫改進的,那RocketMQ為什么不用sendfile技術(shù)?
因為它們的設(shè)計理念不一樣。
作為文件描述符等同于網(wǎng)盤地址。
RocketMQ有很多功能的延伸點是不一樣的,比如延遲消息、死信消息需要數(shù)據(jù)流轉(zhuǎn)到MQ應(yīng)用。
RocketMQ要支持延遲消息,數(shù)據(jù)最好要進入應(yīng)用,不能單純拿一個文件描述符做延遲消息,這也是為什么Kafka沒有延遲消息的原因。
數(shù)據(jù)是通過這樣的方式發(fā)送的,數(shù)據(jù)不會直接經(jīng)過Kafka。Kafka的設(shè)計比較簡單,沒有延遲消息、死信消息等。
比如1萬條消息中有一個消息發(fā)送不成功,這種情況一定要放到mq的應(yīng)用內(nèi)存才能處理,
而通過sendfile方式,很多的消息數(shù)據(jù)都是文件讀取緩沖區(qū)的文件描述符。
類似網(wǎng)盤資料中的數(shù)據(jù)很多,是一個代碼壓縮包,單獨把其中的一段代碼拿出來是非常麻煩的。
Kafka做死信消息,要寫一個定時器,不斷的輪詢,如果消息失敗了,把這個消息寫入到Kafka的一個文件或一個隊列中,可以這樣變相的實現(xiàn),但自身原生是不支持死信消息的。
聯(lián)系客服