前言:最近在做分布式海量數據處理項目,使用到了java的線程池,所以搜集了一些資料對它的使用做了一下總結和探究。文中最核心的東西在于后面兩節無界隊列線程池和有界隊列線程池的實例使用以及線上問題處理方案。
1. 為什么要用線程池?
在Java中,如果每當一個請求到達就創建一個新線程,開銷是相當大的。在實際使用中,每個請求創建新線程的服務器在創建和銷毀線程上花費的時間和消耗的系統資源,甚至可能要比花在實際處理實際的用戶請求的時間和資源要多的多。除了創建和銷毀線程的開銷之外,活動的線程也需要消耗系統資源。如果在一個JVM中創建太多的線程,可能會導致系統由于過度消耗內存或者“切換過度”而導致系統資源不足。為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數目,盡可能減少創建和銷毀線程的次數,特別是一些資源耗費比較大的線程的創建和銷毀,盡量利用已有對象來進行服務,這就是“池化資源”技術產生的原因。
線程池主要用來解決線程生命周期開銷問題和資源不足問題,通過對多個任務重用線程,線程創建的開銷被分攤到多個任務上了,而且由于在請求到達時線程已經存在,所以消除了創建所帶來的延遲。這樣,就可以立即請求服務,使應用程序響應更快。另外,通過適當的調整線程池中的線程數據可以防止出現資源不足的情況。
網上找來的這段話,清晰的描述了為什么要使用線程池,使用線程池有哪些好處。工程項目中使用線程池的場景比比皆是。
本文關注的重點是如何在實戰中來使用好線程池這一技術,來滿足海量數據大并發用戶請求的場景。
2. ThreadPoolExecutor類
Java中的線程池技術主要用的是ThreadPoolExecutor 這個類。先來看這個類的構造函數,
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize 線程池維護線程的最少數量
maximumPoolSize 線程池維護線程的最大數量
keepAliveTime 線程池維護線程所允許的空閑時間
workQueue 任務隊列,用來存放我們所定義的任務處理線程
threadFactory 線程創建工廠
handler 線程池對拒絕任務的處理策略
ThreadPoolExecutor 將根據 corePoolSize和 maximumPoolSize 設置的邊界自動調整池大小。當新任務在方法execute(Runnable) 中提交時, 如果運行的線程少于 corePoolSize,則創建新線程來處理請求,即使其他輔助線程是空閑的。如果運行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當隊列滿時才創建新線程。 如果設置的corePoolSize 和 maximumPoolSize 相同,則創建了固定大小的線程池。
ThreadPoolExecutor是Executors類的實現,Executors類里面提供了一些靜態工廠,生成一些常用的線程池,主要有以下幾個:
newSingleThreadExecutor:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當于單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
newFixedThreadPool:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
newCachedThreadPool:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說JVM)能夠創建的最大線程大小。
在實際的項目中,我們會使用得到比較多的是newFixedThreadPool,創建固定大小的線程池,但是這個方法在真實的線上環境中還是會有很多問題,這個將會在下面一節中詳細講到。
當任務源源不斷的過來,而我們的系統又處理不過來的時候,我們要采取的策略是拒絕服務。RejectedExecutionHandler接口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經包含四種處理策略。
1)CallerRunsPolicy:線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
這個策略顯然不想放棄執行任務。但是由于池中已經沒有任何資源了,那么就直接使用調用該execute的線程本身來執行。
2)AbortPolicy:處理程序遭到拒絕將拋出運行時 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
這種策略直接拋出異常,丟棄任務。
3)DiscardPolicy:不能執行的任務將被刪除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
這種策略和AbortPolicy幾乎一樣,也是丟棄任務,只不過他不拋出異常。
4)DiscardOldestPolicy:如果執行程序尚未關閉,則位于工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
該策略就稍微復雜一些,在pool沒有關閉的前提下首先丟掉緩存在隊列中的最早的任務,然后重新嘗試運行該任務。這個策略需要適當小心。
3. ThreadPoolExecutor無界隊列使用
public class ThreadPool {
private final static String poolName = “mypool”;
static private ThreadPool threadFixedPool = new ThreadPool(2);
private ExecutorService executor;
static public ThreadPool getFixedInstance() {
return threadFixedPool;
}
private ThreadPool(int num) {
executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName));
}
public void execute(Runnable r) {
executor.execute(r);
}
public static void main(String[] params) {
class MyRunnable implements Runnable {
public void run() {
System.out.println(“OK!”);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
for (int i = 0; i < 10; i++) {
ThreadPool.getFixedInstance().execute(new MyRunnable());
}
try {
Thread.sleep(2000);
System.out.println(“Process end.”);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在這段代碼中,我們發現我們用到了Executors.newFixedThreadPool()函數,這個函數的實現是這樣子的:
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
它實際上是創建了一個無界隊列的固定大小的線程池。執行這段代碼,我們發現所有的任務都正常處理了。但是在真實的線上環境中會存在這樣的一個問題,前端的用戶請求源源不斷的過來,后端的處理線程如果處理時間變長,無法快速的將用戶請求處理完返回結果給前端,那么任務隊列中將堵塞大量的請求。這些請求在前端都是有超時時間設置的,假設請求是通過套接字過來,當我們的后端處理進程處理完一個請求后,從隊列中拿下一個任務,發現這個任務的套接字已經無效了,這是因為在用戶端已經超時,將套接字建立的連接關閉了。這樣一來我們這邊的處理程序再去讀取套接字時,就會發生I/0 Exception. 惡性循環,導致我們所有的處理服務線程讀的都是超時的套接字,所有的請求過來都拋I/O異常,這樣等于我們整個系統都掛掉了,已經無法對外提供正常的服務了。
對于海量數據的處理,現在業界都是采用集群系統來進行處理,當請求的數量不斷加大的時候,我們可以通過增加處理節點,反正現在硬件設備相對便宜。但是要保證系統的可靠性和穩定性,在程序方面我們還是可以進一步的優化的,我們下一節要講述的就是針對線上出現的這類問題的一種處理策略。
4. ThreadPoolExecutor有界隊列使用
public class ThreadPool {
private final static String poolName = “mypool”;
static private ThreadPool threadFixedPool = null;
public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
private ExecutorService executor;
static public ThreadPool getFixedInstance() {
return threadFixedPool;
}
private ThreadPool(int num) {
executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory
(poolName), new ThreadPoolExecutor.AbortPolicy());
}
public void execute(Runnable r) {
executor.execute(r);
}
public static void main(String[] params) {
class MyRunnable implements Runnable {
public void run() {
System.out.println(“OK!”);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
int count = 0;
for (int i = 0; i < 10; i++) {
try {
ThreadPool.getFixedInstance().execute(new MyRunnable());
} catch (RejectedExecutionException e) {
e.printStackTrace();
count++;
}
}
try {
log.info(“queue size:” + ThreadPool.getFixedInstance().queue.size());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“Reject task: ” + count);
}
}
首先我們來看下這段代碼幾個重要的參數,corePoolSize 為2,maximumPoolSize為4,任務隊列大小為2,每個任務平均處理時間為10ms,一共有10個并發任務。
執行這段代碼,我們會發現,有4個任務失敗了。這里就驗證了我們在上面提到有界隊列時候線程池的執行順序。當新任務在方法 execute(Runnable) 中提交時, 如果運行的線程少于 corePoolSize,則創建新線程來處理請求。 如果運行的線程多于corePoolSize 而少于 maximumPoolSize,則僅當隊列滿時才創建新線程,如果此時線程數量達到maximumPoolSize,并且隊列已經滿,就會拒絕繼續進來的請求。
現在我們調整一下代碼中的幾個參數,將并發任務數改為200,執行結果Reject task: 182,說明有18個任務成功了,線程處理完一個請求后會接著去處理下一個過來的請求。在真實的線上環境中,會源源不斷的有新的請求過來,當前的被拒絕了,但只要線程池線程把當下的任務處理完之后還是可以處理下一個發送過來的請求。
通過有界隊列可以實現系統的過載保護,在高壓的情況下,我們的系統處理能力不會變為0,還能正常對外進行服務,雖然有些服務可能會被拒絕,至于如何減少被拒絕的數量以及對拒絕的請求采取何種處理策略我將會在下一篇文章《系統的過載保護》中繼續闡述。