更新時間:2020-05-28 16:14:44 來源:動力節點 瀏覽2920次
ThreadPoolExecutor里面使用到JUC同步器框架AbstractQueuedSynchronizer、大量的位操作、CAS操作。ThreadPoolExecutor提供了固定活躍線程、額外的線程、任務隊列以及拒絕策略這幾個重要的功能。下面我們一起來看看Java線程池ThreadPoolExecutor的原理解析。
1.JUC同步器框架
ThreadPoolExecutor里面使用到JUC同步器框架,主要用于四個方面:
(1)全局鎖mainLock成員屬性,是可重入鎖ReentrantLock類型,主要是用于訪問工作線程Worker集合和進行數據統計記錄時候的加鎖操作。
(2)條件變量termination,Condition類型,主要用于線程進行等待終結awaitTermination()方法時的帶期限阻塞。
(3)任務隊列workQueue,BlockingQueue類型,任務隊列,用于存放待執行的任務。
(4)工作線程,內部類Worker類型,是線程池中真正的工作線程對象。
2.核心線程
這里先參考ThreadPoolExecutor的實現并且進行簡化,實現一個只有核心線程的線程池,要求如下:暫時不考慮任務執行異常情況下的處理;任務隊列為無界隊列;線程池容量固定為核心線程數量;暫時不考慮拒絕策略。
public?class?CoreThreadPool?implements?Executor?{
????private?BlockingQueue<Runnable>?workQueue;
????private?static?final?AtomicInteger?COUNTER?=?new?AtomicInteger();
????private?int?coreSize;
????private?int?threadCount?=?0;
????public?CoreThreadPool(int?coreSize)?{
????????this.coreSize?=?coreSize;
????????this.workQueue?=?new?LinkedBlockingQueue<>();
????}
????@Override
????public?void?execute(Runnable?command)?{
????????if?(++threadCount?<=?coreSize)?{
????????????new?Worker(command).start();
????????}?else?{
????????????try?{
????????????????workQueue.put(command);
????????????}?catch?(InterruptedException?e)?{
????????????????throw?new?IllegalStateException(e);
????????????}
????????}
????}
????private?class?Worker?extends?Thread?{
????????private?Runnable?firstTask;
????????public?Worker(Runnable?runnable)?{
????????????super(String.format("Worker-%d",?COUNTER.getAndIncrement()));
????????????this.firstTask?=?runnable;
????????}
????????@Override
????????public?void?run()?{
????????????Runnable?task?=?this.firstTask;
????????????while?(null?!=?task?||?null?!=?(task?=?getTask()))?{
????????????????try?{
????????????????????task.run();
????????????????}?finally?{
????????????????????task?=?null;
????????????????}
????????????}
????????}
????}
????private?Runnable?getTask()?{
????????try?{
????????????return?workQueue.take();
????????}?catch?(InterruptedException?e)?{
????????????throw?new?IllegalStateException(e);
????????}
????}
????public?static?void?main(String[]?args)?throws?Exception?{
????????CoreThreadPool?pool?=?new?CoreThreadPool(5);
????????IntStream.range(0,?10)
????????????????.forEach(i?->?pool.execute(()?->
????????????????????????System.out.println(String.format("Thread:%s,value:%d",?Thread.currentThread().getName(),?i))));
????????Thread.sleep(Integer.MAX_VALUE);
????}
}
某次運行結果如下:
Thread:Worker-0,value:0
Thread:Worker-3,value:3
Thread:Worker-2,value:2
Thread:Worker-1,value:1
Thread:Worker-4,value:4
Thread:Worker-1,value:5
Thread:Worker-2,value:8
Thread:Worker-4,value:7
Thread:Worker-0,value:6
Thread:Worker-3,value:9
設計此線程池的時候,核心線程是懶創建的,如果線程空閑的時候則阻塞在任務隊列的take()方法,其實對于ThreadPoolExecutor也是類似這樣實現,只是如果使用了keepAliveTime并且允許核心線程超時則會使用BlockingQueue#poll進行輪詢代替永久阻塞。
3.其他附加功能
構建ThreadPoolExecutor實例的時候,需要定義maximumPoolSize(線程池最大線程數)和corePoolSize(核心線程數)。當任務隊列是有界的阻塞隊列,核心線程滿負載,任務隊列已經滿的情況下,會嘗試創建額外的maximumPoolSize-corePoolSize個線程去執行新提交的任務。當ThreadPoolExecutor這里實現的兩個主要附加功能是:
(1)一定條件下會創建非核心線程去執行任務,非核心線程的回收周期(線程生命周期終結時刻)是keepAliveTime,線程生命周期終結的條件是:下一次通過任務隊列獲取任務的時候并且存活時間超過keepAliveTime。
(2)提供拒絕策略,也就是在核心線程滿負載、任務隊列已滿、非核心線程滿負載的條件下會觸發拒絕策略。
以上就是動力節點java培訓機構的小編針對“Java實例教程:ThreadPoolExecutor的原理解析”的內容進行的回答,希望對大家有所幫助,如有疑問,請在線咨詢,有專業老師隨時為你服務。
0基礎 0學費 15天面授
有基礎 直達就業
業余時間 高薪轉行
工作1~3年,加薪神器
工作3~5年,晉升架構
提交申請后,顧問老師會電話與您溝通安排學習