JavaのThreadPoolを試す

JavaのThreadPoolを試してみたメモです。

Javajava.util.concurrent.ExecutorsのThreadPoolを触ってみました。

Executors

ExecutorsはExecutor,ExecutorService,ScheduledExecutorServiceのファクトリメソッドや
ユーティリティメソッドを提供するクラスです。

Executor

Executorインターフェースは戻り値のないタスクを実行するメソッドを提供します。
void execute()だけが定義されています。

ExecutorService

ExecutorServiceはExecutorのサブインターフェースで、
タスクの終了を管理するメソッドや、戻り値のあるタスク(Future)のメソッドを提供します。

ScheduledExecutorService

ScheduledExecutorServiceはExecutorServiceのサブインターフェースで、
タスクの繰り返し実行や、一定時間待機後のタスク実行を提供します。

テスト用クラス

テスト用に下記のクラスを用意しました。

ExecutorSupplierはExecutorを受け取って、doSomethingでタスクを実行します。

private static class ExecutorSupplier {
    private final Executor executor;

    public ExecutorSupplier(Executor executor) {
        this.executor = executor;
    }

    public void doSomething() {
        executor.execute(
                () -> {
                    // doSomething
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                }
        );
    }
}

ScheduledExecutorSupplierはScheduledExecutorServiceを受け取って、
doSomethingで1,000msec後にタスクを実行します。

private static class ScheduledExecutorSupplier {
    private final ScheduledExecutorService scheduledExecutorService;

    public ScheduledExecutorSupplier(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutorService = scheduledExecutorService;
    }

    public void doSomething() {
        scheduledExecutorService.schedule(
                () -> {
                    // doSomething
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                },
                1_000L,
                TimeUnit.MILLISECONDS
        );
    }
}

newSingleThreadExecutor

newSingleThreadExecutorは単一のワーカースレッドを使用するExecutorを生成します。

@Test
public void newSingleThreadExecutorTest() throws InterruptedException {
    // newSingleThreadExecutor
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    ExecutorSupplier supplier = new ExecutorSupplier(executorService);
    try {
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        Thread.sleep(500);
    } finally {
        executorService.shutdown();
    }
}

実行してみるとすべて同じWorkerで実行されています。

Tue Mar 26 20:36:16 JST 2017:pool-3-thread-1
Tue Mar 26 20:36:16 JST 2017:pool-3-thread-1
Tue Mar 26 20:36:16 JST 2017:pool-3-thread-1

newFixedThreadPool

newFixedThreadPoolは引数で指定した数のスレッドを再利用するスレッドプールを作成します。

@Test
public void newFixedThreadPoolTest() throws InterruptedException {
    // newFixedThreadPool
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    ExecutorSupplier supplier = new ExecutorSupplier(executorService);
    try {
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        Thread.sleep(500);
    } finally {
        executorService.shutdown();
    }
}

実行して見ると2つのWorkerが再利用されて実行されているのが分かります。

Tue Mar 26 20:40:00 JST 2017:pool-1-thread-2
Tue Mar 26 20:40:00 JST 2017:pool-1-thread-1
Tue Mar 26 20:40:00 JST 2017:pool-1-thread-2
Tue Mar 26 20:40:00 JST 2017:pool-1-thread-1

newCachedThreadPool

newCachedThreadPoolは新規にスレッドを生成しますが、再利用可能な場合はスレッドを再利用します。
60秒間使用されなかったスレッドはスレッドプールから削除されます。

下記のように連続でdoSomething()を実行してみると、

@Test
public void newCachedThreadPoolTest() throws InterruptedException {
    // newCachedThreadPool
    ExecutorService executorService = Executors.newCachedThreadPool();
    ExecutorSupplier supplier = new ExecutorSupplier(executorService);
    try {
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        Thread.sleep(500);
    } finally {
        executorService.shutdown();
    }
}

新しいスレッドが作成され実行されています。

Wed Mar 26 20:23:47 JST 2017:pool-2-thread-1
Wed Mar 26 20:23:47 JST 2017:pool-2-thread-2
Wed Mar 26 20:23:47 JST 2017:pool-2-thread-3

下記のように、それぞれのdoSomething()の間にsleepを入れて実行してみると、

@Test
public void newCachedThreadPoolUsingCacheTest() throws InterruptedException {
    // newCachedThreadPool
    ExecutorService executorService = Executors.newCachedThreadPool();
    ExecutorSupplier supplier = new ExecutorSupplier(executorService);
    try {
        supplier.doSomething();
        Thread.sleep(500);
        supplier.doSomething();
        Thread.sleep(500);
        supplier.doSomething();
        Thread.sleep(500);
    } finally {
        executorService.shutdown();
    }
}

キャッシュされたスレッドが再利用されているのが分かります。

Wed Mar 26 20:23:52 JST 2017:pool-5-thread-1
Wed Mar 26 20:23:53 JST 2017:pool-5-thread-1
Wed Mar 26 20:23:53 JST 2017:pool-5-thread-1

newSingleThreadScheduledExecutor

newSingleThreadScheduledExecutorは単一のワーカースレッドを使用するScheduledExecutorを生成します。

@Test
public void newSingleThreadScheduledExecutorTest() throws InterruptedException {
    // newSingleThreadScheduledExecutor
    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    ScheduledExecutorSupplier supplier = new ScheduledExecutorSupplier(scheduledExecutorService);
    try {
        System.out.println(new Date() + ":start");
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        Thread.sleep(4_000);
    } finally {
        scheduledExecutorService.shutdown();
    }
}

実行すると、タスクが1秒後に実行されてることが分かります。

Wed Mar 26 20:30:54 JST 2017:start
Wed Mar 26 20:30:55 JST 2017:pool-1-thread-1
Wed Mar 26 20:30:55 JST 2017:pool-1-thread-1
Wed Mar 26 20:30:55 JST 2017:pool-1-thread-1

newScheduledthreadpool

newScheduledthreadpoolは引数で指定した数のスレッドを再利用するScheduledExecutorを作成します。

@Test
public void newScheduledThreadPoolTest() throws InterruptedException {
    // newScheduledThreadPool
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
    ScheduledExecutorSupplier supplier = new ScheduledExecutorSupplier(scheduledExecutorService);
    try {
        System.out.println(new Date() + ":start");
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        Thread.sleep(4_000);
    } finally {
        scheduledExecutorService.shutdown();
    }
}

実行して見ると2つのWorkerが再利用されて実行されているのが分かります。

Wed Mar 26 20:45:02 JST 2017:start
Wed Mar 26 20:45:03 JST 2017:pool-1-thread-1
Wed Mar 26 20:45:03 JST 2017:pool-1-thread-2
Wed Mar 26 20:45:03 JST 2017:pool-1-thread-1
Wed Mar 26 20:45:03 JST 2017:pool-1-thread-2

newWorkStealingPool

newWorkStealingPoolは、work-stealingスレッドプールを作成します。
work-stealingでは各ワーカースレッドがワークキューを所持し、タスクを取り出して実行していきます。
タスクがなくなると、他のワーカースレッドのワークキューのタスクを実行します。
作成されるワーカースレッドはJVMが使用可能なプロセッサー数から算出されます。

@Test
public void newWorkStealingPoolTest() throws InterruptedException {
    // newWorkStealingPool
    ExecutorService executorService = Executors.newWorkStealingPool();
    ExecutorSupplier supplier = new ExecutorSupplier(executorService);
    try {
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        supplier.doSomething();
        Thread.sleep(500);
    } finally {
        executorService.shutdown();
    }
}

5つのタスクを実行した場合。
5つWorkerが生成され実行されています。

Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-4
Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-2
Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-1
Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-3
Wed Mar 26 20:55:39 JST 2017:ForkJoinPool-1-worker-5

5つのタスクを実行した場合。
8つWorkerが生成され実行されています。

Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-1
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-0
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-4
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-1
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-5
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-7
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-3
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-2
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-0
Wed Mar 26 21:11:21 JST 2017:ForkJoinPool-1-worker-6

ソースコードは下記にあげました。
github.com

終わり。

【参考】