A Kafka Connect sink plugin to invoke AWS Lambda functions.
kafka-connect-lambda | Kafka Connect API | AWS SDK |
---|---|---|
1.1.0 | 2.2.0 | 1.11.592 |
1.1.1 | 2.2.0 | 1.11.592 |
1.2.0 | 2.3.0 | 1.11.651 |
1.3.0 | 2.8.1 | 1.11.1034 |
Due to a compatibility issue with Apache httpcomponents, connector versions 1.1.1 and earlier may not work with Kafka Connect versions greater than 2.2.
Build the connector with Maven using the standard lifecycle goals:

```sh
mvn clean
mvn package
```
In addition to the standard Kafka Connect connector configuration properties, the `kafka-connect-lambda` properties available are:
| Property | Required | Default value | Description |
|---|---|---|---|
| `aws.credentials.provider.class` | No | Default AWS provider chain | Class name of an `AWSCredentialsProvider` implementation |
| `aws.lambda.function.arn` | Yes | | Full ARN of the Lambda function |
| `aws.lambda.invocation.timeout.ms` | No | `300000` | Time to wait for a Lambda invocation before continuing |
| `aws.lambda.invocation.mode` | No | `SYNC` | `SYNC` for a synchronous invocation; otherwise `ASYNC` |
| `aws.lambda.invocation.failure.mode` | No | `STOP` | Whether to `STOP` processing, or `DROP` and continue, after an invocation failure |
| `aws.lambda.batch.enabled` | No | `true` | `true` to batch messages together before an invocation; otherwise `false` |
| `aws.region` | Yes | | AWS region of the Lambda function |
| `http.proxy.host` | No | | HTTP proxy host name |
| `http.proxy.port` | No | | HTTP proxy port number |
| `retriable.error.codes` | No | `500,503,504` | HTTP status codes that will trigger an invocation retry |
| `retry.backoff.millis` | No | `500` | Time to wait between invocation retries |
| `retries.max` | No | `5` | Maximum number of invocation retries |
| `topics` | Yes | | Comma-delimited list of Kafka topic names to sink |
| `payload.formatter.class` | No | `com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter` | Specifies the formatter to use |
| `payload.formatter.key.schema.visibility` | No | `min` | Determines whether the key schema (if present) is included; applies only to the `JsonPayloadFormatter` |
| `payload.formatter.value.schema.visibility` | No | `min` | Determines whether the value schema (if present) is included; applies only to the `JsonPayloadFormatter` |
The connector includes two `payload.formatter.class` implementations:

- `com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter`
- `com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter`
Including the full schema information in the invocation payload may result in very large messages. Therefore, use the `schema.visibility` key and value properties to control how much of the schema, if present, to include in the invocation payload: `none`, `min`, or `all` (default: `min`). These settings apply to the `JsonPayloadFormatter` only; the `PlainPayloadFormatter` always includes the `min` schema information.
An example configuration represented as JSON data for use with the Kafka Connect REST interface:
```json
{
  "name": "example-lambda-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector",
    "topics": "<Your Kafka topics>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "aws.region": "<Your AWS region>",
    "aws.lambda.function.arn": "<Your function ARN>",
    "aws.lambda.batch.enabled": "false"
  }
}
```
By supplying `com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider` as the `aws.credentials.provider.class` configuration, the connector can assume an IAM Role. The role must include a policy that allows the `lambda:InvokeFunction` and `lambda:InvokeAsync` actions.
| Property | Required | Description |
|---|---|---|
| `aws.credentials.provider.role.arn` | Yes | Full ARN of the IAM Role to assume |
| `aws.credentials.provider.session.name` | Yes | Name that uniquely identifies a session while the role is being assumed |
| `aws.credentials.provider.external.id` | No | External identifier used by the kafka-connect-lambda when assuming the role |
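For illustration, these properties might appear in a connector `config` block as in this minimal sketch; the role ARN, session name, and external ID below are placeholders, not values defined by this project:

```json
{
  "aws.credentials.provider.class": "com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider",
  "aws.credentials.provider.role.arn": "arn:aws:iam::123456789012:role/example-lambda-invoke-role",
  "aws.credentials.provider.session.name": "example-lambda-connector",
  "aws.credentials.provider.external.id": "example-external-id"
}
```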
The default invocation payload is a JSON representation of a `SinkRecord` object, which contains the Kafka message in the `value` field. When `aws.lambda.batch.enabled` is `true`, the invocation payload is an array of these records.
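As an illustration, a batched invocation for two records would deliver an array along these lines; the record fields match the expected outputs shown later, but the values here are made up:

```json
[
  {
    "key": "my_key",
    "value": "Struct{language=ENGLISH,greeting=hello, world}",
    "topic": "example-stream",
    "partition": 1,
    "offset": 0,
    "timestamp": 1567723257583,
    "timestampTypeName": "CreateTime"
  },
  {
    "key": "my_key",
    "value": "Struct{language=FRENCH,greeting=bonjour}",
    "topic": "example-stream",
    "partition": 1,
    "offset": 1,
    "timestamp": 1567723257584,
    "timestampTypeName": "CreateTime"
  }
]
```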
This simple schema record describes our "hello, world" message.
```json
{
  "type": "record",
  "name": "Hello",
  "doc": "An example Avro-encoded `Hello` message.",
  "namespace": "com.nordstrom.kafka.example",
  "fields": [
    {
      "name": "language",
      "type": {
        "type": "enum",
        "name": "language",
        "symbols": [ "ENGLISH", "FRENCH", "ITALIAN", "SPANISH" ]
      }
    },
    {
      "name": "greeting",
      "type": "string"
    }
  ]
}
```
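For reference, a message conforming to this schema, shown decoded as JSON (the same value that appears in the expected outputs below), is:

```json
{
  "language": "ENGLISH",
  "greeting": "hello, world"
}
```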
This example uses the following (partial) connector configuration, which defaults to `payload.formatter.class=com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter`:

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
aws.lambda.batch.enabled=false
```
Expected output:
```json
{
  "key": "my_key",
  "keySchemaName": null,
  "value": "Struct{language=ENGLISH,greeting=hello, world}",
  "valueSchemaName": "com.nordstrom.kafka.example.Hello",
  "topic": "example-stream",
  "partition": 1,
  "offset": 0,
  "timestamp": 1567723257583,
  "timestampTypeName": "CreateTime"
}
```
This example uses the following (partial) connector configuration with key and value schema visibility as `min` (the default):

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
aws.lambda.batch.enabled=false
payload.formatter.class=com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter
```
Expected output:
```json
{
  "key": "my_key",
  "keySchemaName": null,
  "keySchemaVersion": null,
  "value": {
    "language": "ENGLISH",
    "greeting": "hello, world"
  },
  "valueSchemaName": "com.nordstrom.kafka.example.Hello",
  "valueSchemaVersion": "1",
  "topic": "example-stream",
  "partition": 1,
  "offset": 0,
  "timestamp": 1567723257583,
  "timestampTypeName": "CreateTime"
}
```
Follow the demo to create an AWS Lambda function, build the connector plugin, run the connector, and send a message.
With an active AWS account, you can create a simple AWS Lambda function using the CloudFormation template in the `config/` directory:

```sh
aws cloudformation create-stack \
  --stack-name example-lambda-stack \
  --capabilities CAPABILITY_NAMED_IAM \
  --template-body file://config/cloudformation.yml
```
To make sure our Lambda works, invoke it directly and view the result payload in `result.txt`:

```sh
aws lambda invoke --function-name example-function --payload '{"value": "my example"}' result.txt
```
The function simply sends the `payload` back to you in `result.txt` as serialized JSON.
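Assuming the example function echoes its input unchanged, `result.txt` should contain something like:

```json
{"value": "my example"}
```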
Use the `describe-stacks` command to fetch the CloudFormation output value for `ExampleFunctionArn`, which we'll need later when setting up our connector configuration:

```sh
aws cloudformation describe-stacks --stack-name example-lambda-stack --query "Stacks[0].Outputs[]"
```
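The command returns the stack's output key/value pairs; the ARN below is a placeholder for whatever your account produces:

```json
[
  {
    "OutputKey": "ExampleFunctionArn",
    "OutputValue": "arn:aws:lambda:us-west-2:123456789012:function:example-function"
  }
]
```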
```sh
mvn clean package
```

Once built, a `kafka-connect-lambda` uber-jar is in the `target/plugin` directory.
Ensure you have `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables exported in your shell. Docker Compose will pass these values into the `connect` container.
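For example (placeholder values):

```sh
export AWS_ACCESS_KEY_ID="<your access key id>"
export AWS_SECRET_ACCESS_KEY="<your secret access key>"
```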
Use the provided Docker Compose file and run `docker-compose up`.

With the Kafka Connect REST interface, verify the Lambda sink connector is installed and ready: `curl http://localhost:8083/connector-plugins`.
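If the plugin is installed, the response should include an entry along these lines (the version shown is illustrative):

```json
{
  "class": "com.nordstrom.kafka.connect.lambda.LambdaSinkConnector",
  "type": "sink",
  "version": "1.3.0"
}
```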
Next, supply a connector configuration. You can use `config/connector.json.example` as a starting point. Fill in values for `<Your AWS Region>` and `<Your function ARN>` and run:

```sh
curl -XPOST -H 'Content-Type: application/json' http://localhost:8083/connectors -d @config/connector.json
```
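You can then confirm the connector and its task are running by querying the Kafka Connect status endpoint:

```sh
curl http://localhost:8083/connectors/example-lambda-connector/status
```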
Run the ZooKeeper and Kafka components from the Confluent Platform.
Next, configure a Java properties file containing your connector configuration. You can use `config/connector.properties.example` as a starting point. Fill in values for `<Your AWS Region>` and `<Your function ARN>`.
Ensure you have `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables exported in your shell. Then, run the connector in standalone mode:

```sh
connect-standalone config/worker.properties config/connector.properties
```
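If you need a worker configuration to start from, a minimal `config/worker.properties` for standalone mode might look like this sketch; the broker address and plugin path are assumptions for a local setup, not values defined by this project:

```properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=target/plugin
```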
Using the Kafka console producer, send a message to the `example-stream` topic. Your `example-lambda-connector` will read the message from the topic and invoke the AWS Lambda `example-function`.
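For example, assuming a local broker at `localhost:9092` and a connector configured with the `StringConverter` as in the REST example above:

```sh
echo 'hello, world' | kafka-console-producer --broker-list localhost:9092 --topic example-stream
```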
Use the AWS Console to read the output of your message in the CloudWatch logs for the Lambda.