Fork/Join框架是Java7引入的一种用于并行计算的框架,它特别适用于那些可以递归地分解成更小任务的问题,这些任务可以独立处理。该框架基于分治原则,非常适合排序、搜索以及其他递归算法的任务。
ForkJoinTask是可以在Fork/Join框架内运行的任务的基类。它提供了核心操作,允许任务分叉新的子任务,并在它们完成后合并。
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask {
private final int[] arr;
private final int start, end;
public SumTask(int[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= 10) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else {
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(arr, start, mid);
SumTask rightTask = new SumTask(arr, mid, end);
leftTask.fork();
int rightResult = rightTask.compute();
int leftResult = leftTask.join();
return leftResult + rightResult;
}
}
}
ForkJoinPool是管理一组工作线程池的中心类,用于执行ForkJoinTask实例。它使用工作窃取算法,通过从繁忙的线程重新分配任务到空闲线程,保持所有线程忙碌。
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample {
public static void main(String[] args) {
int[] arr = new int[100];
for (int i = 0; i < arr.length; i++) {
arr[i] = i + 1;
}
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(arr, 0, arr.length);
int result = pool.invoke(task);
System.out.println("Sum: " + result);
}
}
当任务需要返回结果时,使用RecursiveTask;当不需要返回结果时,使用RecursiveAction。
import java.util.concurrent.RecursiveAction;
public class PrintTask extends RecursiveAction {
private final int[] arr;
private final int start, end;
public PrintTask(int[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= 10) {
for (int i = start; i < end; i++) {
System.out.print(arr[i] + "\n");
}
System.out.println();
} else {
int mid = (start + end) / 2;
PrintTask leftTask = new PrintTask(arr, start, mid);
PrintTask rightTask = new PrintTask(arr, mid, end);
invokeAll(leftTask, rightTask);
}
}
}
运行ForkJoinExample将输出数组元素的总和。Fork/Join框架将任务分解成更小的块,并并行处理它们,显示出显著的性能提升,尤其是在处理大数据集时。例如,计算1到100的数字总和:
Sum: 5050
对于PrintTask,框架将数组打印任务分解,同时执行并行输出各个部分:
1
2
3
...
20
...
效率:利用所有可用的CPU核心,从而加快任务执行速度。
可扩展性:通过将大数据集分解成更小、更易管理的任务来处理。
工作窃取:通过将任务从过载线程重新分配到空闲线程,保持所有线程忙碌。
复杂性:需要仔细设计和理解并行性,这可能会增加代码复杂性。
开销:分叉和合并任务有固有的开销,对于较小的任务可能并不划算。
调试:由于线程执行的非确定性,调试并行任务可能具有挑战性。
大型递归问题:当有自然分解成更小子任务的任务时,例如排序、搜索和矩阵乘法。