Kafka Scala Examples

Examples of Avro, Kafka, Schema Registry, Kafka Streams, Interactive Queries, KSQL, Kafka Connect in Scala

kafka-scala-examples

Examples in Scala of Avro, Kafka, Schema Registry, Kafka Streams, KSQL and Kafka Connect, one section each below.

Local environment

# start locally
# - zookeeper
# - kafka
# - kafka-rest
# - kafka-ui
# - schema-registry
# - schema-registry-ui
# - ksql-server
# - ksql-cli
# - kafka-connect
# - kafka-connect-ui
docker-compose up

# (mac|linux) view kafka ui
[open|xdg-open] http://localhost:8000

# (mac|linux) view schema-registry ui
[open|xdg-open] http://localhost:8001

# (mac|linux) view kafka-connect ui
[open|xdg-open] http://localhost:8002

# cleanup
docker-compose down -v
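
The `[open|xdg-open]` notation above means `open` on macOS and `xdg-open` on Linux. A minimal sketch of choosing between them automatically, in plain POSIX shell; `opener_for` is a hypothetical helper and not part of this repository:

```shell
# Hypothetical helper: map a kernel name (as printed by `uname -s`)
# to the command that opens a URL in the default browser.
opener_for() {
  case "$1" in
    Darwin) echo "open" ;;
    Linux)  echo "xdg-open" ;;
    *)      echo "unsupported platform: $1" >&2; return 1 ;;
  esac
}

# Usage (commented out so that sourcing this snippet has no side effects):
# "$(opener_for "$(uname -s)")" http://localhost:8000
```

Passing the kernel name as an argument, instead of calling `uname` inside the function, keeps the helper easy to check on any machine.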

If containers keep crashing, check that Docker has enough memory and CPU available.

# verify memory and cpu usage
docker ps -q | xargs docker stats --no-stream

# verify status
docker inspect <CONTAINER_NAME> | jq '.[].State'

avro

Description

Avro serialization and deserialization examples.

Demo

# console
sbt avro/console

# generate avro classes
# avro/target/scala-2.12/classes/com/kafka/demo/User.class
sbt clean avro/compile

# test
sbt clean avro/test

kafka

Description

Kafka API examples: a producer and a consumer, each in an `original` and a `cakesolutions` variant.

Demo

# access kafka
docker exec -it local-kafka bash

# create topic
# convention <MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME>
# example [example.no-schema.original|example.no-schema.cakesolutions]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# delete topic
kafka-topics --zookeeper zookeeper:2181 \
  --delete --topic <TOPIC_NAME>

# view topic
kafka-topics --zookeeper zookeeper:2181 --list 
kafka-topics --zookeeper zookeeper:2181 --describe --topic <TOPIC_NAME>

# view topic offset
kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list kafka:9092 \
  --time -1 \
  --topic <TOPIC_NAME>

# list consumer groups
kafka-consumer-groups --bootstrap-server kafka:9092 --list

# view consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --describe

# reset consumer group offset
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group <GROUP_NAME> \
  --topic <TOPIC_NAME> \
  --reset-offsets \
  --to-earliest \
  --execute

# console producer
kafka-console-producer --broker-list kafka:9092 --topic <TOPIC_NAME>
kafkacat -P -b 0 -t <TOPIC_NAME>

# console consumer
kafka-console-consumer --bootstrap-server kafka:9092 --topic <TOPIC_NAME> --from-beginning
kafkacat -C -b 0 -t <TOPIC_NAME>

# producer example
sbt "kafka/runMain com.kafka.demo.original.Producer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Producer"

# consumer example
sbt "kafka/runMain com.kafka.demo.original.Consumer"
sbt "kafka/runMain com.kafka.demo.cakesolutions.Consumer"

# test
sbt clean kafka/test
sbt "test:testOnly *KafkaSpec"
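
The topic naming convention noted in the comments above, `<MESSAGE_TYPE>.<DATASET_NAME>.<DATA_NAME>`, can be checked before a topic is created. A minimal sketch in plain POSIX shell, reading the convention as exactly three non-empty dot-separated parts (an assumption); `is_valid_topic` is a hypothetical helper and not part of this repository:

```shell
# Hypothetical helper: succeed only if the name has exactly three
# non-empty, dot-separated parts, e.g. example.no-schema.original
is_valid_topic() {
  case "$1" in
    *.*.*.*)    return 1 ;;  # more than three parts
    .*|*.|*..*) return 1 ;;  # at least one empty part
    *.*.*)      return 0 ;;  # exactly three parts
    *)          return 1 ;;  # fewer than three parts
  esac
}

# Usage (commented out so that sourcing this snippet has no side effects):
# is_valid_topic "$TOPIC_NAME" || echo "topic name breaks the convention" >&2
```

Under this reading `USER_PROFILE`, the topic used in the ksql section, would be rejected, so the check only suits topics meant to follow the convention.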

schema-registry

Description

# register schema
# convention <TOPIC_NAME>-key or <TOPIC_NAME>-value
http -v POST :8081/subjects/example.with-schema.simple-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# import schema from file
http -v POST :8081/subjects/example.with-schema.user-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@avro/src/main/avro/user.avsc

# export schema to file
http :8081/subjects/example.with-schema.user-value/versions/latest \
  | jq -r '.schema|fromjson' \
  | tee avro/src/main/avro/user-latest.avsc

# list subjects
http -v :8081/subjects

# list subject's versions
http -v :8081/subjects/example.with-schema.simple-value/versions

# fetch by version
http -v :8081/subjects/example.with-schema.simple-value/versions/1

# fetch by id
http -v :8081/schemas/ids/1

# test compatibility
http -v POST :8081/compatibility/subjects/example.with-schema.simple-value/versions/latest \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'

# delete version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/1

# delete latest version
http -v DELETE :8081/subjects/example.with-schema.simple-value/versions/latest

# delete subject
http -v DELETE :8081/subjects/example.with-schema.simple-value

# stringify
jq tostring avro/src/main/avro/user.avsc
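
The subject convention used above, `<TOPIC_NAME>-key` or `<TOPIC_NAME>-value`, can be wrapped in two small functions so that subject names and endpoints are not typed by hand. This is only a sketch: `subject_for` and `versions_url` are hypothetical helpers, and the base URL assumes that the `:8081` shorthand in the commands above stands for `http://localhost:8081`.

```shell
# Assumption: the registry listens on localhost, port 8081.
REGISTRY_URL="http://localhost:8081"

# Hypothetical helper: print the subject for a topic; KIND is "key" or "value".
subject_for() {
  case "$2" in
    key|value) printf '%s-%s\n' "$1" "$2" ;;
    *) echo "kind must be 'key' or 'value', got: $2" >&2; return 1 ;;
  esac
}

# Hypothetical helper: print the endpoint that lists or registers versions.
versions_url() {
  subject=$(subject_for "$1" "$2") || return 1
  printf '%s/subjects/%s/versions\n' "$REGISTRY_URL" "$subject"
}

# Usage (commented out so that sourcing this snippet has no side effects):
# http -v "$(versions_url example.with-schema.simple value)"
```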

Demo

# generate SpecificRecord classes under "schema-registry/target/scala-2.12/src_managed/main/compiled_avro"
sbt clean schema-registry/avroScalaGenerateSpecific

# (optional) create schema
http -v POST :8081/subjects/example.with-schema.payment-key/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema='{"type":"string"}'
http -v POST :8081/subjects/example.with-schema.payment-value/versions \
  Accept:application/vnd.schemaregistry.v1+json \
  schema=@schema-registry/src/main/avro/Payment.avsc

# access kafka
docker exec -it local-kafka bash

# (optional) create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic example.with-schema.payment

# console producer (binary)
kafka-console-producer --broker-list kafka:9092 --topic example.with-schema.payment

# console consumer (binary)
kafka-console-consumer --bootstrap-server kafka:9092 --topic example.with-schema.payment

# access schema-registry
docker exec -it local-schema-registry bash

# avro console producer
# example "MyKey",{"id":"MyId","amount":10}
kafka-avro-console-producer --broker-list kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property parse.key=true \
  --property key.separator=, \
  --property key.schema='{"type":"string"}' \
  --property value.schema='{"namespace":"io.confluent.examples.clients.basicavro","type":"record","name":"Payment","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"}]}'

# avro console consumer
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --topic example.with-schema.payment \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning

# producer example (specific)
sbt "schema-registry/runMain com.kafka.demo.specific.Producer"

# consumer example (specific)
sbt "schema-registry/runMain com.kafka.demo.specific.Consumer"

# tests (specific)
sbt "schema-registry/test:testOnly *KafkaSchemaRegistrySpecificSpec"

# producer example (generic)
sbt "schema-registry/runMain com.kafka.demo.generic.Producer"

# consumer example (generic)
sbt "schema-registry/runMain com.kafka.demo.generic.Consumer"

# tests (generic)
sbt "schema-registry/test:testOnly *KafkaSchemaRegistryGenericSpec"

TODO

  • generic + schema evolution
  • ovotech
  • multi-schema
  • formulation

kafka-streams

Description

Kafka Streams API examples.

Demo-1

# access kafka
docker exec -it local-kafka bash

# create topic
# example [example.to-upper-case-app.input|example.to-upper-case-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# ToUpperCaseApp example (input topic required)
sbt "streams/runMain com.kafka.demo.streams.ToUpperCaseApp"

# produce
kafka-console-producer --broker-list kafka:9092 \
  --topic example.to-upper-case-app.input

# consume
kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic example.to-upper-case-app.output

# test
sbt clean streams/test

Demo-2

Tested with embedded-kafka and embedded-kafka-schema-registry

# access kafka
docker exec -it local-kafka bash

# create topic
# example [json.streams-json-to-avro-app.input|avro.streams-json-to-avro-app.output]
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic <TOPIC_NAME>

# produce (default StringSerializer)
kafka-console-producer \
  --broker-list kafka:9092 \
  --property "parse.key=true" \
  --property "key.separator=:" \
  --property "key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --property "value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer" \
  --topic <TOPIC_NAME>

# consume (default StringDeserializer)
kafka-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property "print.key=true" \
  --property "key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --property "value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer" \
  --topic <TOPIC_NAME>

# access schema-registry
docker exec -it local-schema-registry bash

# consume avro
kafka-avro-console-consumer --bootstrap-server kafka:29092 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --from-beginning \
  --topic <TOPIC_NAME>

# JsonToAvroApp example (input topic required)
sbt "streams-json-avro/runMain com.kafka.demo.JsonToAvroApp"

# test
sbt clean streams-json-avro/test

Example

# json
mykey:{"valueInt":42,"valueString":"foo"}

# log
[json.streams-json-to-avro-app.input]: mykey, JsonModel(42,foo)
[avro.streams-json-to-avro-app.output]: KeyAvroModel(mykey), ValueAvroModel(42,FOO)
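
The input line above combines a key and a JSON value with `:` as the key separator, although the JSON itself also contains colons. The sketch below shows the split in plain POSIX shell, assuming the console producer cuts each line at the first occurrence of the separator; `split_key` and `split_value` are hypothetical helpers used only for illustration.

```shell
# Hypothetical helper: print the part before the first ":".
split_key() { printf '%s\n' "${1%%:*}"; }

# Hypothetical helper: print everything after the first ":".
split_value() { printf '%s\n' "${1#*:}"; }

line='mykey:{"valueInt":42,"valueString":"foo"}'
split_key "$line"    # prints: mykey
split_value "$line"  # prints: {"valueInt":42,"valueString":"foo"}
```

`%%:*` strips the longest suffix that starts with a colon and `#*:` strips the shortest prefix that ends with one, so only the first colon acts as the separator.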

Demo-3

TODO

  • CatsKafkaStreamsApp

# run app
sbt -jvm-debug 5005 "cats-kafka-streams/runMain com.kafka.demo.CatsKafkaStreamsApp"

Demo-4

TODO

# run app
sbt -jvm-debug 5005 "zio-kafka-streams/runMain com.kafka.demo.ZioKafkaStreamsApp"

ksql

Description

Set up Kafka

# access kafka
docker exec -it local-kafka bash

# create topic
kafka-topics --zookeeper zookeeper:2181 \
  --create --if-not-exists --replication-factor 1 --partitions 1 --topic USER_PROFILE

# produce sample data
kafka-console-producer --broker-list kafka:9092 --topic USER_PROFILE << EOF
{"userid": 1000, "firstname": "Alison", "lastname": "Smith", "countrycode": "GB", "rating": 4.7}
EOF

# consume
kafka-console-consumer --bootstrap-server kafka:9092 --topic USER_PROFILE --from-beginning
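
To produce more than one sample record without a heredoc, the JSON line can be built by a small function. A minimal sketch using the same five fields as the record above; `user_profile_json` is a hypothetical helper, and it inserts its arguments verbatim without any JSON escaping.

```shell
# Hypothetical helper: print one USER_PROFILE record as a single JSON line.
# Arguments: USERID FIRSTNAME LASTNAME COUNTRYCODE RATING
# Values are inserted as-is, so they must not contain quotes or backslashes.
user_profile_json() {
  printf '{"userid": %s, "firstname": "%s", "lastname": "%s", "countrycode": "%s", "rating": %s}\n' \
    "$1" "$2" "$3" "$4" "$5"
}

# Usage inside the kafka container (commented out so that sourcing this
# snippet has no side effects):
# user_profile_json 1000 Alison Smith GB 4.7 \
#   | kafka-console-producer --broker-list kafka:9092 --topic USER_PROFILE
```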

Access KSQL CLI

  • using the server

    # access ksql-server
    docker exec -it local-ksql-server bash
    
    # start ksql cli
    ksql http://ksql-server:8088
    
  • using a local instance

    # connect to local cli
    docker exec -it local-ksql-cli ksql http://ksql-server:8088
    
  • using a temporary instance

    # connect to remote server
    docker run --rm \
      --network=kafka-scala-examples_local_kafka_network \
      -it confluentinc/cp-ksql-cli http://ksql-server:8088
    

Execute SQL statements

# create stream
CREATE STREAM user_profile (\
  userid INT, \
  firstname VARCHAR, \
  lastname VARCHAR, \
  countrycode VARCHAR, \
  rating DOUBLE \
  ) WITH (KAFKA_TOPIC = 'USER_PROFILE', VALUE_FORMAT = 'JSON');

# verify stream
list streams;
describe user_profile;

# query stream
SELECT userid, firstname, lastname, countrycode, rating FROM user_profile EMIT CHANGES;

Generate more data with ksql-datagen. Both the console consumer and the running query should then show the generated records.

# generate data
docker run --rm \
  -v $(pwd)/local/ksql:/datagen \
  --network=kafka-scala-examples_local_kafka_network \
  -it confluentinc/ksql-examples ksql-datagen \
  bootstrap-server=kafka:29092 \
  schemaRegistryUrl=http://schema-registry:8081 \
  schema=datagen/user_profile.avro \
  format=json \
  topic=USER_PROFILE \
  key=userid \
  maxInterval=5000 \
  iterations=100

kafka-connect

Set up PostgreSQL locally

# create shared network
docker-compose up

# start postgres
docker-compose -f docker-compose.postgres.yml up

# (mac|linux) view postgres ui
# [schema=public|database=postgres|username=postgres|password=postgres]
[open|xdg-open] http://localhost:8080

Set up connectors

# list connectors
http -v :8083/connectors

# init data to generate schema
cp local/connect/data/resources-0.txt.orig local/connect/data/resources-0.txt

# setup spooldir source connector
http -v --json POST :8083/connectors < local/connect/config/source-spooldir-connector.json

# ingest data
echo "{\"accountId\":\"123\",\"resourceType\":\"XXX\",\"value\":\"X1\"}" > local/connect/data/resources-1.txt

# setup jdbc sink connector
# topic = SCHEMA.DATABASE = "public.postgres"
http -v --json POST :8083/connectors < local/connect/config/sink-jdbc-connector.json

# verify data
docker exec -it local-postgres bash -c "psql -U postgres postgres"
select * from public.postgres;

# cleanup
docker-compose -f docker-compose.postgres.yml down -v
