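Consuming from a specific offset in Kafka
These are notes from trying out consuming from a specific offset in Kafka.
All the examples so far consumed from the offset the consumer last committed.
Kafka also lets you consume from any specific offset, so I tried that here.
This lets you read the same message more than once, or skip particular messages.
I tested with the versions below.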
Producer
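For the producer I reuse the one from before.
No partitioner is specified, so the default partitioner is used. The key is null and the value is a sequential string (num0, num1, ...).
For each produced message, it prints the partition, topic, offset and value to standard output.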
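With a null key, the default partitioner decides where each record goes. In the producer logs later in this post every record lands in partition 0, presumably because the topic has a single partition. The following is a simplified toy model of the round-robin behavior that pre-2.4 Kafka clients (such as the 2.1 client whose javadoc is linked below) use for null keys. Since Kafka 2.4 (KIP-480), a sticky partitioner handles null keys instead. The class `NullKeyRoundRobin` is hypothetical and is not Kafka's `DefaultPartitioner`. It assumes every partition is available, and it starts the counter at 0, whereas Kafka seeds it randomly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model (hypothetical class, not Kafka's DefaultPartitioner) of how pre-2.4
// Kafka clients spread null-key records: a counter modulo the partition count.
class NullKeyRoundRobin {
    // Kafka seeds this counter randomly per topic; 0 keeps the sketch deterministic.
    private final AtomicInteger counter = new AtomicInteger(0);

    int partitionFor(int numPartitions) {
        int next = counter.getAndIncrement();
        // Clear the sign bit (like Kafka's Utils.toPositive) so an overflowed
        // counter never produces a negative partition index.
        return (next & 0x7fffffff) % numPartitions;
    }

    // Partition chosen for each of `count` consecutive null-key records.
    static List<Integer> assign(int count, int numPartitions) {
        NullKeyRoundRobin partitioner = new NullKeyRoundRobin();
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            chosen.add(partitioner.partitionFor(numPartitions));
        }
        return chosen;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 1)); // [0, 0, 0, 0, 0, 0]
        System.out.println(assign(6, 3)); // [0, 1, 2, 0, 1, 2]
    }
}
```

With one partition every record goes to partition 0, which matches the logs. With three partitions, the records would rotate evenly across them.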
ProducerWithDefaultPartition.java
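import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerWithDefaultPartition {

    public void syncSend() {
        // configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer (flushing pending sends) when done
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 100)
                    .forEach(i -> {
                        String value = "num" + i;
                        // topic "beginning" (the seek experiment below uses topic "any"); no key, so the key is null
                        ProducerRecord<String, String> record = new ProducerRecord<>("beginning", value);
                        try {
                            // sync send: block until the broker acknowledges the record
                            Future<RecordMetadata> send = producer.send(record);
                            RecordMetadata recordMetadata = send.get();
                            System.out.print("partition: " + recordMetadata.partition() + ", ");
                            System.out.print("topic: " + recordMetadata.topic() + ", ");
                            System.out.print("offset: " + recordMetadata.offset() + ", ");
                            System.out.println("value: " + value);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }
}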
Consumer
I try two things: reading from the very first offset, and reading from a specific offset.
seekToBeginning
To read from the very first offset, KafkaConsumer provides a dedicated method, seekToBeginning().
Similarly, seekToEnd() moves to the end of each partition, so only messages produced after that point are read.
http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
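The toy model below makes the two positions concrete for a single partition. The class `PartitionPositions` is hypothetical and is not part of the Kafka client.

- seekToBeginning() moves to the earliest offset still retained. This can be greater than 0 once retention has deleted old segments.
- seekToEnd() moves to the log end offset, the offset the next produced record will receive, so nothing already written is returned.

The model assumes the default `isolation.level=read_uncommitted`. Under `read_committed`, the end is the last stable offset instead.

```java
// Toy model (hypothetical, not the Kafka client) of where seekToBeginning()
// and seekToEnd() put the fetch position within one partition.
class PartitionPositions {
    final long logStartOffset; // earliest offset still retained (> 0 once retention deletes old data)
    final long logEndOffset;   // offset that the next produced record will receive
    long position;             // offset of the next record this consumer will fetch

    PartitionPositions(long logStartOffset, long logEndOffset) {
        this.logStartOffset = logStartOffset;
        this.logEndOffset = logEndOffset;
        this.position = logEndOffset; // initial value; the seeks below overwrite it
    }

    void seekToBeginning() { position = logStartOffset; }

    void seekToEnd() { position = logEndOffset; }

    // Number of already-written records that subsequent polls can still return.
    long readable() { return logEndOffset - position; }

    public static void main(String[] args) {
        // After the producer run the topic holds offsets 0..99, so the log end offset is 100.
        PartitionPositions p = new PartitionPositions(0, 100);
        p.seekToBeginning();
        System.out.println(p.position + " " + p.readable()); // 0 100
        p.seekToEnd();
        System.out.println(p.position + " " + p.readable()); // 100 0
    }
}
```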
The consumer sample code is as follows.
SeekToBeginningConsumer.java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SeekToBeginningConsumer {

    public void seekToStart() {
        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "seekToBeginningGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // build a TopicPartition for every partition of the "beginning" topic
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : consumer.partitionsFor("beginning")) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }

        // seek from first
        consumer.assign(topicPartitions);
        consumer.seekToBeginning(consumer.assignment());

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));
                records.forEach(record -> {
                    System.out.println("partition: " + record.partition()
                            + ", topic: " + record.topic()
                            + ", offset: " + record.offset()
                            + ", key: " + record.key()
                            + ", value: " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
Pass the topic name to partitionsFor() to get the partition metadata,
then build a List of TopicPartition from it.
List<TopicPartition> topicPartitions = new ArrayList<>();
for (PartitionInfo partitionInfo : consumer.partitionsFor("beginning")) {
    topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
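assign() lets you assign partitions to the consumer manually.
It takes the List of TopicPartition.
Passing a Collection of TopicPartition to seekToBeginning() makes the consumer read those partitions from their first offset. The seek is evaluated lazily: the beginning offsets are looked up only on the next poll() (or position()) call.
Here I pass assignment(), the Set of all partitions assigned to the consumer.
(Because the partitions were assigned manually, the partitions returned by consumer.assignment() are the same as topicPartitions.)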
// seek from first
consumer.assign(topicPartitions);
consumer.seekToBeginning(consumer.assignment());
Verification
Let's run it and check.
Producer log
First, I run only the producer, without starting the consumer.
It sends the values num0 through num99.
partition: 0, topic: beginning, offset: 0, value: num0
partition: 0, topic: beginning, offset: 1, value: num1
partition: 0, topic: beginning, offset: 2, value: num2
partition: 0, topic: beginning, offset: 3, value: num3
partition: 0, topic: beginning, offset: 4, value: num4
partition: 0, topic: beginning, offset: 5, value: num5
partition: 0, topic: beginning, offset: 6, value: num6
partition: 0, topic: beginning, offset: 7, value: num7
partition: 0, topic: beginning, offset: 8, value: num8
partition: 0, topic: beginning, offset: 9, value: num9
:
:
:
:
partition: 0, topic: beginning, offset: 90, value: num90
partition: 0, topic: beginning, offset: 91, value: num91
partition: 0, topic: beginning, offset: 92, value: num92
partition: 0, topic: beginning, offset: 93, value: num93
partition: 0, topic: beginning, offset: 94, value: num94
partition: 0, topic: beginning, offset: 95, value: num95
partition: 0, topic: beginning, offset: 96, value: num96
partition: 0, topic: beginning, offset: 97, value: num97
partition: 0, topic: beginning, offset: 98, value: num98
partition: 0, topic: beginning, offset: 99, value: num99
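Consumer log
After the producer has run, I run the consumer.
You can see that it reads the messages starting from the first offset.
No matter how many times it is run, it starts reading from the beginning again.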
partition: 0, topic: beginning, offset: 0, key: null, value: num0
partition: 0, topic: beginning, offset: 1, key: null, value: num1
partition: 0, topic: beginning, offset: 2, key: null, value: num2
partition: 0, topic: beginning, offset: 3, key: null, value: num3
partition: 0, topic: beginning, offset: 4, key: null, value: num4
partition: 0, topic: beginning, offset: 5, key: null, value: num5
partition: 0, topic: beginning, offset: 6, key: null, value: num6
partition: 0, topic: beginning, offset: 7, key: null, value: num7
partition: 0, topic: beginning, offset: 8, key: null, value: num8
partition: 0, topic: beginning, offset: 9, key: null, value: num9
:
:
:
:
partition: 0, topic: beginning, offset: 90, key: null, value: num90
partition: 0, topic: beginning, offset: 91, key: null, value: num91
partition: 0, topic: beginning, offset: 92, key: null, value: num92
partition: 0, topic: beginning, offset: 93, key: null, value: num93
partition: 0, topic: beginning, offset: 94, key: null, value: num94
partition: 0, topic: beginning, offset: 95, key: null, value: num95
partition: 0, topic: beginning, offset: 96, key: null, value: num96
partition: 0, topic: beginning, offset: 97, key: null, value: num97
partition: 0, topic: beginning, offset: 98, key: null, value: num98
partition: 0, topic: beginning, offset: 99, key: null, value: num99
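Every run starts again at offset 0 because an explicit seek takes precedence over any offset the consumer group has committed. Since `enable.auto.commit` defaults to true, `seekToBeginningGroup` may well have committed offset 100 during an earlier run, yet seekToBeginning() still wins.

The toy model below summarizes the order in which a partition's starting offset is decided:

1. an explicit seek;
2. otherwise, the group's committed offset;
3. otherwise, the `auto.offset.reset` policy (default `latest`).

`StartOffsetResolver` is a hypothetical name, not Kafka code. The real client throws `NoOffsetForPartitionException` for `auto.offset.reset=none` when no committed offset exists. The sketch throws `IllegalStateException` instead.

```java
import java.util.OptionalLong;

// Toy model (hypothetical, not Kafka code) of how the first fetch offset of a
// partition is chosen: explicit seek > committed offset > auto.offset.reset.
class StartOffsetResolver {
    static long resolve(OptionalLong explicitSeek, OptionalLong committed,
                        String autoOffsetReset, long logStartOffset, long logEndOffset) {
        if (explicitSeek.isPresent()) {
            return explicitSeek.getAsLong();   // seek(), seekToBeginning(), seekToEnd()
        }
        if (committed.isPresent()) {
            return committed.getAsLong();      // resume where the group left off
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":                     // Kafka's default policy
                return logEndOffset;
            default:                           // "none": the real client throws NoOffsetForPartitionException
                throw new IllegalStateException("no offset for partition, auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // seekToBeginning() on a 100-record partition whose group already committed 100
        System.out.println(resolve(OptionalLong.of(0), OptionalLong.of(100), "latest", 0, 100)); // 0
        // the same group without any seek resumes from its committed offset
        System.out.println(resolve(OptionalLong.empty(), OptionalLong.of(100), "latest", 0, 100)); // 100
    }
}
```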
seekToAnyOffset
Next, I read messages starting from a specific offset.
KafkaConsumer has a seek() method that takes a partition and an offset, so you can start reading from any offset you choose.
The consumer sample code is as follows.
SeekToAnyOffsetConsumer.java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SeekToAnyOffsetConsumer {

    public void seekToAnyOffset() {
        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "seekToAnyOffsetGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // build a TopicPartition for every partition of the "any" topic
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : consumer.partitionsFor("any")) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }

        // specify offset: every assigned partition starts at offset 50 on the next poll()
        consumer.assign(topicPartitions);
        for (TopicPartition partition : topicPartitions) {
            consumer.seek(partition, 50);
        }

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));
                records.forEach(record -> {
                    System.out.println("partition: " + record.partition()
                            + ", topic: " + record.topic()
                            + ", offset: " + record.offset()
                            + ", key: " + record.key()
                            + ", value: " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
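Everything up to building the List of TopicPartition is the same as before.
After that, seek() is called for each partition with the partition and the offset to start reading from.
The next poll() starts reading from the specified offset.
This time I specify offset 50.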
// specify offset
consumer.assign(topicPartitions);
for (TopicPartition partition : topicPartitions) {
    consumer.seek(partition, 50);
}
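seek() does not fetch anything by itself. It only moves the position that the following poll() calls read from, and each poll() then advances the position past the records it returned.

The toy single-partition consumer below sketches that behavior. `ToySeekConsumer` is a hypothetical class, not KafkaConsumer. `maxPollRecords` stands in for the `max.poll.records` setting (default 500), and the value 30 is chosen only to show the position advancing over several polls.

```java
import java.util.ArrayList;
import java.util.List;

// Toy single-partition consumer (hypothetical, not KafkaConsumer): seek() only
// moves the fetch position; poll() returns records from there and advances it.
class ToySeekConsumer {
    private final List<String> log;     // list index == offset
    private final int maxPollRecords;   // stands in for max.poll.records
    private long position;              // offset of the next record to return

    ToySeekConsumer(List<String> log, int maxPollRecords) {
        this.log = log;
        this.maxPollRecords = maxPollRecords;
    }

    void seek(long offset) {
        // KafkaConsumer.seek() likewise rejects negative offsets
        if (offset < 0) {
            throw new IllegalArgumentException("seek offset must not be negative");
        }
        position = offset;
    }

    long position() {
        return position;
    }

    List<String> poll() {
        if (position >= log.size()) {
            return new ArrayList<>();   // caught up: nothing to return
        }
        int from = (int) position;
        int to = Math.min(log.size(), from + maxPollRecords);
        List<String> batch = new ArrayList<>(log.subList(from, to));
        position = to;                  // the next poll continues after this batch
        return batch;
    }

    // Builds the same values the producer sent: num0 .. num(count-1).
    static List<String> producedValues(int count) {
        List<String> values = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            values.add("num" + i);
        }
        return values;
    }

    public static void main(String[] args) {
        ToySeekConsumer consumer = new ToySeekConsumer(producedValues(100), 30);
        consumer.seek(50);
        List<String> first = consumer.poll();
        System.out.println(first.get(0) + " .. " + first.get(first.size() - 1));   // num50 .. num79
        List<String> second = consumer.poll();
        System.out.println(second.get(0) + " .. " + second.get(second.size() - 1)); // num80 .. num99
        System.out.println(consumer.poll().isEmpty());                              // true
    }
}
```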
Verification
Producer log
Same as before.
First, I run only the producer, without starting the consumer.
It sends the values num0 through num99.
partition: 0, topic: any, offset: 0, value: num0
partition: 0, topic: any, offset: 1, value: num1
partition: 0, topic: any, offset: 2, value: num2
partition: 0, topic: any, offset: 3, value: num3
partition: 0, topic: any, offset: 4, value: num4
partition: 0, topic: any, offset: 5, value: num5
partition: 0, topic: any, offset: 6, value: num6
partition: 0, topic: any, offset: 7, value: num7
partition: 0, topic: any, offset: 8, value: num8
partition: 0, topic: any, offset: 9, value: num9
:
:
:
:
partition: 0, topic: any, offset: 90, value: num90
partition: 0, topic: any, offset: 91, value: num91
partition: 0, topic: any, offset: 92, value: num92
partition: 0, topic: any, offset: 93, value: num93
partition: 0, topic: any, offset: 94, value: num94
partition: 0, topic: any, offset: 95, value: num95
partition: 0, topic: any, offset: 96, value: num96
partition: 0, topic: any, offset: 97, value: num97
partition: 0, topic: any, offset: 98, value: num98
partition: 0, topic: any, offset: 99, value: num99
Consumer log
After the producer has run, I run the consumer.
You can see that the messages at offsets 0 through 49 are skipped and reading starts from the message at offset 50.
partition: 0, topic: any, offset: 50, key: null, value: num50
partition: 0, topic: any, offset: 51, key: null, value: num51
partition: 0, topic: any, offset: 52, key: null, value: num52
partition: 0, topic: any, offset: 53, key: null, value: num53
partition: 0, topic: any, offset: 54, key: null, value: num54
partition: 0, topic: any, offset: 55, key: null, value: num55
partition: 0, topic: any, offset: 56, key: null, value: num56
partition: 0, topic: any, offset: 57, key: null, value: num57
partition: 0, topic: any, offset: 58, key: null, value: num58
partition: 0, topic: any, offset: 59, key: null, value: num59
:
:
:
:
partition: 0, topic: any, offset: 90, key: null, value: num90
partition: 0, topic: any, offset: 91, key: null, value: num91
partition: 0, topic: any, offset: 92, key: null, value: num92
partition: 0, topic: any, offset: 93, key: null, value: num93
partition: 0, topic: any, offset: 94, key: null, value: num94
partition: 0, topic: any, offset: 95, key: null, value: num95
partition: 0, topic: any, offset: 96, key: null, value: num96
partition: 0, topic: any, offset: 97, key: null, value: num97
partition: 0, topic: any, offset: 98, key: null, value: num98
partition: 0, topic: any, offset: 99, key: null, value: num99
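In this run, offset 50 lies inside the partition's valid range (0 up to the log end offset 100), so reading simply starts there. Seeking outside that range behaves differently. The broker answers the fetch with an out-of-range error, and the consumer then falls back to `auto.offset.reset`. If the policy is `none`, it throws `OffsetOutOfRangeException` instead. Seeking exactly to the log end offset is valid and just waits for new records.

The sketch below models only that decision for one partition. `OutOfRangeSeek` is a hypothetical name, not Kafka code, and it throws `IllegalStateException` where the real client would throw `OffsetOutOfRangeException`.

```java
// Toy model (hypothetical, not Kafka code) of what a seek() target turns into
// once the next fetch reaches the broker, for a single partition.
class OutOfRangeSeek {
    static long effectiveStart(long seekOffset, long logStartOffset, long logEndOffset,
                               String autoOffsetReset) {
        // Offsets from the log start up to and including the log end offset are valid;
        // seeking to the log end offset just waits for new records.
        if (seekOffset >= logStartOffset && seekOffset <= logEndOffset) {
            return seekOffset;
        }
        // Otherwise the broker reports OFFSET_OUT_OF_RANGE and the reset policy applies.
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default: // "none": the real client throws OffsetOutOfRangeException
                throw new IllegalStateException("offset " + seekOffset + " is out of range");
        }
    }

    public static void main(String[] args) {
        System.out.println(effectiveStart(50, 0, 100, "latest"));    // 50, as in the run above
        System.out.println(effectiveStart(150, 0, 100, "latest"));   // 100
        System.out.println(effectiveStart(150, 0, 100, "earliest")); // 0
    }
}
```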
That's all.
The source code is available at the links below.
Producer
github.com
Consumer
github.com