• 在线客服

  • 扫描二维码
    下载博学谷APP

  • 扫描二维码
    关注博学谷微信公众号

  • 意见反馈

原创 线程池的实现原理详解附源码

发布时间:2020-04-13 19:05:36 浏览 4157 来源:博学谷 作者:照照

    线程池作为存放线程的池子,能存放很多可以复用的线程。线程池的优点主要是可以降低系统资源消耗,提高响应速度以及提高线程的可管理性。本文将附上源码为大家详解线程池的实现原理。内容主要包括提交任务、创建线程、工作线程的实现原理和线程复用机制。

     

    线程池的实现原理

     

    一、提交任务

     

    线程池框架提供了两种方式提交任务,submit()execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果。

     

    1submit()的实现源码

     

    public Future<?> submit(Runnable task) {
           

    if (task == null) throw new NullPointerException();
           

    RunnableFuture<Void> ftask = newTaskFor(task, null);
           

    execute(ftask);
           

    return ftask;
       

    }

     

    2execute()的实现源码

     

    public void execute(Runnable command) {
           

    if (command == null)
           

         throw new NullPointerException();
           

    //获取线程池控制状态
           

    int c = ctl.get();
           

    // (1)
           

    //worker数量小于corePoolSize
           

    if (workerCountOf(c) < corePoolSize) {
           

         //创建worker,addWorker方法boolean参数用来判断是否创建核心线程
           

         if (addWorker(command, true))
           

             //成功则返回
           

             return;
           

         //失败则再次获取线程池控制状态
           

         c = ctl.get();
           

    }
           

    //(2)
          

    //线程池处于RUNNING状态,将任务加入workQueue任务缓存队列
           

    if (isRunning(c) && workQueue.offer(command)) {
           

         // 再次检查,获取线程池控制状态,防止在任务入队的过程中线程池关闭了或者线程池中没有线程了
           

         int recheck = ctl.get();
           

         //线程池不处于RUNNING状态,且将任务从workQueue移除成功
           

         if (! isRunning(recheck) && remove(command))
           

             //采取任务拒绝策略
           

             reject(command);
           

         //worker数量等于0
           

         else if (workerCountOf(recheck) == 0)
           

             //创建worker
           

             addWorker(null, false);
           

    }
           

    //(3)
           

    else if (!addWorker(command, false))  //创建worker
           

         reject(command);  //如果创建worker失败,采取任务拒绝策略
       

    }

     

    总结:若线程池工作线程数量小于corePoolSize,则创建新线程来执行任务;若工作线程数量大于或等于core PoolSize,则将任务加入BlockingQueue;若无法将任务加入BlockingQueue(BlockingQueue已满),且工作线程数量小于maximumPoolSize,则创建新的线程来执行任务;若工作线程数量达到maximumPoolSize,则创建线程失败,采取任务拒绝策略。

     

    二、创建线程

     

    1、实现代码:

     

    //addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,表示是否创建核心线程

     

    //该方法的返回值代表是否成功新增一个线程
     

    private boolean addWorker(Runnable firstTask, boolean core) {
           

    retry:
           

    for (;;) {
           

         int c = ctl.get();
           

         int rs = runStateOf(c);


                // (1)
           

         if (rs >= SHUTDOWN &&
           

             ! (rs == SHUTDOWN &&
           

                firstTask == null &&
           

                ! workQueue.isEmpty()))
           

             return false;


           
       for (;;) {
           

            int wc = workerCountOf(c);
           

             //线程数超标,不能再创建线程,直接返回
           

             if (wc >= CAPACITY ||
           

                 wc >= (core ? corePoolSize : maximumPoolSize))
             

               return false;
           

             //CAS操作递增workCount
           

             //如果成功,那么创建线程前的所有条件校验都满足了,准备创建线程执行任务,退出retry循环
           

             //如果失败,说明有其他线程也在尝试往线程池中创建线程(往线程池提交任务可以是并发的),则继续往下执行
           

             if (compareAndIncrementWorkerCount(c))
           

                 break retry;
           

             //重新获取线程池控制状态
           

             c = ctl.get();
           

             // 如果线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环
           

             if (runStateOf(c) != rs)
           

                 continue retry;
           

             //如果只是CAS操作失败的话,进入内层的for循环就可以了
           

         }
           

    }


           
    //到这里,创建线程前的所有条件校验都满足了,可以开始创建线程来执行任务

     

            //worker是否已经启动
           

    boolean workerStarted = false;
           

    //是否已将这个worker添加到workers这个HashSet
           

    boolean workerAdded = false;
           

    Worker w = null;
         

       try {
           

         //创建一个worker,从这里可以看出对线程的包装
           

         w = new Worker(firstTask);
           

         //取出worker中的线程对象,Worker的构造方法会调用ThreadFactory来创建一个新的线程
           

         final Thread t = w.thread;
           

         if (t != null) {
           

             //获取全局锁, 并发的访问线程池workers对象必须加锁,持有锁的期间线程池也不会被关闭
           

             final ReentrantLock mainLock = this.mainLock;
           

             mainLock.lock();
           

             try {
           

                 //重新获取线程池的运行状态
           

                 int rs = runStateOf(ctl.get());


                   
      //小于SHUTTDOWNRUNNING
           

                 //等于SHUTDOWN并且firstTasknull,不接受新的任务,但是会继续执行等待队列中的任务
           

                 if (rs < SHUTDOWN ||
           

                     (rs == SHUTDOWN && firstTask == null)) {
           

                     //worker里面的thread不能是已启动的
           

                     if (t.isAlive())
          

                         throw new IllegalThreadStateException();
           

                    //将新创建的线程加入到线程池中
           

                     workers.add(w);
           

                     int s = workers.size();
           

                     // 更新largestPoolSize
           

                     if (s > largestPoolSize)
           

                         largestPoolSize = s;
           

                     workerAdded = true;
           

                 }
           

             } finally {
           

                 mainLock.unlock();
           

             }
           

             //线程添加线程池成功,则启动新创建的线程
           

             if (workerAdded) {
           

                 t.start();
           

                 workerStarted = true;
           

             }
           

         }
           

    } finally {
           

         //若线程启动失败,做一些清理工作,例如从workers中移除新添加的worker并递减wokerCount
           

         if (! workerStarted)
           

             addWorkerFailed(w);
           

    }
           

    //返回线程是否启动成功
           

    return workerStarted;
       

    }

     

    2、总结:

     

    addWorker()方法完成了一些任务。比如原子性的增加workerCount;将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中;启动worker对应的线程;若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount

     

    三、工作线程的实现原理

     

    1Worker

     

    Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。

     

    private final class Worker
           

    extends AbstractQueuedSynchronizer
           

    implements Runnable

     

    2Worker的主要字段:

     

    //用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来
           

    final Thread thread;
           

    //worker所对应的第一个任务,可能为空
           

    Runnable firstTask;
           

    //记录当前线程完成的任务数
           

    volatile long completedTasks;

     

    3Worker的构造函数:

     

    Worker(Runnable firstTask) {
           

         //设置AQSstate-1,在执行runWorker()方法之前阻止线程中断
           

         setState(-1);
           

         //初始化第一个任务
           

         this.firstTask = firstTask;
           

         //利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
           

         //也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
           

         this.thread = getThreadFactory().newThread(this);
           

    }

     

    四、线程复用机制

     

    worker中的线程start 后,执行的是workerrun()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。

     

    final void runWorker(Worker w) {
           

    //获取当前线程
           

    Thread wt = Thread.currentThread();
           

    //获取wfirstTask
           

    Runnable task = w.firstTask;
           

    //设置wfirstTasknull
           

    w.firstTask = null;
           

    // 释放锁,设置AQSstate0,允许中断
           

    w.unlock();
           

    //用于标识线程是否异常终止,finallyprocessWorkerExit()方法会有不同逻辑
           

    boolean completedAbruptly = true;
           

    try {
           

         //循环调用getTask()获取任务,不断从任务缓存队列获取任务并执行
           

         while (task != null || (task = getTask()) != null) {
           

             //进入循环内部,代表已经获取到可执行的任务,则对worker对象加锁,保证线程在执行任务过程中不会被中
           

             w.lock();
           

             if ((runStateAtLeast(ctl.get(), STOP) ||  //若线程池状态大于等于STOP,那么意味着该线程要中断
           

                  (Thread.interrupted() &&      //线程被中断
           

                   runStateAtLeast(ctl.get(), STOP))) &&  //且是因为线程池内部状态变化而被中断
           

                 !wt.isInterrupted())           //确保该线程未被中断
           

                 //发出中断请求
           

                 wt.interrupt();
           

             try {
           

                 //开始执行任务前的Hook方法
           

                 beforeExecute(wt, task);
           

                 Throwable thrown = null;
           

                 try {
           

                     //到这里正式开始执行任务
           

                     task.run();
           

                 } catch (RuntimeException x) {
           

                     thrown = x; throw x;
           

                 } catch (Error x) {
           

                     thrown = x; throw x;
           

                 } catch (Throwable x) {
           

                     thrown = x; throw new Error(x);
           

                 } finally {
           

                     //执行任务后的Hook方法
           

                     afterExecute(task, thrown);
           

                 }
           

             } finally {
           

                 //置空task,准备通过getTask()获取下一个任务
           

                 task = null;
           

                 //completedTasks递增
           

                 w.completedTasks++;
           

                 //释放掉worker持有的独占锁
           

                 w.unlock();
           

             }
           

         }
           

         completedAbruptly = false;
           

    } finally {
           

         //到这里,线程执行结束,需要执行结束线程的一些清理工作
           

         //线程执行结束可能有两种情况:
           

         //1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束
           

         //2.任务执行过程中发生了异常
           

         //第一种情况,getTask()返回null,那么getTask()中会将workerCount递减
           

         //第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理
           

         processWorkerExit(w, completedAbruptly);
           

    }
       

    }

     

    五、关闭线程池

     

    1shutdown()方法的实现原理

     

    shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。

     

    public void shutdown() {
           

    final ReentrantLock mainLock = this.mainLock;
           

    //获取全局锁
           

    mainLock.lock();
           

    try {
           

         //检查shutdown权限
           

         checkShutdownAccess();
           

         //设置线程池运行状态为SHUTDOWN
           

         advanceRunState(SHUTDOWN);
           

         //中断所有空闲worker
           

         interruptIdleWorkers();
           

         //onShutdown()钩子方法
           

         onShutdown();
           

    } finally {
           

         //释放锁
           

         mainLock.unlock();
           

    }
           

    //尝试终止线程池
           

    tryTerminate();
       

    }

     

    shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池。

     

    2shutdownNow()的实现原理

     

    private void interruptWorkers() {
           

    final ReentrantLock mainLock = this.mainLock;
           

    //获取全局锁
           

    mainLock.lock();
           

    try {
           

         //遍历workers集合
           

         for (Worker w : workers)
           

             //调用Worker类的interruptIfStarted()方法中断线程
           

             w.interruptIfStarted();
           

    } finally {
           

         //释放锁
           

         mainLock.unlock();
           

    }
       

    }

     

    看到这里,相信大家对于线程池的实现原理已经有了一个大致的认识。如果想要学习更加深入的知识,提高自身的Java技能,不妨在博学谷多看看海量的视频学习资源,课程除了视频教学还配有相应的源码和学习资料,大家还在等什么,现在一起来体验看看吧!

    申请免费试学名额    

在职想转行提升,担心学不会?根据个人情况规划学习路线,闯关式自适应学习模式保证学习效果
讲师一对一辅导,在线答疑解惑,指导就业!

上一篇: 优化MySQL数据库的方法有哪些? 下一篇: Java网络编程学习大纲整理

相关推荐 更多

热门文章

  • 前端是什么
  • 前端开发的工作职责
  • 前端开发需要会什么?先掌握这三大核心关键技术
  • 前端开发的工作方向有哪些?
  • 简历加分-4步写出HR想要的简历
  • 程序员如何突击面试?两大招带你拿下面试官
  • 程序员面试技巧
  • 架构师的厉害之处竟然是这……
  • 架构师书籍推荐
  • 懂了这些,才能成为架构师
  • 查看更多

扫描二维码,了解更多信息

博学谷二维码