首頁/ 汽車/ 正文

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

整體閱讀時間,在 40 分鐘左右。

常見的訊息佇列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ,相關的選型可以看我之前的系列,

這篇文章只講 RabbitMQ,先講原理,後搞實戰。

文章很長,如果你能一次性看完,“大哥,請收下我的膝蓋”,建議大家先收藏,啥時需要面試,或者工作中遇到了,可以再慢慢看。

不 BB,直接上思維導圖:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

1。 訊息佇列

1。1 訊息佇列模式

訊息佇列目前主要 2 種模式,分別為“點對點模式”和“釋出/訂閱模式”。

1。1。1 點對點模式

一個具體的訊息只能由一個消費者消費,多個生產者可以向同一個訊息佇列傳送訊息,但是一個訊息在被一個訊息者處理的時候,這個訊息在佇列上會被鎖住或者被移除並且其他消費者無法處理該訊息。

需要額外注意的是,如果消費者處理一個訊息失敗了,訊息系統一般會把這個訊息放回佇列,這樣其他消費者可以繼續處理。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

1。1。2 釋出/訂閱模式

單個訊息可以被多個訂閱者併發的獲取和處理。一般來說,訂閱有兩種型別:

臨時(ephemeral)訂閱

:這種訂閱只有在消費者啟動並且執行的時候才存在。一旦消費者退出,相應的訂閱以及尚未處理的訊息就會丟失。

持久(durable)訂閱

:這種訂閱會一直存在,除非主動去刪除。消費者退出後,訊息系統會繼續維護該訂閱,並且後續訊息可以被繼續處理。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

1。2 衡量標準

對訊息佇列進行技術選型時,需要透過以下指標衡量你所選擇的訊息佇列,是否可以滿足你的需求:

訊息順序

:傳送到佇列的訊息,消費時是否可以保證消費的順序,比如A先下單,B後下單,應該是A先去扣庫存,B再去扣,順序不能反。

訊息路由

:根據路由規則,只訂閱匹配路由規則的訊息,比如有A/B兩者規則的訊息,消費者可以只訂閱A訊息,B訊息不會消費。

訊息可靠性:是否會存在丟訊息的情況,比如有A/B兩個訊息,最後只有B訊息能消費,A訊息丟失。

訊息時序

:主要包括“訊息存活時間”和“延遲/預定的訊息”,“訊息存活時間”表示生產者可以對訊息設定TTL,如果超過該TTL,訊息會自動消失;“延遲/預定的訊息”指的是可以延遲或者預訂消費訊息,比如延時5分鐘,那麼訊息會5分鐘後才能讓消費者消費,時間未到的話,是不能消費的。

訊息留存

:訊息消費成功後,是否還會繼續保留在訊息佇列。

容錯性

:當一條訊息消費失敗後,是否有一些機制,保證這條訊息是一種能成功,比如非同步第三方退款訊息,需要保證這條訊息消費掉,才能確定給使用者退款成功,所以必須保證這條訊息消費成功的準確性。

伸縮

:當訊息佇列效能有問題,比如消費太慢,是否可以快速支援庫容;當消費佇列過多,浪費系統資源,是否可以支援縮容。

吞吐量

:支援的最高併發數。

2。 RabbitMQ 原理初探

RabbitMQ 2007 年釋出,是使用 Erlang 語言開發的開源訊息佇列系統,基於 AMQP 協議來實現。

2。1 基本概念

提到RabbitMQ,就不得不提AMQP協議。AMQP協議是具有現代特徵的二進位制協議。是一個提供統一訊息服務的應用層標準高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。

先了解一下AMQP協議中間的幾個重要概念:

Server:接收客戶端的連線,實現AMQP實體服務。

Connection:連線,應用程式與Server的網路連線,TCP連線。

Channel:通道,訊息讀寫等操作在通道中進行。客戶端可以建立多個通道,每個通道代表一個會話任務。

Message:訊息,應用程式和伺服器之間傳送的資料,訊息可以非常簡單,也可以很複雜。由Properties和Body組成。Properties為外包裝,可以對訊息進行修飾,比如訊息的優先順序、延遲等高階特性;Body就是訊息體內容。

Virtual Host:虛擬主機,用於邏輯隔離。一個虛擬主機裡面可以有若干個Exchange和Queue,同一個虛擬主機裡面不能有相同名稱的Exchange或Queue。

Exchange:交換器,接收訊息,按照路由規則將訊息路由到一個或者多個佇列。如果路由不到,或者返回給生產者,或者直接丟棄。RabbitMQ常用的交換器常用型別有direct、topic、fanout、headers四種,後面詳細介紹。

Binding:繫結,交換器和訊息佇列之間的虛擬連線,繫結中可以包含一個或者多個RoutingKey。

RoutingKey:路由鍵,生產者將訊息傳送給交換器的時候,會發送一個RoutingKey,用來指定路由規則,這樣交換器就知道把訊息傳送到哪個佇列。路由鍵通常為一個“。”分割的字串,例如“com。rabbitmq”。

Queue:訊息佇列,用來儲存訊息,供消費者消費。

2。2 工作原理

AMQP 協議模型由三部分組成:生產者、消費者和服務端,執行流程如下:

生產者是連線到 Server,建立一個連線,開啟一個通道。

生產者宣告交換器和佇列,設定相關屬性,並透過路由鍵將交換器和佇列進行繫結。

消費者也需要進行建立連線,開啟通道等操作,便於接收訊息。

生產者傳送訊息,傳送到服務端中的虛擬主機。

虛擬主機中的交換器根據路由鍵選擇路由規則,傳送到不同的訊息佇列中。

訂閱了訊息佇列的消費者就可以獲取到訊息,進行消費。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

2。3 常用交換器

RabbitMQ常用的交換器型別有direct、topic、fanout、headers四種:

Direct Exchange:見文知意,直連交換機意思是此交換機需要繫結一個佇列,要求該訊息與一個特定的路由鍵完全匹配。簡單點說就是一對一的,點對點的傳送。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

Fanout Exchange:這種型別的交換機需要將佇列繫結到交換機上。一個傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。簡單點說就是釋出訂閱。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

Topic Exchange:直接翻譯的話叫做主題交換機,如果從用法上面翻譯可能叫萬用字元交換機會更加貼切。這種交換機是使用萬用字元去匹配,路由到對應的佇列。萬用字元有兩種:“*” 、 “#”。需要注意的是萬用字元前面必須要加上“。”符號。

*符號:有且只匹配一個詞。比如 a。*可以匹配到“a。b”、“a。c”,但是匹配不了“a。b。c”。

#符號:匹配一個或多個詞。比如“rabbit。#”既可以匹配到“rabbit。a。b”、“rabbit。a”,也可以匹配到“rabbit。a。b。c”。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

Headers Exchange:這種交換機用的相對沒這麼多。它跟上面三種有點區別,它的路由不是用routingKey進行路由匹配,而是在匹配請求頭中所帶的鍵值進行路由。建立佇列需要設定繫結的頭部資訊,有兩種模式:全部匹配和部分匹配。如上圖所示,交換機會根據生產者傳送過來的頭部資訊攜帶的鍵值去匹配佇列繫結的鍵值,路由到對應的佇列。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

2。4 消費原理

我們先看幾個基本概念:

broker:每個節點執行的服務程式,功能為維護該節點的佇列的增刪以及轉發佇列操作請求。

master queue:每個佇列都分為一個主佇列和若干個映象佇列。

mirror queue:映象佇列,作為master queue的備份。在master queue所在節點掛掉之後,系統把mirror queue提升為master queue,負責處理客戶端佇列操作請求。注意,mirror queue只做映象,設計目的不是為了承擔客戶端讀寫壓力。

叢集中有兩個節點,每個節點上有一個broker,每個broker負責本機上佇列的維護,並且borker之間可以互相通訊。叢集中有兩個佇列A和B,每個佇列都分為master queue和mirror queue(備份)。那麼佇列上的生產消費怎麼實現的呢?

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

對於消費佇列,如下圖有兩個consumer消費佇列A,這兩個consumer連在了叢集的不同機器上。RabbitMQ叢集中的任何一個節點都擁有叢集上所有佇列的元資訊,所以連線到叢集中的任何一個節點都可以,主要區別在於有的consumer連在master queue所在節點,有的連在非master queue節點上。

因為mirror queue要和master queue保持一致,故需要同步機制,正因為一致性的限制,導致所有的讀寫操作都必須都操作在master queue上(想想,為啥讀也要從master queue中讀?和資料庫讀寫分離是不一樣的),然後由master節點同步操作到mirror queue所在的節點。即使consumer連線到了非master queue節點,該consumer的操作也會被路由到master queue所在的節點上,這樣才能進行消費。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

對於生成佇列,原理和消費一樣,如果連線到非 master queue 節點,則路由過去。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

所以,到這裡小夥伴們就可以看到 RabbitMQ的不足:由於master queue單節點,導致效能瓶頸,吞吐量受限。雖然為了提高效能,內部使用了Erlang這個語言實現,但是終究擺脫不了架構設計上的致命缺陷。

2。5 高階特性

2。5。1 過期時間

Time To Live,也就是生存時間,是一條訊息在佇列中的最大存活時間,單位是毫秒,下面看看RabbitMQ過期時間特性:

RabbitMQ可以對訊息和佇列設定TTL。

RabbitMQ支援設定訊息的過期時間,在訊息傳送的時候可以進行指定,每條訊息的過期時間可以不同。

RabbitMQ支援設定佇列的過期時間,從訊息入佇列開始計算,直到超過了佇列的超時時間配置,那麼訊息會變成死信,自動清除。

如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為準。

當然也可以不設定TTL,不設定表示訊息不會過期;如果設定為0,則表示除非此時可以直接將訊息投遞到消費者,否則該訊息將被立即丟棄。

2。5。2 訊息確認

為了保證訊息從佇列可靠地到達消費者,RabbitMQ提供了訊息確認機制。

消費者訂閱佇列的時候,可以指定autoAck引數,當autoAck為true的時候,RabbitMQ採用自動確認模式,RabbitMQ自動把傳送出去的訊息設定為確認,然後從記憶體或者硬碟中刪除,而不管消費者是否真正消費到了這些訊息。

當autoAck為false的時候,RabbitMQ會等待消費者回復的確認訊號,收到確認訊號之後才從記憶體或者磁碟中刪除訊息。

訊息確認機制是RabbitMQ訊息可靠性投遞的基礎,只要設定autoAck引數為false,消費者就有足夠的時間處理訊息,不用擔心處理訊息的過程中消費者程序掛掉後訊息丟失的問題。

2。5。3 持久化

訊息的可靠性是RabbitMQ的一大特色,那麼RabbitMQ是如何保證訊息可靠性的呢?答案就是訊息持久化。持久化可以防止在異常情況下丟失資料。RabbitMQ的持久化分為三個部分:交換器持久化、佇列持久化和訊息的持久化。

交換器持久化可以透過在宣告佇列時將durable引數設定為true。如果交換器不設定持久化,那麼在RabbitMQ服務重啟之後,相關的交換器元資料會丟失,不過訊息不會丟失,只是不能將訊息傳送到這個交換器了。

佇列的持久化能保證其本身的元資料不會因異常情況而丟失,但是不能保證內部所儲存的訊息不會丟失。要確保訊息不會丟失,需要將其設定為持久化。佇列的持久化可以透過在宣告佇列時將durable引數設定為true。

設定了佇列和訊息的持久化,當RabbitMQ服務重啟之後,訊息依然存在。如果只設置佇列持久化或者訊息持久化,重啟之後訊息都會消失。

當然,也可以將所有的訊息都設定為持久化,但是這樣做會影響RabbitMQ的效能,因為磁碟的寫入速度比記憶體的寫入要慢得多。

對於可靠性不是那麼高的訊息可以不採用持久化處理以提高整體的吞吐量。魚和熊掌不可兼得,關鍵在於選擇和取捨。在實際中,需要根據實際情況在可靠性和吞吐量之間做一個權衡。

2。5。4 死信佇列

當訊息在一個佇列中變成死信之後,他能被重新發送到另一個交換器中,這個交換器成為死信交換器,與該交換器繫結的佇列稱為死信佇列。

訊息變成死信有下面幾種情況:

訊息被拒絕。

訊息過期

佇列達到最大長度

DLX也是一個正常的交換器,和一般的交換器沒有區別,他能在任何的佇列上面被指定,實際上就是設定某個佇列的屬性。當這個佇列中有死信的時候,RabbitMQ會自動將這個訊息重新發送到設定的交換器上,進而被路由到另一個佇列,我們可以監聽這個佇列中訊息做相應的處理。

死信佇列有什麼用?當發生異常的時候,訊息不能夠被消費者正常消費,被加入到了死信佇列中。後續的程式可以根據死信佇列中的內容分析當時發生的異常,進而改善和最佳化系統。

2。5。5 延遲佇列

一般的佇列,訊息一旦進入佇列就會被消費者立即消費。延遲佇列就是進入該佇列的訊息會被消費者延遲消費,延遲佇列中儲存的物件是的延遲訊息,“延遲訊息”是指當訊息被髮送以後,等待特定的時間後,消費者才能拿到這個訊息進行消費。

延遲佇列用於需要延遲工作的場景。最常見的使用場景:淘寶或者天貓我們都使用過,使用者在下單之後通常有30分鐘的時間進行支付,如果這30分鐘之內沒有支付成功,那麼訂單就會自動取消。

除了延遲消費,延遲佇列的典型應用場景還有延遲重試。比如消費者從佇列裡面消費訊息失敗了,可以延遲一段時間以後進行重試。

2。6 特性分析

這裡才是內容的重點,不僅需要知道Rabbit的特性,還需要知道支援這些特性的原因:

訊息路由(支援)

:RabbitMQ可以透過不同的交換器支援不同種類的訊息路由;

訊息有序(不支援)

:當消費訊息時,如果消費失敗,訊息會被放回佇列,然後重新消費,這樣會導致訊息無序;

訊息時序(非常好)

:透過延時佇列,可以指定訊息的延時時間,過期時間TTL等;

容錯處理(非常好)

:透過交付重試和死信交換器(DLX)來處理訊息處理故障;

伸縮(一般)

:伸縮其實沒有非常智慧,因為即使伸縮了,master queue還是隻有一個,負載還是隻有這一個master queue去抗,所以我理解RabbitMQ的伸縮很弱(個人理解)。

持久化(不太好)

:沒有消費的訊息,可以支援持久化,這個是為了保證機器宕機時訊息可以恢復,但是消費過的訊息,就會被馬上刪除,因為RabbitMQ設計時,就不是為了去儲存歷史資料的。

訊息回溯(不支援)

:因為訊息不支援永久儲存,所以自然就不支援回溯。

高吞吐(中等)

:因為所有的請求的執行,最後都是在master queue,它的這個設計,導致單機效能達不到十萬級的標準。

3。 RabbitMQ環境搭建

因為我用的是Mac,所以直接可以參考官網:

https://www。rabbitmq。com/install-homebrew。html

需要注意的是,一定需要先執行:

brew update

然後再執行:

brew install rabbitmq

之前沒有執行brew update,直接執行brew install rabbitmq時,會報各種各樣奇怪的錯誤,其中“403 Forbidde”居多。

但是在執行“brew install rabbitmq”,會自動安裝其它的程式,如果你使用原始碼安裝Rabbitmq,因為啟動該服務依賴erlang環境,所以你還需手動安裝erlang,但是目前官方已經一鍵給你搞定,會自動安裝Rabbitmq依賴的所有程式,是不是很棒!

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

最後執行成功的輸出如下:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

啟動服務:

# 啟動方式1:後臺啟動brew services start rabbitmq# 啟動方式2:當前視窗啟動cd /usr/local/Cellar/rabbitmq/3。8。19rabbitmq-server

在瀏覽器輸入:

http://localhost:15672/

會出現RabbitMQ後臺管理介面(使用者名稱和密碼都為guest):

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

透過brew安裝,一行命令搞定,真香!

4。 RabbitMQ測試

4。1 新增賬號

首先得啟動mq

## 新增賬號。/rabbitmqctl add_user admin admin## 新增訪問許可權。/rabbitmqctl set_permissions -p “/” admin “。*” “。*” “。*”## 設定超級許可權。/rabbitmqctl set_user_tags admin administrator

4。2 編碼實測

因為程式碼中引入了java 8的特性,pom引入依賴:

com。rabbitmq amqp-client 5。5。1 org。apache。maven。plugins maven-compiler-plugin 8 8

開始寫程式碼:

public class RabbitMqTest { //訊息佇列名稱 private final static String QUEUE_NAME = “hello”; @Test public void send() throws java。io。IOException, TimeoutException { //建立連線工程 ConnectionFactory factory = new ConnectionFactory(); factory。setHost(“127。0。0。1”); factory。setPort(5672); factory。setUsername(“admin”); factory。setPassword(“admin”); //建立連線 Connection connection = factory。newConnection(); //建立訊息通道 Channel channel = connection。createChannel(); //生成一個訊息佇列 channel。queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 0; i < 10; i++) { String message = “Hello World RabbitMQ count: ” + i; //釋出訊息,第一個引數表示路由(Exchange名稱),為“”則表示使用預設訊息路由 channel。basicPublish(“”, QUEUE_NAME, null, message。getBytes()); System。out。println(“ [x] Sent ‘” + message + “’”); } //關閉訊息通道和連線 channel。close(); connection。close(); } @Test public void consumer() throws java。io。IOException, TimeoutException { //建立連線工廠 ConnectionFactory factory = new ConnectionFactory(); factory。setHost(“127。0。0。1”); factory。setPort(5672); factory。setUsername(“admin”); factory。setPassword(“admin”); //建立連線 Connection connection = factory。newConnection(); //建立訊息通道 final Channel channel = connection。createChannel(); //訊息佇列 channel。queueDeclare(QUEUE_NAME, true, false, false, null); System。out。println(“[*] Waiting for message。 To exist press CTRL+C”); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery。getBody(), “UTF-8”); System。out。println(“ [x] Received ‘” + message + “’”); }; channel。basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); }}

執行send()後控制檯輸出:

[x] Sent ‘Hello World RabbitMQ count: 0’[x] Sent ‘Hello World RabbitMQ count: 1’[x] Sent ‘Hello World RabbitMQ count: 2’[x] Sent ‘Hello World RabbitMQ count: 3’[x] Sent ‘Hello World RabbitMQ count: 4’[x] Sent ‘Hello World RabbitMQ count: 5’[x] Sent ‘Hello World RabbitMQ count: 6’[x] Sent ‘Hello World RabbitMQ count: 7’[x] Sent ‘Hello World RabbitMQ count: 8’[x] Sent ‘Hello World RabbitMQ count: 9’

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

執行consumer()後:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

示例中的程式碼講解,可以直接參考官網:https://www。rabbitmq。com/tutorials/tutorial-one-java。html

5。 基本使用姿勢

5。1 公共程式碼封裝

封裝工廠類:

public class RabbitUtil { public static ConnectionFactory getConnectionFactory() { //建立連線工程,下面給出的是預設的case ConnectionFactory factory = new ConnectionFactory(); factory。setHost(“127。0。0。1”); factory。setPort(5672); factory。setUsername(“admin”); factory。setPassword(“admin”); factory。setVirtualHost(“/”); return factory; }}

封裝生成者:

public class MsgProducer { public static void publishMsg(String exchange, BuiltinExchangeType exchangeType, String toutingKey, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil。getConnectionFactory(); //建立連線 Connection connection = factory。newConnection(); //建立訊息通道 Channel channel = connection。createChannel(); // 宣告exchange中的訊息為可持久化,不自動刪除 channel。exchangeDeclare(exchange, exchangeType, true, false, null); // 釋出訊息 channel。basicPublish(exchange, toutingKey, null, message。getBytes()); System。out。println(“Sent ‘” + message + “’”); channel。close(); connection。close(); }}

封裝消費者:

public class MsgConsumer { public static void consumerMsg(String exchange, String queue, String routingKey) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil。getConnectionFactory(); //建立連線 Connection connection = factory。newConnection(); //建立訊息通道 final Channel channel = connection。createChannel(); //訊息佇列 channel。queueDeclare(queue, true, false, false, null); //繫結佇列到交換機 channel。queueBind(queue, exchange, routingKey); System。out。println(“[*] Waiting for message。 To exist press CTRL+C”); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP。BasicProperties properties, byte[] body) throws IOException { String message = new String(body, “UTF-8”); try { System。out。println(“ [x] Received ‘” + message); } finally { System。out。println(“ [x] Done”); channel。basicAck(envelope。getDeliveryTag(), false); } } }; // 取消自動ack channel。basicConsume(queue, false, consumer); }}

5。2 Direct方式

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

5。2。1 Direct示例

生產者:

public class DirectProducer { private static final String EXCHANGE_NAME = “direct。exchange”; public void publishMsg(String routingKey, String msg) { try { MsgProducer。publishMsg(EXCHANGE_NAME, BuiltinExchangeType。DIRECT, routingKey, msg); } catch (Exception e) { e。printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectProducer directProducer = new DirectProducer(); String[] routingKey = new String[]{“aaa”, “bbb”, “ccc”}; String msg = “hello >>> ”; for (int i = 0; i < 10; i++) { directProducer。publishMsg(routingKey[i % 3], msg + i); } System。out。println(“——over————-”); Thread。sleep(1000 * 60 * 100); }}

執行生產者,往訊息佇列中放入10條訊息,其中key分別為“aaa”、“bbb”和“ccc”,分別放入qa、qb、qc三個佇列:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

下面是qa佇列的資訊:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

消費者:

public class DirectConsumer { private static final String exchangeName = “direct。exchange”; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer。consumerMsg(exchangeName, queueName, routingKey); } catch (IOException e) { e。printStackTrace(); } catch (TimeoutException e) { e。printStackTrace(); } } public static void main(String[] args) throws InterruptedException { DirectConsumer consumer = new DirectConsumer(); String[] routingKey = new String[]{“aaa”, “bbb”, “ccc”}; String[] queueNames = new String[]{“qa”, “qb”, “qc”}; for (int i = 0; i < 3; i++) { consumer。msgConsumer(queueNames[i], routingKey[i]); } Thread。sleep(1000 * 60 * 100); }}

執行後的輸出:

[*] Waiting for message。 To exist press CTRL+C [x] Received ’hello >>> 0 [x] Done [x] Received ‘hello >>> 3 [x] Done [x] Received ’hello >>> 6 [x] Done [x] Received ‘hello >>> 9 [x] Done[*] Waiting for message。 To exist press CTRL+C [x] Received ’hello >>> 1 [x] Done [x] Received ‘hello >>> 4 [x] Done [x] Received ’hello >>> 7 [x] Done[*] Waiting for message。 To exist press CTRL+C [x] Received ‘hello >>> 2 [x] Done [x] Received ’hello >>> 5 [x] Done [x] Received ‘hello >>> 8 [x] Done

可以看到,分別從qa、qb、qc中將不同的key的資料消費掉。

5。2。2 問題探討

有個疑問:這個佇列的名稱qa、qb和qc是RabbitMQ自動生成的麼,我們可以指定佇列名稱麼?

我做了個簡單的實驗,我把消費者程式碼修改了一下:

public static void main(String[] args) throws InterruptedException { DirectConsumer consumer = new DirectConsumer(); String[] routingKey = new String[]{“aaa”, “bbb”, “ccc”}; String[] queueNames = new String[]{“qa”, “qb”, “qc1”}; // 將qc修改為qc1 for (int i = 0; i < 3; i++) { consumer。msgConsumer(queueNames[i], routingKey[i]); } Thread。sleep(1000 * 60 * 100);}

執行後如下圖所示:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

我們可以發現,多了一個qc1,所以可以判斷這個介面中的queues,是消費者執行時,會將消費者指定的佇列名稱和direct。exchange繫結,繫結的依據就是key。

當我們把佇列中的資料全部消費掉,然後重新執行生成者後,會發現qc和qc1中都有3條待消費的資料,因為繫結的key都是“ccc”,所以兩者的資料是一樣的:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

繫結關係如下:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

注意:當沒有Queue繫結到Exchange時,往Exchange中寫入的訊息也不會重新分發到之後繫結的queue上。

思考:不執行消費者,看不到這個Queres中資訊,我其實可以把這個介面理解為消費者資訊介面。不過感覺還是怪怪的,這個queues如果是消費者資訊,就不應該叫queues,我理解queues應該是RabbitMQ中實際存放資料的queues,難道是我理解錯了?

5。3 Fanout方式(指定佇列)

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

生產者封裝:

public class FanoutProducer { private static final String EXCHANGE_NAME = “fanout。exchange”; public void publishMsg(String routingKey, String msg) { try { MsgProducer。publishMsg(EXCHANGE_NAME, BuiltinExchangeType。FANOUT, routingKey, msg); } catch (Exception e) { e。printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String msg = “hello >>> ”; for (int i = 0; i < 10; i++) { directProducer。publishMsg(“”, msg + i); } }}

消費者:

public class FanoutConsumer { private static final String EXCHANGE_NAME = “fanout。exchange”; public void msgConsumer(String queueName, String routingKey) { try { MsgConsumer。consumerMsg(EXCHANGE_NAME, queueName, routingKey); } catch (IOException e) { e。printStackTrace(); } catch (TimeoutException e) { e。printStackTrace(); } } public static void main(String[] args) { FanoutConsumer consumer = new FanoutConsumer(); String[] queueNames = new String[]{“qa-2”, “qb-2”, “qc-2”}; for (int i = 0; i < 3; i++) { consumer。msgConsumer(queueNames[i], “”); } }}

執行生成者,結果如下:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

我們發現,生產者生產的10條資料,在每個消費者中都可以消費,這個是和Direct不同的地方,但是使用Fanout方式時,有幾個點需要注意一下:

生產者的routkey可以為空,因為生產者的所有資料,會下放到每一個佇列,所以不會透過routkey去路由;

消費者需要指定queues,因為消費者需要繫結到指定的queues才能消費。

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

這幅圖就畫出了Fanout的精髓之處,exchange會和所有的queue進行繫結,不區分路由,消費者需要繫結指定的queue才能發起消費。

注意:往佇列塞資料時,可能透過介面看不到訊息個數的增加,可能是你之前已經開啟了消費程序,導致增加的訊息馬上被消費了。

5。4 Fanout方式(隨機獲取佇列)

上面我們是指定了佇列,這個方式其實很不友好,比如對於Fanout,我其實根本無需關心佇列的名字,如果還指定對應佇列進行消費,感覺這個很冗餘,所以我們這裡就採用隨機獲取佇列名字的方式,下面程式碼直接Copy官網。

生成者封裝:

public static void publishMsgV2(String exchange, BuiltinExchangeType exchangeType, String message) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil。getConnectionFactory(); //建立連線 Connection connection = factory。newConnection(); //建立訊息通道 Channel channel = connection。createChannel(); // 宣告exchange中的訊息 channel。exchangeDeclare(exchange, exchangeType); // 釋出訊息 channel。basicPublish(exchange, “”, null, message。getBytes(“UTF-8”)); System。out。println(“Sent ’” + message + “‘”); channel。close(); connection。close();}

消費者封裝:

public static void consumerMsgV2(String exchange) throws IOException, TimeoutException { ConnectionFactory factory = RabbitUtil。getConnectionFactory(); Connection connection = factory。newConnection(); final Channel channel = connection。createChannel(); channel。exchangeDeclare(exchange, “fanout”); String queueName = channel。queueDeclare()。getQueue(); channel。queueBind(queueName, exchange, “”); System。out。println(“ [*] Waiting for messages。 To exit press CTRL+C”); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery。getBody(), “UTF-8”); System。out。println(“ [x] Received ’” + message + “‘”); }; channel。basicConsume(queueName, true, deliverCallback, consumerTag -> { });}

生產者:

public class FanoutProducer { private static final String EXCHANGE_NAME = “fanout。exchange。v2”; public void publishMsg(String msg) { try { MsgProducer。publishMsgV2(EXCHANGE_NAME, BuiltinExchangeType。FANOUT, msg); } catch (Exception e) { e。printStackTrace(); } } public static void main(String[] args) { FanoutProducer directProducer = new FanoutProducer(); String msg = “hello >>> ”; for (int i = 0; i < 10000; i++) { directProducer。publishMsg(msg + i); } }}

消費者:

public class FanoutConsumer { private static final String EXCHANGE_NAME = “fanout。exchange。v2”; public void msgConsumer() { try { MsgConsumer。consumerMsgV2(EXCHANGE_NAME); } catch (IOException e) { e。printStackTrace(); } catch (TimeoutException e) { e。printStackTrace(); } } public static void main(String[] args) { FanoutConsumer consumer = new FanoutConsumer(); for (int i = 0; i < 3; i++) { consumer。msgConsumer(); } }}

執行後,管理介面如下:

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

5。5 Topic方式

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

程式碼詳見官網:https://www。rabbitmq。com/tutorials/tutorial-five-java。html

更多方式,請直接檢視官網:https://www。rabbitmq。com/getstarted。html

新來個技術總監,把 RabbitMQ 講得那叫一個透徹,佩服

6。 RabbitMQ 進階

6。1 durable 和 autoDeleted

在定義Queue時,可以指定這兩個引數:

/** * Declare an exchange。 * @see com。rabbitmq。client。AMQP。Exchange。Declare * @see com。rabbitmq。client。AMQP。Exchange。DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @param autoDelete true if the server should delete the exchange when it is no longer in use * @param arguments other properties (construction arguments) for the exchange * @return a declaration-confirm method to indicate the exchange was successfully declared * @throws java。io。IOException if an error is encountered */Exchange。DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map arguments) throws IOException; /*** Declare a queue* @see com。rabbitmq。client。AMQP。Queue。Declare* @see com。rabbitmq。client。AMQP。Queue。DeclareOk* @param queue the name of the queue* @param durable true if we are declaring a durable queue (the queue will survive a server restart)* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* @param arguments other properties (construction arguments) for the queue* @return a declaration-confirm method to indicate the queue was successfully declared* @throws java。io。IOException if an error is encountered*/Queue。DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException;

6。1。1 durable

持久化,保證RabbitMQ在退出或者crash等異常情況下資料沒有丟失,需要將queue,exchange和Message都持久化。

若是將queue的持久化標識durable設定為true,則代表是一個持久的佇列,那麼在服務重啟之後,會重新讀取之前被持久化的queue。

雖然佇列可以被持久化,但是裡面的訊息是否為持久化,還要看訊息的持久化設定。即重啟queue,但是queue裡面還沒有發出去的訊息,那佇列裡面還存在該訊息麼?這個取決於該訊息的設定。

6。1。2 autoDeleted

自動刪除,如果該佇列沒有任何訂閱的消費者的話,該佇列會被自動刪除。這種佇列適用於臨時佇列。

當一個Queue被設定為自動刪除時,當消費者斷掉之後,queue會被刪除,這個主要針對的是一些不是特別重要的資料,不希望出現訊息積累的情況。

6。1。3 小節

當一個Queue已經宣告好了之後,不能更新durable或者autoDelted值;當需要修改時,需要先刪除再重新宣告

消費的Queue宣告應該和投遞的Queue宣告的 durable,autoDelted屬性一致,否則會報錯

對於重要的資料,一般設定 durable=true, autoDeleted=false

對於設定 autoDeleted=true 的佇列,當沒有消費者之後,佇列會自動被刪除

6。4 ACK

執行一個任務可能需要花費幾秒鐘,你可能會擔心如果一個消費者在執行任務過程中掛掉了。一旦RabbitMQ將訊息分發給了消費者,就會從記憶體中刪除。在這種情況下,如果正在執行任務的消費者宕機,會丟失正在處理的訊息和分發給這個消費者但尚未處理的訊息。

但是,我們不想丟失任何任務,如果有一個消費者掛掉了,那麼我們應該將分發給它的任務交付給另一個消費者去處理。

為了確保訊息不會丟失,RabbitMQ支援訊息應答。消費者傳送一個訊息應答,告訴RabbitMQ這個訊息已經接收並且處理完畢了。RabbitMQ就可以刪除它了。

因此手動ACK的常見手段:

// 接收訊息之後,主動ack/nakConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP。BasicProperties properties, byte[] body) throws IOException { String message = new String(body, “UTF-8”); try { System。out。println(“ [ ” + queue + “ ] Received ’” + message); channel。basicAck(envelope。getDeliveryTag(), false); } catch (Exception e) { channel。basicNack(envelope。getDeliveryTag(), false, true); } }};// 取消自動ackchannel。basicConsume(queue, false, consumer);

相關文章

頂部