Pypeln

Concurrent data pipelines in Python >>>
Alternatives To Pypeln
Project NameStarsDownloadsRepos Using ThisPackages Using ThisMost Recent CommitTotal ReleasesLatest ReleaseOpen IssuesLicenseLanguage
Go_spider1,62926 years ago1August 09, 201518mpl-2.0Go
[爬虫框架 (golang)] An awesome Go concurrent Crawler(spider) framework. The crawler is flexible and modular. It can be expanded to an Individualized crawler easily or you can use the default crawl components only.
Pypeln1,412566 months ago36January 06, 202222mitPython
Concurrent data pipelines in Python >>>
Ktvvideoprocess547
5 years ago5October 22, 2018mitObjective-C
A High-Performance video effects processing framework.
Pipeline40425 years ago1June 18, 20184mitGo
Pipeline is a package to build multi-staged concurrent workflows with a centralized logging output.
Taskctl162
2 years ago11April 26, 20219gpl-3.0Go
Concurrent task runner, developer's routine tasks automation toolkit. Simple modern alternative to GNU Make 🧰
Throttle Concurrent Builds Plugin101
3a month ago13April 11, 201614mitJava
Go Piper93
8 years ago1July 20, 2015apache-2.0Go
Library for creating and managing concurrent tasks in Go Language
Aws Concurrent Data Orchestration Pipeline Emr Livy66
5 years ago5apache-2.0Python
This code demonstrates the architecture featured on the AWS Big Data blog (https://aws.amazon.com/blogs/big-data/ ) which creates a concurrent data pipeline by using Amazon EMR and Apache Livy. This pipeline is orchestrated by Apache Airflow.
Plumbingplusplus49
11 years agobsl-1.0C++
"unix pipes" or automatic concurrent pipelining in C++
Papaline46
5 years ago23December 17, 2015epl-1.0Clojure
Clojure concurrent pipleline on core.async
Alternatives To Pypeln
Select To Compare


Alternative Project Comparisons
Readme

Pypeln

Coverage


Pypeln (pronounced as "pypeline") is a simple yet powerful Python library for creating concurrent data pipelines.

Main Features

  • Simple: Pypeln was designed to solve medium data tasks that require parallelism and concurrency where using frameworks like Spark or Dask feels exaggerated or unnatural.
  • Easy-to-use: Pypeln exposes a familiar functional API compatible with regular Python code.
  • Flexible: Pypeln enables you to build pipelines using Processes, Threads and asyncio.Tasks via the exact same API.
  • Fine-grained Control: Pypeln allows you to have control over the memory and cpu resources used at each stage of your pipelines.

For more information take a look at the Documentation.

diagram

Installation

Install Pypeln using pip:

pip install pypeln

Basic Usage

With Pypeln you can easily create multi-stage data pipelines using 3 type of workers:

Processes

You can create a pipeline based on multiprocessing.Process workers by using the process module:

import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.process.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.process.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

At each stage the you can specify the numbers of workers. The maxsize parameter limits the maximum amount of elements that the stage can hold simultaneously.

Threads

You can create a pipeline based on threading.Thread workers by using the thread module:

import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.thread.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.thread.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

Here we have the exact same situation as in the previous case except that the worker are Threads.

Tasks

You can create a pipeline based on asyncio.Task workers by using the task module:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

Conceptually similar but everything is running in a single thread and Task workers are created dynamically. If the code is running inside an async task can use await on the stage instead to avoid blocking:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3


def main():
    data = range(10) # [0, 1, 2, ..., 9] 

    stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    data = await stage # e.g. [5, 6, 9, 4, 8, 10, 7]

asyncio.run(main())

Sync

The sync module implements all operations using synchronous generators. This module is useful for debugging or when you don't need to perform heavy CPU or IO tasks but still want to retain element order information that certain functions like pl.*.ordered rely on.

import pypeln as pl
import time
from random import random

def slow_add1(x):
    return x + 1

def slow_gt3(x):
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.sync.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.sync.filter(slow_gt3, stage, workers=2)

data = list(stage) # [4, 5, 6, 7, 8, 9, 10]

Common arguments such as workers and maxsize are accepted by this module's functions for API compatibility purposes but are ignored.

Mixed Pipelines

You can create pipelines using different worker types such that each type is the best for its given task so you can get the maximum performance out of your code:

data = get_iterable()
data = pl.task.map(f1, data, workers=100)
data = pl.thread.flat_map(f2, data, workers=10)
data = filter(f3, data)
data = pl.process.map(f4, data, workers=5, maxsize=200)

Notice that here we even used a regular python filter, since stages are iterables Pypeln integrates smoothly with any python code, just be aware of how each stage behaves.

Pipe Operator

In the spirit of being a true pipeline library, Pypeln also lets you create your pipelines using the pipe | operator:

data = (
    range(10)
    | pl.process.map(slow_add1, workers=3, maxsize=4)
    | pl.process.filter(slow_gt3, workers=2)
    | list
)

Run Tests

A sample script is provided to run the tests in a container (either Docker or Podman is supported), to run tests:

$ bash scripts/run-tests.sh

This script can also receive a python version to check test against, i.e

$ bash scripts/run-tests.sh 3.7

Related Stuff

Contributors

License

MIT

Popular Pipeline Projects
Popular Concurrent Projects
Popular Data Processing Categories
Related Searches

Get A Weekly Email With Trending Projects For These Categories
No Spam. Unsubscribe easily at any time.
Python
Pipeline
Thread
Concurrent
Aiohttp