Project Name | Description | Stars | Downloads | Repos Using This | Packages Using This | Most Recent Commit | Total Releases | Latest Release | Open Issues | License | Language
---|---|---|---|---|---|---|---|---|---|---|---
Pipeline | PipelineAI Kubeflow Distribution | 4,140 | | | | 5 months ago | 85 | July 18, 2017 | 1 | apache-2.0 | Jsonnet
Orchest | Build data pipelines, the easy way 🛠️ | 3,773 | | | | 8 days ago | 14 | April 06, 2022 | 124 | agpl-3.0 | Python
Docker Airflow | Docker Apache Airflow | 3,589 | | | | 2 months ago | | | 269 | apache-2.0 | Shell
Awesome Apache Airflow | Curated list of resources about Apache Airflow | 3,060 | | | | 7 months ago | | | 2 | | Shell
Elyra | Elyra extends JupyterLab with an AI centric approach. | 1,567 | 10 | | | 6 days ago | 89 | July 07, 2022 | 249 | apache-2.0 | Python
Airflow Tutorial | Apache Airflow tutorial | 669 | | | | a year ago | | | 18 | mit | Jupyter Notebook
Kube Airflow | A docker image and kubernetes config files to run Airflow on Kubernetes | 556 | | | | 4 years ago | | | 30 | apache-2.0 | Python
Data Pipelines With Apache Airflow | Code for Data Pipelines with Apache Airflow | 441 | | | | 2 months ago | | | 30 | other | Python
Astronomer | Helm Charts for the Astronomer Platform, Apache Airflow as a Service on Kubernetes | 441 | | | | 6 days ago | | | 19 | other | Python
Gather Deployment | Gathers scalable Tensorflow, Python infrastructure deployment and practices, 100% Docker. | 345 | | | | a year ago | | | | mit | Jupyter Notebook
This platform automates curating a local data warehouse of interesting, up-to-date public data sets. It enables users (well, mainly one user, me) to easily add data sets to the warehouse, build analyses that explore and answer questions with current data, and discover existing assets to accelerate exploring new questions.
At present, it uses docker to provision and run:

- Apache Airflow services to orchestrate data pipelines,
- a PostgreSQL database (`dwh_db`) that serves as the local data warehouse,
- pgAdmin4 for working with the warehouse database,
- dbt for managing and documenting data transformations, and
- great_expectations for validating data.
I like to research before I buy anything, especially if it's a big-ticket item. I've been considering buying a house for a while, but the methods I use for answering questions like "what phone should I buy?" or "how can I make my apartment less drafty in winter?" haven't been adequate to answer questions I have about real estate. Fortunately, the real estate market I've grown fond of has the richest public data culture in the US (that I, a data scientist focused on Chicago-related issues, am aware of). This market's Assessor's Office regularly publishes data I can mine for answers to some of my biggest questions.
You can see documentation for this platform at https://docs.analytics-data-where-house.dev/. This project is still under active development and documentation will continue to evolve with the system.
To use this system, Docker Engine and Compose (v2.0.0 or higher) are the only hard requirements.
Having python and GNU make on your host system will provide a lot of quality of life improvements (mainly a streamlined setup process and useful makefile recipes), but they're not strictly necessary.
After the system is set up, you can easily add a Socrata data set to the warehouse by:

1. Defining a `SocrataTable` instance for the table in `/airflow/dags/sources/tables.py`: Look up the table's documentation page on the web and get the `table_id` from the URL (it will be nine characters long, all lowercase, and with a hyphen in the middle). Use that `table_id` value, along with a sensible name for the table and a cron expression that indicates how frequently the system should check for data updates, to define a `SocrataTable` instance for the table.
```python
COOK_COUNTY_PARCEL_SALES = SocrataTable(
    table_id="wvhk-k5uv",
    table_name="cook_county_parcel_sales",
    schedule="0 6 4 * *",
)
```
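If it helps to sanity-check the `table_id` you copy out of the URL, a tiny helper like the sketch below can validate it. This helper is purely illustrative and not part of the platform; the regex just encodes the nine-character, lowercase, hyphen-in-the-middle pattern described above.

```python
import re

# Socrata table_ids look like "wvhk-k5uv": four lowercase alphanumeric
# characters, a hyphen, then four more (nine characters total).
TABLE_ID_PATTERN = re.compile(r"^[a-z0-9]{4}-[a-z0-9]{4}$")


def looks_like_table_id(candidate: str) -> bool:
    """Return True if `candidate` matches the Socrata table_id format."""
    return bool(TABLE_ID_PATTERN.match(candidate))


assert looks_like_table_id("wvhk-k5uv")
assert not looks_like_table_id("not-a-table-id")
```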
Note: It's a convention in python to capitalize the names of constants, and as the `table_id` and `table_name` for a data set should be constant, I use the capitalized `table_name` as the name of the data set's `SocrataTable` instance variable.
2. Creating a new DAG file in `/airflow/dags/` based on the `update_data_raw_cook_county_parcel_sales` DAG below. After copying the code into a new file, you only have to make changes to the 4 lines numbered below:
1. Replace `COOK_COUNTY_PARCEL_SALES` with the name of the `SocrataTable` instance variable from `tables.py`,
2. change the tags to reflect this data set,
3. change this DAG's function name to reflect this data set, and
4. call that DAG function.
```python
# This is the full file /airflow/dags/cook_county/update_raw_cook_county_parcel_sales.py
import datetime as dt
import logging

from airflow.decorators import dag

from tasks.socrata_tasks import update_socrata_table
from sources.tables import COOK_COUNTY_PARCEL_SALES as SOCRATA_TABLE  ### 1.

task_logger = logging.getLogger("airflow.task")


@dag(
    schedule=SOCRATA_TABLE.schedule,
    start_date=dt.datetime(2022, 11, 1),
    catchup=False,
    tags=["cook_county", "parcels", "fact_table", "data_raw"],  ### 2.
)
def update_data_raw_cook_county_parcel_sales():  ### 3.
    update_1 = update_socrata_table(
        socrata_table=SOCRATA_TABLE,
        conn_id="dwh_db_conn",
        task_logger=task_logger,
    )
    update_1


update_data_raw_cook_county_parcel_sales()  ### 4.
```
Congratulations! You just defined a new data pipeline! After you unpause and run this DAG in the Airflow Web UI, the system will automatically build that data pipeline, add that data set to the warehouse, and update that data set on the schedule indicated in the `SocrataTable` instance.
The Update-data DAGs for (at least) Socrata tables follow the pattern below:
Before downloading potentially gigabytes of data, we check the data source's metadata to determine if the source data has been updated since the most recent successful update of that data in the local data warehouse (a standalone sketch of this freshness check follows this overview). Whether there is new data or not, we'll log the results of that check in the data warehouse's `metadata.table_metadata` table.
If the data source's data is fresher than the data in the local data warehouse, the system downloads the entire table from the data source (to a file in the Airflow-scheduler container) and then runs the `load_data_tg` TaskGroup, which:

- creates a table for the data set in the `data_raw` schema if the data set is a new addition to the warehouse, and the `make_dbt_data_raw_model` task automatically generates a data-set-specific dbt data_raw model file,
- loads the downloaded data into a temporary table and compares it against the persisting `data_raw` table; records that are entirely new or are updates of prior records (i.e., at least one source column has a changed value) are appended to the persisting `data_raw` table, and
- updates the `metadata.table_metadata` table to indicate the table in the local data warehouse was successfully updated on this freshness check.

Those tasks make up the `load_data_tg` Task Group.
If the local data warehouse has up-to-date data for a given data source, we will just record that finding in the metadata table and end the run.
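To make the freshness-check idea concrete, here is a minimal standalone sketch. It is not the platform's actual implementation (that lives in the Socrata tasks the DAGs use); it just shows the general pattern of comparing Socrata's public view-metadata field `rowsUpdatedAt` against the timestamp of the last successful local update. The function names, the example domain, and the example timestamp are illustrative assumptions.

```python
import datetime as dt

import requests


def source_data_updated_at(domain: str, table_id: str) -> dt.datetime:
    """Fetch the data set's last-update time from Socrata's view-metadata endpoint."""
    resp = requests.get(f"https://{domain}/api/views/{table_id}.json", timeout=30)
    resp.raise_for_status()
    # Socrata reports rowsUpdatedAt as a Unix epoch timestamp.
    return dt.datetime.fromtimestamp(resp.json()["rowsUpdatedAt"], tz=dt.timezone.utc)


def local_data_is_stale(last_local_update: dt.datetime, domain: str, table_id: str) -> bool:
    """True if the source has published fresher data than the local warehouse holds."""
    return source_data_updated_at(domain, table_id) > last_local_update


# Example usage (assumed domain): has the parcel sales table changed since our last pull?
# In the platform, last_pull would come from the metadata.table_metadata table.
last_pull = dt.datetime(2023, 1, 1, tzinfo=dt.timezone.utc)
print(local_data_is_stale(last_pull, "datacatalog.cookcountyil.gov", "wvhk-k5uv"))
```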
Tables with geospatial features/columns will be downloaded in the .geojson format (which has a much more flexible structure than .csv files), while tables without geospatial features (i.e., flat tabular data) will be downloaded as .csv files. Different code is needed to correctly and efficiently read and ingest these different formats. So far, this platform has implemented data-loader TaskGroups to handle .geojson and .csv file formats, but this pattern is easy to extend if other data sources only offer other file formats.
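As a rough illustration of that split (not the platform's actual loader tasks, and assuming `geopandas` and `pandas` are available), the format dispatch looks something like this:

```python
from pathlib import Path

import geopandas as gpd
import pandas as pd


def read_exported_table(file_path: Path):
    """Read a downloaded export with a format-appropriate reader."""
    if file_path.suffix == ".geojson":
        # GeoJSON preserves geometry columns and nested structure.
        return gpd.read_file(file_path)
    if file_path.suffix == ".csv":
        # Flat tabular exports read fine as plain DataFrames.
        return pd.read_csv(file_path)
    raise ValueError(f"No loader implemented for {file_path.suffix} files yet.")
```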
Many public data tables are exported from production systems, where records represent something that can change over time. For example, in this building permit table, each record represents an application for a building permit. Rather than adding a new record any time the application process moves forward (e.g., when a fee is paid, a contact is added, or the permit gets issued), the original record gets updated. After this data is updated, the prior state of the table is gone (or at least no longer publicly available). This is ideal for the intended users of the production system (i.e., people involved in the process who have to look up the current status of a permit request), but for someone seeking to understand the process, keeping all distinct versions or states of a record makes it possible to see how a record evolved. So I've developed this workflow to keep the original record and all distinct updates for (non "temp_") tables in the `data_raw` schema.
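The core of that versioning idea can be sketched in a few lines of pandas. This is an illustration of the pattern rather than the platform's implementation, and the column handling is simplified (the real tables also carry ingestion-metadata columns).

```python
import pandas as pd


def records_to_append(freshly_downloaded: pd.DataFrame, persisted: pd.DataFrame) -> pd.DataFrame:
    """Return rows from the new pull that are not already in the persisting table.

    A row counts as new if it is an entirely new record or if at least one
    source column's value differs from every previously stored version.
    """
    source_columns = list(freshly_downloaded.columns)
    merged = freshly_downloaded.merge(
        persisted[source_columns].drop_duplicates(),
        how="left",
        on=source_columns,
        indicator=True,  # adds a _merge column marking unmatched ("left_only") rows
    )
    return merged.loc[merged["_merge"] == "left_only", source_columns]
```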
This query shows the count of new or updated records grouped by the data-publication DateTime when the record was new to the local data warehouse.
DAGs placed or developed in the `/<repo>/airflow/dags/` directory will quickly be available through the web UI and can be manually triggered or run there.
At present, a local mount is created from `/<repo>/data_raw` (host-side) to `/opt/airflow/data_raw` (container-side), so changes you make to a DAG from your host machine will be available (nearly immediately) in the running Airflow containers as you develop.
To generate and serve documentation for the data transformations executed by dbt, run the command below, and after the doc server has started up, go to http://localhost:18080 to explore the documentation UI.
The documentation will be mainly based on the sources, column names, and descriptions recorded in the `.yml` files in the `.../dbt/models/...` directories with table-or-view-producing dbt scripts.
```bash
user@host:.../your_local_repo$ make serve_dbt_docs
```
pgAdmin4 is a feature-rich environment that makes it very convenient to test out queries or syntax and see the results.
great_expectations
This project already configures a `great_expectations` Datasource and Data Connectors for the included `dwh_db` database, but the Analytics-Data-Where-House documentation has a walkthrough on setting up a new Datasource.
To validate data, `great_expectations` first needs a suite of expectations for that data. The ADWH docs also include a walkthrough showing the process of setting up a suite of expectations.
Now that you have generated a suite of expectations, you should check that the data meets those expectations by configuring and running a checkpoint. You'll very likely discover that the expectations generated by `great_expectations` data profiling tools on a small batch of data will need some tweaks.
To see a walkthrough of this process, including the steps to revise expectations, the ADWH documentation has you covered!
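For orientation, running an already-configured checkpoint from Python generally looks like the sketch below. The exact API depends on your `great_expectations` version, and the checkpoint name here is a made-up placeholder; follow the ADWH walkthroughs for the configuration actually used in this project.

```python
import great_expectations as gx

# Load the project's existing great_expectations configuration.
context = gx.get_context()

# "dwh_db_checkpoint" is a hypothetical name; use the checkpoint you configured
# while following the checkpoint walkthrough.
result = context.run_checkpoint(checkpoint_name="dwh_db_checkpoint")

print("Validation passed!" if result.success else "Some expectations failed.")
```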
While developing workflows, occasionally I'll run into permissions issues where Airflow creates files in a location that was set up automatically outside of the specified volume locations, and I've had to change (take) ownership of the location (from outside of the container) via a `chown` command like the one below (where I'm `-R`ecursively taking ownership of the `dbt/` directory).
```bash
sudo chown -R $USER:$USER dbt/
```
Additionally, if you run into issues while debugging a dbt model where you're making changes to the model but getting the same error every time, try running the command below (to clean out the previously compiled dbt models and installed packages, then reinstall packages) and run the relevant DAG again to see if things update.
```bash
make clean_dbt
```