KafkaのJava clientを試す

Kafkaのjava clientを試してみたメモです。


前回Kafkaをインストールしてコマンドラインツールで簡単な操作だけ試してみたので、
java clientから操作してみようと思います。

下記バージョンで試してみます。

  • kafka 2.0.0
  • kafka-clients 2.0.0
  • zookeeper 3.4.13
  • Ubuntu 16
  • Java 1.8.0_181

java以外のclientも色々あるようです。
Clients - Apache Kafka - Apache Software Foundation

Dependency

gradleで試してみます。
dependencyに下記を追加しました。

build.gradle

dependencies {
    compile 'org.apache.kafka:kafka-clients:2.0.0'
}

Producer

Producerを作成してみます。

Producer API
Apache Kafka
KafkaProducer
kafka 2.0.0 API

configuration

Propertiesに設定をしていきます。
BOOTSTRAP_SERVERS_CONFIGにBrokerのホスト、ポートを指定します。
複数ある場合はカンマ区切りで指定します。

KEY_SERIALIZER_CLASS_CONFIGとVALUE_SERIALIZER_CLASS_CONFIGにメッセージのkeyとvalueのシリアライザを指定します。
文字列としてメッセージを送信するためにStringSerializerを指定します。

作成したPropertiesを指定して、KafkaProducerインスタンスを生成します。

SyncSendProducer.java

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

sync send

メッセージを同期送信してみます。

送信するメッセージとしてProducerRecordを作成します。
コンストラクタにトピック名と、送信するkey, valueを指定します。
今回はkeyを省略してトピック名とvalueだけ指定しています。

send(ProducerRecord<K, V>)でProducerRecordが同期送信され、Futureが返されます。
FutureからRecordMetadataを取得して、情報を表示してみます。

SyncSendProducer.java

IntStream.range(0, 100)
        .forEach(i -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("mytopic", "my_value-" + i);
            try {
                // sync send
                Future<RecordMetadata> send = producer.send(record);
                RecordMetadata recordMetadata = send.get();
                System.out.println("=============================");
                System.out.println(LocalDateTime.now());
                System.out.println("topic: " + recordMetadata.topic());
                System.out.println("partition: " + recordMetadata.partition());
                System.out.println("offset: " + recordMetadata.offset());

                Thread.sleep(200L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

async send

メッセージを非同期送信することも出来ます。

同期送信の時と同じ様にProducerRecordを作成し、
send(ProducerRecord<K, V>, Callback)で、Callbackを指定すると非同期送信されます。
送信が完了すると指定したCallbackが呼び出されます。
Callbackで取得出来るRecordMetadataの情報を表示してみます。

AsyncSendProducer.java

IntStream.range(0, 100)
        .forEach(i -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("async_topic", "async_value-" + i);
            try {
                // async send
                producer.send(record, (metadata, exception) -> {
                    System.out.println("=============================");
                    System.out.println(LocalDateTime.now());
                    System.out.println("callback!!");
                    System.out.println("topic: " + metadata.topic());
                    System.out.println("partition: " + metadata.partition());
                    System.out.println("offset: " + metadata.offset());
                });

                Thread.sleep(100L);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

Consumer

Consumerを作成してみます。

Consumer API
Apache Kafka
KafkaConsumer
kafka 2.0.0 API

configuration

Propertiesに設定をしていきます。
BOOTSTRAP_SERVERS_CONFIGにBrokerのホスト、ポートをしていします。 複数ある場合はカンマ区切りで指定します。

GROUP_ID_CONFIGでConsumer Groupを指定します。
今回はGroupは一つにするので適当な名前を指定しておきます。

KEY_DESERIALIZER_CLASS_CONFIGとVALUE_DESERIALIZER_CLASS_CONFIGにメッセージのkeyとvalueのデシリアライザを指定します。
文字列としてメッセージを取得するためStringDeserializerを指定します。

作成したPropertiesを指定して、KafkaConsumerインスタンスを生成します。

AutoCommitConsumer.java

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

auto commit

topic(のpartition)でどこまでメッセージを読む際、どこまで読んだかを記録する必要があります。

Consumerで読み取ったoffset(位置)をcommitすることで、offsetを更新することが出来ます。
デフォルトでは5,000msecごとに自動でcommitが行われます。
commitされるoffsetは最後にpoll()して取得したレコードの最大のoffsetです。
まずデフォルトの設定で試してみようと思います。

consumer.subscribe()でtopicのリストを指定します。
consumer.poll()でレコードを取得します。
引数にはレコードががない場合に待機する時間を指定します(レコードがある場合はすぐに戻る)

ConsumerRecordsを取得し、イテレーションして各レコードを取得します。

各レコードの情報を出力し、partitionのoffsetの情報も出力してみます。

AutoCommitConsumer.java

consumer.subscribe(Collections.singletonList("sync_topic"));

try {
    while (true) {
        // Deprecated
        // ConsumerRecords<String, String> records = consumer.poll(1_000);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

        if (records.count() > 0) {
            System.out.println("=============================");
            System.out.println("[record size] " + records.count());
        }
        records.forEach(record -> {
            System.out.println("=============================");
            System.out.println(LocalDateTime.now());
            System.out.println("topic: " + record.topic());
            System.out.println("partition: " + record.partition());
            System.out.println("key: " + record.key());
            System.out.println("value: " + record.value());
            System.out.println("offset: " + record.offset());
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
            if (offsetAndMetadata != null) {
                System.out.println("partition offset: " + offsetAndMetadata.offset());
            }
        });

        Thread.sleep(1_000L);
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    consumer.close();
}
動作確認

SyncProducerとAutoCommitのConsumerで確認してみます。

Producer ログ

offsetが1ずつインクリメントされています。

=============================
2018-10-09T04:16:23.143
topic: mytopic
partition: 0
offset: 0
=============================
2018-10-09T04:16:23.346
topic: mytopic
partition: 0
offset: 1
=============================
2018-10-09T04:16:23.548
topic: mytopic
partition: 0
offset: 2
=============================
2018-10-09T04:16:23.750
topic: mytopic
partition: 0
offset: 3
           :
           :
=============================
2018-10-09T04:16:42.894
topic: mytopic
partition: 0
offset: 97
=============================
2018-10-09T04:16:43.099
topic: mytopic
partition: 0
offset: 98
=============================
2018-10-09T04:16:43.301
topic: mytopic
partition: 0
offset: 99


Consumer ログ

[record size] が一度にpollして取得したレコードサイズです。
offsetがレコードのoffsetです。
partition offsetがpartitionのoffsetで、commitされることで更新されます。

Auto Commitなので5秒ごとにcommitされて、partition offsetの値が更新されています。

=============================
[record size] 4
=============================
2018-10-09T04:16:23.940
topic: mytopic
partition: 0
key: null
value: my_value-0
offset: 0
partition offset: 0
=============================
2018-10-09T04:16:23.942
topic: mytopic
partition: 0
key: null
value: my_value-1
offset: 1
partition offset: 0
=============================
2018-10-09T04:16:23.944
topic: mytopic
partition: 0
key: null
value: my_value-2
offset: 2
partition offset: 0
=============================
2018-10-09T04:16:23.945
topic: mytopic
partition: 0
key: null
value: my_value-3
offset: 3
partition offset: 0
           :
           :
=============================
[record size] 5
=============================
2018-10-09T04:16:27.985
topic: mytopic
partition: 0
key: null
value: my_value-14
offset: 14
partition offset: 0
=============================
2018-10-09T04:16:27.996
topic: mytopic
partition: 0
key: null
value: my_value-15
offset: 15
partition offset: 0
=============================
2018-10-09T04:16:28.001
topic: mytopic
partition: 0
key: null
value: my_value-16
offset: 16
partition offset: 0
=============================
2018-10-09T04:16:28.005
topic: mytopic
partition: 0
key: null
value: my_value-17
offset: 17
partition offset: 0
=============================
2018-10-09T04:16:28.009
topic: mytopic
partition: 0
key: null
value: my_value-18
offset: 18
partition offset: 0
=============================
[record size] 5
=============================
2018-10-09T04:16:29.016
topic: mytopic
partition: 0
key: null
value: my_value-19
offset: 19
partition offset: 19
=============================
2018-10-09T04:16:29.028
topic: mytopic
partition: 0
key: null
value: my_value-20
offset: 20
partition offset: 19
=============================
2018-10-09T04:16:29.035
topic: mytopic
partition: 0
key: null
value: my_value-21
offset: 21
partition offset: 19
           :
           :
=============================
[record size] 5
=============================
2018-10-09T04:16:33.125
topic: mytopic
partition: 0
key: null
value: my_value-39
offset: 39
partition offset: 19
=============================
2018-10-09T04:16:33.131
topic: mytopic
partition: 0
key: null
value: my_value-40
offset: 40
partition offset: 19
=============================
2018-10-09T04:16:33.133
topic: mytopic
partition: 0
key: null
value: my_value-41
offset: 41
partition offset: 19
=============================
2018-10-09T04:16:33.135
topic: mytopic
partition: 0
key: null
value: my_value-42
offset: 42
partition offset: 19
=============================
2018-10-09T04:16:33.137
topic: mytopic
partition: 0
key: null
value: my_value-43
offset: 43
partition offset: 19
=============================
[record size] 6
=============================
2018-10-09T04:16:34.141
topic: mytopic
partition: 0
key: null
value: my_value-44
offset: 44
partition offset: 44
=============================
2018-10-09T04:16:34.145
topic: mytopic
partition: 0
key: null
value: my_value-45
offset: 45
partition offset: 44
=============================
2018-10-09T04:16:34.146
topic: mytopic
partition: 0
key: null
value: my_value-46
offset: 46
partition offset: 44
           :
           :

manual commit

Consumerはデフォルトでauto commitですが、任意のタイミングでも手動commit可能です。
手動commitも同期と非同期があるので、同期の手動commitを試してみます。

Propertiesに下記を追加します。
ENABLE_AUTO_COMMIT_CONFIGをfalseにするとauto commitが無効になります。

ManualSyncCommitConsumer.java

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

auto commitの場合と同様にconsumer.poll()でレコードを取得します。
poll()で取得したレコード数処理するごとに、consumer.commitSync()でoffsetを同期commitします。

ManualSyncCommitConsumer.java

consumer.subscribe(Collections.singletonList("topic1"));

try {
    int readCount = 0;
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

        if (records.count() > 0) {
            System.out.println("=============================");
            System.out.println("[record size] " + records.count());
        }
        for (ConsumerRecord<String, String> record : records) {
            readCount++;
            System.out.println("=============================");
            System.out.println(LocalDateTime.now());
            System.out.println("topic: " + record.topic());
            System.out.println("partition: " + record.partition());
            System.out.println("key: " + record.key());
            System.out.println("value: " + record.value());
            System.out.println("offset: " + record.offset());

            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
            if (offsetAndMetadata != null) {
                System.out.println("partition offset: " + offsetAndMetadata.offset());
            }

            if (readCount == records.count()) {
                consumer.commitSync();
                readCount = 0;
            }
        }

        Thread.sleep(1_000L);
    }
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    consumer.close();
}

非同期commitする場合は、consumer.commitAsync()を指定します。

consumer.commitAsync();
動作確認

SyncProducerとmanual commitのConsumerで確認してみます。

Producer ログ

offsetが1ずつインクリメントされています。

=============================
2018-10-12T03:30:16.748
topic: topic1
partition: 0
offset: 0
=============================
2018-10-12T03:30:16.851
topic: topic1
partition: 0
offset: 1
=============================
2018-10-12T03:30:16.955
topic: topic1
partition: 0
offset: 2
=============================
2018-10-12T03:30:17.057
topic: topic1
partition: 0
offset: 3
           :
           :
=============================
2018-10-12T03:30:26.869
topic: topic1
partition: 0
offset: 97
=============================
2018-10-12T03:30:26.975
topic: topic1
partition: 0
offset: 98
=============================
2018-10-12T03:30:27.091
topic: topic1
partition: 0
offset: 99


Consumer ログ

[record size] が一度にpollして取得したレコードサイズです。
offsetがレコードのoffsetです。
partition offsetがpartitionのoffsetで、commitされることで更新されます。

manual commitでpollで取得したレコードを全部処理したあとにcommitしています。
consumer.commitSync()するとpollで取得したレコードの最大のoffsetがcommitされます。

[record size]で取得したレコード数処理されるごとにcommitされて、partition offsetの値が更新されています。

=============================
[record size] 1
=============================
2018-10-12T03:30:16.755
topic: topic1
partition: 0
key: null
value: my_value-0
offset: 0
=============================
[record size] 1
=============================
2018-10-12T03:30:17.764
topic: topic1
partition: 0
key: null
value: my_value-1
offset: 1
partition offset: 1
=============================
[record size] 8
=============================
2018-10-12T03:30:18.772
topic: topic1
partition: 0
key: null
value: my_value-2
offset: 2
partition offset: 2
=============================
2018-10-12T03:30:18.778
topic: topic1
partition: 0
key: null
value: my_value-3
offset: 3
partition offset: 2
=============================
2018-10-12T03:30:18.782
topic: topic1
partition: 0
key: null
value: my_value-4
offset: 4
partition offset: 2
=============================
2018-10-12T03:30:18.784
topic: topic1
partition: 0
key: null
value: my_value-5
offset: 5
partition offset: 2
=============================
2018-10-12T03:30:18.787
topic: topic1
partition: 0
key: null
value: my_value-6
offset: 6
partition offset: 2
=============================
2018-10-12T03:30:18.789
topic: topic1
partition: 0
key: null
value: my_value-7
offset: 7
partition offset: 2
=============================
2018-10-12T03:30:18.792
topic: topic1
partition: 0
key: null
value: my_value-8
offset: 8
partition offset: 2
=============================
2018-10-12T03:30:18.795
topic: topic1
partition: 0
key: null
value: my_value-9
offset: 9
partition offset: 2
=============================
[record size] 10
=============================
2018-10-12T03:30:19.804
topic: topic1
partition: 0
key: null
value: my_value-10
offset: 10
partition offset: 10
=============================
2018-10-12T03:30:19.809
topic: topic1
partition: 0
key: null
value: my_value-11
offset: 11
partition offset: 10
           :
           :
=============================
2018-10-12T03:30:19.836
topic: topic1
partition: 0
key: null
value: my_value-18
offset: 18
partition offset: 10
=============================
2018-10-12T03:30:19.837
topic: topic1
partition: 0
key: null
value: my_value-19
offset: 19
partition offset: 10
=============================
[record size] 10
=============================
2018-10-12T03:30:20.844
topic: topic1
partition: 0
key: null
value: my_value-20
offset: 20
partition offset: 20
=============================
2018-10-12T03:30:20.846
topic: topic1
partition: 0
key: null
value: my_value-21
offset: 21
partition offset: 20
           :
           :


試したのはこんなとこです。

サンプルコードは下記に上げました。

Producer
github.com
Consumer
github.com

終わり。