“分而治之”是一個有效的處理大數據的方法,著名的MapReduce就是采用這種分而治之的思路. 簡單點說,如果要處理的1000個數據,但是我們不具備處理1000個數據的能力,可以只處理10個數據, 可以把這1000個數據分階段處理100次,每次處理10個,把100次的處理結果進行合成,形成最后這1000個數據的處理結果。
把一個大任務調用fork()方法分解為若干小的任務,把小任務的處理結果進行join()合并為大任務的結果。
系統對ForkJoinPool線程池進行了優化,提交的任務數量與線程的數量不一定是一對一關系.在多數情況下,一個物理線程實際上需要處理多個邏輯任務。
ForkJoinPool線程池中最常用的方法是:
ForkJoinTask submit(ForkJoinTask task) 向線程池提交一個ForkJoinTask任務. ForkJoinTask任務支持fork()分解與join()等待的任務. ForkJoinTask有兩個重要的子類:RecursiveAction和 RecursiveTask ,它們的區別在于RecursiveAction任務沒有返回值, RecursiveTask 任務可以帶有返回值。
package com.wkcto.threadpool;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 演示ForkJoinPool線程池的使用
* 使用該線程池模擬數列求和
*/
public class Test09 {
//計算數列的和, 需要返回結果,可以定義任務繼承RecursiveTask
private static class CountTask extends RecursiveTask<Long>{
private static final int THRESHOLD = 10000; //定義數據規模的閾值,允許計算10000個數內的和,超過該閾值的數列就需要分解
private static final int TASKNUM = 100; //定義每次把大任務分解為100個小任務
private long start; //計算數列的起始值
private long end; //計算數列的結束值
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
//重寫RecursiveTask類的compute()方法,計算數列的結果
@Override
protected Long compute() {
long sum = 0 ; //保存計算的結果
//判斷任務是否需要繼續分解,如果當前數列end與start范圍的數超過閾值THRESHOLD,就需要繼續分解
if ( end - start < THRESHOLD){
//小于閾值可以直接計算
for (long i = start ; i <= end; i++){
sum += i;
}
}else { //數列范圍超過閾值,需要繼續分解
//約定每次分解成100個小任務,計算每個任務的計算量
long step = (start + end ) / TASKNUM;
//start = 0 , end = 200000, step = 2000, 如果計算[0,200000]范圍內數列的和, 把該范圍的數列分解為100個小任務,每個任務計算2000個數即可
//注意,如果任務劃分的層次很深,即THRESHOLD閾值太小,每個任務的計算量很小,層次劃分就會很深,可能出現兩種情況:一是系統內的線程數量會越積越多,導致性能下降嚴重; 二是分解次數過多,方法調用過多可能會導致棧溢出
//創建一個存儲任務的集合
ArrayList<CountTask> subTaskList = new ArrayList<>();
long pos = start; //每個任務的起始位置
for (int i = 0; i < TASKNUM; i++) {
long lastOne = pos + step; //每個任務的結束位置
//調整最后一個任務的結束位置
if ( lastOne > end ){
lastOne = end;
}
//創建子任務
CountTask task = new CountTask(pos, lastOne);
//把任務添加到集合中
subTaskList.add(task);
//調用for()提交子任務
task.fork();
//調整下個任務的起始位置
pos += step + 1;
}
//等待所有的子任務結束后,合并計算結果
for (CountTask task : subTaskList) {
sum += task.join(); //join()會一直等待子任務執行完畢返回執行結果
}
}
return sum;
}
}
public static void main(String[] args) {
//創建ForkJoinPool線程池
ForkJoinPool forkJoinPool = new ForkJoinPool();
//創建一個大的任務
CountTask task = new CountTask(0L, 200000L);
//把大任務提交給線程池
ForkJoinTask<Long> result = forkJoinPool.submit(task);
try {
Long res = result.get(); //調用任務的get()方法返回結果
System.out.println("計算數列結果為:" + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//驗證
long s = 0L;
for (long i = 0; i <= 200000 ; i++) {
s += i;
}
System.out.println(s);
}
}