简介 Executors 是JDK 1.5中提供的一个创建线程池的建议类,其是对ThreadPoolExecutor进行了一次封装,屏蔽了ThreadPoolExecutor复杂的参数,简单使用且暴力。Executors提供了3种线程池分别为newCachedThreadPool(无固定大小线程池,如果线程空闲时间超过60s则回收),固定线程池newFixedThreadPool,和单一线程池newSingleThreadExecutor(只有一个线程),相比使用ThreadPoolExecutor,Executors虽然也是使用ThreadPoolExecutor去创建线程池,但是其给ThreadPoolExecutor提供了默认参数,这就使我们在使用Executors时比ThreadPoolExecutor简单的多得多,往往只有传入一个创建线程的个数参数即可,但就是由于其屏蔽了ThreadPoolExecutor的细节导致在使用Executors在处理大量任务时存在OOM的隐患,所以在阿里巴巴编程规范中明确禁止使用Executors去创建线程池,我们可以通过查看api和查看 Executors代码可以得知为什么会这样.
ThreadPoolExecutor api中关于ThreadPoolExecutor构造方法的解释 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 参数: corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize - 池中允许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 threadFactory - 执行程序创建新线程时使用的工厂。 handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
ThreadPoolExecutor执行流程,当需要线程时调用参数中的ThreadFactory方法去创建线程,JDK中提供了默认的工具类去创建默认的线程工程Executors.defaultThreadFactory(),其有一个newThread方法去创建一个线程,在实际使用中我们可以自行实现线程工程(修改线程名字等),方便后续日志追踪线程信息
1 2 3 4 5 6 7 8 9 10 public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
在创建线程时会先去判断当前活动的线程数是否达到corePoolSize设置的数量,如果没有的话则调用ThreadFactory去新建线程,如果当前正在使用执行任务的线程已经达到corePoolSize设置的数量,则将任务存放到构造方法中设置的BlockingQueue workQueue队列中,如果BlockingQueue使用的无解队列如LinkedBlockingDeque的话,那么等待执行的任务就会无限往无界队列中添加,如果使用的是有界队列的话如ArrayBlockingQueue,当任务数达到有界队列的数量后,如果目前执行任务的线程数量小于maximumPoolSize设置的线程数量则继续创建线程执行任务,直到线程数到达设置的maximumPoolSize时,执行拒绝策略RejectedExecutionHandler中实现的方法丢弃任务。
以下是线程执行的流程图
Executors
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Executors存在的问题 如果你认真查看上文关于,也许你就会发现一些问题
在newCachedThreadPool中maximumPoolSize最大数量为Integer.MAX_VALUE,那么根据上文所说,如果线程有大量任务需要执行,那么会不断的开启新的线程直到线程数达到Integer.MAX_VALUE,但是往往还没达到这个线程数,服务器就OOM了。
编写测试代码
首先编写一个拒绝处理器类RejectedExecutionHandler的实现,其功能为当执行队列已满且执行线程数已经达到设置的最大线程时打印文字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package com.liu; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @author Liush * @description * @date 2019/10/9 9:46 **/ public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("线程队列已满,且线程数已达最大线程数"); } }
创建线程池,且设置参数,我们设置了一个10个大小的队列,为了方便查看拒绝执行线程时调用RejectedExecutionHandler中的代码我们设置了最小线程数为1,最大线程数为2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.liu; import java.util.concurrent.*; /** * @author Liush * @description * @date 2019/10/9 9:40 **/ public class ThreadPoolTest { public static void main(String[] args) { BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5000, TimeUnit.MILLISECONDS, blockingQueue, Executors.defaultThreadFactory(), new MyRejectedExecutionHandler()); for (int i = 0; i < 100; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println("阻塞队列中有待执行任务个数" + blockingQueue.size()); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
执行结果,我们看到了任务出现了大量失败,因为我们将最大线程数设置为2,且队列大小设置为10,所以最多同时处理12个任务(2个任务正在执行,其余10个在队列中),直到最后当循环快结束时(此时大量任务已经被丢弃),阻塞队列中的任务数量才降低
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 线程队列已满,且线程数已达最大线程数 阻塞队列中有待执行任务个数10 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 阻塞队列中有待执行任务个数10 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 ... 阻塞队列中有待执行任务个数9 阻塞队列中有待执行任务个数8 阻塞队列中有待执行任务个数6 阻塞队列中有待执行任务个数6 阻塞队列中有待执行任务个数4 阻塞队列中有待执行任务个数4 阻塞队列中有待执行任务个数3 阻塞队列中有待执行任务个数2 阻塞队列中有待执行任务个数1 阻塞队列中有待执行任务个数0
SynchronousQueue
SynchronousQueue实现了BlockingQueue,BlockingQueue的实现类都是安全线程的
SynchronousQueue是阻塞队列,只能单进单出,也就是说多个线程调用queue的put或者offer方法只能有一个线程往里插入元素,除非消费者take出元素后才能继续插入元素。
BlockingQueue插入元素和移除元素有4种方法,分别对应当队列为空或者队列已满时不同的生产策略和消费策略
操作
抛出异常(当队列已满或者这队列为空)
特殊值(当队列已满或者这队列为空)
阻塞(当队列已满或者这队列为空)
超时(当队列已满或者这队列为空)
插入
add(e)
offer(e)
put(e)
offer(e, time, unit)
移除
remove()
poll()
take()
poll(time, unit)
检查
element()
peek()
不可用
不可用
由上文可知我们在Executors中的 newCachedThreadPool中使用的是SynchronousQueue队列,也就是说当多个线程一起往SynchronousQueue调用offer返回时会返回false,从而使线程池开启新的线程,又由于newCachedThreadPool将 maximumPoolSize(池中允许的最大线程数)设置为Integer.MAX_VALUE,从而导致在大规模并发下导致OOM,以下是ThreadPoolExecutor中execute方法的源码,其注释和代码描述上述ThreadPoolExecutor的执行策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
SynchronousQueue的公平队列,虽然SynchronousQueue只允许只有一个元素,但是其却可以维护一个公平队列,在新建SynchronousQueue实例时在构造方法中传入true即可,这个公平队列的作用就是按照线程玩队列插入元素的先后顺序插入元素,实践代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 package com.liu; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; /** * @author Liush * @description * @date 2019/10/9 14:03 **/ public class QueueTest { public static void main(String[] args) throws Exception{ //不使用公平队列 SynchronousQueue<String> synchronousQueue=new SynchronousQueue(); //使用公平队列 //SynchronousQueue<String> synchronousQueue=new SynchronousQueue(true); ExecutorService executorService =Executors.newCachedThreadPool(); executorService.execute(new Runnable() { @Override public void run() { try { synchronousQueue.put("1"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000); executorService.execute(new Runnable() { @Override public void run() { try { synchronousQueue.put("2"); Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000); executorService.execute(new Runnable() { @Override public void run() { try { synchronousQueue.put("3"); Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000); executorService.execute(new Runnable() { @Override public void run() { while (true){ try { System.out.println(synchronousQueue.take() +"消费"); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } }
上述代码开启了3个生产者,开启期间线程睡眠了1秒,保证其先后调用顺序,以下结果是我们不使用公平队列的情况的生产者输出,我们看到消费者并没有满足先进先出的原则,
如果我们使用公平队列,结果如下,满足了队列先进先出的原则