Analytics_data_where_house

An analytics engineering sandbox focusing on real estate prices in Cook County, IL

Analytics Data Where House

This platform automates curating a local data warehouse of interesting, up-to-date public data sets. It enables users (well, mainly one user, me) to easily add data sets to the warehouse, build analyses that explore and answer questions with current data, and discover existing assets to accelerate exploring new questions.

At present, it uses docker to provision and run:

  • a PostgreSQL + PostGIS database as the data warehouse,
  • a pgAdmin4 database administration interface,
  • Airflow components to orchestrate tasks (note: uses a LocalExecutor),
  • dbt to manage data transformation and cleaning tasks and to serve a searchable data dictionary and catalog,
  • great_expectations to validate that data meets expectations, and
  • custom python code that makes it easy to implement an ELT pipeline for any other table hosted by Socrata.

Motivation

I like to research before I buy anything, especially if it's a big-ticket item. I've been considering buying a house for a while, but the methods I use for answering questions like "what phone should I buy?" or "how can I make my apartment less drafty in winter?" haven't been adequate to answer questions I have about real estate. Fortunately, the real estate market I've grown fond of has the richest public data culture in the US (that I, a data scientist focused on Chicago-related issues, am aware of). This market's Assessor's Office regularly publishes data I can mine for answers to some of my biggest questions.

Documentation

You can see documentation for this platform at https://docs.analytics-data-where-house.dev/. This project is still under active development and documentation will continue to evolve with the system.

System Requirements

To use this system, Docker Engine and Compose (v2.0.0 or higher) are the only hard requirements.

Having python and GNU make on your host system will provide a lot of quality of life improvements (mainly a streamlined setup process and useful makefile recipes), but they're not strictly necessary.

Usage

After the system is set up, you can easily add a Socrata data set to the warehouse in two steps:

  1. Define the SocrataTable in /airflow/dags/sources/tables.py:

Look up the table's documentation page on the web and get the table_id from the URL (it will be nine characters long, all lowercase, and with a hyphen in the middle). Use that table_id value, along with a sensible name for the table and a cron expression that indicates how frequently the system should check for data updates, to define a SocrataTable instance for the table.

COOK_COUNTY_PARCEL_SALES = SocrataTable(
    table_id="wvhk-k5uv",
    table_name="cook_county_parcel_sales",
    schedule="0 6 4 * *",
)

Note: It's a convention in python to capitalize the names of constants, and as the table_id and table_name for a data set should be constant, I use the capitalized table_name as the name of the data set's SocrataTable instance variable.

  2. Create a DAG in a file in /airflow/dags/ based on the update_data_raw_cook_county_parcel_sales DAG below:

After copying the code into a new file, you only have to make changes to the 4 lines numbered below: 1: replace COOK_COUNTY_PARCEL_SALES with the name of the SocrataTable instance variable from tables.py, 2: change the tags to reflect this data set, 3: change this DAG's function name to reflect this data set, and 4: call that DAG function.

# This is the full file /airflow/dags/cook_county/update_raw_cook_county_parcel_sales.py
import datetime as dt
import logging

from airflow.decorators import dag

from tasks.socrata_tasks import update_socrata_table
from sources.tables import COOK_COUNTY_PARCEL_SALES as SOCRATA_TABLE   ### 1.

task_logger = logging.getLogger("airflow.task")


@dag(
    schedule=SOCRATA_TABLE.schedule,
    start_date=dt.datetime(2022, 11, 1),
    catchup=False,
    tags=["cook_county", "parcels", "fact_table", "data_raw"],        ### 2.
)
def update_data_raw_cook_county_parcel_sales():                       ### 3.
    update_1 = update_socrata_table(
        socrata_table=SOCRATA_TABLE,
        conn_id="dwh_db_conn",
        task_logger=task_logger,
    )
    update_1
update_data_raw_cook_county_parcel_sales()                            ### 4.

Congratulations! You just defined a new data pipeline! After you unpause and run this DAG in the Airflow Web UI, the system will automatically build that data pipeline, add that data set to the warehouse, and update that data set on the schedule indicated in the SocrataTable instance.

Socrata Table Ingestion Flow

The Update-data DAGs for (at least) Socrata tables follow the pattern below:

  • Check the metadata of the table's data source (via api if available, or if not, by scraping where possible)
    • If the local data warehouse's data is stale:
      • download and ingest all new records into a temporary table,
      • identify new records and updates to prior records, and
      • add any new or updated records to a running table of all distinct records
    • If the local data warehouse's data is as fresh as the source:
      • update the freshness-check-metadata table and end
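
In rough Python pseudocode (these function and argument names are just illustrative, not the platform's actual task names), that branch logic amounts to something like:

# Illustrative pseudocode of the freshness-check branch; names are hypothetical.
import datetime as dt
from typing import Optional


def source_is_fresher(
    source_last_updated: dt.datetime, local_last_updated: Optional[dt.datetime]
) -> bool:
    """True when the source has published data newer than the warehouse's copy."""
    return local_last_updated is None or source_last_updated > local_last_updated


def update_table(source_last_updated, local_last_updated) -> None:
    if source_is_fresher(source_last_updated, local_last_updated):
        # 1. download the full table and load it into a "temp" table
        # 2. compare the "temp" records against the persisting data_raw table
        # 3. append records that are entirely new or updated versions of prior records
        pass
    # in either branch, record the outcome of this freshness check in the
    # metadata.table_metadata table before ending the run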

Simple Update DAG Flow

Before downloading potentially gigabytes of data, we check the data source's metadata to determine if the source data has been updated since the most recent successful update of that data in the local data warehouse. Whether there is new data or not, we'll log the results of that check in the data_warehouse's metadata.table_metadata table.

check_table_metadata TaskGroup

Freshness check metadata Table in pgAdmin4

If the data source's data is fresher than the data in the local data warehouse, the system downloads the entire table from the data source (to a file in the Airflow-scheduler container) and then runs the load_data_tg TaskGroup, which:

  1. Loads it into a "temp" table (via the appropriate data-loader TaskGroup).

load_data_tg TaskGroup loaders minimized

  2. Creates a persisting table for this data set in the data_raw schema if the data set is a new addition to the warehouse.
  3. Checks if the initial dbt data_raw deduplication model exists, and if not, the make_dbt_data_raw_model task automatically generates a data-set-specific dbt data_raw model file.

load_data_tg TaskGroup data_raw table-maker and dbt model generator

  4. Compares all records from the latest data set (in the "temp" table) against all records previously added to the persisting data_raw table. Records that are entirely new or are updates of prior records (i.e., at least one source column has a changed value) are appended to the persisting data_raw table.
  • Note: updated records do not replace the prior records here. All distinct versions are kept so that it's possible to examine changes to a record over time.
  5. The metadata.table_metadata table is updated to indicate the table in the local data warehouse was successfully updated on this freshness check.

load_data_tg TaskGroup data_raw table-maker and dbt model generator

Those tasks make up the load_data_tg Task Group.
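
In the platform itself, the record comparison in step 4 is done in SQL by the generated dbt data_raw model, but as a rough illustration of the idea only (column handling simplified, function name hypothetical), the same new-or-updated check could be sketched in pandas like this:

# Sketch only: the real comparison is done in SQL by the dbt data_raw model.
import pandas as pd


def new_or_updated_records(
    temp_df: pd.DataFrame, data_raw_df: pd.DataFrame, source_cols: list
) -> pd.DataFrame:
    """Return "temp" rows with no exact match (across all source columns) in the
    persisting data_raw table, i.e. records that are brand new or were updated."""
    merged = temp_df[source_cols].merge(
        data_raw_df[source_cols].drop_duplicates(),
        on=source_cols,
        how="left",
        indicator=True,
    )
    # rows present only on the left side are new records or changed versions
    return merged.loc[merged["_merge"] == "left_only", source_cols]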

load_data_tg TaskGroup High Level

If the local data warehouse has up-to-date data for a given data source, we will just record that finding in the metadata table and end the run.

Local data is fresh so we will note that and end

Data Loader task_groups

Tables with geospatial features/columns will be downloaded in the .geojson format (which has a much more flexible structure than .csv files), while tables without geospatial features (i.e., flat tabular data) will be downloaded as .csv files. Different code is needed to correctly and efficiently read and ingest these different formats. So far, this platform has implemented data-loader TaskGroups to handle the .geojson and .csv file formats, but this pattern is easy to extend if other data sources only offer other file formats.

data-loading TaskGroups in load_data_tg TaskGroup
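
As a simplified sketch of that dispatch (the function names here are hypothetical, not the platform's actual TaskGroups):

# Illustrative only; the platform implements these as Airflow TaskGroups.
import geopandas as gpd
import pandas as pd


def load_geojson_file(path: str) -> gpd.GeoDataFrame:
    """Geospatial tables are exported as .geojson so geometry columns survive."""
    return gpd.read_file(path)


def load_csv_file(path: str) -> pd.DataFrame:
    """Flat tabular data is exported as .csv and read with a plain CSV reader."""
    return pd.read_csv(path)


def pick_data_loader(table_has_geospatial_columns: bool):
    """Choose the reader used to ingest a downloaded file into a "temp" table."""
    return load_geojson_file if table_has_geospatial_columns else load_csv_file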

Many public data tables are exported from production systems, where records represent something that can change over time. For example, in this building permit table, each record represents an application for a building permit. Rather than adding a new record any time the application process moves forward (e.g., when a fee was paid, a contact was added, or the permit gets issued), the original record gets updated. After this data is updated, the prior state of the table is gone (or at least no longer publicly available). This is ideal for intended users of the production system (i.e., people involved in the process who have to look up the current status of a permit request). But for someone seeking to understand the process, keeping all distinct versions or states of a record makes it possible to see how a record evolved. So I've developed this workflow to keep the original record and all distinct updates for (non "temp_") tables in the data_raw schema.

This query shows the count of new or updated records grouped by the data-publication DateTime when the record was new to the local data warehouse.

Counts of distinct records in data_raw table by when the source published that data set version
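
That screenshot comes from a simple aggregate query; here's a hedged sketch of running the same kind of count from Python (the connection string and the source_data_updated column name are assumptions to check against your own warehouse):

# Sketch only: connection details and the publication-timestamp column name
# ("source_data_updated") are assumptions, not the platform's guaranteed schema.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://username:password@localhost:5432/dwh_db")
counts = pd.read_sql(
    """
    SELECT source_data_updated, COUNT(*) AS distinct_records
    FROM data_raw.cook_county_parcel_sales
    GROUP BY source_data_updated
    ORDER BY source_data_updated
    """,
    engine,
)
print(counts)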

Developing DAGs

DAGs placed or developed in the /<repo>/airflow/dags/ directory will quickly become available in the Airflow web UI, where they can be manually triggered or run.

At present, a local mount connects /<repo>/data_raw (host-side) to /opt/airflow/data_raw (container-side), so changes you make to a DAG from your host machine will be (nearly immediately) available in the running containers as you develop.

Serving dbt Data Documentation and Discovery UI

To generate and serve documentation for the data transformations executed by dbt, run the command below, and after the doc server has started up, go to http://localhost:18080 to explore the documentation UI.

The documentation will be based mainly on the sources, column names, and descriptions recorded in the .yml files in the .../dbt/models/... directories alongside the table-or-view-producing dbt scripts.

user@host:.../your_local_repo$ make serve_dbt_docs

dbt documentation page with table lineage graph

Developing queries and exploring data in pgAdmin4

pgAdmin4 is a feature-rich environment that makes it very convenient to test out queries or syntax and see the results.

pgAdmin4's geospatial query viewer

Data Validation with great_expectations

Setting up New Data Sources

This project already configures a great_expectations Datasource and Data Connectors for the included dwh_db database, but the Analytics-Data-Where-House documentation has a walkthrough on setting up a new Datasource.

Generating a Suite of Expectations for a Data Set

To validate data, great_expectations first needs a suite of expectations for that data. The ADWH docs also include a walkthrough showing the process of setting up a suite of expectations.

Configuring a Checkpoint and Validating a Data Set

Now that you have generated a suite of expectations, you should check that the data meets those expectations by configuring and running a checkpoint. You'll very likely discover that the expectations generated by great_expectations data profiling tools on a small batch of data will need some tweaks.

To see a walkthrough of this process, including the steps to revise expectations, the ADWH documentation has you covered!
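
As a minimal sketch (the checkpoint name is a placeholder and the exact calls depend on your great_expectations version), running a configured checkpoint from Python looks roughly like:

# Sketch only: assumes a checkpoint named "dwh_db_checkpoint" already exists in
# your great_expectations project; adjust names/API to your installed version.
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="dwh_db_checkpoint")
if not result.success:
    raise ValueError("Data did not meet the expectation suite; review the validation results.")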

Troubleshooting Notes

While developing workflows, I'll occasionally run into permissions issues where Airflow tries to create things in a location that was created automatically outside of the specified volume locations, and I've had to change (take) ownership of that location (from outside of the container) via a chown command like the one below (where I'm -Recursively taking ownership of the dbt/ directory).

sudo chown -R $USER:$USER dbt/

Additionally, if you run into issues while debugging a dbt model where you're making changes to the model but getting the same error every time, try running the command below (to clean out the previously compiled dbt models and installed packages, then reinstall packages) and run the relevant DAG again to see if things update.

make clean_dbt