《Java核心技術》---- 多線程


API:

java.lang.Object

  • void notifyAll() 解除那些在該對象上調用wait方法的線程的阻塞狀態。該方法只能在同步方法或同步塊內部調用。如果當前線程不是對象鎖的持有者,該方法拋出一個IllegalMonitorStateException異常。
  • void nofity() 隨機選擇一個在該對象上調用wait方法的線程,解除其阻塞狀態。
  • void wait() 導致線程進入等待狀態直到它被通知。該方法只能在同步方法或同步塊內部調用。如果當前線程不是對象鎖的持有者,該方法拋出一個IllegalMonitorStateException異常。
  • void wait(long millis)
  • void wait(long millis, int nanos) 導致線程進入等待狀態直到它被通知或經過指定的時間。

 

java.lang.Thread

  • static void sleep(long millis) 休眠給定的毫秒數。
  • Thread(Runnable target) 構造一個新線程,用於調用給定target的run()方法。
  • void start() 啟動這個線程,將引發調用run()方法。這個方法將立即返回,並且新線程將並行運行。
  • void run()調用關聯Runnable的run方法。
  • void interrupt() 向線程發送中斷請求。線程的中斷狀態將被設置為true。如果目前該線程被一個sleep調用阻塞,那么, InterruptedException異常被拋出。
  • static boolean interrupted() 測試當前線程(即正在執行這一命令的線程)是否被中斷。這是一個靜態方法。這一調用會產生副作用,它將當前的中斷狀態重置為false.
  • boolean isInterrupted() 測試線程是否被終止。不像靜態的中斷方法,這一調用不改變線程的中斷狀態。
  • static Thread currentThread() 返回代表當前執行線程的Thread對象。
  • void setPriority(int newPriority) 設置線程優先級,一般使用Thread.NORM_PRIORITY.
  • static void MIN_PRIORITY 最小優先級,值為1。
  • static int NORM_PRIORITY 值為5。
  • static int MAX_PRIORITY 最大優先級,值為10。
  • static void yield() 導致當前執行線程處於讓步狀態。如果有其他的可運行線程具有至少與此線程同樣高的優先級,那么這些線程接下來會被調度。注意,這是一個靜態方法。
  • void setDaemon(boolean isDaemon) 標識該線程為守護線程或用戶線程,這一方法必須在線程啟動之前調用。
  • static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 設置未捕獲異常的默認處理器。
  • static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() 獲取未捕獲異常的默認處理器。
  • void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) 設置未捕獲異常的處理器
  • Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() 獲取未捕獲異常的處理器。

 

java.lang.Thread.UncaughtExceptionHandler

  • void uncaughtException(Thread t, Throwable e) 當一個線程因為捕獲異常而終止,但規定要將客戶報告記錄到日志中。

 

java.lang.ThreadLocal<T>

  • T get() 得到這個線程的當前值。如果是首次調用get,會調用initialize()來得到這個值。
  • protected initialize() 應該覆蓋這個方法來提供一個初始值。默認情況下,這個方法返回null。
  • void set<T t> 為這個線程設置一個新值。
  • void remove() 刪除對應這個線程的值。

java.lang.Runnable

  • void run() 必須覆蓋這個方法,並在這個方法中提供所要執行的任務指令。

 

java.util.concurrent.ArrayBlockingQueue<E>

  • ArrayBlockingQueue(int capacity)
  • ArrayBlockingQueue(int capacity, boolean fair) 構造一個帶有指定的容量和公平性設置的阻塞隊列。該隊列用循環數組實現。

 

java.util.concurrent.LinkedBlockingQueue<E>

  • LinkedBlockingQueue() 構造一個無上限的阻塞隊列或雙向隊列,用鏈表實現。
  • LinkedBlockingQueue(int capacity)
  • LinkedBlockingQueue(int capacity, boolean fair) 構造一個帶有指定的容量的阻塞隊列或雙向隊列。該隊列用鏈表實現。

 

java.util.concurrent.DelayQueue<E extends Delayed>

  • DelayQueue() 構造一個包含Delayed元素的無界的阻塞時間有限的阻塞隊列。只有那些延遲已經超過時間的元素可以從隊列中移出。
  • getDelay(TimeUnit unit) 得到該對象的延遲,用給定的時間單位進行度量。

java.util.concurrent.PriorityBlockingQueue<E>

  • PriorityBlockingQueue()
  • PriorityBlockingQueue(int initialCapacity)
  • PriorityBlockingQueue(int initialCapacity, Comparator<? super E>) 構造一個無邊界阻塞優先隊列,用堆實現。

 

java.util.concurrent.BlockingQueue<E>

  • void put(E element) 添加元素,在必要時阻塞
  • E take() 移出並返回頭元素,必要時阻塞
  • boolean offer(E element, long time, TimeUnit unit) 添加給定的元素,如果成功返回true,必要時阻塞,直至元素已經被添加或超時
  • E poll(long time, TimeUnit unit) 移出並返回頭元素,必要時阻塞,直至元素可用或超時完成。失敗時返回null。

 

java.util.concurrent.BlockingDeque<E>

  • void putFirst(E element)
  • void putLast<E element) 添加元素,必要時阻塞
  • E takeFirst()
  • E takeLast() 移出並返回頭元素或尾元素,必要時阻塞
  • boolean offerFirst(E element, long time, TimeUnit unit)
  • boolean offerLast(E element, long time, TimeUnit unit) 添加給定元素,成功返回true,必要時阻塞直至元素被添減或超時。
  • E pollFirst(long time, TimeUnit unit)
  • E pollLast(long time, TimeUnit unit) 移動並返回頭元素或尾元素,必要時阻塞,直到元素可用或超時,失敗時返回null。

java.util.concurrent.TransferQueue<E>

  • void transfer(E element)
  • boolean tryTransfer(E element, long time, TimeUnit unit) 傳輸一個值,或者嘗試在給定的超時時間內傳輸這個值,這個調用將阻塞,直到另一個線程將元素刪除。第二個方法會在調用成功時返回true。

java.util.concurrent.locks.Lock

  • void lock() 獲取這個鎖;如果鎖同時被另一個線程擁有則發生阻塞。
  • void unlock() 釋放這個鎖。
  • Condition newCondition() 返回一個與該鎖相關的條件對象。
  • boolean tryLock() 嘗試獲得鎖而沒有發生阻塞;如果成功返回真。
  • boolean tryLock(long time, TimeUnit unit) 嘗試獲得鎖,阻塞時間不會超過給定的值,如果成功返回true。
  • void lockInterruptibly() 獲得鎖,但是會不確定地發生阻塞。如果線程被中斷,拋出一個InterruptedException.

 

java.util.concurrent.locks.Condition

  • void await() 將該線程放到條件的等待集中。
  • void signalAll() 解除該條件的等待集中的所有線程的阻塞狀態。
  • void signal() 從該條件的等待集中隨機地選擇一個線程,解除其阻塞狀態。
  • boolean await(long time, TimeUnit unit) 進入該條件的等待集,直到線程從等待集移出或等待了制定的時間后解除阻塞。如果因為等待時間到了而返回,會返回false,否則返回true。
  • void awaitUninterruptibly() 進入該條件的等待集,直到線程從等待集移出才解除阻塞。如果線程被中斷,該方法不會拋出InterruptedException異常。

 

java.util.concurrent.locks.ReentrantLock

  • ReentrantLock() 構建一個可以被用來保護臨界區的可重入鎖。
  • ReentrantLock(boolean fair) 構建一個帶有公平策略的鎖。一個公平鎖偏愛等待時間最長的線程。但是,這一公平的保證將大大降低性能。所以,默認情況下,鎖沒有被強制為公平的。

 

java.util.concurrent.locks.ReentrantReadWriteLock

  • Lock readLock() 得到一個可以被多個讀操作共用的讀鎖,但會排斥所有寫操作。
  • Lock writeLock() 得到一個寫鎖,排斥所有其他的讀操作和寫操作。

java.utl.concurrent.ThreadLocalRandom

  • static ThreadLocalRandom current() 返回特定於當前線程的Random類實例。eg. int random = ThreadLocalRandom.current().nextInt(upperBound);

 

並發編程中的三個概念:

1,原子性,即一個操作或者多個操作 要么全部執行並且執行的過程不會被任何因素打斷,要么就都不執行。

2,可見性,指當多個線程訪問同一個變量時,一個線程修改了這個變量的值,其他線程能夠立即看得到修改的值。

3,有序性,即程序執行的順序按照代碼的先后順序執行。

 

一,中斷線程

  可以調用interrupt()方法發送中斷請求。但是,如果線程被阻塞,就無法檢測中斷狀態。這是產生InterruptedException異常的地方。當在一個被阻賽的線程(調用sleep或wait)上調用interrupt方法時,阻塞調用將會被InterruptedException中斷。

發出中斷請求並不意味着線程立即會終止,發送請求只是要引起線程注意,要求中斷的線程可以決定如何處理這個中斷請求。某些線程會處理完異常后,繼續執行,而不理會中斷。但是更普遍的情況是,線程將簡單地將中斷作為一個終止的請求。即如下代碼:

public void run() {
    try {
        ...
        while(!Thread.currentThread().isInterrupted() && more work to check) {
            do more work;
        }
    } catch (InterruptedException ex) {
        // Thread was interrupted during sleep or wait;
    } finally {
        //cleanup, if required;
    }
    // exiting the run method terminates the thread
}

如果在線程調用sleep方法后,isInterrupted檢測沒有必要也沒用。如果在中斷狀態調用sleep方法,它也不會休眠。相反,它將清除這一狀態並拋出InterruptedException。

 

二,線程狀態

  • New(新創建)
  • Runnable(可運行)
  • Blocked(被阻塞)
  • Waiting(等待)
  • Timed waiting(計時等待)
  • Terminated(被終止)

 

1,新創建線程

  new Thread(r), 當一個線程處於新建狀態時,程序還沒有開始運行線程中的代碼。

2,可運行線程

  一旦調用start方法,線程處於runnable狀態。一個可運行的線程可能正在運行也可能沒有運行。

一旦一個線程開始運行,它不必始終保持運行。搶占式調度系統給每一個可運行線程一個時間片來執行任務,當時間片用完,操作系統剝奪該線程的運行權,並給另一個線程運行機會。

3,被阻塞線程和等待線程

  當線程處於被阻塞或等待狀態時,它暫時不活動。

  進入被阻塞或等待狀態的情況:

  •   當一個線程試圖獲取一個內部的對象鎖,而該鎖被其他線程持有,則該線程進入阻塞狀態。當所有其他線程釋放該鎖,並且線程調度器允許該線程持有這把鎖時,該線程將變成非阻塞狀態。
  •       When the thread waits for another thread to notify the scheduler of a condition, it enters the waiting state.
  •   有一些方法有一個超時參數,調用它們導致線程進入計時等待狀態。這一狀態將一直保持到超時期滿或者接收到適當的通知。這些方法有:Thread.sleep(), Object.wait(), Thread.join(), Lock.tryLock以及Condition.await()

 

三,線程屬性

1,線程優先級

在java中,每個線程有一個優先級。默認情況下,一個線程繼承它的父線程的優先級。可以使用setPriority方法提高或降低任何一個線程的優先級。可以將優先級設置為MIN_PRIORITY(在Thread類中定義為1)與MAX_PRIORITY(定義為10)之間的任何值。NORM_PRIORITY被定義為5。

2,守護線程

在Java中有兩類線程:用戶線程 (User Thread)、守護線程 (Daemon Thread)。

所謂守護 線程,是指在程序運行的時候在后台提供一種通用服務的線程,比如垃圾回收線程就是一個很稱職的守護者,並且這種線程並不屬於程序中不可或缺的部分。因 此,當所有的非守護線程結束時,程序也就終止了,同時會殺死進程中的所有守護線程。反過來說,只要任何非守護線程還在運行,程序就不會終止。

用戶線程和守護線程兩者幾乎沒有區別,唯一的不同之處就在於虛擬機的離開:如果用戶線程已經全部退出運行了,只剩下守護線程存在了,虛擬機也就退出了。 因為沒有了被守護者,守護線程也就沒有工作可做了,也就沒有繼續運行程序的必要了。

可以通過調用 t.setDaemon(true) 將線程轉換為守護線程。但需要注意以下幾點: 

(1) thread.setDaemon(true)必須在thread.start()之前設置,否則會跑出一個IllegalThreadStateException異常。你不能把正在運行的常規線程設置為守護線程。 

(2) 在Daemon線程中產生的新線程也是Daemon的。

(3) 守護線程應該永遠不去訪問固有資源,如文件、數據庫,因為它會在任何時候甚至在一個操作的中間發生中斷。

3,未捕獲異常處理器

線程的run方法不能拋出任何被檢測的異常,也就是說各個線程需要自己把自己的checked exception處理掉。這一點是通過java.lang.Runnable.run()方法聲明(因為此方法聲明上沒有throw exception部分)進行了約束。

但是,線程還是有可能拋出unchecked exception, 當這類異常被拋出時,線程就會終結,而對於朱線程和其他線程完全不受影響,且完全感知不到某個線程拋出的異常。在Java中,線程方法的異常,都應該在先車觀念代碼run方法內進行try-catch並處理掉,換句話說,我們不能捕獲從線程中逃逸的異常。

不過,在線程死亡之前,異常被傳遞到一個用於未被捕獲異常的處理器。該處理器必須屬於一個實現Thread.UncaughtExceptionHandler接口的類。這個接口只有一個方法:

void uncaughtException(Thread t, Throwable e)

可以用setUncaughtExceptionHandler方法為任何線程安裝一個處理器。也可以用Thread類的靜態方法setDefaultUncaughtExceptionHandler為所有線程安裝一個默認的處理器。

package com.ivy.thread;

import java.lang.Thread.UncaughtExceptionHandler;

public class ThreadExceptionTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Thread t = new Thread(new ExceptionThread());
        t.setUncaughtExceptionHandler(new MyUncheckedExceptionhandler());
        t.start();
    }

}

class MyUncheckedExceptionhandler implements UncaughtExceptionHandler {

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // TODO Auto-generated method stub
        System.out.println("caught exception:" + e);
    }
    
    
}

class ExceptionThread implements Runnable {

    @Override
    public void run() {
        // TODO Auto-generated method stub
        throw new RuntimeException("throw runtime exception");
    }
    
}

 

四,同步

前題:

package com.ivy.thread.unsynch;

public class Bank {

    private final double[] accounts;
    
    public Bank(int n, double initialBalance) {
        accounts = new double[n];
        for (int i=0; i< accounts.length; i++) {
            accounts[i] = initialBalance;
        }
    }
    
    public void transfer(int from, int to, double amount) {
        if (accounts[from] < amount) 
            return;
        System.out.println(Thread.currentThread());
        accounts[from] -= amount;
        System.out.printf("%10.2f from %d to %d", amount, from, to);
        accounts[to] += amount;
        System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
    }
    
    public double getTotalBalance() {
        double sum = 0;
        
        for(double a : accounts) {
            sum += a;
        }
        return sum;
    }
    
    public int size() {
        return accounts.length;
    }
}
package com.ivy.thread.unsynch;

public class TransferRunnable implements Runnable{

    private Bank bank;
    private int fromAccount;
    private double maxAmount;
    private int DELAY = 10;
    
    public TransferRunnable(Bank b, int from, double max) {
        bank = b;
        fromAccount = from;
        maxAmount = max;
    }

    @Override
    public void run() {
        try {
            while(true) {
                int toAccount = (int)(bank.size() * Math.random());
                double amount = maxAmount * Math.random();
                bank.transfer(fromAccount, toAccount, amount);
                Thread.sleep((int)(DELAY * Math.random()));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
}
package com.ivy.thread.unsynch;

public class UnsynchBankTest {

    public static final int NACCOUNTS = 100;
    public static final double INITIAL_BALANCE = 1000;
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub

        Bank b = new Bank(NACCOUNTS, INITIAL_BALANCE);
        for (int i=0; i<NACCOUNTS; i++) {
            TransferRunnable r = new TransferRunnable(b, i, INITIAL_BALANCE);
            Thread t = new Thread(r);
            t.start();
        }
    }

}

1,競爭條件

當兩個線程試圖同時更新同一個賬戶的時候,就出現了競爭條件,結果會有誤差,原因是操作不是原子性的。

Java提供了兩種防止代碼塊受並發訪問的干擾的方式:

  • synchronized關鍵字
  • 使用鎖和條件對象

2,鎖對象

myLock.lock();
try {
   // do something
}
finally {
    myLock.unlock();
}

這一結構確保任何時刻只有一個線程進入臨界區。一旦一個線程持有了鎖對象,其他任何線程都無法通過lock語句。當其他線程調用lock時,它們被阻塞,直到第一個線程釋放鎖對象。

使用鎖來保護Bank類的transfer方法:

public class Bank {

    private Lock bankLock = new ReentrantLock();
    ...
    
    public void transfer(int from, int to, double amount) {
        bankLock.lock();
        try {
            if (accounts[from] < amount) 
                return;
            System.out.println(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf("%10.2f from %d to %d", amount, from, to);
            accounts[to] += amount;
            System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
        }
        finally {
            bankLock.unlock();
        }
    }
    
    ...
}

每個Bank對象有自己的ReentrantLock對象,如果兩個線程試圖訪問同一個Bank對象,那么鎖以串行方式提供服務。但是如果兩個線程訪問不同的Bank對象,每個線程得到不同的鎖對象,兩個線程都不會發生阻塞。

鎖是可重入的,因為線程可以重復地獲得已經持有的鎖。鎖保持一個持有計數來跟蹤對lock方法的嵌套調用。線程在每一次調用lock都要調用unlock來釋放鎖。由於這一特性,被一個鎖保護的代碼可以調用另一個使用相同的鎖的方法。

3,條件對象

線程進入臨界區,卻發現在某一條件滿足之后它才能執行。要使用一個條件對象來管理那些已經獲得了一個鎖但是卻不能做有用工作的線程。條件對象經常被稱為條件變量。

當帳戶沒有足夠余額時,需要等待直到另一個線程向帳戶中注入資金。但是,這一線程剛剛獲得了對bankLock的排他性訪問,因此別的線程沒有進行存款操作的機會。所以我們需要條件對象來解決這個問題。

一個鎖對象可以有一個或多個相關的條件對象。可以用newCondition方法獲得一個條件對象。如下:

class Bank {
    private Condition sufficientFunds;
    
    public Bank() {
        sufficientFunds = bankLock.newCondition();
    }
}

如果A線程在運行transfer()發現余額不足時,會調用sufficientFunds.await(),這時A線程就被阻塞,並且放棄了鎖。於是另一個線程B就可以擁有這把鎖,並進行增加帳戶余額的操作。

等待獲得鎖的線程B和調用await()方法的線程A存在本質上的不同。一旦一個線程(例如A線程)調用了await()方法,它進入該條件的等待集。當鎖可用時,該線程A不能馬上解除阻塞。相反,它處於阻塞狀態,直到另一個線程(線程B)調用同一條件上的signalAll方法為止。

所以當B線程轉帳時,應該調用sufficientFunds.signalAll();來重新激活因為這一條件而等待的所有線程(線程A)。當這些線程從等待集當中移出時,他們再次成為可運行的,調度器將再次激活它們。同時,它們會試圖重新進入原來的對象。一旦鎖可用,它們中的某個將從await調用返回,獲得該鎖並從被阻塞的地方繼續執行。

至關重要的是最終需要某個其他線程調用signalAll方法。當一個線程調用await方法時,它沒辦法重新激活其自身,只能依賴於其他線程。如果沒有其他線程來重新激活等待的線程,它就永遠不再運行了。一般,在對象的狀態有可能使等待線程的方向改變時調用signalAll()比較恰當。

public class Bank {

    private Lock bankLock = new ReentrantLock();
    private Condition sufficientFunds = bankLock.newCondition();
    
        ...
    public void transfer(int from, int to, double amount) {
        bankLock.lock();
        try {
            while (accounts[from] < amount) {
                sufficientFunds.await();
            }
            System.out.println(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf("%10.2f from %d to %d", amount, from, to);
            accounts[to] += amount;
            System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
            sufficientFunds.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            bankLock.unlock();
        }
    }
    
    ...
}
    

注意調用signalAll不會立即激活一個等待線程。它僅僅解除等待線程的阻塞,以便這些線程可以在當前線程退出同步方法之后,通過競爭實現對對象的訪問。

另一個方法signal(), 是隨機解除等待集中某個線程的阻塞狀態。這比解除所有線程的阻塞更加有效,但也存在危險。如果隨機選擇的線程發現自己仍然不能運行,那么它再次被阻塞。如果沒有其他線程再次調用signal,那么系統就死鎖了。

當一個線程擁有某個條件的鎖時,它僅僅可以在該條件上調用await/signalAll或signal方法。

總結一下:

  • 鎖用來保護代碼片斷,任何時刻只能由一個線程執行被保護的代碼。
  • 鎖可以管理試圖進入被保護代碼段的線程。
  • 鎖可以擁有一個或多個相關的條件對象。
  • 每個條件對象管理那些已經進入被保護的代碼段但還不能運行的線程。

 

4,synchronized關鍵字

Java中的每個對象都有一個內部鎖。如果一個方法用synchronized關鍵字聲明,那么對象的鎖將保護整個方法。也就是說,要調用該方法,線程必須獲得內部的鎖對象。

換句話說,

public synchronized void method() {
    method body;
}

等價於:

public void method() {
    this.intrinsicLock.lock();
    try {
        method body;
    }
    finally{
        this.intrinsicLock.unlock();
    }
}

內部對象鎖只有一個相關條件。wait()方法添加一個線程到等待集中,nontifyAll/notify方法解除等待線程的阻塞狀態。

改進后的transfer()方法: 

public synchronized void transfer2(int from, int to, double amount) {
        bankLock.lock();
        try {
            while (accounts[from] < amount) {
                wait();
            }
            System.out.println(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf("%10.2f from %d to %d", amount, from, to);
            accounts[to] += amount;
            System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
            notifyAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            bankLock.unlock();
        }
    }

 

5,同步阻塞

每個Java對象有一個鎖。線程可以通過調用同步方法獲得鎖。還可以通過進入一個同步阻塞獲得鎖。

當線程進入如下形式的阻塞,就會獲得obj的鎖。

synchronized(obj) {
    critical section;
}

例如: 

public class Bank {
    private final double[] accounts;
    private Object lock = new Object();

    ...
    
    public void transfer(int from, int to, int amount) {
        synchronized(lock) {
            accounts[from] -= amount;
            accounts[to] += amount;
        }
        System.out.printf(" Total Balance : %10.2f%n", getTotalBalance());
    }
    
    ...

}

lock對象被創建僅僅是用來使用每個Java對象持有的鎖。通過使用一個對象的鎖來實現程序控制的原子操作。

 

6,監視器

監視器具有的特性:

  • 監視器是只包含私有域的類。
  • 每個監視器類的對象有一個相關的鎖。
  • 使用該鎖對所有的方法進行加鎖。換句話說,如果調用obj.method(),那么obj對象的鎖是在方法調用開始時自動獲得,並且當方法返回時自動釋放該鎖。因為所有的域是私有的,這樣的安排可以確保一個線程在對對象操作時,沒有其他線程能訪問該域。
  • 該鎖可以有任意多個相關條件。

 

7,Volatile關鍵字

i,volatile關鍵字的兩層語義:

一旦一個共享變量(類的成員變量,類的靜態成員變量)被volatile修飾后,就具備了兩層語義:

  • 保證了不同線程對這個變量進行操作時的可見性,即一個線程修改了某個變量的值,這個新值對其他線程來說是立即可見的。
  • 禁止進行指令重排序。

先看一段代碼,假如線程1先執行,線程2后執行:

//線程1
boolean stop = false;
while(!stop){
	doSomething();
}

//線程2
stop = true;

 這段代碼是很典型的一段代碼,很多人在中斷線程時可能都會采用這種標記辦法。但是事實上,這段代碼會完全運行正確么?即一定會將線程中斷么?不一定,也許在大多數時候,這個代碼能夠把線程中斷,但是也有可能會導致無法中斷線程(雖然這個可能性很小,但是只要一旦發生這種情況就會造成死循環了)。

下面解釋一下這段代碼為何有可能導致無法中斷線程。在前面已經解釋過,每個線程在運行過程中都有自己的工作內存,那么線程1在運行的時候,會將stop變量的值拷貝一份放在自己的工作內存當中。

那么當線程2更改了stop變量的值之后,但是還沒來得及寫入主存當中,線程2轉去做其他事情了,那么線程1由於不知道線程2對stop變量的更改,因此還會一直循環下去。但是用volatile修飾之后就變得不一樣了:

  • 使用volatile關鍵字會強制將修改的值立即寫入主存;
  • 使用volatile關鍵字的話,當線程2進行修改時,會導致線程1的工作內存中環村變量stop的緩存行無效。
  • 由於線程1的工作內存中緩存變量stop的緩存行無效,所以線程1再次讀取stop的值回去主存讀取。所以拿到的就是最新的值。

所以volatile保證了多線程的可見性

ii,volatile並不保證原子性

再看一個例子:

public class Test {
	public volatile int inc = 0;
	
	public void increase() {
		inc++;
	}
	
	public static void main(String[] args) {
		final Test test = new Test();
		for(int i=0;i<10;i++){
			new Thread(){
				public void run() {
					for(int j=0;j<1000;j++)
						test.increase();
				};
			}.start();
		}
		
		while(Thread.activeCount()>1)  //保證前面的線程都執行完
			Thread.yield();
		System.out.println(test.inc);
	}
}

運行它會發現每次運行結果都不一致,都是一個小於10000的數字。

這里面就有一個誤區了,volatile關鍵字能保證可見性沒有錯,但是上面的程序錯在沒能保證原子性。可見性只能保證每次讀取的是最新的值,但是volatile沒辦法保證對變量的操作的原子性。

  在前面已經提到過,自增操作是不具備原子性的,它包括讀取變量的原始值、進行加1操作、寫入工作內存。那么就是說自增操作的三個子操作可能會分割開執行,就有可能導致下面這種情況出現:

  假如某個時刻變量inc的值為10,

  線程1對變量進行自增操作,線程1先讀取了變量inc的原始值,然后線程1被阻塞了;

  然后線程2對變量進行自增操作,線程2也去讀取變量inc的原始值,由於線程1只是對變量inc進行讀取操作,而沒有對變量進行修改操作,所以不會導致線程2的工作內存中緩存變量inc的緩存行無效,所以線程2會直接去主存讀取inc的值,發現inc的值時10,然后進行加1操作,並把11寫入工作內存,最后寫入主存。

  然后線程1接着進行加1操作,由於已經讀取了inc的值,注意此時在線程1的工作內存中inc的值仍然為10,所以線程1對inc進行加1操作后inc的值為11,然后將11寫入工作內存,最后寫入主存。

  那么兩個線程分別進行了一次自增操作后,inc只增加了1。

  解釋到這里,可能有朋友會有疑問,不對啊,前面不是保證一個變量在修改volatile變量時,會讓緩存行無效嗎?然后其他線程去讀就會讀到新的值,對,這個沒錯。這個就是上面的happens-before規則中的volatile變量規則,但是要注意,線程1對變量進行讀取操作之后,被阻塞了的話,並沒有對inc值進行修改。然后雖然volatile能保證線程2對變量inc的值讀取是從內存中讀取的,但是線程1沒有進行修改,所以線程2根本就不會看到修改的值。

  根源就在這里,自增操作不是原子性操作,而且volatile也無法保證對變量的任何操作都是原子性的。

  把上面的代碼改成以下任何一種都可以達到效果:

  采用synchronized:

public class Test {
    public  int inc = 0;
    
    public synchronized void increase() {
        inc++;
    }
    
    public static void main(String[] args) {
        final Test test = new Test();
        for(int i=0;i<10;i++){
            new Thread(){
                public void run() {
                    for(int j=0;j<1000;j++)
                        test.increase();
                };
            }.start();
        }
        
        while(Thread.activeCount()>1)  //保證前面的線程都執行完
            Thread.yield();
        System.out.println(test.inc);
    }
}
復制代碼
public class Test {
    public  int inc = 0;
    
    public synchronized void increase() {
        inc++;
    }
    
    public static void main(String[] args) {
        final Test test = new Test();
        for(int i=0;i<10;i++){
            new Thread(){
                public void run() {
                    for(int j=0;j<1000;j++)
                        test.increase();
                };
            }.start();
        }
        
        while(Thread.activeCount()>1)  //保證前面的線程都執行完
            Thread.yield();
        System.out.println(test.inc);
    }
}
復制代碼

  采用Lock:

public class Test {
    public  int inc = 0;
    Lock lock = new ReentrantLock();
    
    public  void increase() {
        lock.lock();
        try {
            inc++;
        } finally{
            lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        final Test test = new Test();
        for(int i=0;i<10;i++){
            new Thread(){
                public void run() {
                    for(int j=0;j<1000;j++)
                        test.increase();
                };
            }.start();
        }
        
        while(Thread.activeCount()>1)  //保證前面的線程都執行完
            Thread.yield();
        System.out.println(test.inc);
    }
}

復制代碼
public class Test {
    public  int inc = 0;
    Lock lock = new ReentrantLock();
    
    public  void increase() {
        lock.lock();
        try {
            inc++;
        } finally{
            lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        final Test test = new Test();
        for(int i=0;i<10;i++){
            new Thread(){
                public void run() {
                    for(int j=0;j<1000;j++)
                        test.increase();
                };
            }.start();
        }
        
        while(Thread.activeCount()>1)  //保證前面的線程都執行完
            Thread.yield();
        System.out.println(test.inc);
    }
}
復制代碼

  采用AtomicInteger:

public class Test {
    public  AtomicInteger inc = new AtomicInteger();
     
    public  void increase() {
        inc.getAndIncrement();
    }
    
    public static void main(String[] args) {
        final Test test = new Test();
        for(int i=0;i<10;i++){
            new Thread(){
                public void run() {
                    for(int j=0;j<1000;j++)
                        test.increase();
                };
            }.start();
        }
        
        while(Thread.activeCount()>1)  //保證前面的線程都執行完
            Thread.yield();
        System.out.println(test.inc);
    }
}

復制代碼
public class Test {
    public  AtomicInteger inc = new AtomicInteger();
     
    public  void increase() {
        inc.getAndIncrement();
    }
    
    public static void main(String[] args) {
        final Test test = new Test();
        for(int i=0;i<10;i++){
            new Thread(){
                public void run() {
                    for(int j=0;j<1000;j++)
                        test.increase();
                };
            }.start();
        }
        
        while(Thread.activeCount()>1)  //保證前面的線程都執行完
            Thread.yield();
        System.out.println(test.inc);
    }
}
復制代碼

  在java 1.5的java.util.concurrent.atomic包下提供了一些原子操作類,即對基本數據類型的 自增(加1操作),自減(減1操作)、以及加法操作(加一個數),減法操作(減一個數)進行了封裝,保證這些操作是原子性操作。atomic是利用CAS來實現原子性操作的(Compare And Swap),CAS實際上是利用處理器提供的CMPXCHG指令實現的,而處理器執行CMPXCHG指令是一個原子性操作。

iii,volatile保證有序性

在前面提到volatile關鍵字能禁止指令重排序,所以volatile能在一定程度上保證有序性。

volatile關鍵字禁止指令重排序有兩層意思:

  1)當程序執行到volatile變量的讀操作或者寫操作時,在其前面的操作的更改肯定全部已經進行,且結果已經對后面的操作可見;在其后面的操作肯定還沒有進行;

  2)在進行指令優化時,不能將在對volatile變量訪問的語句放在其后面執行,也不能把volatile變量后面的語句放到其前面執行。

可能上面說的比較繞,舉個簡單的例子:

//x、y為非volatile變量
//flag為volatile變量

x = 2;        //語句1
y = 0;        //語句2
flag = true;  //語句3
x = 4;         //語句4
y = -1;       //語句5

由於flag變量為volatile變量,那么在進行指令重排序的過程的時候,不會將語句3放到語句1、語句2前面,也不會講語句3放到語句4、語句5后面。但是要注意語句1和語句2的順序、語句4和語句5的順序是不作任何保證的。並且volatile關鍵字能保證,執行到語句3時,語句1和語句2必定是執行完畢了的,且語句1和語句2的執行結果對語句3、語句4、語句5是可見的。

iv,使用volatile的場景

synchronized關鍵字是防止多個線程同時執行一段代碼,那么就會很影響程序執行效率,而volatile關鍵字在某些情況下性能要優於synchronized,但是要注意volatile關鍵字是無法替代synchronized關鍵字的,因為volatile關鍵字無法保證操作的原子性。通常來說,使用volatile必須具備以下2個條件:

  1)對變量的寫操作不依賴於當前值

  2)該變量沒有包含在具有其他變量的不變式中

  實際上,這些條件表明,可以被寫入 volatile 變量的這些有效值獨立於任何程序的狀態,包括變量的當前狀態。

  事實上,上面的2個條件需要保證操作是原子性操作,才能保證使用volatile關鍵字的程序在並發時能夠正確執行。

 

8,死鎖

有可能會因為每個線程要等待鎖而導致所有線程都被阻塞。這樣的狀態稱為死鎖。遺憾的是,Java編程語言沒有任何東西可以避免或打破死鎖現象,必須仔細設計程序,以確保不會出現死鎖。

 

9,線程局部變量

在線程間共享變量有風險,所以有時候要避免共享變量,使用ThreadLocal輔助類為各個線程提供各自的實例。Java的ThreadLocal不是設計用來解決多線程安全問題的,事實證明也解決不了,共享變量a還是會被隨意更改。ThreadLocal無能為力。所以,一般用ThreadLocal都不會將一個共享變量放到線程的ThreadLocal中。一般來講,存放到ThreadLocal中的變量都是當前線程,本身就獨一無二的一個變量。其他線程本身就不能訪問,存到ThreadLocal中只是為了方便在程序中同一個線程之間傳遞這個變量。 

 

10,鎖測試與超時

線程在調用lock()方法來獲得另一個線程所持有的鎖的時候,很可能發生阻塞。應該更加謹慎地申請鎖。tryLock()方法試圖申請一個鎖,在成功獲得鎖后返回true,否則,立即返回false,而且線程可以立即離開去做其他事情。

if(myLock.tryLock()) {
    //now the thread owns the lock
    try {...}
    finally {myLock.unlock();}
} else {
    //do something else
}

也可以調用tryLock時,使用超時參數: 

if (myLock.tryLock(100, TimeUnit.MILLISECONDS))

如果調用帶有超時參數的tryLock,那么線程在等待期間被中斷,將拋出InterruptedException異常。這是一個非常有用的特性,因為允許程序打破死鎖。也可以調用lockInterruptibly方法,它相當於一個超時設為無限的tryLock方法。

在等待一個條件時,也可以提供一個超時:

myCondition.await(100, TimeUnit.MILLISECONDS)

如果一個線程被另一個線程通過調用signalAll或signal激活,或者超時時限已達到,或者線程被中斷,那么await方法將返回。

 

11,讀/寫鎖

如果很多線程從一個數據結構讀取數據而很少線程修改其中數據的話,ReentrantReadWriteLock類非常有用。

下面是使用讀/寫鎖的必要步驟:

  a. 構造一個ReentrantReadWriteLock對象

private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

  b, 抽取讀鎖和寫鎖

private Lock readLock = rwl.readLock();
private Lock writeLock = rwl.writeLock();

  c, 對所有的獲取方法加讀鎖:

public double getTotalBalance() {
    readLock.lock();
    try {...}
    finally { readLock.unlock();}
}

  d, 對所有的修改方法加寫鎖:

public void transfer(...) {
    writeLock.lock();
    try {...}
    finally {writeLock.unlock();}
}

 

五,阻塞隊列

阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操作將會被阻塞,或者當隊列是滿時,往隊列里添加元素的操作會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其他的線程往空的隊列插入新的元素。同樣,試圖往已滿的阻塞隊列中添加新元素的線程同樣也會被阻塞,直到其他的線程使隊列重新變得空閑起來,如從隊列中移除一個或者多個元素,或者完全清空隊列,下圖展示了如何通過阻塞隊列來合作:



線程1往阻塞隊列中添加元素,而線程2從阻塞隊列中移除元素

從5.0開始,JDK在java.util.concurrent包里提供了阻塞隊列的官方實現。盡管JDK中已經包含了阻塞隊列的官方實現,但是熟悉其背后的原理還是很有幫助的。

阻塞隊列的實現類似於帶上限的Semaphore的實現。下面是阻塞隊列的一個簡單實現

public class BlockingQueue {

    private List queue = new LinkedList();

    private int  limit = 10;

    public BlockingQueue(int limit){

        this.limit = limit;

    }

    public synchronized void enqueue(Object item) throws InterruptedException {

        while(this.queue.size() == this.limit) {

            wait();

        }

        if(this.queue.size() == 0) {

            notifyAll();

        }

        this.queue.add(item);

    }

    public synchronized Object dequeue() throws InterruptedException{

        while(this.queue.size() == 0){

            wait();

        }

        if(this.queue.size() == this.limit){

            notifyAll();

        }

        return this.queue.remove(0);

    }

}                    

 

必須注意到,在enqueue和dequeue方法內部,只有隊列的大小等於上限(limit)或者下限(0)時,才調用notifyAll方法。如果隊列的大小既不等於上限,也不等於下限,任何線程調用enqueue或者dequeue方法時,都不會阻塞,都能夠正常的往隊列中添加或者移除元素。

 

六,線程安全的集合

java.util.concurrent包提供了映射表/有序集合隊列的高效實現:ConcurrentHashMap/ConcurrentSkipListMap/ConcurrentSkipListSet/ConcurrentLinkedQueue.

 

七,Callable 與 Future

在前面的文章中我們講述了創建線程的2種方式,一種是直接繼承Thread,另外一種就是實現Runnable接口。這2種方式都有一個缺陷就是:在執行完任務之后無法獲取執行結果。如果需要獲取執行結果,就必須通過共享變量或者使用線程通信的方式來達到效果,這樣使用起來就比較麻煩。

而自從Java 1.5開始,就提供了Callable和Future,通過它們可以在任務執行完畢之后得到任務執行結果。

Callable與Runnable

Runnable是一個接口,在它里面只聲明了一個run()方法,由於run()方法返回值為void類型,所以在執行完任務之后無法返回任何結果:

public interface Runnable {
    public abstract void run();
}

Callable位於java.util.concurrent包下,它也是一個接口,在它里面也只聲明了一個方法,只不過這個方法叫做call(),可以看到,這是一個泛型接口,call()函數返回的類型就是傳遞進來的V類型:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

 那么怎么使用Callable呢?一般情況下是配合ExecutorService來使用的,在ExecutorService接口中聲明了若干個submit方法的重載版本:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

第一個submit方法里面的參數類型就是Callable。一般情況下我們使用第一個submit方法和第三個submit方法,第二個submit方法很少使用。

Future

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。

  Future類位於java.util.concurrent包下,它是一個接口:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

   在Future接口中聲明了5個方法,下面依次解釋每個方法的作用:

  • cancel方法用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。參數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執行,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false,則返回false;如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。
  • isCancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。
  • isDone方法表示任務是否已經完成,若任務完成,則返回true;
  • get()方法用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回;
  • get(long timeout, TimeUnit unit)用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。

  也就是說Future提供了三種功能:

  1)判斷任務是否完成;

  2)能夠中斷任務;

  3)能夠獲取任務執行結果。

  因為Future只是一個接口,所以是無法直接用來創建對象使用的,因此就有了下面的FutureTask。

FutureTask

我們先來看一下FutureTask的實現:

public class FutureTask<V> implements RunnableFuture<V>

   FutureTask類實現了RunnableFuture接口,我們看一下RunnableFuture接口的實現:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

   可以看出RunnableFuture繼承了Runnable接口和Future接口,而FutureTask實現了RunnableFuture接口。所以它既可以作為Runnable被線程執行,又可以作為Future得到Callable的返回值。

  FutureTask提供了2個構造器:

public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

  事實上,FutureTask是Future接口的一個唯一實現類。

Callable&Future 示例:

package com.ivy.thread.unsynch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MyCallable implements Callable<String>{

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService executor = Executors.newFixedThreadPool(10);
        List<Future<String>> futures = new ArrayList<>();
        MyCallable myCallable = new MyCallable();
        for (int i = 0; i<100; i++) {
            Future<String> future = executor.submit(myCallable);
            futures.add(future);
        }
        
        for (Future<String> f : futures) {
            try {
                System.out.println(f.get());
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        executor.shutdown();
    }

    @Override
    public String call() throws Exception {
        // TODO Auto-generated method stub
        Thread.sleep(1000);
        return Thread.currentThread().getName();
    }

}

結果:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-3
pool-1-thread-4
pool-1-thread-5
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10
pool-1-thread-2

...

 

Callable&FutureTask示例:

package com.ivy.thread.unsynch;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class MyCallableFutureTask implements Callable<String>{

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService executor = Executors.newFixedThreadPool(10);
        MyCallable myCallable = new MyCallable();
        List<FutureTask<String>> futures = new ArrayList<>();
        
        for (int i = 0; i<100; i++) {
            FutureTask<String> task = new FutureTask<>(myCallable);
            executor.submit(task);
            futures.add(task);
        }
        
        for (FutureTask<String> f : futures) {
            try {
                System.out.println(f.get());
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        executor.shutdown();
    }

    @Override
    public String call() throws Exception {
        // TODO Auto-generated method stub
        Thread.sleep(1000);
        return Thread.currentThread().getName();
    }

}

 

九,執行器

線程池中包含許多准備運行的空閑線程。將Runnable對象交給線程池,就會有一個線程調用run方法,當run方法退出時,線程不會死亡,而是在池中准備為下一個請求提供服務。另一個使用線程池的理由是減少並發線程的數目。創建大量線程會大大降低性能甚至使虛擬機崩潰。如果有個會創建許多線程的算法,應該使用一個線程數固定的線程池以限制並發線程的總數。

執行器類有許多靜態工廠方法來構建線程池:

  • newCachedThreadPool 必要時創建新線程;空閑線程會被保留60秒
  • newFixedThreadPool 該池包含固定數量的線程;空閑線程會一直被保留
  • newSingleThreadExecutor 只有一個線程的池,該線程順序執行每個提交的任務
  • newScheduledThreadPool 用於預定執行而構建的固定線程池,代替Timer
  • newSingleThreadScheduledExecutor 用於預定執行而構建的單線程池

線程池

前三個方法返回的是實現了ExecutorService接口的ThreadPoolExecutor類的對象,ExecutorService接口有三個方法:

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

一般情況下我們使用第一個submit方法和第三個submit方法,第二個submit方法很少使用。

當用完一個線程池的時候,調用shutdown。該方法啟動該線程的關閉序列。被關閉的執行器不再接受新的任務。當所有任務都完成以后,線程池中的線程死亡。另一種方法是調用shutdownNow,這時線程池會取消尚未開始的所有任務並試圖中斷正在運行的線程。

在使用連接池時應該做的事情,示例見Callable&Future示例:

  1. 調用Executors類中靜態方法newCachedThreadPool或newFixedThreadPool.
  2. 調用submit提交Runnable或Callable對象。
  3. 如果想要取消一個任務,或如果提交Callable對象,那就要保存好返回的Future對象。
  4. 當不再提交任何任務時,調用shutdown.

預定執行

ScheduledExecutorService接口具有為預定執行或重復執行任務而設計的方法。它是一種允許使用線程池機制的Timer的泛化。Executors類的newScheduledThreadPool和newSingleThreadScheduledExecutor方法返回實現了ScheduledExecutorService接口的對象。可以預定Runnable或Callable在初始的延遲后只運行一次,也可以預定一個Runnable對象周期性地運行。

Fork-Join框架

什么是fork/join框架

  fork/join框架是ExecutorService接口的一個實現,可以幫助開發人員充分利用多核處理器的優勢,編寫出並行執行的程序,提高應用程序的性能;設計的目的是為了處理那些可以被遞歸拆分的任務。

  fork/join框架與其它ExecutorService的實現類相似,會給線程池中的線程分發任務,不同之處在於它使用了工作竊取算法,所謂工作竊取,指的是對那些處理完自身任務的線程,會從其它線程竊取任務執行。

  fork/join框架的核心是ForkJoinPool類,該類繼承了AbstractExecutorService類。ForkJoinPool實現了工作竊取算法並且能夠執行 ForkJoinTask任務。

基本使用方法

  在使用fork/join框架之前,我們需要先對任務進行分割,任務分割代碼應該跟下面的偽代碼類似:

if (任務足夠小){
  直接執行該任務;
}else{ 將任務一分為二; 執行這兩個任務並等待結果;
}

  首先,我們會在ForkJoinTask的子類中封裝以上代碼,不過一般我們會使用更加具體的ForkJoinTask類型,如 RecursiveTask(可以返回一個結果)或RecursiveAction

  當寫好ForkJoinTask的子類后,創建該對象,該對象代表了所有需要完成的任務;然后將這個任務對象傳給ForkJoinPool實例的invoke()去執行即可。

示例:

package com.ivy.thread;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        final int SIZE = 10000000;
        double[] numbers = new double[SIZE];
        for(int i=0;i<SIZE;i++) {
            numbers[i] = Math.random();
        }
        Counter counter = new Counter(numbers, 0, numbers.length, new Filter() {

            @Override
            public boolean accept(double t) {
                return t > 0.5;
            }
            
        });
        
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(counter);
        System.out.println(counter.join());
    }

}

interface Filter {
    boolean accept(double t);
}

class Counter extends RecursiveTask<Integer> {

    public static final int THRESHOLD = 1000;
    private double[] values;
    private int from;
    private int to;
    private Filter filter;
    
    public Counter(double[] values, int from, int to, Filter filter) {
        this.values = values;
        this.from = from;
        this.to = to;
        this.filter = filter;
    }
    @Override
    protected Integer compute() {
        if(to-from < THRESHOLD) {
            int count = 0;
            for(int i=from;i<to; i++) {
                if (filter.accept(values[i])) {
                    count ++;
                }
            }
            return count;
        } else {
            int mid =(from+to)/2;
            Counter first = new Counter(values, from, mid, filter);
            Counter second = new Counter(values, mid, to, filter);
            invokeAll(first, second);
            return first.join() + second.join();
        }
    }
    
}

 


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com