简介 Executors 是JDK 1.5中提供的一个创建线程池的建议类,其是对ThreadPoolExecutor进行了一次封装,屏蔽了ThreadPoolExecutor复杂的参数,简单使用且暴力。Executors提供了3种线程池分别为newCachedThreadPool(无固定大小线程池,如果线程空闲时间超过60s则回收),固定线程池newFixedThreadPool,和单一线程池newSingleThreadExecutor(只有一个线程),相比使用ThreadPoolExecutor,Executors虽然也是使用ThreadPoolExecutor去创建线程池,但是其给ThreadPoolExecutor提供了默认参数,这就使我们在使用Executors时比ThreadPoolExecutor简单的多得多,往往只有传入一个创建线程的个数参数即可,但就是由于其屏蔽了ThreadPoolExecutor的细节导致在使用Executors在处理大量任务时存在OOM的隐患,所以在阿里巴巴编程规范中明确禁止使用Executors去创建线程池,我们可以通过查看api和查看 Executors代码可以得知为什么会这样.
ThreadPoolExecutor api中关于ThreadPoolExecutor构造方法的解释 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 参数: corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize - 池中允许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 threadFactory - 执行程序创建新线程时使用的工厂。 handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
1 2 3 4 5 6 7 8 9 10 public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
在创建线程时会先去判断当前活动的线程数是否达到corePoolSize设置的数量,如果没有的话则调用ThreadFactory去新建线程,如果当前正在使用执行任务的线程已经达到corePoolSize设置的数量,则将任务存放到构造方法中设置的BlockingQueue workQueue队列中,如果BlockingQueue使用的无解队列如LinkedBlockingDeque的话,那么等待执行的任务就会无限往无界队列中添加,如果使用的是有界队列的话如ArrayBlockingQueue,当任务数达到有界队列的数量后,如果目前执行任务的线程数量小于maximumPoolSize设置的线程数量则继续创建线程执行任务,直到线程数到达设置的maximumPoolSize时,执行拒绝策略RejectedExecutionHandler中实现的方法丢弃任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Executors存在的问题 如果你认真查看上文关于,也许你就会发现一些问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package com.liu; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @author Liush * @description * @date 2019/10/9 9:46 **/ public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("线程队列已满,且线程数已达最大线程数"); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.liu; import java.util.concurrent.*; /** * @author Liush * @description * @date 2019/10/9 9:40 **/ public class ThreadPoolTest { public static void main(String[] args) { BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 5000, TimeUnit.MILLISECONDS, blockingQueue, Executors.defaultThreadFactory(), new MyRejectedExecutionHandler()); for (int i = 0; i < 100; i++) { threadPoolExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println("阻塞队列中有待执行任务个数" + blockingQueue.size()); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 线程队列已满,且线程数已达最大线程数 阻塞队列中有待执行任务个数10 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 阻塞队列中有待执行任务个数10 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 线程队列已满,且线程数已达最大线程数 ... 阻塞队列中有待执行任务个数9 阻塞队列中有待执行任务个数8 阻塞队列中有待执行任务个数6 阻塞队列中有待执行任务个数6 阻塞队列中有待执行任务个数4 阻塞队列中有待执行任务个数4 阻塞队列中有待执行任务个数3 阻塞队列中有待执行任务个数2 阻塞队列中有待执行任务个数1 阻塞队列中有待执行任务个数0
offer(e, time, unit)
poll(time, unit)
由上文可知我们在Executors中的 newCachedThreadPool中使用的是SynchronousQueue队列,也就是说当多个线程一起往SynchronousQueue调用offer返回时会返回false,从而使线程池开启新的线程,又由于newCachedThreadPool将 maximumPoolSize(池中允许的最大线程数)设置为Integer.MAX_VALUE,从而导致在大规模并发下导致OOM,以下是ThreadPoolExecutor中execute方法的源码,其注释和代码描述上述ThreadPoolExecutor的执行策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 package com.liu; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; /** * @author Liush * @description * @date 2019/10/9 14:03 **/ public class QueueTest { public static void main(String[] args) throws Exception{ //不使用公平队列 SynchronousQueue<String> synchronousQueue=new SynchronousQueue(); //使用公平队列 //SynchronousQueue<String> synchronousQueue=new SynchronousQueue(true); ExecutorService executorService =Executors.newCachedThreadPool(); executorService.execute(new Runnable() { @Override public void run() { try { synchronousQueue.put("1"); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000); executorService.execute(new Runnable() { @Override public void run() { try { synchronousQueue.put("2"); Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000); executorService.execute(new Runnable() { @Override public void run() { try { synchronousQueue.put("3"); Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(1000); executorService.execute(new Runnable() { @Override public void run() { while (true){ try { System.out.println(synchronousQueue.take() +"消费"); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } }