JUC

JAVA

# 一、并发编程

juc是java中java.util.concurrent包的简称,它位于jdkrt.jar中,是jdk的核心工具包之一。

从字面上来理解就是java并发工具包。

其主要内容包含3个部分:atomic包、locks包以直接在该包下的直接接口和类。

主要功能如下:

  • atomic 支持原子操作类相关代码。
  • locksjava中锁相关代码。
  • 直接接口和类是其他并发容器相关代码。

# 并发编程三要素

  • 原子性:不可分割的操作,保证要么同时成功,要么同时失败;
  • 有序性:程序执行的顺序和代码的顺序保持一致;
  • 可用性:一个线程对共享变量的修改,另一个线程可见;

# CPU密集型(CPU-bound)

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading 很高。

# IO密集型(I/O bound)

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。

# 池化思想

Pooling 池化

池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。

在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

  1. 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
  2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
  3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

# Quartz

Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。

在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等。

# 线程

Thread,程序中单个顺序的流控制是进程内部的一个执行单元。

创建线程的方式

  1. 继承Thread类,覆写run方法编写处理代码;
  2. 实现Runnable接口,覆写run方法;将该子类作为参数构造Thread对象,调用start方法。

# 进程

程序的一次执行过程,是系统运行程序的基本单元。每个独立执行的程序(正在执行的程序),都称为进程。

# ThreadLocal

ThreadLocal<String> localName = new ThreadLocal();
1

首先,它是一个数据结构,类似于HashMap。可以存key-value,ThreadLocal的作用主要是做数据隔离。填充的数据set方法只属于当前线程。变量的数据对其他线程是相对隔离的。

Q:ThreadLocal是如何防止变量被其他线程篡改?

W:查询set方法和get方法源码,每个线程中(Thread)都有一个ThreadLocalMap数据结构,当执行set方法,其值是保存在当前的线程的threadLocals变量中,当执行get方法也是从该变量取。

# 内存泄漏

memory leak 内存泄漏

指程序在申请内存后,无法释放已申请的内存空间。

# 内存溢出

指程序在申请内存时,没有足够的内存空间供其使用。

# 二、线程池

Thread Pool

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。

也就是说,线程池是一种通过“池化”思想,帮助我们管理线程而获取并发性的工具,在Java中的体现是ThreadPoolExecutor类

为了复用线程,避免频繁的创建和销毁线程,浪费资源。

# 使用线程池的好处

《Java 并发编程的艺术》提到的使用线程池的好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

# 线程池解决的问题

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了池化思想(Pooling)。

# 线程池作用

1、控制线程,通过控制线程来控制最大并发数;

2、实现任务线程队列缓存策略和拒绝机制;

3、实现以时间相关的功能,如定时任务,周期执行;

4、隔离线程环境。

# 为什么自己创建线程池

线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用Executors 类的 newFixedThreadPoolnewCachedThreadPool ,因为可能会有 OOM 的风险。

说白了就是:使用有界队列,控制线程创建数量。

除了避免 OOM 的原因之外,不推荐使用 Executors提供的两种快捷的线程池的原因还有:

  1. 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
  2. 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。

如果我们要手动创建一个线程池,必须要了解这个线程池都有哪些参数?这些参数都有什么作用。

# Executor框架

Executor框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题。

补充:this 逃逸是指在构造函数返回之前其他线程就持有该对象的引用. 调用尚未构造完全的对象的方法可能引发令人疑惑的错误。

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

# 三、ThreadPoolExecutor

Java中的线程池核心实现类是ThreadPoolExecutor,使用ThreadPoolExecutor创建线程池。

# ThreadPoolExecutor的UML类图

线程池实现类ThreadPoolExecutor是Executor框架最核心的类。

ThreadPoolExecutor的继承关系如下:

# 1、顶层接口Executor

顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。

# 2、ExecutorService接口

ExecutorService接口增加了一些能力:

(1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;

(2)提供了管控线程池的方法,比如停止线程池的运行。

# 3、AbstractExecutorService类

AbstractExecutorService则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。

# 4、实现类ThreadPoolExecutor

最下层的实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

# ThreadPoolExecutor构造方法

ThreadPoolExecutor 有如下4个构造方法:

参数最多的那个构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //...
}
1
2
3
4
5
6
7
8
9

# 七大参数

  1. corePoolSize:核心线程数,线程池正常情况下保持的线程数。
  2. maximumPoolSize:最大线程数,当线程池繁忙时最多可以拥有的线程数。
  3. keepAliveTime:空闲线程存活时间,没有活儿之后“短工”可以生存的最大时间。
  4. TimeUnit:时间单位。
  5. BlockingQueue:线程池任务队列,用于保存线程池待执行任务的容器。
  6. ThreadFactory:创建线程的工厂,用于创建线程池中线程的工厂方法。
  7. RejectedExecutionHandler:拒绝策略,当任务量超过线程池可以保存的最大任务数时执行的策略。

# 1、corePoolSize

核心线程数:是指线程池中长期存活的线程数

  • 核心线程会一直存活,即使没有任务需要执行。
  • 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理。
  • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。

# 2、maximumPoolSize

最大线程数:线程池允许创建的最大线程数量。当线程池的任务队列满了之后,可以创建的最大线程数。

  • 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务;
  • 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常;

# 3、keepAliveTime

空闲线程存活时间。当线程池中没有任务时,会销毁一些线程,销毁的线程数=maximumPoolSize(最大线程数)-corePoolSize(核心线程数)。

  • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize;
  • 如果allowCoreThreadTimeout=true,则会直到线程数量=0

# 4、TimeUnit

时间单位:空闲线程存活时间的描述单位,此参数是配合参数keepAliveTime使用的。

keepAliveTime是一个 long 类型的值,比如keepAliveTime传递的是 1,那么这个 1 表示的是 1 天?还是 1 小时?还是 1 秒钟?是由参数 TimeUnit说了算的。

TimeUnit 有以下 7 个值:

  1. TimeUnit.DAYS:天
  2. TimeUnit.HOURS:小时
  3. TimeUnit.MINUTES:分
  4. TimeUnit.SECONDS:秒
  5. TimeUnit.MILLISECONDS:毫秒
  6. TimeUnit.MICROSECONDS:微妙
  7. TimeUnit.NANOSECONDS:纳秒

# 5、lockingQueue

阻塞队列:线程池存放任务的队列,用来存储线程池的所有待执行任务。

  • 当核心线程数达到最大时,新任务会放在队列中排队等待执行。

关于任务队列,java中提供了四种方式:直接提交队列有界任务队列无界任务队列优先任务队列

它可以设置以下几个值:

  1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  3. SynchronousQueue:一个不存储元素的阻塞队列,即直接提交给线程不保持它们。
  4. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  5. DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素。
  6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法。
  7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

比较常用的是LinkedBlockingQueue,线程池的排队策略和 BlockingQueue 息息相关。

队列详解

1.SynchronousQueue

使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略;

2.ArrayBlockingQueue

使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。

3.LinkedBlockingQueue

使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。

4.PriorityBlockingQueue

PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。

# 6、ThreadFactory

线程工厂:线程池创建线程时调用的工厂方法

通过此方法可以设置线程的优先级、线程命名规则以及线程类型(用户线程还是守护线程)等。

线程工厂的使用示例如下:

public static void main(String[] args) {
    // 创建线程工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            // 创建线程池中的线程
            Thread thread = new Thread(r);
            // 设置线程名称
            thread.setName("Thread-" + r.hashCode());
            // 设置线程优先级(最大值:10)
            thread.setPriority(Thread.MAX_PRIORITY);
            //......
            return thread;
        }
    };
    // 创建线程池,使用自定义的线程工厂
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory); 
    threadPoolExecutor.submit(new Runnable() {
        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            System.out.println(String.format("线程:%s,线程优先级:%d",
                                             thread.getName(), thread.getPriority()));
        }
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

以上程序的执行结果如下:

线程:Thread-491044909,线程优先级:10
1

从上述执行结果可以看出,自定义线程工厂起作用了,线程的名称和线程的优先级都是通过线程工厂设置的。

# 7、RejectedExecutionHandler

拒绝策略:当线程池的任务超出线程池队列可以存储的最大值之后,执行的策略。

两种情况会拒绝处理任务:

  • 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务

  • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务。

  • 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常。

ThreadPoolExecutor类的拒绝策略有以下 4 种

  • AbortPolicy:拒绝并抛出异常,抛出异常RejectedExecutionException
  • CallerRunsPolicy:使用当前调用的线程来执行此任务。
  • DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。
  • DiscardPolicy:忽略并抛弃当前任务。

线程池的默认策略是 AbortPolicy 拒绝并抛出异常。

当然也可以实现RejectedExecutionHandler接口,自定义处理器。

# 各项参数的默认值

  • corePoolSize=1
  • maxPoolSize=Integer.MAX_VALUE
  • keepAliveTime=60s
  • allowCoreThreadTimeout=false
  • queueCapacity=Integer.MAX_VALUE
  • rejectedExecutionHandler=AbortPolicy()

# ThreadPoolExecutor运行流程

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分成两部分:任务管理线程管理

# 1、任务管理

任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:

(1)直接申请线程执行该任务;

(2)缓冲到队列中等待线程执行;

(3)拒绝该任务。线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

# 2、线程管理

(1)线程池如何维护自身状态。

(2)线程池如何管理任务。

(3)线程池如何管理线程。

# ThreadPoolExecutor执行顺序

  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满。
    • 若线程数小于最大线程数,创建线程。
    • 若线程数等于最大线程数,抛出异常,拒绝任务。

# 四、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 主要用来在给定的延迟后运行任务,或者定期执行任务。

ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue

  • PriorityQueue会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行);
  • 如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。

# 运行机制

ScheduledThreadPoolExecutor 的执行主要分为两大部分:

  1. 当调用 ScheduledThreadPoolExecutorscheduleAtFixedRate() 方法或者**scheduleWirhFixedDelay()** 方法时,会向 ScheduledThreadPoolExecutorDelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask
  2. 线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor做了如下修改:

  • 使用 DelayQueue 作为任务队列;
  • 获取任务的方不同;
  • 执行周期任务后,增加了额外的处理。

# 执行周期任务的步骤

ScheduledThreadPoolExecutor 执行周期任务的步骤:

  1. 线程 1 从 DelayQueue 中获取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任务是指 ScheduledFutureTask的 time 大于等于当前系统的时间;
  2. 线程 1 执行这个 ScheduledFutureTask
  3. 线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;
  4. 线程 1 把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。

# 五、线程池最佳实战

# 不同类别的业务用不同的线程池

很多人在实际项目中都会有类似这样的问题:我的项目中多个业务需要用到线程池,是为每个线程池都定义一个还是说定义一个公共的线程池呢?

一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务。

# 线程池命名

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。

默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

线程池里的线程命名通常有下面两种方式

1.利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)
1
2
3
4

2.自己实现 ThreadFactor

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 配置线程池参数

了解了ThreadPoolExecutor的执行流程后,下面看下如何设置各项参数。

需要根据几个值来决定:

  • tasks :每秒的任务数
  • tasktime:每个任务花费时间
  • responsetime:系统允许容忍的最大响应时间,比如每个任务的响应时间不得超过2秒。

# 1、线程数

对于线程数的设置,网上流传着一个规则:

  • CPU密集型任务:线程数设置为CPU核心数+1,之所以比CPU数多一个,是因为线程有可能因为某些原因暂停,这时候多出来的那个线程就可以充分利用当前空闲的那个CPU,避免资源的浪费。
  • I/O密集型任务:线程数设置为CPU核心数*2,在I/O期间,线程是阻塞的,此时CPU是空闲的,所以可以相应的增加线程数。

当然,该规则只是一个参考,在实际应用中,需要根据真实的业务场景确定CPU执行时长和I/O执行时长的占比,从而设置一个合理的值。

(1)corePoolSize设置

corePoolSize设置公式:

corePoolSize = 每秒需要多少个线程处理 = tasks/(1/tasktime) = tasks * tasktime
1

每个任务需要tasktime秒处理,则每个线程每钞可处理1/tasktime个任务。系统每秒有tasks个任务需要处理,则需要的线程数为:tasks/(1/tasktime),即tasks * tasktime个线程数。

假设系统每秒任务数为100 -1000,每个任务耗时0.1秒,则需要1000.1至1000*0.1,即10~100个线程。那么corePoolSize应该设置为大于10,具体数字最好根据8020原则,即80%情况下系统每秒任务数,若系统80%的情况下第秒任务数小于200,最多时为1000,则corePoolSize可设置为20。

(2)maxPoolSize设置

maxPoolSize设置公式:

maxPoolSize = (max(tasks)- queueCapacity)/(1/tasktime)
1

当系统负载达到最大值时,核心线程数已无法按时处理完所有任务,这时就需要增加线程。每秒200个任务需要20个线程,那么当每秒达到1000个任务时,则需要(1000-queueCapacity)*(20/200),即60个线程,可将maxPoolSize设置为60。

# 2、存活时间keepAliveTime

线程数量只增加不减少也不行。当负载降低时,可减少线程数量,如果一个线程空闲时间达到keepAliveTiime,该线程就退出。默认情况下线程池最少会保持corePoolSize个线程。

# 3、阻塞队列BlockingQueue

选择阻塞队列

​ 不推荐使用无界队列,对于一个运行中的系统,如果我们没有考虑到所有可能发生的情况,使用无界队列有可能因为某些异常情况导致创建了大量的任务,线程池来不及处理,而最终把内存耗完。

队列长度queueCapacity

我们要根据任务的提交频率和任务的运行时长,以及允许的任务延迟,计算出一个比较合理的队列大小,同时结合拒绝策略使用,比如,对于不重要的任务,可以采用丢弃策略;而针对重要的任务,系统实在处理不过来的话,可能做相应的告警,通知运维人员进行处理。

任务队列的长度要根据核心线程数,以及系统对任务响应时间的要求有关。

队列长度设置公式为

(corePoolSize/tasktime)*responsetime: (20/0.1)*2=400
1

即队列长度可设置为400。

队列长度设置过大,会导致任务响应时间过长,切忌以下写法:

LinkedBlockingQueue queue = new LinkedBlockingQueue();
1

这实际上是将队列长度设置为Integer.MAX_VALUE,将会导致线程数量永远为corePoolSize,再也不会增加,当任务数量陡增时,任务响应时间也将随之陡增。

参考资料:美团技术团队在**《Java 线程池实现原理及其在美团业务中的实践》 (opens new window)**这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。

# 动态配置线程池参数

并发任务的执行情况和任务类型相关,IO密集型和CPU密集型的任务运行起来的情况差异非常大,业界的一些线程池参数配置占比是较难合理预估的,这导致很难有一个简单有效的通用公式帮我们直接计算出结果。

美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。根据系统的运行情况进行动态设置(利用配置中心)。

# 线程池参数动态修改流程

可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效,线程池参数动态化前后的参数修改流程对比如下:

# 三个核心参数

线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。

这三个参数是 ThreadPoolExecutor 最重要的参数,它们基本决定了线程池对于任务的处理策略。

# 动态化线程池的核心设计

动态化线程池的核心设计包括以下三个方面:

  1. 简化线程池配置:配置最核心的是3个参数。考虑到在实际应用中我们获取并发性的场景主要是两种:

    (1)并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。

    (2)并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。

    所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。

  2. 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。

  3. 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。

动态化线程池整体设计流程图

# 如何支持参数动态配置

ThreadPoolExecutor是允许在运行期间动态设置corePoolSizemaximumPoolSize的。且看 ThreadPoolExecutor 提供的下面这些方法。

格外需要注意的是corePoolSize, 程序运行期间的时候,我们调用 setCorePoolSize()这个方法的话,线程池会首先判断当前工作线程数是否大于corePoolSize,如果大于的话就会回收工作线程。

另外,你也看到了上面并没有动态指定队列长度的方法,美团的方式是自定义了一个叫做 ResizableCapacityLinkedBlockIngQueue 的队列(主要就是把LinkedBlockingQueue的 capacity 字段的 final 关键字修饰给去掉了,让它变为可变的)。

# 最终实现参数效果

最终实现的可动态修改线程池参数效果如下:

# 监测线程池运行状态

你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。

除此之外,我们还可以利用 ThreadPoolExecutor 的相关 API 做一个简陋的监控。ThreadPoolExecutor提供的几个publicgetter方法,可以读取到当前线程池的运行状态以及参数,如下图所示:

从上图可以看出, ThreadPoolExecutor提供了线程池当前的线程数和活跃线程数、已经执行完成的任务数、正在排队中的任务数等等。

下面是一个简单的 Demo。printThreadPoolStatus()会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。

/**
 * 打印线程池的状态
 *
 * @param threadPool 线程池对象
 */
public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
    ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-thread-pool-status", false));
    scheduledExecutorService.scheduleAtFixedRate(() -> {
        log.info("=========================");
        log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
        log.info("=========================");
    }, 0, 1, TimeUnit.SECONDS);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# ThreadPoolExecutor使用示例

首先创建一个 Runnable 接口的实现类MyRunnable.java

import java.util.Date;

/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 * @author shuang.kou
 */
publicclass MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        returnthis.command;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。创建ThreadPoolExecutorDemo.java

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

publicclass ThreadPoolExecutorDemo {

    privatestaticfinalint CORE_POOL_SIZE = 5;
    privatestaticfinalint MAX_POOL_SIZE = 10;
    privatestaticfinalint QUEUE_CAPACITY = 100;
    privatestaticfinal Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy

# 六、参考资料

《Java 线程池实现原理及其在美团业务中的实践》 (opens new window)