KafkaのJava clientを試す
Kafkaのjava clientを試してみたメモです。
前回Kafkaをインストールしてコマンドラインツールで簡単な操作だけ試してみたので、
java clientから操作してみようと思います。
下記バージョンで試してみます。
java以外のclientも色々あるようです。
Clients - Apache Kafka - Apache Software Foundation
Dependency
gradleで試してみます。
dependencyに下記を追加しました。
build.gradle
dependencies {
compile 'org.apache.kafka:kafka-clients:2.0.0'
}
Producer
Producerを作成してみます。
Producer API
Apache Kafka
KafkaProducer
kafka 2.0.0 API
configuration
Propertiesに設定をしていきます。
BOOTSTRAP_SERVERS_CONFIGにBrokerのホスト、ポートを指定します。
複数ある場合はカンマ区切りで指定します。
KEY_SERIALIZER_CLASS_CONFIGとVALUE_SERIALIZER_CLASS_CONFIGにメッセージのkeyとvalueのシリアライザを指定します。
文字列としてメッセージを送信するためにStringSerializerを指定します。
作成したPropertiesを指定して、KafkaProducerインスタンスを生成します。
SyncSendProducer.java
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
sync send
メッセージを同期送信してみます。
送信するメッセージとしてProducerRecordを作成します。
コンストラクタにトピック名と、送信するkey, valueを指定します。
今回はkeyを省略してトピック名とvalueだけ指定しています。
send(ProducerRecord<K, V>)でProducerRecordが同期送信され、Futureが返されます。
FutureからRecordMetadataを取得して、情報を表示してみます。
SyncSendProducer.java
IntStream.range(0, 100) .forEach(i -> { ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "my_value-" + i); try { // sync send Future<RecordMetadata> send = producer.send(record); RecordMetadata recordMetadata = send.get(); System.out.println("============================="); System.out.println(LocalDateTime.now()); System.out.println("topic: " + recordMetadata.topic()); System.out.println("partition: " + recordMetadata.partition()); System.out.println("offset: " + recordMetadata.offset()); Thread.sleep(200L); } catch (Exception e) { e.printStackTrace(); } });
async send
メッセージを非同期送信することも出来ます。
同期送信の時と同じ様にProducerRecordを作成し、
send(ProducerRecord<K, V>, Callback)で、Callbackを指定すると非同期送信されます。
送信が完了すると指定したCallbackが呼び出されます。
Callbackで取得出来るRecordMetadataの情報を表示してみます。
AsyncSendProducer.java
IntStream.range(0, 100) .forEach(i -> { ProducerRecord<String, String> record = new ProducerRecord<>("async_topic", "async_value-" + i); try { // async send producer.send(record, (metadata, exception) -> { System.out.println("============================="); System.out.println(LocalDateTime.now()); System.out.println("callback!!"); System.out.println("topic: " + metadata.topic()); System.out.println("partition: " + metadata.partition()); System.out.println("offset: " + metadata.offset()); }); Thread.sleep(100L); } catch (Exception e) { e.printStackTrace(); } });
Consumer
Consumerを作成してみます。
Consumer API
Apache Kafka
KafkaConsumer
kafka 2.0.0 API
configuration
Propertiesに設定をしていきます。
BOOTSTRAP_SERVERS_CONFIGにBrokerのホスト、ポートをしていします。 複数ある場合はカンマ区切りで指定します。
GROUP_ID_CONFIGでConsumer Groupを指定します。
今回はGroupは一つにするので適当な名前を指定しておきます。
KEY_DESERIALIZER_CLASS_CONFIGとVALUE_DESERIALIZER_CLASS_CONFIGにメッセージのkeyとvalueのデシリアライザを指定します。
文字列としてメッセージを取得するためStringDeserializerを指定します。
作成したPropertiesを指定して、KafkaConsumerインスタンスを生成します。
AutoCommitConsumer.java
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
auto commit
topic(のpartition)でどこまでメッセージを読む際、どこまで読んだかを記録する必要があります。
Consumerで読み取ったoffset(位置)をcommitすることで、offsetを更新することが出来ます。
デフォルトでは5,000msecごとに自動でcommitが行われます。
commitされるoffsetは最後にpoll()して取得したレコードの最大のoffsetです。
まずデフォルトの設定で試してみようと思います。
consumer.subscribe()でtopicのリストを指定します。
consumer.poll()でレコードを取得します。
引数にはレコードががない場合に待機する時間を指定します(レコードがある場合はすぐに戻る)
ConsumerRecordsを取得し、イテレーションして各レコードを取得します。
各レコードの情報を出力し、partitionのoffsetの情報も出力してみます。
AutoCommitConsumer.java
consumer.subscribe(Collections.singletonList("sync_topic")); try { while (true) { // Deprecated // ConsumerRecords<String, String> records = consumer.poll(1_000); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000)); if (records.count() > 0) { System.out.println("============================="); System.out.println("[record size] " + records.count()); } records.forEach(record -> { System.out.println("============================="); System.out.println(LocalDateTime.now()); System.out.println("topic: " + record.topic()); System.out.println("partition: " + record.partition()); System.out.println("key: " + record.key()); System.out.println("value: " + record.value()); System.out.println("offset: " + record.offset()); TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition); if (offsetAndMetadata != null) { System.out.println("partition offset: " + offsetAndMetadata.offset()); } }); Thread.sleep(1_000L); } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); }
動作確認
SyncProducerとAutoCommitのConsumerで確認してみます。
Producer ログ
offsetが1ずつインクリメントされています。
============================= 2018-10-09T04:16:23.143 topic: mytopic partition: 0 offset: 0 ============================= 2018-10-09T04:16:23.346 topic: mytopic partition: 0 offset: 1 ============================= 2018-10-09T04:16:23.548 topic: mytopic partition: 0 offset: 2 ============================= 2018-10-09T04:16:23.750 topic: mytopic partition: 0 offset: 3 : : ============================= 2018-10-09T04:16:42.894 topic: mytopic partition: 0 offset: 97 ============================= 2018-10-09T04:16:43.099 topic: mytopic partition: 0 offset: 98 ============================= 2018-10-09T04:16:43.301 topic: mytopic partition: 0 offset: 99
Consumer ログ
[record size] が一度にpollして取得したレコードサイズです。
offsetがレコードのoffsetです。
partition offsetがpartitionのoffsetで、commitされることで更新されます。
Auto Commitなので5秒ごとにcommitされて、partition offsetの値が更新されています。
============================= [record size] 4 ============================= 2018-10-09T04:16:23.940 topic: mytopic partition: 0 key: null value: my_value-0 offset: 0 partition offset: 0 ============================= 2018-10-09T04:16:23.942 topic: mytopic partition: 0 key: null value: my_value-1 offset: 1 partition offset: 0 ============================= 2018-10-09T04:16:23.944 topic: mytopic partition: 0 key: null value: my_value-2 offset: 2 partition offset: 0 ============================= 2018-10-09T04:16:23.945 topic: mytopic partition: 0 key: null value: my_value-3 offset: 3 partition offset: 0 : : ============================= [record size] 5 ============================= 2018-10-09T04:16:27.985 topic: mytopic partition: 0 key: null value: my_value-14 offset: 14 partition offset: 0 ============================= 2018-10-09T04:16:27.996 topic: mytopic partition: 0 key: null value: my_value-15 offset: 15 partition offset: 0 ============================= 2018-10-09T04:16:28.001 topic: mytopic partition: 0 key: null value: my_value-16 offset: 16 partition offset: 0 ============================= 2018-10-09T04:16:28.005 topic: mytopic partition: 0 key: null value: my_value-17 offset: 17 partition offset: 0 ============================= 2018-10-09T04:16:28.009 topic: mytopic partition: 0 key: null value: my_value-18 offset: 18 partition offset: 0 ============================= [record size] 5 ============================= 2018-10-09T04:16:29.016 topic: mytopic partition: 0 key: null value: my_value-19 offset: 19 partition offset: 19 ============================= 2018-10-09T04:16:29.028 topic: mytopic partition: 0 key: null value: my_value-20 offset: 20 partition offset: 19 ============================= 2018-10-09T04:16:29.035 topic: mytopic partition: 0 key: null value: my_value-21 offset: 21 partition offset: 19 : : ============================= [record size] 5 ============================= 2018-10-09T04:16:33.125 topic: mytopic partition: 0 key: null value: my_value-39 offset: 39 partition offset: 19 ============================= 2018-10-09T04:16:33.131 topic: mytopic partition: 0 key: null value: my_value-40 offset: 40 partition offset: 19 ============================= 2018-10-09T04:16:33.133 topic: mytopic partition: 0 key: null value: my_value-41 offset: 41 partition offset: 19 ============================= 2018-10-09T04:16:33.135 topic: mytopic partition: 0 key: null value: my_value-42 offset: 42 partition offset: 19 ============================= 2018-10-09T04:16:33.137 topic: mytopic partition: 0 key: null value: my_value-43 offset: 43 partition offset: 19 ============================= [record size] 6 ============================= 2018-10-09T04:16:34.141 topic: mytopic partition: 0 key: null value: my_value-44 offset: 44 partition offset: 44 ============================= 2018-10-09T04:16:34.145 topic: mytopic partition: 0 key: null value: my_value-45 offset: 45 partition offset: 44 ============================= 2018-10-09T04:16:34.146 topic: mytopic partition: 0 key: null value: my_value-46 offset: 46 partition offset: 44 : :
manual commit
Consumerはデフォルトでauto commitですが、任意のタイミングでも手動commit可能です。
手動commitも同期と非同期があるので、同期の手動commitを試してみます。
Propertiesに下記を追加します。
ENABLE_AUTO_COMMIT_CONFIGをfalseにするとauto commitが無効になります。
ManualSyncCommitConsumer.java
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
auto commitの場合と同様にconsumer.poll()でレコードを取得します。
poll()で取得したレコード数処理するごとに、consumer.commitSync()でoffsetを同期commitします。
ManualSyncCommitConsumer.java
consumer.subscribe(Collections.singletonList("topic1")); try { int readCount = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000)); if (records.count() > 0) { System.out.println("============================="); System.out.println("[record size] " + records.count()); } for (ConsumerRecord<String, String> record : records) { readCount++; System.out.println("============================="); System.out.println(LocalDateTime.now()); System.out.println("topic: " + record.topic()); System.out.println("partition: " + record.partition()); System.out.println("key: " + record.key()); System.out.println("value: " + record.value()); System.out.println("offset: " + record.offset()); TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition); if (offsetAndMetadata != null) { System.out.println("partition offset: " + offsetAndMetadata.offset()); } if (readCount == records.count()) { consumer.commitSync(); readCount = 0; } } Thread.sleep(1_000L); } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); }
非同期commitする場合は、consumer.commitAsync()を指定します。
consumer.commitAsync();
動作確認
SyncProducerとmanual commitのConsumerで確認してみます。
Producer ログ
offsetが1ずつインクリメントされています。
============================= 2018-10-12T03:30:16.748 topic: topic1 partition: 0 offset: 0 ============================= 2018-10-12T03:30:16.851 topic: topic1 partition: 0 offset: 1 ============================= 2018-10-12T03:30:16.955 topic: topic1 partition: 0 offset: 2 ============================= 2018-10-12T03:30:17.057 topic: topic1 partition: 0 offset: 3 : : ============================= 2018-10-12T03:30:26.869 topic: topic1 partition: 0 offset: 97 ============================= 2018-10-12T03:30:26.975 topic: topic1 partition: 0 offset: 98 ============================= 2018-10-12T03:30:27.091 topic: topic1 partition: 0 offset: 99
Consumer ログ
[record size] が一度にpollして取得したレコードサイズです。
offsetがレコードのoffsetです。
partition offsetがpartitionのoffsetで、commitされることで更新されます。
manual commitでpollで取得したレコードを全部処理したあとにcommitしています。
consumer.commitSync()するとpollで取得したレコードの最大のoffsetがcommitされます。
[record size]で取得したレコード数処理されるごとにcommitされて、partition offsetの値が更新されています。
============================= [record size] 1 ============================= 2018-10-12T03:30:16.755 topic: topic1 partition: 0 key: null value: my_value-0 offset: 0 ============================= [record size] 1 ============================= 2018-10-12T03:30:17.764 topic: topic1 partition: 0 key: null value: my_value-1 offset: 1 partition offset: 1 ============================= [record size] 8 ============================= 2018-10-12T03:30:18.772 topic: topic1 partition: 0 key: null value: my_value-2 offset: 2 partition offset: 2 ============================= 2018-10-12T03:30:18.778 topic: topic1 partition: 0 key: null value: my_value-3 offset: 3 partition offset: 2 ============================= 2018-10-12T03:30:18.782 topic: topic1 partition: 0 key: null value: my_value-4 offset: 4 partition offset: 2 ============================= 2018-10-12T03:30:18.784 topic: topic1 partition: 0 key: null value: my_value-5 offset: 5 partition offset: 2 ============================= 2018-10-12T03:30:18.787 topic: topic1 partition: 0 key: null value: my_value-6 offset: 6 partition offset: 2 ============================= 2018-10-12T03:30:18.789 topic: topic1 partition: 0 key: null value: my_value-7 offset: 7 partition offset: 2 ============================= 2018-10-12T03:30:18.792 topic: topic1 partition: 0 key: null value: my_value-8 offset: 8 partition offset: 2 ============================= 2018-10-12T03:30:18.795 topic: topic1 partition: 0 key: null value: my_value-9 offset: 9 partition offset: 2 ============================= [record size] 10 ============================= 2018-10-12T03:30:19.804 topic: topic1 partition: 0 key: null value: my_value-10 offset: 10 partition offset: 10 ============================= 2018-10-12T03:30:19.809 topic: topic1 partition: 0 key: null value: my_value-11 offset: 11 partition offset: 10 : : ============================= 2018-10-12T03:30:19.836 topic: topic1 partition: 0 key: null value: my_value-18 offset: 18 partition offset: 10 ============================= 2018-10-12T03:30:19.837 topic: topic1 partition: 0 key: null value: my_value-19 offset: 19 partition offset: 10 ============================= [record size] 10 ============================= 2018-10-12T03:30:20.844 topic: topic1 partition: 0 key: null value: my_value-20 offset: 20 partition offset: 20 ============================= 2018-10-12T03:30:20.846 topic: topic1 partition: 0 key: null value: my_value-21 offset: 21 partition offset: 20 : :
試したのはこんなとこです。
サンプルコードは下記に上げました。
Producer
github.com
Consumer
github.com
終わり。