Kafka Streams Scala

Thin Scala wrapper for the Kafka Streams API
kafka-streams-scala

This is a thin Scala wrapper for the Kafka Streams API. It does not aim to provide a fully Scala-idiomatic API, but rather to make the original API simpler to use from Scala. In particular, it provides the following adjustments:

  • Scala lambda expressions can be used directly
  • when aggregating and counting, counts are converted from Java Longs to Scala Longs
  • flatMap operations accept a Scala Iterable
  • Serdes (Serializers/Deserializers) can be implicitly found in the scope
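Taken together, these adjustments allow pipelines like the following sketch (the topic and store names are illustrative; the classes and Serdes involved are described in the sections below):

```scala
import com.github.aseigneurin.kafka.serialization.scala._
import org.apache.kafka.common.serialization.{Serde, Serdes}

implicit val stringSerde: Serde[String] = Serdes.String() // found implicitly
implicit val longSerde = LongAsStringSerde                // for the counts

val wordCounts: KTableS[String, Long] =
  KStreamBuilderS.stream[String, String]("sentences")
    .flatMapValues { line => line.split(" ") } // Scala lambda, Scala Iterable
    .map { (_, word) => (word, word) }
    .groupByKey
    .count("WordCounts")                       // values are Scala Longs
```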

This API also contains a few Serdes (Serializers/Deserializers):

  • to convert Scala Int/Long/Double to/from their binary representation
  • to convert Scala Int/Long/Double to/from string representation
  • to convert case classes to/from JSON

Finally, the API provides the following extensions:

  • KStreamS.split() (see documentation below)

Usage of the Kafka Streams API in Scala

The main objects are:

  • KStreamsBuilderS as the entry point to build streams or tables
  • KStreamS as a wrapper around KStream
  • KGroupedStreamS as a wrapper around KGroupedStream
  • KTableS as a wrapper around KTable
  • KGroupedTable as a wrapper around KGroupedTable
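To illustrate how these wrappers relate, here is a sketch of the types produced as a pipeline is built (assuming implicit Serdes are in scope, as explained below):

```scala
// Each operation moves between the wrapper classes:
val stream:  KStreamS[String, String]        = KStreamBuilderS.stream[String, String]("input")
val grouped: KGroupedStreamS[String, String] = stream.groupByKey
val table:   KTableS[String, Long]           = grouped.count("Counts")
```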

Using the builder

With the original Java API, you would create an instance of KStreamBuilder, then use it to create streams or tables. Here, KStreamsBuilderS is an object that can be used directly:

val stream: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("my-stream")

val table: KTableS[String, String] = KStreamBuilderS.table[String, String]("my-table")

When starting the application, you just need to unwrap the KStreamBuilder by calling KStreamBuilderS.inner:

val streams = new KafkaStreams(KStreamBuilderS.inner, config)

Serdes (declare them as implicit)

It is a common mistake to forget to specify Serdes when using the Java API, resulting in class cast errors when objects are serialized or deserialized.

To work around this issue, this API requires Serdes to be provided. Most of the time, it is enough to declare your Serdes as implicit values, and they will be picked up automatically:

implicit val stringSerde: Serde[String] = Serdes.String()
implicit val userSerde: Serde[User] = new MyUserSerde

val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")

Resolution is based on the type of the object to serialize/deserialize, so make sure you have a Serde of the appropriate type. If not, you should see an error such as:

Error:(87, 80) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[String]

If, on the other hand, you have multiple Serdes for the same type, you might see the following error:

Error:(88, 80) ambiguous implicit values:
 both value stringSerde2 of type org.apache.kafka.common.serialization.Serde[String]
 and value stringSerde1 of type org.apache.kafka.common.serialization.Serde[String]
 match expected type org.apache.kafka.common.serialization.Serde[String]

In this case, just pass the Serde explicitly:

val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")(stringSerde, userSerde)

Usage of the Serdes in Scala

To convert Scala Int/Long/Double to/from their binary representation:

import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntSerde
implicit val longSerde = LongSerde
implicit val doubleSerde = DoubleSerde

To convert Scala Int/Long/Double to/from string representation:

import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntAsStringSerde
implicit val longSerde = LongAsStringSerde
implicit val doubleSerde = DoubleAsStringSerde

To convert case classes to/from JSON:

  • define a case class
  • create an instance of JsonSerde with the case class as the generic type

Example:

import com.github.aseigneurin.kafka.serialization.scala._

case class User(name: String)

implicit val stringSerde = Serdes.String()
implicit val userSerde = new JsonSerde[User]

// read JSON -> case class
KStreamBuilderS.stream[String, User]("users")
  .mapValues { user => user.name }
  .to("names")

// write case class -> JSON
KStreamBuilderS.stream[String, String]("names")
  .mapValues { name => User(name) }
  .to("users")

Example

This repository contains a Scala version of the Java Word Count Demo.

Here is the code to implement a word count:

val props = new Properties()
// ...

implicit val stringSerde = Serdes.String()
implicit val longSerde = LongAsStringSerde

val source = KStreamBuilderS.stream[String, String]("streams-file-input")

val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

counts.to("streams-wordcount-output")

val streams = new KafkaStreams(KStreamBuilderS.inner, props)
streams.start()

Extensions

KStreamS.split()

This method applies a predicate and returns two KStreamS instances: one with the messages that match the predicate, and one with the messages that do not.

The two KStreamSs are returned in a tuple that can be easily deconstructed:

def isValidMessage(v: ...): Boolean = ???

val (goodMessages, badMessages) = deserializedMessages.split((k, v) => isValidMessage(v))
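For example, the two resulting streams can then be routed to separate topics, e.g. to implement a dead-letter topic for invalid messages (the topic names here are hypothetical):

```scala
// Keep valid messages on the main path, send the rest to a dead-letter topic
val (goodMessages, badMessages) =
  deserializedMessages.split((k, v) => isValidMessage(v))

goodMessages.to("valid-messages")
badMessages.to("invalid-messages")
```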