Java並發編程系列之十五:Executor框架


Java使用線程完成異步任務是很普遍的事,而線程的創建與銷毀需要一定的開銷,如果每個任務都需要創建一個線程將會消耗大量的計算資源,JDK 5之后把工作單元和執行機制區分開了,工作單元包括Runnable和Callable,而執行機制則由Executor框架提供。Executor框架為線程的啟動、執行和關閉提供了便利,底層使用線程池實現。使用Executor框架管理線程的好處在於簡化管理、提高效率,還能避免this逃逸問題——是指不完整的對象被線程調用。

Executor框架使用了兩級調度模型進行線程的調度。在上層,Java多線程程序通常把應用分解為多個任務,然后使用用戶調度框架Executor將這些任務映射為固定數量的線程;在底層,操作系統內核將這些線程映射到硬件處理器上。

Executor框架包括線程池,Executor,Executors,ExecutorService,CompletionService,Future,C
allable 等。

主線程首先通過Runnable或者Callable接口創建任務對象。工具類Executors可以把一個Runnable對象封裝為Callable對象(通過調用Executors.callable(Runnable task)實現),然后可以把Runnable對象直接交給ExecutorService執行,ExecutorService通過調用ExecutorService.execute(Runnable command)完成任務的執行;或者把Runnable對象或Callable對象交給ExecutorService執行,ExecutorService通過調用ExecutorService.submit(Runnable task)或者ExecutorService.submit(Callable task)完成任務的提交。在使用ExecutorService的submit方法的時候會返回一個實現Future接口的對象(目前返回的是FutureTask對象)。由於FutureTask實現了Runnable,也可以直接創建FutureTask,然后交給ExecutorService執行。

ExecutorService 接口繼承自 Executor 接口,它提供了更豐富的實現多線程的方法。比如可以調用 ExecutorService 的 shutdown()方法來平滑地關閉 ExecutorService,調用該方法后,將導致 ExecutorService 停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢后將會關閉 ExecutorService。

通過Executors工具類可以創建不同的線程池ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads,ThreadFactory factory)

FixedThreadPool適用於為了滿足管理資源的需求,而需要限制當前線程數量的應用場景,它適用於負載比較重的服務器。

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor(ThreadFactory factory)

SingleThreadExecutor適用於需要保證順序地執行各個任務,並且在任意時間點不會有多個線程在活動的場景。

CachedThreadPool

public static ExecutorService newCachedThreadPool()
public static ExecutorService newCachedThreadPool(ThreadFactory factory)

CachedThreadPool是大小無界的線程池,適用於執行很多的短期異步任務的小程序,或者負載比較輕的服務器。

ScheduledThreadPoolExecutor

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,ThreadFactory factory)

創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。ScheduledThreadPoolExecutor適用於需要在多個后台線程執行周期任務,同時為了滿足資源管理需求需要限制后台線程數量的應用場景。

Executor框架的最核心的類是ThreadPoolExecutor,它是線程池的實現類,主要由四個組件構成。

  1. corePool:核心線程池的大小
  2. maximumPool:最大線程池的大小
  3. BlockingQueue:用來暫時保存任務的工作隊列
  4. RejectedExecutionHandler:飽和策略。當ThreadPoolExecutor已經關閉或者ThreadPoolExecutor已經飽和時(是指達到了最大線程池的大小且工作隊列已滿),execute方法將要調用的Handler


使用Executor框架執行Runnable任務

package com.rhwayfun.concurrency;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created by rhwayfun on 16-4-4.
*/

public class ExecutorRunnableTest {

static class Runner implements Runnable{
public void run() {
System.out.println(Thread.currentThread().getName() + " is called");
}
}

public static void main(String[] args){
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++){
cachedThreadPool.execute(new Runner());
}
cachedThreadPool.shutdown();
}
}

結果如下:

這里寫圖片描述

通過下面對CachedThreadPool的分析就能知道執行任務的時候首先會從線程池選擇空閑的線程執行任務,如果沒有沒有空閑的線程就會創建一個新的線程執行任務。這里出現同一個線程執行兩遍的原因在於第一次執行任務的空閑線程執行完任務后不會馬上終止,認識等待60秒才會終止。

使用Executor框架執行Callable任務

Runnable 任務沒有返回值而 Callable 任務有返回值。並且 Callable 的call()方法只能通過 ExecutorService 的 submit(Callable task) 方法來執行,並且返回一個 Future(目前是FutureTask),是表示任務等待完成的 Future。如果需要得到Callable執行返回的結果,可以通過吊桶FutureTask的get方法得到。

下面的代碼演示使用Executor框架執行Callable任務:

package com.rhwayfun.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* Created by rhwayfun on 16-4-4.
*/

public class ExecutorCallableTest {

/**
* Callable任務
*/

static class Runner implements Callable<String> {

private String runId;

public Runner(String runId) {
this.runId = runId;
}

public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + " call method is invoked!");
return Thread.currentThread().getName() + " call method and id is " + runId;
}
}

public static void main(String[] args) {
//線程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
//接收Callable任務的返回結果
List<Future<String>> futureTaskList = new ArrayList<Future<String>>();

for (int i = 0; i < 5; i++) {
Future<String> future = cachedThreadPool.submit(new Runner(String.valueOf(i)));
futureTaskList.add(future);
}

//遍歷線程執行的返回結果
for (Future f : futureTaskList) {
try {
//如果任務沒有完成則忙等待
while (!f.isDone()) {}
System.out.println(f.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
//關閉線程池,不再接收新的任務
cachedThreadPool.shutdown();
}
}
}
}

程序的運行結果如下:

這里寫圖片描述

submit 方法也是首先選擇空閑線程來執行任務,如果沒有,才會創建新的線程來執行任務。如果 Future 的返回尚未完成則 get()方法會阻塞等待直到 Future 完成返回。

FixedThreadPool詳解

創建FixedThreadPool的源碼如下:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

其corePoolSize和maximumPoolSize都被設為nThreads的值。當線程池中的線程數大於corePoolSize時,keepAliveTime為多余的空閑線程等待新任務的最長時間,超過這個時間后多余的線程將被終止。具體在FixedThreadPool的執行過程如下:

  1. 如果當前運行的線程數少於corePoolSize,就創建新的線程執行任務
  2. 在線程池如果當前運行的線程數等於corePoolSize時,將任務加入到LinkedBlockingQueue等待執行
  3. 線程執行完1中的任務后,會在循環中反復從LinkedBlockingQueue獲取任務來執行

由於LinkedBlockingQueue使用的無界隊列,所以線程池中線程數不會超過corePoolSize,因此不斷加入線程池中的任務將被執行,因為不會馬上被執行的任務都加入到LinkedBlockingQueue等待了。

CachedThreadPool詳解

CachedThreadPool是一個根據需要創建線程的線程池。創建一個CachedThreadPool的源碼如下:

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

由源碼可以看出,CachedThreadPool的corePoolSize為0,maximumPoolSize為Integer.MAX_VALUE,keepAliveTime為60L,意味着多余的空閑線程等待新任務的執行時間為60秒。

CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊列(SynchronousQueue是一個沒有容量的阻塞隊列,每個插入操作必須等待另一個線程的對應移除操作),但是CachedThreadPool的maximumPool是無界的。這就意味着如果線程的提交速度高於線程的處理速度,CachedThreadPool會不斷創建線程,極端情況是因為創建線程過多耗盡CPU和內存資源。

CachedThreadPool的執行過程如下:

  1. 首先執行SynchronousQueue的offer方法。如果maximumPool有空閑線程正在執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主線程執行offer操作與空閑線程的poll操作配對成功,主線程把任務交給空閑線程執行,否則執行2
  2. 如果maximumPool為空或者maximumPool沒有空閑線程時,CachedThreadPool將會創建一個新線程執行任務
  3. 在步驟2新創建的線程將任務執行完后,將會在SynchronousQueue隊列中等待60秒,如果60秒內主線程提交了新任務,那么將繼續執行主線程提交的新任務,否則會終止該空閑線程。

ScheduledThreadPoolExecutor詳解

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,主要用來在給定的延遲之后執行任務,或者定期運行任務。Timer類也具有類似的功能,Timer對應的單個的后台線程,而ScheduledThreadPoolExecutor可以在構造函數內指定多個對應的后台線程。

ScheduledThreadPoolExecutor為了支持周期性任務的執行,使用了DelayQueue作為任務隊列。ScheduledThreadPoolExecutor會把待調度的任務(該任務是ScheduledFutureTask)放到DelayQueue中,線程池中的線程從DelayQueue中獲取要執行的定時任務並執行。

ScheduledFutureTask包含了3個變量:

  1. long型變量time,是任務具體的執行時間
  2. long型變量sequenceNumber,是這個任務被添加到ScheduledThreadPoolExecutor中的序號
  3. long型成員period,表示任務執行的間隔周期

下面是ScheduledThreadPoolExecutor具體的執行步驟:

  1. 線程從DelayQueue中獲取已經到期的ScheduledFutureTask。到期任務是指time大於等於當前時間的任務
  2. 線程執行這個過期任務
  3. 線程修改這個任務的time變量為下次執行的時間(當前時間加上間隔時間)
  4. 線程把修改后的任務放回DelayQueue,過期后會被重新執行

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com