Trying Kafka Streams
These are my notes from trying out Kafka Streams to consume messages.
In the previous examples I consumed messages with a Kafka Consumer; this time I'll consume them with Kafka Streams, which lets you write the consuming side as stream processing.
The Kafka Streams API offers the high-level Streams DSL API and the low-level Processor API; here I'll try the Streams DSL API.
・ Streams DSL API
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html
・ Processor API
https://kafka.apache.org/21/documentation/streams/developer-guide/processor-api.html
I'm testing with Kafka 2.1. See the documentation:
https://kafka.apache.org/21/documentation/streams/
https://kafka.apache.org/21/documentation/streams/developer-guide/write-streams.html
https://kafka.apache.org/21/documentation/streams/developer-guide/config-streams.html
https://kafka.apache.org/21/documentation/#streamsconfigs
Dependency
I'll build with Gradle; add the following dependency.
build.gradle on the consumer side:
dependencies {
compile 'org.apache.kafka:kafka-streams:2.1.0'
}
Basic
First, simply read a topic with Kafka Streams and print the records to standard output.
Producer
The producer side is kept simple: it produces 100 random strings and prints each produced event to standard output.
IncrementalKeyProducer.java
public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.Integer().serializer().getClass());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());

    KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);

    IntStream.range(0, 100)
            .forEach(i -> {
                String value = RandomStringUtils.randomAlphabetic(5);
                ProducerRecord<Integer, String> record = new ProducerRecord<>("basic", i, value);
                try {
                    // sync send
                    Future<RecordMetadata> send = producer.send(record);
                    RecordMetadata recordMetadata = send.get();
                    System.out.println("partition: " + recordMetadata.partition()
                            + ", topic: " + recordMetadata.topic()
                            + ", offset: " + recordMetadata.offset()
                            + ", key: " + i + ", value: " + value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
}
Consumer
Now let's consume with Kafka Streams.
Create a stream with StreamsBuilder, describe the processing on that stream, and just call start().
Here we filter down to values that start with an uppercase letter and print them to standard output.
BasicStream.java
public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "basic_stream");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // build the stream: keep values starting with an uppercase letter, then print them
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<Integer, String> kStream = streamsBuilder.stream("basic");
    kStream.filter((key, value) -> value.matches("^[A-Z].+"))
            .foreach((key, value) -> System.out.println("key: " + key + ", value: " + value));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
    streams.start();
}
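As a side note, the filter predicate can be checked on its own: "^[A-Z].+" matches strings that start with an uppercase letter and contain at least one more character (always true here, since the values are five characters long). A minimal pure-Java check, with a made-up class name, no Kafka needed:

```java
public class FilterPredicateCheck {
    public static void main(String[] args) {
        // the same predicate used in BasicStream.filter(...)
        System.out.println("BBldC".matches("^[A-Z].+")); // true: uppercase first letter
        System.out.println("rPkCf".matches("^[A-Z].+")); // false: lowercase first letter
        System.out.println("A".matches("^[A-Z].+"));     // false: ".+" needs at least one more character
    }
}
```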
Check
Let's run it and see.
Producer log
partition: 0, topic: basic, offset: 0, key: 0, value: BBldC
partition: 0, topic: basic, offset: 1, key: 1, value: YJgro
partition: 0, topic: basic, offset: 2, key: 2, value: rPkCf
partition: 0, topic: basic, offset: 3, key: 3, value: dFiro
partition: 0, topic: basic, offset: 4, key: 4, value: iQlwV
partition: 0, topic: basic, offset: 5, key: 5, value: Ayvjd
partition: 0, topic: basic, offset: 6, key: 6, value: qoCqF
partition: 0, topic: basic, offset: 7, key: 7, value: RrcNF
partition: 0, topic: basic, offset: 8, key: 8, value: YJwwW
partition: 0, topic: basic, offset: 9, key: 9, value: rDknq
:
:
partition: 0, topic: basic, offset: 90, key: 90, value: kZRjU
partition: 0, topic: basic, offset: 91, key: 91, value: tLeEs
partition: 0, topic: basic, offset: 92, key: 92, value: DJIqk
partition: 0, topic: basic, offset: 93, key: 93, value: xIuXr
partition: 0, topic: basic, offset: 94, key: 94, value: Irysw
partition: 0, topic: basic, offset: 95, key: 95, value: oHLuI
partition: 0, topic: basic, offset: 96, key: 96, value: QKEkT
partition: 0, topic: basic, offset: 97, key: 97, value: IwjFR
partition: 0, topic: basic, offset: 98, key: 98, value: fgFNh
partition: 0, topic: basic, offset: 99, key: 99, value: cYrPT
Consumer log
key: 0, value: BBldC
key: 1, value: YJgro
key: 5, value: Ayvjd
key: 7, value: RrcNF
key: 8, value: YJwwW
:
:
key: 86, value: LjGHp
key: 87, value: CgqhD
key: 92, value: DJIqk
key: 94, value: Irysw
key: 96, value: QKEkT
key: 97, value: IwjFR
Only the values starting with an uppercase letter were printed.
Word Count
Next, let's try the word count example from the tutorial.
https://kafka.apache.org/21/documentation/streams/
https://kafka.apache.org/21/documentation/streams/quickstart
Producer
The producer randomly picks 100 words from
{"apple", "studio", "witch", "purple", "hollywood", "tree"}
and produces them. Everything else is the same as the previous producer.
WordProducer.java
public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    String[] words = {"apple", "studio", "witch", "purple", "hollywood", "tree"};
    Random random = new Random();
    IntStream.range(0, 100)
            .forEach(i -> {
                String key = "key" + i;
                String word = words[random.nextInt(words.length)];
                ProducerRecord<String, String> record = new ProducerRecord<>("word", key, word);
                try {
                    // sync send
                    Future<RecordMetadata> send = producer.send(record);
                    RecordMetadata recordMetadata = send.get();
                    System.out.println("partition: " + recordMetadata.partition()
                            + ", topic: " + recordMetadata.topic()
                            + ", offset: " + recordMetadata.offset()
                            + ", key: " + key + ", value: " + word);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
}
Consumer
As before, we read the topic as a stream, but this time we aggregate per word with groupBy into a KTable, with the aggregation state stored in a materialized view named counts-store.
The KTable is turned back into a stream with toStream() and written with to() to a topic named word-count.
Reading that word-count topic with another consumer then gives you the results.
WordCount.java
public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "word_count_stream");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream("word");

    // re-key each record by the word itself and count occurrences into counts-store
    KTable<String, Long> wordCounts = kStream
            .groupBy((key, word) -> word)
            .count(Materialized.as("counts-store"));

    // write the running counts to the word-count topic
    wordCounts.toStream()
            .to("word-count", Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
    streams.start();
}
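Conceptually, groupBy((key, word) -> word) re-keys every record by the word itself, and count() then maintains a running count per key in counts-store. A minimal pure-Java sketch of the same aggregation, using a plain Map in place of the state store (the sample data and class name are made up for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CountSketch {
    public static void main(String[] args) {
        String[] produced = {"studio", "tree", "witch", "purple", "studio", "witch"};

        // the state counts-store keeps, in miniature
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String word : produced) {
            counts.merge(word, 1L, Long::sum); // same effect as groupBy(...).count()
        }

        counts.forEach((word, count) -> System.out.println(word + " : " + count));
    }
}
```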
Check
Let's run it and see.
Producer log
partition: 0, topic: word, offset: 0, key: key0, value: studio
partition: 0, topic: word, offset: 1, key: key1, value: tree
partition: 0, topic: word, offset: 2, key: key2, value: witch
partition: 0, topic: word, offset: 3, key: key3, value: purple
partition: 0, topic: word, offset: 4, key: key4, value: studio
partition: 0, topic: word, offset: 5, key: key5, value: witch
partition: 0, topic: word, offset: 6, key: key6, value: apple
partition: 0, topic: word, offset: 7, key: key7, value: purple
partition: 0, topic: word, offset: 8, key: key8, value: apple
partition: 0, topic: word, offset: 9, key: key9, value: apple
:
:
partition: 0, topic: word, offset: 90, key: key90, value: hollywood
partition: 0, topic: word, offset: 91, key: key91, value: studio
partition: 0, topic: word, offset: 92, key: key92, value: witch
partition: 0, topic: word, offset: 93, key: key93, value: purple
partition: 0, topic: word, offset: 94, key: key94, value: tree
partition: 0, topic: word, offset: 95, key: key95, value: hollywood
partition: 0, topic: word, offset: 96, key: key96, value: purple
partition: 0, topic: word, offset: 97, key: key97, value: hollywood
partition: 0, topic: word, offset: 98, key: key98, value: purple
partition: 0, topic: word, offset: 99, key: key99, value: witch
Consumer log
apple : 16
studio : 25
tree : 12
hollywood : 15
purple : 19
witch : 13
Each word was counted.
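The consumer that printed the log above isn't shown in this post; as a sketch, a plain KafkaConsumer reading the word-count topic might look like the following. The class name and group id are made up, and this is an assumption about how the log was produced, not the original code. The important part is that the value must be deserialized as a Long, since the stream wrote it with Serdes.Long(). It naturally needs a running broker:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Serdes;

public class WordCountReader {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "word_count_reader");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass());
        // the stream wrote the counts with Serdes.Long(), so read them back as Long
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.Long().deserializer().getClass());

        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("word-count"));
            while (true) {
                for (ConsumerRecord<String, Long> record : consumer.poll(Duration.ofSeconds(1))) {
                    System.out.println(record.key() + " : " + record.value());
                }
            }
        }
    }
}
```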
Word Count 2
Consumer
This time, as in the tutorial, let's count the words inside English text.
The English text in each value is split into words on non-word characters with flatMapValues() (think of it as a flatMap over the values),
and the words are counted with groupBy and count().
The words {"the", "a", "an"} are excluded with filter.
WordInTextCount.java
public static void main(String[] args) {
    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_stream");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    List<String> ignoreWords = Arrays.asList("the", "a", "an");

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream("text");

    // split each text line into words, drop the ignore words, then count per word
    KTable<String, Long> wordCounts = kStream
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .filter((key, word) -> !ignoreWords.contains(word))
            .groupBy((key, word) -> word)
            .count(Materialized.as("text-store"));

    wordCounts.toStream()
            .to("words-in-text", Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
    streams.start();
}
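The tokenization and stop-word filtering can be sanity-checked without Kafka. A pure-Java sketch of the same flatMapValues-plus-filter logic (the class name is just for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TokenizeCheck {
    public static void main(String[] args) {
        List<String> ignoreWords = Arrays.asList("the", "a", "an");
        String textLine = "The Beatles were an English rock band formed in Liverpool in 1960.";

        // same as flatMapValues(line -> split("\\W+")) followed by filter(!ignoreWords.contains(word))
        List<String> words = Arrays.stream(textLine.toLowerCase().split("\\W+"))
                .filter(word -> !ignoreWords.contains(word))
                .collect(Collectors.toList());

        System.out.println(words);
        // [beatles, were, english, rock, band, formed, in, liverpool, in, 1960]
    }
}
```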
Check
Let's run it and see.
Producer log
This time I produce some English text.
partition: 0, topic: text, offset: 0, key: key0, value: The Beatles were an English rock band formed in Liverpool in 1960.
partition: 0, topic: text, offset: 1, key: key1, value: Led by primary songwriters Lennon and McCartney
partition: 0, topic: text, offset: 2, key: key2, value: the Beatles built their reputation playing clubs in Liverpool and Hamburg over a three-year period from 1960, with Stuart Sutcliffe initially serving as bass player. The core trio of Lennon, McCartney and Harrison, together since 1958, went through a succession of drummers, including Pete Best, before asking Starr to join them in 1962.
partition: 0, topic: text, offset: 3, key: key3, value: By early 1964, the Beatles were international stars, leading the British Invasion of the United States pop market and breaking numerous sales records.
partition: 0, topic: text, offset: 4, key: key4, value: The Beatles are the best-selling band in history, with estimated sales of over 800 million records worldwide.
Consumer log
Each word was counted, and the ignore words were excluded.
english : 1
rock : 1
formed : 1
led : 1
primary : 1
songwriters : 1
built : 1
:
:
in : 5
history : 1
with : 2
estimated : 1
sales : 2
of : 4
over : 2
800 : 1
million : 1
records : 2
worldwide : 1
The full source is available here:
Producer
github.com
Consumer
github.com
That's all.