Gracefully shutting down Kafka Consumer and Kafka Streams
These are my notes on how to gracefully shut down a Kafka Consumer and Kafka Streams.
I tried it with the following versions.
Kafka Consumer
I was wondering how to shut down a Kafka Consumer while it is inside its polling loop.
According to the answers below, the way to do it is to register a shutdown hook and call KafkaConsumer.wakeup() from it.
https://stackoverflow.com/questions/46581674/how-to-finish-kafka-consumer-safetyis-there-meaning-to-call-threadjoin-inside
https://stackoverflow.com/questions/35566011/shutting-down-kafka-consumer
Consumer
Register a shutdown hook as shown below and call consumer.wakeup() from it.
http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#wakeup--
If wakeup() is called while KafkaConsumer.poll() is blocking, poll() throws a WakeupException,
which lets you break out of the polling loop.
http://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-
http://kafka.apache.org/21/javadoc/org/apache/kafka/common/errors/WakeupException.html
After that, the hook calls mainThread.join() to wait for the main thread to finish.
ShutdownConsumer.java
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread("shutdown-thread") {
    @Override
    public void run() {
        System.out.println("start to wakeup");
        // makes the blocked poll() on the main thread throw WakeupException
        consumer.wakeup();
        System.out.println("end to wakeup");
        try {
            // wait until the main thread has closed the consumer
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
The main thread catches the WakeupException thrown by KafkaConsumer and calls consumer.close() in the finally block.
ShutdownConsumer.java
try {
    while (true) {
        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(1_000));
        // print information about topic record
        records.forEach(record -> System.out.println(
                "topic: " + record.topic()
                        + ", partition: " + record.partition()
                        + ", key: " + record.key()
                        + ", value: " + record.value()
        ));
        Thread.sleep(1_000L);
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (WakeupException ignore) {
    System.out.println("caught WakeUpException");
} finally {
    consumer.close();
    System.out.println("closed consumer");
}
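The reason for mainThread.join() in the hook is that the JVM halts as soon as all shutdown hooks have finished, so without the join the finally block on the main thread might not get to run consumer.close(). The following is a minimal sketch of that ordering using only the JDK. The names JoinInHookSketch, simulate, and the AtomicBoolean flag are hypothetical stand-ins (the flag plays the role of consumer.wakeup(), the sleep plays the role of poll()), and the "hook" thread is started directly instead of being registered with the Runtime so that the sequence can be observed.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class JoinInHookSketch {

    /** Runs the simulated shutdown sequence and returns the events in the order they happened. */
    static List<String> simulate() {
        List<String> events = new CopyOnWriteArrayList<>();
        AtomicBoolean wakeup = new AtomicBoolean(false); // stand-in for consumer.wakeup()
        CountDownLatch polling = new CountDownLatch(1);

        // Stand-in for the main thread that runs the poll loop.
        Thread worker = new Thread(() -> {
            try {
                polling.countDown();
                while (!wakeup.get()) {
                    Thread.sleep(10);                    // stand-in for consumer.poll(...)
                }
                events.add("caught wakeup");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                events.add("closed consumer");           // stand-in for consumer.close()
            }
        }, "poll-loop");

        // Stand-in for the shutdown hook (assumption: started by hand, not by the JVM).
        Thread hook = new Thread(() -> {
            events.add("start to wakeup");
            wakeup.set(true);
            try {
                worker.join();                           // wait for the cleanup to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            events.add("hook finished");
        }, "shutdown-thread");

        try {
            worker.start();
            polling.await();                             // make sure the loop is running
            hook.start();
            hook.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this sketch "closed consumer" is always recorded before "hook finished", which is the guarantee the join() provides in the real shutdown hook.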
Verification
Stop the consumer while it is consuming.
producer log
partition: 0, topic: s2, offset: 0, key: 0, value: IXywX
partition: 0, topic: s2, offset: 1, key: 1, value: RrmzA
partition: 0, topic: s2, offset: 2, key: 2, value: tzZWr
partition: 0, topic: s2, offset: 3, key: 3, value: rLxrC
partition: 0, topic: s2, offset: 4, key: 4, value: LTBWM
partition: 0, topic: s2, offset: 5, key: 5, value: TuAZN
partition: 0, topic: s2, offset: 6, key: 6, value: VoWcE
partition: 0, topic: s2, offset: 7, key: 7, value: SihCu
partition: 0, topic: s2, offset: 8, key: 8, value: XluLj
partition: 0, topic: s2, offset: 9, key: 9, value: WaajJ
partition: 0, topic: s2, offset: 10, key: 10, value: HXAtg
partition: 0, topic: s2, offset: 11, key: 11, value: AFSzh
consumer log
topic: s2, partition: 0, key: 0, value: IXywX
topic: s2, partition: 0, key: 1, value: RrmzA
topic: s2, partition: 0, key: 2, value: tzZWr
topic: s2, partition: 0, key: 3, value: rLxrC
topic: s2, partition: 0, key: 4, value: LTBWM
topic: s2, partition: 0, key: 5, value: TuAZN
topic: s2, partition: 0, key: 6, value: VoWcE
topic: s2, partition: 0, key: 7, value: SihCu
topic: s2, partition: 0, key: 8, value: XluLj
topic: s2, partition: 0, key: 9, value: WaajJ
topic: s2, partition: 0, key: 10, value: HXAtg
start to wakeup
end to wakeup
caught WakeUpException
closed consumer
Internally, KafkaConsumer.wakeup() works as follows.
KafkaConsumer.wakeup() delegates to ConsumerNetworkClient.wakeup().
KafkaConsumer.java
private final ConsumerNetworkClient client;

/**
 * Wakeup the consumer. This method is thread-safe and is useful in particular to abort a long poll.
 * The thread which is blocking in an operation will throw {@link org.apache.kafka.common.errors.WakeupException}.
 * If no thread is blocking in a method which can throw {@link org.apache.kafka.common.errors.WakeupException}, the next call to such a method will raise it instead.
 */
@Override
public void wakeup() {
    this.client.wakeup();
}
ConsumerNetworkClient sets its AtomicBoolean field wakeup to true.
ConsumerNetworkClient.java
// this flag allows the client to be safely woken up without waiting on the lock above. It is
// atomic to avoid the need to acquire the lock above in order to enable it concurrently.
private final AtomicBoolean wakeup = new AtomicBoolean(false);

private final KafkaClient client;

/**
 * Wakeup an active poll. This will cause the polling thread to throw an exception either
 * on the current poll if one is active, or the next poll.
 */
public void wakeup() {
    // wakeup should be safe without holding the client lock since it simply delegates to
    // Selector's wakeup, which is thread-safe
    log.debug("Received user wakeup");
    this.wakeup.set(true);
    this.client.wakeup();
}
KafkaConsumer.poll() calls ConsumerNetworkClient.maybeTriggerWakeup() at the top of each iteration of its do-while loop.
KafkaConsumer.java
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();
            :
            :
            :
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
ConsumerNetworkClient.maybeTriggerWakeup() throws a WakeupException when the AtomicBoolean wakeup is true
(and wakeups are not disabled), resetting the flag to false first.
ConsumerNetworkClient.java
public void maybeTriggerWakeup() {
    if (!wakeupDisabled.get() && wakeup.get()) {
        log.debug("Raising WakeupException in response to user wakeup");
        wakeup.set(false);
        throw new WakeupException();
    }
}
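To see the flag mechanism in isolation, here is a simplified JDK-only model of it. This is only a sketch and not Kafka's code: WakeupFlagSketch, WakeupSignal, and the iteration-counting poll(int) are hypothetical names made up for illustration, and the real client additionally checks wakeupDisabled and wakes up the network selector, both of which are left out here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class WakeupFlagSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException. */
    static class WakeupSignal extends RuntimeException { }

    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    /** Safe to call from any thread: it only flips the flag. */
    void wakeup() {
        wakeup.set(true);
    }

    /** Called by the polling thread at the top of every loop iteration. */
    void maybeTriggerWakeup() {
        if (wakeup.get()) {
            wakeup.set(false);       // reset so that a later poll is not aborted again
            throw new WakeupSignal();
        }
    }

    /** Simplified poll: returns the number of completed iterations, or throws WakeupSignal. */
    int poll(int maxIterations) {
        int done = 0;
        do {
            maybeTriggerWakeup();
            done++;                  // stand-in for fetching records
        } while (done < maxIterations);
        return done;
    }

    public static void main(String[] args) {
        WakeupFlagSketch client = new WakeupFlagSketch();
        System.out.println("iterations: " + client.poll(3));

        client.wakeup();             // no poll is active, so the flag simply stays set
        try {
            client.poll(3);
        } catch (WakeupSignal e) {
            System.out.println("next poll threw WakeupSignal");
        }

        // The flag was reset when the exception was raised, so polling works again.
        System.out.println("iterations: " + client.poll(3));
    }
}
```

This also mirrors the behaviour described in the javadoc quoted above: when no poll is in progress at the time of wakeup(), the next poll raises the exception instead.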
Kafka Streams
Kafka Streams is also stream processing, so once streams.start() is called it keeps running.
As described in the developer guide below, use a shutdown hook to call KafkaStreams.close().
https://kafka.apache.org/21/documentation/streams/developer-guide/write-streams
Consumer
ShutdownStream.java
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<Integer, String> kStream = streamsBuilder.stream("s");
kStream.foreach((key, value) -> System.out.println("key: " + key + ", value: " + value));

KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

Runtime.getRuntime().addShutdownHook(new Thread("stream-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        System.out.println("close stream.");
    }
});

try {
    streams.start();
} catch (final Throwable ignore) {
    System.exit(1);
}
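In the code above the main thread returns right after streams.start() and the hook does all the work. One variation I did not test in this post is to keep the main thread blocked on a CountDownLatch that the hook releases after close(), so that the main thread can observe the shutdown. The following JDK-only sketch shows just that ordering. FakeStreams is a hypothetical stand-in for KafkaStreams (it is not a Kafka class), and the hook thread is started by hand instead of being registered with the Runtime.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class LatchShutdownSketch {

    /** Hypothetical stand-in for KafkaStreams: start() returns immediately, close() stops it. */
    static class FakeStreams {
        private final List<String> events;

        FakeStreams(List<String> events) {
            this.events = events;
        }

        void start() {
            events.add("started");
        }

        void close() {
            events.add("closed");
        }
    }

    /** Runs the simulated sequence and returns the events in the order they happened. */
    static List<String> simulate() {
        List<String> events = new CopyOnWriteArrayList<>();
        FakeStreams streams = new FakeStreams(events);
        CountDownLatch latch = new CountDownLatch(1);

        Thread hook = new Thread(() -> {
            streams.close();
            latch.countDown();       // release the main thread only after close() returned
        }, "stream-shutdown-hook");

        streams.start();
        hook.start();                // in a real application: Runtime.getRuntime().addShutdownHook(hook)
        try {
            latch.await();           // the main thread stays here until shutdown is requested
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        events.add("main exiting");
        return events;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Whether the latch is worth it depends on the application. If nothing needs to happen on the main thread after shutdown, the simpler hook-only version above is enough.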
Verification
Stop the application while it is consuming.
producer log
partition: 0, topic: s, offset: 52, key: 0, value: nAJrA
partition: 0, topic: s, offset: 53, key: 1, value: foIEm
partition: 0, topic: s, offset: 54, key: 2, value: LlPPN
partition: 0, topic: s, offset: 55, key: 3, value: zPtlf
partition: 0, topic: s, offset: 56, key: 4, value: FytYg
partition: 0, topic: s, offset: 57, key: 5, value: tQkdc
partition: 0, topic: s, offset: 58, key: 6, value: TmXOl
partition: 0, topic: s, offset: 59, key: 7, value: ObZsc
partition: 0, topic: s, offset: 60, key: 8, value: mCowA
partition: 0, topic: s, offset: 61, key: 9, value: sAYnN
partition: 0, topic: s, offset: 62, key: 10, value: NoJXh
partition: 0, topic: s, offset: 63, key: 11, value: mXwKm
partition: 0, topic: s, offset: 64, key: 12, value: NMQZr
consumer log
key: 0, value: nAJrA
key: 1, value: foIEm
key: 2, value: LlPPN
key: 3, value: zPtlf
key: 4, value: FytYg
key: 5, value: tQkdc
key: 6, value: TmXOl
key: 7, value: ObZsc
key: 8, value: mCowA
close stream.
That's all.
I have uploaded the sample code to the repositories below.
KafkaConsumer
github.com
KafkaStreams
github.com
producer
github.com