Mockedstreams

Scala DSL for Unit-Testing Processing Topologies in Kafka Streams

Mocked Streams


Documentation is located at http://mockedstreams.madewithtea.com/

Mocked Streams 3.9.0 (git) is a library for Scala 2.12 and 2.13 that allows you to unit-test processing topologies of Kafka Streams applications (Apache Kafka >= 0.10.1) without ZooKeeper and Kafka brokers. Furthermore, you can use your favourite Scala testing framework, e.g. ScalaTest or Specs2. Mocked Streams is available on the Maven Central Repository, so you just have to add the following to your SBT dependencies:

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.9.0" % "test"
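In context, a minimal `build.sbt` fragment might look like this (the Scala and ScalaTest versions shown are illustrative assumptions; pick a Mocked Streams version that matches the compatibility table below):

```scala
// Illustrative build.sbt sketch -- version numbers are examples only.
scalaVersion := "2.13.4"

libraryDependencies ++= Seq(
  "com.madewithtea" %% "mockedstreams" % "3.9.0" % "test",
  "org.scalatest"   %% "scalatest"     % "3.2.0" % "test"
)
```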

A Java 8 port of Mocked Streams is Mockafka.

Apache Kafka Compatibility

Mocked Streams Version Apache Kafka Version
3.9.0 2.7.0.0
3.8.0 2.6.1.0
3.7.0 2.5.0.0
3.6.0 2.4.1.0
3.5.2 2.4.0.0
3.5.1 2.4.0.0
3.5.0 2.4.0.0
3.4.0 2.3.0.0
3.3.0 2.2.0.0
3.2.0 2.1.1.0
3.1.0 2.1.0.0
2.2.0 2.1.0.0
2.1.0 2.0.0.0
2.0.0 2.0.0.0
1.8.0 1.1.1.0
1.7.0 1.1.0.0
1.6.0 1.0.1.0
1.5.0 1.0.0.0
1.4.0 0.11.0.1
1.3.0 0.11.0.0
1.2.1 0.10.2.1
1.2.0 0.10.2.0
1.1.0 0.10.1.1
1.0.0 0.10.1.0

Simple Example

Mocked Streams wraps the org.apache.kafka.streams.TopologyTestDriver class and adds syntactic sugar to keep your test code simple:

import com.madewithtea.mockedstreams.MockedStreams

val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
val strings = Serdes.String()

MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("topic-in", strings, strings, input)
  .output("topic-out", strings, strings) shouldEqual exp
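The `builder.stream(...) [...]` placeholder elides the actual topology under test. For reference, the relationship between `input` and `exp` above is plain value uppercasing; a pure-Scala sketch of that mapping (illustrative only, no Kafka involved):

```scala
// Pure-Scala model of the relationship between `input` and `exp` above:
// the topology under test simply uppercases each record's value.
val input = Seq(("x", "v1"), ("y", "v2"))
val uppercased = input.map { case (key, value) => (key, value.toUpperCase) }
// uppercased == Seq(("x", "V1"), ("y", "V2"))
```

A real topology would express the same transformation with the Kafka Streams DSL, e.g. via `mapValues`.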

Multiple Input / Output Example and State

It also allows you to have multiple input and output streams. If your topology uses state stores, you need to define them using .stores(stores: Seq[String]):

import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints) shouldEqual(expectedB)

Record order and multiple emissions

The records provided to the mocked stream are submitted to the topology in the order in which they appear in the fixture. You can also submit records to the same topic multiple times, at different moments in your scenario.

This is handy for validating that your topology's behaviour does or does not depend on the order in which the records are received and processed.

In the example below, two records are first submitted to topic A, then three to topic B, then one more to topic A.

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
  .topology(topologyTables) // Scala DSL
  .input(InputATopic, strings, ints, firstInputForTopicA)
  .input(InputBTopic, strings, ints, firstInputForTopicB)
  .input(InputATopic, strings, ints, secondInputForTopicA)
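Since `topologyTables` is not shown, here is a pure-Scala simulation of the behaviour the fixture implies: a KTable-KTable-style inner join that keeps the latest value per key on each side and emits the sum on every update once both sides have seen the key. All names here are illustrative, not part of the library's API:

```scala
// Simulates a latest-value-per-key inner join: each record updates its
// side's table, and a (key, sumA+sumB) pair is emitted whenever both
// tables contain the key.
def joinSum(records: Seq[(String, String, Int)]): Seq[(String, Int)] = {
  var tableA = Map.empty[String, Int]
  var tableB = Map.empty[String, Int]
  records.flatMap { case (side, key, value) =>
    if (side == "A") tableA += key -> value else tableB += key -> value
    for (a <- tableA.get(key); b <- tableB.get(key)) yield (key, a + b)
  }
}

val updates = Seq(
  ("A", "x", 1), ("A", "y", 2),             // firstInputForTopicA
  ("B", "x", 4), ("B", "y", 3), ("B", "y", 5), // firstInputForTopicB
  ("A", "y", 4)                             // secondInputForTopicA
)
// joinSum(updates) reproduces expectedOutput:
// Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))
```

Note that the first two records to topic A produce no output, because an inner join emits nothing until the other side has a value for the key.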

State Store

Since version 1.2, when you define your state stores via .stores(stores: Seq[String]), you can verify the state store content via the .stateTable(name: String) method:

import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.stateTable("store-name") shouldEqual Map('a' -> 1)

Window State Store

Since version 1.2, when you define your state stores via .stores(stores: Seq[String]) and add a timestamp extractor to the config, you can verify the window state store content via the .windowStateTable(name: String, key: K) method:

import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  classOf[TimestampExtractors.CustomTimestampExtractor].getName)

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .input("in-a", strings, ints, inputA)
  .stores(Seq("store-name"))
  .config(props)

mstreams.windowStateTable("store-name", "x") shouldEqual someMapX
mstreams.windowStateTable("store-name", "y") shouldEqual someMapY
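To make the window-store shape concrete: `.windowStateTable` returns, per key, a mapping from window start timestamps to aggregated values. A pure-Scala sketch of tumbling-window bucketing (the window size and sum aggregation are illustrative assumptions, not the library's API):

```scala
// Buckets timestamped values into tumbling windows of `windowMs` and
// sums each bucket, mirroring the Map a window state store holds per key.
def windowed(records: Seq[(Long, Int)], windowMs: Long): Map[Long, Int] =
  records
    .groupBy { case (ts, _) => (ts / windowMs) * windowMs }
    .map { case (start, rs) => start -> rs.map(_._2).sum }

// Hypothetical records for key "x": (timestamp, value)
val forKeyX = Seq((1000L, 1), (1500L, 2), (2100L, 3))
val table = windowed(forKeyX, 1000L)
// table == Map(1000L -> 3, 2000L -> 3)
```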

Adding Timestamps

With .input, record timestamps are set to the default timestamp of 0. This prevents, for example, testing the join windows of Kafka Streams, as all records carry the same timestamp. Using .inputWithTime instead allows adding timestamps, as in the following example:

val inputA = Seq(
  ("x", int(1), 1000L),
  ("x", int(1), 1001L),
  ("x", int(1), 1002L)
)

val builder = MockedStreams()
  .topology(topology1WindowOutput) // Scala DSL
  .inputWithTime(InputCTopic, strings, ints, inputA)
  .stores(Seq(StoreName))

Custom Streams Configuration

Sometimes you need to pass a custom configuration to Kafka Streams:

import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  classOf[CustomExtractor].getName)

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] } // Scala DSL
  .config(props)
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints) shouldEqual(expectedB)

Companies using Mocked Streams
