Java 线程池拒绝策略(自定义优雅降级)
线程池拒绝策略简介
线程池拒绝策略(`RejectedExecutionHandler`)
是线程池的最后一道安全防线,
只有满足「线程池饱和」的核心条件时,
才会触发拒绝逻辑:
核心触发条件:
线程池中的线程数已达到最大线程数(`maximumPoolSize`) +
任务队列(`workQueue`)已经被任务堆满
此时线程池没有任何能力处理新提交的任务,
就会执行预设的拒绝策略。
线程池的任务执行优先级:核心线程执行 → 任务队列排队 → 扩容至最大线程执行 → 触发拒绝策略
JDK内置4种原生拒绝策略
JDK 在`ThreadPoolExecutor` 中内置了 4个静态内部类, 实现标准的拒绝策略,直接可用, 也是自定义拒绝策略的基础, 所有策略都实现了`java.util.concurrent.RejectedExecutionHandler`接口, 核心方法只有一个: // 拒绝策略的核心方法:当任务被拒绝时,线程池会调用此方法 void rejectedExecution(Runnable r, ThreadPoolExecutor executor); // 参数说明:r = 被拒绝的任务;executor = 当前触发拒绝的线程池对象 ✔️ 1. AbortPolicy(默认策略)- 中止抛出异常 new ThreadPoolExecutor.AbortPolicy() - 核心逻辑:直接抛出 `RejectedExecutionException` 运行时异常,拒绝执行任务; - 特点:任务会丢失,异常会向上抛,可能导致业务流程中断; - 适用场景:开发/测试环境、对任务完整性要求极高,不允许任务丢失的场景; - 生产注意:线上慎用,默认策略会直接抛异常,极易引发服务报错。 ✔️ 2. CallerRunsPolicy - 调用者线程兜底执行 new ThreadPoolExecutor.CallerRunsPolicy() - 核心逻辑:不会拒绝任务、不会丢失任务,让「提交任务的线程」自己执行这个被拒绝的任务; - 特点: 1. 任务100%不丢失,是最安全的原生策略; 2. 天然实现流量削峰:提交任务的线程被占用,无法继续提交新任务,间接降低线程池的任务提交速率; 3. 性能损耗:调用线程(如主线程/业务线程)会被占用执行任务,可能导致该线程的主流程变慢; - 适用场景:生产环境首选的原生策略,绝大多数业务场景都适配,推荐优先使用。 ✔️ 3. DiscardPolicy - 静默丢弃任务 new ThreadPoolExecutor.DiscardPolicy() - 核心逻辑:静默丢弃被拒绝的任务,不抛异常、不做任何处理; - 特点:任务丢失无感知,代码无报错,问题隐蔽性极强; - 适用场景:仅适用于「任务丢失无影响」的场景(如非核心的日志打印、数据上报、统计埋点等)。 ✔️ 4. DiscardOldestPolicy - 丢弃队列最老任务+重试提交 new ThreadPoolExecutor.DiscardOldestPolicy() - 核心逻辑:丢弃任务队列中排队最久的第一个任务,然后尝试将当前被拒绝的任务重新提交到线程池; - 特点:任务会丢失(丢失的是旧任务),可能导致业务时序错乱(旧任务被丢弃,新任务执行); - 适用场景:任务无严格时序要求、新任务比旧任务优先级高的场景(如实时数据计算、消息推送)。
原生拒绝策略的缺点
1. AbortPolicy:抛异常导致业务熔断,不够优雅; 2. DiscardPolicy/DiscardOldestPolicy:任务丢失,无降级补偿; 3. CallerRunsPolicy:仅能兜底执行,无法针对不同业务做差异化处理; 4. 所有原生策略:无日志、无告警、无降级兜底逻辑,出现任务拒绝时无法感知,也无法做补救。
生产级的优雅拒绝
生产级线程池拒绝需满足以下特性
1.不丢失核心任务:
核心业务任务必须保证执行/兜底,
非核心任务可选择性丢弃;
2.详细日志记录:
记录被拒绝的任务信息、线程池状态,便于问题排查;
3.告警预警:
触发拒绝时,输出ERROR日志/对接告警系统,
及时感知线程池饱和;
4.差异化降级:
核心任务兜底执行,
非核心任务优雅丢弃/异步补偿;
5.线程池保护:
拒绝逻辑不能阻塞线程池本身,
不能引发新的异常。
基础自定义拒绝策略(匿名内部类,轻量简洁,推荐中小型项目)
适合快速实现轻量的自定义逻辑,
无额外类文件,
代码侵入性低,核心实现「日志告警+差异化处理」,生产环境直接可用。
核心需求
- 触发拒绝时,打印ERROR级别的完整日志(线程池状态+任务信息);
- 核心任务:走 CallerRunsPolicy 兜底执行;
- 非核心任务:走 DiscardPolicy 静默丢弃,并记录丢弃日志。
完整代码示例
import java.util.concurrent.;
public class ThreadPoolCustomRejectDemo1 {
// 线程池核心参数
private static final int CORE_POOL_SIZE = 2;
private static final int MAX_POOL_SIZE = 3;
private static final long KEEP_ALIVE_TIME = 30L;
// 有界队列,必须用有界队列才会触发拒绝策略!
private static final BlockingQueue<Runnable> WORK_QUEUE = new ArrayBlockingQueue<>(2);
public static void main(String[] args) {
// 1. 创建线程池,使用【匿名内部类】实现自定义拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
WORK_QUEUE,
new ThreadFactory() {
int count = 1;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("业务线程-" + count++);
return thread;
}
},
// 核心:自定义拒绝策略(匿名内部类)
(r, pool) -> {
System.err.println("======= 触发线程池拒绝策略 =======");
// 打印线程池饱和状态,便于排查问题
System.err.println("线程池状态:核心线程数=" + pool.getCorePoolSize()
+ ", 最大线程数=" + pool.getMaximumPoolSize()
+ ", 活跃线程数=" + pool.getActiveCount()
+ ", 队列任务数=" + pool.getQueue().size());
System.err.println("被拒绝的任务:" + r.toString());
// ========== 核心:差异化优雅降级 ==========
if (r instanceof CoreTask) {
// 核心任务:调用者线程兜底执行(同CallerRunsPolicy)
System.err.println("核心任务,兜底执行:" + r);
r.run();
} else {
// 非核心任务:优雅丢弃,记录日志即可
System.err.println("非核心任务,优雅丢弃:" + r);
}
});
// 2. 模拟提交任务(核心任务+非核心任务)
for (int i = 0; i < 10; i++) {
int taskId = i;
if (taskId % 3 == 0) {
// 提交核心任务
executor.submit(new CoreTask(taskId));
} else {
// 提交非核心任务
executor.submit(new NonCoreTask(taskId));
}
}
executor.shutdown();
}
// 核心任务
static class CoreTask implements Runnable {
private int taskId;
public CoreTask(int taskId) { this.taskId = taskId; }
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行核心任务:" + taskId);
try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
@Override
public String toString() { return "CoreTask-" + taskId; }
}
// 非核心任务
static class NonCoreTask implements Runnable {
private int taskId;
public NonCoreTask(int taskId) { this.taskId = taskId; }
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 执行非核心任务:" + taskId);
try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
@Override
public String toString() { return "NonCoreTask-" + taskId; }
}
}
生产级「优雅降级」拒绝策略(独立实现类,极致优雅,大厂标配)
实现的核心优雅能力(全部满足)
✅ 完整的日志告警:打印ERROR日志+线程池全量状态+任务详情,问题可追溯;
✅ 核心任务兜底:核心任务由调用线程执行,保证核心链路不中断;
✅ 非核心任务补偿:非核心任务可选择「丢弃+异步重试」或「持久化到MQ/本地队列」,后续补偿;
✅ 线程池安全保护:所有逻辑做异常捕获,避免拒绝策略本身抛异常导致线程池崩溃;
✅ 灵活扩展:支持自定义核心任务标识、自定义降级逻辑。
完整代码:生产级优雅降级拒绝策略(独立类)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/
生产级线程池优雅降级拒绝策略(全局复用,可直接上线)
核心能力:核心任务兜底执行 + 非核心任务优雅丢弃/补偿 + 全量日志告警 + 线程池安全保护
/
public class GracefulDegradeRejectPolicy implements RejectedExecutionHandler {
/
核心拒绝逻辑
@param r 被拒绝的任务
@param executor 当前线程池对象
/
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 安全防护:空判断,避免NPE
if (r == null || executor.isShutdown()) {
return;
}
try {
// ========== 第一步:打印全量告警日志(ERROR级别,必须有) ==========
printRejectAlarmLog(r, executor);
// ========== 第二步:核心逻辑 - 优雅降级处理 ==========
if (isCoreTask(r)) {
// 策略1:核心任务 - 调用者线程兜底执行(同CallerRunsPolicy),核心链路不丢
System.err.println("[优雅降级] 核心任务,调用线程兜底执行:" + r.getClass().getSimpleName());
r.run();
} else {
// 策略2:非核心任务 - 优雅丢弃 + 可选补偿(重试/持久化)
System.err.println("[优雅降级] 非核心任务,优雅丢弃:" + r.getClass().getSimpleName());
// 可选扩展:非核心任务补偿逻辑(如 异步重试/写入MQ/写入本地队列)
compensateNonCoreTask(r);
}
} catch (Exception e) {
// 终极防护:捕获所有异常,避免拒绝策略本身抛异常,导致线程池崩溃
System.err.println("[线程池安全防护] 拒绝策略执行异常,任务最终丢弃,异常信息:" + e.getMessage());
}
}
/
打印拒绝告警日志(全量信息,问题可追溯)
/
private void printRejectAlarmLog(Runnable r, ThreadPoolExecutor executor) {
BlockingQueue<Runnable> queue = executor.getQueue();
StringBuilder alarmLog = new StringBuilder();
alarmLog.append("\n======= 【线程池饱和-触发优雅降级】 =======")
.append("\n任务详情:").append(r.toString())
.append("\n线程池状态:")
.append("核心线程数=").append(executor.getCorePoolSize())
.append(", 最大线程数=").append(executor.getMaximumPoolSize())
.append(", 当前活跃线程数=").append(executor.getActiveCount())
.append(", 当前总线程数=").append(executor.getPoolSize())
.append("\n队列状态:队列容量=").append(queue.remainingCapacity() + queue.size())
.append(", 队列任务数=").append(queue.size())
.append(", 队列剩余容量=").append(queue.remainingCapacity())
.append("\n任务统计:总提交任务数=").append(executor.getTaskCount())
.append(", 已完成任务数=").append(executor.getCompletedTaskCount())
.append("\n=========================================");
System.err.println(alarmLog);
}
/
自定义:判断是否为核心任务(业务层可根据需求重写该逻辑)
实现方式:1.任务实现标记接口 2.任务类名包含指定关键字 3.自定义注解
/
private boolean isCoreTask(Runnable r) {
// 示例:任务类名包含 "Core" 即为核心任务,可根据业务灵活修改
return r.getClass().getSimpleName().contains("Core");
}
/
非核心任务补偿逻辑(可选扩展,按需实现)
可选方案:1.异步重试提交 2.写入MQ队列后续消费 3.写入本地文件/内存队列 4.直接丢弃
/
private void compensateNonCoreTask(Runnable r) {
// 方案1:简单重试(最多重试1次,避免死循环)
try {
if (r != null) {
// 此处可结合线程池做异步重试,示例为简单演示
System.err.println("[非核心任务补偿] 尝试重试提交任务...");
}
} catch (Exception e) {
System.err.println("[非核心任务补偿] 重试失败,任务最终丢弃:" + e.getMessage());
}
// 方案2(推荐):写入MQ队列(如RabbitMQ/RocketMQ),后续消费补偿,任务不丢失
// MQProducer.send(r);
// 方案3:写入本地内存队列,定时任务兜底消费
// LocalQueueManager.addTask(r);
}
}
使用该优雅降级策略
创建线程池时,
直接传入自定义策略即可
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolGracefulDemo {
public static void main(String[] args) {
// 创建线程池,集成优雅降级拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 3, 30L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
new ThreadPoolExecutor.DefaultThreadFactory(),
new GracefulDegradeRejectPolicy() // 核心:使用优雅降级策略
);
// 提交核心/非核心任务,触发拒绝时自动执行优雅降级逻辑
for (int i = 0; i < 10; i++) {
if (i % 3 == 0) {
executor.submit(new CoreOrderTask(i)); // 核心订单任务
} else {
executor.submit(new NonCoreLogTask(i)); // 非核心日志任务
}
}
executor.shutdown();
}
// 核心任务:订单处理(类名含Core,会被兜底执行)
static class CoreOrderTask implements Runnable {
private int orderId;
public CoreOrderTask(int orderId) { this.orderId = orderId; }
@Override
public void run() { System.out.println("执行核心订单任务:" + orderId); }
@Override
public String toString() { return "CoreOrderTask-" + orderId; }
}
// 非核心任务:日志上报(类名不含Core,会被优雅丢弃)
static class NonCoreLogTask implements Runnable {
private int logId;
public NonCoreLogTask(int logId) { this.logId = logId; }
@Override
public void run() { System.out.println("执行非核心日志任务:" + logId); }
@Override
public String toString() { return "NonCoreLogTask-" + logId; }
}
}
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。


