首頁/ 汽車/ 正文

Spring Cloud Stream詳解

目錄

本篇文章所涉及到的demo練習 使用的

cloud 2021。0。3+ springboot2。6。8

1。1。 cloud Stream是什麼

官方定義:Spring Cloud Stream是一個用於構建 與

共享訊息系統

連線的高度可擴充套件的

事件驅動微服務

目前主流的訊息框架有:

ActiveMQ

RabbitMQ

RocketMQ

Kafka

假設公司業務專案用了RabbitMQ,而大資料專案用了Kafka。這時候就會出現有兩個訊息框架,相對於程式設計師來說其實並不友好,還得兩個都掌握,正常對於一個程式設計師來說熟練一個訊息框架都不錯了,何況還搞了兩個,並且兩個維護起來也不好維護。

RabbitMQ和Kafka是兩個不同的框架,兩個訊息模型上也存在著差異,並且程式碼上用法也不一樣。Spring Cloud Stream就是不再關注具體MQ的細節,可以在不改程式碼的基礎上,來完成Rabbit和Kafka兩個不同的訊息中介軟體的切換(這裡的切換指的是原本用的RabbitMQ,但是用著用著發現kafka比較符合,所以想要換框架)。

總結成一句話:遮蔽底層訊息中介軟體的差異,降低切換成本,統一訊息的程式設計模型

注意:遺憾的是目前僅支援

RabbitMQ、Kafka

1。2。 設計思想

常規的MQ設計如下:

Spring Cloud Stream詳解

Message:生產者/消費者之間靠訊息媒介傳遞資訊內容

MessageChannel:訊息必須走特定的通道

佇列:假如發訊息會先發到訊息隊列當中

訊息佇列的訊息如何被消費呢:訂閱的人可以進行消費

cloud Stream設計如下:

透過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離。

在沒有繫結器這個概念的情況下,我們的SpringBoot應用要直接與訊息中介軟體進行資訊互動的時候,由於各訊息中介軟體構建的初衷不同,它們的實現細節上會有較大的差異性,透過定義繫結器作為中間層,完美地實現了應用程式與訊息中介軟體細節之間的隔離。Stream對訊息中介軟體的進一步封裝,可以做到程式碼層面對中介軟體的無感知,甚至於動態的切換中介軟體(rabbitmq切換為kafka),使得微服務開發的高度解耦,服務可以關注更多自己的業務流程

注意:左圖是官網的架構圖

Spring Cloud Stream詳解

Binder可以生成Binding,Binding用來繫結訊息容器的生產者和消費者,它有兩種型別,INPUT和OUTPUT,INPUT對應於消費者,OUTPUT對應於生產者。

Spring Cloud Stream詳解

stream為了遮蔽差異,抽象出來了一個Binder層,而目前為止,只提供了兩個框架的實現,透過具體的實現來連線訊息中介軟體。

假如想要透過stream連線RabbitMQ就使用:

org。springframework。cloud spring-cloud-starter-stream-rabbit

假如想要透過stream連線Kafka就使用:

org。springframework。cloud spring-cloud-starter-stream-kafka

Stream中的訊息通訊方式遵循了

釋出-訂閱模式

,Topic主題進行廣播,在RabbitMQ就是

Exchange

,在Kakfa中就是

Topic

1。3。 標準流程

Spring Cloud Stream詳解

Binder:

很方便的連線中介軟體,遮蔽差異

Channel:

通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,透過Channe對佇列進行配置

Source(源:傳送者)和Sink(水槽:接受者):

簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream釋出訊息就是輸出,接受訊息就是輸入。

1。4。 註解

Spring Cloud Stream詳解

註解完全是基於官方給的模型而定的!透過stream使用訊息中介軟體也是非常簡單的,直接使用以下註解就可以使用。

Spring Cloud Stream詳解

注意:註解依然是能用的,但是官方明確表示註解已經被棄用,棄用並不是不能用,而是用了會畫橫槓不建議用。但是功能是沒有問題的,低版本的cloud是沒有被棄用的。針對於註解和函數語言程式設計兩種我都會進行使用。

題外話:學技術永遠是這樣,技術一直在不斷的更新迭代,真正學習一個技術並不是要掌握編碼使用,而是要掌握他到底是什麼,能幹什麼,要去深入理解他,對於編碼,我認為其實不是很重要。就算你今天掌握了官方最新用法,回頭人家又改寫法了。

Spring Cloud Stream詳解

Spring Cloud Stream詳解

二、基於註解程式碼練習

生產者就是訊息傳送者,消費者就是訊息接受者。這裡我就不用kafka了,我直接用的是RabbitMQ。

2。1。 訊息驅動之生產者

1。建立專案(可以是聚合可以是普通springboot專案)2。新增pom

因為是和RabbitMQ整合,所以就是引入的spring-cloud-starter-stream-rabbit啟動器

8 8 UTF-8 2。6。8 2021。0。3 org。springframework。boot spring-boot-dependencies ${springboot。version} pom import org。springframework。cloud spring-cloud-dependencies ${springcloud。version} pom import org。springframework。boot spring-boot-starter-web org。springframework。cloud spring-cloud-starter-stream-rabbit

3。新增application配置

server: port: 8801spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此處配置要繫結的rabbitmq的服務資訊; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 訊息元件型別 environment: # 設定rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 output: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain” binder: defaultRabbit # 設定要繫結的訊息服務的具體設定

4。新增介面

public interface IMessageProvider { public String send();}

5。新增實現類

import org。springframework。cloud。stream。annotation。EnableBinding;import org。springframework。cloud。stream。messaging。Source;import org。springframework。integration。support。MessageBuilder;import org。springframework。messaging。MessageChannel;import javax。annotation。Resource;import java。util。UUID;// 可以理解為是一個訊息的傳送管道的定義@EnableBinding(Source。class)public class MessageProviderImpl implements IMessageProvider { // 訊息的傳送管道 @Resource private MessageChannel output; @Override public String send() { String serial = UUID。randomUUID()。toString(); // 建立併發送訊息 this。output。send(MessageBuilder。withPayload(serial)。build()); System。out。println(“***serial: ” + serial); return serial; }}

6。新增controller控制器

@RestControllerpublic class SendMessageController { @Autowired private IMessageProvider iMessageProvider; @GetMapping(“send”) public String send() { return iMessageProvider。send(); }}

7。測試

(1)首先要保證RabbitMQ是可以訪問的: http://localhost:15672

Spring Cloud Stream詳解

(2)啟動專案訪問: http://localhost:8801/send

下圖波峰代表傳送訊息成功

Spring Cloud Stream詳解

啟動後會建立交換機,名稱就是application。yml當中的destination屬性設定的

Spring Cloud Stream詳解

注意:停止服務後並沒有刪除交換機!!!

2。2。 訊息驅動之消費者

1。建立專案

3。新增application配置

server: port: 8802spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此處配置要繫結的rabbitmq的服務資訊; defaultRabbit: # 表示定義的名稱,用於於binding整合 type: rabbit # 訊息元件型別 environment: # 設定rabbitmq的相關的環境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服務的整合處理 input: # 這個名字是一個通道的名稱 destination: studyExchange # 表示要使用的Exchange名稱定義 content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain” binder: defaultRabbit # 設定要繫結的訊息服務的具體設定

4。新增監聽(消費者只負責接受訊息)

import org。springframework。beans。factory。annotation。Value;import org。springframework。cloud。stream。annotation。EnableBinding;import org。springframework。cloud。stream。annotation。StreamListener;import org。springframework。cloud。stream。messaging。Sink;import org。springframework。messaging。Message;import org。springframework。stereotype。Component;@Component@EnableBinding(Sink。class)public class ReceiveMessageListener { @Value(“${server。port}”) private String serverPort; @StreamListener(Sink。INPUT) public void input(Message message) { System。out。println(“消費者1號,————->接收到的訊息:” + message。getPayload() + “\t port: ” + serverPort); }}

5。測試

(1)啟動RabbitMQ

(2)啟動傳送訊息端服務

(3)啟動消費者服務,啟動後會發現,他自動會向這個交換機當中新增一個佇列。

Spring Cloud Stream詳解

傳送訊息: http://localhost:8801/send

接受訊息:

Spring Cloud Stream詳解

Spring Cloud Stream詳解

注意:當停止服務後訊息佇列會被自動刪除!!!

2。3。 目前存在的問題

1。依照8802, clone出來一份執行8803,主要用來演示多個消費者的場景

2。啟動8801生產者

3。啟動8802消費者

4。啟動8803消費者

當三個服務都啟動後透過RabbitMQ介面會發現,一個交換機綁定了兩個佇列

Spring Cloud Stream詳解

執行後會發現存在兩個問題:

有重複消費問題

訊息持久化問題

(1)重複消費問題:

傳送訊息後兩個消費者都收到了訊息: http://localhost:8801/send

Spring Cloud Stream詳解

Spring Cloud Stream詳解

比如在如下場景中,訂單系統我們做叢集部署,都會從RabbitMQ中獲取訂單資訊,那如果一個訂單同時被兩個服務獲取到,那麼就會造成資料錯誤,我們得避免這種情況。這時我們就可以使用Stream中的訊息分組來解決

Spring Cloud Stream詳解

注意在Stream中處於同一個group中的多個消費者是競爭關係,就能夠保證訊息只會被其中一個應用消費一次。不同組是可以全面消費的(重複消費),同一組內會發生競爭關係,只有其中一個可以消費。

(2)訊息持久化問題:

當生產者傳送訊息的時候,消費者恰好宕機了,但是過一會消費者恢復了,但是訊息卻沒收到。那也就是意味著訊息佇列是臨時訊息佇列。針對於這一點,大家也可以測試一下,加深一下印象。

2。4。 分組解決重複消費問題

原理:微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。

同一個組內會發生競爭關係,只有其中一個可以消費。

接下來直接調整兩個消費者為同一個組:新增如下配置

Spring Cloud Stream詳解

當兩個消費者都設定好後啟動,會發現一個問題:

實際上分到一個組對於RabbitMQ來說就是兩個消費者監聽了一個佇列。

一個佇列那也就意味著,當佇列收到一條訊息,哪個消費者誰先消費就是誰的,消費完佇列裡面就沒有了,也就是隻有一個消費者能消費到訊息!

注意:假如不設定group屬性的時候,預設是啟動一個消費者,就會建立一個消費佇列,啟動多個服務就會建立多個佇列。stream預設使用的是RabbitMQ的topic交換機。當傳送者向這個交換機發送訊息的時候,兩個佇列就都會接收到。關於RabbitMQ相關知識本篇不記錄,後續會專門寫RabbitMQ相關文章。

Spring Cloud Stream詳解

最終測試:8802/8803實現了輪詢分組,每次只有一個消費者8801模組的發的訊息只能被8802或8803其中一個接收到,這樣避免了重複消費。

2。5。 訊息持久化

當三個專案都啟動著的時候,現在我們要做幾件事:

停止8802和8803並去除掉8802的分組

group: gxs

,8803不去分組資訊,停止掉專案的時候會發現訊息佇列並沒有刪除,說明一旦設定分組資訊,訊息佇列就不再是臨時佇列。

8801傳送4條訊息

啟動8802然後訊息並沒有列印,沒有收到訊息(注意8802是去掉分組資訊的)

再啟動8803,有分組屬性配置,後臺打出來了MQ上的訊息

原因就是:當兩個專案都停止的時候,佇列並未刪除,而8803還綁定了這個佇列,所以他就算宕機了,又重啟了,依然可以收到訊息。而8802沒有設定分組資訊,他再啟動後系統會給他建立一個臨時佇列,自然而然收不到之前的訊息了。

相關文章

頂部