Dubbo线程池策略解析:AbortPolicyWithReport与ThreadPoolExecutor

Dubbo线程池策略解析:AbortPolicyWithReport与ThreadPoolExecutor
1. LV3296与PIC18F4682的硬件架构解析LV3296是一款高性能的条形码扫描模块采用CMOS图像传感器技术能够快速识别一维和二维条形码。其核心优势在于内置DSP处理器实现实时解码支持自动曝光和增益控制工作电压范围宽3.3V-5V提供UART和USB双接口输出PIC18F4682是Microchip公司生产的8位微控制器特别适合作为LV3296的主控芯片64KB Flash程序存储器3968字节RAM内置EUSART模块增强型通用同步异步收发器支持USB 1.1协议工作频率最高40MHz提示在实际项目中建议为PIC18F4682配置8MHz外部晶体振荡器配合内部PLL倍频到32MHz工作这样既能保证USB通信的时钟精度又能提供足够的处理能力。2. 系统连接方案设计# 1. 概述本文分享Dubbo 的线程池策略。在 《精尽 Dubbo 源码分析 —— 线程池》 一文中我们已经分享了四种线程池策略fixed固定大小线程池启动时建立线程不关闭一直持有。cached缓存线程池空闲一分钟自动删除需要时重建。limited可伸缩线程池但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。eager优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时优先创建Worker来处理任务。当任务数量大于maximumPoolSize时将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)本文分享的线程池策略是基于Dubbo SPI实现的所以可以和上述四种策略进行组合使用。目前 Dubbo 提供了两种实现AbortPolicyWithReport打印JVM Dubbo 线程池以及用户线程堆栈信息方便定位问题。并且该策略在系统资源紧张时会抛出RejectedExecutionException 异常拒绝新提交的任务。ThreadPoolExecutor默认的线程池策略创建线程池时使用的就是这个策略。2. ThreadPoolcom.alibaba.dubbo.common.threadpool.ThreadPool线程池接口。代码如下SPI(fixed) public interface ThreadPool { /** * 线程池 * * param url 线程参数 * return 线程池 */ Adaptive({Constants.THREADPOOL_KEY}) Executor getExecutor(URL url); }SPI(fixed)注解Dubbo SPI拓展点默认为fixed。Adaptive({Constants.THREADPOOL_KEY})注解基于 Dubbo SPI Adaptive 机制加载对应的线程池实现使用URL.threadpool属性。#getExecutor(URL url)方法获得对应的线程池的执行器。3. AbortPolicyWithReportcom.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport实现java.util.concurrent.RejectedExecutionHandler接口拒绝策略实现类。打印JVM Dubbo 线程池以及用户线程堆栈信息方便定位问题。并且该策略在系统资源紧张时会抛出RejectedExecutionException 异常拒绝新提交的任务。3.1 属性/** * 线程名 */ protected static final Logger logger LoggerFactory.getLogger(AbortPolicyWithReport.class); /** * 线程名 */ private final String threadName; /** * URL 对象 */ private final URL url; /** * 最后打印时间 */ private static volatile long lastPrintTime 0; /** * 信号量防止打印日志时堆积过多导致 OOM 。 */ private static Semaphore guard new Semaphore(1); /** * 已注册的线程池集合 * * key线程池名称 */ private static final ConcurrentMapString, AtomicInteger threadCounters new ConcurrentHashMapString, AtomicInteger(); public AbortPolicyWithReport(String threadName, URL url) { this.threadName threadName; this.url url; }threadName属性线程名。因为一个线程池对应一个拒绝策略所以需要记录下线程名。url属性URL 对象。里面可能包含拒绝时打印 JVM 等信息的一些配置。lastPrintTime静态属性最后打印时间。该属性用于实现一段时间内有且仅打印一次 JVM 等信息。因为在系统崩溃的时候一定会有大量的拒绝策略此时不能一直打印会导致系统更加崩溃。guard静态属性信号量防止打印日志时堆积过多导致 OOM 。threadCounters静态属性已注册的线程池集合。其中Key 为线程名Value 为拒绝次数。3.2 rejectedExecution#rejectedExecution(Runnable r, ThreadPoolExecutor e)实现方法代码如下Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 打印告警日志 String msg String.format(Thread pool is EXHAUSTED! Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d), Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!, threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), url.getProtocol(), url.getIp(), url.getPort()); logger.warn(msg); // 打印 JVM Dubbo 线程池以及用户线程堆栈信息 dumpJStack(); // 抛出 RejectedExecutionException 异常 throw new RejectedExecutionException(msg); }打印告警日志包括线程池状态以及 Dubbo URL 信息。调用#dumpJStack()方法打印 JVM Dubbo 线程池以及用户线程堆栈信息。抛出 RejectedExecutionException 异常。3.3 dumpJStack#dumpJStack()方法打印 JVM Dubbo 线程池以及用户线程堆栈信息。代码如下/** * 打印 JVM Dubbo 线程池以及用户线程堆栈信息。 */ private void dumpJStack() { // 获得当前时间 long now System.currentTimeMillis(); // 每 10 分钟打印一次 // 若获得信号量则进行打印 if (now - lastPrintTime 10 * 60 * 1000 // 十分钟内 || !guard.tryAcquire()) { // 尝试获取信号量 return; } // 创建线程池后台执行打印任务 ExecutorService pool Executors.newSingleThreadExecutor(); pool.execute(new Runnable() { Override public void run() { // 获得系统 String name url.getParameter(Constants.APPLICATION_KEY, dubbo); // 获得路径 String dumpPath url.getParameter(dump.directory); // 未设置 dump.directory 则使用 user.home if (StringUtils.isEmpty(dumpPath)) { dumpPath System.getProperty(user.home); } // 创建文件 SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd_HH:mm:ss); String dateStr sdf.format(new Date()); // 获得输出流 FileOutputStream jstackStream null; try { jstackStream new FileOutputStream(new File(dumpPath, Dubbo_JStack.log . dateStr)); // 打印 JStack JVMUtil.jstack(jstackStream); } catch (Throwable t) { logger.error(dump jstack error, t); } finally { // 释放信号量 guard.release(); // 释放输出流 if (jstackStream ! null) { try { jstackStream.flush(); jstackStream.close(); } catch (IOException e) { } } } // 拼接内容 StringBuilder msg new StringBuilder(); msg.append(Thread dump is wrote into: ).append(dumpPath); // 打印线程池状态 MapThread, StackTraceElement[] allThreads Thread.getAllStackTraces(); msg.append(, there are ).append(allThreads.size()).append( threads in total.); // 写入线程池状态 FileOutputStream threadDumpStream null; try { threadDumpStream new FileOutputStream(new File(dumpPath, Dubbo_ThreadDump.log . dateStr)); threadDumpStream.write(msg.toString().getBytes()); } catch (Throwable t) { logger.error(dump thread dump error, t); } finally { if (threadDumpStream ! null) { try { threadDumpStream.flush(); threadDumpStream.close(); } catch (IOException e) { } } } // 更新最后打印时间 lastPrintTime System.currentTimeMillis(); } }); // 关闭线程池 pool.shutdown(); }每 10 分钟打印一次。并且通过guard信号量保证有且仅有一个线程执行打印任务。创建线程池后台执行打印任务。因为打印 JVM 堆栈和线程池状态是比较慢的如果放在rejectedExecution方法中执行可能影响正常的业务逻辑执行。调用JVMUtil#jstack(OutputStream)方法打印 JVM 堆栈。代码如下public static void jstack(OutputStream stream) throws IOException { // 获得所有线程 MapThread, StackTraceElement[] allThreads Thread.getAllStackTraces(); // 打印到 stream for (Map.EntryThread, StackTraceElement[] entry : allThreads.entrySet()) { // 打印线程 Thread thread entry.getKey(); // 打印线程状态 stream.write(getThreadDumpString(thread, entry.getValue()).getBytes()); } } private static String getThreadDumpString(Thread thread, StackTraceElement[] stackTraceElements) { StringBuilder sb new StringBuilder(\ thread.getName() \ Id thread.getId() thread.getState()); if (thread.isDaemon()) { sb.append( daemon); } sb.append( prio).append(thread.getPriority()).append(\n); // 打印线程堆栈 for (StackTraceElement stackTraceElement : stackTraceElements) { sb.append(\tat ).append(stackTraceElement.toString()).append(\n); } sb.append(\n); return sb.toString(); }x打印 Dubbo 线程池状态到Dubbo_ThreadDump.log文件。4. ThreadPoolExecutorjava.util.concurrent.ThreadPoolExecutor实现java.util.concurrent.RejectedExecutionHandler接口默认的线程池策略创建线程池时使用的就是这个策略。代码如下public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(...); }直接抛出 RejectedExecutionException 异常。5. 创建线程池在com.alibaba.dubbo.common.threadpool.support包下ThreadPool的实现类创建线程池时使用AbortPolicyWithReport作为拒绝策略。例如FixedThreadPool 的代码实现如下public class FixedThreadPool implements ThreadPool { Override public Executor getExecutor(URL url) { // 线程名 String name url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); // 线程数 int threads url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); // 队列数 int queues url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); // 创建执行器 return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues 0 ? new SynchronousQueueRunnable() : (queues 0 ? new LinkedBlockingQueueRunnable() : new LinkedBlockingQueueRunnable(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }在new AbortPolicyWithReport(name, url)代码段我们可以看到创建了 AbortPolicyWithReport 对象。