Awesome Open Source
Awesome Open Source

sequelize-kafka-connect

Node.js Kafka Connect connector for MySQL, Postgres, SQLite and MSSQL databases

Build Status

Coverage Status

Use API

npm install --save sequelize-kafka-connect

A note on native mode

If you are using native mode (config: { noptions: {} }), you will have to manually install node-rdkafka alongside kafka-connect. (This requires a Node.js version between 9 and 12 and will not work with Node.js >= 13; last tested with 12.16.1.)

On Mac OS High Sierra / Mojave: CPPFLAGS=-I/usr/local/opt/openssl/include LDFLAGS=-L/usr/local/opt/openssl/lib yarn add --frozen-lockfile node-rdkafka@<version>

Otherwise: yarn add --frozen-lockfile node-rdkafka@<version>

(Please also note: doing this with npm does not work, because it will remove your dependencies. Install yarn first with: npm i -g yarn)

database -> kafka

const { runSourceConnector } = require("sequelize-kafka-connect");
// Starts the source connector (database -> kafka).
// `config` and `onError` are not defined in this snippet and must be provided
// by the caller (onError is presumably an error callback — confirm).
// The second argument is the converter list; it is empty here.
runSourceConnector(config, [], onError).then(config => {
    // NOTE: this `config` is the promise's resolved value and shadows the outer `config`.
    //runs forever until: config.stop();
});

kafka -> database

const { runSinkConnector } = require("sequelize-kafka-connect");
// Starts the sink connector (kafka -> database).
// `config` and `onError` are not defined in this snippet and must be provided
// by the caller (onError is presumably an error callback — confirm).
// The second argument is the converter list; it is empty here.
runSinkConnector(config, [], onError).then(config => {
    // NOTE: this `config` is the promise's resolved value and shadows the outer `config`.
    //runs forever until: config.stop();
});

kafka -> database (with custom topic (no source-task topic))

const { runSinkConnector, ConverterFactory } = require("sequelize-kafka-connect");

// Column definitions handed to ConverterFactory.createSinkSchemaConverter:
// a non-nullable integer primary key "id" and a nullable varchar(255) "name".
const tableSchema = {
    id: {
        type: "integer",
        allowNull: false,
        primaryKey: true,
    },
    name: {
        type: "varchar(255)",
        allowNull: true,
    },
};

// Maps an incoming kafka message value to a table row via a node-style callback.
//   - "publish"   -> callback(null, { id, name }) taken from messageValue.payload
//   - "unpublish" -> callback(null, null); a null value will cause deletion
//   - otherwise   -> callback(error)
// "type" is just an example field of the JSON message format.
const etlFunc = (messageValue, callback) => {
    switch (messageValue.type) {
        case "publish": {
            const { id, name } = messageValue.payload;
            return callback(null, { id, name });
        }

        case "unpublish":
            return callback(null, null);

        default:
            // unknown type: report the error; nothing is returned on this path
            callback(new Error("unknown messageValue.type"));
    }
};

// Builds a sink converter from the table schema and the ETL function defined above.
const converter = ConverterFactory.createSinkSchemaConverter(tableSchema, etlFunc);

// Starts the sink connector with the custom converter in the converter list
// (second argument). `config` and `onError` are not defined in this snippet.
runSinkConnector(config, [converter], onError).then(config => {
    // NOTE: this `config` is the promise's resolved value and shadows the outer `config`.
    //runs forever until: config.stop();
});

/*
    this example would be able to store kafka message values
    that look like this (so completely unrelated to messages created by a default SourceTask)
    {
        payload: {
            id: 123,
            name: "bla"
        },
        type: "publish"
    }
*/

Use CLI

note: in BETA 🌱

npm install -g sequelize-kafka-connect
# run source etl: database -> kafka
nkc-sequelize-source --help
# run sink etl: kafka -> database
nkc-sequelize-sink --help

Config(uration)

// Example configuration object, as passed as the first argument to
// runSourceConnector / runSinkConnector.
// This variant configures the kafka client through `kafka.options`; the native
// mode variant uses `kafka.noptions` instead.
const config = {
    kafka: {
        //zkConStr: "localhost:2181/",
        kafkaHost: "localhost:9092",
        logger: null,
        groupId: "kc-sequelize-test",
        clientName: "kc-sequelize-test-name",
        workerPerPartition: 1,
        options: {
            sessionTimeout: 8000,
            protocol: ["roundrobin"],
            fromOffset: "earliest", //latest
            fetchMaxBytes: 1024 * 100, // = 102400
            fetchMinBytes: 1,
            fetchMaxWaitMs: 10,
            heartbeatInterval: 250,
            retryMinTimeout: 250,
            requireAcks: 1,
            //ackTimeoutMs: 100,
            //partitionerType: 3
        }
    },
    topic: "sc_test_topic",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 2000,
    produceKeyed: true,
    produceCompressionType: 0,
    // Database side of the connector.
    // NOTE(review): `options` is presumably handed to Sequelize — confirm.
    connector: {
        options: {
            host: "localhost",
            port: 5432,
            dialect: "sqlite",
            pool: {
                max: 5,
                min: 0,
                idle: 10000
            },
            // `path` is not imported in this snippet; assumes
            // const path = require("path"); is in scope.
            storage: path.join(__dirname, "test-db.sqlite")
        },
        database: null,
        user: null,
        password: null,
        maxPollCount: 50,
        table: "accounts",
        incrementingColumnName: "id"
    },
    http: {
        port: 3149,
        middlewares: []
    },
    enableMetrics: true,
    batch: {
        batchSize: 100, 
        commitEveryNBatch: 1, 
        concurrency: 1,
        commitSync: true
    }
};

Native Clients Config(uration)

// Example configuration object for native mode: the kafka client is configured
// through `kafka.noptions` (plus `kafka.tconf`) instead of `kafka.options`.
// Native mode requires node-rdkafka to be installed manually.
const config = {
   
    kafka: {
        noptions: {
            "metadata.broker.list": "localhost:9092",
            "group.id": "n-test-group",
            "enable.auto.commit": false,
            "debug": "all",
            "event_cb": true,
            "client.id": "kcs-test"
        },
        tconf: {
            "auto.offset.reset": "earliest",
            "request.required.acks": 1
        }
    },
   
    topic: "sc_test_topic",
    partitions: 1,
    maxTasks: 1,
    pollInterval: 2000,
    produceKeyed: true,
    produceCompressionType: 0,
    // Database side of the connector.
    // NOTE(review): `options` is presumably handed to Sequelize — confirm.
    connector: {
        options: {
            host: "localhost",
            port: 5432,
            dialect: "sqlite",
            pool: {
                max: 5,
                min: 0,
                idle: 10000
            },
            // `path` is not imported in this snippet; assumes
            // const path = require("path"); is in scope.
            storage: path.join(__dirname, "test-db.sqlite")
        },
        database: null,
        user: null,
        password: null,
        maxPollCount: 50,
        table: "accounts",
        incrementingColumnName: "id"
    },
    http: {
        port: 3149,
        middlewares: []
    },
    enableMetrics: true
};
Alternatives To Sequelize Kafka Connect
Select To Compare


Alternative Project Comparisons
Related Awesome Lists
Top Programming Languages
Top Projects

Get A Weekly Email With Trending Projects For These Topics
No Spam. Unsubscribe easily at any time.
Javascript (1,060,850)
Mysql (31,092)
Postgres (23,413)
Sqlite (9,857)
Kafka (9,567)
Connect (8,707)
Sequelize (3,078)
Etl (2,385)
Mssql (1,425)
Kafka Connect (369)