Trying Out a Custom Partitioner in Kafka

These are my notes from trying out a custom partitioner in Kafka.


When I tried the Kafka Java client last time, I only used a single partition,
so this time I checked how things behave when multiple partitions are configured.

A partitioner lets you distribute messages across partitions using any rule you define,
so I'll try both the default partitioner and a custom partitioner.

I used the following versions:

  • kafka 2.0.0
  • kafka-clients 2.0.0
  • zookeeper 3.4.13
  • Ubuntu 16
  • Java 1.8.0_181

Number of partitions

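Change the broker configuration to set the number of partitions to 2.
num.partitions is the default partition count for newly created topics (including auto-created ones), so it only applies to topics created after the broker is restarted with this setting; existing topics keep their partition count.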

server.properties

$ sudo vi /usr/local/kafka/config/server.properties
num.partitions=2

Default Partitioner

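If you produce without specifying a partitioner, the default partitioner (DefaultPartitioner) is used.
In Kafka 2.0, messages sent with a null key are distributed across the partitions in round-robin fashion. (From Kafka 2.4 onward, null-key messages are instead assigned with a "sticky" strategy that fills a batch for one partition before switching to another.)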
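
The round-robin behavior for null keys can be modeled with a per-topic counter taken modulo the partition count. The sketch below is a simplified, hypothetical model (RoundRobinSketch is not a Kafka class). As far as I know, Kafka 2.0's DefaultPartitioner uses a similar per-topic counter, but it starts the counter at a random value and, when some partitions are unavailable, cycles only over the available ones, so the first message does not necessarily land on partition 0.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of null-key round-robin assignment.
// Unlike Kafka's DefaultPartitioner, the counter here always starts at 0
// and every partition is assumed to be available.
final class RoundRobinSketch {
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    /** Returns the partition for the next null-key record sent to the given topic. */
    int nextPartition(String topic, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        // Mask the sign bit so the value stays non-negative after int overflow.
        int next = counter.getAndIncrement() & 0x7fffffff;
        return next % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        for (int i = 0; i < 6; i++) {
            // prints num0 -> partition 0, num1 -> partition 1, num2 -> partition 0, ...
            System.out.println("num" + i + " -> partition " + partitioner.nextPartition("topicPar", 2));
        }
    }
}
```

Keeping one counter per topic means that sending to another topic does not disturb the alternation for topicPar.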

Producer

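Create the producer.
No partitioner is specified; each message is sent with a null key and a sequence number as its value.
The partition, topic, and value of each sent message are printed to standard output.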

ProducerWithDefaultPartition.java

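import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerWithDefaultPartition {
    public void syncSend() {
        // configuration (no partitioner specified, so the default partitioner is used)
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer when all messages have been sent
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 100)
                    .forEach(i -> {
                        String value = "num" + i;
                        // no key is given, so the record's key is null
                        ProducerRecord<String, String> record = new ProducerRecord<>("topicPar", value);
                        try {
                            // sync send: block until the send completes
                            Future<RecordMetadata> send = producer.send(record);
                            RecordMetadata recordMetadata = send.get();
                            System.out.print("partition: " + recordMetadata.partition() + ", ");
                            System.out.print("topic: " + recordMetadata.topic() + ", ");
                            System.out.println("value: " + value);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }
}
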
Consumer

Create the consumer.
It prints the partition, topic, key, and value of each message it reads.

ConsumerWithPartition.java

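import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerWithPartition {
    public void consumeFromEachPartition() {
        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // start from the earliest offset when the group has no committed offset yet,
        // so messages produced before the consumer started are not skipped
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // as the only member of the group, this consumer is assigned both partitions
        consumer.subscribe(Collections.singletonList("topicPar"));

        try {
            while (true) {
                // wait up to 1 second for new records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

                records.forEach(record -> {
                    System.out.print("partition: " + record.partition() + ", ");
                    System.out.print("topic: " + record.topic() + ", ");
                    System.out.print("key: " + record.key() + ", ");
                    System.out.println("value: " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
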
Run

Let's run it and check the results.

Producer log

The producer log shows that the messages are distributed across the two partitions in round-robin fashion.

partition: 0, topic: topicPar, value: num0
partition: 1, topic: topicPar, value: num1
partition: 0, topic: topicPar, value: num2
partition: 1, topic: topicPar, value: num3
partition: 0, topic: topicPar, value: num4
partition: 1, topic: topicPar, value: num5
partition: 0, topic: topicPar, value: num6
partition: 1, topic: topicPar, value: num7
partition: 0, topic: topicPar, value: num8
partition: 1, topic: topicPar, value: num9
partition: 0, topic: topicPar, value: num10
               :
               :
partition: 1, topic: topicPar, value: num91
partition: 0, topic: topicPar, value: num92
partition: 1, topic: topicPar, value: num93
partition: 0, topic: topicPar, value: num94
partition: 1, topic: topicPar, value: num95
partition: 0, topic: topicPar, value: num96
partition: 1, topic: topicPar, value: num97
partition: 0, topic: topicPar, value: num98
partition: 1, topic: topicPar, value: num99

 
Consumer log

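The consumer log shows that messages are read from both partitions.
A single poll appears to fetch a batch of messages from each partition, so the values come out grouped by partition: within a partition the send order is preserved (num1, num3, num5, ...), but there is no ordering across partitions.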

partition: 0, topic: topicPar, key: null, value: num0
partition: 1, topic: topicPar, key: null, value: num1
partition: 1, topic: topicPar, key: null, value: num3
partition: 1, topic: topicPar, key: null, value: num5
partition: 1, topic: topicPar, key: null, value: num7
partition: 1, topic: topicPar, key: null, value: num9
partition: 0, topic: topicPar, key: null, value: num2
partition: 0, topic: topicPar, key: null, value: num4
partition: 0, topic: topicPar, key: null, value: num6
partition: 0, topic: topicPar, key: null, value: num8
partition: 0, topic: topicPar, key: null, value: num10
               :
               :
partition: 1, topic: topicPar, key: null, value: num87
partition: 1, topic: topicPar, key: null, value: num89
partition: 1, topic: topicPar, key: null, value: num91
partition: 1, topic: topicPar, key: null, value: num93
partition: 0, topic: topicPar, key: null, value: num86
partition: 0, topic: topicPar, key: null, value: num88
partition: 0, topic: topicPar, key: null, value: num90
partition: 0, topic: topicPar, key: null, value: num92
partition: 0, topic: topicPar, key: null, value: num94
partition: 1, topic: topicPar, key: null, value: num95
partition: 1, topic: topicPar, key: null, value: num97
partition: 1, topic: topicPar, key: null, value: num99
partition: 0, topic: topicPar, key: null, value: num96
partition: 0, topic: topicPar, key: null, value: num98
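
Since Kafka guarantees ordering only within a partition, one way to check the log above is to confirm that the N in numN increases within each partition, even though num3 arrives before num2 overall. Below is a minimal, hypothetical helper (PerPartitionOrderCheck is not part of Kafka or the sample) that takes (partition, N) pairs in the order the consumer printed them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: checks that sequence numbers increase within each partition.
final class PerPartitionOrderCheck {

    /** Each entry is {partition, sequenceNumber}, in the order the records were consumed. */
    static boolean isOrderedWithinPartitions(List<int[]> consumed) {
        Map<Integer, Integer> lastSeen = new HashMap<>();
        for (int[] record : consumed) {
            int partition = record[0];
            int seq = record[1];
            Integer previous = lastSeen.get(partition);
            if (previous != null && seq <= previous) {
                return false; // out of order inside a single partition
            }
            lastSeen.put(partition, seq);
        }
        return true;
    }

    public static void main(String[] args) {
        // Partition and N (from "numN") of the first lines of the consumer log above.
        List<int[]> consumed = Arrays.asList(
                new int[] {0, 0}, new int[] {1, 1}, new int[] {1, 3}, new int[] {1, 5},
                new int[] {1, 7}, new int[] {1, 9}, new int[] {0, 2}, new int[] {0, 4},
                new int[] {0, 6}, new int[] {0, 8}, new int[] {0, 10});
        // prints "ordered within each partition: true"
        System.out.println("ordered within each partition: " + isOrderedWithinPartitions(consumed));
    }
}
```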

Custom Partitioner

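When a key is specified, the default partitioner chooses the partition from a hash of the key (murmur2 over the serialized key bytes, modulo the number of partitions).
Defining your own Partitioner lets you choose the partition with any rule you like.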
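
The idea behind key-based assignment is that hashing the key bytes and taking the result modulo the partition count sends the same key to the same partition every time. The sketch below is hypothetical (KeyHashSketch is not a Kafka class) and swaps in Arrays.hashCode for Kafka's murmur2, so it illustrates the technique but will not reproduce the partition numbers Kafka actually assigns.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of hash-based partitioning.
// NOTE: Arrays.hashCode is a stand-in; Kafka's DefaultPartitioner uses murmur2.
final class KeyHashSketch {

    static int partitionForKey(String key, int numPartitions) {
        // StringSerializer encodes keys as UTF-8 by default, so hash those bytes.
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Mask the sign bit so the modulo result is never negative.
        int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        // The same key ("EIMTH") always maps to the same partition.
        for (String key : new String[] {"EIMTH", "oRoTy", "EIMTH"}) {
            System.out.println(key + " -> partition " + partitionForKey(key, 2));
        }
    }
}
```

Note that this kind of mapping depends on the partition count, so adding partitions to a topic later changes where existing keys are routed.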

Partitioner

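Let's create a CustomPartitioner.
The producer generates random alphabetic keys, and the partitioner routes each message based on whether the key's first character is lowercase or uppercase.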

Implement the Partitioner interface and write the routing rule in the partition() method.
Keys whose first character is lowercase go to partition-0,
and keys whose first character is uppercase go to partition-1.

CustomPartitioner.java

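import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;

public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();

        // this rule needs partition-0 and partition-1 to exist
        if (numPartitions < 2) {
            throw new InvalidRecordException("Topic " + topic + " needs at least 2 partitions.");
        }
        if (keyBytes == null || !(key instanceof String)) {
            throw new InvalidRecordException("Unexpected key.");
        }

        String strKey = (String) key;

        // put to partition-0 if key starts with LowerCase
        if (strKey.matches("^[a-z].*")) {
            return 0;
        }
        // put to partition-1 if key starts with UpperCase
        if (strKey.matches("^[A-Z].*")) {
            return 1;
        }

        throw new InvalidRecordException("Unexpected key: " + strKey);
    }

    @Override
    public void close() {
        // called when the producer is closed
        System.out.println("close!!");
    }
}
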
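
Because the routing rule does not depend on the broker, it can be pulled out into a plain function and checked on its own. The class below is a hypothetical, JDK-only sketch of the same first-character rule (CaseRule is not from the original sample); it returns 0 and 1 directly and rejects topics with fewer than two partitions.

```java
// Hypothetical, broker-independent version of CustomPartitioner's routing rule.
final class CaseRule {

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions < 2) {
            throw new IllegalStateException("this rule needs at least 2 partitions, got " + numPartitions);
        }
        if (key == null || key.isEmpty()) {
            throw new IllegalArgumentException("key must be a non-empty string");
        }
        char first = key.charAt(0);
        if (first >= 'a' && first <= 'z') {
            return 0; // lowercase first letter -> partition-0
        }
        if (first >= 'A' && first <= 'Z') {
            return 1; // uppercase first letter -> partition-1
        }
        throw new IllegalArgumentException("unexpected key: " + key);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("oRoTy", 2)); // prints 0
        System.out.println(partitionFor("EIMTH", 2)); // prints 1
    }
}
```

CustomPartitioner.partition() could then delegate to a function like this, which keeps the Kafka-specific code thin.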
Producer

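Create the producer.
It generates a random alphabetic key for each message, sets the custom partitioner via ProducerConfig.PARTITIONER_CLASS_CONFIG, and sends the message.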

ProducerWithCustomPartition.java

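import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.commons.lang3.RandomStringUtils; // assumes commons-lang3 is on the classpath
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerWithCustomPartition {
    public void syncSend() {
        // configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // use CustomPartitioner instead of the default partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.producer.client.kafka.partitioner.CustomPartitioner");

        // try-with-resources closes the producer, which also closes the partitioner
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 100)
                    .forEach(i -> {
                        // random 5-letter key; its first letter decides the partition
                        String key = RandomStringUtils.randomAlphabetic(5);
                        String value = "num" + i;
                        ProducerRecord<String, String> record = new ProducerRecord<>("topicCus", key, value);
                        try {
                            // sync send: block until the send completes
                            Future<RecordMetadata> send = producer.send(record);
                            RecordMetadata recordMetadata = send.get();
                            System.out.print("partition: " + recordMetadata.partition() + ", ");
                            System.out.print("topic: " + recordMetadata.topic() + ", ");
                            System.out.print("key: " + key + ", ");
                            System.out.println("value: " + value);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }
}
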
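
The producer relies on RandomStringUtils.randomAlphabetic(5) from Apache Commons Lang. If you would rather not add that dependency, a JDK-only stand-in could look like the following (RandomKeys is a hypothetical name, not part of the sample).

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical JDK-only stand-in for RandomStringUtils.randomAlphabetic(count).
final class RandomKeys {
    private static final String LETTERS =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

    static String randomAlphabetic(int count) {
        StringBuilder sb = new StringBuilder(count);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < count; i++) {
            // pick one of the 52 ASCII letters uniformly at random
            sb.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // e.g. a 5-letter key such as "pnXBL" (output is random)
        System.out.println(randomAlphabetic(5));
    }
}
```

Since the alphabet has 26 lowercase and 26 uppercase letters, each key is equally likely to start with either case, so on average about half of the messages should go to each partition.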
Consumer

The consumer is the same as before; only the topic name is changed (to topicCus).

Run

Let's run it and check the results.

Producer log

The producer log shows that messages whose key starts with a lowercase letter went to partition-0,
and messages whose key starts with an uppercase letter went to partition-1.

partition: 1, topic: topicCus, key: EIMTH, value: num0
partition: 0, topic: topicCus, key: oRoTy, value: num1
partition: 0, topic: topicCus, key: pnXBL, value: num2
partition: 0, topic: topicCus, key: cZrHJ, value: num3
partition: 1, topic: topicCus, key: TXAIr, value: num4
partition: 1, topic: topicCus, key: JWNFM, value: num5
partition: 1, topic: topicCus, key: KpTOV, value: num6
partition: 1, topic: topicCus, key: OmVBO, value: num7
partition: 0, topic: topicCus, key: pKjZO, value: num8
partition: 0, topic: topicCus, key: jyvdE, value: num9
partition: 1, topic: topicCus, key: KknrC, value: num10
               :
               :
partition: 0, topic: topicCus, key: zgeIN, value: num90
partition: 0, topic: topicCus, key: hhpKl, value: num91
partition: 0, topic: topicCus, key: bnRdr, value: num92
partition: 0, topic: topicCus, key: okeGP, value: num93
partition: 1, topic: topicCus, key: AnGDo, value: num94
partition: 0, topic: topicCus, key: aUosF, value: num95
partition: 0, topic: topicCus, key: blmjO, value: num96
partition: 0, topic: topicCus, key: gWvaq, value: num97
partition: 0, topic: topicCus, key: ttoMO, value: num98
partition: 0, topic: topicCus, key: sIVGM, value: num99
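
To check the whole producer log rather than eyeballing it, each line can be parsed and compared against the first-character rule. The sketch below is hypothetical (LogRuleCheck is not part of the sample) and assumes the exact "partition: P, topic: T, key: K, value: V" format printed by ProducerWithCustomPartition.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker for log lines of the form
// "partition: P, topic: T, key: K, value: V".
final class LogRuleCheck {
    private static final Pattern LINE =
            Pattern.compile("partition: (\\d+), topic: ([^,]+), key: ([^,]+), value: (.+)");

    /** Returns true if the line's partition matches the first-character rule. */
    static boolean followsCaseRule(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("unrecognized log line: " + line);
        }
        int partition = Integer.parseInt(m.group(1));
        char first = m.group(3).charAt(0);
        int expected;
        if (first >= 'a' && first <= 'z') {
            expected = 0; // lowercase -> partition-0
        } else if (first >= 'A' && first <= 'Z') {
            expected = 1; // uppercase -> partition-1
        } else {
            throw new IllegalArgumentException("unexpected key in line: " + line);
        }
        return partition == expected;
    }

    public static void main(String[] args) {
        // A few lines copied from the producer log above.
        String[] lines = {
            "partition: 1, topic: topicCus, key: EIMTH, value: num0",
            "partition: 0, topic: topicCus, key: oRoTy, value: num1",
            "partition: 1, topic: topicCus, key: AnGDo, value: num94",
            "partition: 0, topic: topicCus, key: sIVGM, value: num99",
        };
        for (String line : lines) {
            System.out.println(followsCaseRule(line) + "  " + line);
        }
    }
}
```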

 
Consumer log

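The consumer log shows that messages are read from both partitions.
Messages whose key starts with a lowercase letter are read from partition-0, and those whose key starts with an uppercase letter are read from partition-1.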

partition: 1, topic: topicCus, key: EIMTH, value: num0
partition: 1, topic: topicCus, key: TXAIr, value: num4
partition: 1, topic: topicCus, key: JWNFM, value: num5
partition: 0, topic: topicCus, key: oRoTy, value: num1
partition: 0, topic: topicCus, key: pnXBL, value: num2
partition: 0, topic: topicCus, key: cZrHJ, value: num3
partition: 1, topic: topicCus, key: KpTOV, value: num6
partition: 1, topic: topicCus, key: OmVBO, value: num7
partition: 1, topic: topicCus, key: KknrC, value: num10
partition: 0, topic: topicCus, key: pKjZO, value: num8
partition: 0, topic: topicCus, key: jyvdE, value: num9
               :
               :
partition: 0, topic: topicCus, key: zgeIN, value: num90
partition: 0, topic: topicCus, key: hhpKl, value: num91
partition: 0, topic: topicCus, key: bnRdr, value: num92
partition: 1, topic: topicCus, key: AnGDo, value: num94
partition: 0, topic: topicCus, key: okeGP, value: num93
partition: 0, topic: topicCus, key: aUosF, value: num95
partition: 0, topic: topicCus, key: blmjO, value: num96
partition: 0, topic: topicCus, key: gWvaq, value: num97
partition: 0, topic: topicCus, key: ttoMO, value: num98
partition: 0, topic: topicCus, key: sIVGM, value: num99


I've put the sample code on GitHub:
github.com

github.com

That's all.