概述
由於分散式系統節點眾多,排查錯誤日誌要涉及到多個節點,如果在多個節點中沒有唯一的請求id來把各個節點的請求日誌串聯起來,那麼查詢起來就會耗時耗力,因此
Spring Sleuth
出現了(Spring Sleuth基於
Google dapper
論文實現,詳細瞭解可以檢視此論文)。
Sleuth會在接收請求的入口透過Filter生成唯一的標識
TraceId
,這個TraceId會一直跟隨請求路徑傳遞到各個節點,只要有日誌輸出就會把TraceId的值打印出來,如下圖(正常還會生成SpanId,為了便於理解沒展現)
分散式系統傳遞traceId示意圖
假如線上發生問題,要排查日誌,那麼根據這個
TraceId
,就能夠快速查詢到各個節點對應的請求日誌,但是唯一的遺憾是非同步執行會丟失TraceId,因此這裡介紹非同步跨執行緒下如何保證TraceId不丟失的問題
我們在
官方文件
中找到了非同步傳遞Traceid說明,如下圖
大致意思Sleuth預設支援
@Async
傳遞TraceId,並且支援
spring。sleuth。async。enabled
進行控制,同時提供了
•
LazyTraceExecutor
•
TraceableExecutorService
•
TraceableScheduledExecutorService
執行緒包裝類,來支援跨執行緒傳遞TraceId,其中
TraceableScheduledExecutorService
是ScheduledExecutorService類的實現,用於實現定時任務觸發,個人覺得這種需求不是特別多,所以只介紹常用的一些配置,比如
@Async
配置、執行緒池配置、
EventBus
配置,具體檢視後續章節
Asnc配置
預設Sleuth是支援
@Async註解
非同步傳遞TraceId的,但是如果自定義執行緒池,配置不對的情況可能就會導致失效,因為Spring在這快有個bug,詳細瞭解請檢視以下連結:
https://github。com/spring-projects/spring-framework/issues/21484
https://github。com/spring-projects/spring-framework/issues/21559
所以正確配置方法有如下3種
配置方法
方式1(推薦)
這裡用到了Sleuth的LazyTraceExecutor包裝了執行緒池,這樣可以保證trace物件傳到下一個執行緒中
@Configuration@EnableAsync@Role(BeanDefinition。ROLE_INFRASTRUCTURE)public class SpringAsyncConfig extends AsyncConfigurerSupport { @Autowired private BeanFactory beanFactory; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor。setCorePoolSize(10); executor。setMaxPoolSize(10); executor。setQueueCapacity(500); executor。setThreadNamePrefix(“AsyncExecutor-”); executor。initialize(); return new LazyTraceExecutor(this。beanFactory, executor); }}
方式2
Sleuth初始化時會預設查詢TaskExecutor作為Async的執行緒池,如果查詢不到會獲取預設的執行緒池
@EnableAsync@Configurationpublic class WebConfig { @Bean public TaskExecutor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor。setCorePoolSize(10); executor。setMaxPoolSize(10); executor。setQueueCapacity(500); executor。setThreadNamePrefix(“AsyncExecutor-”); executor。initialize(); return executor; }}
方式3
如果預設不配置任何執行緒池,只在工程中加了
@EnableAsync
註解,那麼Sleuth會使用自帶的執行緒池
SimpleAsyncTaskExecutor
,這個執行緒池每次呼叫都會建立新執行緒,如果呼叫量比較多,建立的執行緒也會非常多,我們知道系統資源是有限的,如果執行緒數過多,會導致程式記憶體吃緊,從而導致OOM,所以不推薦使用這種方式
測試驗證
測試程式碼
Async配置
@Configuration@EnableAsync@Role(BeanDefinition。ROLE_INFRASTRUCTURE)public class SpringAsyncConfig extends AsyncConfigurerSupport { @Autowired private BeanFactory beanFactory; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor。setCorePoolSize(10); executor。setMaxPoolSize(10); executor。setQueueCapacity(500); executor。setThreadNamePrefix(“AsyncExecutor-”); executor。initialize(); return new LazyTraceExecutor(this。beanFactory, executor); }}
Service
@Service@Slf4jpublic class TestService {@Asyncpublic void printAsyncLog() { log。info(“async log。。。。。”);}}
Controller
@Slf4j@RestController@RequestMapping(“/test/async”)public class AsyncTestWeb { @Autowired private TestService testService; @RequestMapping(value = “/print/log”, method = RequestMethod。GET) public String printLog() { log。info(“sync log。。。。。1。。。。。”); testService。printAsyncLog(); log。info(“sync log。。。。。2。。。。。”); return “success”; }}
請求測試
執行請求test/async/print/log,輸出以下資訊,可以看到TraceId一樣,只有Spanid發生了變化,執行緒名稱字首
AsyncExecutor
與設定字首相同
19:44:54。818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com。example。elkdemo。web。AsyncTestWeb printLog:30 - sync log。。。。。1。。。。。 19:44:54。819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com。example。elkdemo。web。AsyncTestWeb printLog:32 - sync log。。。。。2。。。。。 19:44:54。819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO [] c。e。elkdemo。service。TestService printAsyncLog:50 - async log。。。。。
執行緒池配置
執行緒池執行是透過
TraceableExecutorService
包裝了
ExecutorService
,而且在初始化的時候需要注入進去
BeanFactory
物件,所以執行緒池作為全域性變數和區域性變數配置稍有不同,
注意下面執行緒池設定只是示例程式碼,實際運用中可以根據需求自行修改
全域性變數配置
建構函式初始化(推薦)
@Service@Slf4jpublic class TestService{final BeanFactory beanFactory;private TraceableExecutorService traceableExecutorService;public TestService(BeanFactory beanFactory1) { this。beanFactory = beanFactory1; this。traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”);}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { traceableExecutorService。execute(() -> log。info(“async thread pool log。。。。。”));}}
單例初始化
@Service@Slf4jpublic class TestService {@Autowiredprivate BeanFactory beanFactory;volatile TraceableExecutorService traceableExecutorService;public TraceableExecutorService getTraceableExecutorService() { if (traceableExecutorService == null) { synchronized (TraceableExecutorService。class) { if (traceableExecutorService == null) { traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); } } } return traceableExecutorService;}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { TraceableExecutorService executorService = getTraceableExecutorService(); executorService。execute(() -> log。info(“async thread pool log。。。。。”));}}
透過InitializingBean的afterPropertiesSet進行初始化
@Service@Slf4jpublic class TestService implements InitializingBean {@Autowiredprivate BeanFactory beanFactory;private TraceableExecutorService traceableExecutorService;@Overridepublic void afterPropertiesSet() { traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”);}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { traceableExecutorService。execute(() -> log。info(“async thread pool log。。。。。”));}}
區域性變數配置
/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog2() { TraceableExecutorService executorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); executorService。execute(() -> log。info(“async thread pool log。。。。。”));}
測試驗證
這裡採用全域性變數配置方式測試
測試程式碼
Controller
@Slf4j@RestController@RequestMapping(“/test/async”)public class AsyncTestWeb { @Autowired private TestService testService; @RequestMapping(value = “/print/threadPool/log”, method = RequestMethod。GET) public String printThreadPoolLog() { log。info(“sync log。。。。。1。。。。。”); testService。printThreadPoolLog(); log。info(“sync log。。。。。2。。。。。”); return “success”; }}
Service
service採用建構函式方式進行初始化
@Service@Slf4jpublic class TestService{final BeanFactory beanFactory;private TraceableExecutorService traceableExecutorService;public TestService(BeanFactory beanFactory1) { this。beanFactory = beanFactory1; this。traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”);}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { traceableExecutorService。execute(() -> log。info(“async thread pool log。。。。。”));}}
請求測試
執行請求/test/async/print/threadPool/log,輸出以下資訊,可以看到Traceid一樣,只有Spanid發生了變化
19:35:13。799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com。example。elkdemo。web。AsyncTestWeb printThreadPoolLog:38 - sync log。。。。。1。。。。。 19:35:13。801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com。example。elkdemo。web。AsyncTestWeb printThreadPoolLog:40 - sync log。。。。。2。。。。。 19:35:13。801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO [] c。e。elkdemo。service。TestService lambda$printThreadPoolLog$0:37 - async thread pool log。。。。。
EventBus配置
EventBus
配置與執行緒池配置類似,把
TraceableExecutorService
注入到
AsyncEventBus
中即可,因
TraceableExecutorService
類引用了
BeanFactory
例項,所以比原生方式複雜了一點,以下只介紹建構函式的初始化方式,其他初始化方式與執行緒池配置類似,所以這裡就不再舉例說明
建構函式進行初始化
@Component@Slf4jpublic class PushEventBus { private EventBus eventBus; public PushEventBus(BeanFactory beanFactory) { Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); this。eventBus = new AsyncEventBus(traceableExecutorService); } public void register(Object obj) { eventBus。register(obj); } public void post(Object obj) { eventBus。post(obj); }}
測試驗證
測試程式碼
EventBus
@Component@Slf4jpublic class PushEventBus { private EventBus eventBus; public PushEventBus(BeanFactory beanFactory) { Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); this。eventBus = new AsyncEventBus(traceableExecutorService); } public void register(Object obj) { eventBus。register(obj); } public void post(Object obj) { eventBus。post(obj); }}
監聽類
@Slf4jpublic class EventListener { /** * 監聽 Integer 型別的訊息 */ @Subscribe public void listenInteger(Integer param) { log。info(“EventListener#listenInteger->{}”,param); } /** * 監聽 String 型別的訊息 */ @Subscribe public void listenString(String param) { log。info(“EventListener#listenString->{}”,param); }}
controller
@Slf4j@RestController@RequestMapping(“/test/async”)public class AsyncTestWeb {@Autowiredprivate PushEventBus pushEventBus;@RequestMapping(value = “/print/guava/log”, method = RequestMethod。GET)public String printGuavaLog() { pushEventBus。register(new EventListener()); log。info(“sync log。。。。。1。。。。。”); pushEventBus。post(“11”); log。info(“sync log。。。。。2。。。。。”); return “success”;}}
請求測試
執行請求/test/async/print/guava/log,輸出以下資訊,可以看到Traceid一樣,只有Spanid發生了變化
19:27:44。234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com。example。elkdemo。web。AsyncTestWeb printGuavaLog:48 - sync log。。。。。1。。。。。 19:27:44。236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com。example。elkdemo。web。AsyncTestWeb printGuavaLog:50 - sync log。。。。。2。。。。。 19:27:44。236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO [] c。e。elkdemo。service。EventListener listenString:21 - EventListener#listenString->11