JavaのThreadPoolを試す
JavaのThreadPoolを試してみたメモです。
Javaのjava.util.concurrent.ExecutorsのThreadPoolを触ってみました。
- Executors
- Executor
- ExecutorService
- ScheduledExecutorService
- テスト用クラス
- newSingleThreadExecutor
- newFixedThreadPool
- newCachedThreadPool
- newSingleThreadScheduledExecutor
- newScheduledthreadpool
- newWorkStealingPool
Executors
ExecutorsはExecutor,ExecutorService,ScheduledExecutorServiceのファクトリメソッドや
ユーティリティメソッドを提供するクラスです。
Executor
Executorインターフェースは戻り値のないタスクを実行するメソッドを提供します。
void execute()だけが定義されています。
ExecutorService
ExecutorServiceはExecutorのサブインターフェースで、
タスクの終了を管理するメソッドや、戻り値のあるタスク(Future)のメソッドを提供します。
ScheduledExecutorService
ScheduledExecutorServiceはExecutorServiceのサブインターフェースで、
タスクの繰り返し実行や、一定時間待機後のタスク実行を提供します。
テスト用クラス
テスト用に下記のクラスを用意しました。
ExecutorSupplierはExecutorを受け取って、doSomethingでタスクを実行します。
private static class ExecutorSupplier { private final Executor executor; public ExecutorSupplier(Executor executor) { this.executor = executor; } public void doSomething() { executor.execute( () -> { // doSomething System.out.println(new Date() + ":" + Thread.currentThread().getName()); } ); } }
ScheduledExecutorSupplierはScheduledExecutorServiceを受け取って、
doSomethingで1,000msec後にタスクを実行します。
private static class ScheduledExecutorSupplier { private final ScheduledExecutorService scheduledExecutorService; public ScheduledExecutorSupplier(ScheduledExecutorService scheduledExecutorService) { this.scheduledExecutorService = scheduledExecutorService; } public void doSomething() { scheduledExecutorService.schedule( () -> { // doSomething System.out.println(new Date() + ":" + Thread.currentThread().getName()); }, 1_000L, TimeUnit.MILLISECONDS ); } }
newSingleThreadExecutor
newSingleThreadExecutorは単一のワーカースレッドを使用するExecutorを生成します。
@Test public void newSingleThreadExecutorTest() throws InterruptedException { // newSingleThreadExecutor ExecutorService executorService = Executors.newSingleThreadExecutor(); ExecutorSupplier supplier = new ExecutorSupplier(executorService); try { supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); Thread.sleep(500); } finally { executorService.shutdown(); } }
実行してみるとすべて同じWorkerで実行されています。
Tue Mar 26 20:36:16 JST 2017:pool-3-thread-1 Tue Mar 26 20:36:16 JST 2017:pool-3-thread-1 Tue Mar 26 20:36:16 JST 2017:pool-3-thread-1
newFixedThreadPool
newFixedThreadPoolは引数で指定した数のスレッドを再利用するスレッドプールを作成します。
@Test public void newFixedThreadPoolTest() throws InterruptedException { // newFixedThreadPool ExecutorService executorService = Executors.newFixedThreadPool(2); ExecutorSupplier supplier = new ExecutorSupplier(executorService); try { supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); Thread.sleep(500); } finally { executorService.shutdown(); } }
実行して見ると2つのWorkerが再利用されて実行されているのが分かります。
Tue Mar 26 20:40:00 JST 2017:pool-1-thread-2 Tue Mar 26 20:40:00 JST 2017:pool-1-thread-1 Tue Mar 26 20:40:00 JST 2017:pool-1-thread-2 Tue Mar 26 20:40:00 JST 2017:pool-1-thread-1
newCachedThreadPool
newCachedThreadPoolは新規にスレッドを生成しますが、再利用可能な場合はスレッドを再利用します。
60秒間使用されなかったスレッドはスレッドプールから削除されます。
下記のように連続でdoSomething()を実行してみると、
@Test public void newCachedThreadPoolTest() throws InterruptedException { // newCachedThreadPool ExecutorService executorService = Executors.newCachedThreadPool(); ExecutorSupplier supplier = new ExecutorSupplier(executorService); try { supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); Thread.sleep(500); } finally { executorService.shutdown(); } }
新しいスレッドが作成され実行されています。
Wed Mar 26 20:23:47 JST 2017:pool-2-thread-1 Wed Mar 26 20:23:47 JST 2017:pool-2-thread-2 Wed Mar 26 20:23:47 JST 2017:pool-2-thread-3
下記のように、それぞれのdoSomething()の間にsleepを入れて実行してみると、
@Test public void newCachedThreadPoolUsingCacheTest() throws InterruptedException { // newCachedThreadPool ExecutorService executorService = Executors.newCachedThreadPool(); ExecutorSupplier supplier = new ExecutorSupplier(executorService); try { supplier.doSomething(); Thread.sleep(500); supplier.doSomething(); Thread.sleep(500); supplier.doSomething(); Thread.sleep(500); } finally { executorService.shutdown(); } }
キャッシュされたスレッドが再利用されているのが分かります。
Wed Mar 26 20:23:52 JST 2017:pool-5-thread-1 Wed Mar 26 20:23:53 JST 2017:pool-5-thread-1 Wed Mar 26 20:23:53 JST 2017:pool-5-thread-1
newSingleThreadScheduledExecutor
newSingleThreadScheduledExecutorは単一のワーカースレッドを使用するScheduledExecutorを生成します。
@Test public void newSingleThreadScheduledExecutorTest() throws InterruptedException { // newSingleThreadScheduledExecutor ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorSupplier supplier = new ScheduledExecutorSupplier(scheduledExecutorService); try { System.out.println(new Date() + ":start"); supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); Thread.sleep(4_000); } finally { scheduledExecutorService.shutdown(); } }
実行すると、タスクが1秒後に実行されてることが分かります。
Wed Mar 26 20:30:54 JST 2017:start Wed Mar 26 20:30:55 JST 2017:pool-1-thread-1 Wed Mar 26 20:30:55 JST 2017:pool-1-thread-1 Wed Mar 26 20:30:55 JST 2017:pool-1-thread-1
newScheduledthreadpool
newScheduledthreadpoolは引数で指定した数のスレッドを再利用するScheduledExecutorを作成します。
@Test public void newScheduledThreadPoolTest() throws InterruptedException { // newScheduledThreadPool ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); ScheduledExecutorSupplier supplier = new ScheduledExecutorSupplier(scheduledExecutorService); try { System.out.println(new Date() + ":start"); supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); Thread.sleep(4_000); } finally { scheduledExecutorService.shutdown(); } }
実行して見ると2つのWorkerが再利用されて実行されているのが分かります。
Wed Mar 26 20:45:02 JST 2017:start Wed Mar 26 20:45:03 JST 2017:pool-1-thread-1 Wed Mar 26 20:45:03 JST 2017:pool-1-thread-2 Wed Mar 26 20:45:03 JST 2017:pool-1-thread-1 Wed Mar 26 20:45:03 JST 2017:pool-1-thread-2
newWorkStealingPool
newWorkStealingPoolは、work-stealingスレッドプールを作成します。
work-stealingでは各ワーカースレッドがワークキューを所持し、タスクを取り出して実行していきます。
タスクがなくなると、他のワーカースレッドのワークキューのタスクを実行します。
作成されるワーカースレッドはJVMが使用可能なプロセッサー数から算出されます。
@Test public void newWorkStealingPoolTest() throws InterruptedException { // newWorkStealingPool ExecutorService executorService = Executors.newWorkStealingPool(); ExecutorSupplier supplier = new ExecutorSupplier(executorService); try { supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); supplier.doSomething(); Thread.sleep(500); } finally { executorService.shutdown(); } }
5つのタスクを実行した場合。
5つWorkerが生成され実行されています。
Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-4 Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-2 Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-1 Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-3 Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-5
5つのタスクを実行した場合。
8つWorkerが生成され実行されています。
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-1 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-0 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-4 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-1 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-5 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-7 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-3 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-2 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-0 Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-6
ソースコードは下記にあげました。
github.com
終わり。
【参考】