首頁/ 汽車/ 正文

「Spring Boot 整合應用」RocketMQ的整合用法(下)

1。 RocketMQ整合之非同步傳送

非同步傳送能夠提升傳送效率, 適合高併發場景下使用, 基於RocketMQ整合之普通訊息傳送做改造:

1、增加非同步傳送介面

com。mirson。spring。boot。mq。rocket。basic。provider。RocketMqProviderContorller

/** * 非同步傳送訊息 * @return */ @GetMapping(“/asyncSendString”) public String asyncSendString() { for(int i=0; i<10; i++) { String msg = “seq number: ” + i; final String seq = String。valueOf(i); // 非同步方式傳送 rocketMQTemplate。asyncSend(RabbitMqConfig。TOPIC, msg, new SendCallback() { public void onSuccess(SendResult sendResult) { // 傳送成功回撥處理 log。info(“seq number: ” + seq + “, send result: ” + sendResult。getSendStatus()); } public void onException(Throwable e) { // 傳送異常回調處理 log。error(e。getMessage(), e); } }); } return “async send success”; }

增加asyncSendString非同步傳送介面, 連續傳送十條有序訊息, 呼叫rocketMQTemplate的asyncSend方法, 實現非同步傳送, SendCallback內部提供了onSuccess與onException兩個回撥方法,可以針對性的做相應業務處理。

同步方式傳送, 能保證訊息有序傳遞, 這裡採用非同步傳送,不能保證訊息能夠有序接收, 在實際使用中, 要結合具體的業務場景使用。

2、測試驗證

呼叫非同步傳送介面

「Spring Boot 整合應用」RocketMQ的整合用法(下)

非同步方式傳送十條訊息。

監聽器日誌

「Spring Boot 整合應用」RocketMQ的整合用法(下)

十條訊息全部接收成功, 注意訊息的傳送順序, 與訂閱的接收順序, 沒有保持一致;非同步方式能夠提升傳送效率, 但缺點是不能保障訊息的有序消費,在實際使用中, 可以結合同步鎖來使用, 比如可以根據賬戶ID加鎖, 因每個賬戶資料具有獨立性, 這樣可以提升訊息的傳遞傳送效率, 又能保障每個賬戶接收到的資料是有序的。

2。 RocketMQ整合之ACL許可權控制

ACL是Access Control List簡稱, 意為訪問控制列表, 是RocketMQ4。4新加入的功能。加入ACL能夠透過許可權管理控制訊息佇列, 針對不同角色使用者分配不同的佇列操作許可權, 便於許可權管控, 提升訊息佇列資料的安全性。

1、ACL基本處理流程

「Spring Boot 整合應用」RocketMQ的整合用法(下)

2、

建立rocketmq-acl工程

「Spring Boot 整合應用」RocketMQ的整合用法(下)

3、

工程配置

application。yml

server: port: 12615spring: application: name: rocketmq-acl# RocketMQ配置rocketmq: name-server: 10。10。20。15:9876 # 生產者配置 producer: group: basic-group # 許可權資訊 access-key: rocketmq2 secret-key: 12345678 # 消費者配置 consumer: # 許可權資訊 accessKey: rocketmq2 secret-key: 12345678

注意producer與consumer, 都需配置許可權資訊, accessKey相當於使用者名稱, secret-key相當於密碼。

這些資訊不是隨便填寫, 要與RocketMQ服務配置檔案保持一致。

4、定義訊息監聽器

com。mirson。spring。boot。mq。rocket。acl。consume。StrSpringMessageConsumer

@Service@RocketMQMessageListener( topic = RabbitMqConfig。TOPIC_SPRING_MESSAGE, consumerGroup = RabbitMqConfig。CONSUME_GROUP_SPRING_MESSAGE// accessKey = “RocketMQ”, // 不需再填寫, 會自動從配置檔案中讀取// secretKey = “12345678” // 不需再填寫, 會自動從配置檔案中讀取)@Log4j2public class StrSpringMessageConsumer implements RocketMQListener { @Override public void onMessage(String str) { log。info(“StrSpringMessageConsumer => receive str: ” + str); }}

透過RocketMQMessageListener註解, 也可以配置accessKey與secretKey資訊, 但在工程配置檔案中我們已經填好, 系統啟動會自動讀取, 可以不用再填寫。

5、定義傳送介面

com。mirson。spring。boot。mq。rocket。acl。provider。RocketMqProviderContorller

/** * 傳送RocketMQ Spring Message封裝訊息 * @return */ @GetMapping(“/sendSpringMessage”) public String sendSpringMessage() { String msg = “random number: ” + RandomUtils。nextInt(0, 100); // Send Spring Message With String SendResult result = rocketMQTemplate。syncSend(RabbitMqConfig。TOPIC_SPRING_MESSAGE , MessageBuilder。withPayload(msg)。build()); log。info(“send result: ” + result。getSendStatus()); return msg; }

傳送一個Spring Message封裝的訊息, 呼叫rocketMQTemplate的syncSend方法傳送資料, 無需加額外引數。

6、RabiitMQ服務端使用者許可權設定

測試之前, 先要確保RocketMQ伺服器開啟了ACL驗證功能。

開啟ACL驗證

修改%RABBITMQ_HOME%/conf/broker。conf檔案, 末尾增加:

#開啟ACL許可權控制功能aclEnable=true

分配使用者許可權

修改%RABBITMQ_HOME%/conf/plain_acl。yml檔案:

globalWhiteRemoteAddresses:accounts:- accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192。168。1。* # if it is admin, it could access all resources admin: true defaultTopicPerm: PUB|SUB defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topic_acl_spring_message=PUB|SUB - topic_acl_transaction_spring_message=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - group_acl_spring_message=PUB|SUB - group_acl_transaction_spring_message=PUB|SUB

預設該配置檔案下會有兩個使用者, RocketMQ與rocketmq2, 這裡要修改rocketmq2的許可權。

rocketmq2使用者雖然具有admin許可權, 但是Rocketmq的ACL處理原始碼仍要讀取topicPerms屬性配置,否則會報錯, 這裡追加我們用於ACL測試的相關TOPIC與GROUP, 確保rocketmq2使用者擁有測試時的所有許可權, RocketMQ使用者則無許可權:

defaultTopicPerm: PUB|SUB defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topic_acl_spring_message=PUB|SUB - topic_acl_transaction_spring_message=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - group_acl_spring_message=PUB|SUB - group_acl_transaction_spring_message=PUB|SUB

許可權控制引數說明

欄位

取值

含義

globalWhiteRemoteAddresses

;192。168。

。*;192。168。0。1

全域性IP白名單

accessKey

字串

Access Key 使用者名稱

secretKey

字串

Secret Key 密碼

whiteRemoteAddress

;192。168。

。*;192。168。0。1

使用者IP白名單

admin

true;false

是否管理員賬戶

defaultTopicPerm

DENY;PUB;SUB;PUB|SUB

預設的Topic許可權

defaultGroupPerm

DENY;PUB;SUB;PUB|SUB

預設的ConsumerGroup許可權

topicPerms

topic=許可權

各個Topic的許可權

groupPerms

group=許可權

各個ConsumerGroup的許可權

7、

測試驗證

許可權分配好後, 重啟RabbitMQ服務, 要確保讀取的是我們修改的配置檔案。

1)

啟動NameServer

nohup bin/mqnamesrv >/dev/null 2>&1 &

2)

啟動Broker

sh bin/mqbroker -n 127。0。0。1:9876 -c /usr/local/rocketmq4。4/conf/broker。conf &

3)

關閉Broker

bin/mqshutdown broker

4)

關閉name server

bin/mqshutdown namesrv

使用RocketMQ使用者,傳送資料, 預期應該是無許可權。

修改配置檔案, 啟動服務:

# RocketMQ配置rocketmq: name-server: 10。10。20。15:9876 # 生產者配置 producer: group: basic-group # 許可權資訊 access-key: RocketMQ secret-key: 12345678 # 消費者配置 consumer: # 許可權資訊 accessKey: RocketMQ secret-key: 12345678

訪問傳送介面

http://127。0。0。1:12615/sendSpringMessage

「Spring Boot 整合應用」RocketMQ的整合用法(下)

出現異常, 檢視控制檯日誌:

「Spring Boot 整合應用」RocketMQ的整合用法(下)

沒有該主題topic_acl_spring_message的操作許可權, ACL正常生效。

使用rocketmq2使用者,傳送資料,預期是可以正常傳送與接收

修改配置檔案:

# RocketMQ配置rocketmq: name-server: 10。10。20。15:9876 # 生產者配置 producer: group: basic-group # 許可權資訊 access-key: rocketmq2 secret-key: 12345678 # 消費者配置 consumer: # 許可權資訊 accessKey: rocketmq2 secret-key: 12345678

呼叫傳送介面

「Spring Boot 整合應用」RocketMQ的整合用法(下)

檢視控制檯監聽日誌:

「Spring Boot 整合應用」RocketMQ的整合用法(下)

透過ACL許可權控制, 能夠正常傳送與接收佇列資料。

3。 RocketMQ整合之Transaction事務訊息

假設場景,使用者進行轉賬, 先扣除自身的賬戶金額, 再發送訊息通知, 增加對方的賬戶金額, 在傳送訊息通知的過程中如果失敗該如何處理? 為了解決本地事務執行與訊息傳送的原子性問題, RocketMQ推出了Transaction事務訊息(並非分散式事務解決方案, 但可以基於此功能,與補償機制實現一套方案)。 具體處理機制:

「Spring Boot 整合應用」RocketMQ的整合用法(下)

1、

仍採用ACL機制, 基於rocketmq-acl工程改造實現

2、

增加接收監聽器

com。mirson。spring。boot。mq。rocket。acl。consume。StringTransactionConsumer

@Service@RocketMQMessageListener( topic = RabbitMqConfig。TOPIC_SPRING_TRANSACTION_MESSAGE, consumerGroup = RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE)@Log4j2public class StringTransactionConsumer implements RocketMQListener { @Override public void onMessage(String message) { log。info(“StringTransactionConsumer => receive transaction str: ” + message); }}

這裡訂閱的GROUP為RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE。

3、

自定義事務監聽器

com。mirson。spring。boot。mq。rocket。acl。config。TransactionListener

@RocketMQTransactionListener( txProducerGroup = RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE)@Log4j2public class TransactionListener implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap localTrans = new ConcurrentHashMap(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transId = (String)msg。getHeaders()。get(RocketMQHeaders。TRANSACTION_ID); int value = transactionIndex。getAndIncrement(); int status = value % 3; localTrans。put(transId, status); if (status == 0) { return RocketMQLocalTransactionState。COMMIT; } if (status == 1) { log。info(“ # ROLLBACK # Simulating %s related local transaction exec failed! {}”, new String((byte[])msg。getPayload())); return RocketMQLocalTransactionState。ROLLBACK; } return RocketMQLocalTransactionState。UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String)msg。getHeaders()。get(RocketMQHeaders。TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState。COMMIT; Integer status = localTrans。get(transId); if (null != status) { switch (status) { case 0: retState = RocketMQLocalTransactionState。UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState。COMMIT; break; case 2: retState = RocketMQLocalTransactionState。COMMIT; break; } } return retState; }}

透過註解RocketMQTransactionListener實現自定義事務監聽器, txProducerGroup要與上面監聽器配置的Group一致,如果接收監聽器不指定Group, 將採用RocketMQ預設的事務控制處理器。

這裡監聽器的主要作用, 控制訊息的提交與回滾, 透過取模計算, 將結果為1的資料進行回滾並列印。

4、

定義傳送介面

/** * 傳送RocketMQ Transaction 事務訊息 * @return */ @GetMapping(“/sendTransactionMessage”) public String sendTransactionMessage() { for (int i = 0; i < 10; i++) { Message msg = MessageBuilder。withPayload(“seq number ” + i)。 setHeader(RocketMQHeaders。TRANSACTION_ID, “KEY_” + i)。build(); SendResult sendResult = rocketMQTemplate。sendMessageInTransaction(RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE, RabbitMqConfig。TOPIC_SPRING_TRANSACTION_MESSAGE, msg, null); log。info(“seq ” + i + “ send result: ” + sendResult。getSendStatus()); } return “send transaction message success。”; }

定義sendTransactionMessage介面傳送事務訊息, 這裡連續傳送十條事務訊息,呼叫rocketMQTemplate 的sendMessageInTransaction方法, 指定配置的組別與主題資訊。

5、

測試驗證

傳送十條事務訊息, 在事務監聽器裡面, 有部分資料會出現回滾, 下面驗證, 監聽器是否正常接收確認的訊息, 能否接收到回滾的訊息。

呼叫介面

http://127。0。0。1:12615/sendTransactionMessage

「Spring Boot 整合應用」RocketMQ的整合用法(下)

檢視接收日誌結果

「Spring Boot 整合應用」RocketMQ的整合用法(下)

可以看到, 成功傳送了十條資料, 有4條資料出現回滾, 監聽器列印接收了6條資料, 驗證成功。

4。 總結

這裡全面的講解RocketMQ技術點, 相對較多, 也可以看出RocketMQ功能比較豐富, 有較好的擴充套件性,靈活性,適用各種業務場景, 不僅可以與Spring Boot 整合, 還可以支援Spring Cloud Stream 在微服務中應用。RocketMQ還支援訊息軌跡跟蹤, 非同步順序傳送, 併發消費等, 更多功能大家可以再深入研究, 能夠更好的適應生產專案對不同場景的使用要求。

相關文章

頂部