Java并发编程:Java线程池中心ThreadPoolExecutor的运用


玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。

目次

  • 引出线程池
  • Executor框架
  • ThreadPoolExecutor详解
    • 组织函数
    • 重要的变量
    • 线程池实行流程
    • 义务行列workQueue
    • 义务谢绝战略
    • 线程池的封闭
  • ThreadPoolExecutor建立线程池实例
  • 参考:

引出线程池

线程是并发编程的基础,前面的文章里,我们的实例基础都是基于线程开辟作为实例,而且都是运用的时候就建立一个线程。这类体式格局对照简朴,然则存在一个题目,那就是线程的数量题目。

假设有一个体系对照复杂,须要的线程数许多,若是都是接纳这类体式格局来建立线程的话,那末就会极大的斲丧体系资本。起首是由于线程自身的建立和烧毁须要时候,若是每一个小义务都建立一个线程,那末就会大大下降体系的效力。其次是线程自身也是占用内存空间的,大批的线程运转会抢占内存资本,处置惩罚欠妥很可能会内存溢出,这明显不是我们想看到的。

那末有甚么设施处理呢?有一个好的思绪就是对线程举行复用,由于一切的线程实在不都是同一时候一同运转的,有些线程在某个时候多是余暇状况,若是这局部余暇线程能有用应用起来,那末就能让线程的运转被充足的应用,如许就不须要建立那末多的线程了。我们能够把特定数量的线程放在一个容器里,须要运用线程时,从容器里拿出余暇线程运用,线程事情完后不急着封闭,而是退回到线程池守候运用。如许的容器一样平常被称为线程池。用线程池来治理线程黑白常有用的要领,用一张图片能够简朴的展现出线程池的治理流程:

Executor框架

Java中也有一套框架来掌握治理线程,那就是Executor框架。Executor框架是JDK1.5以后才引入的,位于java.util.cocurrent 包下,能够经由历程该框架来掌握线程的启动、实行和封闭,从而简化并发编程的操纵,这是它的中心成员类图:

Executor:最上层的接口,界说了一个基础要领execute,吸收一个Runnable参数,用来替换一般建立或启动线程的要领。

ExecutorService:继承自Executor接口,供应了处置惩罚多线程的要领。

ScheduledExecutorService:准时调理接口,继承自ExecutorService。

AbstractExecutorService:实行框架的抽象类。

ThreadPoolExecutor:线程池中最中心的一个类,供应了线程池操纵的基础要领。

Executors:线程池工场类,可用于建立一系列有特定功用的线程池。

ThreadPoolExecutor详解

以上Executor框架中的基础成员,个中最中心的的成员无疑就是ThreadPoolExecutor,想相识Java中线程池的运转机制,就必须先相识这个类,而最好的相识体式格局无疑就是看源码。

组织函数

翻开ThreadPoolExecutor的源码,发明类中供应了四个组织要领

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

能够看出,ThreadPoolExecutor的组织函数中的参数照样对照多的,而且最中心的是第四个组织函数,个中完成了底层的初始化事情。

下面解释一下组织函数参数的寄义:

  • corePoolSize:线程池的基础巨细。当提交一个义务到线程池后,线程池会建立一个线程实行义务,反复这类操纵,直到线程池中的数量到达corePoolSize后不再建立新线程,而是把义务放到缓存行列中。
  • maximumPoolSize:线程池许可建立的最大线程数。
  • workQueue:壅塞行列,用于存储守候实行的义务,而且只能存储挪用execute 要领提交的义务。经常使用的有三种行列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
  • keepAliveTime:线程池中线程的最大余暇时候,这类状况一样平常是线程数量大于义务的数量致使。
  • unit:keepAliveTime的时候单元,TimeUnit是一个罗列范例,位于java.util.concurrent包下。
  • threadFactory:线程工场,用于建立线程。

  • handler:谢绝战略,当义务太多来不及处置惩罚时所接纳的处置惩罚战略。

重要的变量

看完了组织函数,我们来看下ThreadPoolExecutor类中几个重要的成员变量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl:掌握线程运转状况的一个字段。同时,依据下面的几个要领runStateOfworkerCountOfctlOf能够看出,该字段还包含了两局部的信息:线程池的运转状况 (runState) 和线程池内有用线程的数量 (workerCount),而且运用的是Integar范例,高3位生存runState,低29位生存workerCount。

COUNT_BITS:值为29的常量,在字段CAPACITY被援用盘算。

CAPACITY:透露表现有用线程数量(workerCount)的上限,巨细为 (1<<29) - 1。

下面5个变量透露表现的是线程的运转状况,分别是:

  • RUNNING :吸收新提交的义务,而且能处置惩罚壅塞行列中的义务;
  • SHUTDOWN:不吸收新的义务,但会实行行列中的义务。
  • STOP:不吸收新义务,也不处置惩罚行列中的义务,同时中缀正在处置惩罚义务的线程。
  • TIDYING:若是一切的义务都已住手了,workerCount (有用线程数) 为0,线程池进入该状况后会挪用 terminated() 要领进入TERMINATED 状况。
  • TERMINATED:terminated( ) 要领实行终了。

用一个状况转换图透露表现也许以下 (图片来源于https://www.cnblogs.com/liuzhihu/p/8177371.html):

组织函数和基础参数都相识后,接下来就是对类中重要要领的研讨了。

线程池实行流程

execute要领

ThreadPoolExecutor类的中心调理要领是execute(),经由历程挪用这个要领能够向线程池提交一个义务,交由线程池去实行。而ThreadPoolExecutor的事情逻辑也能够藉由这个要领来一步步理清。这是要领的源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //猎取ctl的值,前面说了,该值纪录着runState和workerCount
    int c = ctl.get();
    /*
     * 挪用workerCountOf获得以后运动的线程数;
     * 以后运动线程数小于corePoolSize,新建一个线程放入线程池中;
     * addWorker(): 把义务增添到该线程中。
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        //若是上面的增添线程操纵失利,从新猎取ctl值
        c = ctl.get();
    }
    //若是以后线程池是运转状况,而且往事情行列中增添该义务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        /*
         * 若是以后线程不是运转状况,把义务从行列中移除
         * 挪用reject(内部挪用handler)谢绝吸收义务  
         */
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //猎取线程池中的有用线程数,若是为0,则实行addWorker建立一个新线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * 若是实行到这里,有两种状况:
     * 1. 线程池已不是RUNNING状况;
     * 2. 线程池是RUNNING状况,但workerCount >= corePoolSize而且workQueue已满。
     * 这时候,再次挪用addWorker要领,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
     * 若是失利则谢绝该义务
     */
    else if (!addWorker(command, false))
        reject(command);
}

简朴归纳综合一下代码的逻辑,也许是如许:

1、推断以后运转中的线程数是不是小于corePoolSize,是的话则挪用addWorker建立线程实行义务。

2、不知足1的前提,就把义务放入事情行列workQueue中。

3、若是义务胜利到场workQueue,推断线程池是不是是运转状况,不是的话先把义务移出事情行列,并挪用reject要领,运用谢绝战略谢绝该义务。线程若是黑白运转中,挪用addWorker建立一个新线程。

4、若是放入workQueue失利 (行列已满),则挪用addWorker建立线程实行义务,若是这时候建立线程失利 (addWorker传进去的第二个参数值是false,申明这类状况是以后线程数不小于maximumPoolSize),就会挪用reject(内部挪用handler)谢绝吸收义务。

全部实行流程用一张图片透露表现大抵以下:

以上就是execute要领的也许逻辑,接下来看看addWorker的要领完成。

addWorker要领

源码以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /**线程池状况不为SHUTDOWN时
        * 推断行列或许义务是不是为空,是的话返回false
        */.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            /*  这里能够看出core参数决议着运动线程数的巨细对照工具
            *   core为true透露表现与 corePoolSize巨细举行对照
            *   core为false透露表现与 maximumPoolSize巨细举行对照
            *   以后运动线程数大于对照工具就返回false
            */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 实验增添workerCount,若是胜利,则跳出第一个for轮回
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 若是增添workerCount失利,则从新猎取ctl的值
            c = ctl.get();  // Re-read ctl
            // 若是以后的运转状况不即是rs,申明状况已被转变,返回第一个for轮回继承实行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //建立一个worker工具w
        w = new Worker(firstTask);
        //实例化w的线程t
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一个HashSet,生存着义务的worker工具
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

从代码中能够看出,addWorker要领的重要事情是在线程池中建立一个新的线程并实行,个中firstTask参数指定的是新线程须要实行的第一个义务,core参数决议于运动线程数的对照工具是corePoolSize照样maximumPoolSize。依据传进来的参数起首对线程池和行列的状况举行推断,知足前提就新建一个Worker工具,并实例化该工具的线程,末了启动线程。

Worker类

依据addWorker源码中的逻辑,我们能够发明,线程池中的每一个线程实在都是对应的Worker工具在保护的,以是我们有必要对Worker类一探终究,先看一下类的源码:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

从Worker类的组织函数能够看出,当实例化一个Worker工具时,Worker工具会把传进来的Runnable参数firstTask赋值给本身的同名属性,而且用线程工场也就是以后的ThreadFactory来新建一个线程。

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。-

同时,由于Worker完成了Runnable接口,以是当Worker类中的线程启动时,挪用的实际上是run()要领。run要领中挪用的是runWorker要领,我们来看下它的详细完成:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    //猎取第一个义务
    Runnable task = w.firstTask;
    w.firstTask = null;
    //许可中缀
    w.unlock(); // allow interrupts
    //是不是由于非常退出轮回的标记,processWorkerExit要领会对该参数做推断
    boolean completedAbruptly = true;
    try {
        //推断task是不是为null,是的话经由历程getTask()从行列中猎取义务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /* 这里的推断重要逻辑是如许:
             * 若是线程池正在住手,那末就确保以后线程是中缀状况;
             * 若是不是的话,就要包管不是中缀状况
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //用于纪录义务实行前须要做哪些事,属于ThreadPoolExecutor类中的要领,                //是空的,须要子类详细完成
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //实行义务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks  ;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {     
        processWorkerExit(w, completedAbruptly);
    }
}

总结一下runWorker要领的运转逻辑:

1、经由历程while轮回赓续地经由历程getTask()要领从行列中猎取义务;

2、若是线程池正在住手状况,确保以后的线程是中缀状况,不然确保以后线程不中缀;

3、挪用task的run()要领实行义务,实行终了后须要置为null;

4、轮回挪用getTask()取不到义务了,跳出轮回,实行processWorkerExit()要领。

过完runWorker()的运转流程,我们来看下getTask()是怎样完成的。

getTask要领

getTask()要领的作用是从行列中猎取义务,下面是该要领的源码:

private Runnable getTask() {
    //纪录上次从行列猎取义务是不是超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //将workerCount减1
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        /*  timed变量用于推断线程的操纵是不是须要举行超时推断
         *  allowCoreThreadTimeOut不管它,默许是false
         *  wc > corePoolSize,以后线程是若是大于中心线程数corePoolSize
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /* 依据timed变量推断,若是为true,挪用workQueue的poll要领猎取义务,
             * 若是在keepAliveTime时候内没有猎取到义务,则返回null;
             * timed为false的话,就挪用workQueue的take要领壅塞行列,            
             * 直到行列中有义务可取。
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            //r为null,申明time为true,超时了,把timedOut也设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            //发作非常,把timedOut也设置为false,从新跑轮回
            timedOut = false;
        }
    }
}

getTask的代码看上去对照简朴,但实在内有天地,我们来重点剖析一下两个if推断的逻辑:

1、当进入getTask要领后,先推断以后线程池状况,若是线程池状况rs >= SHUTDOWN,再举行以下推断:

1)rs 的状况是不是大于STOP;2)行列是不是为空;

知足以上前提个中之一,就将workerCount减1并返回null,也就是透露表现行列中不再有义务。由于线程池的状况值是SHUTDOWN以上时,行列中不再许可增添新义务,以是上面两个前提知足一个都申明行列中的义务都取完了。

2、进入第二个if推断,这里的逻辑有点绕,但作用对照重要,是为了掌握线程池的有用线程数量,我们来详细剖析下代码:

wc > maximumPoolSize:推断以后线程数是不是大于maximumPoolSize,这类状况一样平常很少发作,除非是maximumPoolSize的巨细在该顺序实行的同时被举行设置,好比挪用ThreadPoolExecutor中的setMaximumPoolSize要领。

timed && timedOut:若是为true,透露表现以后的操纵须要举行超时推断,而且上次从行列猎取义务已超时。

wc > 1 || workQueue.isEmpty():若是事情线程大于1,或许壅塞行列是空的。

compareAndDecrementWorkerCount:对照并将线程池中的workerCount减1

在上文中,我们剖析execute要领的逻辑时相识到,若是以后线程池的线程数量超过了corePoolSize且小于maximumPoolSize,而且workQueue已满时,依然能够增添事情线程。

但挪用getTask()取义务的历程当中,若是超时没有猎取到义务,也就是timedOut为true的状况,申明workQueue已为空了,也就申明了以后线程池中不须要那末多线程来实行义务了,能够把多于corePoolSize数量的线程烧毁掉,也就是赓续的让义务被掏出,让线程数量保持在corePoolSize便可,直到getTask要领返回null。

而当getTask要领返回null后,runWorker要领中就会由于取不到义务而实行processWorkerExit()要领。

processWorkerExit要领

processWorkerExit要领的作用重如果对worker工具的移除,下面是要领的源码:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //是非常退出的话,实行顺序将workerCount数量减1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount  = w.completedTasks;
        // 从workers的鸠合中移除worker工具,也就透露表现着从线程池中移除一个事情线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,从executor要领最先的全部运转历程就终了了,总结一下该流程:

实行executor --> 新建Worker工具,并实例化线程 --> 挪用runWorker要领,经由历程getTask()猎取义务,并实行run要领 --> getTask()要领中赓续向行列取义务,并将workerCount数量减1,直至返回null --> 挪用processWorkerExit消灭worker工具。

用一张流程图透露表现以下所示 (图片来源于https://www.cnblogs.com/liuzhihu/p/8177371.html):

义务行列workQueue

前面我们屡次提到了workQueue,这是一个义务行列,用来寄存守候实行的义务,它是BlockingQueue

  • SynchronousQueue:直接提交的行列。这个行列没有容量,当吸收到义务的时候,会直接提交给线程处置惩罚,而不保存它。若是没有余暇的线程,就新建一个线程来处置惩罚这个义务!若是线程数量到达最大值,就会实行谢绝战略。以是,运用这个范例行列的时候,一样平常都是将maximumPoolSize一样平常指定成Integer.MAX_VALUE,制止轻易被谢绝。

  • ArrayBlockingQueue:有界的义务行列。须要给定一个参数来限定行列的长度,吸收到义务的时候,若是没有到达corePoolSize的值,则新建线程 (中心线程) 实行义务,若是到达了,则将义务放入守候行列。若是行列已满,则在总线程数不到maximumPoolSize的前提下新建线程实行义务,若大于maximumPoolSize,则实行谢绝战略。

  • LinkedBlockingQueue:无界的义务行列。该行列没有义务数量的限定,以是义务能够一向入队,晓得耗尽体系资本。当吸收义务,若是以后线程数小于corePoolSize,则新建线程处置惩罚义务;若是以后线程数即是corePoolSize,则进入行列守候。

义务谢绝战略

当线程池的义务行列已满而且线程数量到达maximumPoolSize时,关于新加的义务一样平常会接纳谢绝战略,一般有以下四种战略:

  1. AbortPolicy:直接抛出非常,这是默许战略;
  2. CallerRunsPolicy:用挪用者地点的线程来实行义务;
  3. DiscardOldestPolicy:抛弃壅塞行列中靠最前的义务,并实行以后义务;
  4. DiscardPolicy:直接抛弃义务;

线程池的封闭

ThreadPoolExecutor供应了两个要领,用于线程池的封闭,分别是shutdown()和shutdownNow():

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

代码逻辑就不逐一举行剖析了,总结一下两个要领的特性就是:

  • shutdown():不会马上住手线程池,而是要等一切义务缓存行列中的义务都实行完后才住手,但再也不会吸收新的义务
  • shutdownNow():马上住手线程池,并实验打断正在实行的义务,而且清空义务缓存行列,返回还没有实行的义务

ThreadPoolExecutor建立线程池实例

ThreadPoolExecutor的运转机制讲完了,接下来展现一下如何用ThreadPoolExecutor建立线程池实例,详细代码以下:

public static void main(String[] args) {
    ExecutorService service = new ThreadPoolExecutor(5, 10, 300, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(5));
    //用lambda表达式编写要领体中的逻辑
    Runnable run = () -> {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()   "正在实行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };
    for (int i = 0; i < 10; i  ) {
        service.execute(run);
    }
    //这里肯定要做封闭
    service.shutdown();
}

上面的代码中,我们建立的ThreadPoolExecutor线程池的中心线程数为5个,以是,当挪用线程池实行义务时,同时运转的线程最多也是5个,实行main要领,输出效果以下:

pool-1-thread-3正在实行
pool-1-thread-1正在实行
pool-1-thread-4正在实行
pool-1-thread-5正在实行
pool-1-thread-3正在实行
pool-1-thread-2正在实行
pool-1-thread-1正在实行
pool-1-thread-4正在实行
pool-1-thread-5正在实行

看到出来,线程池确切只要5个线程在事情,也就是真正的完成了线程的复用,申明我们的ThreadPoolExecutor实例是有用的。

参考:

https://www.cnblogs.com/liuzhihu/p/8177371.html

https://www.cnblogs.com/dolphin0520/p/3932921.html

《实战Java:高并发顺序设计》

-玖富娱乐是一家为代理招商,直属主管信息发布为主的资讯网站,同时也兼顾玖富娱乐代理注册登录地址。