Recordbus

recordbus: mysql binlog to apache kafka
Alternatives To Recordbus
Project NameStarsDownloadsRepos Using ThisPackages Using ThisMost Recent CommitTotal ReleasesLatest ReleaseOpen IssuesLicenseLanguage
Cookbook11,769
6 months ago110apache-2.0
The Data Engineering Cookbook
God Of Bigdata8,483
2 months ago3
专注大数据学习面试,大数据成神之路开启。Flink/Spark/Hadoop/Hbase/Hive...
Confluent Kafka Go4,135432a day ago69July 12, 2023239apache-2.0Go
Confluent's Apache Kafka Golang client
Strimzi Kafka Operator4,04737 hours ago42July 28, 2023135apache-2.0Java
Apache Kafka® running on Kubernetes
Kafkajs3,3523962915 days ago299February 27, 2023287mitJavaScript
A modern Apache Kafka client for node.js
Kafka Monitor1,946
6 months ago27apache-2.0Java
Xinfra Monitor monitors the availability of Kafka clusters by producing synthetic workloads using end-to-end pipelines to obtain derived vital statistics - E2E latency, service produce/consume availability, offsets commit availability & latency, message loss rate and more.
Kaf1,913316 days ago74July 05, 202357apache-2.0Go
Modern CLI for Apache Kafka, written in Go.
Karafka1,81922615 hours ago133August 28, 202328otherRuby
Ruby and Rails efficient Kafka processing framework
Killrweather1,174
7 years ago23apache-2.0Scala
KillrWeather is a reference application (work in progress) showing how to easily integrate streaming and batch data processing with Apache Spark Streaming, Apache Cassandra, Apache Kafka and Akka for fast, streaming computations on time series data in asynchronous event-driven environments.
Broadcaster82133 months ago3February 26, 202027bsd-3-clausePython
Broadcast channels for async web apps. 📢
Alternatives To Recordbus
Select To Compare


Alternative Project Comparisons
Readme

recordbus: stream SQL replication events to Apache Kafka.

Build Status

Recordbus connects to a MySQL instance as a replicant and produces the replication events read onto an Apache Kafka topic. The events are produced as a JSON-serialized map, keyed by the server-id which produced the event.

Configuration

Recordbus accepts a single argument, its configuration file which has the following format:

mysql.host=localhost
mysql.port=3306
mysql.user=replicant
mysql.password=replicant
topic=recordbus
bootstrap.servers=localhost:9092

Building and running

You'll need leiningen to build the project, you can then run lein uberjar to build the project.

To run it, just issue java -jar target/recordbus-0.1.0-standalone.jar <config-file>

Pre-built JARs are available in https://github.com/pyr/recordbus/releases/

Use-cases

  • Live cache updates from MySQL
  • Materialized views from MySQL events

Event Types

Each JSON payload in the stream contains a type key which determines the rest of the payload's shape.

These three fields will be present in all events:

  • type: one of unknown, start-v3, query, stop, rotate, intvar, load, slave, create-file, append-block, exec-load, delete-file, new-load, user-var, format-description, xid, begin-load-query, execute-load-query, table-map, pre-ga-write-rows, pre-ga-update-rows, pre-ga-delete-rows, write-rows, update-rows, delete-rows, incident, heartbeat, ignorable, rows-query, ext-write-rows, ext-update-rows, ext-delete-rows, gtid, anonymous-gtid, previous-gtids.
  • timestamp: the timestamp of the event
  • server-id: server-id from which this request originated, also used as the record key.

Here are the key event-type-specific fields:

type fields
format-description binlog-version, server-version, header-length
gtid gtid, flags
query sql, error-code, database, exec-time
rotate binlog-filename, binlog-position
rows-query binlog-filename
table-map database, table, column-types, column-metadata, column-nullability
update-rows cols-old, cols-new, rows, table-id
write-rows cols, rows, table-id
delete-rows cols, rows, table-id
xid xid

Assuming a client had the following conversation with a MySQL server:

create database foobar;
use foobar;
create table user (
  id int primary key not null auto_increment,
  login varchar(255),
  name varchar(255)
);
insert into user(login,name) values('bob','bob');
insert into user(login,name) values('bill','bill');
update user set name="Billy Bob" where id=2;
delete from user where id=1;

The following JSON payloads would be produced in the kafka topic:

Preamble

{"server-id":1,
 "timestamp":0,
 "binlog-position":574,
 "binlog-filename":"mysql-bin.000064",
 "type":"rotate"}
{"server-id":1,
 "timestamp":1426511022000,
 "header-length":19,
 "server-version":"10.0.17-MariaDB-log",
 "binlog-version":4,
 "type":"format-description"}

Create database and Create table

{"server-id":1,
 "timestamp":1426512246000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"# Dum",
 "type":"query"}
{"server-id":1,
 "timestamp":1426512246000,
 "exec-time":0,
 "database":"foobar",
 "error-code":0,
 "sql":"create database foobar",
 "type":"query"}
{"server-id":1,
 "timestamp":1426512336000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"# Dum",
 "type":"query"}
{"server-id":1,
 "timestamp":1426512336000,
 "exec-time":0,
 "database":"foobar",
 "error-code":0,
 "sql":"create table user ( id int primary key not null auto_increment, login varchar(255), name varchar(255))",
 "type":"query"}

Inserts

{"server-id":1,
 "timestamp":1426512368000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}
{"server-id":1,
 "timestamp":1426512368000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}
{"server-id":1,
 "timestamp":1426512368000,
 "table-id":73,
 "rows":[["1","bob","bob"]],
 "cols":[0,1,2],
 "type":"write-rows"}
{"server-id":1,
 "timestamp":1426512368000,
 "xid":32,
 "type":"xid"}

{"server-id":1,
 "timestamp":1426512383000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}
{"server-id":1,
 "timestamp":1426512383000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}
{"server-id":1,
 "timestamp":1426512383000,
 "table-id":73,
 "rows":[["2","bill","bill"]],
 "cols":[0,1,2],
 "type":"write-rows"}
{"server-id":1,
 "timestamp":1426512383000,
 "xid":33,
 "type":"xid"}

Updates

{"server-id":1,
 "timestamp":1426512399000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}
{"server-id":1,
 "timestamp":1426512399000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}
{"server-id":1,
 "timestamp":1426512399000,
 "table-id":73,
 "rows":[[["2","bill","bill"],["2","bill","Billy Bob"]]],
 "cols-new":[0,1,2],
 "cols-old":[0,1,2],
 "type":"update-rows"}
{"server-id":1,
 "timestamp":1426512399000,
 "xid":34,
 "type":"xid"}

Deletes

{"server-id":1,
 "timestamp":1426512411000,
 "exec-time":0,
 "database":"",
 "error-code":0,
 "sql":"BEGIN",
 "type":"query"}
{"server-id":1,
 "timestamp":1426512411000,
 "column-nullability":[1,2],
 "column-metadata":[0,765,765],
 "column-types":[3,15,15],
 "table":"user",
 "database":"foobar",
 "type":"table-map"}
{"server-id":1,
 "timestamp":1426512411000,
 "table-id":73,
 "rows":[["1","bob","bob"]],
 "cols":[0,1,2],
 "type":"delete-rows"}
{"server-id":1,
 "timestamp":1426512411000,
 "xid":35,
 "type":"xid"}

Caveats

  • No support for keeping track of offsets
  • No configurable key or serialization
  • MySQL only (a PostgreSQL implementation would be nice)

License

Copyright 2015 Pierre-Yves Ritschard [email protected]

Permission to use, copy, modify, and distribute this software for any purpose with or without fee is hereby granted, provided that the above notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

Popular Kafka Projects
Popular Apache Projects
Popular Data Processing Categories

Get A Weekly Email With Trending Projects For These Categories
No Spam. Unsubscribe easily at any time.
Mysql
Table
Clojure
Apache
Kafka
Replication