首頁/ 汽車/ 正文

不使用binlog,canal,kafka等只用java+mybatis攔截器

專案中因為要遷庫,所以我要在原專案中接入我的雙寫邏輯,確保新舊兩個庫都有資料寫入,假如新庫寫入失敗,舊庫資料也能寫入,這就確保了重要資料不能丟失。

一開始考慮的方案是使用資料同步工具,像是canal或是DTS等,但是環境這塊卡的比較死,沒有其他花裡胡哨的工具,只能純靠java改寫程式碼來實現了,期間排了不少坑,這裡做個人踩坑記錄

實現效果,批次雙寫全部報200,自測下來還算成功

不使用binlog,canal,kafka等只用java+mybatis攔截器

首先列一下要實現類的目錄,因為我們門戶,任務,和開放介面都是單獨一套springboot然後共用common包的形式,所以這套package每個springboot都要引入

不使用binlog,canal,kafka等只用java+mybatis攔截器

這裡整理得比較匆忙,但重點其實是這幾個類

不使用binlog,canal,kafka等只用java+mybatis攔截器

aop中TargetDataSource之前說過了,還包括DataSourceConfig,DataSourceUtil和DynamicDataSource的配置,可以看我之前寫的這一篇: mybatis動態資料來源配置(附自定義註解實現資料來源切換)

另外一個註解沒有用上,是做多資料來源事務切換的作用,想看程式碼和使用場景的,完全參考如下:配置多個數據源的事務

不使用binlog,canal,kafka等只用java+mybatis攔截器

現在開始講重點,為了追蹤mysql中實時變化的資料,就要用到mybatis攔截器,這是我之前參考的部落格,但不能完全適用於我的業務場景: 用mybatis 攔截器 為insert update操作填充欄位

說一下我的寫法

進入AutoFillInterceptor類,也就是我的攔截器

不使用binlog,canal,kafka等只用java+mybatis攔截器

因為我要引入我的DoubleWriteService做我的雙寫邏輯,這裡會發生第一個坑,因為service載入速度是在攔截器後面的,直接Autowired啟動會報錯,這裡用到了懶載入機制,確保啟動順利

不使用binlog,canal,kafka等只用java+mybatis攔截器

之後進入攔截器的時候先判空,再去SpringUtils取bean,就能呼叫了

不使用binlog,canal,kafka等只用java+mybatis攔截器

附上工具類SpringUtils

@Componentpublic class SpringUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (SpringUtils。applicationContext == null) { SpringUtils。applicationContext = applicationContext; } } public static ApplicationContext getApplicationContext() { return applicationContext; } //根據name public static Object getBean(String name) { return getApplicationContext()。getBean(name); } //根據型別 public static T getBean(Class clazz) { return getApplicationContext()。getBean(clazz); } public static T getBean(String name, Class clazz) { return getApplicationContext()。getBean(name, clazz); }}

接著往下走方法,這裡遇到的第一個坑就是無限迴圈呼叫導致的stackoverflow,看過前面那篇部落格的都知道,這個mybatis攔截器的作用其實就是去監控各類dao增改的操作,如果走到我的dao裡,就會導致再次進入攔截器,然後再進入我的dao,迴圈往復導致了爆棧,所以我的操作很簡單,就是取MappedStatement中的id,也就是完整的包路徑+Dao類名,如果是就直接返回

不使用binlog,canal,kafka等只用java+mybatis攔截器

接下來就是處理我的第一個arg,這個args是如何產生的呢,開頭我們宣告的就是args,第一個就肯定是MappedStatement了,第二個Object是我們的傳參,可以是實體類或是map,這個之後會講

不使用binlog,canal,kafka等只用java+mybatis攔截器

取MappedStatement的用處其實有很多,自己debug的話可以看到許多mybatis分裝的引數,我這裡就取id就夠了,一般常用的就是取sql字串了,這裡註釋的就是取sql並且替換佔位符?的邏輯。我看了好幾篇部落格,都寫的一樣: SpringBoot透過MyBatis攔截器列印完整SQL語句(無問號) 但我自己跑的時候卻發現獲取不到引數,所以我的雙寫不用直接sql的形式做

不使用binlog,canal,kafka等只用java+mybatis攔截器

這裡我就用到了第二個args,也就是增改他可以傳entity類,可以傳map,我這裡做了類判斷和新增修改判斷,然後分別進入我相應的方法裡。我的service入口傳參簡單明瞭,就是傳表名和map/entity的形式

不使用binlog,canal,kafka等只用java+mybatis攔截器

順便說一下列舉的作用,因為我攔截器只能獲得sqlID,為了獲取表名tableName,這裡列舉其實用到了包含,其實就是如下效果,假如表名交lzq_test1,包路徑名叫com。lzq。common。dao。lzqTest1Dao

不使用binlog,canal,kafka等只用java+mybatis攔截器

攔截器獲取的sqlId一般都叫com。lzq。common。dao。lzqTest1Dao。insert,所以這裡用contains去獲取

不使用binlog,canal,kafka等只用java+mybatis攔截器

列舉講完了,之後就進入我的service做主要邏輯了

不使用binlog,canal,kafka等只用java+mybatis攔截器

可以看到insert和update各做了map和obj類的傳參,確保都能順利執行寫入,先講一下各個方法的作用:

void dealWithSql(String sql); 直接傳sql執行sql,因為攔截器那一步獲取不到sql,這個方法沒用上

void update(String tableName, Object o); 更新邏輯入口,支援obj和map

void insert(String tableName, Object o);新增邏輯入口,支援obj和map

String processUpdateBinLog(Map map,String tableName,String isUpdate); 切換資料來源執行的入口,因為他是要上註解,同類呼叫會讓註解失效,所以要設成public外部調取

List getRedisFailList(String tableName,String isUpdate); 我會把dao執行失敗的傳參放入redis裡,這裡統一獲取失敗後的傳參全是jsonstring格式

void handleTransactionals(List tableNameList, List objList,List isUpdateList); 處理事務的方法,表明列表,物件列表和新增修改列表,一一對應,統一成功或失敗,這裡沒用上

進入實現類,所有依賴如圖所示,事務的注掉了因為發現不好用,同類呼叫避免依賴問題再次加上@Lazy

不使用binlog,canal,kafka等只用java+mybatis攔截器

執行緒池根據應用自己配

@Data@Configuration@ConfigurationProperties(prefix = “executor”)@EnableConfigurationProperties(ExecutorConfig。class)public class ExecutorConfig { private int corePoolSize = 5;//2 private int maxPoolSize = 10;//4 private int keepAliveSeconds = 60; private int queueCapacity = 200; @Bean public TraceThreadPoolExecutor traceThreadPoolExecutor() { return new TraceThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, queueCapacity, rejectedExecutionHandler()); } RejectedExecutionHandler rejectedExecutionHandler() { return new ThreadPoolExecutor。CallerRunsPolicy(); }}

雙寫開關根據配置檔案實現

不使用binlog,canal,kafka等只用java+mybatis攔截器

新增/更新方法如下

不使用binlog,canal,kafka等只用java+mybatis攔截器

不使用binlog,canal,kafka等只用java+mybatis攔截器

再往下走,根據需要自己配

public String processUpdateBinLogData(String tableName, Map param,String isUpdate) { //, int threadCount String result; if (null == tableName) { return null; } result = procUpdateBinLogByMultiThread(tableName,param,isUpdate); /** 請求資料超過二十,才使用多執行緒,否則就是單執行緒 */// int threadCount = 2;//requestList。size() > 20 ? 20 : 1;// try { // result = procUpdateBinLogByMultiThread(tableName,param,isUpdate);// if (threadCount > 1) { // /* 多執行緒處理 */// result = procUpdateBinLogByMultiThread(enumByTable,param, threadCount);// } else { // /* 直接處理 */// result = processUpdateBinLog(param, enumByTable);// }// } catch (Exception e) { //// } return result; }

private String procUpdateBinLogByMultiThread(String tableName,Map param,String isUpdate) { //List result = new ArrayList<>(); if (!param。isEmpty()) { processUpdateBinLogDataByThread(param, tableName,isUpdate); }// if (!param。isEmpty() && null != threadCount) { // List> splitList = ListUtils。averageSplit(requestList, threadCount);// if (CollectionUtils。isNotEmpty(splitList)){ // List>> futureList = new ArrayList<>();// for (List r : splitList) { // Future> future = processUpdateBinLogDataByThread(param, enumByTable);// if (null != future){ // futureList。add(future);// }// }// /** 獲取多執行緒返回結果 */// for (Future> r : futureList) { // result。addAll(r。get());// }// }// result。addAll(future。get());// } return null; }

這裡用Future類和執行緒池去走非同步

public Future processUpdateBinLogDataByThread(Map param, String tableName,String isUpdate) { //Future> result = null; Future result = null; //if (CollectionUtils。isNotEmpty(requestList)) { if (!param。isEmpty()) { result = threadPoolExecutor。submit(new TraceAsyncCallableTask() { private final Map paramIn = param; private final String tableNameIn = tableName; private final String isUpdateIn = isUpdate; private Object res; @Override public String getMethod() { return “processUpdateBinLogDataByThread”; } @Override public String[] getParams() { String[] paraArr = new String[1]; return paraArr; } @Override public String getService() { return “DoubleWriteService”; } @Override public Object getRes() { return res; } @Override public String call() { //同類呼叫才能生效註解 return doubleWriteService。processUpdateBinLog(paramIn, tableNameIn,isUpdateIn); } }); } return result; }

注意這裡,用到了切換資料來源的註解

@Override @TargetDataSource(connName = “dbTwo”) //非同步重新生效不註解,換成同類呼叫 public String processUpdateBinLog(Map map, String tableName,String isUpdate) { //List result = new ArrayList<>(); //String result = “fail”; int isSucc = 0; Map queryMap = new HashMap<>(); //String str; if (!map。isEmpty()) { try { //報錯測試 //int i = 1;i = i /0; map = humpToUnderline(map); queryMap。put(“tableName”, tableName); queryMap。put(“fieldsMap”, map); queryMap。put(“queryMap”, map); if (DoubleWriteConstants。UPDATE。equals(isUpdate)){ isSucc = dataMigrationDao。updateTableListDynamic(queryMap); //更新直接去map的主鍵 logger。info(“{}表雙寫{}成功,主鍵->{}”, tableName,isUpdate, map。get(“id”)); }else { isSucc = dataMigrationDao。insertTableListDynamic(queryMap); //新增取mybatis返回主鍵 logger。info(“{}表雙寫{}成功,主鍵->{}”, tableName,isUpdate, queryMap。get(“id”)); } }catch (Exception e){ String jsonStr = JSONObject。toJSONString(map); //key格式:字首+新增/更新+表名 cacheClient。lpush( DoubleWriteConstants。REDIS_KEY_PREFIX + isUpdate + tableName,jsonStr); logger。error(“{}表雙寫{}錯誤,json->{},異常->{}” ,tableName,isUpdate,jsonStr, e。getMessage()); } } return null; }

駝峰轉下劃線邏輯,這裡用hutool的邏輯複製過來的

/** * 把 map 中的 key 由駝峰命名轉為下劃線,使用LinkedHashMap確保欄位順序一致性 */ private HashMap humpToUnderline(Map map) { //使用LinkedHashMap確保欄位順序一致性 HashMap transitionMap = new LinkedHashMap<>(16); map。forEach((k, v) -> transitionMap。put(toUnderlineCase(k), v)); return transitionMap; } public static String toUnderlineCase(CharSequence str) { return toSymbolCase(str, ‘_’); } public static String toSymbolCase(CharSequence str, char symbol) { if (str == null) { return null; } else { int length = str。length(); StringBuilder sb = new StringBuilder(); for(int i = 0; i < length; ++i) { char c = str。charAt(i); Character preChar = i > 0 ? str。charAt(i - 1) : null; if (Character。isUpperCase(c)) { Character nextChar = i < str。length() - 1 ? str。charAt(i + 1) : null; if (null != preChar && Character。isUpperCase(preChar)) { sb。append(c); } else if (null != nextChar && Character。isUpperCase(nextChar)) { if (null != preChar && symbol != preChar) { sb。append(symbol); } sb。append(c); } else { if (null != preChar && symbol != preChar) { sb。append(symbol); } sb。append(Character。toLowerCase(c)); } } else { if (sb。length() > 0 && Character。isUpperCase(sb。charAt(sb。length() - 1)) && symbol != c) { sb。append(symbol); } sb。append(c); } } return sb。toString(); } }

最後是beanToMap邏輯,注意有時候會有轉化失敗問題,會影響到入mybatis的傳參,自己處理

public static Map beanToMap(Object object){ Map map = null; try { map = new HashMap(); BeanInfo beanInfo = Introspector。getBeanInfo(object。getClass()); PropertyDescriptor[] propertyDescriptors = beanInfo。getPropertyDescriptors(); for (PropertyDescriptor property : propertyDescriptors) { String key = property。getName(); if (key。compareToIgnoreCase(“class”) == 0) { continue; } Method getter = property。getReadMethod(); Object value = getter!=null ? getter。invoke(object) : null; map。put(key, value); } //key 可能會把自己的class 和hashcode編進去,直接去掉 map。remove(“class”); } catch (Exception e) { e。printStackTrace(); return new HashMap<>(); } Set set = map。keySet(); Iterator it = set。iterator(); while (it。hasNext()){ String key = it。next(); if (map。get(key)==null || map。get(key)==“”){ map。remove(key); set = map。keySet(); it = set。iterator(); } } if (“false”。equals(map。get(“emtpy”))){ logger。error(“{}雙寫前obj轉化失敗”,object); } return map; }

最後講一下dao,上篇部落格也整理過了,就是動態新增修改

不使用binlog,canal,kafka等只用java+mybatis攔截器

作為消費端使用很方便,只要傳表名和map就完全能用,這裡注意insert要加上useGeneratedKeys = “true” keyProperty = “id”來獲取自增主鍵,這會自動注入到當前map裡

insert into ${ map。tableName} ( `${ key}` ) values ( #{ value} update ${ map。tableName} ${ key}= #{ value} ${ key}= #{ value} where id = #{ map。queryMap。id}

原文連結

https://blog。csdn。net/Koikoi12/article/details/125680976

相關文章

頂部