Trying out Time Windows in Kafka Streams
These are my notes from trying out Time Windows in Kafka Streams.
Kafka Streams supports windowing,
which lets you group records into fixed time intervals and run aggregations on each group.
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#windowing
Last time I tried word count with Kafka Streams,
so this time I'll aggregate it over Time Windows.
I tested with the versions below (Kafka Streams 2.1.0).
Dependency
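I use Gradle.
Add the following dependencies.
Besides kafka-streams, I add lombok to generate the model's boilerplate and jackson for the serde. ObjectMapper lives in jackson-databind (which pulls in jackson-core), so that is the artifact to depend on, and with Gradle 5 or later lombok also has to be registered as an annotation processor.
build.gradle
dependencies {
    compile 'org.apache.kafka:kafka-streams:2.1.0'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.9.8'
}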
Model
For the word count, I create a model called CountStore.
start and end are defined to hold the window's start and end time.
CountStore.java
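import lombok.Data;
import lombok.NoArgsConstructor;

// Aggregate value for one word within one window.
// @Data generates the getters/setters used by Jackson and by setStart()/setEnd().
@NoArgsConstructor
@Data
public class CountStore {
    private String name;
    private int count;
    private String start; // formatted window start
    private String end;   // formatted window end

    // Called by the aggregator once per record of this word in the window.
    CountStore increment(String name) {
        this.name = name;
        this.count++;
        return this;
    }
}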
Serde
Implement a Serde for CountStore.
(Serde is short for serializer/deserializer.)
The serializer and deserializer implement Kafka's Serializer and Deserializer interfaces, respectively,
and use jackson to serialize and deserialize CountStore.
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/Serializer.html
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/Deserializer.html
CountStoreSerializer.java
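import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CountStoreSerializer implements Serializer<CountStore> {
    // configure the mapper once instead of on every serialize() call
    private final ObjectMapper mapper = new ObjectMapper()
            .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, CountStore data) {
        try {
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            // fail loudly instead of writing an empty payload
            throw new SerializationException("Failed to serialize CountStore", e);
        }
    }

    @Override
    public void close() { }
}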
CountStoreDeserializer.java
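import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class CountStoreDeserializer implements Deserializer<CountStore> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public CountStore deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.readValue(data, CountStore.class);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize CountStore", e);
        }
    }

    @Override
    public void close() { }
}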
Next, implement Kafka's Serde interface.
All it does is use the CountStoreSerializer and CountStoreDeserializer implemented above as its serializer and deserializer.
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/Serde.html
CountStoreSerde.java
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class CountStoreSerde implements Serde<CountStore> {
    private final Serializer<CountStore> serializer;
    private final Deserializer<CountStore> deserializer;

    public CountStoreSerde() {
        this.serializer = new CountStoreSerializer();
        this.deserializer = new CountStoreDeserializer();
    }

    public CountStoreSerde(Serializer<CountStore> serializer, Deserializer<CountStore> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<CountStore> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<CountStore> deserializer() {
        return deserializer;
    }
}
Tumbling time windows
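There are several kinds of time windows.
A window whose size equals its advance interval is called a tumbling window.
As the diagram in the official documentation (linked below) shows, the window moves forward by an interval equal to its own size.
Since the windows don't overlap, each record belongs to exactly one window.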
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#tumbling-time-windows
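To make "each record belongs to exactly one window" concrete, here is a small plain-Java sketch (no Kafka dependency) that computes the tumbling window a timestamp falls into. It assumes, as I understand the Kafka docs, that windows are aligned to the epoch and are half-open [start, end); the class and method names are made up for illustration.

```java
// Plain-Java sketch (no Kafka dependency): which tumbling window contains a timestamp.
// Assumptions: windows are epoch-aligned and half-open [start, end); timestamps are >= 0.
final class TumblingWindowMath {

    // Start of the only window that contains timestampMs.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Exclusive end of that window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 5_000L;
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, sizeMs) + ", " + windowEnd(ts, sizeMs) + ")");
        }
    }
}
```

With a 5,000 ms window, 4,999 still lands in [0, 5000) while 5,000 starts the next window [5000, 10000), which is consistent with every window in the consumer log below starting on a multiple of 5 seconds.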
Consumer
The consumer is implemented as follows.
TumblingWindowStream.java
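import java.time.Duration;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;

public class TumblingWindowStream {
    public static void main(String[] args) {
        // configuration
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "tumbling_window");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> kStream = streamsBuilder.stream("tumbling");

        KTable<Windowed<String>, CountStore> tumblingWindowCountTable = kStream
                .groupBy((key, word) -> word)
                // 5-second windows advancing by 5 seconds (= tumbling)
                .windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(5_000L)))
                .aggregate(
                        CountStore::new,
                        (k, v, countStore) -> countStore.increment(v),
                        // Materialized.with() is static: as(...).with(...) would drop the store name,
                        // so set the serdes with withKeySerde()/withValueSerde() instead.
                        Materialized.<String, CountStore, WindowStore<Bytes, byte[]>>as("tumbling-counts-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(new CountStoreSerde()));

        tumblingWindowCountTable
                .toStream()
                .map((windowed, countStore) -> {
                    String start = windowed.window().startTime().atZone(ZoneId.systemDefault()).format(formatter);
                    String end = windowed.window().endTime().atZone(ZoneId.systemDefault()).format(formatter);
                    countStore.setStart(start);
                    countStore.setEnd(end);
                    return new KeyValue<>(start, countStore);
                })
                .to("tumbling-count", Produced.with(Serdes.String(), new CountStoreSerde()));

        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
        streams.start();
    }
}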
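Here is an overview of the processing.
Define the settings Kafka Streams needs in order to consume,
and create a KStream by specifying the topic name.
This part is essentially unchanged from the previous Kafka Streams code.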
// configuration
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "tumbling_window");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> kStream = streamsBuilder.stream("tumbling");
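The topic is consumed and grouped by value (the word).
windowedBy() defines the window.
TimeWindows.of(Duration.ofMillis(5_000L)) defines a window 5 seconds in size, and
advanceBy(Duration.ofMillis(5_000L)) makes the window advance every 5 seconds. (Since the advance equals the size, advanceBy() could also be omitted: TimeWindows.of() on its own produces tumbling windows.)
aggregate() specifies how to aggregate:
the first argument creates a CountStore as the initial value of the aggregation,
the second argument specifies the aggregation logic, and
the third argument specifies the state store (Materialized) that holds the aggregation state, along with the key and value serdes.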
KTable<Windowed<String>, CountStore> tumblingWindowCountTable = kStream
        .groupBy((key, word) -> word)
        .windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(5_000L)))
        .aggregate(
                CountStore::new,
                (k, v, countStore) -> countStore.increment(v),
                // Materialized.with() is static and would discard the store name given to as()
                Materialized.<String, CountStore, WindowStore<Bytes, byte[]>>as("tumbling-counts-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(new CountStoreSerde()));
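aggregate() keeps one aggregate per (window, key): the initializer supplies the starting value the first time a key appears in a window, and the aggregator runs for every record. The plain-Java model below illustrates only that idea, with a TreeMap standing in for the windowed state store. It is not how Kafka Streams implements it; the class name, the Integer counter used in place of CountStore, and the one-record-per-second timestamps are assumptions for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Plain-Java model (not Kafka's implementation) of groupBy(word) + tumbling windowedBy + aggregate:
// one aggregate per (window start, word). A TreeMap stands in for the windowed state store.
final class WindowedAggregateModel {

    static <A> Map<Long, Map<String, A>> aggregate(long[] timestampsMs, String[] words, long sizeMs,
                                                   Supplier<A> initializer,
                                                   BiFunction<A, String, A> aggregator) {
        Map<Long, Map<String, A>> table = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            // assumed epoch-aligned tumbling window
            long windowStart = timestampsMs[i] - (timestampsMs[i] % sizeMs);
            Map<String, A> perWord = table.computeIfAbsent(windowStart, start -> new TreeMap<>());
            // first record of this word in this window -> initializer, otherwise the stored aggregate
            A current = perWord.containsKey(words[i]) ? perWord.get(words[i]) : initializer.get();
            perWord.put(words[i], aggregator.apply(current, words[i]));
        }
        return table;
    }

    public static void main(String[] args) {
        // values of offsets 3-7 from the tumbling producer log; one record per second is assumed
        long[] timestamps = {35_000L, 36_000L, 37_000L, 38_000L, 39_000L};
        String[] words = {"lemon", "apple", "lemon", "orange", "lemon"};
        Map<Long, Map<String, Integer>> counts =
                aggregate(timestamps, words, 5_000L, () -> 0, (count, word) -> count + 1);
        System.out.println(counts); // {35000={apple=1, lemon=3, orange=1}}
    }
}
```

These are the same counts the consumer reports for the 01:23:35 ~ 01:23:40 window in the log further down.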
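Convert the KTable into a stream.
This is a windowed KTable: its key (Windowed<String>) holds both the window information and the grouped key,
so the window's startTime and endTime can be retrieved from it.
Take startTime and endTime from there and set them on the CountStore.
Finally, to() produces the results to the output topic (again specifying the key and value serdes).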
tumblingWindowCountTable
        .toStream()
        .map((windowed, countStore) -> {
            String start = windowed.window().startTime().atZone(ZoneId.systemDefault()).format(formatter);
            String end = windowed.window().endTime().atZone(ZoneId.systemDefault()).format(formatter);
            countStore.setStart(start);
            countStore.setEnd(end);
            return new KeyValue<>(start, countStore);
        })
        .to("tumbling-count", Produced.with(Serdes.String(), new CountStoreSerde()));
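The map() step only turns the window bounds, which Window#startTime()/endTime() return as java.time.Instant, into strings. This stdlib-only sketch reproduces that conversion with the same pattern. It takes the zone as a parameter (an assumption for reproducibility) where the consumer uses ZoneId.systemDefault(), which means the same window prints differently on machines in different time zones.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Stdlib-only sketch of the conversion done in map(): an Instant (as returned by
// Window#startTime()/endTime()) rendered with the consumer's pattern.
// Assumption: the zone is a parameter here; the consumer itself uses ZoneId.systemDefault().
final class WindowBoundsFormat {

    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    static String format(Instant instant, ZoneId zone) {
        return instant.atZone(zone).format(FORMATTER);
    }

    public static void main(String[] args) {
        Instant start = Instant.ofEpochMilli(0L);   // hypothetical window start
        Instant end = start.plusMillis(5_000L);     // 5-second window
        // prints "1970-01-01 00:00:00.000 - 1970-01-01 00:00:05.000"
        System.out.println(format(start, ZoneOffset.UTC) + " - " + format(end, ZoneOffset.UTC));
    }
}
```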
After that, start the stream as usual.
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
streams.start();
Producer
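The producer simply increments the key and produces one of four words, chosen at random, as the value.
It produces one record per second. The destination topic has to match the consumer being tested (tumbling or hopping), so here it is passed as a program argument.
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class WordProducer {
    public static void main(String[] args) {
        // "tumbling" or "hopping", matching the consumer being tested
        String topic = args.length > 0 ? args[0] : "tumbling";

        // configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        String[] words = {"apple", "banana", "orange", "lemon"};
        Random random = new Random();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            IntStream.range(0, 10_000).forEach(i -> {
                String key = "key" + i;
                String word = words[random.nextInt(words.length)];
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, word);
                try {
                    // sync send
                    Future<RecordMetadata> send = producer.send(record);
                    RecordMetadata recordMetadata = send.get();
                    System.out.println("partition: " + recordMetadata.partition()
                            + ", topic: " + recordMetadata.topic()
                            + ", offset: " + recordMetadata.offset()
                            + ", key: " + key + ", value: " + word);
                    Thread.sleep(1_000L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}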
Verification
Let's consume the results and check them.
Producer log
partition: 0, topic: tumbling, offset: 0, key: key0, value: lemon
partition: 0, topic: tumbling, offset: 1, key: key1, value: banana
partition: 0, topic: tumbling, offset: 2, key: key2, value: orange
partition: 0, topic: tumbling, offset: 3, key: key3, value: lemon
partition: 0, topic: tumbling, offset: 4, key: key4, value: apple
partition: 0, topic: tumbling, offset: 5, key: key5, value: lemon
partition: 0, topic: tumbling, offset: 6, key: key6, value: orange
partition: 0, topic: tumbling, offset: 7, key: key7, value: lemon
partition: 0, topic: tumbling, offset: 8, key: key8, value: apple
partition: 0, topic: tumbling, offset: 9, key: key9, value: apple
partition: 0, topic: tumbling, offset: 10, key: key10, value: banana
partition: 0, topic: tumbling, offset: 11, key: key11, value: orange
partition: 0, topic: tumbling, offset: 12, key: key12, value: lemon
partition: 0, topic: tumbling, offset: 13, key: key13, value: lemon
partition: 0, topic: tumbling, offset: 14, key: key14, value: banana
partition: 0, topic: tumbling, offset: 15, key: key15, value: banana
partition: 0, topic: tumbling, offset: 16, key: key16, value: lemon
partition: 0, topic: tumbling, offset: 17, key: key17, value: lemon
partition: 0, topic: tumbling, offset: 18, key: key18, value: banana
partition: 0, topic: tumbling, offset: 19, key: key19, value: lemon
partition: 0, topic: tumbling, offset: 20, key: key20, value: orange
:
:
:
Consumer log
2019-05-08 01:23:30.000 - 2019-05-08 01:23:35.000 name:lemon, count: 1
2019-05-08 01:23:30.000 - 2019-05-08 01:23:35.000 name:banana, count: 1
2019-05-08 01:23:30.000 - 2019-05-08 01:23:35.000 name:orange, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:apple, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:orange, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:lemon, count: 3
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:apple, count: 2
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:banana, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:orange, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:lemon, count: 1
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:banana, count: 2
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:lemon, count: 3
2019-05-08 01:23:50.000 - 2019-05-08 01:23:55.000 name:lemon, count: 1
2019-05-08 01:23:50.000 - 2019-05-08 01:23:55.000 name:orange, count: 1
2019-05-08 01:23:50.000 - 2019-05-08 01:23:55.000 name:banana, count: 3
:
:
:
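Let's check that the records are aggregated in 5-second windows that advance 5 seconds at a time.
(The first three records are omitted because they only fill part of a window.)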
01:23:35 ~ 01:23:40
--Producer
partition: 0, topic: tumbling, offset: 3, key: key3, value: lemon
partition: 0, topic: tumbling, offset: 4, key: key4, value: apple
partition: 0, topic: tumbling, offset: 5, key: key5, value: lemon
partition: 0, topic: tumbling, offset: 6, key: key6, value: orange
partition: 0, topic: tumbling, offset: 7, key: key7, value: lemon
--Consumer
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:apple, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:orange, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:lemon, count: 3
01:23:40 ~ 01:23:45
--Producer
partition: 0, topic: tumbling, offset: 8, key: key8, value: apple
partition: 0, topic: tumbling, offset: 9, key: key9, value: apple
partition: 0, topic: tumbling, offset: 10, key: key10, value: banana
partition: 0, topic: tumbling, offset: 11, key: key11, value: orange
partition: 0, topic: tumbling, offset: 12, key: key12, value: lemon
--Consumer
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:apple, count: 2
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:banana, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:orange, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:lemon, count: 1
01:23:45 ~ 01:23:50
--Producer
partition: 0, topic: tumbling, offset: 13, key: key13, value: lemon
partition: 0, topic: tumbling, offset: 14, key: key14, value: banana
partition: 0, topic: tumbling, offset: 15, key: key15, value: banana
partition: 0, topic: tumbling, offset: 16, key: key16, value: lemon
partition: 0, topic: tumbling, offset: 17, key: key17, value: lemon
--Consumer
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:banana, count: 2
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:lemon, count: 3
With a 5-second window, counts are produced every 5 seconds.
Each record is counted in exactly one window, with no duplication.
Hopping time windows
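A window whose advance interval is smaller than its size, so that consecutive windows overlap, is called a hopping window.
As the diagram in the official documentation (linked below) shows, a hopping window moves forward at a fixed interval and successive windows overlap.
In this case, a record can belong to more than one window.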
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#hopping-time-windows
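The difference from tumbling windows shows up when enumerating the windows a timestamp belongs to. The plain-Java sketch below follows the epoch-aligned model the Kafka docs describe, as far as I understand it: window starts are multiples of the advance, a window [start, start + size) contains a timestamp when start <= timestamp < start + size, and no window starts before 0. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: every hopping window [start, start + size) that contains a timestamp.
// Assumptions: window starts are epoch-aligned multiples of advanceMs, no window starts
// before 0, and timestamps are >= 0. With advanceMs == sizeMs this yields exactly one window.
final class HoppingWindowMath {

    static List<long[]> windowsFor(long timestampMs, long sizeMs, long advanceMs) {
        // smallest multiple of advanceMs that is > timestampMs - sizeMs (clamped at 0)
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<long[]> windows = new ArrayList<>();
        for (; start <= timestampMs; start += advanceMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long ts : new long[] {7_000L, 8_000L}) {
            System.out.print(ts + " ->");
            for (long[] w : windowsFor(ts, 5_000L, 2_000L)) {
                System.out.print(" [" + w[0] + ", " + w[1] + ")");
            }
            System.out.println();
        }
    }
}
```

With size 5,000 ms and advance 2,000 ms, timestamp 7,000 falls into [4000, 9000) and [6000, 11000), while 8,000 falls into three windows; with the advance equal to the size there is always exactly one. The epoch alignment is also why every window start in the hopping consumer log below is an even second.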
Consumer
The consumer is implemented as follows.
It is almost identical to the tumbling window consumer.
HoppingWindowStream.java
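import java.time.Duration;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;

public class HoppingWindowStream {
    public static void main(String[] args) {
        // configuration
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "hopping_window");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> kStream = streamsBuilder.stream("hopping");

        KTable<Windowed<String>, CountStore> hoppingWindowCountTable = kStream
                .groupBy((key, word) -> word)
                // 5-second windows advancing by 2 seconds (= hopping, windows overlap)
                .windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(2_000L)))
                .aggregate(
                        CountStore::new,
                        (k, v, countStore) -> countStore.increment(v),
                        // withKeySerde()/withValueSerde() keep the store name; static with() would drop it
                        Materialized.<String, CountStore, WindowStore<Bytes, byte[]>>as("hopping-counts-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(new CountStoreSerde()));

        hoppingWindowCountTable
                .toStream()
                .map((windowed, countStore) -> {
                    String start = windowed.window().startTime().atZone(ZoneId.systemDefault()).format(formatter);
                    String end = windowed.window().endTime().atZone(ZoneId.systemDefault()).format(formatter);
                    countStore.setStart(start);
                    countStore.setEnd(end);
                    return new KeyValue<>(start, countStore);
                })
                .to("hopping-count", Produced.with(Serdes.String(), new CountStoreSerde()));

        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);
        // streams.cleanUp(); // uncomment to delete this application's local state before starting
        streams.start();
    }
}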
The only difference is the line below:
a 5-second window advances every 2 seconds,
so consecutive windows overlap.
.windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(2_000L)))
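A quick worked count of how much overlap this produces (a sketch assuming epoch-aligned windows, and ignoring timestamps smaller than the window size, where windows with negative start times are not created): with window size s and advance a, a record with timestamp t falls into every window whose start is a multiple of a in the half-open interval (t - s, t], so the number of windows N(t) is

```latex
N(t) = \#\{\, k \in \mathbb{Z} : t - s < k\,a \le t \,\}
     \in \left\{ \left\lfloor s/a \right\rfloor,\ \left\lceil s/a \right\rceil \right\},
\qquad s = 5000\,\mathrm{ms},\ a = 2000\,\mathrm{ms} \;\Rightarrow\; N(t) \in \{2,\ 3\}.
```

With s = a (the tumbling case) the count is always exactly 1, which is why the tumbling results above never count a record twice.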
Producer
The producer is the same one used for the tumbling window, with the destination topic set to hopping.
Verification
Let's consume the results and check them.
Producer log
partition: 0, topic: hopping, offset: 0, key: key0, value: lemon
partition: 0, topic: hopping, offset: 1, key: key1, value: apple
partition: 0, topic: hopping, offset: 2, key: key2, value: orange
partition: 0, topic: hopping, offset: 3, key: key3, value: banana
partition: 0, topic: hopping, offset: 4, key: key4, value: lemon
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
partition: 0, topic: hopping, offset: 6, key: key6, value: orange
partition: 0, topic: hopping, offset: 7, key: key7, value: apple
partition: 0, topic: hopping, offset: 8, key: key8, value: orange
partition: 0, topic: hopping, offset: 9, key: key9, value: orange
partition: 0, topic: hopping, offset: 10, key: key10, value: apple
partition: 0, topic: hopping, offset: 11, key: key11, value: banana
partition: 0, topic: hopping, offset: 12, key: key12, value: lemon
partition: 0, topic: hopping, offset: 13, key: key13, value: apple
partition: 0, topic: hopping, offset: 14, key: key14, value: banana
partition: 0, topic: hopping, offset: 15, key: key15, value: apple
partition: 0, topic: hopping, offset: 16, key: key16, value: orange
partition: 0, topic: hopping, offset: 17, key: key17, value: banana
partition: 0, topic: hopping, offset: 18, key: key18, value: lemon
partition: 0, topic: hopping, offset: 19, key: key19, value: banana
partition: 0, topic: hopping, offset: 20, key: key20, value: lemon
:
:
:
Consumer log
2019-04-19 01:51:16.000 - 2019-04-19 01:51:21.000 name:lemon, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:lemon, count: 1
2019-04-19 01:51:16.000 - 2019-04-19 01:51:21.000 name:apple, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:apple, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:apple, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:orange, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:orange, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:banana, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:lemon, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:lemon, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:banana, count: 2
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:banana, count: 2
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:banana, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:orange, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:apple, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:apple, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:orange, count: 3
2019-04-19 01:51:26.000 - 2019-04-19 01:51:31.000 name:orange, count: 2
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:orange, count: 1
2019-04-19 01:51:26.000 - 2019-04-19 01:51:31.000 name:apple, count: 2
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:apple, count: 1
2019-04-19 01:51:26.000 - 2019-04-19 01:51:31.000 name:banana, count: 1
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:banana, count: 1
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:banana, count: 1
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:lemon, count: 1
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:lemon, count: 1
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:apple, count: 2
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:banana, count: 2
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:apple, count: 2
:
:
:
Let's check that the records are aggregated in 5-second windows that advance 2 seconds at a time.
01:51:20 ~ 01:51:25
--Producer
partition: 0, topic: hopping, offset: 1, key: key1, value: apple
partition: 0, topic: hopping, offset: 2, key: key2, value: orange
partition: 0, topic: hopping, offset: 3, key: key3, value: banana
partition: 0, topic: hopping, offset: 4, key: key4, value: lemon
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
--Consumer
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:apple, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:orange, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:lemon, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:banana, count: 2
01:51:22 ~ 01:51:27
--Producer
partition: 0, topic: hopping, offset: 3, key: key3, value: banana
partition: 0, topic: hopping, offset: 4, key: key4, value: lemon
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
partition: 0, topic: hopping, offset: 6, key: key6, value: orange
partition: 0, topic: hopping, offset: 7, key: key7, value: apple
--Consumer
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:lemon, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:banana, count: 2
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:orange, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:apple, count: 1
01:51:24 ~ 01:51:29
--Producer
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
partition: 0, topic: hopping, offset: 6, key: key6, value: orange
partition: 0, topic: hopping, offset: 7, key: key7, value: apple
partition: 0, topic: hopping, offset: 8, key: key8, value: orange
partition: 0, topic: hopping, offset: 9, key: key9, value: orange
--Consumer
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:banana, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:apple, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:orange, count: 3
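With a 5-second window, counts are produced every 2 seconds.
Because the window advances every 2 seconds, the same record is counted in several overlapping windows.
That's about it.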
Wrap-up
I've uploaded the sample code below.
Producer
github.com
Consumer
github.com