Trying Kafka Streams

Notes from trying out consuming with Kafka Streams.


The previous examples consumed messages with the Kafka Consumer API; this time let's consume with Kafka Streams.
With Kafka Streams, the consuming logic can be written as stream processing.


The Kafka Streams API has a high-level Streams DSL API and a low-level Processor API;
here I'll try the Streams DSL API.

・ Streams DSL API
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html
・ Processor API
https://kafka.apache.org/21/documentation/streams/developer-guide/processor-api.html
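
For contrast, the same "read a topic and print it to standard output" written against the low-level Processor API would look roughly like the sketch below. This is only an illustration and is not used in this post; the topic name is a placeholder, properties stands for the same StreamsConfig configuration used in the later examples, and the serdes come from that configuration.

Topology topology = new Topology();
topology.addSource("Source", "some-topic");
// the ProcessorSupplier returns a Processor that prints each record
topology.addProcessor("Print", () -> new AbstractProcessor<Integer, String>() {
    @Override
    public void process(Integer key, String value) {
        System.out.println("key: " + key + ", value: " + value);
    }
}, "Source");
new KafkaStreams(topology, properties).start();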


I tried it with the following versions.

  • Kafka 2.1.0
  • ZooKeeper 3.4.13
  • Ubuntu 16
  • Java 1.8.0_181
  • kafka-streams 2.1.0


See the Kafka 2.1 documentation below:
https://kafka.apache.org/21/documentation/streams/
https://kafka.apache.org/21/documentation/streams/developer-guide/write-streams.html
https://kafka.apache.org/21/documentation/streams/developer-guide/config-streams.html
https://kafka.apache.org/21/documentation/#streamsconfigs

Dependency

I'll use Gradle.
Add the following dependency.

build.gradle on the consumer side

dependencies {
    compile 'org.apache.kafka:kafka-streams:2.1.0'
}
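
The producer side's build.gradle is not shown here, but the producer examples below would presumably need kafka-clients (for KafkaProducer and Serdes) and commons-lang3 (for RandomStringUtils), roughly:

dependencies {
    // kafka-clients provides KafkaProducer and Serdes
    compile 'org.apache.kafka:kafka-clients:2.1.0'
    // commons-lang3 provides RandomStringUtils
    compile 'org.apache.commons:commons-lang3:3.8.1'
}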

Basic

First, let's simply read a topic with Kafka Streams and print it to standard output.

Producer

The producer is kept simple: it produces 100 random strings,
printing the contents of each produced event to standard output.

IncrementalKeyProducer.java

public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.Integer().serializer().getClass());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());

    KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.range(0, 100)
            .forEach(i -> {
                String value = RandomStringUtils.randomAlphabetic(5);
                ProducerRecord<Integer, String> record = new ProducerRecord<>("basic", i, value);
                try {
                    // sync send
                    Future<RecordMetadata> send = producer.send(record);
                    RecordMetadata recordMetadata = send.get();
                    System.out.println("partition: " + recordMetadata.partition() +
                            ", topic: " + recordMetadata.topic() +
                            ", offset: " + recordMetadata.offset() +
                            ", key: " + i +
                            ", value: " + value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
}
Consumer

Now let's consume it with Kafka Streams.

Create a stream with StreamsBuilder,
describe the processing against the stream, and just run it with start().

It filters down to only the records whose value starts with an uppercase letter, and prints them to standard output.

BasicStream.java

public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "basic_stream");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<Integer, String> kStream = streamsBuilder.stream("basic");

    kStream.filter((key, value) -> value.matches("^[A-Z].+"))
            .foreach((key, value) -> System.out.println("key: " + key + ", value: " + value));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

    streams.start();
}
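
Note that start() returns immediately and the stream threads keep running in the background, so the snippet above never closes the KafkaStreams instance. A common addition, not in the original snippet, is a shutdown hook at the end of main:

// close the streams client cleanly when the JVM shuts down (e.g. on Ctrl+C)
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));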
Verification

Let's run them and check.

Producer log

partition: 0, topic: basic, offset: 0, key: 0, value: BBldC
partition: 0, topic: basic, offset: 1, key: 1, value: YJgro
partition: 0, topic: basic, offset: 2, key: 2, value: rPkCf
partition: 0, topic: basic, offset: 3, key: 3, value: dFiro
partition: 0, topic: basic, offset: 4, key: 4, value: iQlwV
partition: 0, topic: basic, offset: 5, key: 5, value: Ayvjd
partition: 0, topic: basic, offset: 6, key: 6, value: qoCqF
partition: 0, topic: basic, offset: 7, key: 7, value: RrcNF
partition: 0, topic: basic, offset: 8, key: 8, value: YJwwW
partition: 0, topic: basic, offset: 9, key: 9, value: rDknq
                :
                :
partition: 0, topic: basic, offset: 90, key: 90, value: kZRjU
partition: 0, topic: basic, offset: 91, key: 91, value: tLeEs
partition: 0, topic: basic, offset: 92, key: 92, value: DJIqk
partition: 0, topic: basic, offset: 93, key: 93, value: xIuXr
partition: 0, topic: basic, offset: 94, key: 94, value: Irysw
partition: 0, topic: basic, offset: 95, key: 95, value: oHLuI
partition: 0, topic: basic, offset: 96, key: 96, value: QKEkT
partition: 0, topic: basic, offset: 97, key: 97, value: IwjFR
partition: 0, topic: basic, offset: 98, key: 98, value: fgFNh
partition: 0, topic: basic, offset: 99, key: 99, value: cYrPT

Consumer log

key: 0, value: BBldC
key: 1, value: YJgro
key: 5, value: Ayvjd
key: 7, value: RrcNF
key: 8, value: YJwwW
          :
          :
key: 86, value: LjGHp
key: 87, value: CgqhD
key: 92, value: DJIqk
key: 94, value: Irysw
key: 96, value: QKEkT
key: 97, value: IwjFR

You can see that only the values starting with an uppercase letter were printed.

Word Count

Next, let's try the word count that also appears in the tutorial.
https://kafka.apache.org/21/documentation/streams/
https://kafka.apache.org/21/documentation/streams/quickstart

Producer

{"apple", "studio", "witch", "purple", "hollywood", "tree"}
の中からwordを100個ランダムにproduceします。
それ以外は先程のproducerと同じです。

WordProducer.java

public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    String[] words = {"apple", "studio", "witch", "purple", "hollywood", "tree"};
    // reuse a single Random instance rather than creating one per record
    Random random = new Random();

    IntStream.range(0, 100)
            .forEach(i -> {
                String key = "key" + i;
                String word = words[random.nextInt(words.length)];
                ProducerRecord<String, String> record = new ProducerRecord<>("word", key, word);
                try {
                    // sync send
                    Future<RecordMetadata> send = producer.send(record);
                    RecordMetadata recordMetadata = send.get();
                    System.out.println("partition: " + recordMetadata.partition() +
                            ", topic: " + recordMetadata.topic() +
                            ", offset: " + recordMetadata.offset() +
                            ", key: " + key +
                            ", value: " + word);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
}
Consumer

As before, we read the topic as a stream, but this time we group the records by word with groupBy and aggregate them into a KTable, keeping the aggregation state in a state store (a materialized view) named counts-store.

The KTable is converted back to a stream with toStream() and written to a topic named word-count with to(). Since the counts are Long while the default value serde is String, the output serdes are overridden with Produced.with(Serdes.String(), Serdes.Long()).

Then reading this word-count topic from another consumer gives you the results.

WordCount.java

public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word_count_stream");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream("word");

    KTable<String, Long> wordCounts = kStream
            .groupBy((key, word) -> word)
            .count(Materialized.as("counts-store"));
    wordCounts.toStream()
            .to("word-count", Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

    streams.start();
}
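
Since the counts are materialized as the counts-store state store, they can also be queried in-process through interactive queries, without going through the word-count topic. A minimal sketch, assuming it runs in the same JVM once the streams instance has reached the RUNNING state:

// interactive query against the local counts-store state store
ReadOnlyKeyValueStore<String, Long> store =
        streams.store("counts-store", QueryableStoreTypes.keyValueStore());
store.all().forEachRemaining(kv -> System.out.println(kv.key + " : " + kv.value));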
Verification

Let's run them and check.

Producer log

partition: 0, topic: word, offset: 0, key: key0, value: studio
partition: 0, topic: word, offset: 1, key: key1, value: tree
partition: 0, topic: word, offset: 2, key: key2, value: witch
partition: 0, topic: word, offset: 3, key: key3, value: purple
partition: 0, topic: word, offset: 4, key: key4, value: studio
partition: 0, topic: word, offset: 5, key: key5, value: witch
partition: 0, topic: word, offset: 6, key: key6, value: apple
partition: 0, topic: word, offset: 7, key: key7, value: purple
partition: 0, topic: word, offset: 8, key: key8, value: apple
partition: 0, topic: word, offset: 9, key: key9, value: apple
                :
                :
partition: 0, topic: word, offset: 90, key: key90, value: hollywood
partition: 0, topic: word, offset: 91, key: key91, value: studio
partition: 0, topic: word, offset: 92, key: key92, value: witch
partition: 0, topic: word, offset: 93, key: key93, value: purple
partition: 0, topic: word, offset: 94, key: key94, value: tree
partition: 0, topic: word, offset: 95, key: key95, value: hollywood
partition: 0, topic: word, offset: 96, key: key96, value: purple
partition: 0, topic: word, offset: 97, key: key97, value: hollywood
partition: 0, topic: word, offset: 98, key: key98, value: purple
partition: 0, topic: word, offset: 99, key: key99, value: witch


Consumer log

apple : 16
studio : 25
tree : 12
hollywood : 15
purple : 19
witch : 13

Each word was counted.
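
For reference, the consumer log above can be obtained with a plain KafkaConsumer reading the word-count topic. A sketch, following the style of the other snippets; the group id is illustrative, and the value deserializer must be LongDeserializer to match the Long counts:

public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "word_count_reader");
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

    try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Collections.singletonList("word-count"));
        while (true) {
            // each record carries the latest count for its word (key)
            ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record ->
                    System.out.println(record.key() + " : " + record.value()));
        }
    }
}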

Word Count 2

Consumer

As in the tutorial, let's count the words inside English text.

The English text arriving as the value is split into words by flatMapValues() (think of flatMap applied to the values only; the code splits on non-word characters, \W+),
and the words are counted with groupBy. Since groupBy re-keys each record by the word, Kafka Streams repartitions the data internally before counting.

The words {"the", "a", "an"} are excluded with filter.

WordInTextCount.java

public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_stream");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    List<String> ignoreWords = Arrays.asList("the", "a", "an");

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream("text");

    KTable<String, Long> wordCounts = kStream
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .filter((key, word) -> !ignoreWords.contains(word))
            .groupBy((key, word) -> word)
            .count(Materialized.as("text-store"));
    wordCounts.toStream()
            .to("words-in-text", Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

    streams.start();
}
Verification

Let's run it and check.

Producer log
Let's produce some English sentences.

partition: 0, topic: text, offset: 0, key: key0, value: The Beatles were an English rock band formed in Liverpool in 1960.
partition: 0, topic: text, offset: 1, key: key1, value: Led by primary songwriters Lennon and McCartney
partition: 0, topic: text, offset: 2, key: key2, value: the Beatles built their reputation playing clubs in Liverpool and Hamburg over a three-year period from 1960, with Stuart Sutcliffe initially serving as bass player. The core trio of Lennon, McCartney and Harrison, together since 1958, went through a succession of drummers, including Pete Best, before asking Starr to join them in 1962.
partition: 0, topic: text, offset: 3, key: key3, value: By early 1964, the Beatles were international stars, leading the British Invasion of the United States pop market and breaking numerous sales records.
partition: 0, topic: text, offset: 4, key: key4, value: The Beatles are the best-selling band in history, with estimated sales of over 800 million records worldwide.

Consumer log
Each word is counted, and the ignore words are excluded.

english : 1
rock : 1
formed : 1
led : 1
primary : 1
songwriters : 1
built : 1
 :
 :
in : 5
history : 1
with : 2
estimated : 1
sales : 2
of : 4
over : 2
800 : 1
million : 1
records : 2
worldwide : 1




The source code is available below.
Producer
github.com

Consumer
github.com


That's all.