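Trying out Kafka Consumer Groups

These are my notes from experimenting with Kafka Consumer Groups.

Up to the previous post I had only used a single consumer,
so this time I tried multiple consumers and multiple Consumer Groups.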

Consumer Group

Consumer Groups are described in the official documentation here:
https://kafka.apache.org/documentation/#intro_consumers

https://kafka.apache.org/20/images/consumer-groups.png

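This figure from the official documentation shows
the relationship between topics, partitions, consumers, and consumer groups.

It looks complicated, but my understanding is as follows.

  • A consumer belongs to exactly one Consumer Group.
  • A Consumer Group can contain multiple consumers.
  • A topic's messages are read per Consumer Group.
    (Multiple Consumer Groups can each read the same topic's messages.)
  • Within a Consumer Group, each partition of a topic is read by only one consumer.
    (Multiple consumers in the same Consumer Group cannot read the same partition.)
  • A consumer in a Consumer Group can read one or more partitions of a topic.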
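
To make the rules above concrete, here is a minimal sketch of how the partitions of one topic could be split among the consumers of a single group. It is not Kafka code: the class RangeAssignmentSketch and its assign method are hypothetical, and the contiguous-range split is only modeled on the range-style strategy that, as far as I know, is the consumer default in this Kafka version. The real assignor orders consumers by their member IDs, so which consumer gets which range may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of partition assignment inside ONE consumer group.
// Hypothetical names; this does not call any Kafka API.
class RangeAssignmentSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges, one range per consumer.
    // Every partition gets exactly one owner; a consumer may own several partitions or none.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size();
        int withExtra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            // The first `withExtra` consumers take one additional partition.
            int count = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(consumers.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, Arrays.asList("c1")));
        System.out.println(assign(4, Arrays.asList("c1", "c2")));
        System.out.println(assign(4, Arrays.asList("c1", "c2", "c3", "c4", "c5")));
    }
}
```

With 4 partitions, one consumer owns all four, two consumers own two each, and with five consumers the last one is left with an empty list.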

 

Below I check the behavior when one Consumer Group has multiple consumers,
and when multiple Consumer Groups each have multiple consumers.


I used the following versions.

  • kafka 2.0.0
  • kafka-clients 2.0.0
  • zookeeper 3.4.13
  • Ubuntu 16
  • Java 1.8.0_181

Setup

I changed the broker setting num.partitions, the default partition count for newly created topics, to 4,
then started ZooKeeper and Kafka.

server.properties

$ sudo vi /usr/local/kafka/config/server.properties
num.partitions=4

Producer

I reuse the Producer from an earlier post.

No partitioner is specified (the default is used); the key is null and the value is a sequential number.
The partition, topic, and value of each produced message are printed to standard output.

ProducerWithDefaultPartition.java

    public void syncSend() {
        // configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer when the method exits
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 100)
                    .forEach(i -> {
                        String value = "num" + i;
                        // no key is given, so the default partitioner picks the partition
                        ProducerRecord<String, String> record = new ProducerRecord<>("topicPar", value);
                        try {
                            // sync send: block until the broker acknowledges the record
                            Future<RecordMetadata> send = producer.send(record);
                            RecordMetadata recordMetadata = send.get();
                            System.out.print("partition: " + recordMetadata.partition() + ", ");
                            System.out.print("topic: " + recordMetadata.topic() + ", ");
                            System.out.println("value: " + value);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }

Consumer

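Each consumer runs its own blocking poll loop (and KafkaConsumer is not thread-safe), so several consumers cannot be driven from a single thread here.
I therefore created a ConsumeTask so that the consumers run on multiple threads.

The consumer group name, consumer name, and topic name are passed in at construction.

If no consumer group name is given, a fixed consumer group name is used.

For each message read, the consumer group, consumer, partition, topic, key, and value are printed to standard output.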

ConsumeTask.java

package com.example.producer.client.kafka.consumergroup;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumeTask implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String consumerGroupName;
    private final String consumerName;
    private final String topic;

    public ConsumeTask(String consumerName, String topic) {
        this("myConsumerGroup", consumerName, topic);
    }

    public ConsumeTask(String consumerGroupName, String consumerName, String topic) {
        this.consumerGroupName = consumerGroupName;
        this.consumerName = consumerName;
        this.topic = topic;

        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

                // print information about topic record
                records.forEach(record -> {
                    System.out.println("group: " + consumerGroupName
                            + ", consumer: " + consumerName
                            + ", partition: " + record.partition()
                            + ", topic: " + record.topic()
                            + ", key: " + record.key()
                            + ", value: " + record.value()
                    );
                });
            }
        } finally {
            consumer.close();
        }
    }
}

ConsumeTask is then submitted to an ExecutorService with one thread per consumer, like this.
This makes it possible to run multiple consumers at once.

    int numConsumers = 2;
    ExecutorService executorService = Executors.newFixedThreadPool(numConsumers);

    IntStream.rangeClosed(1, numConsumers)
            .forEach(i -> {
                ConsumeTask consumeTask = new ConsumeTask("myConsumer" + i, "five");
                executorService.submit(consumeTask);
            });
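
Because the loop in ConsumeTask never exits, the thread pool above keeps running until the process is killed. The sketch below shows one possible way to stop such looping tasks cooperatively. It uses only the JDK: StoppableLoopSketch is a hypothetical stand-in for ConsumeTask, and Thread.sleep stands in for consumer.poll(...). With a real KafkaConsumer, the documented way to interrupt a blocking poll from another thread is consumer.wakeup(), which makes poll throw a WakeupException; that part is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a polling task; a real task would own its KafkaConsumer.
class StoppableLoopSketch implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private volatile boolean closed = false;

    @Override
    public void run() {
        try {
            while (running.get()) {
                Thread.sleep(10); // placeholder for consumer.poll(...)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            closed = true; // placeholder for consumer.close()
        }
    }

    void stop() { running.set(false); }

    boolean isClosed() { return closed; }

    // Starts numTasks loops, lets them run briefly, then asks all of them to stop.
    // Returns true if the pool terminated and every task ran its cleanup step.
    static boolean runAndStop(int numTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        List<StoppableLoopSketch> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            StoppableLoopSketch task = new StoppableLoopSketch();
            tasks.add(task);
            pool.submit(task);
        }
        try {
            Thread.sleep(100);
            tasks.forEach(StoppableLoopSketch::stop);
            pool.shutdown(); // accept no new tasks; running ones finish their loop
            return pool.awaitTermination(5, TimeUnit.SECONDS)
                    && tasks.stream().allMatch(StoppableLoopSketch::isClosed);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all tasks stopped: " + runAndStop(2));
    }
}
```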

Consumer × 1

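Pattern: one Consumer in the ConsumerGroup

I try one ConsumerGroup with one Consumer.

Result

Producer log

No key is specified, so messages are sent round-robin across the four partitions 0 to 3
(this is how the default partitioner in kafka-clients 2.0.0 handles records without a key).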

partition: 3, topic: sin, value: num0
partition: 0, topic: sin, value: num1
partition: 2, topic: sin, value: num2
partition: 1, topic: sin, value: num3
partition: 3, topic: sin, value: num4
partition: 0, topic: sin, value: num5
partition: 2, topic: sin, value: num6
partition: 1, topic: sin, value: num7
partition: 3, topic: sin, value: num8
partition: 0, topic: sin, value: num9
               :
               :
partition: 2, topic: sin, value: num90
partition: 1, topic: sin, value: num91
partition: 3, topic: sin, value: num92
partition: 0, topic: sin, value: num93
partition: 2, topic: sin, value: num94
partition: 1, topic: sin, value: num95
partition: 3, topic: sin, value: num96
partition: 0, topic: sin, value: num97
partition: 2, topic: sin, value: num98
partition: 1, topic: sin, value: num99
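
The repeating 3, 0, 2, 1 pattern in the log above is what a simple round-robin over a list of partitions would produce. The following is a minimal sketch of that idea, not the actual partitioner source: RoundRobinSketch is a hypothetical class, and both the partition order (3, 0, 2, 1, copied from this log) and the starting counter value are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of round-robin partition selection for records without a key.
// Hypothetical names; this does not call any Kafka API.
class RoundRobinSketch {
    private final List<Integer> partitions;
    private final AtomicInteger counter;

    RoundRobinSketch(List<Integer> partitions, int start) {
        this.partitions = partitions;
        this.counter = new AtomicInteger(start);
    }

    // Picks the next partition by cycling through the list.
    int nextPartition() {
        // Mask the sign bit so the index stays non-negative if the counter overflows.
        int n = counter.getAndIncrement() & 0x7fffffff;
        return partitions.get(n % partitions.size());
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch(Arrays.asList(3, 0, 2, 1), 0);
        for (int i = 0; i < 8; i++) {
            System.out.println("partition: " + sketch.nextPartition() + ", value: num" + i);
        }
    }
}
```

Each partition receives every fourth message, which is why 100 messages end up as 25 per partition.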

Consumer log

The single Consumer reads messages from all partitions.

group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num9
               :
               :
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num89
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num87
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer, partition: 3, topic: sin, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer, partition: 2, topic: sin, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer, partition: 0, topic: sin, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer, partition: 1, topic: sin, key: null, value: num99

Consumer × 2

Pattern: two Consumers in the ConsumerGroup

I try one ConsumerGroup with two Consumers.

Result

Producer log

Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.

partition: 0, topic: two, value: num0
partition: 2, topic: two, value: num1
partition: 1, topic: two, value: num2
partition: 3, topic: two, value: num3
partition: 0, topic: two, value: num4
partition: 2, topic: two, value: num5
partition: 1, topic: two, value: num6
partition: 3, topic: two, value: num7
partition: 0, topic: two, value: num8
partition: 2, topic: two, value: num9
               :
               :
partition: 1, topic: two, value: num90
partition: 3, topic: two, value: num91
partition: 0, topic: two, value: num92
partition: 2, topic: two, value: num93
partition: 1, topic: two, value: num94
partition: 3, topic: two, value: num95
partition: 0, topic: two, value: num96
partition: 2, topic: two, value: num97
partition: 1, topic: two, value: num98
partition: 3, topic: two, value: num99

Consumer log

myConsumer1 reads messages from partition0 and partition1,
and myConsumer2 reads messages from partition2 and partition3.

group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num9
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num10
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num11
               :
               :
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num88
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num89
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num90
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: two, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer2, partition: 2, topic: two, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer1, partition: 1, topic: two, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer2, partition: 3, topic: two, key: null, value: num99

Consumer × 4

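Pattern: four Consumers in the ConsumerGroup

I try one ConsumerGroup with four Consumers.
The number of Consumers equals the number of partitions.

Result

Producer log

Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.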

partition: 2, topic: four, value: num0
partition: 1, topic: four, value: num1
partition: 3, topic: four, value: num2
partition: 0, topic: four, value: num3
partition: 2, topic: four, value: num4
partition: 1, topic: four, value: num5
partition: 3, topic: four, value: num6
partition: 0, topic: four, value: num7
partition: 2, topic: four, value: num8
partition: 1, topic: four, value: num9
               :
               :
partition: 3, topic: four, value: num90
partition: 0, topic: four, value: num91
partition: 2, topic: four, value: num92
partition: 1, topic: four, value: num93
partition: 3, topic: four, value: num94
partition: 0, topic: four, value: num95
partition: 2, topic: four, value: num96
partition: 1, topic: four, value: num97
partition: 3, topic: four, value: num98
partition: 0, topic: four, value: num99

Consumer log

myConsumer1 reads from partition0,
myConsumer2 from partition1,
myConsumer3 from partition2,
and myConsumer4 from partition3.
Each consumer reads exactly one partition.

group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num9
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num10
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num11
               :
               :
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num90
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: four, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: four, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: four, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: four, key: null, value: num99

Consumer × 5

Pattern: five Consumers in the ConsumerGroup

I try one ConsumerGroup with five Consumers.
There is one more consumer than there are partitions.

Result

Producer log

Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.

partition: 3, topic: five, value: num0
partition: 0, topic: five, value: num1
partition: 2, topic: five, value: num2
partition: 1, topic: five, value: num3
partition: 3, topic: five, value: num4
partition: 0, topic: five, value: num5
partition: 2, topic: five, value: num6
partition: 1, topic: five, value: num7
partition: 3, topic: five, value: num8
partition: 0, topic: five, value: num9
               :
               :
partition: 2, topic: five, value: num90
partition: 1, topic: five, value: num91
partition: 3, topic: five, value: num92
partition: 0, topic: five, value: num93
partition: 2, topic: five, value: num94
partition: 1, topic: five, value: num95
partition: 3, topic: five, value: num96
partition: 0, topic: five, value: num97
partition: 2, topic: five, value: num98
partition: 1, topic: five, value: num99

Consumer log

myConsumer1 reads from partition0,
myConsumer2 from partition1,
myConsumer3 from partition2,
and myConsumer4 from partition3.
Each partition is read by only one consumer in the group, so myConsumer5 has nothing to read and sits idle.

group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num0
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num1
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num2
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num3
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num4
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num5
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num6
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num7
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num8
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num9
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num10
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num11
               :
               :
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num90
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num91
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num92
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num93
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num94
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num95
group: myConsumerGroup, consumer: myConsumer4, partition: 3, topic: five, key: null, value: num96
group: myConsumerGroup, consumer: myConsumer1, partition: 0, topic: five, key: null, value: num97
group: myConsumerGroup, consumer: myConsumer3, partition: 2, topic: five, key: null, value: num98
group: myConsumerGroup, consumer: myConsumer2, partition: 1, topic: five, key: null, value: num99

2 Group (Consumer × 2, Consumer × 4)

Pattern: two ConsumerGroups,
ConsumerGroup1 with two Consumers and
ConsumerGroup2 with four Consumers.

https://kafka.apache.org/20/images/consumer-groups.png
This is close to the setup in this figure (although the brokers are not on separate servers).

I create two ConsumerGroups and consume from each of them.

Result

Producer log

Same as before.
No key is specified, so messages are sent round-robin across the four partitions 0 to 3.

partition: 3, topic: gtopic, value: num0
partition: 0, topic: gtopic, value: num1
partition: 2, topic: gtopic, value: num2
partition: 1, topic: gtopic, value: num3
partition: 3, topic: gtopic, value: num4
partition: 0, topic: gtopic, value: num5
partition: 2, topic: gtopic, value: num6
partition: 1, topic: gtopic, value: num7
partition: 3, topic: gtopic, value: num8
partition: 0, topic: gtopic, value: num9
               :
               :
partition: 2, topic: gtopic, value: num90
partition: 1, topic: gtopic, value: num91
partition: 3, topic: gtopic, value: num92
partition: 0, topic: gtopic, value: num93
partition: 2, topic: gtopic, value: num94
partition: 1, topic: gtopic, value: num95
partition: 3, topic: gtopic, value: num96
partition: 0, topic: gtopic, value: num97
partition: 2, topic: gtopic, value: num98
partition: 1, topic: gtopic, value: num99

Consumer log

group1

consumer1 reads messages from partition0 and partition1,
and consumer2 reads messages from partition2 and partition3.

group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num1
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num0
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num2
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num3
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num4
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num5
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num6
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num7
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num8
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num9
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num10
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num11
               :
               :
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num90
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num91
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num92
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num93
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num94
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num95
group: group1, consumer: consumer2, partition: 3, topic: gtopic, key: null, value: num96
group: group1, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num97
group: group1, consumer: consumer2, partition: 2, topic: gtopic, key: null, value: num98
group: group1, consumer: consumer1, partition: 1, topic: gtopic, key: null, value: num99


group2

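consumer1 reads from partition0,
consumer2 from partition1,
consumer3 from partition2,
and consumer4 from partition3.
Both groups read the same messages (num0 to num99) from the topic, independently of each other.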

group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num0
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num1
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num2
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num3
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num4
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num5
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num6
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num7
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num8
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num9
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num10
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num11
               :
               :
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num90
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num91
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num92
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num93
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num94
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num95
group: group2, consumer: consumer4, partition: 3, topic: gtopic, key: null, value: num96
group: group2, consumer: consumer1, partition: 0, topic: gtopic, key: null, value: num97
group: group2, consumer: consumer3, partition: 2, topic: gtopic, key: null, value: num98
group: group2, consumer: consumer2, partition: 1, topic: gtopic, key: null, value: num99
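
The two logs above can be summarized with a small simulation: every group sees every record, and inside each group a record goes only to the consumer that owns its partition. This is a sketch under assumptions, not Kafka code. GroupFanOutSketch, ownerOf, and deliver are hypothetical names, the contiguous split of partitions among consumers mirrors what the logs show rather than a guaranteed rule, and the 3, 0, 2, 1 partition cycle is copied from the producer log for gtopic.

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model of one topic being read by several consumer groups.
// Hypothetical names; this does not call any Kafka API.
class GroupFanOutSketch {

    // Index of the consumer that owns a partition when partitions are split into contiguous ranges.
    static int ownerOf(int partition, int numPartitions, int numConsumers) {
        int per = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        // Partitions below this boundary belong to the consumers that hold (per + 1) partitions.
        int boundary = extra * (per + 1);
        if (partition < boundary) {
            return partition / (per + 1);
        }
        return extra + (partition - boundary) / per;
    }

    // Counts how many records each consumer of ONE group receives.
    static Map<String, Integer> deliver(String group, int numConsumers, int numPartitions, int[] recordPartitions) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int p : recordPartitions) {
            String consumer = group + "/consumer" + (ownerOf(p, numPartitions, numConsumers) + 1);
            counts.merge(consumer, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] cycle = {3, 0, 2, 1}; // partition order taken from the producer log
        int[] recordPartitions = new int[100];
        for (int i = 0; i < recordPartitions.length; i++) {
            recordPartitions[i] = cycle[i % cycle.length];
        }
        // The same 100 records are delivered once to each group.
        System.out.println(deliver("group1", 2, 4, recordPartitions));
        System.out.println(deliver("group2", 4, 4, recordPartitions));
    }
}
```

In this model group1's two consumers receive 50 records each and group2's four consumers receive 25 each, so both groups account for all 100 records.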

 
That is everything I checked.
 
 
[References]
I based the consume task on the following article.
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
 

The sample code is available below.

Producer
github.com

Consumer
github.com


The end.