首頁/ 汽車/ 正文

Spring-Cloud如何非同步跨執行緒查詢鏈路日誌(附例項)

概述

由於分散式系統節點眾多,排查錯誤日誌要涉及到多個節點,如果在多個節點中沒有唯一的請求id來把各個節點的請求日誌串聯起來,那麼查詢起來就會耗時耗力,因此

Spring Sleuth

出現了(Spring Sleuth基於

Google dapper

論文實現,詳細瞭解可以檢視此論文)。

Sleuth會在接收請求的入口透過Filter生成唯一的標識

TraceId

,這個TraceId會一直跟隨請求路徑傳遞到各個節點,只要有日誌輸出就會把TraceId的值打印出來,如下圖(正常還會生成SpanId,為了便於理解沒展現)

Spring-Cloud如何非同步跨執行緒查詢鏈路日誌(附例項)

分散式系統傳遞traceId示意圖

假如線上發生問題,要排查日誌,那麼根據這個

TraceId

,就能夠快速查詢到各個節點對應的請求日誌,但是唯一的遺憾是非同步執行會丟失TraceId,因此這裡介紹非同步跨執行緒下如何保證TraceId不丟失的問題

我們在

官方文件

中找到了非同步傳遞Traceid說明,如下圖

Spring-Cloud如何非同步跨執行緒查詢鏈路日誌(附例項)

大致意思Sleuth預設支援

@Async

傳遞TraceId,並且支援

spring。sleuth。async。enabled

進行控制,同時提供了

LazyTraceExecutor

TraceableExecutorService

TraceableScheduledExecutorService

執行緒包裝類,來支援跨執行緒傳遞TraceId,其中

TraceableScheduledExecutorService

是ScheduledExecutorService類的實現,用於實現定時任務觸發,個人覺得這種需求不是特別多,所以只介紹常用的一些配置,比如

@Async

配置、執行緒池配置、

EventBus

配置,具體檢視後續章節

Asnc配置

預設Sleuth是支援

@Async註解

非同步傳遞TraceId的,但是如果自定義執行緒池,配置不對的情況可能就會導致失效,因為Spring在這快有個bug,詳細瞭解請檢視以下連結:

https://github。com/spring-projects/spring-framework/issues/21484

https://github。com/spring-projects/spring-framework/issues/21559

所以正確配置方法有如下3種

配置方法

方式1(推薦)

這裡用到了Sleuth的LazyTraceExecutor包裝了執行緒池,這樣可以保證trace物件傳到下一個執行緒中

@Configuration@EnableAsync@Role(BeanDefinition。ROLE_INFRASTRUCTURE)public class SpringAsyncConfig extends AsyncConfigurerSupport { @Autowired private BeanFactory beanFactory; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor。setCorePoolSize(10); executor。setMaxPoolSize(10); executor。setQueueCapacity(500); executor。setThreadNamePrefix(“AsyncExecutor-”); executor。initialize(); return new LazyTraceExecutor(this。beanFactory, executor); }}

方式2

Sleuth初始化時會預設查詢TaskExecutor作為Async的執行緒池,如果查詢不到會獲取預設的執行緒池

@EnableAsync@Configurationpublic class WebConfig { @Bean public TaskExecutor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor。setCorePoolSize(10); executor。setMaxPoolSize(10); executor。setQueueCapacity(500); executor。setThreadNamePrefix(“AsyncExecutor-”); executor。initialize(); return executor; }}

方式3

如果預設不配置任何執行緒池,只在工程中加了

@EnableAsync

註解,那麼Sleuth會使用自帶的執行緒池

SimpleAsyncTaskExecutor

,這個執行緒池每次呼叫都會建立新執行緒,如果呼叫量比較多,建立的執行緒也會非常多,我們知道系統資源是有限的,如果執行緒數過多,會導致程式記憶體吃緊,從而導致OOM,所以不推薦使用這種方式

測試驗證

測試程式碼

Async配置

@Configuration@EnableAsync@Role(BeanDefinition。ROLE_INFRASTRUCTURE)public class SpringAsyncConfig extends AsyncConfigurerSupport { @Autowired private BeanFactory beanFactory; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor。setCorePoolSize(10); executor。setMaxPoolSize(10); executor。setQueueCapacity(500); executor。setThreadNamePrefix(“AsyncExecutor-”); executor。initialize(); return new LazyTraceExecutor(this。beanFactory, executor); }}

Service

@Service@Slf4jpublic class TestService {@Asyncpublic void printAsyncLog() { log。info(“async log。。。。。”);}}

Controller

@Slf4j@RestController@RequestMapping(“/test/async”)public class AsyncTestWeb { @Autowired private TestService testService; @RequestMapping(value = “/print/log”, method = RequestMethod。GET) public String printLog() { log。info(“sync log。。。。。1。。。。。”); testService。printAsyncLog(); log。info(“sync log。。。。。2。。。。。”); return “success”; }}

請求測試

執行請求test/async/print/log,輸出以下資訊,可以看到TraceId一樣,只有Spanid發生了變化,執行緒名稱字首

AsyncExecutor

與設定字首相同

19:44:54。818, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com。example。elkdemo。web。AsyncTestWeb printLog:30 - sync log。。。。。1。。。。。 19:44:54。819, [fae1c9449e12695f fae1c9449e12695f] [http-nio-8080-exec-8] INFO [] com。example。elkdemo。web。AsyncTestWeb printLog:32 - sync log。。。。。2。。。。。 19:44:54。819, [fae1c9449e12695f 2d51edbb45896bd8] [AsyncExecutor-2] INFO [] c。e。elkdemo。service。TestService printAsyncLog:50 - async log。。。。。

執行緒池配置

執行緒池執行是透過

TraceableExecutorService

包裝了

ExecutorService

,而且在初始化的時候需要注入進去

BeanFactory

物件,所以執行緒池作為全域性變數和區域性變數配置稍有不同,

注意下面執行緒池設定只是示例程式碼,實際運用中可以根據需求自行修改

全域性變數配置

建構函式初始化(推薦)

@Service@Slf4jpublic class TestService{final BeanFactory beanFactory;private TraceableExecutorService traceableExecutorService;public TestService(BeanFactory beanFactory1) { this。beanFactory = beanFactory1; this。traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”);}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { traceableExecutorService。execute(() -> log。info(“async thread pool log。。。。。”));}}

單例初始化

@Service@Slf4jpublic class TestService {@Autowiredprivate BeanFactory beanFactory;volatile TraceableExecutorService traceableExecutorService;public TraceableExecutorService getTraceableExecutorService() { if (traceableExecutorService == null) { synchronized (TraceableExecutorService。class) { if (traceableExecutorService == null) { traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); } } } return traceableExecutorService;}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { TraceableExecutorService executorService = getTraceableExecutorService(); executorService。execute(() -> log。info(“async thread pool log。。。。。”));}}

透過InitializingBean的afterPropertiesSet進行初始化

@Service@Slf4jpublic class TestService implements InitializingBean {@Autowiredprivate BeanFactory beanFactory;private TraceableExecutorService traceableExecutorService;@Overridepublic void afterPropertiesSet() { traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”);}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { traceableExecutorService。execute(() -> log。info(“async thread pool log。。。。。”));}}

區域性變數配置

/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog2() { TraceableExecutorService executorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); executorService。execute(() -> log。info(“async thread pool log。。。。。”));}

測試驗證

這裡採用全域性變數配置方式測試

測試程式碼

Controller

@Slf4j@RestController@RequestMapping(“/test/async”)public class AsyncTestWeb { @Autowired private TestService testService; @RequestMapping(value = “/print/threadPool/log”, method = RequestMethod。GET) public String printThreadPoolLog() { log。info(“sync log。。。。。1。。。。。”); testService。printThreadPoolLog(); log。info(“sync log。。。。。2。。。。。”); return “success”; }}

Service

service採用建構函式方式進行初始化

@Service@Slf4jpublic class TestService{final BeanFactory beanFactory;private TraceableExecutorService traceableExecutorService;public TestService(BeanFactory beanFactory1) { this。beanFactory = beanFactory1; this。traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”);}/** * 非同步輸出執行緒池日誌 */public void printThreadPoolLog() { traceableExecutorService。execute(() -> log。info(“async thread pool log。。。。。”));}}

請求測試

執行請求/test/async/print/threadPool/log,輸出以下資訊,可以看到Traceid一樣,只有Spanid發生了變化

19:35:13。799, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com。example。elkdemo。web。AsyncTestWeb printThreadPoolLog:38 - sync log。。。。。1。。。。。 19:35:13。801, [884212fb58c658c5 884212fb58c658c5] [http-nio-8080-exec-5] INFO [] com。example。elkdemo。web。AsyncTestWeb printThreadPoolLog:40 - sync log。。。。。2。。。。。 19:35:13。801, [884212fb58c658c5 70008b8d3a97602d] [pool-4-thread-2] INFO [] c。e。elkdemo。service。TestService lambda$printThreadPoolLog$0:37 - async thread pool log。。。。。

EventBus配置

EventBus

配置與執行緒池配置類似,把

TraceableExecutorService

注入到

AsyncEventBus

中即可,因

TraceableExecutorService

類引用了

BeanFactory

例項,所以比原生方式複雜了一點,以下只介紹建構函式的初始化方式,其他初始化方式與執行緒池配置類似,所以這裡就不再舉例說明

建構函式進行初始化

@Component@Slf4jpublic class PushEventBus { private EventBus eventBus; public PushEventBus(BeanFactory beanFactory) { Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); this。eventBus = new AsyncEventBus(traceableExecutorService); } public void register(Object obj) { eventBus。register(obj); } public void post(Object obj) { eventBus。post(obj); }}

測試驗證

測試程式碼

EventBus

@Component@Slf4jpublic class PushEventBus { private EventBus eventBus; public PushEventBus(BeanFactory beanFactory) { Executor traceableExecutorService = new TraceableExecutorService(beanFactory, Executors。newFixedThreadPool(10), “test”); this。eventBus = new AsyncEventBus(traceableExecutorService); } public void register(Object obj) { eventBus。register(obj); } public void post(Object obj) { eventBus。post(obj); }}

監聽類

@Slf4jpublic class EventListener { /** * 監聽 Integer 型別的訊息 */ @Subscribe public void listenInteger(Integer param) { log。info(“EventListener#listenInteger->{}”,param); } /** * 監聽 String 型別的訊息 */ @Subscribe public void listenString(String param) { log。info(“EventListener#listenString->{}”,param); }}

controller

@Slf4j@RestController@RequestMapping(“/test/async”)public class AsyncTestWeb {@Autowiredprivate PushEventBus pushEventBus;@RequestMapping(value = “/print/guava/log”, method = RequestMethod。GET)public String printGuavaLog() { pushEventBus。register(new EventListener()); log。info(“sync log。。。。。1。。。。。”); pushEventBus。post(“11”); log。info(“sync log。。。。。2。。。。。”); return “success”;}}

請求測試

執行請求/test/async/print/guava/log,輸出以下資訊,可以看到Traceid一樣,只有Spanid發生了變化

19:27:44。234, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com。example。elkdemo。web。AsyncTestWeb printGuavaLog:48 - sync log。。。。。1。。。。。 19:27:44。236, [50844e0d3909868c 50844e0d3909868c] [http-nio-8080-exec-3] INFO [] com。example。elkdemo。web。AsyncTestWeb printGuavaLog:50 - sync log。。。。。2。。。。。 19:27:44。236, [50844e0d3909868c 702bf55c84873f17] [pool-3-thread-1] INFO [] c。e。elkdemo。service。EventListener listenString:21 - EventListener#listenString->11

相關文章

頂部