首頁/ 汽車/ 正文

C++非同步從理論到實踐總覽篇

C++20帶來了coroutine特性, 同時新的execution也在提案過程中, 這兩者都給我們在C++中解決非同步問題帶來了新的思路。 但對比其他語言的實現, C++的協程和後續的execution都存在一定的理解和封裝成本, 本系列的分享我們將圍繞基本的原理, 相應的封裝, 以及剝析優秀的第三方實現, 最終結合筆者framework落地的情況來展開。

1。 糾結的開篇

之前設計我們遊戲用的c++框架的時候, 剛好c++20的coroutine已經發布, 又因為是專門 給game server用的c++ framework, 對多執行緒的訴求相對有限, 或者本著少併發少奇怪的錯誤的原則, 除網路和IO和日誌等少量模組外, 大部分模組主要還是工作在主執行緒上的, 所以當時設計的重點也就放在了c++20 coroutine的包裝和使用上, 更多的使用coroutine來完善非同步的支援。 但如果考慮到framework作為前後端公用框架的話, 原來主要針對主執行緒使用的包裝的coroutine排程器就顯得有些不夠用, 以此作為基礎, 我們開始了嘗試結合比較新的c++非同步思路, 來重新思考應該如何實現一個儘量利用c++新特性, 業務層簡單易用的非同步框架了。

本系列的主要內容也是圍繞這條主線來鋪開, 過程中我們 主要以:

自有的framework非同步實現

- 主要落地嘗試利用c++20的coroutine實現一個業務級的排程器。

asio

- 這個應該不用多說了, 近年來一直高頻迭代, 業界廣泛使用的開源第三方庫, 中間的非同步任務排程, 網路部分的程式碼實現都非常優質。

libunifex

- 最接近當前sender/receiver版 execution提案的可實操版本, c++17/20相容, 但不推薦使用c++17的版本進行任何嘗試, 原因後續檔案會展開。

這幾個庫作為基礎, 逐步展開我們對c++非同步的探索, 然後再回到落地實踐這條主線上, 探討一個業務側使用簡單, 內部高效的非同步庫應該如何來實現並落地。 當然, 我們的側重點主要還是c++非同步的排程和處理上, 網路相關的有部分內容可能會簡單提到, 但不會進行深入的展開。   其實整個嘗試的過程只能說非常不順利了, 當然, 隨著對相關實現的深入理解和細節的深挖, 收益也是頗多的。 閒話不多說了, 我們直接切入主題, 以對非同步的思考來展開這篇總覽的內容。

2。 前塵往事 - rstudio framework實現

rstudio framework的非同步框架由兩塊比較獨立的部分組成:

一部分是源自asio幾年前版本的post和strand部分實現, 另外附加了一些業務側較常用的像Fence等物件;

另外一部分是主執行緒的協程排程器實現, 這部分最早是基於c++17實現的一版stackless 協程; 另外一版則是gcc11。1正式釋出後, 直接用c++20重構了整個實現, 直接使用c++20的coroutine的一個版本。

2。1 asio 部分

這一部分的內容因為後續有asio scheduler實現具體的分析篇章, 這個地方主要以業務側使用進行展開了。

2。1。1 executor概述

去除了各平臺網路處理相關的程式碼

僅保留了post和相關的功能(新版本有executor實現)

早期c++11相容, 無coroutine支援

除網路庫外, asio非常有使用價值的一部分程式碼

2。1。2 一個簡單的使用示例

GJobSystem->Post([]() { //some calculate task here //。。。 GJobSystem->Post( []() { //task notify code here //。。。 }, rstudio::JobSystemType::kLogicJob); }, rstudio::JobSystemType::kWorkJob);

相關的時序圖:

C++非同步從理論到實踐總覽篇

2。1。3 當前框架使用的執行緒結構

C++非同步從理論到實踐總覽篇

預定義的列舉值:

enum class JobSystemType : int { kLogicJob = 0, // logic thread(main thread) kWorkJob, // work thread kSlowJob, // slow work thread(run io or other slow job) kNetworkJob, // add a separate thread for network kNetworkConnectJob, // extra connect thread for network kLogJob, // log thread kNotifyExternalJob, // use external process to report something, 1 thread only~~ kTotalJobTypes,};

不同Job說明:

kLogicJob

主執行緒(邏輯執行緒)執行任務

kWorkJob

Work Thread執行緒池執行任務(多個), 一般是計算量可控的小任務

kSlowJob

IO專用執行緒池, IO相關的任務投遞到本執行緒池

kNetworkJob

目前tbuspp專用的處理執行緒

kNetworkConnectJob

專用的網路連線執行緒, tbuspp模式下不需要

kLogJob

日誌專用執行緒, 目前日誌模組是自己起的執行緒, 可以歸併到此處管理

kNotifyExternalJob

專用的通知執行緒, 如lua error的上報, 使用該型別

2。1。4 Timer任務相關

相關介面:

//NoIgnore versionuint64_t JobSystemModule::AddAlwaysRunJob(JobSystemType jobType, threads::ThreadJobFunction&& periodJob, unsigned long periodTimeMs);uint64_t JobSystemModule::AddTimesRunJob(JobSystemType jobType, threads::ThreadJobFunction&& periodJob, unsigned long periodTimeMs, unsigned int runCount); uint64_t JobSystemModule::AddDelayRunJob(JobSystemType jobType, threads::ThreadJobFunction&& periodJob, unsigned long delayTimeMs); void JobSystemModule::KillTimerJob(uint64_t tid);

本部分並未直接使用asio原始的basic_waitable_timer實現, 而是自己實現的定時任務。

相關影片推薦

[linux]一個讓效能飛起的解決方案,非同步處理到底有哪些不一樣

協程!協程!協程!給你一個吊打面試官的機會

學習地址:C/C++Linux伺服器開發/後臺架構師【零聲教育】-學習影片教程-騰訊課堂

需要C/C++ Linux伺服器架構師學習資料加qun

812855908

獲取(資料包括

C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg

等),免費分享

C++非同步從理論到實踐總覽篇

2。1。5 線上程池上關聯執行任務 - Strand

特定的情況下, 被派發到Work執行緒池的任務存在依賴關係

需要串聯執行的時候, 這個時候我們需要額外的設施 JobStrand

來保證任務是按先後依賴關係來序列執行的

如下圖中part1, part2, part3, part4序列執行的情況所示

C++非同步從理論到實踐總覽篇

示例程式碼:

auto strand = GJobSystem->RequestStrand(rstudio::JobSystemType::kWorkJob);starnd。Post([](){ //part1~ // 。。。});starnd。Post([](){ //part2~ // 。。。});starnd。Post([](){ //part3~ // 。。。});starnd。Post([](){ //part4~ // 。。。});starnd。Post([](){ GJobSystem->Post([](){ //return code here // 。。。 }, rstudio::JobSystemType::kLogicJob); });

2。1。6 其他輔助設施

JobFence

jobs::JobFencePtr JobSystemModule::RequestFence();

字面義, 柵欄, 起到攔截執行的作用。

一般多用於模組的初始化和結束

如tbuspp在kNetworkJob上的初始化和結束。

C++非同步從理論到實踐總覽篇

示例程式碼(TcpService的初始化)

job_system_module_->Post( [this, workTicket]() { if (!workTicket || workTicket->IsExpired()) return; InitInNetworkThread(); }, JobSystemType::kNetworkJob);period_task_ptr = job_system_module_->AddAlwaysRunJob( JobSystemType::kNetworkJob, [this, workTicket]() { if (!workTicket || workTicket->IsExpired()) return; LoopInNetworkThread(); }, 10);fence_->FenceTo((int)JobSystemType::kNetworkJob);fence_->Wait();

JobNotify && JobWaiter

jobs::JobWaiterPtr JobSystemModule::RequestWaiter();jobs::JobNotifyPtr JobSystemModule::RequestNotify();

批次任務管理使用

等待的方式的區別

JobNotify

: 執行完成呼叫額外指定的回撥。

JobWaiter

: 以Wait的方式在特定執行緒等待所有Job執行完成。

JobTicket

jobs::JobTicketPtr JobSystemModule::RequestTicket();

令牌物件

一般用來處理跨執行緒的生命週期控制

回撥之前先透過IsExpired()來判斷對應物件是否已經釋放

示例程式碼:

GJobSystem->Post( [this, workTicket]() { if (!workTicket || workTicket->IsExpired()) return; InitInNetworkThread(); }, JobSystemType::kNetworkJob);

2。2 asio 與其他實現的對比

正好今年的GDC上有一個<>的分享, 裡面主要講述的是對Halo Infinite的引擎升級, 提供新的JobSystem和新的動態幀的機制來支撐專案的, 我們直接以它為例子來對比一下framework和Halo的實現, 並且也借用Halo Infinite的例子, 來更好的瞭解這種lambda post模式的缺陷, 以及可以改進的點。   Halo引入新的JobSystem主要是為了將老的Tetris結構的併發模式:

C++非同步從理論到實踐總覽篇

向新的基於Dependency的圖狀結構遷移:

C++非同步從理論到實踐總覽篇

他使用的JobSystem的業務Api其實很簡單, 我們直接來看一下相關的程式碼:

JobSystem& jobSsytem = JobSystem::Get();JobGraphHandle graphHandle = jobSystem。CreateJobGraph();JobHandle jobA = jobSystem。AddJob( graphHandle, “JobA”, [](){。。。} );JobHandle jobB = jobSystem。AddJob( graphHandle, “JobB”, [](){。。。} );jobSystem。AddJobToJobDependency(jobA, jobB);jobSystem。SubmitJobGraph(graphHandle);

透過這樣的機制, 就很容易形成如:

C++非同步從理論到實踐總覽篇

另外還有一個用於同步的SyncPoint:

JobSystem& jobSystem = JobSystem::Get();JobGraphHandle graphHandle = jobSystem。CreateJobGraph();SyncPointHandle syncPointX = jobSystem。CreateSyncPoint(graphHandle, “SyncPointX”);JobHandle jobA = jobSystem。AddJob(graphHandle, “JobA”, [](){。。。});JobHandle jobB = jobSystem。AddJob(graphHandle, “JobB”, [](){。。。});jobSystem。AddJobToSyncPointDependency(jobA, syncPointX);jobSystem。AddSyncPointToJobDependency(syncPointX, jobB);jobSystem。SubmitJobGraph(graphHandle);

大致的作用如下:

C++非同步從理論到實踐總覽篇

這樣在workload主動觸發SyncPoint後, 整體執行才會繼續往下推進, 這樣就能方便的加入一些主動的同步點對整個Graph的執行做相關的控制了。

回到asio, 我們前面也介紹了, 使用strand和post(), 我們也能很方便的構造出Graph形的執行情況 , 而SyncPoint其實型別framework中提供的Event, 表達上會略有差異, 但很容易看出兩套實現其實是相當類同的。 這樣的話, Halo 的JobSystem有的所有優缺點, framework基本也同樣存在了, 這裡簡單搬運一下:

C++非同步從理論到實踐總覽篇

對於複雜併發業務的表達以lambda內嵌為主, 雖然這種方式儘可能保證所有程式碼上下文是比較集中的, 對比純粹使用callback的模式有所進步, 但這種自由度過高的方式本身也會存在一些問題, 純粹靠編碼者來維繫併發上下文的正確性, 這種情況下狀態值在lambda之間的傳遞也需要特別的小心, 容易出錯, 並且難以除錯。

2。3 coroutine實現部分

coroutine部分之前的帖子裡已經寫得比較詳細了, 這裡僅給出連結以及簡單的程式碼示例:

如何在C++17中實現stackless coroutine以及相關的任務排程器

C++20 Coroutine例項教學

另外還有一個purecpp大會的演講影片, 主要內容與上述的兩篇文章相關度比較高, 這裡也給出相關的連結, 感興趣的同學可以自行觀看:C++20 coroutine原理與應用

程式碼示例:

//C++ 20 coroutineauto clientProxy = mRpcClient->CreateServiceProxy(“mmo。HeartBeat”);mScheduler。CreateTask20([clientProxy]() -> rstudio::logic::CoResumingTaskCpp20 { auto* task = rco_self_task(); printf(“step1: task is %llu\n”, task->GetId()); co_await rstudio::logic::cotasks::NextFrame{}; printf(“step2 after yield!\n”); int c = 0; while (c < 5) { printf(“in while loop c=%d\n”, c); co_await rstudio::logic::cotasks::Sleep(1000); c++; } for (c = 0; c < 5; c++) { printf(“in for loop c=%d\n”, c); co_await rstudio::logic::cotasks::NextFrame{}; } printf(“step3 %d\n”, c); auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false, []()-> logic::CoResumingTaskCpp20 { printf(“from child coroutine!\n”); co_await rstudio::logic::cotasks::Sleep(2000); printf(“after child coroutine sleep\n”); }); printf(“new task create in coroutine: %llu\n”, newTaskId); printf(“Begin wait for task!\n”); co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 }; printf(“After wait for task!\n”); rstudio::logic::cotasks::RpcRequest rpcReq{clientProxy, “DoHeartBeat”, rstudio::reflection::Args{ 3 }, 5000}; auto* rpcret = co_await rpcReq; if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) { assert(rpcret->totalRet == 1); auto retval = rpcret->retValue。to(); assert(retval == 4); printf(“rpc coroutine run suc, val = %d!\n”, retval); } else { printf(“rpc coroutine run failed! result = %d \n”, (int)rpcret->rpcResultType); } co_await rstudio::logic::cotasks::Sleep(5000); printf(“step4, after 5s sleep\n”); co_return rstudio::logic::CoNil;} );

執行結果:

step1: task is 1step2 after yield!in while loop c=0in while loop c=1in while loop c=2in while loop c=3in while loop c=4in for loop c=0in for loop c=1in for loop c=2in for loop c=3in for loop c=4step3 5new task create in coroutine: 2Begin wait for task!from child coroutine!after child coroutine sleepAfter wait for task!service yield call finish!rpc coroutine run suc, val = 4!step4, after 5s sleep

整體來看, 協程的使用還是給非同步程式設計帶來了很多便利, 但框架本身的實現其實還是有比較多迭代最佳化的空間的:

asio的排程部分與coroutine部分的實現是分離的

coroutine暫時只支援主執行緒

2。4 小結

上面也結合halo的例項說到了一些限制, 那麼這些問題有沒有好的解決辦法了, 答案是肯定的, 雖然execution並未完全透過提案, 但整體而言, execution新的sender/reciever模型, 對於解決上面提到的一些缺陷, 應該是提供了非常好的思路, 我們下一章節中繼續展開。

3。 so easy - execution就是解?

最開始的想法其實比較簡單, 結合原來的framework, 適當引入提案中的execution一些比較可取的思路, 讓framework的非同步程式設計能更多的吸取c++新特性和execution比較高階的框架抽象能力, 提升整個非同步庫的實現質量。 所以最開始定的主線思路其實是更多的向execution傾斜, 怎麼了解掌握execution, 怎麼與現在的framework結合成了主線思路。

我們選擇的基礎參考庫是來自衝元宇宙這波改名的Meta公司的libunifex, 客觀來說, Meta公司的folly庫, 以及libunifex庫的實現質量, 肯定都是業界前沿的, 對c++新特性的使用和探索, 也是相當給力的。 這些我們後續在分析libunifex具體實現的篇章中也能實際感受到。

但深入瞭解libunifex後, 我們會發現, 它的優點有不少:

嘗試為c++提供表達非同步的框架性結構。

泛型用得出神入化, ponder在它前面基本是小弟級別的, 一系列泛用性特別強的template 程式設計示例, 比如隱含在sender/receiver思路內的lazy evaluate表達, 如何在大量使用泛型的情況下提供業務定製點等等。

結構化的表達併發和非同步, 相關程式碼的編寫從自由發揮自主把控走向框架化, 約束化, 能夠更有序更可靠的表達複雜非同步邏輯

整個執行pipeline的組織, 所有資訊是compile time和runtime完備的, dependencies不會丟失。

節點之間的值型別是強制檢查的, 有問題的情況 , 大多時候compiler time就會報錯。 有不少優點的同時, 也有很多缺點:

整個庫的實現嚴重依賴了c++20 ranges採用的一種定製手段 cpo, 並且也使用了類似ranges的pipe表達方法, 理解相關程式碼存在一定的門坎。(後續會有具體的篇章展開相關的內容)

庫同時向下相容了c++17, 但由於c++17本身特性的限制, 引入了大量的宏, 以及X Macros展開的方式, 導致相關的程式碼閱讀難度進一步提升。 但實際上c++17版本並不具備可維護的價值, 依賴SIFINAE的實現, 如果中間任何一環報錯, 必然需要在N屏的報錯中尋找有效資訊。

libunifex對coroutine的支援存疑, 雖然讓coroutine可以作為一種reciever存在, 但本質上來說, coroutine其實更適合拿來做流程控制的膠水, 而不是作為非同步中的某個節點存在。

預設的scheduler實現質量離工業級還存在一定的距離, 這一點後續的程式碼分析中也會具體提到。 諸多問題的存在, 可能也是execution提案沒有短時間內獲得透過的原因吧, 但整體來說, execution本身的理念還是很有參考價值的, 但以它的現狀來說, 離最終的解肯定還是有比較大的距離的。

4。 嘗試重新思考 - 要什麼, 用什麼

事情到這個點就有點尷尬了, 原有的asio, 架構層面來說, 跟新的execution是存在落差的。 而專案實踐上來說, asio相當穩紮穩打, 而以libunifex當前的狀態來說, 離工業化使用其實是有一定距離的。 但asio作者在21年時候的兩篇演講(更像coding show):

Talking Async Ep1: Why C++20 is the Awesomest Language for Network Programming

Talking Async Ep2: Cancellation in depth第一篇基本整個演示了asio從最開始的callback, 到融入c++20 coroutine後的優雅非同步表達, 我們可以透過下面的程式碼片斷感受一下:

asio相關示例程式碼1

awaitable listen(tcp::acceptor& acceptor, tcp::endpoint target){ for (;;) { auto [e, client] = co_await acceptor。async_accept(use_nothrow_awaitable); if (e) break; auto ex = client。get_executor(); co_spawn(ex, proxy(std::move(client), target), detached); }}

asio相關示例程式碼2

auto [e] = co_await server。async_connect(target, use_nothrow_awaitable); if (!e) { co_await ( ( transfer(client, server, client_to_server_deadline) || watchdog(client_to_server_deadline) ) && ( transfer(server, client, server_to_client_deadline) || watchdog(server_to_client_deadline) ) ); }

對比原來每個async_xxx()函式後接callback的模式, 整個實現可以說是相當的優雅了, 程式碼的可讀性也得到了極大的提高, 這兩段程式碼都來自於上面的演講中, 想深入瞭解的可以直接開啟相關的連結觀看影片, 很推薦大家去看一下。   能夠把複雜的事情用更簡潔易懂的方法表達, 這肯定是讓人振奮的, 當然, 深入瞭解相關實現後, 也會發現存在一些問題, 但我們的本意是參考學習, 得出最終想要的可以比較好的支撐併發和非同步業務的基礎框架, 有這些, 其實已經可以理出一條比較清晰的思路了:

execution部分主要使用它的sender/receiver概念, 和它提供的一些通用的演算法。 移除掉所有因為fallback c++17引入的大量程式碼噪聲。 拋棄它並不完備的各種scheduler實現

協程借鑑部分asio的思路, 首先讓協程可以基於context上下文, 在跨執行緒的情況下使用, 另外更多還是使用原有框架有明確的scheduler的方式對所有協程進行管理和定製的模式。

使用asio的scheduler部分作為execution的底層scheduler實現, 同時也使用asio的timer表達, 去除原始libunifex依賴不同scheduler提供schedule_at()方法來執行定時器相關邏輯的實現。

根據業務需要, 定製一些必要的sender adapter等簡化業務的使用。

嘗試用execution框架對接ISPC等特殊的併發庫, 能夠以一個清晰的方式來表達這種混合環境上執行的邏輯。

本系列涉及的基礎知識和相關內容比較多, 先給出一個臨時的大綱, 後續可能會有調整。 目前的思路是先介紹大家相對熟悉度不那麼高的execution基礎知識和libunifex, 後面再介紹asio相關的scheduler以及coroutine實現, 最後再回歸筆者正在迭代的framework, 這樣一個順序來展開。

原文地址:https://mp。weixin。qq。com/s/DaS67_UUUXC96lQoYLdbxw

相關文章

頂部