Kafka Connect Lambda

A Kafka Connect sink plugin to invoke AWS Lambda functions

Compatibility Matrix

kafka-connect-lambda | Kafka Connect API | AWS SDK
1.1.0 | 2.2.0 | 1.11.592
1.1.1 | 2.2.0 | 1.11.592
1.2.0 | 2.3.0 | 1.11.651
1.3.0 | 2.8.1 | 1.11.1034

Due to a compatibility issue with Apache httpcomponents, connector versions 1.1.1 and earlier may not work with Kafka Connect versions later than 2.2.

Building

Build the connector with Maven using the standard lifecycle goals:

mvn clean
mvn package

Configuring

In addition to the standard Kafka Connect connector configuration properties, kafka-connect-lambda supports the following properties:

Property | Required | Default value | Description
aws.credentials.provider.class | No | Default AWS provider chain | Class name of an AWSCredentialsProvider implementation
aws.lambda.function.arn | Yes | (none) | Full ARN of the Lambda function
aws.lambda.invocation.timeout.ms | No | 300000 | Time, in milliseconds, to wait for a Lambda invocation before continuing
aws.lambda.invocation.mode | No | SYNC | SYNC for a synchronous invocation, or ASYNC for an asynchronous one
aws.lambda.invocation.failure.mode | No | STOP | Whether to STOP processing, or DROP and continue, after an invocation failure
aws.lambda.batch.enabled | No | true | true to batch messages together into a single invocation; otherwise false
aws.region | Yes | (none) | AWS region of the Lambda function
http.proxy.host | No | (none) | HTTP proxy host name
http.proxy.port | No | (none) | HTTP proxy port number
retriable.error.codes | No | 500,503,504 | Comma-delimited HTTP status codes that trigger an invocation retry
retry.backoff.millis | No | 500 | Time, in milliseconds, to wait between invocation retries
retries.max | No | 5 | Maximum number of invocation retries
topics | Yes | (none) | Comma-delimited list of Kafka topic names to sink
payload.formatter.class | No | com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter | Fully qualified class name of the payload formatter to use
payload.formatter.key.schema.visibility | No | min | How much of the key schema, if present, to include (none, min, or all). Applies only to JsonPayloadFormatter
payload.formatter.value.schema.visibility | No | min | How much of the value schema, if present, to include (none, min, or all). Applies only to JsonPayloadFormatter
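The three retry properties (retriable.error.codes, retries.max, retry.backoff.millis) work together. The sketch below shows one way they could interact, assuming a constant delay between attempts; the connector's actual backoff strategy may differ. The hypothetical RetrySketch class parses the comma-delimited codes. It then retries an invocation stand-in, an IntSupplier that returns an HTTP status code, sleeping between attempts.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch of how retriable.error.codes, retries.max and
 * retry.backoff.millis could interact. Assumes a constant delay between
 * attempts; this is not the connector's own retry code.
 */
final class RetrySketch {
    private final Set<Integer> retriableCodes;
    private final int maxRetries;
    private final long backoffMillis;

    RetrySketch(String retriableCodesCsv, int maxRetries, long backoffMillis) {
        // Parse a comma-delimited list such as "500,503,504".
        this.retriableCodes = Arrays.stream(retriableCodesCsv.split(","))
                .map(String::trim)
                .filter(code -> !code.isEmpty())
                .map(Integer::valueOf)
                .collect(Collectors.toSet());
        this.maxRetries = maxRetries;
        this.backoffMillis = backoffMillis;
    }

    boolean isRetriable(int statusCode) {
        return retriableCodes.contains(statusCode);
    }

    /**
     * Calls invoker (a stand-in for one Lambda invocation that returns an HTTP
     * status code), retrying while the status is retriable and fewer than
     * maxRetries retries have been made. Returns the last status code seen.
     */
    int invokeWithRetries(IntSupplier invoker) {
        int status = invoker.getAsInt();
        int retries = 0;
        while (isRetriable(status) && retries < maxRetries) {
            try {
                Thread.sleep(backoffMillis); // constant backoff (assumption)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                return status;
            }
            status = invoker.getAsInt();
            retries++;
        }
        return status;
    }
}
```

Consider the defaults (500,503,504 / 5 / 500). In this sketch, an invocation that keeps returning 503 is attempted six times in total: the initial call plus five retries, with 500 ms between them.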

Formatters

The connector includes two payload.formatter.class implementations:

  • com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter
  • com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter

Including the full schema information in the invocation payload may result in very large messages. Use the payload.formatter.key.schema.visibility and payload.formatter.value.schema.visibility properties to control how much of the key and value schemas, if present, to include in the invocation payload: none, min, or all (default: min). These settings apply only to the JsonPayloadFormatter; the PlainPayloadFormatter always includes the min schema information.
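Here is a minimal sketch of how a visibility setting could be read, assuming only the three documented values and the documented min default. The SchemaVisibility enum is hypothetical and not part of the connector's API.

```java
import java.util.Locale;

/** Hypothetical parser for payload.formatter.{key,value}.schema.visibility. */
enum SchemaVisibility {
    NONE, MIN, ALL;

    /** Returns MIN (the documented default) when the property is unset or blank. */
    static SchemaVisibility fromConfig(String value) {
        if (value == null || value.isBlank()) {
            return MIN;
        }
        // valueOf throws IllegalArgumentException for anything other than none/min/all.
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

Rejecting unknown values, rather than silently falling back to min, surfaces typos such as "full" at startup instead of producing unexpected payload sizes.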

Configuration Examples

An example configuration represented as JSON data for use with the Kafka Connect REST interface:

{
  "name": "example-lambda-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector",
    "topics": "<Your Kafka topics>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "aws.region": "<Your AWS region>",
    "aws.lambda.function.arn": "<Your function ARN>",
    "aws.lambda.batch.enabled": "false"
  }
}
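It can help to check a configuration like this locally before POSTing it. The hypothetical LambdaSinkConfigCheck class below sketches such a check. It reports required properties (aws.lambda.function.arn, aws.region, topics) that are missing or still hold a "<...>" placeholder. It can also overlay some of the defaults listed in the properties table. It is not part of the connector; the worker performs its own validation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical pre-flight check for a connector "config" map. Reports the
 * documented required properties that are unset, and applies some of the
 * documented defaults. Not part of kafka-connect-lambda itself.
 */
final class LambdaSinkConfigCheck {
    static final List<String> REQUIRED =
            List.of("aws.lambda.function.arn", "aws.region", "topics");

    static final Map<String, String> DEFAULTS = Map.of(
            "aws.lambda.invocation.timeout.ms", "300000",
            "aws.lambda.invocation.mode", "SYNC",
            "aws.lambda.invocation.failure.mode", "STOP",
            "aws.lambda.batch.enabled", "true",
            "retriable.error.codes", "500,503,504",
            "retry.backoff.millis", "500",
            "retries.max", "5");

    /** Returns required properties that are missing, blank, or still a "<...>" placeholder. */
    static List<String> missingRequired(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = config.get(key);
            if (value == null || value.isBlank() || value.startsWith("<")) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Returns a copy of the config with the documented defaults applied to unset properties. */
    static Map<String, String> withDefaults(Map<String, String> config) {
        Map<String, String> merged = new HashMap<>(DEFAULTS);
        merged.putAll(config); // explicit settings win over defaults
        return merged;
    }
}
```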

IAM assume-role options

By supplying com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider as the aws.credentials.provider.class configuration, the connector can assume an IAM role. The role must include a policy that allows the lambda:InvokeFunction and lambda:InvokeAsync actions. The provider uses the following properties:

Property | Required | Description
aws.credentials.provider.role.arn | Yes | Full ARN of the IAM role to assume
aws.credentials.provider.session.name | Yes | Name that uniquely identifies the session while the role is assumed
aws.credentials.provider.external.id | No | External identifier that kafka-connect-lambda supplies when assuming the role
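A malformed role ARN or session name only surfaces as an error when the connector first tries to assume the role. The hypothetical AssumeRoleConfigCheck class below sketches two local sanity checks.

- The ARN pattern follows the IAM role ARN format (arn:partition:iam::account-id:role/name, optionally with a path).
- The session-name pattern follows the STS RoleSessionName constraint of 2 to 64 characters from [\w+=,.@-].

Neither check is taken from kafka-connect-lambda.

```java
import java.util.regex.Pattern;

/**
 * Hypothetical sanity checks for the assume-role properties. The patterns
 * follow the IAM role ARN format and the STS RoleSessionName constraints.
 */
final class AssumeRoleConfigCheck {
    // arn:<partition>:iam::<12-digit account id>:role/<optional path/>role-name
    private static final Pattern ROLE_ARN =
            Pattern.compile("^arn:aws[a-z-]*:iam::\\d{12}:role/[\\w+=,.@/-]+$");

    // STS RoleSessionName: 2 to 64 characters from [\w+=,.@-]
    private static final Pattern SESSION_NAME = Pattern.compile("^[\\w+=,.@-]{2,64}$");

    static boolean isValidRoleArn(String arn) {
        return arn != null && ROLE_ARN.matcher(arn).matches();
    }

    static boolean isValidSessionName(String name) {
        return name != null && SESSION_NAME.matcher(name).matches();
    }
}
```

The second test case below shows a common mistake these checks catch: a Lambda function ARN pasted into aws.credentials.provider.role.arn.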

Invocation payloads

The default invocation payload is a JSON representation of a SinkRecord object, which contains the Kafka message in the value field. When aws.lambda.batch.enabled is true, the invocation payload is an array of these records.
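This sketch illustrates the two payload shapes, assuming each record has already been serialized to a JSON SinkRecord string. With batching enabled, all records go out as one JSON array in a single invocation. With batching disabled, each record is sent in its own invocation. InvocationPayloads is a hypothetical name, not a connector class.

```java
import java.util.List;

/**
 * Hypothetical illustration of the two invocation payload shapes. Each element
 * of records is assumed to be one already-serialized JSON SinkRecord.
 */
final class InvocationPayloads {

    /**
     * Returns one payload per Lambda invocation: a single JSON array of all
     * records when batching is enabled, otherwise one record per invocation.
     */
    static List<String> payloads(List<String> records, boolean batchEnabled) {
        if (records.isEmpty()) {
            return List.of(); // nothing to send, so no invocation
        }
        if (batchEnabled) {
            return List.of("[" + String.join(",", records) + "]");
        }
        return List.copyOf(records);
    }
}
```

A Lambda function that must work with either setting therefore has to accept both a single JSON object and a JSON array.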

Avro schema

This simple Avro record schema describes the "hello, world" message used in the examples below.

{
  "type": "record",
  "name": "Hello",
  "doc": "An example Avro-encoded `Hello` message.",
  "namespace": "com.nordstrom.kafka.example",
  "fields": [
    {
      "name": "language",
      "type": {
        "type": "enum",
        "name": "language",
        "symbols": [ "ENGLISH", "FRENCH", "ITALIAN", "SPANISH"
        ]
      }
    },
    {
      "name": "greeting",
      "type": "string"
    }
  ]
}

PlainPayloadFormatter

This example uses the following (partial) connector configuration. Because it does not set payload.formatter.class, the connector uses the default, com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
aws.lambda.batch.enabled=false

Expected output:

{
    "key": "my_key",
    "keySchemaName": null,
    "value": "Struct{language=ENGLISH,greeting=hello, world}",
    "valueSchemaName": "com.nordstrom.kafka.example.Hello",
    "topic": "example-stream",
    "partition": 1,
    "offset": 0,
    "timestamp": 1567723257583,
    "timestampTypeName": "CreateTime"
}
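The "value" string above resembles the toString() form of a Kafka Connect Struct. The hypothetical PlainValueSketch class below reproduces that shape from an ordered map of field names to values. It illustrates the output format only; it is not the formatter's code, and the real formatter works on Connect's Struct type rather than a map.

```java
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical stand-in that reproduces the shape of the "value" string in the
 * PlainPayloadFormatter output, given the record's fields in declaration order.
 */
final class PlainValueSketch {
    static String render(Map<String, Object> fields) {
        StringJoiner joiner = new StringJoiner(",", "Struct{", "}");
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            joiner.add(field.getKey() + "=" + field.getValue());
        }
        return joiner.toString();
    }
}
```

The comma inside "hello, world" is indistinguishable from the field separator. As a result, a Lambda function cannot reliably parse this string back into fields. That is one practical reason to consider the JsonPayloadFormatter for structured values.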

JsonPayloadFormatter

This example uses the following (partial) connector configuration with key and value schema visibility as min (the default):

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
aws.lambda.batch.enabled=false
payload.formatter.class=com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter

Expected output:

{
    "key": "my_key",
    "keySchemaName": null,
    "keySchemaVersion": null,
    "value": {
        "language": "ENGLISH",
        "greeting": "hello, world"
    },
    "valueSchemaName": "com.nordstrom.kafka.example.Hello",
    "valueSchemaVersion": "1",
    "topic": "example-stream",
    "partition": 1,
    "offset": 0,
    "timestamp": 1567723257583,
    "timestampTypeName": "CreateTime"
}
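Compared with the PlainPayloadFormatter output, the main difference is the "value" field, which is now a nested JSON object. The sketch below reproduces that object under some assumptions:

- The record is flat, with its fields held in an ordered map.
- Enums and strings become JSON strings.
- Numbers and booleans are emitted as-is.

The real formatter works on Kafka Connect's Struct and also emits the schema metadata. JsonValueSketch is a hypothetical name.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical renderer for a flat record value as a JSON object (no nested structs). */
final class JsonValueSketch {
    static String render(Map<String, Object> fields) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            joiner.add(quote(field.getKey()) + ":" + toJson(field.getValue()));
        }
        return joiner.toString();
    }

    private static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        return quote(value.toString()); // enums and strings become JSON strings
    }

    /** Quotes a string, escaping the characters JSON requires to be escaped. */
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

For the Hello record, render returns {"language":"ENGLISH","greeting":"hello, world"}, matching the "value" object above apart from whitespace.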

Try the example demo

Follow the demo to create an AWS Lambda function, build the connector plugin, run the connector, and send a message.

Create an AWS Lambda function

With an active AWS account, you can create a simple AWS Lambda function using the CloudFormation template in the config/ directory:

aws cloudformation create-stack \
  --stack-name example-lambda-stack \
  --capabilities CAPABILITY_NAMED_IAM \
  --template-body file://config/cloudformation.yml

To make sure our Lambda works, invoke it directly and view the result payload in result.txt:

aws lambda invoke --function-name example-function --payload '{"value": "my example"}' result.txt

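The function simply echoes the payload back to you in result.txt as serialized JSON. If you use AWS CLI version 2, add --cli-binary-format raw-in-base64-out to the invoke command so that the CLI accepts the raw JSON payload.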

Use the describe-stacks command to fetch the CloudFormation output value for ExampleFunctionArn, which we'll need later when setting up our connector configuration:

aws cloudformation describe-stacks --stack-name example-lambda-stack --query "Stacks[0].Outputs[]"

Build the connector plugin

mvn clean package

Once built, a kafka-connect-lambda uber-jar is in the target/plugin directory.

Run the connector using Docker Compose

Ensure you have AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables exported in your shell. Docker Compose will pass these values into the connect container.

Use the provided Docker Compose file and run docker-compose up.

With the Kafka Connect REST interface, verify that the Lambda sink connector plugin is installed and ready:

curl http://localhost:8083/connector-plugins

Next, supply a connector configuration. You can use config/connector.json.example as a starting point: copy it to config/connector.json, fill in values for <Your AWS Region> and <Your function ARN>, and run:

curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @config/connector.json
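The two curl calls in this section can also be made from Java. Below is a minimal sketch using the JDK 11+ java.net.http client. It assumes the Connect worker's REST interface listens on localhost:8083, as in the Docker Compose setup. ConnectRestSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical Java equivalent of the curl calls against the Kafka Connect REST interface. */
final class ConnectRestSketch {
    static final URI BASE = URI.create("http://localhost:8083/");

    /** GET /connector-plugins: the response should list LambdaSinkConnector once it is installed. */
    static HttpRequest listPlugins() {
        return HttpRequest.newBuilder(BASE.resolve("connector-plugins")).GET().build();
    }

    /** POST /connectors with the connector JSON, like curl -d @config/connector.json. */
    static HttpRequest createConnector(String connectorJson) {
        return HttpRequest.newBuilder(BASE.resolve("connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    /** Sends a request and returns "status body" for quick inspection. */
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }
}
```

For example, to submit the configuration: ConnectRestSketch.send(ConnectRestSketch.createConnector(Files.readString(Path.of("config/connector.json")))).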

Run the connector using the Confluent Platform

Run the ZooKeeper and Kafka components from the Confluent Platform.

Next, configure a Java properties file containing your connector configuration. You can use config/connector.properties.example as a starting point: copy it to config/connector.properties and fill in values for <Your AWS Region> and <Your function ARN>.

Ensure you have AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables exported in your shell. Then run the connector in standalone mode:

connect-standalone config/worker.properties config/connector.properties
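A quick way to catch an unfilled placeholder before starting the standalone worker is to scan the properties file. The hypothetical PropertiesPlaceholderCheck class below loads the file with java.util.Properties. It lists keys whose values still look like the "<...>" placeholders from the .example file.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical pre-flight check: lists keys in a connector properties file
 * whose values are still "<...>" placeholders copied from the .example file.
 */
final class PropertiesPlaceholderCheck {

    /** Loads a Java properties file, for example config/connector.properties. */
    static Properties load(Path file) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file)) {
            props.load(reader);
        }
        return props;
    }

    /** Returns, in sorted order, the keys whose values look like "<Your ...>" placeholders. */
    static List<String> unfilled(Properties props) {
        List<String> keys = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key).trim();
            if (value.startsWith("<") && value.endsWith(">")) {
                keys.add(key);
            }
        }
        Collections.sort(keys);
        return keys;
    }
}
```

An empty result from PropertiesPlaceholderCheck.unfilled(PropertiesPlaceholderCheck.load(Path.of("config/connector.properties"))) suggests that the placeholders have been replaced.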

Send messages

Using the Kafka console producer, send a message to the example-stream topic. Your example-lambda-connector will read the message from the topic and invoke the AWS Lambda example-function.

Use the AWS Console to view the Lambda function's CloudWatch logs, which show the output for the message you sent.
