Gracefully shutting down Kafka Consumer and Kafka Streams

These are my notes on how to gracefully shut down a Kafka Consumer and a Kafka Streams application.


I tried this with the following versions.

  • kafka 2.1.0
  • zookeeper 3.4.13
  • Ubuntu 16
  • Java 1.8.0_181
  • kafka-streams 2.1.0
  • kafka-clients 2.1.0

Kafka Consumer

I was wondering how to shut down a Kafka Consumer in the middle of its polling loop.
According to the answers below, the way to do it is to register a shutdown hook that calls KafkaConsumer.wakeup().
https://stackoverflow.com/questions/46581674/how-to-finish-kafka-consumer-safetyis-there-meaning-to-call-threadjoin-inside
https://stackoverflow.com/questions/35566011/shutting-down-kafka-consumer

Consumer

Register a shutdown hook as shown below and call consumer.wakeup() from it.
http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup--

If wakeup() is called while KafkaConsumer.poll() is blocking, poll() throws a WakeupException,
which interrupts the polling.
http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
http://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/WakeupException.html

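After that, mainThread.join() makes the hook wait for the main thread to finish, so that the main thread has time to close the consumer before the JVM exits.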

ShutdownConsumer.java

    final Thread mainThread = Thread.currentThread();

    Runtime.getRuntime().addShutdownHook(new Thread("shutdown-thread") {
        @Override
        public void run() {
            System.out.println("start to wakeup");

            // makes the blocked (or the next) poll() throw WakeupException
            consumer.wakeup();

            System.out.println("end to wakeup");
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
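
Why the hook needs the join() can be illustrated without Kafka. The JVM halts as soon as all shutdown hooks have finished, so a hook that returns right after requesting the stop may not leave the polling thread enough time to clean up. The following is only a simulation under my own assumptions: the "hook" is started by hand as a plain thread, a boolean flag stands in for consumer.wakeup(), and every name in it is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-alone simulation; no Kafka classes are involved and all names are hypothetical.
class HookJoinSimulation {

    // Returns the order in which the worker and the simulated hook reached their steps.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        AtomicBoolean stopRequested = new AtomicBoolean(false);
        CountDownLatch workerStarted = new CountDownLatch(1);

        // Plays the role of the main thread that runs the polling loop.
        Thread worker = new Thread(() -> {
            workerStarted.countDown();
            while (!stopRequested.get()) {
                Thread.yield(); // stands in for consumer.poll(...)
            }
            events.add("worker: cleanup done"); // stands in for consumer.close()
        }, "worker");

        // Plays the role of the shutdown hook: request the stop, then wait.
        Thread hook = new Thread(() -> {
            stopRequested.set(true); // stands in for consumer.wakeup()
            try {
                worker.join(); // wait until the worker has finished its cleanup
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("hook: finished");
        }, "simulated-hook");

        try {
            worker.start();
            workerStarted.await();
            hook.start();
            hook.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the hook only records "hook: finished" after join() returns, the cleanup entry always comes first. In a real hook, a bounded wait such as join(timeoutMillis) may be worth considering so that a stuck main thread cannot block the shutdown forever.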

The main thread catches the WakeupException thrown by KafkaConsumer and calls consumer.close() in the finally block.

ShutdownConsumer.java

    try {
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1_000));

            // print information about topic record
            records.forEach(record -> System.out.println( "topic: " + record.topic()
                    + ", partition: " + record.partition()
                    + ", key: " + record.key()
                    + ", value: " + record.value()
            ));
            Thread.sleep(1_000L);
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (WakeupException ignore) {
        System.out.println("caught WakeUpException");
    } finally {
        consumer.close();
        System.out.println("closed consumer");
    }

Verification

I stop the application while it is consuming.

producer log

partition: 0, topic: s2, offset: 0, key: 0, value: IXywX
partition: 0, topic: s2, offset: 1, key: 1, value: RrmzA
partition: 0, topic: s2, offset: 2, key: 2, value: tzZWr
partition: 0, topic: s2, offset: 3, key: 3, value: rLxrC
partition: 0, topic: s2, offset: 4, key: 4, value: LTBWM
partition: 0, topic: s2, offset: 5, key: 5, value: TuAZN
partition: 0, topic: s2, offset: 6, key: 6, value: VoWcE
partition: 0, topic: s2, offset: 7, key: 7, value: SihCu
partition: 0, topic: s2, offset: 8, key: 8, value: XluLj
partition: 0, topic: s2, offset: 9, key: 9, value: WaajJ
partition: 0, topic: s2, offset: 10, key: 10, value: HXAtg
partition: 0, topic: s2, offset: 11, key: 11, value: AFSzh

consumer log

topic: s2, partition: 0, key: 0, value: IXywX
topic: s2, partition: 0, key: 1, value: RrmzA
topic: s2, partition: 0, key: 2, value: tzZWr
topic: s2, partition: 0, key: 3, value: rLxrC
topic: s2, partition: 0, key: 4, value: LTBWM
topic: s2, partition: 0, key: 5, value: TuAZN
topic: s2, partition: 0, key: 6, value: VoWcE
topic: s2, partition: 0, key: 7, value: SihCu
topic: s2, partition: 0, key: 8, value: XluLj
topic: s2, partition: 0, key: 9, value: WaajJ
topic: s2, partition: 0, key: 10, value: HXAtg
start to wakeup
end to wakeup
caught WakeUpException
closed consumer


Internally, KafkaConsumer.wakeup() works as follows.

KafkaConsumer.wakeup() calls ConsumerNetworkClient.wakeup().

KafkaConsumer.java

    private final ConsumerNetworkClient client;

    /**
     * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
     * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
     * If no thread is blocking in a method which can throw {@link org.apache.kafka.common.errors.WakeupException}, the next call to such a method will raise it instead.
     */
    @Override
    public void wakeup() {
        this.client.wakeup();
    }

ConsumerNetworkClient sets its AtomicBoolean field wakeup to true.

ConsumerNetworkClient.java

    // this flag allows the client to be safely woken up without waiting on the lock above. It is
    // atomic to avoid the need to acquire the lock above in order to enable it concurrently.
    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    private final KafkaClient client;

    /**
     * Wakeup an active poll. This will cause the polling thread to throw an exception either
     * on the current poll if one is active, or the next poll.
     */
    public void wakeup() {
        // wakeup should be safe without holding the client lock since it simply delegates to
        // Selector's wakeup, which is thread-safe
        log.debug("Received user wakeup");
        this.wakeup.set(true);
        this.client.wakeup();
    }

KafkaConsumer.poll() calls ConsumerNetworkClient.maybeTriggerWakeup() at the top of each iteration of its do-while loop.

KafkaConsumer.java

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
        acquireAndEnsureOpen();
        try {
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }

            // poll for new data until the timeout expires
            do {
                client.maybeTriggerWakeup();

                :
                :
                :

            } while (timer.notExpired());

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

ConsumerNetworkClient.maybeTriggerWakeup() throws a WakeupException if the AtomicBoolean wakeup is true
(and wakeups are not disabled), after resetting the flag to false.

ConsumerNetworkClient.java

    public void maybeTriggerWakeup() {
        if (!wakeupDisabled.get() && wakeup.get()) {
            log.debug("Raising WakeupException in response to user wakeup");
            wakeup.set(false);
            throw new WakeupException();
        }
    }
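
To check my understanding of this flag handling, here is a small stand-alone model of it. It is not the Kafka code: ModelWakeupException and WakeupFlagModel are hypothetical names, the wakeupDisabled check and the Selector wakeup are left out, and poll() just loops a fixed number of times instead of waiting on the network.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for WakeupException; not the Kafka class.
class ModelWakeupException extends RuntimeException {}

// Hypothetical, simplified model of the flag handling quoted above.
class WakeupFlagModel {
    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    // May be called from any thread: only records that a wakeup was requested.
    void wakeup() {
        wakeup.set(true);
    }

    // Called by the polling thread at the top of every loop iteration.
    void maybeTriggerWakeup() {
        if (wakeup.get()) {
            wakeup.set(false); // reset so that only one poll is aborted
            throw new ModelWakeupException();
        }
    }

    // Simplified poll: checks the flag on each iteration, then returns nothing.
    String poll(int iterations) {
        for (int i = 0; i < iterations; i++) {
            maybeTriggerWakeup();
        }
        return "no records";
    }

    // Runs one poll and reports whether it completed or was aborted.
    static String pollOnce(WakeupFlagModel client) {
        try {
            return client.poll(3);
        } catch (ModelWakeupException e) {
            return "woken up";
        }
    }

    public static void main(String[] args) {
        WakeupFlagModel client = new WakeupFlagModel();
        System.out.println(pollOnce(client)); // no wakeup requested yet
        client.wakeup();                      // requested while nobody is polling
        System.out.println(pollOnce(client)); // the next poll is aborted
        System.out.println(pollOnce(client)); // the flag was reset, so this poll completes
    }
}
```

The second call shows the behavior described in the Javadoc quoted earlier: when no thread is blocking at the time of wakeup(), the next call that can throw raises the exception instead. Because the flag is reset before throwing, only that one poll is aborted.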

Kafka Streams

Kafka Streams is also stream processing, so once streams.start() is called it keeps running.

As described in the developer guide below, use a shutdown hook to call KafkaStreams.close().
https://kafka.apache.org/21/documentation/streams/developer-guide/write-streams
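
A common variant of this pattern also keeps the main thread waiting on a CountDownLatch until the hook has closed the streams, so that main can then exit in an orderly way. The sketch below only models that idea under my own assumptions: FakeStreams is a hypothetical stand-in for KafkaStreams, and the hook is started by hand instead of being registered with Runtime.getRuntime().addShutdownHook(...).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Stand-alone sketch; FakeStreams is a hypothetical stand-in for KafkaStreams.
class LatchShutdownSketch {

    static class FakeStreams {
        private final List<String> events;

        FakeStreams(List<String> events) {
            this.events = events;
        }

        void start() { events.add("started"); } // returns immediately in this model
        void close() { events.add("closed"); }
    }

    // Returns the order of events for one simulated start-and-shutdown cycle.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        FakeStreams streams = new FakeStreams(events);
        CountDownLatch latch = new CountDownLatch(1);

        // In a real application this thread would be registered as a shutdown hook;
        // here it is started by hand to simulate the shutdown signal.
        Thread hook = new Thread(() -> {
            streams.close();
            latch.countDown(); // release the waiting main thread
        }, "simulated-shutdown-hook");

        streams.start();
        hook.start();
        try {
            latch.await(); // main blocks here until the hook has closed the streams
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        events.add("main exits");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The latch guarantees that "main exits" is recorded only after close() has returned, which is the ordering a graceful shutdown needs.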

Consumer

ShutdownStream.java

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<Integer, String> kStream = streamsBuilder.stream("s");

    kStream.foreach(((key, value) -> System.out.println("key: " + key + ", value: " + value)));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

    Runtime.getRuntime().addShutdownHook(new Thread("stream-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            System.out.println("close stream.");
        }
    });

    try {
        streams.start();
    } catch (final Throwable ignore) {
        System.exit(1);
    }

Verification

I stop the application while it is consuming.

producer log

partition: 0, topic: s, offset: 52, key: 0, value: nAJrA
partition: 0, topic: s, offset: 53, key: 1, value: foIEm
partition: 0, topic: s, offset: 54, key: 2, value: LlPPN
partition: 0, topic: s, offset: 55, key: 3, value: zPtlf
partition: 0, topic: s, offset: 56, key: 4, value: FytYg
partition: 0, topic: s, offset: 57, key: 5, value: tQkdc
partition: 0, topic: s, offset: 58, key: 6, value: TmXOl
partition: 0, topic: s, offset: 59, key: 7, value: ObZsc
partition: 0, topic: s, offset: 60, key: 8, value: mCowA
partition: 0, topic: s, offset: 61, key: 9, value: sAYnN
partition: 0, topic: s, offset: 62, key: 10, value: NoJXh
partition: 0, topic: s, offset: 63, key: 11, value: mXwKm
partition: 0, topic: s, offset: 64, key: 12, value: NMQZr

consumer log

key: 0, value: nAJrA
key: 1, value: foIEm
key: 2, value: LlPPN
key: 3, value: zPtlf
key: 4, value: FytYg
key: 5, value: tQkdc
key: 6, value: TmXOl
key: 7, value: ObZsc
key: 8, value: mCowA
close stream.


That's all.


The sample code is available below.

KafkaConsumer
github.com

KafkaStreams
github.com

producer
github.com