首頁/ 汽車/ 正文

漫談 Java 平臺上的反應式程式設計

反應式程式設計(Reactive Programming)是一套完整的程式設計體系,既有其指導思想,又有相應的框架和庫的支援,並且在生產環境中有大量實際的應用。在支援度方面,既有大公司參與實踐,也有強大的開源社群的支援。反應式程式設計出現的時間並不短,它所受到的關注度不斷升高。這主要體現在主流程式設計平臺和框架增強了對它的支援,使它得到了更多的受眾,同時也反映了其在開發中的價值。就 Java 平臺來說,幾個突出的事件包括:Java 9中把反應式流規範以 java。util。concurrent。Flow

類的方式新增到了標準庫中;Spring 5 對反應式程式設計模型提供了內建支援,並增加了新的 WebFlux 模組來支援反應式Web應用的開發。

反應式程式設計所涵蓋的內容很多。與其他程式設計正規化一樣,反應式程式設計要求開發人員改變其固有的思維模式,以不同的角度來看問題。對於熟悉了傳統面向物件程式設計正規化的人來說,這樣的思想轉變可能並不那麼容易。反應式程式設計在解決某些問題時有其先天的優勢。在對應用效能要求很高的今天,反應式程式設計更大有用武之地。作為開發人員來說,根據專案的需求和特徵,選擇最適合的程式設計模型可以達到事半功倍的效果。

反應式程式設計相關的術語目前並沒有非常統一的翻譯方法,本文中儘量使用較為常見的譯法或英文原文。

在討論反應式程式設計之前,首先必須要提到的是《反應式宣言(The Reactive Manifesto)》。反應式宣言中對反應式系統(Reactive Systems)的特徵進行了定義,有如下四個:

及時響應(Responsive)

:系統在儘可能的情況下及時響應請求。

有韌性(Resilient)

:系統在出現失敗時仍然可以及時響應。

有彈性(Elastic)

:在不同的負載下,系統仍然保持及時響應。

訊息驅動(Message Driven)

:系統使用非同步訊息傳遞來確定不同元件之間的邊界,並確保鬆散耦合、隔離和位置透明性。

這四個特徵互相關聯和影響。

及時響應

是核心價值,是反應式系統所追求的目標。

有韌性

有彈性

是反應式系統的外在表現形式,透過它們才能實現

及時響應

這個核心價值。

訊息驅動

則是實現手段。

漫談 Java 平臺上的反應式程式設計

反應式系統的特徵

反應式程式設計的重要概念之一是

負壓(back-pressure)

,是系統在負載過大時的重要反饋手段。當一個元件的負載過大時,可能導致該元件崩潰。為了避免元件失敗,它應該透過負壓來通知其上游元件減少負載。負壓可能會一直級聯往上傳遞,最終到達使用者處,進而對響應的及時性造成影響。這是在系統整體無法滿足過量需求時的自我保護手段,可以保證系統的韌性,不會出現失敗的情況。此時系統應該透過增加資源等方式來做出調整。

反應式流

反應式流(Reactive Streams)

是一個反應式程式設計相關的規範。反應式流為帶負壓的非同步非阻塞流處理提供了標準。反應式流規範的出發點是作為不同反應式框架互操作的基礎,因此它所提供的介面很簡單。

資料傳遞方式

隨著反應式流的出現,我們可以對Java平臺上常見的幾種資料傳遞方式做一下總結和比較。

直接的方法呼叫。資料使用者直接呼叫提供者的方法來獲取資料。這種方式是同步的,呼叫者在方法返回前會被阻塞。呼叫者和提供者之間的耦合最緊。每次方法呼叫只能返回一個數據。(雖然可以使用集合類來返回多個數據,但從概念上來說,集合類仍然只能視為一個數據。)

使用 Iterable 。 Iterable 表示一個可以被列舉的資料的集合,通常用不同的集合型別來表示,如 List 、 Set 和 Map 等。 Iterable 定義了可以對集合的資料所進行的操作。這些操作是同步的。 Iterable 所包含的資料數量是有限的。

使用 Future 。 Future 表示的是一個可以在未來獲取的結果,由一個非同步操作來負責給出這個結果。在獲取到 Future 物件之後,可以使用 get 方法來獲取到所需要的結果。雖然計算的過程是非同步的, get 方法使用時仍然是阻塞的。 Future 只能表示一個結果。

反應式流。反應式流表示的是非同步無阻塞的資料流,其中包含的元素數量可能是無限的。

上述四種資料傳遞方式,實際上代表了兩個維度上的四個不同的值,如下表所示。

值的數量/ 獲取方式

同步

非同步

1個

方法呼叫

Future

多個

Iterable

反應式流

Java 8 的 java。util。stream。Stream 可以看成是對 Iterable 的一種擴充套件,可以包含無限元素。 Stream 同時又有一部分反應式流實現的特徵,主要體現在其流式介面(Fluent interface)上,也可以做並行處理。不過 Stream 缺少了對負壓的支援。

Future 和 CompletableFuture

Java 中的 Future 把非同步操作進行了抽象,但是隻解決了一半的問題。雖然 Future 所表示的計算是非同步的,但是對計算結果的獲取仍然是同步阻塞的。 Future 原本的設計思路是:當需要執行耗時的計算時,提交該計算任務到 ExecutorService ,並得到一個 Future 物件作為返回值。接著就可以執行其他任務,然後再使用之前得到的 Future 物件來獲取到所需的計算的結果值,再繼續下面的計算。這樣的設計思路有一個突出的問題,那就是在實際的程式設計實踐中,很難找到一個合適的時機來獲取 Future 物件的計算結果。因為 get 方法是阻塞的,如果呼叫早了,主執行緒仍然會被阻塞;如果呼叫晚了,在某種程度上降低了併發的效率。除此之外,如果需要在程式碼的不同部分之間傳遞計算的結果,需要把 Future 物件在不同的物件之間進行傳遞,也增加了系統的耦合性。

Java 8 的 CompletableFuture 的出現解決了上面提到的 Future 的問題。而解決的辦法是允許非同步操作進行級聯。比如有一個服務用來生成報表,另外一個服務用來發送電子郵件。生成報表的服務返回的是 CompletableFuture 物件,只需要透過 thenApply 或 thenRun 就可以呼叫傳送電子郵件的服務,得到的結果是另外一個 CompletableFuture 物件。在使用 CompletableFuture 時,不需要考慮獲取非同步操作結果的時機,只需要以宣告式的方式定義出對結果的操作即可。這也避免了不必要的 CompletableFuture 物件傳遞。

CompletableFuture 仍然只能表示一個結果。如果把 CompletableFuture 的思路進一步擴充套件,就是反應式流解決問題的思路。在實際中,非同步服務通常都是處理資料流。比如上面提到的傳送電子郵件的服務,會接受來自不同源的資料。反應式流的一個重要目標是確保流的消費者不會因為負載過重而崩潰。

在具體介紹反應式流之前,我們先看一下反應式流會帶來的思維方式的轉變。

流式思考(Thinking in Streams)

反應式流所帶來的程式設計思維模式的改變是轉為

以流為中心

。這是從以邏輯為中心到以資料為中心的轉換,也是命令式到宣告式的轉換。傳統的指令式程式設計正規化以控制流為核心,透過順序、分支和迴圈這 3 種控制結構來完成不同的行為。開發人員在程式中編寫的是執行的步驟;以資料為中心側重的是資料在不同元件的流動。開發人員在程式中編寫的是對資料變化的宣告式反應。

我們透過一個具體的示例來說明以流為中心的思維模式。在電子商務網站中都有購物車這個功能。使用者在購物車介面可以看到所有已經新增的商品,還可以進一步修改商品的數量。當數量更新之後,購物車介面上要顯示更新後的訂單總價。按照一般的面向物件的思路,我們會有一個訂單物件,裡面包含了當前全部的商品,並有一個屬性來表示訂單的總價。當商品數量更新之後,訂單物件中的商品被更新,同時需要重新呼叫計算總價的方法來更新總價屬性值。

下面是按照命令式的思路的基本 Java 程式碼。 updateQty 用來更新訂單商品數量, calculateTotal 用來計算總價。典型的執行流程是先呼叫 updateQty ,再呼叫 calculateTotal 。

class Order { public void updateQty(LineItem item, int qty) { item。qty = qty; this。calculateTotal(); } private void calculateTotal() { this。total = 。。。 }}

如果採用事件驅動的方式,比如典型的Web介面中,情況並不會好太多。我們可以為不同的動作建立相應的事件。每個事件有自己的型別和相應的資料(payload)。比如,商品數量更新事件的資料中會包含商品的ID和新的數量。系統對不同的事件有不同的處理方式。商品數量更新事件其實是對之前的 updateQty 方法呼叫的封裝。引入事件的好處是可以把呼叫者和處理者進行解耦。當直接呼叫 order。updateQty() 方法時,呼叫者和處理者緊密耦合在一起。在引入了事件之後,原來的一個步驟被劃分成3個小步驟:

呼叫者建立事件併發布。

事件中介軟體負責傳遞事件,通常採用事件匯流排(Event Bus)來完成。

處理者接收到事件後進行處理。

事件驅動的方式增加了一定的靈活性,但對資料的處理方式仍然不是很自然。再回到最初的問題,問題的本質在於訂單的總價是會隨著商品的數量而改變的。當商品的數量變化時,訂單物件本身並不會對該變化作出反應來更新自身的總價屬性。如果以反應式的思維模式,那會是不一樣的情況。

在以流為中心是思維模式中,值可能產生變化的變數都是一個流。流中的元素代表了變數在不同時刻的值。如果一個變數的值的變化會引起另外一個變數的變化,則把前一個變數所表示的流作為它所能引起變化另外一個變數對應的流的上游。我們可以把每個商品的數量看成一個流。當數量更新時,流中會產生一個新的元素。流中的元素可能是 1 -> 2 -> 3 -> 2 ,也可能是其他合法的序列。每個元素表示了使用者的一次操作的結果。訂單的總價也是一個流,它的元素表示了由於商品數量變化所對應的總價。總價對應的流中的元素是根據所有商品數量流的元素來產生的。每當任意一個商品數量中產生了新的元素,都會在總價流中產生一個對應的新元素。對總價的計算邏輯使用流的運算子來表示。

接著我們來具體看看怎麼以反應式流的方式來實現購物車。為了更加直觀的展示,這裡我使用的是JavaScript上的反應式庫RxJS。下面的程式碼是一個簡單的購物車頁面。頁面上有3個固定的商品。每個商品有對應的 input 元素。 input 元素的 data-price 屬性表明了商品的單價。函式 calculateItemPrice 的作用是根據一個 input 元素來計算其對應商品的價格,也就是單價乘以數量。

總價的計算邏輯在下面的6行程式碼中。對於每個 input 元素, Rx。Observable。fromEvent 從其 change 事件中創建出一個流。每當 change 事件產生時,流就會產生一個對應的事件物件。對於事件物件,可以透過 target 屬性獲取到對應的 input 元素,再使用 calculateItemPrice 進行計算。在經過 map 運算子之後,流的元素變成了每個商品的價格。流中的初始元素是數量為 1 時的價格。 Rx。Observable。combineLatest 方法的作用是把每個 input 所對應的流進行合併,從每個流中獲取最新的元素,組合成一個數組,作為它所對應的流的元素。我們只需要把陣列的值進行累加,就得到了總價。

<!DOCTYPE html> 使用反應式程式設計的購物車示例

商品1,單價10
商品2,單價15
商品3,單價20
總價:

該頁面的執行效果如下所示。

漫談 Java 平臺上的反應式程式設計

購物車執行效果

從上述程式碼可以看到,反應式流採用了與傳統程式設計不同的思路,更加註重的是資料層面上的抽象,淡化了狀態。

Java 9 的 Flow

下面我們結合 Java 9 中的 java。util。concurrent。Flow 類來說明反應式流規範。Java 9中的 Flow 只是簡單的把反應式流規範的4個介面整合到了一個類中。

Publisher

顧名思義, Publisher 是資料的釋出者。 Publisher 介面只有一個方法 subscribe 來新增資料的訂閱者,也就是下面的 Subscriber 。

Subscriber

Subscriber 是資料的訂閱者。 Subscriber 介面有4個方法,都是作為不同事件的處理器。在訂閱者成功訂閱到釋出者之後,其 onSubscribe(Subscription s) 方法會被呼叫。 Subscription 表示的是當前的訂閱關係。當訂閱成功後,可以使用 Subscription 的 request(long n) 方法來請求釋出者釋出 n 條資料。釋出者可能產生3種不同的訊息通知,分別對應 Subscriber 的另外3個回撥方法。

資料通知:對應 onNext 方法,表示釋出者產生的資料。

錯誤通知:對應 onError 方法,表示釋出者產生了錯誤。

結束通知:對應 onComplete 方法,表示釋出者已經完成了所有資料的釋出。

在上述3種通知中,錯誤通知和結束通知都是終結通知,也就是在終結通知之後,不會再有其他通知產生。

Subscription

Subscription 表示的是一個訂閱關係。除了之前提到的 request 方法之外,還有 cancel 方法用來取消訂閱。需要注意的是,在 cancel 方法呼叫之後,釋出者仍然有可能繼續釋出通知。但訂閱最終會被取消。

Processor

Processor 表示的一種特殊的物件,既是生產者,又是訂閱者。

Publisher 只有在收到請求之後,才會產生資料。這就保證了訂閱者可以根據自己的處理能力,確定要 Publisher 產生的資料量,這就是負壓的實現方式。

Reactor

反應式流規範所提供的API是很簡單的,並不能滿足日常開發的需求。反應式流的價值在於對流以宣告式的方式進行的各種操作,以及不同流之間的整合。這些都需要透過第三方庫來完成。目前Java平臺上主流的反應式庫有兩個,分別是Netflix維護的 RxJava 和 Pivotal 維護的 Reactor。RxJava 是Java平臺反應式程式設計的鼻祖。反應式流規範在很大程度上借鑑了 RxJava 的理念。由於 RxJava 的產生早於反應式流規範,與規範的相容性並不是特別好。Reactor 是一個完全基於反應式流規範的全新實現,也是 Spring 5 預設的反應式框架。

Reactor 的兩個最核心的類是 Flux 和 Mono 。Reactor採用了兩個不同的類來表示流。 Flux 表示的包含0到無限個元素的流,而 Mono 則表示最多一個元素的流。雖然從邏輯上來說, Mono 表示的流都可以用 Flux 來表示,這樣的區分使得很多操作的語義更容易理解。比如對一個 Flux 進行 reduce 操作的結果是一個 Mono 。而對一個 Mono 進行 repeat 操作得到的是一個 Flux 。

Flux 和 Mono 的強大之處來源於各種不同的運算子。完整的運算子列表可以參考官方文件。下面對這些運算子做一些基本的分類介紹。

第一類是建立 Flux 和 Mono 的靜態方法。比如 Flux 的 fromArray 、 fromIterable 和 fromStream 方法分別從陣列、 Iterable 和 Stream 中建立 Flux 。 interval 可以根據時間間隔生成從 0 開始的遞增序列。 Mono 還可以從 Runnable 、 Callable 和 CompletableFuture 中建立。

Flux。fromArray(new String[] {“a”, “b”, “c”}) 。subscribe(System。out::println);Mono。fromFuture(CompletableFuture。completedFuture(“Hello World”)) 。subscribe(System。out::println);

第二類是緩衝上游流中的元素的運算子,包括 buffer 、 bufferTimeout 、 bufferWhen 、 bufferUntil 、 bufferWhile 、 window 、 windowTimeout 、 windowWhen 、 windowUntil 和 windowWhile 等。 buffer 等方法按照元素數量和/或間隔時間來收集元素,把原始的 Flux 轉換成 Flux> 。 window 等方法與 buffer 作用類似,只不過是把原始的 Flux 轉換成 Flux> 。

使用 bufferTimeout 可以用簡潔的方式解決一些複雜的問題。比如,有一個執行批次處理的服務,我們需要在請求數量達到某個閾值時馬上執行批次處理,或者給定的時間間隔過去之後也要執行批次處理。這樣既可以在負載高時降低批次處理的壓力,又可以在負載低時保證及時性。

在下面的程式碼中, Flux。interval 用來生成遞增的序列,其中第一個 Flux 的時間間隔是 100 毫秒,第二個 Flux 的時間間隔是 10 毫秒,並有一秒的延遲。兩個 Flux 表示的流被 merge 合併。 bufferTimeout 的設定是最多 10 個元素和最長 500 毫秒。由於生成的流是無限的,我們使用 take(3) 來取前面3個元素。 toStream() 是把 Flux 轉換成Java 8的 Stream ,這樣可以阻止主執行緒退出直到流中全部元素被消費。在最初的 500 毫秒,只有第一個 Flux 產生資料,因此得到的 List 中只包含5個元素。在接著的 500 毫秒,由於時間精確度的原因,在 List 中仍然是可能有來自第二個 Flux 的元素。第三個 List 則包含 10 個元素。

Flux。merge( Flux。interval(Duration。ofMillis(100))。map(v -> “a” + v), Flux。interval(Duration。ofSeconds(1), Duration。ofMillis(10))。map(v -> “b” + v))。bufferTimeout(10, Duration。ofMillis(500)) 。take(3) 。toStream() 。forEach(System。out::println);

第三類是收集運算子,包括 collect 、 collectList 、 collectMap 、 collectMultimap 和 collectSortedList 等,用來把流中的元素收集到不同的集合物件中。

第四類是流合併運算子,包括 concat 和 merge 等。 concat 和 merge 都可以合併多個流,不同之處在於 concat 會在完全消費前一個流之後,才開始消費下一個流;而 merge 則同時消費所有流,來自不同流的元素會交織在一起。

第五類是流轉換合併運算子,包括 concatMap 和 flatMap 。這些運算子都把原始流的每個元素轉換成一個新的流,再合併這些新生成的流。在合併流時, concatMap 的語義與 concat 相似,而 flatMap 的語義與 merge 相似。下面程式碼的輸出結果是 0 、 0 、 1 、 0 、 1 、 2 。

Flux。just(1, 2, 3)。concatMap(v -> Flux。interval(Duration。ofMillis(100))。take(v)) 。toStream() 。forEach(System。out::println);

第六類是對流元素進行處理的運算子。這一類運算子種類很多,也比較容易理解。比如對流中元素進行轉換的 map ,對元素進行過濾的 filter ,去掉重複元素的 distinct ,從流中抽取給定數量元素的 take 和跳過流中給定數量元素的 skip 。

除了上述這些之外,還有其他不同的運算子,具體參見官方文件。

總結

反應式程式設計在解決某些問題時有其獨到之處,可以作為傳統程式設計正規化的良好補充,也可以從頭開發一個完整的反應式應用。要了解反應式程式設計,最重要的是思維模式的轉變。這不可能一蹴而就,只能透過大量的實戰開發來獲取相關經驗。大膽在你的下一個專案中使用反應式程式設計吧,肯定會有不一樣的體驗。

相關文章

頂部