Trying Out Time Windows in Kafka Streams

These are my notes from trying out time windows in Kafka Streams.


Kafka Streams supports windowing, which lets you group records
into fixed time intervals and aggregate each group.
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#windowing

Last time I tried word count with Kafka Streams,
so this time I will aggregate the same word count over time windows.

I used the following versions.

  • kafka 2.1.0
  • zookeeper 3.4.13
  • Ubuntu 16
  • Java 1.8.0_181
  • kafka-streams 2.1.0

Dependency

I use Gradle for this.

Add the following dependencies.
Besides kafka-streams, I added Lombok to generate the model boilerplate and Jackson for the serde.

build.gradle

dependencies {
    compile 'org.apache.kafka:kafka-streams:2.1.0'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    // ObjectMapper lives in jackson-databind, which pulls in jackson-core transitively
    compile 'com.fasterxml.jackson.core:jackson-databind:2.9.8'
}

Model

Create a model called CountStore for the word count.
The start and end fields hold the start and end date-times of the window.

CountStore.java

@NoArgsConstructor
@Data
public class CountStore {
    private String name;
    private int count;
    private String start;
    private String end;

    CountStore increment(String name) {
        this.name = name;
        this.count++;
        return this;
    }
}

Serde

Implement a Serde for CountStore.
(Serde is short for serializer/deserializer.)

Implement Kafka's Serializer and Deserializer interfaces respectively,
using Jackson to serialize and deserialize CountStore.

https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/Serializer.html
https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/Deserializer.html

CountStoreSerializer.java

public class CountStoreSerializer implements Serializer<CountStore> {
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, CountStore data) {
        try {
            mapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {
    }
}

CountStoreDeserializer.java

public class CountStoreDeserializer implements Deserializer<CountStore> {
    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public CountStore deserialize(String topic, byte[] data) {
        try {
            return mapper.readValue(data, CountStore.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void close() {
    }
}

Next, implement Kafka's Serde interface.
It only needs to return the CountStoreSerializer and CountStoreDeserializer implemented above as its serializer and deserializer.

https://kafka.apache.org/21/javadoc/org/apache/kafka/common/serialization/Serde.html

CountStoreSerde.java

public class CountStoreSerde implements Serde<CountStore> {
    final private Serializer<CountStore> serializer;
    final private Deserializer<CountStore> deserializer;

    public CountStoreSerde() {
        this.serializer = new CountStoreSerializer();
        this.deserializer = new CountStoreDeserializer();
    }

    public CountStoreSerde(Serializer<CountStore> serializer, Deserializer<CountStore> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<CountStore> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<CountStore> deserializer() {
        return deserializer;
    }
}

Tumbling time windows

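There are several kinds of time windows.
A window whose advance interval equals its size is called a tumbling window.

As in the figure below from the official documentation, the window moves forward by exactly its own size.
Because the windows never overlap, each record belongs to exactly one window.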

[Figure: tumbling time windows, from the official documentation]
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#tumbling-time-windows
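
To make the "exactly one window" point concrete, here is a small plain-Java sketch of how a record timestamp could be mapped to its tumbling window. It assumes windows are aligned to the epoch and cover [start, start + size), which is how I understand the documentation. The class and method names are made up for illustration and are not part of the Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: not part of the Kafka Streams API.
class TumblingWindowSketch {
    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long sizeMs = Duration.ofSeconds(5).toMillis();
        // Hypothetical record timestamp, chosen only for this example.
        long ts = Instant.parse("2019-05-07T16:23:37.250Z").toEpochMilli();
        long start = windowStart(ts, sizeMs);
        // The window is [start, start + size): start inclusive, end exclusive.
        System.out.println(Instant.ofEpochMilli(start) + " - " + Instant.ofEpochMilli(start + sizeMs));
    }
}
```

Every timestamp maps to a single start value, which is why a record can only land in one tumbling window.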

Consumer

The consumer is implemented as follows.

TumblingWindowStream.java

    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "tumbling_window");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream("tumbling");

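    // Materialized.with(...) is a static factory, so chaining it after as(...) would drop the store name;
    // withKeySerde()/withValueSerde() keep it.
    // (needs org.apache.kafka.common.utils.Bytes and org.apache.kafka.streams.state.WindowStore)
    KTable<Windowed<String>, CountStore> tumblingWindowCountTable = kStream
            .groupBy((key, word) -> word)
            .windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(5_000L)))
            .aggregate(CountStore::new,
                    (k, v, countStore) -> countStore.increment(v),
                    Materialized.<String, CountStore, WindowStore<Bytes, byte[]>>as("tumbling-counts-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new CountStoreSerde())
            );

    tumblingWindowCountTable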
            .toStream()
            .map((windowed, countStore) -> {
                String start = windowed.window().startTime().atZone(ZoneId.systemDefault()).format(formatter);
                String end = windowed.window().endTime().atZone(ZoneId.systemDefault()).format(formatter);
                countStore.setStart(start);
                countStore.setEnd(end);
                return new KeyValue<>(start, countStore);
            })
            .to("tumbling-count", Produced.with(Serdes.String(), new CountStoreSerde()));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

    streams.start();

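Here is an overview of the processing.

First, define the settings Kafka Streams needs in order to consume.
Then create a KStream by specifying the topic name.
This part is essentially unchanged from the previous Kafka Streams post.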

    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "tumbling_window");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream("tumbling");

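Consume the topic and group the records by value (the word).
windowedBy() defines the window.
TimeWindows.of(Duration.ofMillis(5_000L)) defines a window of 5 seconds, and
.advanceBy(Duration.ofMillis(5_000L)) makes the window advance every 5 seconds.
(TimeWindows.of() alone already defaults to an advance equal to the size, so advanceBy() is spelled out here only for clarity.)

aggregate() defines how to aggregate.
The first argument creates a CountStore as the initial value of the aggregation,
the second argument specifies the aggregation itself, and
the third argument specifies the view (state store) that holds the aggregation state, together with the key and value serdes.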

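    // Materialized.with(...) is a static factory, so chaining it after as(...) would drop the store name;
    // withKeySerde()/withValueSerde() keep it.
    // (needs org.apache.kafka.common.utils.Bytes and org.apache.kafka.streams.state.WindowStore)
    KTable<Windowed<String>, CountStore> tumblingWindowCountTable = kStream
            .groupBy((key, word) -> word)
            .windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(5_000L)))
            .aggregate(CountStore::new,
                    (k, v, countStore) -> countStore.increment(v),
                    Materialized.<String, CountStore, WindowStore<Bytes, byte[]>>as("tumbling-counts-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new CountStoreSerde())
            );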
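
As a mental model of what groupBy + windowedBy + aggregate compute together, here is a rough plain-Java sketch with no Kafka involved. It keeps one counter per (window start, word) pair, which is roughly what the windowed state store holds. The timestamps are assumed values (one record per second starting on a window boundary), and all names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration of "group by word, 5-second tumbling window, count".
// Nothing here is Kafka Streams API; the names are made up for this sketch.
class WindowedCountSketch {
    static final long SIZE_MS = 5_000L;

    // Returns counts keyed by "<windowStartMs>/<word>".
    static Map<String, Integer> count(long[] timestampsMs, String[] words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < words.length; i++) {
            // Epoch-aligned start of the window this record falls into.
            long windowStart = timestampsMs[i] - (timestampsMs[i] % SIZE_MS);
            String key = windowStart + "/" + words[i];
            // Plays the role of the initializer (start at 0) plus the aggregator (+1).
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed timestamps: one record per second, starting on a window boundary.
        long[] ts = {5_000L, 6_000L, 7_000L, 8_000L, 9_000L, 10_000L};
        String[] words = {"lemon", "apple", "lemon", "orange", "lemon", "apple"};
        count(ts, words).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```

The last record in the example crosses into the next window, so its count starts again from 1 under a new key.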

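Convert the KTable to a stream.
This KTable is a windowed KTable: its key is a Windowed<String> that carries both the grouping key and the window information,
so the window's startTime and endTime are available from it.
Read startTime and endTime from the key and set them on the CountStore.
Finally, to() produces the result to the output topic (the key and value serdes are specified here as well).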

    tumblingWindowCountTable
            .toStream()
            .map((windowed, countStore) -> {
                String start = windowed.window().startTime().atZone(ZoneId.systemDefault()).format(formatter);
                String end = windowed.window().endTime().atZone(ZoneId.systemDefault()).format(formatter);
                countStore.setStart(start);
                countStore.setEnd(end);
                return new KeyValue<>(start, countStore);
            })
            .to("tumbling-count", Produced.with(Serdes.String(), new CountStoreSerde()));
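
The start/end formatting step can be tried on its own. The sketch below converts epoch milliseconds to the same "yyyy-MM-dd HH:mm:ss.SSS" text used above. The helper name is hypothetical, and the Asia/Tokyo zone and the sample instant are assumptions chosen for the example (the stream code itself uses ZoneId.systemDefault()).

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustrative helper; not part of the original consumer code.
class WindowBoundsFormatSketch {
    static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Formats an epoch-millisecond timestamp in the given time zone.
    static String format(long epochMs, ZoneId zone) {
        return Instant.ofEpochMilli(epochMs).atZone(zone).format(FORMATTER);
    }

    public static void main(String[] args) {
        // Assumed window start and zone, for illustration only.
        Instant start = Instant.parse("2019-05-07T16:23:35Z");
        ZoneId zone = ZoneId.of("Asia/Tokyo");
        Instant end = start.plusSeconds(5);
        System.out.println(format(start.toEpochMilli(), zone) + " - " + format(end.toEpochMilli(), zone));
    }
}
```

Because the zone is applied only at formatting time, the window boundaries themselves stay zone-independent; only the printed text changes with the zone.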

After that, start the stream as usual.

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

    streams.start();

Producer

The producer simply increments the key and picks the value at random from four words.
It produces one record per second.

public class WordProducer {
    public static void main(String[] args) {
        // configuration
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        String[] words = {"apple", "banana", "orange", "lemon"};

        IntStream.range(0, 10_000)
                .forEach(i -> {
                    String key = "key" + i;
                    Random random = new Random();
                    String word = words[random.nextInt(words.length)];
                    // topic for this section (use "hopping" for the hopping window section)
                    ProducerRecord<String, String> record = new ProducerRecord<>("tumbling", key, word);
                    try {
                        // sync send
                        Future<RecordMetadata> send = producer.send(record);
                        RecordMetadata recordMetadata = send.get();
                        System.out.println("partition: " + recordMetadata.partition() +
                                ", topic: " + recordMetadata.topic() +
                                ", offset: " + recordMetadata.offset() +
                                ", key: " + key +
                                ", value: " + word);
                        Thread.sleep(1_000L);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
    }
}

Verification

Let's consume and check.


producer log

partition: 0, topic: tumbling, offset: 0, key: key0, value: lemon
partition: 0, topic: tumbling, offset: 1, key: key1, value: banana
partition: 0, topic: tumbling, offset: 2, key: key2, value: orange
partition: 0, topic: tumbling, offset: 3, key: key3, value: lemon
partition: 0, topic: tumbling, offset: 4, key: key4, value: apple
partition: 0, topic: tumbling, offset: 5, key: key5, value: lemon
partition: 0, topic: tumbling, offset: 6, key: key6, value: orange
partition: 0, topic: tumbling, offset: 7, key: key7, value: lemon
partition: 0, topic: tumbling, offset: 8, key: key8, value: apple
partition: 0, topic: tumbling, offset: 9, key: key9, value: apple
partition: 0, topic: tumbling, offset: 10, key: key10, value: banana
partition: 0, topic: tumbling, offset: 11, key: key11, value: orange
partition: 0, topic: tumbling, offset: 12, key: key12, value: lemon
partition: 0, topic: tumbling, offset: 13, key: key13, value: lemon
partition: 0, topic: tumbling, offset: 14, key: key14, value: banana
partition: 0, topic: tumbling, offset: 15, key: key15, value: banana
partition: 0, topic: tumbling, offset: 16, key: key16, value: lemon
partition: 0, topic: tumbling, offset: 17, key: key17, value: lemon
partition: 0, topic: tumbling, offset: 18, key: key18, value: banana
partition: 0, topic: tumbling, offset: 19, key: key19, value: lemon
partition: 0, topic: tumbling, offset: 20, key: key20, value: orange
                       :
                       :
                       :


consumer log

2019-05-08 01:23:30.000 - 2019-05-08 01:23:35.000 name:lemon, count: 1
2019-05-08 01:23:30.000 - 2019-05-08 01:23:35.000 name:banana, count: 1
2019-05-08 01:23:30.000 - 2019-05-08 01:23:35.000 name:orange, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:apple, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:orange, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:lemon, count: 3
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:apple, count: 2
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:banana, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:orange, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:lemon, count: 1
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:banana, count: 2
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:lemon, count: 3
2019-05-08 01:23:50.000 - 2019-05-08 01:23:55.000 name:lemon, count: 1
2019-05-08 01:23:50.000 - 2019-05-08 01:23:55.000 name:orange, count: 1
2019-05-08 01:23:50.000 - 2019-05-08 01:23:55.000 name:banana, count: 3
                       :
                       :
                       :


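Let's check that the records are aggregated in 5-second windows that advance 5 seconds at a time.
(The first window is skipped because it holds only the first 3 records, fewer than a full window.)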

01:23:35 ~ 01:23:40

--Producer
partition: 0, topic: tumbling, offset: 3, key: key3, value: lemon
partition: 0, topic: tumbling, offset: 4, key: key4, value: apple
partition: 0, topic: tumbling, offset: 5, key: key5, value: lemon
partition: 0, topic: tumbling, offset: 6, key: key6, value: orange
partition: 0, topic: tumbling, offset: 7, key: key7, value: lemon
--Consumer
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:apple, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:orange, count: 1
2019-05-08 01:23:35.000 - 2019-05-08 01:23:40.000 name:lemon, count: 3

01:23:40 ~ 01:23:45

--Producer
partition: 0, topic: tumbling, offset: 8, key: key8, value: apple
partition: 0, topic: tumbling, offset: 9, key: key9, value: apple
partition: 0, topic: tumbling, offset: 10, key: key10, value: banana
partition: 0, topic: tumbling, offset: 11, key: key11, value: orange
partition: 0, topic: tumbling, offset: 12, key: key12, value: lemon
--Consumer
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:apple, count: 2
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:banana, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:orange, count: 1
2019-05-08 01:23:40.000 - 2019-05-08 01:23:45.000 name:lemon, count: 1

01:23:45 ~ 01:23:50

--Producer
partition: 0, topic: tumbling, offset: 13, key: key13, value: lemon
partition: 0, topic: tumbling, offset: 14, key: key14, value: banana
partition: 0, topic: tumbling, offset: 15, key: key15, value: banana
partition: 0, topic: tumbling, offset: 16, key: key16, value: lemon
partition: 0, topic: tumbling, offset: 17, key: key17, value: lemon
--Consumer
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:banana, count: 2
2019-05-08 01:23:45.000 - 2019-05-08 01:23:50.000 name:lemon, count: 3


The counts are taken per 5-second window, advancing every 5 seconds.
No record is counted in more than one window.

Hopping time windows

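A window whose advance interval is smaller than its size, so that successive windows overlap, is called a hopping window.

As in the figure below from the official documentation, a hopping window advances at a fixed interval and overlaps the previous one.
In this case a record can belong to more than one window.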

[Figure: hopping time windows, from the official documentation]
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#hopping-time-windows
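
To see why one record can belong to several windows, here is a plain-Java sketch that lists every epoch-aligned window [start, start + size) containing a given timestamp. The arithmetic reflects my understanding of how hopping windows behave; it is not copied from the Kafka Streams implementation, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: not part of the Kafka Streams API.
class HoppingWindowSketch {
    // Start times of all epoch-aligned windows [start, start + sizeMs) that contain timestampMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5-second windows advancing every 2 seconds, timestamp at 21 s (assumed value).
        System.out.println(windowStarts(21_000L, 5_000L, 2_000L));
        // With advance == size the result collapses to a single tumbling window.
        System.out.println(windowStarts(21_000L, 5_000L, 5_000L));
    }
}
```

With a 5-second size and a 2-second advance, a record falls into two or three windows depending on where its timestamp sits relative to the 2-second grid.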

Consumer

The consumer is implemented as follows.
It is almost identical to the tumbling window consumer.

HoppingWindowStream.java

    // configuration
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "hopping_window");
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<String, String> kStream = streamsBuilder.stream("hopping");

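    // Materialized.with(...) is a static factory, so chaining it after as(...) would drop the store name;
    // withKeySerde()/withValueSerde() keep it.
    // (needs org.apache.kafka.common.utils.Bytes and org.apache.kafka.streams.state.WindowStore)
    KTable<Windowed<String>, CountStore> hoppingWindowCountTable = kStream
            .groupBy((key, word) -> word)
            .windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(2_000L)))
            .aggregate(CountStore::new,
                    (k, v, countStore) -> countStore.increment(v),
                    Materialized.<String, CountStore, WindowStore<Bytes, byte[]>>as("hopping-counts-store")
                            .withKeySerde(Serdes.String())
                            .withValueSerde(new CountStoreSerde())
            );

    hoppingWindowCountTable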
            .toStream()
            .map((windowed, countStore) -> {
                String start = windowed.window().startTime().atZone(ZoneId.systemDefault()).format(formatter);
                String end = windowed.window().endTime().atZone(ZoneId.systemDefault()).format(formatter);
                countStore.setStart(start);
                countStore.setEnd(end);
                return new KeyValue<>(start, countStore);
            })
            .to("hopping-count", Produced.with(Serdes.String(), new CountStoreSerde()));

    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), properties);

    // streams.cleanUp();

    streams.start();

The only difference is the line below:
the 5-second window advances every 2 seconds,
so the windows overlap.

      .windowedBy(TimeWindows.of(Duration.ofMillis(5_000L)).advanceBy(Duration.ofMillis(2_000L)))

Producer

The producer is the same as in the tumbling window section, writing to the hopping topic.

Verification

Let's consume and check.


producer log

partition: 0, topic: hopping, offset: 0, key: key0, value: lemon
partition: 0, topic: hopping, offset: 1, key: key1, value: apple
partition: 0, topic: hopping, offset: 2, key: key2, value: orange
partition: 0, topic: hopping, offset: 3, key: key3, value: banana
partition: 0, topic: hopping, offset: 4, key: key4, value: lemon
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
partition: 0, topic: hopping, offset: 6, key: key6, value: orange
partition: 0, topic: hopping, offset: 7, key: key7, value: apple
partition: 0, topic: hopping, offset: 8, key: key8, value: orange
partition: 0, topic: hopping, offset: 9, key: key9, value: orange
partition: 0, topic: hopping, offset: 10, key: key10, value: apple
partition: 0, topic: hopping, offset: 11, key: key11, value: banana
partition: 0, topic: hopping, offset: 12, key: key12, value: lemon
partition: 0, topic: hopping, offset: 13, key: key13, value: apple
partition: 0, topic: hopping, offset: 14, key: key14, value: banana
partition: 0, topic: hopping, offset: 15, key: key15, value: apple
partition: 0, topic: hopping, offset: 16, key: key16, value: orange
partition: 0, topic: hopping, offset: 17, key: key17, value: banana
partition: 0, topic: hopping, offset: 18, key: key18, value: lemon
partition: 0, topic: hopping, offset: 19, key: key19, value: banana
partition: 0, topic: hopping, offset: 20, key: key20, value: lemon
                       :
                       :
                       :

consumer log

2019-04-19 01:51:16.000 - 2019-04-19 01:51:21.000 name:lemon, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:lemon, count: 1
2019-04-19 01:51:16.000 - 2019-04-19 01:51:21.000 name:apple, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:apple, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:apple, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:orange, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:orange, count: 1
2019-04-19 01:51:18.000 - 2019-04-19 01:51:23.000 name:banana, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:lemon, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:lemon, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:banana, count: 2
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:banana, count: 2
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:banana, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:orange, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:apple, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:apple, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:orange, count: 3
2019-04-19 01:51:26.000 - 2019-04-19 01:51:31.000 name:orange, count: 2
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:orange, count: 1
2019-04-19 01:51:26.000 - 2019-04-19 01:51:31.000 name:apple, count: 2
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:apple, count: 1
2019-04-19 01:51:26.000 - 2019-04-19 01:51:31.000 name:banana, count: 1
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:banana, count: 1
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:banana, count: 1
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:lemon, count: 1
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:lemon, count: 1
2019-04-19 01:51:28.000 - 2019-04-19 01:51:33.000 name:apple, count: 2
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:banana, count: 2
2019-04-19 01:51:30.000 - 2019-04-19 01:51:35.000 name:apple, count: 2
                       :
                       :
                       :

Let's check that the records are aggregated in 5-second windows that advance 2 seconds at a time.

01:51:20 ~ 01:51:25

--Producer
partition: 0, topic: hopping, offset: 1, key: key1, value: apple
partition: 0, topic: hopping, offset: 2, key: key2, value: orange
partition: 0, topic: hopping, offset: 3, key: key3, value: banana
partition: 0, topic: hopping, offset: 4, key: key4, value: lemon
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
--Consumer
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:apple, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:orange, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:lemon, count: 1
2019-04-19 01:51:20.000 - 2019-04-19 01:51:25.000 name:banana, count: 2

01:51:22 ~ 01:51:27

--Producer
partition: 0, topic: hopping, offset: 3, key: key3, value: banana
partition: 0, topic: hopping, offset: 4, key: key4, value: lemon
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
partition: 0, topic: hopping, offset: 6, key: key6, value: orange
partition: 0, topic: hopping, offset: 7, key: key7, value: apple
--Consumer
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:lemon, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:banana, count: 2
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:orange, count: 1
2019-04-19 01:51:22.000 - 2019-04-19 01:51:27.000 name:apple, count: 1

01:51:24 ~ 01:51:29

--Producer
partition: 0, topic: hopping, offset: 5, key: key5, value: banana
partition: 0, topic: hopping, offset: 6, key: key6, value: orange
partition: 0, topic: hopping, offset: 7, key: key7, value: apple
partition: 0, topic: hopping, offset: 8, key: key8, value: orange
partition: 0, topic: hopping, offset: 9, key: key9, value: orange
--Consumer
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:banana, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:apple, count: 1
2019-04-19 01:51:24.000 - 2019-04-19 01:51:29.000 name:orange, count: 3


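The counts are taken per 5-second window, advancing every 2 seconds.
Because the window advances by only 2 seconds, the same record is counted in several overlapping windows.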


That is about it for this experiment.


[References]
https://qiita.com/masato/items/2250a757a96c7df2e83f
https://qiita.com/mkyz08/items/a3b866c46ca49c52e647
https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/WordCountInteractiveQueriesExample.java
https://github.com/gwenshap/kafka-streams-stockstats/blob/master/src/main/java/com/shapira/examples/streams/stockstats/StockStatsExample.java



The end.


The sample code is available at the links below.
Producer
github.com

Consumer
github.com