面试官询问如何选择线程池的拒绝策略以确保任务不被丢弃的深入分析与解答

这个面试问题很有趣,虽然答案看似简单,但我们可以进一步探讨CallerRunsPolicy拒绝策略的潜在风险以及相应的解决方案。

下面是详细内容。

线程池拒绝策略的种类

当同时运行的线程数量达到最大限制时,并且队列已满,ThreadPoolExecutor提供了一些拒绝策略:

  • AbortPolicy:会抛出RejectedExecutionException,拒绝处理新的任务。
  • CallerRunsPolicy:允许调用线程直接运行被拒绝的任务。也就是说,调用execute方法的线程会执行被拒绝的任务。如果执行器已关闭,则该任务会被丢弃。这种策略可能会降低新任务的提交速度,从而影响程序的整体性能。如果应用程序能够承受这种延迟,并且希望每个任务都被执行,您可以选择此策略。
  • DiscardPolicy:直接丢弃新提交的任务,不做处理。
  • DiscardOldestPolicy:会丢弃最早的未处理任务请求。

例如,当使用ThreadPoolTaskExecutor或直接通过ThreadPoolExecutor构造线程池时,如果不指定RejectedExecutionHandler拒绝策略,默认使用AbortPolicy。在这种情况下,如队列已满,ThreadPoolExecutor将抛出RejectedExecutionException,导致任务被丢弃。为了避免这种情况,您可以选择使用CallerRunsPolicy。与其他策略不同,CallerRunsPolicy不会丢弃任务或抛出异常,而是将任务回退给调用者,由调用线程来执行这些任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {  
    public CallerRunsPolicy() { }  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        if (!e.isShutdown()) {  
            // 直接在主线程中执行,而不是在线程池中执行  
            r.run();  
        }  
    }  
}  

不允许丢弃任务的情况下,应选择哪个拒绝策略?

根据上述线程池拒绝策略的介绍,显而易见答案是:CallerRunsPolicy

结合CallerRunsPolicy的源代码,我们可以看到:

public static class CallerRunsPolicy implements RejectedExecutionHandler {  
    public CallerRunsPolicy() { }  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {  
        // 只要程序未关闭,使用调用execute方法的线程执行该任务  
        if (!e.isShutdown()) {  
            r.run();  
        }  
    }  
}  

这段代码表明,只要程序未关闭,就会使用调用者的线程来执行被拒绝的任务。

CallerRunsPolicy拒绝策略的风险和解决方案

如前所述,选择CallerRunsPolicy拒绝策略适合需要确保所有任务请求都能被执行的情况。

然而,如果被拒绝的任务是一个耗时操作,而提交任务的线程是主线程,这可能会导致主线程被阻塞,从而影响程序的正常运行。

举个例子,假设线程池最大线程数为2,阻塞队列大小为1(这意味着第四个任务会触发拒绝策略),代码如下:

Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);  
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,  
        2,  
        60,  
        TimeUnit.SECONDS,  
        new ArrayBlockingQueue<>(1),  
        new ThreadPoolExecutor.CallerRunsPolicy());  

// 提交第一个任务,由核心线程执行  
threadPoolExecutor.execute(() -> {  
    log.info("核心线程执行第一个任务");  
    ThreadUtil.sleep(1, TimeUnit.MINUTES);  
});  

// 提交第二个任务,由于核心线程被占用,任务将进入队列等待  
threadPoolExecutor.execute(() -> {  
    log.info("非核心线程处理入队的第二个任务");  
    ThreadUtil.sleep(1, TimeUnit.MINUTES);  
});  

// 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理  
threadPoolExecutor.execute(() -> {  
    log.info("非核心线程处理第三个任务");  
    ThreadUtil.sleep(1, TimeUnit.MINUTES);  
});  

// 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由主线程来执行  
threadPoolExecutor.execute(() -> {  
    log.info("主线程处理第四个任务");  
    ThreadUtil.sleep(2, TimeUnit.MINUTES);  
});  

// 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交  
threadPoolExecutor.execute(() -> {  
    log.info("核心线程执行第五个任务");  
});  

输出结果如下:

18:19:48.203 INFO  [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务  
18:19:48.203 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务  
18:19:48.203 INFO  [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务  
18:20:48.212 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务  
18:21:48.219 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务  

从输出结果可见,由于CallerRunsPolicy的拒绝策略,耗时任务在主线程中执行,导致线程池阻塞,从而阻碍后续任务的及时执行,严重时可能会引发内存溢出(OOM)。

为了从根本上解决这个问题,调用者希望通过CallerRunsPolicy确保所有任务都能被执行,但当内存足够时,可以考虑增加阻塞队列的大小并调整堆内存,以容纳更多任务,保证任务的有效执行。

为充分利用CPU资源,还可以调整线程池的maximumPoolSize(最大线程数)参数,以提升任务处理速度,避免在BlockingQueue中积压过多任务导致内存耗尽。

图片

调整阻塞队列大小和最大线程数

当服务器资源达到上限时,就需要重新考虑线程池的调度策略。阻塞主线程的原因是我们希望确保每个任务都不被丢弃。那么,有没有一种方法既能保证任务不被丢弃,同时在服务器有余力时及时处理呢?

一种建议是任务持久化,可以通过以下方式实现:

  1. 设计一个任务表,将任务存储到MySQL数据库中。
  2. 使用Redis缓存任务。
  3. 将任务提交到消息队列中。

以方案一为例,简单介绍实现逻辑:

  1. 实现RejectedExecutionHandler接口,定义自定义拒绝策略,该策略负责将线程池暂时无法处理的任务存储在MySQL中。需要注意的是,任务会先进入阻塞队列,只有在队列满时才会触发拒绝策略。
  2. 继承BlockingQueue,实现一个混合式阻塞队列,该队列包含JDK自带的ArrayBlockingQueue,并重写take()方法,优先从数据库读取最早的任务,数据库中无任务时再从ArrayBlockingQueue中取任务。

图片

将部分任务保存到MySQL中

整个实现逻辑相对简单,核心在于自定义拒绝策略和阻塞队列。当线程池满载时,可以通过拒绝策略将新任务持久化到MySQL中,待线程池有余力时,优先处理数据库中的任务,避免出现“饥饿”问题。

当然,我们也可以借鉴其他主流框架的做法。例如,Netty的拒绝策略是创建一个独立于线程池的线程来处理这些任务,这种做法需要良好的硬件设备,并且临时创建的线程难以进行精确监控:

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {  
    NewThreadRunsPolicy() {  
        super();  
    }  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
        try {  
            // 创建一个临时线程处理任务  
            final Thread t = new Thread(r, "Temporary task executor");  
            t.start();  
        } catch (Throwable e) {  
            throw new RejectedExecutionException(  
                    "Failed to start a new thread", e);  
        }  
    }  
}  

ActiveMQ则尝试在指定时限内将任务尽可能地入队,以确保最大化交付:

new RejectedExecutionHandler() {  
    @Override  
    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {  
        try {  
            // 限时阻塞等待,尽可能交付  
            executor.getQueue().offer(r, 60, TimeUnit.SECONDS);  
        } catch (InterruptedException e) {  
            throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");  
        }  
        throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");  
    }  
};