
Releases: nomisRev/kotlin-kafka

0.4.0

08 Apr 05:24
ccafffa

This release brings one big new feature, Publishing, plus a number of minor improvements and fixes to the receiver on the road to 1.0.

PublisherScope

A PublishScope that can offer a message (doesn't await the ack), or publish, which is offer + awaiting the ack.
The block itself, however, awaits all offers made inside it, similar to coroutineScope, and re-throws any failed offer.

It also comes with a transaction block, which wraps its body in the correct transaction semantics and awaits offers in the same way. transaction blocks cannot be nested, thanks to @PublisherDSL.

publisher.publishScope {
  offer((1..10).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  publish((11..20).map {
    ProducerRecord(topic.name(), "$it", "msg-$it")
  })
  transaction {
    // calling transaction { } here is illegal, enforced by DslMarker magic
    offer((21..30).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
    publish((31..40).map {
      ProducerRecord(topic.name(), "$it", "msg-$it")
    })
  } // Waits until all offers inside the transaction have finished; fails if any failed

  // looping
  (0..100).forEach {
    delay(100.milliseconds)
    val record = ProducerRecord(topic.name(), "$it", "msg-$it")
    offer(record)
  }
  
  // streaming
  (1..100).asFlow()
    .onEach { delay(100.milliseconds) }
    .map { ProducerRecord(topic.name(), "$it", "msg-$it") }
    .collect { offer(it) }
}

See KafkaPublisherSpec for more examples.

Publishing records with Flow.produce

Often we need to receive/consume events from Kafka and, as a result, publish new events back to Kafka. That typically requires a streaming solution for producing records, and keeping track of every published record and its lifecycle, and wiring that back into a stream, is tricky. So kotlin-kafka now offers Flow.produce, built in the same style as PublisherScope!

produce will send messages to Kafka and stream a Result of RecordMetadata back to the user.
It will not stop sending messages when an error occurs; rethrow the failure inside the collector if you want the stream to stop. Otherwise, use produceOrThrow, which fails the stream on the first error.

Any encountered errors will be sent to the collector as Result.failure, and they will also be rethrown if the Flow completes without them being handled (e.g. with Flow.catch). Check Kafka's Callback documentation for more information on when and which errors are thrown, and which are recoverable.

suspend fun publishMessages(bootstrapServers: String, topic: String) {
  val publisherSettings = PublisherSettings(
    bootstrapServers = bootstrapServers,
    keySerializer = StringSerializer(),
    valueSerializer = StringSerializer()
  )
  (0..10_000).asFlow()
    .onEach { delay(10.milliseconds) }
    .map { index ->
      ProducerRecord<String, String>(topic, index % 4, "$index", "Message $index")
    }.produce(publisherSettings)
    .collect { metadata: Result<RecordMetadata> ->
      metadata
        .onSuccess { println("partition: ${it.partition()}, offset: ${it.offset()}") }
        .onFailure { println("Failed to send: $it") }
    }
}
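
And a minimal sketch of the fail-fast variant (assuming produceOrThrow mirrors produce's parameters but emits RecordMetadata directly, so the first failure cancels the stream and can be observed with Flow.catch):

suspend fun publishOrFail(publisherSettings: PublisherSettings<String, String>, topic: String) {
  (0..100).asFlow()
    .map { index -> ProducerRecord(topic, "$index", "Message $index") }
    .produceOrThrow(publisherSettings)
    .onEach { metadata -> println("offset: ${metadata.offset()}") }
    // catch observes the first failure; without it the exception cancels collect
    .catch { e -> println("Stream stopped on first failure: $e") }
    .collect()
}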

Any and all feedback is welcome! ☺️

What's Changed

Full Changelog: 0.3.1...0.4.0

0.3.1

22 May 07:53

Fixes a breaking change introduced by KotlinX Coroutines 1.7.x, reported in #127.
This release contains no actual changes except for dependency updates.

What's Changed

Full Changelog: 0.3.0...0.3.1

0.3.0

29 Jul 10:34
20025a7

This release implements a custom ConsumerLoop to make strong guarantees about committing offsets, and to hide the complexity of the low-level Java SDK.

This API is inspired by Alpakka Kafka and Reactor Kafka.
More details can be found in the PR: #58
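
For context, a minimal sketch of what consuming with acknowledgements looks like on top of this loop (the exact ReceiverSettings parameters shown here are assumptions; see the PR for the precise API):

suspend fun consume(bootstrapServers: String, topic: String) {
  val settings = ReceiverSettings(
    bootstrapServers = bootstrapServers,
    groupId = "group-id",
    keyDeserializer = StringDeserializer(),
    valueDeserializer = StringDeserializer()
  )
  KafkaReceiver(settings)
    .receive(topic)
    .map { record ->
      println("${record.key()} -> ${record.value()}")
      // Acknowledge marks the record as processed; the consumer loop
      // takes care of committing the offset safely.
      record.offset.acknowledge()
    }.collect()
}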

0.2.0

02 Jul 12:00
  • New API for committing offsets in batches, inspired by FS2-kafka; see the sketch below
  • Several small API improvements
  • Breaking change: the package prefix moved from com.github.nomisrev to io.github.nomisrev; it can be migrated with a simple find-and-replace.
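
As a rough illustration of the batched-commit idea only: the commitBatchWithin operator below and its parameters are assumptions modeled on FS2-kafka's operator of the same name, and the receiver shown is the later kotlin-kafka API, used purely to give the sketch context.

suspend fun consumeInBatches(settings: ReceiverSettings<String, String>, topic: String) {
  KafkaReceiver(settings)
    .receive(topic)
    .map { record ->
      println(record.value())
      record.offset
    }
    // Hypothetical operator, FS2-kafka style: commit every 100 offsets
    // or every 5 seconds, whichever comes first.
    .commitBatchWithin(settings, 100, 5.seconds)
    .collect()
}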