限制Java线程池运行线程以及等待线程数量的策略

对于java.util.concurrent.Executors所提供的FixedThreadPool,可以保证可以在内存中有固定数量的线程数运行。但是由于FixedThreadPool绑定的是LinkedBlockingQueue。队列的上限没有限制(默认上限为Integer.MAX_VALUE),不断的提交新的线程,会造成任务在内存中长时间的堆积。

我们有可能面临如下的场景,主线程不断地提交任务线程,希望有固定数量的在线程中运行,也不想造成线程在内存中大量的等待堆积。由此需要我们自己定义一个线程池策略。ThreadPoolExecutor为我们线程池的设置提供了很大的灵活性。

首先看FixedThreadPool的实现:

1
2
3
4
5
6
public static ExecutorService More ...newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

可以看到,FixedThreadPool绑定的是LinkedBlockingQueue<Runnable>。我们需要做的第一个改造就是绑定有大小上线的BlockingQueue,在我的实现中绑定ArrayBlockingQueue<Runnable>并设置了size。

第二个是采用CallerRunsPolicy。ThreadPoolExecutor可以定义不同的任务拒绝策略。CallerRunsPolicy指的是当线程池拒绝该任务的时候,线程在本地线程直接execute。这样就限制了本地线程的循环提交流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<Runnable>(10);
RejectedExecutionHandler rejectedExecutionHandler =
new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
workingQueue, rejectedExecutionHandler);
for (int i = 0; i < 100; i++) {
threadPool.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
System.out.println("thread " + String.valueOf(threadNo) + " is called");
Thread.sleep(10000);
System.out.println("thread " + String.valueOf(threadNo) + " is awake");
throw new Exception();
}
});
}

代码中定义了大小为10的线程池,for循环提交了20个线程的时候,10个执行线程,10个线程放入了workingQueue。当提交到第21个线程的时候,会触发RejectedExecutionHandler。在这里我们配置了CallerRunsPolicy策略。所以会在主线程直接执行该线程。也就是说,在本程序中最多会有11个线程在执行,10个线程在等待。由此限制了线程池的等待线程数与执行线程数

一个java web程序员,希望自己两年之内能成为data scientist,正在找工作