首頁/ 汽車/ 正文

RocketMQ原始碼之BrokerController核心元件SlaveSynchronize

一、前言

前面我們分析了BrokerController

用於管理

broker負責的topic元資料元件TopicConfigManager,這一篇我們就來分析一下Slave節點的主從同步元件SlaveSynchronize;

RocketMQ原始碼之BrokerController核心元件SlaveSynchronize

二、原始碼導讀

BrokerController有個handleSlaveSynchronize方法,在role為BrokerRole。SLAVE的時候,會註冊一個定時任務,每隔10秒鐘執行一次BrokerController。this。slaveSynchronize。syncAll方法;

private void handleSlaveSynchronize(BrokerRole role) { if (role == BrokerRole。SLAVE) { if (null != slaveSyncFuture) { slaveSyncFuture。cancel(false); } this。slaveSynchronize。setMasterAddr(null); slaveSyncFuture = this。scheduledExecutorService。scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 呼叫syncAll方法拉取master資料進行同步 BrokerController。this。slaveSynchronize。syncAll(); } catch (Throwable e) { log。error(“ScheduledTask SlaveSynchronize syncAll error。”, e); } } }, 1000 * 3, 1000 * 10, TimeUnit。MILLISECONDS); } else { //handle the slave synchronise if (null != slaveSyncFuture) { slaveSyncFuture。cancel(false); } this。slaveSynchronize。setMasterAddr(null); }}

三、原始碼分析

核心方法為syncAll,內部呼叫了4個私有方法進行同步資料,實際獲取資料則是透過remotingClient請求master節點進行獲取;

同步topic元資料;

同步消費者偏移量資料;

同步延遲偏移量;

同步訂閱組配置;

// 主從同步元件public class SlaveSynchronize { private static final InternalLogger log = InternalLoggerFactory。getLogger(LoggerName。BROKER_LOGGER_NAME); private final BrokerController brokerController; // master地址,在一個broker分組裡是有master和slave,dledger是屬於沒有主從概念 private volatile String masterAddr = null; public SlaveSynchronize(BrokerController brokerController) { this。brokerController = brokerController; } public String getMasterAddr() { return masterAddr; } public void setMasterAddr(String masterAddr) { this。masterAddr = masterAddr; } public void syncAll() { // 同步topic元資料 this。syncTopicConfig(); // 同步消費者偏移量資料 this。syncConsumerOffset(); // 同步延遲偏移量 this。syncDelayOffset(); // 同步訂閱組配置 this。syncSubscriptionGroupConfig(); } // 同步topic元資料 private void syncTopicConfig() { // 獲取master地址備份 String masterAddrBak = this。masterAddr; // 判斷master地址備份是和自己的Broker地址是否一致 if (masterAddrBak != null && !masterAddrBak。equals(brokerController。getBrokerAddr())) { try { // 作為slave,從master那邊去傳送請求查詢獲取到一份topic元資料 TopicConfigSerializeWrapper topicWrapper = this。brokerController。getBrokerOuterAPI() 。getAllTopicConfig(masterAddrBak); // 作為slave,自己的topic元資料裡的資料版本號,是不是跟我們從master那邊查詢到的資料版本號是一樣的 // 如果說是不一樣的 if (!this。brokerController。getTopicConfigManager()。getDataVersion() 。equals(topicWrapper。getDataVersion())) { // 作為slave,把自己這裡的資料版本號做一個更新 this。brokerController。getTopicConfigManager()。getDataVersion() 。assignNewOne(topicWrapper。getDataVersion()); this。brokerController。getTopicConfigManager()。getTopicConfigTable()。clear(); // 把slave自己這裡的topic元資料做一個更新 this。brokerController。getTopicConfigManager()。getTopicConfigTable() 。putAll(topicWrapper。getTopicConfigTable()); // 把我們的topic元資料做一個持久化 this。brokerController。getTopicConfigManager()。persist(); log。info(“Update slave topic config from master, {}”, masterAddrBak); } } catch (Exception e) { log。error(“SyncTopicConfig Exception, {}”, masterAddrBak, e); } } } // 同步consumer消費偏移量資料 private void syncConsumerOffset() { String masterAddrBak = this。masterAddr; // 判斷master地址備份是和自己的Broker地址是否一致 if (masterAddrBak != null && !masterAddrBak。equals(brokerController。getBrokerAddr())) { try { // 獲取主節點消費偏移量資料 ConsumerOffsetSerializeWrapper offsetWrapper = this。brokerController。getBrokerOuterAPI()。getAllConsumerOffset(masterAddrBak); // 更新最近的消費偏移量資料 this。brokerController。getConsumerOffsetManager()。getOffsetTable() 。putAll(offsetWrapper。getOffsetTable()); // 持久化 this。brokerController。getConsumerOffsetManager()。persist(); log。info(“Update slave consumer offset from master, {}”, masterAddrBak); } catch (Exception e) { log。error(“SyncConsumerOffset Exception, {}”, masterAddrBak, e); } } } // 同步延遲偏移量 private void syncDelayOffset() { String masterAddrBak = this。masterAddr; // 判斷master地址備份是和自己的Broker地址是否一致 if (masterAddrBak != null && !masterAddrBak。equals(brokerController。getBrokerAddr())) { try { // 獲取所有的delay延遲訊息偏移量 String delayOffset = this。brokerController。getBrokerOuterAPI()。getAllDelayOffset(masterAddrBak); if (delayOffset != null) { // 獲取持久化的磁碟路徑 String fileName = StorePathConfigHelper。getDelayOffsetStorePath(this。brokerController 。getMessageStoreConfig()。getStorePathRootDir()); try { // 持久化 MixAll。string2File(delayOffset, fileName); } catch (IOException e) { log。error(“Persist file Exception, {}”, fileName, e); } } log。info(“Update slave delay offset from master, {}”, masterAddrBak); } catch (Exception e) { log。error(“SyncDelayOffset Exception, {}”, masterAddrBak, e); } } } // 同步訂閱組配置 private void syncSubscriptionGroupConfig() { String masterAddrBak = this。masterAddr; // 判斷master地址備份是和自己的Broker地址是否一致 if (masterAddrBak != null && !masterAddrBak。equals(brokerController。getBrokerAddr())) { try { // 獲取所有的訂閱消費組配置資料 SubscriptionGroupWrapper subscriptionWrapper = this。brokerController。getBrokerOuterAPI() 。getAllSubscriptionGroupConfig(masterAddrBak); // 判斷master資料版本是否和自己的一致,如果不一致則進行更新 if (!this。brokerController。getSubscriptionGroupManager()。getDataVersion() 。equals(subscriptionWrapper。getDataVersion())) { SubscriptionGroupManager subscriptionGroupManager = this。brokerController。getSubscriptionGroupManager(); // 版本資料號更新 subscriptionGroupManager。getDataVersion()。assignNewOne( subscriptionWrapper。getDataVersion()); // 資料清理 subscriptionGroupManager。getSubscriptionGroupTable()。clear(); // 更新資料 subscriptionGroupManager。getSubscriptionGroupTable()。putAll( subscriptionWrapper。getSubscriptionGroupTable()); // 持久化 subscriptionGroupManager。persist(); log。info(“Update slave Subscription Group from master, {}”, masterAddrBak); } } catch (Exception e) { log。error(“SyncSubscriptionGroup Exception, {}”, masterAddrBak, e); } } }}

磁碟檔案持久化

// 幫助子類把核心資料進行磁碟檔案持久化// 每次進行持久化,新檔案內容寫入。tmp,老檔案內容寫入。bak做備份,老檔案刪除,。tmp重新命名為新檔案public synchronized void persist() { // 先呼叫子類的編碼方法,把記憶體資料編碼成json字串 String jsonString = this。encode(true); if (jsonString != null) { String fileName = this。configFilePath(); try { // 再把json字串內容寫入到磁碟檔案裡去就可以了 MixAll。string2File(jsonString, fileName); } catch (IOException e) { log。error(“persist file ” + fileName + “ exception”, e); } }}

四、總結

BrokerController有個handleSlaveSynchronize方法,在role為BrokerRole。SLAVE的時候,會註冊一個定時任務,每隔10秒鐘執行一次BrokerController。this。slaveSynchronize。syncAll();SlaveSynchronize的syncAll方法分別呼叫了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法;

相關文章

頂部