首頁/ 汽車/ 正文

如何用300行程式碼實現一個完整的執行緒池

開源專案Workflow中有個重要的基礎模組:

程式碼僅300行的C語言執行緒池

本文會伴隨原始碼分析,而

邏輯完備

對稱無差別

的特點於第3部分開始

歡迎跳閱, 或直接到Github主頁上圍觀程式碼

https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c

0 - Workflow的thrdpool

Workflow的大招:計算通訊融為一體的非同步排程模式,而計算的核心:Executor排程器,就是基於這個執行緒池實現的。可以說,一個通用而高效的執行緒池,是我們寫C/C++程式碼時離不開的基礎模組。

thrdpool

程式碼位置在

src/kernel/

,不僅可以直接拿來使用,同時也適合閱讀學習。

而更重要的,秉承Workflow專案本身一貫的嚴謹極簡的作風,這個thrdpool程式碼極致簡潔,

實現邏輯上亦非常完備,結構精巧,處處嚴謹,

複雜的併發處理依然可以

對稱無差別,

不得不讓我驚歎:

妙啊!!!

你可能會很好奇,執行緒池還能寫出什麼別緻的新思路嗎?先列出一些,你們細品:

特點1

:建立完執行緒池後,無需記錄任何執行緒id或物件,執行緒池可以透過

一個等一個的方式優雅地去結束

所有執行緒; → 也就是說,每一個

執行緒

都是對等的

特點2

執行緒任務可以由另一個執行緒任務調起

;甚至執行緒池正在

被銷燬時也可以提交下一個任務

;(這很重要,因為執行緒本身很可能是不知道執行緒池的狀態的; → 即,每一個

任務

也是對等的

特點3

:同理,

執行緒任務也可以銷燬這個執行緒池

;(非常完整~ → 每一種

行為

也是對等的,包括

destroy

我真的迫不及待為大家深層解讀一下,這個我願稱之為

“邏輯完備”的執行緒池

1 - 前置知識

第一部分我先從最基本的內容梳理一些個人理解,有基礎的小夥伴可以直接跳過。如果有不準確的地方,歡迎大家指正交流~

為什麼需要執行緒池?(其實思路不僅對執行緒池,對任何有限資源的排程管理都是類似的)

我們知道,

透過pthread

或者

std::thread

建立執行緒,就可以實現多執行緒併發執行我們的程式碼。

但是CPU的核數是固定的,所以真正並行執行的最大值也是固定的,

過多的執行緒除了頻繁建立產生overhead以外,還會導致對系統資源進行爭搶

,這些都是不必要的浪費。

因此我們可以管理有限個執行緒,

迴圈且合理地利用它們

那麼執行緒池一般包含哪些內容呢?

首先是管理若干個工具人執行緒;

其次是管理交給執行緒去執行的任務,這個一般會有一個佇列;

再然後執行緒之間需要一些同步機制,比如mutex、condition等;

最後就是各執行緒池實現上自身需要的其他內容了;

接下來我們看看

Workflow

thrdpool

是怎麼做的。

2 - 程式碼概覽

以下共7步常用思路,足以讓我們把程式碼飛快過一遍。

第1步:先看標頭檔案,有什麼介面。

我們開啟

thrdpool。h

,只需關注這三個:

// 建立執行緒池thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);// 把任務交給執行緒池的入口int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool); // 銷燬執行緒池void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool);

第2步:介面上有什麼資料結構。

即,我們如何描述一個交給執行緒池的任務。

struct thrdpool_task{ void (*routine)(void *); // 函式指標 void *context; // 上下文};

第3步:再看實現。c,內部資料結構。

struct __thrdpool{ struct list_head task_queue;// 任務佇列 size_t nthreads; // 執行緒個數 size_t stacksize; // 構造執行緒時的引數 pthread_t tid; // 執行期間記錄的是個zero值 pthread_mutex_t mutex; pthread_cond_t cond; pthread_key_t key; pthread_cond_t *terminate;};

沒有一個多餘,每一個成員都很到位:

tid

:執行緒id,

整個執行緒池只有一個

,它不會奇怪地去記錄任何一個執行緒的id,這樣就不完美了,它平時執行的時候是

空值

,退出的時候,它是用來

實現鏈式等待的關鍵

mutex

cond

是常見的執行緒間同步的工具,其中這個cond是用來給

生產者和消費者

去操作任務佇列用的。

key

:是執行緒池的key,然後會賦予給每個由執行緒池建立的執行緒作為他們的thread local,

用於區分這個執行緒是否是執行緒池建立的

一個

pthread_cond_t *terminate

,這有兩個用途:不僅是退出時的標記位 ,而且還是呼叫退出的那個人要等待的condition。

以上各個成員的用途,好像說了,又好像沒說,是因為

幾乎每一個成員都值得深挖一下

,所以我們記住它們,後面看程式碼的時候就會豁然開朗!

第4步:介面都呼叫了什麼核心函式。

thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize){ thrdpool_t *pool; ret = pthread_key_create(&pool->key, NULL); if (ret == 0) { // 去掉了其他程式碼,但是注意到剛才的tid和terminate的賦值 memset(&pool->tid, 0, sizeof (pthread_t)); pool->terminate = NULL; if (__thrdpool_create_threads(nthreads, pool) >= 0) return pool; 。。。

這裡可以看到

__thrdpool_create_threads()

裡邊最關鍵的,就是迴圈建立

nthreads

個執行緒。

while (pool->nthreads < nthreads) { ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); 。。。

第5步:略讀核心函式的功能。

所以我們在上一步知道了,每個執行緒執行的是

__thrdpool_routine()

不難想象,它會

不停從佇列拿任務出來執行

static void *__thrdpool_routine(void *arg) { 。。。 while (1) { // 1。 從佇列裡拿一個任務出來,沒有就等待 pthread_mutex_lock(&pool->mutex); while (!pool->terminate && list_empty(&pool->task_queue)) pthread_cond_wait(&pool->cond, &pool->mutex); // 2。 執行緒池結束的標誌位,記住它,先跳過 if (pool->terminate) break; // 3。 如果能走到這裡,恭喜你,拿到了任務~ entry = list_entry(*pos, struct __thrdpool_task_entry, list); list_del(*pos); // 4。 先解鎖 pthread_mutex_unlock(&pool->mutex); task_routine = entry->task。routine; task_context = entry->task。context; free(entry); // 5。 再執行 task_routine(task_context); // 6。 這裡也先記住它,意思是執行緒池裡的執行緒可以銷燬執行緒池 if (pool->nthreads == 0) { /* Thread pool was destroyed by the task。 */ free(pool); return NULL; } } 。。。 // 後面還有魔法,留下一章解讀~~~

第6步:把函式之間的關係聯絡起來。

剛才看到的

__thrdpool_routine()

就是執行緒的核心函數了,它可以和誰關聯起來呢?可以和介面

thrdpool_schedule()

關聯上

我們說過,執行緒池上有個佇列管理任務:

每個執行

routine

的執行緒,都是消費者

每個發起

schedule

的執行緒,都是生產者

我們已經看過消費者了,來看看生產者的程式碼:

inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, thrdpool_t *pool){ struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf; entry->task = *task; pthread_mutex_lock(&pool->mutex); // 新增到佇列裡 list_add_tail(&entry->list, &pool->task_queue); // 叫醒在等待的執行緒 pthread_cond_signal(&pool->cond); pthread_mutex_unlock(&pool->mutex);}

說到這裡,

特點2

就非常清晰了:開篇說的

特點2

是說,”

執行緒任務可以由另一個執行緒任務調起

”。

只要對佇列的管理做得好,顯然消費者所執行的函數里也可以生產

第7步:看其他情況的處理

對於執行緒池來說就是比如銷燬的情況。

介面

thrdpool_destroy()

實現非常簡單:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool) { 。。。 // 1。 內部會設定pool->terminate,並叫醒所有等在佇列拿任務的執行緒 __thrdpool_terminate(in_pool, pool); // 2。 把佇列裡還沒有執行的任務都拿出來,透過pending返回給使用者 list_for_each_safe(pos, tmp, &pool->task_queue) { entry = list_entry(pos, struct __thrdpool_task_entry, list); list_del(pos); if (pending) pending(&entry->task); 。。。 // 後面就是銷燬各種記憶體,同樣有魔法~

在退出的時候,我們那些

已經提交但是還沒有被執行的任務

是絕對不能就這麼扔掉了的,於是我們可以傳入一個

pending()

函式,

上層可以做自己的回收、回撥、或任何保證上層邏輯完備的事情。

設計的完整性,無處不在。

接下來我們就可以跟著我們的核心問題,針對性地看看每個特點都是怎麼實現的。

相關影片推薦

150行程式碼,帶你手寫執行緒池,自行準備linux環境

BAT面試必備:多執行緒、多程序、協程如何選擇及執行緒池如何最高效

學習地址:C/C++Linux伺服器開發/後臺架構師【零聲教育】-學習影片教程-騰訊課堂

需要C/C++ Linux伺服器架構師學習資料加qun

812855908

獲取(資料包括

C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg

等),免費分享

如何用300行程式碼實現一個完整的執行緒池

3 - 特點1: 一個等待一個的優雅退出

這裡提出一個問題:

執行緒池要退出,如何結束所有執行緒?

一般執行緒池的實現都是需要記錄下所有的執行緒id,或者thread物件,以便於我們去用

join

方法等待它們結束。

不嚴格地用join收拾乾淨會有什麼問題?最直觀的,模組退出時很可能會報

記憶體洩漏

但是我們剛才看,

pool裡並沒有記錄所有的tid呀

?正如開篇說的,

pool上只有一個tid,而且還是個空的值

特點1

正是

Workflow thrdpool

的答案:

無需記錄所有執行緒,我可以讓執行緒挨個自動退出、且一個等待下一個,最終達到呼叫完thrdpool_destroy()後記憶體回收乾淨的目的。

這裡先給一個簡單的圖,假設發起destroy的人是main執行緒,我們如何做到一個等一個退出:

如何用300行程式碼實現一個完整的執行緒池

外部執行緒,比如main,發起destroy

步驟如下:

執行緒的退出,由thrdpool_destroy()設定

pool->terminate

開始。

我們每個執行緒,在

while(1)

裡會第一時間發現terminate,執行緒池要退出了,然後會

break

出這個while迴圈。

注意這個時候,

還持有著mutex鎖

,我們拿出pool上唯一的那個tid,放到我的臨時變數,我會根據拿出來的值做不同的處理。

且我會把我自己的tid放上去

,然後再解mutex鎖。

那麼很顯然,

第一個從pool上拿tid的人,會發現這是個0值,就可以直接結束了

,不用負責等待任何其他人,但我在完全結束之前需要有人負責等待我的結束,所以我會把我的id放上去。

而如果發現自己從pool裡拿到的tid不是0值,

說明我要負責join上一個人

,並且把我的tid放上去,

讓下一個人負責我

最後的那個人,是

那個發現pool->nthreads為0的人,那麼我就可以透過這個terminate

(它本身是個condition)去通知發起destroy的人。

最後發起者就可以退了。

是不是非常

有意思

!!!非常

優雅

的做法!!!

所以我們會發現,其實

大家不太需要知道太多資訊,只需要知道我要負責的上一個人

當然每一步都是非常嚴謹的,結合剛才跳過的第一段魔法感受一下:

static void *__thrdpool_routine(void *arg) { while (1) { // 1。注意這裡還持有鎖 pthread_mutex_lock(&pool->mutex); 。。。 // 等著佇列拿任務出來 // 2。 這既是標識位,也是發起銷燬的那個人所等待的condition if (pool->terminate) break; 。。。 // 執行拿到的任務 } /* One thread joins another。 Don‘t need to keep all thread IDs。 */ // 3。 把執行緒池上記錄的那個tid拿下來,我來負責上一人 tid = pool->tid; // 4。 把我自己記錄到執行緒池上,下一個人來負責我 pool->tid = pthread_self(); // 5。 每個人都減1,最後一個人負責叫醒發起detroy的人 if (——pool->nthreads == 0) pthread_cond_signal(pool->terminate); // 6。 這裡可以解鎖進行等待了 pthread_mutex_unlock(&pool->mutex); // 7。 只有第一個人拿到0值 if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 8。 只要不0值,我就要負責等上一個結束才能退 pthread_join(tid, NULL); return NULL; // 9。 退出,乾乾淨淨~}

4 - 特點2:執行緒任務可以由另一個執行緒任務調起

在第二部分我們看過原始碼,只要佇列管理得好,執行緒任務裡提交下一個任務是完全OK的。

這很合理。

那麼問題來了,

特點1

又說,我們每個執行緒,

是不需要知道太多執行緒池的狀態和資訊的

。而執行緒池的銷燬是個過程,如果在這個過程間提交任務會怎麼樣呢?

因此

特點2

的一個重要解讀是:

執行緒池被銷燬時也可以提交下一個任務

。而且剛才提過,還沒有被執行的任務,可以透過我們傳入的pending()函式拿回來。

簡單看看銷燬時的嚴謹做法:

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) { pthread_cond_t term = PTHREAD_COND_INITIALIZER; pthread_mutex_lock(&pool->mutex); // 1。 加鎖設定標誌位,之後的新增任務不會被執行,但可以pending拿到 pool->terminate = &term; // 2。 廣播所有等待的消費者 pthread_cond_broadcast(&pool->cond); if (in_pool) // 3。 這裡的魔法等下講>_<~ { /* Thread pool destroyed in a pool thread is legal。 */ pthread_detach(pthread_self()); pool->nthreads——; } // 4。 如果還有執行緒沒有退完,我會等,注意這裡是while while (pool->nthreads > 0) pthread_cond_wait(&term, &pool->mutex); pthread_mutex_unlock(&pool->mutex); // 5。同樣地等待打算退出的上一個人 if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0) pthread_join(pool->tid, NULL); }

5 - 特點3:同樣可以線上程任務裡銷燬這個執行緒池

既然執行緒任務可以做任何事情,理論上,

執行緒任務也可以銷燬執行緒池

作為一個邏輯完備的執行緒池,大膽一點,我們把問號去掉。

而且,

銷燬並不會結束當前任務,

它會等這個任務執行完

想象一下,剛才的

__thrdpool_routine()

while(1)

裡拿出來的那個任務,做的事情竟然是發起

thrdpool_destroy().

。。

把上面的圖大膽改一下:

如何用300行程式碼實現一個完整的執行緒池

我們讓一個routine來destroy執行緒池

如果發起銷燬的人,

是我們自己內部的執行緒

,那麼我們就不是等

n

個,而是等

n-1

,少了一個外部執行緒等待我們。如何實現才能讓這些邏輯都完美融合呢?我們把剛才跳過的

三段魔法串起來

看看。

第一段魔法,銷燬的發起者。

如果發起銷燬的人是執行緒池內部的執行緒,那麼它具有較強的自我管理意識

(因為前面說了,會等它這個任務執行完)而我們可以放心大膽地

pthread_detach

,無需任何人join它等待它結束。

static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) { 。。。 // 每個由執行緒池建立的執行緒都設定了一個key,由此判斷是否是in_pool if (in_pool) { /* Thread pool destroyed in a pool thread is legal。 */ pthread_detach(pthread_self()); pool->nthreads——; }

第二段魔法:執行緒池誰來free?

一定是發起銷燬的那個人。所以這裡用

in_pool

來判斷是否是外部的人:

void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool){ // 已經呼叫完第一段,且挨個pending(未執行的task)了 // 銷燬其他內部分配的記憶體 。。。 // 如果不是內部執行緒發起的銷燬,要負責回收執行緒池記憶體 if (!in_pool) free(pool);}

那現在不是main執行緒發起的銷燬呢

?發起的銷燬的那個內部執行緒,怎麼能保證我可以在最後關頭

把所有資源回收乾淨、調free(pool)、最後功成身退呢

在前面閱讀原始碼第5步,其實我們看過,

__thrdpool_routine()裡有free的地方。

於是現在三段魔法終於串起來了。

第三段魔法:嚴謹的併發。

static void *__thrdpool_routine(void *arg){ while (1) { // 。。。 task_routine(task_context); // 如果routine裡做的事情,是銷燬執行緒池。。。 // 注意這個時候,其他記憶體都已經被destroy裡清掉了,萬萬不可以再用什麼mutex、cond if (pool->nthreads == 0) { /* Thread pool was destroyed by the task。 */ free(pool); return NULL; } 。。。

非常重要的一點,

由於併發,我們是不知道誰先操作的。假設我們稍微改一改這個順序,就又是另一番邏輯

比如我作為一個內部執行緒,在

routine()

裡呼叫

destroy()

期間,發現還有執行緒沒有執行完,我就要等在我的terminate上,待最後看到

nthreads==0

的那個人叫醒我。

然後,我的程式碼繼續執行,函式棧就會從

destroy()

回到

routine()

,也就是上面那幾行,再然後就可以

free(pool)

;由於這時候我已經放飛自我detach了,於是一切順利結束。

你看,無論如何都可以完美地銷燬執行緒池:

如何用300行程式碼實現一個完整的執行緒池

併發是複雜多變的,程式碼是簡潔統一的

是不是

太妙了

!我寫到這裡已經要感動哭了!

6 - 簡單的用法

這個執行緒池只有兩個檔案:

thrdpool。h

thrdpool。c

,而且只依賴核心的資料結構

list。h

。我們把它拿出來玩,自己寫一段程式碼:

void my_routine(void *context) { // 我們要執行的函式 printf(“task-%llu start。\n”, reinterpret_cast(context); );} void my_pending(const struct thrdpool_task *task) { // 執行緒池銷燬後,沒執行的任務會到這裡 printf(“pending task-%llu。\n”, reinterpret_cast(task->context);); } int main() { thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 建立 struct thrdpool_task task; unsigned long long i; for (i = 0; i < 5; i++) { task。routine = &my_routine; task。context = reinterpret_cast(i); thrdpool_schedule(&task, thrd_pool); // 呼叫 } getchar(); // 卡住主執行緒,按回車繼續 thrdpool_destroy(&my_pending, thrd_pool); // 結束 return 0; }

再列印幾行log,直接編譯就可以跑起來:

如何用300行程式碼實現一個完整的執行緒池

媽媽再也不用擔心我的C語言作業

簡單程度堪比大一上學期C語言作業。

7 - 併發與結構之美

最後談談感受。

看完之後我很後悔為什麼沒有早點看為什麼不早點就可以獲得知識的感覺,並且在淺層看懂之際,我知道自己肯定沒有完全理解到裡邊的精髓,畢竟我不能

深刻地理解到設計者當時對併發的構思和模型上的選擇

我只能說,沒有十多年

頂級的系統呼叫和併發程式設計的功底

寫不出這樣的程式碼,沒有

極致的審美與對品控的偏執

也寫不出這樣的程式碼。

併發程式設計

有很多說道,就正如退出這個這麼簡單的事情,想要做到退出時回收乾淨卻很難。如果說你寫業務邏輯自己管執行緒,退出什麼的sleep(1)都無所謂,

但如果說做框架的人不能把自己的框架做得完美無暇邏輯自洽

就難免讓人感覺差點意思

而這個thrdpool,它作為一個執行緒池,是如此的

邏輯完備

用最對稱簡潔的方式去面對複雜的併發

再次讓我深深地感到震撼:我們身邊那些原始的、底層的、基礎的程式碼,還有很多新思路,還可以寫得如此美。

Workflow專案原始碼地址

https://github.com/sogou/workflow

相關文章

頂部