Melsicon Kafka Streams Playground

Purpose

This source demonstrates how to process a stream of sensor data using Kafka Streams.

The sensors produce a stream of records, each containing the sensor ID, a timestamp, and the current state (on or off). The desired result is a stream of records enriched with the duration the sensor has been in this state.
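To make the shape of the data concrete, here is a minimal sketch of the two record types as hypothetical Java records; the actual model lives in the model directory and may use different names and representations:

import java.time.Duration;
import java.time.Instant;

// Hypothetical record shapes for illustration only.
enum State { ON, OFF }

record SensorState(String id, Instant time, State state) {}

record SensorStateWithDuration(String id, Instant time, State state, Duration duration) {}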

Example

For example, a stream

Table 1. Sensor Data

Name      Timestamp             State
Sensor 1  1984-01-22T15:45:00Z  off
Sensor 1  1984-01-22T15:45:10Z  off
Sensor 1  1984-01-22T15:45:30Z  on
Sensor 1  1984-01-22T15:46:30Z  off

should produce

Table 2. Enriched Data

Name      Timestamp             State  Duration
Sensor 1  1984-01-22T15:45:00Z  off    10s
Sensor 1  1984-01-22T15:45:00Z  off    30s
Sensor 1  1984-01-22T15:45:30Z  on     60s

This tells us that “Sensor 1” was “off” from 15:45:00 for 30 seconds and “on” from 15:45:30 for 60 seconds.

Note that the second “off” reading produced an intermediate result.

Design decisions

Duplicate readings of the same state generate intermediate results, and delayed readings (timestamps preceding previously seen values) are treated as errors.

These are deliberate choices and can easily be changed.
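Using the hypothetical record shapes sketched above, the per-record rule could look roughly like the following. This is an illustration of the two decisions, not the repository's actual code, and a plain HashMap stands in for the state store:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

final class DurationEnricher {
    private final Map<String, SensorState> lastStates = new HashMap<>();

    /** Returns the enriched record to emit, or null for the very first reading of a sensor. */
    SensorStateWithDuration process(SensorState current) {
        SensorState last = lastStates.get(current.id());
        if (last == null) {
            lastStates.put(current.id(), current);
            return null; // nothing to enrich yet
        }
        if (current.time().isBefore(last.time())) {
            // Delayed reading: its timestamp precedes a previously seen value.
            throw new IllegalStateException("Delayed reading: " + current);
        }
        Duration duration = Duration.between(last.time(), current.time());
        if (current.state() != last.state()) {
            // State change: the next interval starts with the current reading.
            lastStates.put(current.id(), current);
        }
        // A duplicate reading keeps the stored start time, so this is an intermediate result.
        return new SensorStateWithDuration(last.id(), last.time(), last.state(), duration);
    }
}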

Implementation of Business Logic

Care has been taken to keep the business logic independent of implementation details like serialization formats.

The data model is in the model directory, the business logic in topology.

The tests exercise the topology with seven different (de-)serializers: protocol buffers, JSON, Apache Avro, the Confluent variants of those three, and Amazon Ion. Since the example needs an input format, a result format, and a format for the state store, there are 343 (7³) different combinations, all of which are tested.

While this abstraction might not be necessary in practice, it demonstrates two important design considerations:

  • The business logic should only depend on a data model, not on the capabilities of the serialization mechanism.

We can simply use Duration::between, a single call that is easy to understand and test, instead of cluttering our logic with conversions and unnecessary, error-prone calculations (see the sketch after this list).

  • The choice of (de-)serializers should depend on the requirements, not on what happens to be at hand.

While internal processing pipelines tend (but don’t have) to use a single serialization mechanism, it is perfectly valid, and a good design decision, to use different mechanisms for the parts interfacing with external components.
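To illustrate the first point, a toy comparison of a java.time-based model with one exposing raw epoch values (not code from this repository):

import java.time.Duration;
import java.time.Instant;

final class DurationExample {
    public static void main(String[] args) {
        Instant previous = Instant.parse("1984-01-22T15:45:00Z");
        Instant current = Instant.parse("1984-01-22T15:45:30Z");

        // With a java.time-based model, the duration is one readable, testable call:
        Duration duration = Duration.between(previous, current); // PT30S

        // With raw epoch values, manual arithmetic leaks into the business logic:
        long durationMillis = current.toEpochMilli() - previous.toEpochMilli(); // 30000

        System.out.println(duration + " vs. " + durationMillis + " ms");
    }
}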

Since the business logic is independent of the serialization mechanism, changing it is simple and normally does not require retesting. Be aware of subtle pitfalls, though: while our Java code uses nanosecond precision, which protocol buffers and JSON handle correctly, Avro truncates the timestamps to milliseconds. Our tests would not catch this, since they only use whole-second steps, but it would show up when using real-world data and comparing with equals.
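A toy demonstration of that pitfall, using only standard java.time classes:

import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class PrecisionPitfall {
    public static void main(String[] args) {
        Instant precise = Instant.parse("1984-01-22T15:45:00.000000001Z"); // nanosecond precision
        Instant millis = precise.truncatedTo(ChronoUnit.MILLIS); // what a millisecond-based format keeps
        System.out.println(precise.equals(millis)); // false: the round trip changed the value
    }
}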

By refactoring the business logic to depend only on an abstract store, we speed up testing by a factor of twenty (bazel test //src/test/java/de/melsicon/kafka/topology:ProcessorTest vs. //src/test/java/de/melsicon/kafka/topology:TopologyTest), which demonstrates the potential gain in development speed and testability.
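As a sketch of the idea, such a store abstraction can be very small; the following is a hypothetical interface, and the actual one in this repository may differ. The fast processor test can wire it to an in-memory map, while the full topology test backs it with a real Kafka Streams state store:

// Hypothetical store abstraction decoupling the business logic from Kafka Streams.
interface SensorStore {
    SensorState get(String sensorId);

    void put(String sensorId, SensorState state);
}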

Running

Prerequisites

You need Bazelisk installed; with Homebrew, just use brew install bazelbuild/tap/bazelisk.

Tests

To run all tests, use

bazel test //...

To run a single test, use

bazel test //src/test/java/de/melsicon/kafka/topology:all

Where necessary, the tests run with an embedded Kafka and a mock schema registry.

Main App

The main app needs Kafka running at localhost, port 9092 (see application.yaml). Start it with

bazel run //:kafka-sensors

Watch the results with

kafka-console-consumer --bootstrap-server localhost:9092 --topic result-topic

and publish sensor values with

kafka-console-producer --broker-list localhost:9092 --topic input-topic

The main app uses JSON for input and output, so you can publish sample sensor values like

{"id":"7331","time":443634300.0,"state":"off"}
{"id":"7331","time":443634310.0,"state":"off"}
{"id":"7331","time":443634330.0,"state":"on"}
{"id":"7331","time":443634390.0,"state":"off"}

Benchmark

Run the JMH microbenchmarks with

bazel run //:benchmark

Notes

Mapping

As noted in Implementation of Business Logic, the business logic is independent of the serialization, in the spirit of hexagonal architecture. This of course requires some mapping, for which we mostly use MapStruct. That imposes some limitations on data model naming conventions: MapStruct uses a fixed and quite inflexible accessor naming strategy, so you can’t really decide that protocol buffers should follow one convention but Immutables another. Especially for Immutables we are forced to use the JavaBeans-style naming convention, although this is not a JEE application.
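As an illustration only, a MapStruct mapper between a hypothetical external DTO and the internal model could look like this (all names are made up, and it assumes a MapStruct version that can construct Java records; the real mappers live next to the model):

import java.time.Instant;
import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;

// Hypothetical external DTO with the JavaBeans-style accessors MapStruct matches on.
final class SensorStateDto {
    private String id;
    private Instant time;
    private State state;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public Instant getTime() { return time; }
    public void setTime(Instant time) { this.time = time; }
    public State getState() { return state; }
    public void setState(State state) { this.state = state; }
}

@Mapper
interface SensorMapper {
    SensorMapper INSTANCE = Mappers.getMapper(SensorMapper.class);

    // MapStruct matches properties by name, so both sides must agree on naming.
    SensorState fromDto(SensorStateDto dto);

    SensorStateDto toDto(SensorState state);
}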