Luigi Warehouse

A luigi powered analytics / warehouse stack
Alternatives To Luigi Warehouse
Project NameStarsDownloadsRepos Using ThisPackages Using ThisMost Recent CommitTotal ReleasesLatest ReleaseOpen IssuesLicenseLanguage
Dev Setup5,802
a year ago34otherPython
macOS development environment setup: Easy-to-understand instructions with automated setup scripts for developer tools like Vim, Sublime Text, Bash, iTerm, Python data analysis, Spark, Hadoop MapReduce, AWS, Heroku, JavaScript web development, Android development, common data stores, and dev-based OS X defaults.
7 days ago95gpl-3.0Rust
Seed your development database with real data ⚡️
Aws Sdk Pandas3,587563 days ago140August 01, 202335apache-2.0Python
pandas on AWS - Easy integration with Athena, Glue, Redshift, Timestream, Neptune, OpenSearch, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, SecretManager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and EXCEL).
Wal G2,712
4 days ago62March 16, 2022250otherGo
Archival and Restoration for databases in the Cloud
Devops Bash Tools1,851
12 days ago4mitShell
1000+ DevOps Bash Scripts - AWS, GCP, Kubernetes, Docker, CI/CD, APIs, SQL, PostgreSQL, MySQL, Hive, Impala, Kafka, Hadoop, Jenkins, GitHub, GitLab, BitBucket, Azure DevOps, TeamCity, Spotify, MP3, LDAP, Code/Build Linting, pkg mgmt for Linux, Mac, Python, Perl, Ruby, NodeJS, Golang, Advanced dotfiles: .bashrc, .vimrc, .gitconfig, .screenrc, tmux..
Learn Devops1,195
a day ago1HCL
I am using this repository to document my devops journey. I follow the process of learning everything by tasks. Every task has an associated objective that encompasses an underlying concept. Concepts including CloudProviders, Containers, ContainersOrchestration, Databases, InfrastructureAsCode, Interview, VersionControl etc in progress
Nagios Plugins1,105
2 months ago66otherPython
450+ AWS, Hadoop, Cloud, Kafka, Docker, Elasticsearch, RabbitMQ, Redis, HBase, Solr, Cassandra, ZooKeeper, HDFS, Yarn, Hive, Presto, Drill, Impala, Consul, Spark, Jenkins, Travis CI, Git, MySQL, Linux, DNS, Whois, SSL Certs, Yum Security Updates, Kubernetes, Cloudera etc...
Nodejs Book737
2 months ago239JavaScript
Node.js교과서 소스 코드
2 months ago1June 18, 20212mitGo
Highly configurable prompt builder for Bash, ZSH and PowerShell written in Go.
10 months agomit
:pencil:Today I Learned / 기억은 기록을 이길 수 없다.
Alternatives To Luigi Warehouse
Select To Compare

Alternative Project Comparisons


A boilerplate implementation of Luigi at Groupon


  • Luigi is a Python package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more

  • Luigi-Warehouse adds

  • example workflows (i.e. replicating postgresql tables to redshift)

  • more data sources

  • variable data sources that do not rely on default luigi behavior/configs (i.e. VariableS3Client)

Install / Setup

  • Install python3 - This repo has been tested against python 3.4+

python install

Developers - if you're wanting to modify/use the workflows with your custom logic
  • Clone this repo
  • pip3 install -r requirements.txt if you want full functionality of all data sources
  • mkdir your-path-to/data
  • Put your credentials and settings in luigi.cfg. luigi.cfg-example shows some possible options. You can also $ export LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python...
  • You're ready to replicate or move data around...

Getting Started

  • Some example workflows are included. Assumptions, Args & Comments are in the File
File Description Main Class(es) replicates all data from a google sheet to a redshift table (full copy/replace) Run replicates all data from a google sheet to a hadoop hive table via spark (full copy/replace) main replicates postgres tables to redshift (incrementally or full copy/replace) Run - PerformIncrementalImport PerformFullImport spark app that replicates postgres tables to hadoop(hive) (incrementally or copy/replace) Run - RunIncremental RunFromScratch replicates a salesforce report or SOQL to a redshift table(full copy/replace) SOQLtoRedshift ReporttoRedshift replicates given teradata SQL to redshift table (incrementally or full copy/replace) Run replicates all data from typeform responses to a redshift table (full copy/replace) Run extracts users,orgs,tickets,ticket_events from zendesk to redshift (partially incremental) Run generic class to extract from zendesk API and load to hadoop hive via spark (incrementally or full copy/replace) ZendeskSpark
  • Example to start the luigi scheduler daemon
$ ./start_luigi_server.bash
  • Example to run a workflow with multiple workers in parallel
$ LUIGI_CONFIG_PATH=/path/to/your/luigi.cfg && python3 luigi_warehouse/ Run --params here --workers 50

Data Sources

Dependent python packages required & API reference

Luigi - Spotify/Luigi
Postgres / Redshift - psycopg2
MySQL - pymysql
Adwords - googleads : API Reference
Googlesheets - gspread : API Reference
Slack - slackclient : API Reference
Five9 - suds : API Reference
Twilio - twilio : API Reference
Livechat - API Reference
Zendesk - zdesk : API Reference
Shiftplanning - API Reference
Kochava - API Reference
Teradata - teradata
  • requires some configuring to install. We typically have to do
$ mv ~/.odbc.ini ~/.odbc.ini.orig 
$ cp /opt/teradata/client/15.10/odbc_64/odbcinst.ini ~/.odbcinst.ini 
$ cp /opt/teradata/client/15.10/odbc_64/odbc.ini ~/.odbc.ini
OnboardIQ - API Reference
AppBoy - API Reference
Salesforce - simple-salesforce : API Reference
  • Props to cghall for the capability to query salesforce reports directly using the analytics API

  • Also available are SalesforceBulk and SalesforceBulkJob classes which use the Salesforce bulk API

Braintree - braintree : API Reference
Typeform - API Reference
Checkr - API Reference
AWS - boto : boto3


  • We currently use slack or email for job status notifications which can easily be added

  • luigi-slack

from luigi_slack import SlackBot, notify
slack_channel = 'luigi-status-messages'

if __name__ == '__main__':
  slack_channel = 'luigi-status-messages'
  slacker = SlackBot(token=luigi.configuration.get_config().get('slackbots', 'BOWSER_SLACK_API_KEY'),
  with notify(slacker): 
import boto3

class Email:
  def __init__(self, region, aws_access_key_id, aws_secret_access_key):
    self.client = boto3.client('ses',region_name=region,aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

  def send(self, from_, to_list, subject, body):
    return self.client.send_email(Source=from_,
                                  Destination={'ToAddresses': to_list},
                                                     {'Data': subject},
                                                             {'Data': body},
                                                              {'Data':' '}

Data Validation

  • Targeted towards ensuring successful replication of data to Redshift (see modules/
  • if the same number of columns in the csv are in the target table
  • if the columns have the same datatypes in the same order (VARCHAR is acceptable for any python datatype)
    • uses python_redshift_dtypes to convert
  • Checks for load errors for the target:schema:table provided since the load_start provided timestamp
  • Use the wrapper class RunAnywayTarget if you want to make it easier as we make each validation scheme better

  • pass in the taskobj with the following attributes

    • type = ['LoadError', 'Structure']
    • target = Redshift
    • table =
    • schema =
    • local_file = local csv file path
    • load_start = when you started to copy the records from S3
  • doing RunAnywayTarget(self).done() will not do validation

  • doing RunAnywayTarget(self).validation() will do the validation and if successful also say we're done the task

  • Takes the following args
  1. target_cols : a list of columns ordered for how you want your dataframe to be structured
  2. df : your dataframe you want restructured
  • example: I my dataframe to have columns in this order ['one','two','three','four','five','six']
>>> from validation import OrderedDF
>>> import pandas as pd
>>> test = [[None,'',1,7,8],[None,'',2,5,6]]
>>> test = pd.DataFrame(test,columns=['one','two','four','five','three'])
>>> test
    one two  four  five  three
0  None         1     7      8
1  None         2     5      6
>>> result = OrderedDF(['one','two','three','four','five','six'],t)
>>> result.df
    one two  three  four  five   six
0  None          8     1     7  None
1  None          6     2     5  None
  • This class will fix tables for you
  1. Check for copy errors
  2. Handle the copy errors
  • Add column(s) if needed
  • Change dtype(s) if needed
  1. Get orig table's schema
  2. Craft new table's schema with changes from errors
  3. Make the change and retry the copy and remove duplicate * records
  4. While there are copy errors
  • handle the errors

  • attempt to fix

  • retry copy

  • remove duplicate * records

  • To run use

StructureDynamic(target_schema=  ,# redshift schema your table is in
                 target_table=    # your table
                      add_cols=  ,# True or False for if you want columns added in attempting to fix
                      change_dtypes=  ,# True or False if you want column data types changed in attempting to fix
                      copy=           ,# copy command you attempted
                      load_start=      # when you started the copy command, '%Y-%m-%d %H:%M:$S
  • Example usage:
    • sql prep: create the table
CREATE TABLE public.test(id INT, col VARCHAR);
INSERT INTO test VALUES (1,'2');
INSERT INTO test VALUES (2, 'two');
  • test.csv: create the csv you want to attempt to copy
  • we attempt to copy normally but we get load errors because one of the columns isn't right
COPY public.test FROM 's3://luigi-godata/test.csv' 
CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
  • we run ValidationDynamic
from validation import StructureDynamic
copy = '''COPY public.test FROM 's3://luigi-godata/test.csv' 
          CREDENTIALS 'aws_access_key_id=XXXX;aws_secret_access_key=XXXX'
StructureDynamic(target_schema='public',target_table='test').run(add_cols=True,change_dtypes=True,copy=copy,load_start='2016-10-6 10:15:00')
  • our table is fixed and called public.test
  • our original table is kept as public.test_orig_backup
  • stdout lists the stl_load_errors
  • the changes made to the table's ddl is printed to stdout
Popular Amazon Web Services Projects
Popular Mysql Projects
Popular Cloud Computing Categories
Related Searches

Get A Weekly Email With Trending Projects For These Categories
No Spam. Unsubscribe easily at any time.
Google Sheets