Project Name | Stars | Downloads | Repos Using This | Packages Using This | Most Recent Commit | Total Releases | Latest Release | Open Issues | License | Language | Description
---|---|---|---|---|---|---|---|---|---|---|---
Faust | 6,501 | | 14 | 15 | a month ago | 46 | February 25, 2020 | 280 | other | Python | Python Stream Processing
Ksql | 5,448 | | | | a day ago | | | 1,262 | other | Java | The database purpose-built for stream processing applications.
Kafka Ui | 5,239 | | | | 21 hours ago | | | 278 | apache-2.0 | Java | Open-Source Web UI for Apache Kafka Management
Liftbridge | 2,413 | | 2 | | a month ago | 67 | September 09, 2022 | 44 | apache-2.0 | Go | Lightweight, fault-tolerant message streams.
Kafka Streams Examples | 2,136 | | | | a day ago | | | 19 | apache-2.0 | Java | Demo applications and code examples for Apache Kafka's Streams API.
Goka | 2,076 | | 3 | 17 | 2 months ago | 62 | July 12, 2022 | 18 | bsd-3-clause | Go | Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
Flinkstreamsql | 1,873 | | | | 5 months ago | | | 86 | apache-2.0 | Java | Extends real-time SQL on top of open-source Flink; mainly implements joins between streams and dimension tables, and supports all native Flink SQL syntax.
Alpakka Kafka | 1,396 | | 8 | 131 | 3 days ago | 37 | July 26, 2021 | 110 | other | Scala | Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
Go Streams | 1,344 | | 9 | | 16 days ago | 18 | February 11, 2022 | 18 | mit | Go | A lightweight stream processing library for Go
Alpakka | 1,249 | | 2 | 112 | 10 days ago | 45 | November 30, 2021 | 220 | other | Scala | Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
This is a thin Scala wrapper for the Kafka Streams API. It does not intend to provide a Scala-idiomatic API, but rather intends to make the original API simpler to use from Scala. In particular, it provides the following adjustments:

- Java `Long`s are converted to Scala `Long`s
- in a `flatMap` operation, this lets you use a Scala `Iterable`
- `Serde`s (Serializers/Deserializers) can be implicitly found in the scope

This API also contains a few `Serde`s (Serializers/Deserializers):

- to convert `Int`/`Long`/`Double` to/from their binary representation
- to convert `Int`/`Long`/`Double` to/from string representation

Finally, the API provides the following extensions:

- `KStreamS.split()` (see documentation below)

The main objects are:

- `KStreamBuilderS` as the entry point to build streams or tables
- `KStreamS` as a wrapper around `KStream`
- `KGroupedStreamS` as a wrapper around `KGroupedStream`
- `KTableS` as a wrapper around `KTable`
- `KGroupedTableS` as a wrapper around `KGroupedTable`
With the original Java API, you would create an instance of `KStreamBuilder`, then use it to create streams or tables. Here, `KStreamBuilderS` is an `object` that can be used directly:
```scala
val stream: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("my-stream")

val table: KTableS[String, String] = KStreamBuilderS.table[String, String]("my-table")
```
When starting the application, you just need to unwrap the `KStreamBuilder` by calling `KStreamBuilderS.inner`:
```scala
val streams = new KafkaStreams(KStreamBuilderS.inner, config)
```
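For reference, a minimal sketch of what the `config` object above might contain; the application ID and broker address are placeholder values:

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val config = new Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app") // placeholder
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
```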
It is a common mistake to forget to specify `Serde`s when using the Java API, resulting in class cast errors when objects are serialized or deserialized. To work around this issue, this API requires `Serde`s to be used. Most of the time, it is enough to declare your `Serde`s as `implicit` values, and they will be picked up automatically:
```scala
implicit val stringSerde: Serde[String] = Serdes.String()
implicit val userSerde: Serde[User] = new MyUserSerde

val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")
```
Resolution is based on the type of the object to serialize/deserialize, so make sure you have a `Serde` of the appropriate type. If not, you should see an error such as:
```
Error:(87, 80) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[String]
```
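For instance, a sketch of code that would fail this way, assuming no `Serde[String]` has been declared in the enclosing scope:

```scala
// No implicit Serde[String] in scope, so the key serde cannot be resolved
// and the line below does not compile
val stream = KStreamBuilderS.stream[String, String]("my-topic")
```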
If, on the other hand, you have multiple `Serde`s for the same type, you might see the following error:
```
Error:(88, 80) ambiguous implicit values:
 both value stringSerde2 of type org.apache.kafka.common.serialization.Serde[String]
 and value stringSerde1 of type org.apache.kafka.common.serialization.Serde[String]
 match expected type org.apache.kafka.common.serialization.Serde[String]
```
In this case, just pass the `Serde` explicitly:
```scala
val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")(stringSerde, userSerde)
```
To convert Scala `Int`/`Long`/`Double` to/from their binary representation:

```scala
import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntSerde
implicit val longSerde = LongSerde
implicit val doubleSerde = DoubleSerde
```
To convert Scala `Int`/`Long`/`Double` to/from string representation:

```scala
import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntAsStringSerde
implicit val longSerde = LongAsStringSerde
implicit val doubleSerde = DoubleAsStringSerde
```
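As an illustration, a sketch (the topic name is hypothetical) that reads `Long` values encoded as strings on the wire:

```scala
import com.github.aseigneurin.kafka.serialization.scala._
import org.apache.kafka.common.serialization.Serdes

implicit val stringSerde = Serdes.String
implicit val longSerde = LongAsStringSerde

// Values such as "42" on the wire are exposed as Scala Longs
val scores = KStreamBuilderS.stream[String, Long]("scores")
```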
To convert case classes to/from JSON:

- create a `case class`
- declare a `JsonSerde` with the case class as the generic type

Example:
```scala
import com.github.aseigneurin.kafka.serialization.scala._
import org.apache.kafka.common.serialization.Serdes

case class User(name: String)

implicit val stringSerde = Serdes.String
implicit val userSerde = new JsonSerde[User]

// read JSON -> case class
KStreamBuilderS.stream[String, User]("users")
  .mapValues { user => user.name }
  .to("names")

// write case class -> JSON
KStreamBuilderS.stream[String, String]("names")
  .mapValues { name => User(name) }
  .to("users")
```
This repository contains a Scala version of the Java Word Count Demo.
Here is the code to implement a word count:
```scala
import java.util.{Locale, Properties}

import com.github.aseigneurin.kafka.serialization.scala._
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams

val props = new Properties()
// ...

implicit val stringSerde = Serdes.String
implicit val longSerde = LongAsStringSerde

val source = KStreamBuilderS.stream[String, String]("streams-file-input")

val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

counts.to("streams-wordcount-output")

val streams = new KafkaStreams(KStreamBuilderS.inner, props)
streams.start()
```
This method applies a predicate and returns two `KStreamS`s: one with the messages that match the predicate, and another with the messages that don't. The two `KStreamS`s are returned in a tuple that can be easily deconstructed:
```scala
def isValidMessage(v: ...): Boolean = ???

val (goodMessages, badMessages) = deserializedMessages.split((k, v) => isValidMessage(v))
```
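From there, a natural follow-up (topic names hypothetical) is to route each branch to its own topic:

```scala
// Valid messages continue through the pipeline; invalid ones go to a
// dead-letter topic for later inspection
goodMessages.to("messages-valid")
badMessages.to("messages-dead-letter")
```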