1。 RocketMQ整合之非同步傳送
非同步傳送能夠提升傳送效率, 適合高併發場景下使用, 基於RocketMQ整合之普通訊息傳送做改造:
1、增加非同步傳送介面
com。mirson。spring。boot。mq。rocket。basic。provider。RocketMqProviderContorller
/** * 非同步傳送訊息 * @return */ @GetMapping(“/asyncSendString”) public String asyncSendString() { for(int i=0; i<10; i++) { String msg = “seq number: ” + i; final String seq = String。valueOf(i); // 非同步方式傳送 rocketMQTemplate。asyncSend(RabbitMqConfig。TOPIC, msg, new SendCallback() { public void onSuccess(SendResult sendResult) { // 傳送成功回撥處理 log。info(“seq number: ” + seq + “, send result: ” + sendResult。getSendStatus()); } public void onException(Throwable e) { // 傳送異常回調處理 log。error(e。getMessage(), e); } }); } return “async send success”; }
增加asyncSendString非同步傳送介面, 連續傳送十條有序訊息, 呼叫rocketMQTemplate的asyncSend方法, 實現非同步傳送, SendCallback內部提供了onSuccess與onException兩個回撥方法,可以針對性的做相應業務處理。
同步方式傳送, 能保證訊息有序傳遞, 這裡採用非同步傳送,不能保證訊息能夠有序接收, 在實際使用中, 要結合具體的業務場景使用。
2、測試驗證
呼叫非同步傳送介面
非同步方式傳送十條訊息。
監聽器日誌
十條訊息全部接收成功, 注意訊息的傳送順序, 與訂閱的接收順序, 沒有保持一致;非同步方式能夠提升傳送效率, 但缺點是不能保障訊息的有序消費,在實際使用中, 可以結合同步鎖來使用, 比如可以根據賬戶ID加鎖, 因每個賬戶資料具有獨立性, 這樣可以提升訊息的傳遞傳送效率, 又能保障每個賬戶接收到的資料是有序的。
2。 RocketMQ整合之ACL許可權控制
ACL是Access Control List簡稱, 意為訪問控制列表, 是RocketMQ4。4新加入的功能。加入ACL能夠透過許可權管理控制訊息佇列, 針對不同角色使用者分配不同的佇列操作許可權, 便於許可權管控, 提升訊息佇列資料的安全性。
1、ACL基本處理流程
2、
建立rocketmq-acl工程
3、
工程配置
application。yml
server: port: 12615spring: application: name: rocketmq-acl# RocketMQ配置rocketmq: name-server: 10。10。20。15:9876 # 生產者配置 producer: group: basic-group # 許可權資訊 access-key: rocketmq2 secret-key: 12345678 # 消費者配置 consumer: # 許可權資訊 accessKey: rocketmq2 secret-key: 12345678
注意producer與consumer, 都需配置許可權資訊, accessKey相當於使用者名稱, secret-key相當於密碼。
這些資訊不是隨便填寫, 要與RocketMQ服務配置檔案保持一致。
4、定義訊息監聽器
com。mirson。spring。boot。mq。rocket。acl。consume。StrSpringMessageConsumer
@Service@RocketMQMessageListener( topic = RabbitMqConfig。TOPIC_SPRING_MESSAGE, consumerGroup = RabbitMqConfig。CONSUME_GROUP_SPRING_MESSAGE// accessKey = “RocketMQ”, // 不需再填寫, 會自動從配置檔案中讀取// secretKey = “12345678” // 不需再填寫, 會自動從配置檔案中讀取)@Log4j2public class StrSpringMessageConsumer implements RocketMQListener
透過RocketMQMessageListener註解, 也可以配置accessKey與secretKey資訊, 但在工程配置檔案中我們已經填好, 系統啟動會自動讀取, 可以不用再填寫。
5、定義傳送介面
com。mirson。spring。boot。mq。rocket。acl。provider。RocketMqProviderContorller
/** * 傳送RocketMQ Spring Message封裝訊息 * @return */ @GetMapping(“/sendSpringMessage”) public String sendSpringMessage() { String msg = “random number: ” + RandomUtils。nextInt(0, 100); // Send Spring Message With String SendResult result = rocketMQTemplate。syncSend(RabbitMqConfig。TOPIC_SPRING_MESSAGE , MessageBuilder。withPayload(msg)。build()); log。info(“send result: ” + result。getSendStatus()); return msg; }
傳送一個Spring Message封裝的訊息, 呼叫rocketMQTemplate的syncSend方法傳送資料, 無需加額外引數。
6、RabiitMQ服務端使用者許可權設定
測試之前, 先要確保RocketMQ伺服器開啟了ACL驗證功能。
開啟ACL驗證
修改%RABBITMQ_HOME%/conf/broker。conf檔案, 末尾增加:
#開啟ACL許可權控制功能aclEnable=true
分配使用者許可權
修改%RABBITMQ_HOME%/conf/plain_acl。yml檔案:
globalWhiteRemoteAddresses:accounts:- accessKey: RocketMQ secretKey: 12345678 whiteRemoteAddress: admin: false defaultTopicPerm: DENY defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - accessKey: rocketmq2 secretKey: 12345678 whiteRemoteAddress: 192。168。1。* # if it is admin, it could access all resources admin: true defaultTopicPerm: PUB|SUB defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topic_acl_spring_message=PUB|SUB - topic_acl_transaction_spring_message=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - group_acl_spring_message=PUB|SUB - group_acl_transaction_spring_message=PUB|SUB
預設該配置檔案下會有兩個使用者, RocketMQ與rocketmq2, 這裡要修改rocketmq2的許可權。
rocketmq2使用者雖然具有admin許可權, 但是Rocketmq的ACL處理原始碼仍要讀取topicPerms屬性配置,否則會報錯, 這裡追加我們用於ACL測試的相關TOPIC與GROUP, 確保rocketmq2使用者擁有測試時的所有許可權, RocketMQ使用者則無許可權:
defaultTopicPerm: PUB|SUB defaultGroupPerm: SUB topicPerms: - topicA=DENY - topicB=PUB|SUB - topic_acl_spring_message=PUB|SUB - topic_acl_transaction_spring_message=PUB|SUB - topicC=SUB groupPerms: # the group should convert to retry topic - groupA=DENY - groupB=PUB|SUB - groupC=SUB - group_acl_spring_message=PUB|SUB - group_acl_transaction_spring_message=PUB|SUB
許可權控制引數說明
欄位
取值
含義
globalWhiteRemoteAddresses
;192。168。
。*;192。168。0。1
全域性IP白名單
accessKey
字串
Access Key 使用者名稱
secretKey
字串
Secret Key 密碼
whiteRemoteAddress
;192。168。
。*;192。168。0。1
使用者IP白名單
admin
true;false
是否管理員賬戶
defaultTopicPerm
DENY;PUB;SUB;PUB|SUB
預設的Topic許可權
defaultGroupPerm
DENY;PUB;SUB;PUB|SUB
預設的ConsumerGroup許可權
topicPerms
topic=許可權
各個Topic的許可權
groupPerms
group=許可權
各個ConsumerGroup的許可權
7、
測試驗證
許可權分配好後, 重啟RabbitMQ服務, 要確保讀取的是我們修改的配置檔案。
1)
啟動NameServer
nohup bin/mqnamesrv >/dev/null 2>&1 &
2)
啟動Broker
sh bin/mqbroker -n 127。0。0。1:9876 -c /usr/local/rocketmq4。4/conf/broker。conf &
3)
關閉Broker
bin/mqshutdown broker
4)
關閉name server
bin/mqshutdown namesrv
使用RocketMQ使用者,傳送資料, 預期應該是無許可權。
修改配置檔案, 啟動服務:
# RocketMQ配置rocketmq: name-server: 10。10。20。15:9876 # 生產者配置 producer: group: basic-group # 許可權資訊 access-key: RocketMQ secret-key: 12345678 # 消費者配置 consumer: # 許可權資訊 accessKey: RocketMQ secret-key: 12345678
訪問傳送介面
http://127。0。0。1:12615/sendSpringMessage
出現異常, 檢視控制檯日誌:
沒有該主題topic_acl_spring_message的操作許可權, ACL正常生效。
使用rocketmq2使用者,傳送資料,預期是可以正常傳送與接收
修改配置檔案:
# RocketMQ配置rocketmq: name-server: 10。10。20。15:9876 # 生產者配置 producer: group: basic-group # 許可權資訊 access-key: rocketmq2 secret-key: 12345678 # 消費者配置 consumer: # 許可權資訊 accessKey: rocketmq2 secret-key: 12345678
呼叫傳送介面
檢視控制檯監聽日誌:
透過ACL許可權控制, 能夠正常傳送與接收佇列資料。
3。 RocketMQ整合之Transaction事務訊息
假設場景,使用者進行轉賬, 先扣除自身的賬戶金額, 再發送訊息通知, 增加對方的賬戶金額, 在傳送訊息通知的過程中如果失敗該如何處理? 為了解決本地事務執行與訊息傳送的原子性問題, RocketMQ推出了Transaction事務訊息(並非分散式事務解決方案, 但可以基於此功能,與補償機制實現一套方案)。 具體處理機制:
1、
仍採用ACL機制, 基於rocketmq-acl工程改造實現
2、
增加接收監聽器
com。mirson。spring。boot。mq。rocket。acl。consume。StringTransactionConsumer
@Service@RocketMQMessageListener( topic = RabbitMqConfig。TOPIC_SPRING_TRANSACTION_MESSAGE, consumerGroup = RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE)@Log4j2public class StringTransactionConsumer implements RocketMQListener
這裡訂閱的GROUP為RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE。
3、
自定義事務監聽器
com。mirson。spring。boot。mq。rocket。acl。config。TransactionListener
@RocketMQTransactionListener( txProducerGroup = RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE)@Log4j2public class TransactionListener implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap
透過註解RocketMQTransactionListener實現自定義事務監聽器, txProducerGroup要與上面監聽器配置的Group一致,如果接收監聽器不指定Group, 將採用RocketMQ預設的事務控制處理器。
這裡監聽器的主要作用, 控制訊息的提交與回滾, 透過取模計算, 將結果為1的資料進行回滾並列印。
4、
定義傳送介面
/** * 傳送RocketMQ Transaction 事務訊息 * @return */ @GetMapping(“/sendTransactionMessage”) public String sendTransactionMessage() { for (int i = 0; i < 10; i++) { Message msg = MessageBuilder。withPayload(“seq number ” + i)。 setHeader(RocketMQHeaders。TRANSACTION_ID, “KEY_” + i)。build(); SendResult sendResult = rocketMQTemplate。sendMessageInTransaction(RabbitMqConfig。CONSUME_GROUP_TRANSACTION_SPRING_MESSAGE, RabbitMqConfig。TOPIC_SPRING_TRANSACTION_MESSAGE, msg, null); log。info(“seq ” + i + “ send result: ” + sendResult。getSendStatus()); } return “send transaction message success。”; }
定義sendTransactionMessage介面傳送事務訊息, 這裡連續傳送十條事務訊息,呼叫rocketMQTemplate 的sendMessageInTransaction方法, 指定配置的組別與主題資訊。
5、
測試驗證
傳送十條事務訊息, 在事務監聽器裡面, 有部分資料會出現回滾, 下面驗證, 監聽器是否正常接收確認的訊息, 能否接收到回滾的訊息。
呼叫介面
http://127。0。0。1:12615/sendTransactionMessage
檢視接收日誌結果
可以看到, 成功傳送了十條資料, 有4條資料出現回滾, 監聽器列印接收了6條資料, 驗證成功。
4。 總結
這裡全面的講解RocketMQ技術點, 相對較多, 也可以看出RocketMQ功能比較豐富, 有較好的擴充套件性,靈活性,適用各種業務場景, 不僅可以與Spring Boot 整合, 還可以支援Spring Cloud Stream 在微服務中應用。RocketMQ還支援訊息軌跡跟蹤, 非同步順序傳送, 併發消費等, 更多功能大家可以再深入研究, 能夠更好的適應生產專案對不同場景的使用要求。