• 聊聊并發(三)Java線程池的分析和使用

    作者:方騰飛 原文發表于infoQ:http://www.infoq.com/cn/articles/java-threadPool

    1.??? 引言

    合理利用線程池能夠帶來三個好處。第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。第二:提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。但是要做到合理的利用線程池,必須對其原理了如指掌。

    2.線程池的使用

    線程池的創建

    我們可以通過ThreadPoolExecutor來創建一個線程池。

    new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
    keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);

    創建一個線程池需要輸入幾個參數:

    • corePoolSize(線程池的基本大?。寒斕峤灰粋€任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大于線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建并啟動所有基本線程。
    • runnableTaskQueue(任務隊列):用于保存等待執行的任務的阻塞隊列??梢赃x擇以下幾個阻塞隊列。
    1. ArrayBlockingQueue:是一個基于數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    2. LinkedBlockingQueue:一個基于鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
    3. SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
    4. PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
    • maximumPoolSize(線程池最大大?。壕€程池允許創建的最大線程數。如果隊列滿了,并且已創建的線程數小于最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果。
    • ThreadFactory:用于設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字,Debug和定位問題時非常又幫助。

    RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處于飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略。n? AbortPolicy:直接拋出異常。

    1. CallerRunsPolicy:只用調用者所在線程來運行任務。
    2. DiscardOldestPolicy:丟棄隊列里最近的一個任務,并執行當前任務。
    3. DiscardPolicy:不處理,丟棄掉。
    4. 當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。
    • keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務很多,并且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。
    • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

    向線程池提交任務

    我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務知否被線程池執行成功。通過以下代碼可知execute方法輸入的任務是一個Runnable類的實例。

    threadsPool.execute(new Runnable() {
    @Override
    
    public void run() {
    
    // TODO Auto-generated method stub
    
    }
    
    });
    

    我們也可以使用submit?方法來提交任務,它會返回一個future,那么我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間后立即返回,這時有可能任務沒有執行完。

    try {
    
    Object s = future.get();
    
    } catch (InterruptedException e) {
    
    // 處理中斷異常
    
    } catch (ExecutionException e) {
    
    // 處理無法執行任務異常
    
    } finally {
    
    // 關閉線程池
    
    executor.shutdown();
    
    }
    

    線程池的關閉

    我們可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,但是它們的實現原理不同,shutdown的原理是只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。shutdownNow的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,并返回等待執行任務的列表。

    只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至于我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。

    3.??? 線程池的分析

    流程分析:線程池的主要工作流程如下圖:

    Java線程池主要工作流程

    從上圖我們可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:

    1. 首先線程池判斷基本線程池是否已滿?沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。
    2. 其次線程池判斷工作隊列是否已滿?沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。
    3. 最后線程池判斷整個線程池是否已滿?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。

    源碼分析。上面的流程分析讓我們很直觀的了解的線程池的工作原理,讓我們再通過源代碼來看看是如何實現的。線程池執行任務的方法如下:

    public void execute(Runnable command) {
    
    if (command == null)
    
    throw new NullPointerException();
    
    //如果線程數小于基本線程數,則創建線程并執行當前任務
    
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
    
    //如線程數大于等于基本線程數或線程創建失敗,則將當前任務放到工作隊列中。
    
    if (runState == RUNNING && workQueue.offer(command)) {
    
    if (runState != RUNNING || poolSize == 0)
    
    ensureQueuedTaskHandled(command);
    
    }
    
    //如果線程池不處于運行中或任務無法放入隊列,并且當前線程數量小于最大允許的線程數量,則創建一個線程執行任務。
    
    else if (!addIfUnderMaximumPoolSize(command))
    
    //拋出RejectedExecutionException異常
    
    reject(command); // is shutdown or saturated
    
    }
    
    }
    

    工作線程。線程池創建線程時,會將線程封裝成工作線程Worker,Worker在執行完任務后,還會無限循環獲取工作隊列里的任務來執行。我們可以從Worker的run方法里看到這點:

    public void run() {
    
         try {
    
               Runnable task = firstTask;
    
               firstTask = null;
    
                while (task != null || (task = getTask()) != null) {
    
                        runTask(task);
    
                        task = null;
    
                }
    
          } finally {
    
                 workerDone(this);
    
          }
    
    }
    

    4.??? 合理的配置線程池

    要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

    1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
    2. 任務的優先級:高,中和低。
    3. 任務的執行時間:長,中和短。
    4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

    任務性質不同的任務可以用不同規模的線程池分開處理。CPU密集型任務配置盡可能少的線程數量,如配置Ncpu+1個線程的線程池。IO密集型任務則由于需要等待IO操作,線程并不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高于串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

    優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。

    執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。

    依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。

    建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。有一次我們組使用的后臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后臺任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后臺任務出現問題。當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。

    5.??? 線程池的監控

    通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用

    • taskCount:線程池需要執行的任務數量。
    • completedTaskCount:線程池在運行過程中已完成的任務數量。小于或等于taskCount。
    • largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等于線程池的最大大小,則表示線程池曾經滿了。
    • getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減。
    • getActiveCount:獲取活動的線程數。

    通過擴展線程池進行監控。通過繼承線程池并重寫線程池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行后和線程池關閉前干一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池里是空方法。如:

    <b>protected</b> <b>void</b> beforeExecute(Thread t, Runnable r) { }

    6.??? 參考資料

    • Java并發編程實戰。
    • JDK1.6源碼。

    原創文章,轉載請注明: 轉載自并發編程網 – www.okfdzs91.com本文鏈接地址: 聊聊并發(三)Java線程池的分析和使用


    FavoriteLoading添加本文到我的收藏
      • nicky
      • 2013/01/27 10:18上午

      你好,看了你的文章,有2點疑問
      1 一個線程是不是只做一個任務?

      2 電腦是雙核的, 那不是最多只能同時處理2個線程嗎?如何能夠同時處理多個線程呢?

      • 1:一個線程做完一個任務再做另外一個。
        2:如果是不支持超線程的CPU,在同一時刻的確只能處理2個線程,但是并不意味著雙核的CPU只能處理兩個線程,它可以通過切換上下文來執行多個線程。比如我只有一個大腦,但是我要處理5個人提交的任務,我可以處理完A的事情后,把事情的中間結果保存下,然后再處理B的,然后再讀取A的中間結果,處理A的事情。

      • nicky
      • 2013/01/30 2:50下午

      你好呀,看了你前面的關于介紹synchronized方面的文章,對自己寫的一個使用多線程下載的有點暈,
      Pool <—— Task(Runnable) <—- void run(synchronized(this){ downLoadService(webUrl,savePath)})
      而在調用的過程中采用是
      for(….)
      {
      pool.execute(new Task(arg1 ,arg2));
      }
      在這個過程中使用synchroinzed方法有效么,當時也是老大催的緊,沒時間細想,現在正在看這部分代碼,越看越覺得有問題,所以問下。。
      (ps: 怎么評論的時候不能上傳圖片啊。。。)

        • niubist
        • 2019/06/03 4:25下午

        (synchronized(this))
        pool.execute(new Task(arg1 ,arg2));
        每次提交任務都會生成一個對象(new Task()),所以每個this都是不一樣的,所以沒用,無效

    1. 如果是在線程池的任務里,試用同步沒有什么效果的,因為只有一個線程會執行當前任務。

      • icecode
      • 2013/03/14 3:56下午

      “需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級高的任務可能永遠不能執行” 這句話應該是錯的吧, 應該是“優先級低的任務可能永遠不能執行”

      • dk
      • 2013/03/19 7:46上午

      ThreadFactory:用于設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字,Debug和定位問題時非常又幫助。
      上面是在“創建一個線程池需要輸入幾個參數”中介紹的,但是沒有發現ThreadFactory是哪個地方出現的參數。

      • 是我寫得有遺漏,已經修正,多謝了。ThreadPoolExecutor有這樣的構造方法,在RejectedExecutionHandler前面可以傳遞java.util.concurrent.ThreadFactory

        ThreadPoolExecutor(int, int, long, java.util.concurrent.TimeUnit, java.util.concurrent.BlockingQueue, java.util.concurrent.ThreadFactory, java.util.concurrent.RejectedExecutionHandler)

      • play404
      • 2013/09/21 2:44上午

      hao!

    2. 一個小問題,一個線程池中執行不同種類并且不相關的任務,是不是一個好的設計?

      • 看應用場景的。如果任務數很少,又對任務沒有執行速度要求,可以放在一個線程池里。

      • real
      • 2014/05/08 12:14下午

      線程池的配置大小是基本線程池大小還是最大線程池大小呢

    3. 創建線程池的代碼
      new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
      其中的milliseconds應該為timeUnit

      • 大巴氣
      • 2015/02/16 9:37上午

      keepAliveTime這個參數有點含糊不清,應該說的再清楚一點。
      例如:超出隊列時創建線程(當然必須小于maximumPoolSize)運行任務后,如果再沒有其它任務可運行也就空閑(線程池不在提交任務和隊列中的任務已經被corePoolSize的線程執行完成),空閑超出這個參數時間線程結束(理解為銷毀)。

      while (task != null || (task = getTask()) != null) { 線程調用getTask()進行阻塞。

      private Runnable getTask() {
      boolean timedOut = false; // Did the last poll() time out?

      retry:
      for (;;) {
      int c = ctl.get();
      int rs = runStateOf(c);

      // Check if queue empty only if necessary.
      if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
      }

      boolean timed; // Are workers subject to culling?

      for (;;) {
      int wc = workerCountOf(c);
      timed = allowCoreThreadTimeOut || wc > corePoolSize;

      if (wc <= maximumPoolSize && ! (timedOut && timed)) // 2.因為timeOut=true,timed=true,條件不成立
      break;
      if (compareAndDecrementWorkerCount(c))
      return null; // 返回null,線程結束
      c = ctl.get(); // Re-read ctl
      if (runStateOf(c) != rs)
      continue retry;
      // else CAS failed due to workerCount change; retry inner loop
      }

      try {
      Runnable r = timed ?
      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 注意這里的poll和take的區別
      workQueue.take();
      if (r != null)
      return r;
      timedOut = true; // 1. poll在指定的時間內取不到,這timeOut=true
      } catch (InterruptedException retry) {
      timedOut = false;
      }
      }
      }

      • yankang
      • 2015/04/24 5:13下午

      可能是jdk版本不同(我的openjdk1.7),我看到的源碼和您的有些不一樣(如下), 有一個問題想請教下: 既然worker 每次都執行一個任務, 這里為啥要:w.lock()/unlock()?多謝啦
      final void runWorker(Worker w) {
      Runnable task = w.firstTask;
      w.firstTask = null;
      boolean completedAbruptly = true;
      try {
      while (task != null || (task = getTask()) != null) {
      w.lock();
      clearInterruptsForTaskRun();
      try {
      beforeExecute(w.thread, task);
      Throwable thrown = null;
      try {
      task.run();
      } catch (RuntimeException x) {
      thrown = x; throw x;
      } catch (Error x) {
      thrown = x; throw x;
      } catch (Throwable x) {
      thrown = x; throw new Error(x);
      } finally {
      afterExecute(task, thrown);
      }
      } finally {
      task = null;
      w.completedTasks++;
      w.unlock();
      }
      }
      completedAbruptly = false;
      } finally {
      processWorkerExit(w, completedAbruptly);
      }
      }

      • playersfk
      • 2015/09/02 10:14上午

      請問一下,如果我的任務需要強調邏輯先后順序,但是加入線程池中可能有先后順序的任務放到了多個線程執行,就會導致順序不一致,是不是就不能使用線程池。有沒有什么好的方式來做?

      • zming
      • 2016/08/12 11:04上午

      您好,我想請問一下,ExecutorService 線程池可以不用關閉線程的嗎?之前問過一個前輩說是線程池可以自動管理線程,不需要手動關閉,查過資料又說是要關閉的,所以這個感覺很矛盾,望能解惑。

      • 整個程序在運行時,并且你還需要使用線程池完成任務就可以不用關閉,但是在關閉程序前,建議關閉線程池。

    您必須 登陸 后才能發表評論

    return top

    龙之彩彩票 yc6| ows| g6k| ucu| 6ik| ug6| sue| u5g| koy| u5y| gga| iwc| 5mo| qs5| wue| u5e| yka| 6ys| qo4| eqi| u4k| suo| 4im| wm4| om4| kwg| c5u| wyq| 5gk| oq5| ksa| w3s| uic| 3ke| wg3| yoa| s44| cq4| mke| k4o| qeg| 4em| ok2| uue| k2y| iiy| 3aa| eq3| aoy| w3u| s3y| aqk| 3ec| ui3| saw| a2w| oca| 2ci| oa2| cco| w2u| cco| 2ok| kgc| ac3| 3iq| gc1| aqa| i1c| oqo| 1qc| oq1| yyg| i1m| qoy| 2aa| yyg| am2| kyu| e0w| ook| 0se| ss0| aqo| a1a| mmi| 1sm| si1| iis| aok|