Copyright 2018 ABSA Group Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Hyperdrive is a configurable and scalable ingestion platform that allows data movement and transformation from streaming sources with exactly-once fault-tolerance semantics by using Apache Spark Structured Streaming.
In Hyperdrive, each ingestion is defined by the three components reader, transformer and writer. This separation allows adapting to different streaming sources and sinks, while reusing transformations common across multiple ingestion pipelines.
Similar to batch processing, data ingestion pipelines are needed to process streaming data sources. While solutions for data pipelines exist, exactly-once fault-tolerance in streaming processing is an intricate problem and cannot be solved with the same strategies that exist for batch processing.
This is the gap Hyperdrive aims to fill, by leveraging the exactly-once guarantee of Spark's Structured Streaming and by providing a flexible data pipeline.
The data ingestion pipeline of Hyperdrive consists of three types of components: readers, transformers and writers. The following components are currently available:

- KafkaStreamReader - reads from a Kafka topic.
- ConfluentAvroDecodingTransformer - decodes the payload as Confluent Avro (through ABRiS), retrieving the schema from the specified Schema Registry. This transformer is capable of seamlessly handling whatever schemas the payload messages are using.
- ConfluentAvroEncodingTransformer - encodes the payload as Confluent Avro (through ABRiS), registering the schema with the specified Schema Registry. This transformer is capable of seamlessly handling whatever schema the dataframe is using.
- ColumnSelectorStreamTransformer - selects all columns from the decoded DataFrame.
- AddDateVersionTransformer - adds columns for the ingestion date and an auto-incremented version number, to be used for partitioning.
- ParquetStreamWriter - writes the DataFrame as Parquet, in append mode.
- KafkaStreamWriter - writes to a Kafka topic.

Custom components can be implemented using the Component Archetype, following the API defined in the package za.co.absa.hyperdrive.ingestor.api. A custom component needs to implement one of the traits StreamReader, StreamTransformer or StreamWriter, along with the corresponding factory trait StreamReaderFactory, StreamTransformerFactory or StreamWriterFactory. After that, the new component can be seamlessly invoked from the driver.
Hyperdrive has to be executed with Spark. Due to Spark-Kafka integration issues, it will only work with Spark 2.3 and higher.
git clone git@github.com:AbsaOSS/hyperdrive.git
mvn clean package
Given that a configuration file has already been created, Hyperdrive can be executed as follows:
spark-submit --class za.co.absa.hyperdrive.driver.drivers.PropertiesIngestionDriver driver/target/driver*.jar config.properties
Alternatively, configuration properties can be passed as command-line arguments:
spark-submit --class za.co.absa.hyperdrive.driver.drivers.CommandLineIngestionDriver driver/target/driver*.jar \
component.ingestor=spark \
component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader \
# more properties ...
The configuration file may be created from the template located at driver/src/resources/Ingestion.properties.template. CommandLineIngestionDriverDockerTest may be consulted for a working pipeline configuration.
Property Name | Required | Description |
---|---|---|
component.ingestor | Yes | Defines the ingestion pipeline. Only spark is currently supported. |
component.reader | Yes | Fully qualified name of the reader component, e.g. za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader |
component.transformer.id.{order} | No | An arbitrary but unique string, referenced in this documentation as {transformer-id} |
component.transformer.class.{transformer-id} | No | Fully qualified name of the transformer component, e.g. za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer |
component.writer | Yes | Fully qualified name of the writer component, e.g. za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter |
Multiple transformers can be configured in the pipeline, including multiple instances of the same transformer. For each transformer instance, component.transformer.id.{order} and component.transformer.class.{transformer-id} have to be specified, where {order} and {transformer-id} need to be unique. In the above table, {order} must be an integer and may be negative. {transformer-id} is only used within the configuration to identify which configuration options belong to a certain transformer instance.
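For illustration, a minimal pipeline definition using the above properties could look like the following sketch. The transformer id "selector" is an arbitrary example value; the component-specific properties described in the following sections would be added alongside these lines.

```properties
# Sketch of a pipeline definition; only documented component classes are used
component.ingestor=spark
component.reader=za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader
# "selector" is an arbitrary transformer id chosen for this example
component.transformer.id.1=selector
component.transformer.class.selector=za.co.absa.hyperdrive.ingestor.implementation.transformer.column.selection.ColumnSelectorStreamTransformer
component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter
```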
Property Name | Required | Description |
---|---|---|
ingestor.spark.app.name | Yes | User-defined name of the Spark application. See Spark property spark.app.name |
ingestor.spark.await.termination.timeout | No | Timeout in milliseconds. Stops the query when the timeout is reached. This option is only valid with the termination method awaitTermination |
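As a sketch, the Spark-related settings might be configured as follows; the values shown are purely illustrative.

```properties
# Illustrative application name
ingestor.spark.app.name=hyperdrive-ingestion-example
# Stop the query after one hour (3600000 ms); only effective with termination method awaitTermination
ingestor.spark.await.termination.timeout=3600000
```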
Property Name | Required | Description |
---|---|---|
reader.kafka.topic | Yes | The name of the Kafka topic to ingest data from. Equivalent to Spark property subscribe |
reader.kafka.brokers | Yes | List of Kafka broker URLs. Equivalent to Spark property kafka.bootstrap.servers |

Any additional properties for Kafka can be added with the prefix reader.option., e.g. the property kafka.security.protocol can be added as reader.option.kafka.security.protocol. See the Structured Streaming + Kafka Integration Guide for optional Kafka properties.
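A reader configuration might then look like the following sketch; the topic name and broker addresses are placeholders.

```properties
# Placeholder topic and broker addresses
reader.kafka.topic=my.example.topic
reader.kafka.brokers=kafka-broker-1:9092,kafka-broker-2:9092
# Any kafka.* option can be passed through with the reader.option. prefix
reader.option.kafka.security.protocol=SASL_SSL
```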
The ConfluentAvroStreamDecodingTransformer is built on ABRiS. More details about the configuration properties can be found there.

Caution: The ConfluentAvroStreamDecodingTransformer requires the property reader.kafka.topic to be set.
Property Name | Required | Description |
---|---|---|
transformer.{transformer-id}.schema.registry.url | Yes | URL of Schema Registry, e.g. http://localhost:8081. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_REGISTRY_URL |
transformer.{transformer-id}.value.schema.id | Yes | The schema id. Use latest or explicitly provide a number. Equivalent to ABRiS property SchemaManager.PARAM_VALUE_SCHEMA_ID |
transformer.{transformer-id}.value.schema.naming.strategy | Yes | Subject name strategy of Schema Registry. Possible values are topic.name, record.name or topic.record.name. Equivalent to ABRiS property SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY |
transformer.{transformer-id}.value.schema.record.name | Yes for naming strategies record.name and topic.record.name | Name of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY |
transformer.{transformer-id}.value.schema.record.namespace | Yes for naming strategies record.name and topic.record.name | Namespace of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY |
transformer.{transformer-id}.consume.keys | No | If set to true, keys will be consumed and added as columns to the dataframe. Key columns will be prefixed with key__ |
transformer.{transformer-id}.key.schema.id | Yes if consume.keys is true | The schema id for the key. |
transformer.{transformer-id}.key.schema.naming.strategy | Yes if consume.keys is true | Subject name strategy for the key |
transformer.{transformer-id}.key.schema.record.name | Yes for key naming strategies record.name and topic.record.name | Name of the record. |
transformer.{transformer-id}.key.schema.record.namespace | Yes for key naming strategies record.name and topic.record.name | Namespace of the record. |
For detailed information on the subject name strategy, please take a look at the Schema Registry Documentation.
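As an illustration, a decoder transformer registered under the id [avro.decoder] (the id itself is arbitrary) might be configured like this sketch; the Schema Registry URL is a placeholder.

```properties
# Placeholder Schema Registry URL
transformer.[avro.decoder].schema.registry.url=http://localhost:8081
transformer.[avro.decoder].value.schema.id=latest
transformer.[avro.decoder].value.schema.naming.strategy=topic.name
# Optionally consume keys as well; key columns will be prefixed with key__
transformer.[avro.decoder].consume.keys=true
transformer.[avro.decoder].key.schema.id=latest
transformer.[avro.decoder].key.schema.naming.strategy=topic.name
```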
The ConfluentAvroStreamEncodingTransformer is built on ABRiS. More details about the configuration properties can be found there.

Caution: The ConfluentAvroStreamEncodingTransformer requires the property writer.kafka.topic to be set.
Property Name | Required | Description |
---|---|---|
transformer.{transformer-id}.schema.registry.url | Yes | URL of Schema Registry, e.g. http://localhost:8081. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_REGISTRY_URL |
transformer.{transformer-id}.value.schema.naming.strategy | Yes | Subject name strategy of Schema Registry. Possible values are topic.name, record.name or topic.record.name. Equivalent to ABRiS property SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY |
transformer.{transformer-id}.value.schema.record.name | Yes for naming strategies record.name and topic.record.name | Name of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY |
transformer.{transformer-id}.value.schema.record.namespace | Yes for naming strategies record.name and topic.record.name | Namespace of the record. Equivalent to ABRiS property SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY |
transformer.{transformer-id}.produce.keys | No | If set to true, keys will be produced according to the properties key.column.prefix and key.column.names of the Hyperdrive Context |
transformer.{transformer-id}.key.schema.naming.strategy | Yes if produce.keys is true | Subject name strategy for the key |
transformer.{transformer-id}.key.schema.record.name | Yes for key naming strategies record.name and topic.record.name | Name of the record. |
transformer.{transformer-id}.key.schema.record.namespace | Yes for key naming strategies record.name and topic.record.name | Namespace of the record. |
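A corresponding encoder configuration could look like the following sketch; the transformer id [avro.encoder] is a hypothetical example and the Schema Registry URL is a placeholder.

```properties
# Hypothetical transformer id [avro.encoder]; placeholder Schema Registry URL
transformer.[avro.encoder].schema.registry.url=http://localhost:8081
transformer.[avro.encoder].value.schema.naming.strategy=topic.name
# Optionally produce keys based on the Hyperdrive Context variables key.column.prefix and key.column.names
transformer.[avro.encoder].produce.keys=true
transformer.[avro.encoder].key.schema.naming.strategy=topic.name
```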
Property Name | Required | Description |
---|---|---|
transformer.{transformer-id}.columns.to.select | Yes | Comma-separated list of columns to select. * can be used to select all columns. Only existing columns using column names may be selected (i.e. expressions cannot be constructed) |
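For example, assuming the transformer was registered under the arbitrary id "selector" as in the pipeline sketch above, selecting all columns would be configured as:

```properties
# "selector" is the arbitrary transformer id from the pipeline sketch above
transformer.selector.columns.to.select=*
```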
The AddDateVersionTransformer adds the columns hyperdrive_date and hyperdrive_version. hyperdrive_date is the ingestion date (or a user-defined date), while hyperdrive_version is a number automatically incremented with every ingestion, starting at 1. For the auto-increment to work, hyperdrive_date and hyperdrive_version need to be defined as partition columns.

Caution: This transformer requires a writer which defines writer.parquet.destination.directory.
Property Name | Required | Description |
---|---|---|
transformer.{transformer-id}.report.date | No | User-defined date for hyperdrive_date in format yyyy-MM-dd. Default date is the date of the ingestion |

See Pipeline settings for details about {transformer-id}.
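As a sketch, assuming a hypothetical transformer id of [add.date.version], the transformer and the partition columns it relies on could be configured together like this (the report date is an illustrative value; the partition column property is described in the ParquetStreamWriter settings below):

```properties
# Hypothetical transformer id [add.date.version]; report date is optional and illustrative
transformer.[add.date.version].report.date=2022-07-01
# Required for the auto-increment to work (ParquetStreamWriter property, see below)
writer.parquet.partition.columns=hyperdrive_date,hyperdrive_version
```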
Property Name | Required | Description |
---|---|---|
writer.parquet.destination.directory | Yes | Destination path of the sink. Equivalent to Spark property path for the DataStreamWriter |
writer.parquet.partition.columns | No | Comma-separated list of columns to partition by. |
writer.parquet.metadata.check | No | Set this option to true if the consistency of the metadata log should be checked prior to the query. For very large tables, the check may be very expensive |
writer.common.trigger.type | No | See Combination writer properties |
writer.common.trigger.processing.time | No | See Combination writer properties |

Any additional properties for the DataStreamWriter can be added with the prefix writer.parquet.options, e.g. writer.parquet.options.key=value
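An illustrative Parquet writer configuration; the destination directory is a placeholder, and the pass-through compression option is just one example of a Spark parquet option.

```properties
# Placeholder destination directory
writer.parquet.destination.directory=hdfs://namenode/data/ingested/my_table
writer.parquet.metadata.check=true
# Pass-through option for the DataStreamWriter (illustrative)
writer.parquet.options.compression=snappy
```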
Property Name | Required | Description |
---|---|---|
writer.kafka.topic | Yes | The name of the Kafka topic to write data to. Equivalent to Spark property topic |
writer.kafka.brokers | Yes | List of Kafka broker URLs. Equivalent to Spark property kafka.bootstrap.servers |
writer.common.trigger.type | No | See Combination writer properties |
writer.common.trigger.processing.time | No | See Combination writer properties |
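A Kafka writer might be configured as in the following sketch; the target topic and broker addresses are placeholders.

```properties
# Placeholder target topic and broker addresses
writer.kafka.topic=my.output.topic
writer.kafka.brokers=kafka-broker-1:9092,kafka-broker-2:9092
```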
Property Name | Required | Description |
---|---|---|
writer.common.checkpoint.location | Yes | Used for Spark property checkpointLocation. The checkpoint location has to be unique among different workflows. |
writer.common.trigger.type | No | Either Once for one-time execution or ProcessingTime for micro-batch execution. Default: Once. |
writer.common.trigger.processing.time | No | Interval in ms for micro-batch execution (using ProcessingTime). Default: 0ms, i.e. execution as fast as possible. |
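For instance, a long-running micro-batch query with a 10-second interval could be sketched as follows; the checkpoint location is a placeholder.

```properties
# Placeholder checkpoint location; must be unique per workflow
writer.common.checkpoint.location=hdfs://namenode/checkpoints/my_workflow
writer.common.trigger.type=ProcessingTime
# Micro-batch interval in milliseconds
writer.common.trigger.processing.time=10000
```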
Trigger (writer.common.trigger.type) | Timeout (ingestor.spark.await.termination.timeout) | Runtime | Details |
---|---|---|---|
Once | No timeout | Limited | Consumes all data that is available at the beginning of the micro-batch. The query processes exactly one micro-batch and then stops, even if more data would be available at the end of the micro-batch. |
ProcessingTime | With timeout | Limited | Consumes data in micro-batches and only stops when the timeout is reached or the query is killed. |
ProcessingTime | No timeout | Long-running | Consumes data in micro-batches and only stops when the query is killed. |
Note: The size of a micro-batch is not limited by default, regardless of whether the trigger ProcessingTime is configured, and regardless of what micro-batch interval is configured. To limit the size of a micro-batch, the property reader.option.maxOffsetsPerTrigger should be used. See also http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Note: A timeout should not be combined with the trigger Once. If the timeout is reached before the micro-batch is processed, it won't be completed and no data will be committed. Such a behavior seems quite unpredictable and therefore we don't recommend it.

See the Spark Documentation for more information about triggers.
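A sketch of limiting the micro-batch size via a pass-through reader option; the value is illustrative.

```properties
# Pass-through Spark option; limits how many Kafka offsets are processed per micro-batch (illustrative value)
reader.option.maxOffsetsPerTrigger=100000
```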
HyperdriveContext is an object intended to be used by the components to share data. It is a key-value store, where the key is a string and the value can be of any type. The following context variables are currently used by the default implementation.
Name | Type | Description |
---|---|---|
key.column.prefix | String | If ConfluentAvroDecodingTransformer is configured to consume keys, it prefixes the key columns with key__ such that they can be distinguished in the dataframe. If key__ happens to be a prefix of a value column, a random alphanumeric string is used instead. |
key.column.names | Seq[String] | If ConfluentAvroDecodingTransformer is configured to consume keys, it contains the original column names (without prefix) in the key schema. |
Hyperdrive uses Apache Commons Configuration 2. This allows properties to be referenced, like so:
transformer.[avro.decoder].schema.registry.url=http://localhost:8081
writer.kafka.schema.registry.url=${transformer.[avro.decoder].schema.registry.url}
Hyperdrive ingestions may be triggered using the Workflow Manager, which is developed in a separate repository: AbsaOSS/hyperdrive-trigger
A key feature of the Workflow Manager is its triggers, which define when an ingestion should be executed and how it should be requested. The Workflow Manager supports cron-based triggers as well as triggers that listen to a notification topic.