Project Name | Stars | Most Recent Commit | License | Language | Description |
---|---|---|---|---|---|
Cookbook | 11,769 | 6 months ago | apache-2.0 | | The Data Engineering Cookbook |
God Of Bigdata | 8,483 | 2 months ago | | | Focused on big data study and interview preparation; the road to big data mastery starts here. Flink/Spark/Hadoop/Hbase/Hive... |
Confluent Kafka Go | 4,135 | a day ago | apache-2.0 | Go | Confluent's Apache Kafka Golang client |
Strimzi Kafka Operator | 4,047 | 7 hours ago | apache-2.0 | Java | Apache Kafka® running on Kubernetes |
Kafkajs | 3,352 | 15 days ago | mit | JavaScript | A modern Apache Kafka client for node.js |
Kafka Monitor | 1,946 | 6 months ago | apache-2.0 | Java | Xinfra Monitor monitors the availability of Kafka clusters by producing synthetic workloads using end-to-end pipelines to obtain derived vital statistics - E2E latency, service produce/consume availability, offsets commit availability & latency, message loss rate and more. |
Kaf | 1,913 | 16 days ago | apache-2.0 | Go | Modern CLI for Apache Kafka, written in Go. |
Karafka | 1,819 | 15 hours ago | other | Ruby | Ruby and Rails efficient Kafka processing framework |
Killrweather | 1,174 | 7 years ago | apache-2.0 | Scala | KillrWeather is a reference application (work in progress) showing how to easily integrate streaming and batch data processing with Apache Spark Streaming, Apache Cassandra, Apache Kafka and Akka for fast, streaming computations on time series data in asynchronous event-driven environments. |
Broadcaster | 821 | 3 months ago | bsd-3-clause | Python | Broadcast channels for async web apps. 📢 |
Recordbus connects to a MySQL instance as a replicant and publishes the replication events it reads onto an Apache Kafka topic. Events are produced as JSON-serialized maps, keyed by the server-id that produced them.

Recordbus accepts a single argument: the path to a configuration file with the following format:
```
mysql.host=localhost
mysql.port=3306
mysql.user=replicant
mysql.password=replicant
topic=recordbus
bootstrap.servers=localhost:9092
```
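The configuration is a simple `key=value` file (Java properties style). As an illustrative sketch — not part of recordbus itself — a small parser for this format, for tooling that needs to read the same file, could look like:

```python
def parse_config(path):
    """Parse a recordbus-style key=value configuration file.

    Blank lines and lines starting with '#' are skipped; since values
    may themselves contain '=', we split on the first '=' only.
    """
    config = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition("=")
            config[key.strip()] = value.strip()
    return config
```

For the sample file above, `parse_config(path)["bootstrap.servers"]` would return `localhost:9092`.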
You'll need Leiningen to build the project: run `lein uberjar`.

To run it, issue `java -jar target/recordbus-0.1.0-standalone.jar <config-file>`.

Pre-built JARs are available at https://github.com/pyr/recordbus/releases/
Each JSON payload in the stream contains a `type` key which determines the rest of the payload's shape. These three fields will be present in all events:

- `type`: one of `unknown`, `start-v3`, `query`, `stop`, `rotate`, `intvar`, `load`, `slave`, `create-file`, `append-block`, `exec-load`, `delete-file`, `new-load`, `user-var`, `format-description`, `xid`, `begin-load-query`, `execute-load-query`, `table-map`, `pre-ga-write-rows`, `pre-ga-update-rows`, `pre-ga-delete-rows`, `write-rows`, `update-rows`, `delete-rows`, `incident`, `heartbeat`, `ignorable`, `rows-query`, `ext-write-rows`, `ext-update-rows`, `ext-delete-rows`, `gtid`, `anonymous-gtid`, `previous-gtids`
- `timestamp`: the timestamp of the event
- `server-id`: the server-id from which this event originated, also used as the record key

Here are the key event-type-specific fields:
type | fields |
---|---|
format-description | binlog-version, server-version, header-length |
gtid | gtid, flags |
query | sql, error-code, database, exec-time |
rotate | binlog-filename, binlog-position |
rows-query | binlog-filename |
table-map | database, table, column-types, column-metadata, column-nullability |
update-rows | cols-old, cols-new, rows, table-id |
write-rows | cols, rows, table-id |
delete-rows | cols, rows, table-id |
xid | xid |
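To make the schema above concrete, here is a sketch of a handler that dispatches on the `type` key. In practice the payloads would be read off the configured Kafka topic by whatever consumer you use; the function itself is illustrative, not part of recordbus, and here it is fed two sample payloads from this document:

```python
import json

def summarize_event(event):
    """Return a one-line summary of a recordbus payload, keyed on 'type'."""
    kind = event["type"]
    if kind in ("write-rows", "delete-rows"):
        verb = "insert" if kind == "write-rows" else "delete"
        return f"{verb} on table-id {event['table-id']}: {event['rows']}"
    if kind == "update-rows":
        return f"update on table-id {event['table-id']}: {event['rows']}"
    if kind == "query":
        return f"query on database {event['database'] or '(none)'}: {event['sql']}"
    if kind == "xid":
        return f"transaction {event['xid']} committed"
    return f"{kind} event"  # rotate, format-description, table-map, ...

# Sample payloads copied from the example stream below.
samples = [
    '{"server-id":1,"timestamp":1426512368000,"table-id":73,'
    '"rows":[["1","bob","bob"]],"cols":[0,1,2],"type":"write-rows"}',
    '{"server-id":1,"timestamp":1426512368000,"xid":32,"type":"xid"}',
]
for raw in samples:
    print(summarize_event(json.loads(raw)))
```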
Assuming a client had the following conversation with a MySQL server:
```sql
create database foobar;
use foobar;
create table user (
  id int primary key not null auto_increment,
  login varchar(255),
  name varchar(255)
);
insert into user(login,name) values('bob','bob');
insert into user(login,name) values('bill','bill');
update user set name="Billy Bob" where id=2;
delete from user where id=1;
```
The following JSON payloads would be produced on the Kafka topic:
```json
{"server-id":1,
 "timestamp":0,
 "binlog-position":574,
 "binlog-filename":"mysql-bin.000064",
 "type":"rotate"}

{"server-id":1,
 "timestamp":1426511022000,
 "header-length":19,
 "server-version":"10.0.17-MariaDB-log",
 "binlog-version":4,
 "type":"format-description"}

{"server-id":1,
 "timestamp":1426512246000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"# Dum",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512246000,
 "exec-time":0,
 "database":"foobar",
 "error-code":0,
 "sql":"create database foobar",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512336000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"# Dum",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512336000,
 "exec-time":0,
 "database":"foobar",
 "error-code":0,
 "sql":"create table user ( id int primary key not null auto_increment, login varchar(255), name varchar(255))",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512368000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512368000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}

{"server-id":1,
 "timestamp":1426512368000,
 "table-id":73,
 "rows":[["1","bob","bob"]],
 "cols":[0,1,2],
 "type":"write-rows"}

{"server-id":1,
 "timestamp":1426512368000,
 "xid":32,
 "type":"xid"}

{"server-id":1,
 "timestamp":1426512383000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512383000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}

{"server-id":1,
 "timestamp":1426512383000,
 "table-id":73,
 "rows":[["2","bill","bill"]],
 "cols":[0,1,2],
 "type":"write-rows"}

{"server-id":1,
 "timestamp":1426512383000,
 "xid":33,
 "type":"xid"}

{"server-id":1,
 "timestamp":1426512399000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512399000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}

{"server-id":1,
 "timestamp":1426512399000,
 "table-id":73,
 "rows":[[["2","bill","bill"],["2","bill","Billy Bob"]]],
 "cols-new":[0,1,2],
 "cols-old":[0,1,2],
 "type":"update-rows"}

{"server-id":1,
 "timestamp":1426512399000,
 "xid":34,
 "type":"xid"}

{"server-id":1,
 "timestamp":1426512411000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}

{"server-id":1,
 "timestamp":1426512411000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}

{"server-id":1,
 "timestamp":1426512411000,
 "table-id":73,
 "rows":[["1","bob","bob"]],
 "cols":[0,1,2],
 "type":"delete-rows"}

{"server-id":1,
 "timestamp":1426512411000,
 "xid":35,
 "type":"xid"}
```
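Note that row events reference tables by `table-id` and carry only positional column values; the preceding `table-map` event supplies column types and nullability, but not column names, so names must come from your own knowledge of the schema. As an illustrative sketch (the column names below are assumptions taken from the example `user` table, not part of the payload), an `update-rows` event can be unpacked into before/after dictionaries:

```python
import json

def unpack_update(event, column_names):
    """Unpack an 'update-rows' payload into (before, after) dict pairs.

    Each element of event['rows'] is a pair [old-values, new-values];
    values are positional, so we zip them with caller-supplied names.
    """
    assert event["type"] == "update-rows"
    for old, new in event["rows"]:
        yield dict(zip(column_names, old)), dict(zip(column_names, new))

# The update-rows payload from the example stream above.
payload = json.loads("""
{"server-id":1,
 "timestamp":1426512399000,
 "table-id":73,
 "rows":[[["2","bill","bill"],["2","bill","Billy Bob"]]],
 "cols-new":[0,1,2],
 "cols-old":[0,1,2],
 "type":"update-rows"}
""")

# Column names for table-id 73 ('user' in the example schema).
for before, after in unpack_update(payload, ["id", "login", "name"]):
    print(before["name"], "->", after["name"])
```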
Copyright 2015 Pierre-Yves Ritschard [email protected]
Permission to use, copy, modify, and distribute this software for any purpose with or without fee is hereby granted, provided that the above notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.