首頁/ 汽車/ 正文

如何用 Java 幾分鐘處理完 30 億個資料?

1. 場景說明

現有一個 10G 檔案的資料,裡面包含了 18-70 之間的整數,分別表示 18-70 歲的人群數量統計。假設年齡範圍分佈均勻,分別表示系統中所有使用者的年齡數,找出重複次數最多的那個數,現有一臺記憶體為 4G、2 核 CPU 的電腦,請寫一個演算法實現。

23,31,42,19,60,30,36,……。。

2. 模擬資料

Java 中一個整數佔 4 個位元組,模擬 10G 為 30 億左右個數據, 採用追加模式寫入 10G 資料到硬盤裡。

每 100 萬個記錄寫一行,大概 4M 一行,10G 大概 2500 行資料。

package bigdata; import java。io。*;import java。util。Random; /** * @Desc: * @Author: bingbing * @Date: 2022/5/4 0004 19:05 */public class GenerateData { private static Random random = new Random(); public static int generateRandomData(int start, int end) { return random。nextInt(end - start + 1) + start; } /** * 產生10G的 1-1000的資料在D盤 */ public void generateData() throws IOException { File file = new File(“D:\\ User。dat”); if (!file。exists()) { try { file。createNewFile(); } catch (IOException e) { e。printStackTrace(); } } int start = 18; int end = 70; long startTime = System。currentTimeMillis(); BufferedWriter bos = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true))); for (long i = 1; i < Integer。MAX_VALUE * 1。7; i++) { String data = generateRandomData(start, end) + “,”; bos。write(data); // 每100萬條記錄成一行,100萬條資料大概4M if (i % 1000000 == 0) { bos。write(“\n”); } } System。out。println(“寫入完成! 共花費時間:” + (System。currentTimeMillis() - startTime) / 1000 + “ s”); bos。close(); } public static void main(String[] args) { GenerateData generateData = new GenerateData(); try { generateData。generateData(); } catch (IOException e) { e。printStackTrace(); } }}

上述程式碼調整引數執行 2 次,湊 10G 資料在 D 盤 User。dat 檔案裡:

如何用 Java 幾分鐘處理完 30 億個資料?

準備好 10G 資料後,接著寫如何處理這些資料。

3. 場景分析

10G 的資料比當前擁有的執行記憶體大的多,不能全量載入到記憶體中讀取。如果採用全量載入,那麼記憶體會直接爆掉,只能按行讀取。Java 中的 bufferedReader 的 readLine() 按行讀取檔案裡的內容。

4. 讀取資料

首先,我們寫一個方法單執行緒讀完這 30 億資料需要多少時間,每讀 100 行列印一次:

private static void readData() throws IOException { BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(FILE_NAME), “utf-8”)); String line; long start = System。currentTimeMillis(); int count = 1; while ((line = br。readLine()) != null) { // 按行讀取 if (count % 100 == 0) { System。out。println(“讀取100行,總耗時間: ” + (System。currentTimeMillis() - start) / 1000 + “ s”); System。gc(); } count++; } running = false; br。close();}

按行讀完 10G 的資料大概 20 秒,基本每 100 行,1 億多資料花 1 秒,速度還挺快。

如何用 Java 幾分鐘處理完 30 億個資料?

5. 處理資料

5.1 思路一

透過單執行緒處理,初始化一個 countMap,key 為年齡,value 為出現的次數。將每行讀取到的資料按照 “,” 進行分割,然後獲取到的每一項進行儲存到 countMap 裡。如果存在,那麼值 key 的 value+1。

for (int i = start; i <= end; i++) { try { File subFile = new File(dir + “\\” + i + “。dat”); if (!file。exists()) { subFile。createNewFile(); } countMap。computeIfAbsent(i + “”, integer -> new AtomicInteger(0)); } catch (FileNotFoundException e) { e。printStackTrace(); } catch (IOException e) { e。printStackTrace(); }}

單執行緒讀取並統計 countMap:

publicstatic void splitLine(String lineData) { String[] arr = lineData。split(“,”); for (String str : arr) { if (StringUtils。isEmpty(str)) { continue; } countMap。computeIfAbsent(str, s -> new AtomicInteger(0))。getAndIncrement(); }}

透過比較找出年齡數最多的年齡並打印出來:

private static void findMostAge() { Integer targetValue = 0; String targetKey = null; Iterator> entrySetIterator = countMap。entrySet()。iterator(); while (entrySetIterator。hasNext()) { Map。Entry entry = entrySetIterator。next(); Integer value = entry。getValue()。get(); String key = entry。getKey(); if (value > targetValue) { targetValue = value; targetKey = key; } } System。out。println(“數量最多的年齡為:” + targetKey + “數量為:” + targetValue);}

測試結果

總共花了 3 分鐘讀取完並統計完所有資料。

如何用 Java 幾分鐘處理完 30 億個資料?

記憶體消耗為 2G-2。5G,CPU 利用率太低,只向上浮動了 20%-25% 之間。

如何用 Java 幾分鐘處理完 30 億個資料?

要想提高 CPU 利用率,那麼可以使用多執行緒去處理。

下面我們使用多執行緒去解決這個 CPU 利用率低的問題。

5.2 思路二:分治法

使用多執行緒去消費讀取到的資料。採用生產者、消費者模式去消費資料。

因為在讀取的時候是比較快的,單執行緒的資料處理能力比較差。因此思路一的效能阻塞在取資料的一方且又是同步操作,導致整個鏈路的效能會變的很差。

所謂分治法就是分而治之,也就是說將海量資料分割處理。 根據 CPU 的能力初始化 n 個執行緒,每一個執行緒去消費一個佇列,這樣執行緒在消費的時候不會出現搶佔佇列的問題。同時為了保證執行緒安全和生產者消費者模式的完整,採用阻塞佇列。Java 中提供了 LinkedBlockingQueue 就是一個阻塞佇列。

如何用 Java 幾分鐘處理完 30 億個資料?

初始化阻塞佇列

使用 LinkedList 建立一個阻塞佇列列表:

private static List> blockQueueLists = new LinkedList<>();

在 static 塊裡初始化阻塞佇列的數量和單個阻塞佇列的容量為 256。

上面講到了 30 億資料大概 2500 行,按行塞到佇列裡。20 個佇列,那麼每個佇列 125 個,因此可以容量可以設計為 256 即可。

//每個佇列容量為256for (int i = 0; i < threadNums; i++) { blockQueueLists。add(new LinkedBlockingQueue<>(256));}

生產者

為了實現負載的功能,首先定義一個 count 計數器,用來記錄行數:

private static AtomicLong count = new AtomicLong(0);

按照行數來計算佇列的下標

long index=count。get()%threadNums

下面演算法就實現了對佇列列表中的佇列進行輪詢的投放:

static class SplitData { public static void splitLine(String lineData) { String[] arr = lineData。split(“\n”); for (String str : arr) { if (StringUtils。isEmpty(str)) { continue; } long index = count。get() % threadNums; try { // 如果滿了就阻塞 blockQueueLists。get((int) index)。put(str); } catch (InterruptedException e) { e。printStackTrace(); } count。getAndIncrement(); } }

消費者

1) 佇列執行緒私有化

消費方在啟動執行緒的時候根據 index 去獲取到指定的佇列,這樣就實現了佇列的執行緒私有化。

private static void startConsumer() throws FileNotFoundException, UnsupportedEncodingException { //如果共用一個佇列,那麼執行緒不宜過多,容易出現搶佔現象 System。out。println(“開始消費。。。”); for (int i = 0; i < threadNums; i++) { final int index = i; // 每一個執行緒負責一個 queue,這樣不會出現執行緒搶佔佇列的情況。 new Thread(() -> { while (consumerRunning) { startConsumer = true; try { String str = blockQueueLists。get(index)。take(); countNum(str); } catch (InterruptedException e) { e。printStackTrace(); } } })。start(); }}

2) 多子執行緒分割字串

由於從佇列中多到的字串非常的龐大,如果又是用單執行緒呼叫 split(“,”) 去分割,那麼效能同樣會阻塞在這個地方。

// 按照 arr的大小,運用多執行緒分割字串private static void countNum(String str) { int[] arr = new int[2]; arr[1] = str。length() / 3; for (int i = 0; i < 3; i++) { final String innerStr = SplitData。splitStr(str, arr); new Thread(() -> { String[] strArray = innerStr。split(“,”); for (String s : strArray) { countMap。computeIfAbsent(s, s1 -> new AtomicInteger(0))。getAndIncrement(); } })。start(); }}

3) 分割字串演算法

分割時從 0 開始,按照等分的原則,將字串 n 等份,每一個執行緒分到一份。

用一個 arr 陣列的 arr[0] 記錄每次的分割開始位置。arr[1] 記錄每次分割的結束位置,如果遇到的開始的字元不為 “,” 那麼就 startIndex-1。如果結束的位置不為 “,” 那麼將 endIndex 向後移一位。

如果 endIndex 超過了字串的最大長度,那麼就把最後一個字元賦值給 arr[1]。

/** * 按照 x座標 來分割 字串,如果切到的字元不為“,”, 那麼把座標向前或者向後移動一位。 * * @param line * @param arr 存放x1,x2座標 * @return */public static String splitStr(String line, int[] arr) { int startIndex = arr[0]; int endIndex = arr[1]; char start = line。charAt(startIndex); char end = line。charAt(endIndex); if ((startIndex == 0 || start == ‘,’) && end == ‘,’) { arr[0] = endIndex + 1; arr[1] = arr[0] + line。length() / 3; if (arr[1] >= line。length()) { arr[1] = line。length() - 1; } return line。substring(startIndex, endIndex); } if (startIndex != 0 && start != ‘,’) { startIndex = startIndex - 1; } if (end != ‘,’) { endIndex = endIndex + 1; } arr[0] = startIndex; arr[1] = endIndex; if (arr[1] >= line。length()) { arr[1] = line。length() - 1; } return splitStr(line, arr);}

測試結果

記憶體和 CPU 初始佔用大小:

如何用 Java 幾分鐘處理完 30 億個資料?

啟動後,執行時記憶體穩定在 11。7G,CPU 穩定利用在 90% 以上。

如何用 Java 幾分鐘處理完 30 億個資料?

總耗時由 180 秒縮減到 103 秒,效率提升 75%,得到的結果也與單執行緒處理的一致。

如何用 Java 幾分鐘處理完 30 億個資料?

6。 遇到的問題

如果在運行了的時候,發現 GC 突然罷工不工作了,有可能是 JVM 的堆中存在的垃圾太多,沒回收導致記憶體的突增。

如何用 Java 幾分鐘處理完 30 億個資料?

解決方法

在讀取一定數量後,可以讓主執行緒暫停幾秒,手動呼叫 GC。

相關文章

頂部