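Trying out a Kafka custom partitioner
These are my notes from trying out a custom partitioner in Kafka.
When I tried the Kafka Java client last time, I used a single partition,
so this time I checked how things behave with multiple partitions.
Defining a partitioner lets you distribute messages across partitions by any rule you like,
so I will try both the default partitioner and a custom partitioner.
I used the following versions.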
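Number of partitions
Change the broker configuration so that the number of partitions is 2. (num.partitions is the default partition count the broker uses for newly created topics.)
server.properties
$ sudo vi /usr/local/kafka/config/server.properties
num.partitions=2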
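Default Partitioner
If you produce without specifying a partitioner, the default partitioner is used.
When a message is sent with a null key, messages are distributed round-robin across the partitions.
Note that strict round-robin is what older clients do; from Kafka 2.4 on, the default partitioner keeps key-less records on one partition until the current batch is sent and then switches.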
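To make the round-robin idea concrete, here is a minimal sketch in plain Java. The class RoundRobinSketch and its method nextPartition are hypothetical names made up for this illustration; this is not Kafka's own partitioner code, only a counter-modulo selection that behaves the same way for key-less records.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for round-robin partition selection; not a Kafka class. */
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    RoundRobinSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    /** Returns the partition for the next key-less record. */
    int nextPartition() {
        // floorMod keeps the result non-negative even after the counter overflows
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(2);
        for (int i = 0; i < 6; i++) {
            System.out.println("partition: " + rr.nextPartition() + ", value: num" + i);
        }
    }
}
```

With two partitions the selection simply alternates between 0 and 1, which is the pattern to look for in the producer output.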
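Producer
Create the producer.
No partitioner is specified; each message is sent with a null key and a sequentially numbered value.
The partition, topic, and value of each sent message are printed to standard output.
ProducerWithDefaultPartition.java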
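import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerWithDefaultPartition {

    public void syncSend() {
        // configuration (no partitioner set, so the default partitioner is used)
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer once all records are sent
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 100)
                    .forEach(i -> {
                        String value = "num" + i;
                        // no key given, so the record key is null
                        ProducerRecord<String, String> record = new ProducerRecord<>("topicPar", value);

                        try {
                            // sync send: block until the broker acknowledges the record
                            Future<RecordMetadata> send = producer.send(record);
                            RecordMetadata recordMetadata = send.get();

                            System.out.print("partition: " + recordMetadata.partition() + ", ");
                            System.out.print("topic: " + recordMetadata.topic() + ", ");
                            System.out.println("value: " + value);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }
}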
Consumer
Create the consumer.
The partition, topic, key, and value of each consumed message are printed to standard output.
ConsumerWithPartition.java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerWithPartition {

    public void consumeFromEachPartition() {
        // configuration
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("topicPar"));

        try {
            // poll forever; stop the process to end the consumer
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1_000));

                records.forEach(record -> {
                    System.out.print("partition: " + record.partition() + ", ");
                    System.out.print("topic: " + record.topic() + ", ");
                    System.out.print("key: " + record.key() + ", ");
                    System.out.println("value: " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}
Run
Run it and check the result.
Producer log
The producer log shows that messages are distributed round-robin across the two partitions.
partition: 0, topic: topicPar, value: num0
partition: 1, topic: topicPar, value: num1
partition: 0, topic: topicPar, value: num2
partition: 1, topic: topicPar, value: num3
partition: 0, topic: topicPar, value: num4
partition: 1, topic: topicPar, value: num5
partition: 0, topic: topicPar, value: num6
partition: 1, topic: topicPar, value: num7
partition: 0, topic: topicPar, value: num8
partition: 1, topic: topicPar, value: num9
partition: 0, topic: topicPar, value: num10
:
:
partition: 1, topic: topicPar, value: num91
partition: 0, topic: topicPar, value: num92
partition: 1, topic: topicPar, value: num93
partition: 0, topic: topicPar, value: num94
partition: 1, topic: topicPar, value: num95
partition: 0, topic: topicPar, value: num96
partition: 1, topic: topicPar, value: num97
partition: 0, topic: topicPar, value: num98
partition: 1, topic: topicPar, value: num99
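Consumer log
The consumer log shows that messages are read from both partitions.
You can also see that a single poll fetches a batch of messages from each partition, so ordering is preserved within a partition but not across partitions.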
partition: 0, topic: topicPar, key: null, value: num0
partition: 1, topic: topicPar, key: null, value: num1
partition: 1, topic: topicPar, key: null, value: num3
partition: 1, topic: topicPar, key: null, value: num5
partition: 1, topic: topicPar, key: null, value: num7
partition: 1, topic: topicPar, key: null, value: num9
partition: 0, topic: topicPar, key: null, value: num2
partition: 0, topic: topicPar, key: null, value: num4
partition: 0, topic: topicPar, key: null, value: num6
partition: 0, topic: topicPar, key: null, value: num8
partition: 0, topic: topicPar, key: null, value: num10
:
:
partition: 1, topic: topicPar, key: null, value: num87
partition: 1, topic: topicPar, key: null, value: num89
partition: 1, topic: topicPar, key: null, value: num91
partition: 1, topic: topicPar, key: null, value: num93
partition: 0, topic: topicPar, key: null, value: num86
partition: 0, topic: topicPar, key: null, value: num88
partition: 0, topic: topicPar, key: null, value: num90
partition: 0, topic: topicPar, key: null, value: num92
partition: 0, topic: topicPar, key: null, value: num94
partition: 1, topic: topicPar, key: null, value: num95
partition: 1, topic: topicPar, key: null, value: num97
partition: 1, topic: topicPar, key: null, value: num99
partition: 0, topic: topicPar, key: null, value: num96
partition: 0, topic: topicPar, key: null, value: num98
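The interleaving in this log can be checked mechanically. The sketch below is plain Java with no Kafka dependency; the class PerPartitionOrderSketch and its helpers are hypothetical names for this illustration. It takes the first records of the consumer log as {partition, N} pairs (N being the number in "numN"), groups them by partition, and checks that the sequence is increasing inside each partition even though the overall stream is not in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for inspecting consumed records; not part of the Kafka client. */
final class PerPartitionOrderSketch {

    /** Groups {partition, sequence} pairs by partition, keeping arrival order. */
    static Map<Integer, List<Integer>> groupByPartition(int[][] consumed) {
        Map<Integer, List<Integer>> byPartition = new TreeMap<>();
        for (int[] rec : consumed) {
            byPartition.computeIfAbsent(rec[0], p -> new ArrayList<>()).add(rec[1]);
        }
        return byPartition;
    }

    /** True when the sequence numbers are strictly increasing. */
    static boolean isIncreasing(List<Integer> seq) {
        for (int i = 1; i < seq.size(); i++) {
            if (seq.get(i) <= seq.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // First records of the consumer log, written as {partition, N} for value "numN"
        int[][] consumed = {
            {0, 0}, {1, 1}, {1, 3}, {1, 5}, {1, 7}, {1, 9},
            {0, 2}, {0, 4}, {0, 6}, {0, 8}, {0, 10}
        };
        groupByPartition(consumed).forEach((partition, seq) ->
            System.out.println("partition " + partition + ": " + seq
                + " ordered=" + isIncreasing(seq)));
    }
}
```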
Custom Partitioner
When a key is specified, messages are distributed by the hash of the key by default,
but if you define a Partitioner you can decide the partition assignment rule freely.
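As a rough illustration of hash-based assignment, here is a minimal sketch in plain Java. It is an assumption-laden stand-in: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch substitutes java.util.Arrays.hashCode purely to keep the example short, so the partition numbers it produces will not match a real cluster. The class KeyHashSketch and the method partitionFor are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration of "hash of key modulo partition count". */
final class KeyHashSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash (assumption): Kafka itself uses murmur2 on the key bytes
        int hash = Arrays.hashCode(keyBytes);
        // floorMod maps negative hashes into the range [0, numPartitions)
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println("key: " + key + " -> partition: " + partitionFor(key, 2));
        }
    }
}
```

The property that matters is that the same key always lands on the same partition, which is what gives per-key ordering.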
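Partitioner
Let's write a CustomPartitioner.
The producer will generate random alphabetic keys, and the partitioner will route each message depending on whether the first character of the key is upper or lower case.
Implement the Partitioner interface and write the routing rule in the partition() method.
If the first character of the key is lower case, the message goes to partition-0;
if it is upper case, it goes to partition-1.
CustomPartitioner.java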
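package com.example.producer.client.kafka.partitioner;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
// on older clients this class lives in org.apache.kafka.common.record
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CustomPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();

        // the rule below needs partition-0 and partition-1 to exist
        if (numPartitions < 2) {
            throw new IllegalStateException("Topic needs at least 2 partitions.");
        }

        if (keyBytes == null || !(key instanceof String)) {
            throw new InvalidRecordException("Unexpected key.");
        }

        String strKey = (String) key;

        // put to partition-0 if key starts with LowerCase
        if (strKey.matches("^[a-z].*")) {
            return 0;
        }

        // put to partition-1 if key starts with UpperCase
        if (strKey.matches("^[A-Z].*")) {
            return 1;
        }

        throw new InvalidRecordException("Unexpected key.");
    }

    @Override
    public void close() {
        System.out.println("close!!");
    }
}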
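The routing rule itself can be tried without a broker by pulling it out into a pure function. The following is a minimal sketch in plain Java; FirstLetterRule and partitionFor are hypothetical names for this illustration, and IllegalArgumentException stands in for Kafka's InvalidRecordException so that the example has no dependencies.

```java
/** Hypothetical, dependency-free copy of the first-letter routing rule. */
final class FirstLetterRule {

    /** Returns 0 for a lower-case first letter and 1 for an upper-case one. */
    static int partitionFor(String key) {
        if (key == null || key.isEmpty()) {
            throw new IllegalArgumentException("Unexpected key.");
        }
        char first = key.charAt(0);
        if (first >= 'a' && first <= 'z') {
            return 0; // lower case goes to partition-0
        }
        if (first >= 'A' && first <= 'Z') {
            return 1; // upper case goes to partition-1
        }
        throw new IllegalArgumentException("Unexpected key.");
    }

    public static void main(String[] args) {
        for (String key : new String[] {"EIMTH", "oRoTy", "pnXBL", "TXAIr"}) {
            System.out.println("key: " + key + " -> partition: " + partitionFor(key));
        }
    }
}
```

Keeping the rule in a small function like this makes it easy to unit test before wiring it into a Partitioner implementation.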
Producer
Create the producer.
It generates a random alphabetic key for each message and sends it, with CustomPartitioner set as the partitioner class.
ProducerWithCustomPartition.java
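import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerWithCustomPartition {

    public void syncSend() {
        // configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // use the custom partitioner instead of the default one
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.producer.client.kafka.partitioner.CustomPartitioner");

        // try-with-resources closes the producer (and calls CustomPartitioner.close())
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 100)
                    .forEach(i -> {
                        // random 5-letter key, mixed upper and lower case
                        String key = RandomStringUtils.randomAlphabetic(5);
                        String value = "num" + i;
                        ProducerRecord<String, String> record = new ProducerRecord<>("topicCus", key, value);

                        try {
                            // sync send: block until the broker acknowledges the record
                            Future<RecordMetadata> send = producer.send(record);
                            RecordMetadata recordMetadata = send.get();

                            System.out.print("partition: " + recordMetadata.partition() + ", ");
                            System.out.print("topic: " + recordMetadata.topic() + ", ");
                            System.out.print("key: " + key + ", ");
                            System.out.println("value: " + value);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    });
        }
    }
}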
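The key generation relies on RandomStringUtils from Apache Commons Lang. If you would rather not add that dependency, a comparable generator can be sketched with the standard library alone. RandomKeySketch and its randomAlphabetic method are hypothetical names for this illustration, not a drop-in replacement with identical randomness properties.

```java
import java.util.Random;

/** Hypothetical stdlib-only generator for random alphabetic keys. */
final class RandomKeySketch {
    private static final String LETTERS =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

    /** Builds a key of the given length from upper- and lower-case ASCII letters. */
    static String randomAlphabetic(int length, Random random) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(LETTERS.charAt(random.nextInt(LETTERS.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println("key: " + randomAlphabetic(5, random));
        }
    }
}
```

Passing the Random in as a parameter lets a test use a fixed seed when reproducible keys are needed.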
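Consumer
The consumer is the same as before (only the topic name is changed).
Run
Run it and check the result.
Producer log
The producer log shows that messages whose key starts with a lower-case letter go to partition-0,
and messages whose key starts with an upper-case letter go to partition-1.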
partition: 1, topic: topicCus, key: EIMTH, value: num0
partition: 0, topic: topicCus, key: oRoTy, value: num1
partition: 0, topic: topicCus, key: pnXBL, value: num2
partition: 0, topic: topicCus, key: cZrHJ, value: num3
partition: 1, topic: topicCus, key: TXAIr, value: num4
partition: 1, topic: topicCus, key: JWNFM, value: num5
partition: 1, topic: topicCus, key: KpTOV, value: num6
partition: 1, topic: topicCus, key: OmVBO, value: num7
partition: 0, topic: topicCus, key: pKjZO, value: num8
partition: 0, topic: topicCus, key: jyvdE, value: num9
partition: 1, topic: topicCus, key: KknrC, value: num10
:
:
partition: 0, topic: topicCus, key: zgeIN, value: num90
partition: 0, topic: topicCus, key: hhpKl, value: num91
partition: 0, topic: topicCus, key: bnRdr, value: num92
partition: 0, topic: topicCus, key: okeGP, value: num93
partition: 1, topic: topicCus, key: AnGDo, value: num94
partition: 0, topic: topicCus, key: aUosF, value: num95
partition: 0, topic: topicCus, key: blmjO, value: num96
partition: 0, topic: topicCus, key: gWvaq, value: num97
partition: 0, topic: topicCus, key: ttoMO, value: num98
partition: 0, topic: topicCus, key: sIVGM, value: num99
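Consumer log
The consumer log shows that messages are read from both partitions.
Messages whose key starts with a lower-case letter are read from partition-0, and messages whose key starts with an upper-case letter are read from partition-1.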
partition: 1, topic: topicCus, key: EIMTH, value: num0
partition: 1, topic: topicCus, key: TXAIr, value: num4
partition: 1, topic: topicCus, key: JWNFM, value: num5
partition: 0, topic: topicCus, key: oRoTy, value: num1
partition: 0, topic: topicCus, key: pnXBL, value: num2
partition: 0, topic: topicCus, key: cZrHJ, value: num3
partition: 1, topic: topicCus, key: KpTOV, value: num6
partition: 1, topic: topicCus, key: OmVBO, value: num7
partition: 1, topic: topicCus, key: KknrC, value: num10
partition: 0, topic: topicCus, key: pKjZO, value: num8
partition: 0, topic: topicCus, key: jyvdE, value: num9
:
:
partition: 0, topic: topicCus, key: zgeIN, value: num90
partition: 0, topic: topicCus, key: hhpKl, value: num91
partition: 0, topic: topicCus, key: bnRdr, value: num92
partition: 1, topic: topicCus, key: AnGDo, value: num94
partition: 0, topic: topicCus, key: okeGP, value: num93
partition: 0, topic: topicCus, key: aUosF, value: num95
partition: 0, topic: topicCus, key: blmjO, value: num96
partition: 0, topic: topicCus, key: gWvaq, value: num97
partition: 0, topic: topicCus, key: ttoMO, value: num98
partition: 0, topic: topicCus, key: sIVGM, value: num99
The sample code is available below.
github.com
That's all.