目錄
本篇文章所涉及到的demo練習 使用的
cloud 2021。0。3+ springboot2。6。8
1。1。 cloud Stream是什麼
官方定義:Spring Cloud Stream是一個用於構建 與
共享訊息系統
連線的高度可擴充套件的
事件驅動微服務
。
目前主流的訊息框架有:
ActiveMQ
RabbitMQ
RocketMQ
Kafka
假設公司業務專案用了RabbitMQ,而大資料專案用了Kafka。這時候就會出現有兩個訊息框架,相對於程式設計師來說其實並不友好,還得兩個都掌握,正常對於一個程式設計師來說熟練一個訊息框架都不錯了,何況還搞了兩個,並且兩個維護起來也不好維護。
RabbitMQ和Kafka是兩個不同的框架,兩個訊息模型上也存在著差異,並且程式碼上用法也不一樣。Spring Cloud Stream就是不再關注具體MQ的細節,可以在不改程式碼的基礎上,來完成Rabbit和Kafka兩個不同的訊息中介軟體的切換(這裡的切換指的是原本用的RabbitMQ,但是用著用著發現kafka比較符合,所以想要換框架)。
總結成一句話:遮蔽底層訊息中介軟體的差異,降低切換成本,統一訊息的程式設計模型
注意:遺憾的是目前僅支援
RabbitMQ、Kafka
。
1。2。 設計思想
常規的MQ設計如下:
Message:生產者/消費者之間靠訊息媒介傳遞資訊內容
MessageChannel:訊息必須走特定的通道
佇列:假如發訊息會先發到訊息隊列當中
訊息佇列的訊息如何被消費呢:訂閱的人可以進行消費
cloud Stream設計如下:
透過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離。
在沒有繫結器這個概念的情況下,我們的SpringBoot應用要直接與訊息中介軟體進行資訊互動的時候,由於各訊息中介軟體構建的初衷不同,它們的實現細節上會有較大的差異性,透過定義繫結器作為中間層,完美地實現了應用程式與訊息中介軟體細節之間的隔離。Stream對訊息中介軟體的進一步封裝,可以做到程式碼層面對中介軟體的無感知,甚至於動態的切換中介軟體(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程
注意:左圖是官網的架構圖
Binder可以生成Binding,Binding用來繫結訊息容器的生產者和消費者,它有兩種型別,INPUT和OUTPUT,INPUT對應於消費者,OUTPUT對應於生產者。
stream為了遮蔽差異,抽象出來了一個Binder層,而目前為止,只提供了兩個框架的實現,透過具體的實現來連線訊息中介軟體。
假如想要透過stream連線RabbitMQ就使用:
假如想要透過stream連線Kafka就使用:
Stream中的訊息通訊方式遵循了
釋出-訂閱模式
,Topic主題進行廣播,在RabbitMQ就是
Exchange
,在Kakfa中就是
Topic
。
1。3。 標準流程
Binder:
很方便的連線中介軟體,遮蔽差異
Channel:
通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,透過Channe對佇列進行配置
Source(源:傳送者)和Sink(水槽:接受者):
簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream釋出訊息就是輸出,接受訊息就是輸入。
1。4。 註解
註解完全是基於官方給的模型而定的!透過stream使用訊息中介軟體也是非常簡單的,直接使用以下註解就可以使用。
注意:註解依然是能用的,但是官方明確表示註解已經被棄用,棄用並不是不能用,而是用了會畫橫槓不建議用。但是功能是沒有問題的,低版本的cloud是沒有被棄用的。針對於註解和函數語言程式設計兩種我都會進行使用。
題外話:學技術永遠是這樣,技術一直在不斷的更新迭代,真正學習一個技術並不是要掌握編碼使用,而是要掌握他到底是什麼,能幹什麼,要去深入理解他,對於編碼,我認為其實不是很重要。就算你今天掌握了官方最新用法,回頭人家又改寫法了。
二、基於註解程式碼練習
生產者就是訊息傳送者,消費者就是訊息接受者。這裡我就不用kafka了,我直接用的是RabbitMQ。
2。1。 訊息驅動之生產者
1。建立專案(可以是聚合可以是普通springboot專案)2。新增pom
因為是和RabbitMQ整合,所以就是引入的spring-cloud-starter-stream-rabbit啟動器
3。新增application配置
server: port: 8801spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要繫結的rabbitmq的服務資訊; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 訊息元件型別 environment: # 設定rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain” binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
4。新增介面
public interface IMessageProvider { public String send();}
5。新增實現類
import org。springframework。cloud。stream。annotation。EnableBinding;import org。springframework。cloud。stream。messaging。Source;import org。springframework。integration。support。MessageBuilder;import org。springframework。messaging。MessageChannel;import javax。annotation。Resource;import java。util。UUID;// 可以理解為是一個訊息的傳送管道的定義@EnableBinding(Source。class)public class MessageProviderImpl implements IMessageProvider { // 訊息的傳送管道 @Resource private MessageChannel output; @Override public String send() { String serial = UUID。randomUUID()。toString(); // 建立併發送訊息 this。output。send(MessageBuilder。withPayload(serial)。build()); System。out。println(“***serial: ” + serial); return serial; }}
6。新增controller控制器
@RestControllerpublic class SendMessageController { @Autowired private IMessageProvider iMessageProvider; @GetMapping(“send”) public String send() { return iMessageProvider。send(); }}
7。測試
(1)首先要保證RabbitMQ是可以訪問的: http://localhost:15672
(2)啟動專案訪問: http://localhost:8801/send
下圖波峰代表傳送訊息成功
啟動後會建立交換機,名稱就是application。yml當中的destination屬性設定的
注意:停止服務後並沒有刪除交換機!!!
2。2。 訊息驅動之消費者
1。建立專案
3。新增application配置
server: port: 8802spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要繫結的rabbitmq的服務資訊; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 訊息元件型別 environment: # 設定rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain” binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
4。新增監聽(消費者只負責接受訊息)
import org。springframework。beans。factory。annotation。Value;import org。springframework。cloud。stream。annotation。EnableBinding;import org。springframework。cloud。stream。annotation。StreamListener;import org。springframework。cloud。stream。messaging。Sink;import org。springframework。messaging。Message;import org。springframework。stereotype。Component;@Component@EnableBinding(Sink。class)public class ReceiveMessageListener { @Value(“${server。port}”) private String serverPort; @StreamListener(Sink。INPUT) public void input(Message
5。測試
(1)啟動RabbitMQ
(2)啟動傳送訊息端服務
(3)啟動消費者服務,啟動後會發現,他自動會向這個交換機當中新增一個佇列。
傳送訊息: http://localhost:8801/send
接受訊息:
注意:當停止服務後訊息佇列會被自動刪除!!!
2。3。 目前存在的問題
1。依照8802, clone出來一份執行8803,主要用來演示多個消費者的場景
2。啟動8801生產者
3。啟動8802消費者
4。啟動8803消費者
當三個服務都啟動後透過RabbitMQ介面會發現,一個交換機綁定了兩個佇列
執行後會發現存在兩個問題:
有重複消費問題
訊息持久化問題
(1)重複消費問題:
傳送訊息後兩個消費者都收到了訊息: http://localhost:8801/send
比如在如下場景中,訂單系統我們做叢集部署,都會從RabbitMQ中獲取訂單資訊,那如果一個訂單同時被兩個服務獲取到,那麼就會造成資料錯誤,我們得避免這種情況。這時我們就可以使用Stream中的訊息分組來解決
注意在Stream中處於同一個group中的多個消費者是競爭關係,就能夠保證訊息只會被其中一個應用消費一次。不同組是可以全面消費的(重複消費),同一組內會發生競爭關係,只有其中一個可以消費。
(2)訊息持久化問題:
當生產者傳送訊息的時候,消費者恰好宕機了,但是過一會消費者恢復了,但是訊息卻沒收到。那也就是意味著訊息佇列是臨時訊息佇列。針對於這一點,大家也可以測試一下,加深一下印象。
2。4。 分組解決重複消費問題
原理:微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。
同一個組內會發生競爭關係,只有其中一個可以消費。
接下來直接調整兩個消費者為同一個組:新增如下配置
當兩個消費者都設定好後啟動,會發現一個問題:
實際上分到一個組對於RabbitMQ來說就是兩個消費者監聽了一個佇列。
一個佇列那也就意味著,當佇列收到一條訊息,哪個消費者誰先消費就是誰的,消費完佇列裡面就沒有了,也就是隻有一個消費者能消費到訊息!
注意:假如不設定group屬性的時候,預設是啟動一個消費者,就會建立一個消費佇列,啟動多個服務就會建立多個佇列。stream預設使用的是RabbitMQ的topic交換機。當傳送者向這個交換機發送訊息的時候,兩個佇列就都會接收到。關於RabbitMQ相關知識本篇不記錄,後續會專門寫RabbitMQ相關文章。
最終測試:8802/8803實現了輪詢分組,每次只有一個消費者8801模組的發的訊息只能被8802或8803其中一個接收到,這樣避免了重複消費。
2。5。 訊息持久化
當三個專案都啟動著的時候,現在我們要做幾件事:
停止8802和8803並去除掉8802的分組
group: gxs
,8803不去分組資訊,停止掉專案的時候會發現訊息佇列並沒有刪除,說明一旦設定分組資訊,訊息佇列就不再是臨時佇列。
8801傳送4條訊息
啟動8802然後訊息並沒有列印,沒有收到訊息(注意8802是去掉分組資訊的)
再啟動8803,有分組屬性配置,後臺打出來了MQ上的訊息
原因就是:當兩個專案都停止的時候,佇列並未刪除,而8803還綁定了這個佇列,所以他就算宕機了,又重啟了,依然可以收到訊息。而8802沒有設定分組資訊,他再啟動後系統會給他建立一個臨時佇列,自然而然收不到之前的訊息了。