Consuming from a specific offset in Kafka

These are my notes from trying out consuming from a specific offset in Kafka.


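All of my earlier examples consumed from the offset the Consumer had last committed,
but Kafka also lets you consume from a specific offset, so I gave it a try.
This makes it possible to read the same message more than once or to skip particular messages.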
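
To make "re-read" and "skip" concrete, here is a minimal sketch of the idea using a toy in-memory log. ToyPartitionLog and its methods are hypothetical names made up for this illustration and are not part of kafka-clients; the only point is that moving the read position decides what the next poll returns.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory stand-in for a single partition; NOT part of kafka-clients. */
public class ToyPartitionLog {
    private final List<String> messages = new ArrayList<>();
    private int position = 0; // offset of the next message poll() will return

    /** Appends a message and returns the offset it was stored at. */
    public int append(String value) {
        messages.add(value);
        return messages.size() - 1;
    }

    /** Moves the read position; the next poll() starts at this offset. */
    public void seek(int offset) {
        if (offset < 0 || offset > messages.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    /** Returns up to max messages starting at the current position and advances it. */
    public List<String> poll(int max) {
        int end = Math.min(messages.size(), position + max);
        List<String> batch = new ArrayList<>(messages.subList(position, end));
        position = end;
        return batch;
    }

    public static void main(String[] args) {
        ToyPartitionLog log = new ToyPartitionLog();
        for (int i = 0; i < 10; i++) {
            log.append("num" + i);
        }
        System.out.println(log.poll(3)); // [num0, num1, num2]
        log.seek(0);                     // rewind: the same messages are read again
        System.out.println(log.poll(3)); // [num0, num1, num2]
        log.seek(8);                     // jump ahead: offsets 3 to 7 are never read
        System.out.println(log.poll(3)); // [num8, num9]
    }
}
```

The real KafkaConsumer keeps one such position per assigned partition, which is why the examples below call seek per TopicPartition.
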


I tried this with the following versions.

  • kafka 2.0.0
  • kafka-clients 2.0.0
  • zookeeper 3.4.13
  • Ubuntu 16
  • Java 1.8.0_181

Producer

For the Producer, I reuse the one from an earlier post.

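No partitioner is specified (the default is used); records are produced with a null key and sequentially numbered values.
The partition, topic, offset, and value of each produced message are printed to standard output.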

ProducerWithDefaultPartition.java

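import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerWithDefaultPartition {
    public void syncSend() {
        // configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer once all records have been sent
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 100)
                    .forEach(i -> {
                        String value = "num" + i;
                        // no key is passed, so the key is null and the default partitioner picks the partition
                        ProducerRecord<String, String> record = new ProducerRecord<>("beginning", value);
                        try {
                            // sync send: get() blocks until this send has completed
                            Future<RecordMetadata> send = producer.send(record);
                            RecordMetadata recordMetadata = send.get();
                            System.out.print("partition: " + recordMetadata.partition() + ", ");
                            System.out.print("topic: " + recordMetadata.topic() + ", ");
                            System.out.print("offset: " + recordMetadata.offset() + ", ");
                            System.out.println("value: " + value);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }
}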

Consumer

I try two things: reading from the very first offset, and reading from a specific offset.

seekToBeginning

To read from the very first offset, KafkaConsumer provides a dedicated method, seekToBeginning().
Likewise, a dedicated method seekToEnd() is provided for reading from the last offset.

http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

The sample code for the Consumer is below.

SeekToBeginningConsumer.java

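import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SeekToBeginningConsumer {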
    public void seekToStart() {
        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "seekToBeginningGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : consumer.partitionsFor("beginning")) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }

        // seek from first
        consumer.assign(topicPartitions);
        consumer.seekToBeginning(consumer.assignment());

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

                records.forEach(record -> {
                    System.out.println("partition: " + record.partition() +
                            ", topic: " + record.topic() +
                            ", offset: " + record.offset() +
                            ", key: " + record.key() +
                            ", value: " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}

partitionsFor() takes a topic name and returns the partition metadata,
from which I build a List of TopicPartition.

        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : consumer.partitionsFor("beginning")) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }

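assign() lets you assign partitions manually;
pass it the List of TopicPartition.
Passing a Collection of TopicPartition to seekToBeginning() makes the consumer read from the first offset.
Here I pass the Set of all partitions assigned to the consumer, obtained with assignment().
(Because the partitions are assigned manually, consumer.assignment() returns the same partitions as topicPartitions.)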

        // seek from first
        consumer.assign(topicPartitions);
        consumer.seekToBeginning(consumer.assignment());

Verification

Let's run it and check.

Producer log

First, run only the Producer without starting the Consumer.
It sends the values num0 through num99.

partition: 0, topic: beginning, offset: 0, value: num0
partition: 0, topic: beginning, offset: 1, value: num1
partition: 0, topic: beginning, offset: 2, value: num2
partition: 0, topic: beginning, offset: 3, value: num3
partition: 0, topic: beginning, offset: 4, value: num4
partition: 0, topic: beginning, offset: 5, value: num5
partition: 0, topic: beginning, offset: 6, value: num6
partition: 0, topic: beginning, offset: 7, value: num7
partition: 0, topic: beginning, offset: 8, value: num8
partition: 0, topic: beginning, offset: 9, value: num9
                        :
                        :
                        :
                        :
partition: 0, topic: beginning, offset: 90, value: num90
partition: 0, topic: beginning, offset: 91, value: num91
partition: 0, topic: beginning, offset: 92, value: num92
partition: 0, topic: beginning, offset: 93, value: num93
partition: 0, topic: beginning, offset: 94, value: num94
partition: 0, topic: beginning, offset: 95, value: num95
partition: 0, topic: beginning, offset: 96, value: num96
partition: 0, topic: beginning, offset: 97, value: num97
partition: 0, topic: beginning, offset: 98, value: num98
partition: 0, topic: beginning, offset: 99, value: num99

Consumer log

After the Producer has finished, run the Consumer.
You can see that the messages are read from the first offset.
No matter how many times you run it, reading starts from the beginning.

partition: 0, topic: beginning, offset: 0, key: null, value: num0
partition: 0, topic: beginning, offset: 1, key: null, value: num1
partition: 0, topic: beginning, offset: 2, key: null, value: num2
partition: 0, topic: beginning, offset: 3, key: null, value: num3
partition: 0, topic: beginning, offset: 4, key: null, value: num4
partition: 0, topic: beginning, offset: 5, key: null, value: num5
partition: 0, topic: beginning, offset: 6, key: null, value: num6
partition: 0, topic: beginning, offset: 7, key: null, value: num7
partition: 0, topic: beginning, offset: 8, key: null, value: num8
partition: 0, topic: beginning, offset: 9, key: null, value: num9
                        :
                        :
                        :
                        :
partition: 0, topic: beginning, offset: 90, key: null, value: num90
partition: 0, topic: beginning, offset: 91, key: null, value: num91
partition: 0, topic: beginning, offset: 92, key: null, value: num92
partition: 0, topic: beginning, offset: 93, key: null, value: num93
partition: 0, topic: beginning, offset: 94, key: null, value: num94
partition: 0, topic: beginning, offset: 95, key: null, value: num95
partition: 0, topic: beginning, offset: 96, key: null, value: num96
partition: 0, topic: beginning, offset: 97, key: null, value: num97
partition: 0, topic: beginning, offset: 98, key: null, value: num98
partition: 0, topic: beginning, offset: 99, key: null, value: num99
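
Why does every run start from the beginning even though the consumer has a group ID? My understanding is that an explicit seek takes priority over the group's committed offset, and the auto.offset.reset setting is only a fallback when neither exists. The sketch below models that priority order in plain Java. StartOffsetResolver, ResetPolicy, and resolve are hypothetical names for this illustration, and this is a simplified model (it ignores the "none" reset policy), not the kafka-clients implementation.

```java
import java.util.OptionalLong;

/** Toy model of how a consumer's starting position is chosen; NOT kafka-clients code. */
public class StartOffsetResolver {
    /** Simplified stand-in for the auto.offset.reset setting. */
    public enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param seekTarget offset requested by an explicit seek, if any
     * @param committed  last offset committed by the group, if any
     * @param policy     fallback used when there is neither a seek nor a committed offset
     * @param beginning  first offset available in the partition
     * @param end        offset the next produced message will receive
     */
    public static long resolve(OptionalLong seekTarget, OptionalLong committed,
                               ResetPolicy policy, long beginning, long end) {
        if (seekTarget.isPresent()) {
            return seekTarget.getAsLong(); // an explicit seek always wins
        }
        if (committed.isPresent()) {
            return committed.getAsLong();  // otherwise resume where the group left off
        }
        return policy == ResetPolicy.EARLIEST ? beginning : end;
    }

    public static void main(String[] args) {
        // The partition holds offsets 0..99 and the group has already committed offset 100.
        OptionalLong committed = OptionalLong.of(100);

        // No seek: the consumer resumes at 100, so nothing old is re-read.
        System.out.println(resolve(OptionalLong.empty(), committed, ResetPolicy.LATEST, 0, 100)); // 100

        // Seeking to the beginning on every start: always 0, whatever was committed.
        System.out.println(resolve(OptionalLong.of(0), committed, ResetPolicy.LATEST, 0, 100));   // 0

        // Brand-new group with no seek: the reset policy decides.
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.empty(), ResetPolicy.EARLIEST, 0, 100)); // 0
    }
}
```
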

seekToAnyOffset

Next, let's read messages starting from a specific offset.
KafkaConsumer has a seek() method; by passing it a partition and an offset, you can read from any offset you like.

The sample code for the Consumer is below.

SeekToAnyOffsetConsumer.java

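import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SeekToAnyOffsetConsumer {
    public void seekToAnyOffset() {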
        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "seekToAnyOffsetGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : consumer.partitionsFor("any")) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }

        // specify offset
        consumer.assign(topicPartitions);
        for (TopicPartition partition : topicPartitions) {
            consumer.seek(partition, 50);
        }

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

                records.forEach(record -> {
                    System.out.println("partition: " + record.partition() +
                            ", topic: " + record.topic() +
                            ", offset: " + record.offset() +
                            ", key: " + record.key() +
                            ", value: " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}

 
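Everything up to building the List of TopicPartition is the same as before.
After that, seek() is called for each partition, passing the partition and the offset to start reading from.
The next poll() then starts reading from the specified offset.
This time I tell it to read from offset 50.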

        // specify offset
        consumer.assign(topicPartitions);
        for (TopicPartition partition : topicPartitions) {
            consumer.seek(partition, 50);
        }
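
The snippet hard-codes offset 50, which works here because the topic holds offsets 0 through 99. As far as I understand, seeking outside the available range makes the consumer fall back to its auto.offset.reset policy on the next fetch, which may not be what you expect. One way to guard against that is to clamp the requested offset into the valid range before calling seek(). The sketch below shows only the clamping arithmetic in plain Java; SeekTargets and clamp are hypothetical names, and in a real consumer the bounds would presumably come from KafkaConsumer's beginningOffsets() and endOffsets().

```java
/** Hypothetical helper for choosing a safe seek target; NOT part of kafka-clients. */
public class SeekTargets {
    /**
     * Clamps a requested offset into [beginning, end].
     *
     * @param requested offset the caller would like to start from
     * @param beginning first offset still available in the partition
     * @param end       offset the next produced message will receive
     */
    public static long clamp(long requested, long beginning, long end) {
        if (beginning > end) {
            throw new IllegalArgumentException("beginning must not exceed end");
        }
        return Math.max(beginning, Math.min(requested, end));
    }

    public static void main(String[] args) {
        System.out.println(clamp(50, 0, 100));  // 50: already inside the range
        System.out.println(clamp(50, 0, 30));   // 30: partition is shorter than requested
        System.out.println(clamp(50, 70, 100)); // 70: older messages are no longer available
    }
}
```

With this helper, the loop above would call seek with the clamped value instead of the literal 50.
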

Verification

Producer log

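This is the same as before, except that the log shows the topic is now any.
First, run only the Producer without starting the Consumer.
It sends the values num0 through num99.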

partition: 0, topic: any, offset: 0, value: num0
partition: 0, topic: any, offset: 1, value: num1
partition: 0, topic: any, offset: 2, value: num2
partition: 0, topic: any, offset: 3, value: num3
partition: 0, topic: any, offset: 4, value: num4
partition: 0, topic: any, offset: 5, value: num5
partition: 0, topic: any, offset: 6, value: num6
partition: 0, topic: any, offset: 7, value: num7
partition: 0, topic: any, offset: 8, value: num8
partition: 0, topic: any, offset: 9, value: num9
                        :
                        :
                        :
                        :
partition: 0, topic: any, offset: 90, value: num90
partition: 0, topic: any, offset: 91, value: num91
partition: 0, topic: any, offset: 92, value: num92
partition: 0, topic: any, offset: 93, value: num93
partition: 0, topic: any, offset: 94, value: num94
partition: 0, topic: any, offset: 95, value: num95
partition: 0, topic: any, offset: 96, value: num96
partition: 0, topic: any, offset: 97, value: num97
partition: 0, topic: any, offset: 98, value: num98
partition: 0, topic: any, offset: 99, value: num99


Consumer log

After the Producer has finished, run the Consumer.
You can see that the messages at offsets 0 through 49 are skipped and reading starts from the message at offset 50.

partition: 0, topic: any, offset: 50, key: null, value: num50
partition: 0, topic: any, offset: 51, key: null, value: num51
partition: 0, topic: any, offset: 52, key: null, value: num52
partition: 0, topic: any, offset: 53, key: null, value: num53
partition: 0, topic: any, offset: 54, key: null, value: num54
partition: 0, topic: any, offset: 55, key: null, value: num55
partition: 0, topic: any, offset: 56, key: null, value: num56
partition: 0, topic: any, offset: 57, key: null, value: num57
partition: 0, topic: any, offset: 58, key: null, value: num58
partition: 0, topic: any, offset: 59, key: null, value: num59
                        :
                        :
                        :
                        :
partition: 0, topic: any, offset: 90, key: null, value: num90
partition: 0, topic: any, offset: 91, key: null, value: num91
partition: 0, topic: any, offset: 92, key: null, value: num92
partition: 0, topic: any, offset: 93, key: null, value: num93
partition: 0, topic: any, offset: 94, key: null, value: num94
partition: 0, topic: any, offset: 95, key: null, value: num95
partition: 0, topic: any, offset: 96, key: null, value: num96
partition: 0, topic: any, offset: 97, key: null, value: num97
partition: 0, topic: any, offset: 98, key: null, value: num98
partition: 0, topic: any, offset: 99, key: null, value: num99


That's all.


[References]
https://www.programcreek.com/java-api-examples/?class=org.apache.kafka.clients.consumer.KafkaConsumer&method=seekToBeginning


I have uploaded the source code to the locations below.

Producer
github.com

Consumer
github.com