Project Name | Stars | Most Recent Commit | Total Releases | Latest Release | Open Issues | License | Language | Description
---|---|---|---|---|---|---|---|---
Go_spider | 1,629 | 6 years ago | 1 | August 09, 2015 | 18 | mpl-2.0 | Go | [Crawler framework (Go)] An awesome Go concurrent crawler (spider) framework. The crawler is flexible and modular. It can easily be extended into an individualized crawler, or you can use the default crawl components only.
Pypeln | 1,412 | 6 months ago | 36 | January 06, 2022 | 22 | mit | Python | Concurrent data pipelines in Python >>>
Ktvvideoprocess | 547 | 5 years ago | 5 | October 22, 2018 | | mit | Objective-C | A high-performance video effects processing framework.
Pipeline | 404 | 5 years ago | 1 | June 18, 2018 | 4 | mit | Go | Pipeline is a package to build multi-staged concurrent workflows with a centralized logging output.
Taskctl | 162 | 2 years ago | 11 | April 26, 2021 | 9 | gpl-3.0 | Go | Concurrent task runner, developer's routine tasks automation toolkit. Simple modern alternative to GNU Make 🧰
Throttle Concurrent Builds Plugin | 101 | a month ago | 13 | April 11, 2016 | 14 | mit | Java |
Go Piper | 93 | 8 years ago | 1 | July 20, 2015 | | apache-2.0 | Go | Library for creating and managing concurrent tasks in the Go language
Aws Concurrent Data Orchestration Pipeline Emr Livy | 66 | 5 years ago | | | 5 | apache-2.0 | Python | This code demonstrates the architecture featured on the AWS Big Data blog (https://aws.amazon.com/blogs/big-data/), which creates a concurrent data pipeline using Amazon EMR and Apache Livy, orchestrated by Apache Airflow.
Plumbingplusplus | 49 | 11 years ago | | | | bsl-1.0 | C++ | "Unix pipes" or automatic concurrent pipelining in C++
Papaline | 46 | 5 years ago | 23 | December 17, 2015 | | epl-1.0 | Clojure | Clojure concurrent pipeline on core.async
Pypeln (pronounced as "pypeline") is a simple yet powerful Python library for creating concurrent data pipelines.
For more information, take a look at the Documentation.
Install Pypeln using pip:
```bash
pip install pypeln
```
With Pypeln you can easily create multi-stage data pipelines using three types of workers: `process`, `thread`, and `task` (plus a synchronous `sync` module for debugging).
You can create a pipeline based on `multiprocessing.Process` workers by using the `process` module:
```python
import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = pl.process.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.process.filter(slow_gt3, stage, workers=2)

data = list(stage)  # e.g. [5, 6, 9, 4, 8, 10, 7]
```
At each stage you can specify the number of `workers`. The `maxsize` parameter limits the maximum number of elements that the stage can hold simultaneously.
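To get a feel for what these two parameters do, here is a minimal sketch (assuming Pypeln is installed; `slow_double` and the numbers are made up for illustration):

```python
import time
import pypeln as pl

def slow_double(x):
    time.sleep(0.1)  # simulate a slow computation
    return x * 2

if __name__ == "__main__":
    start = time.perf_counter()

    # workers=2: roughly two elements are processed at a time, so the wall
    # time is about half of the ~1s a serial loop would take for 10 elements.
    # maxsize=2: the stage's input queue holds at most 2 pending elements,
    # so a fast producer is paused (back-pressure) instead of buffering
    # everything in memory.
    stage = pl.process.map(slow_double, range(10), workers=2, maxsize=2)

    print(list(stage), f"~{time.perf_counter() - start:.2f}s")
```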
You can create a pipeline based on `threading.Thread` workers by using the `thread` module:
```python
import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random())  # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = pl.thread.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.thread.filter(slow_gt3, stage, workers=2)

data = list(stage)  # e.g. [5, 6, 9, 4, 8, 10, 7]
```
Here we have the exact same situation as in the previous case, except that the workers are Threads.
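In practice the choice between `thread` and `process` workers matters mostly because of the GIL: threads work well for IO-bound functions, while processes are better for CPU-bound ones. A rough timing sketch of that difference (the `cpu_bound` function is hypothetical, and Pypeln is assumed to be installed):

```python
import time
import pypeln as pl

def cpu_bound(x):
    # A pure-Python loop holds the GIL, so threads cannot run it in parallel.
    return x + sum(i * i for i in range(200_000))

if __name__ == "__main__":
    for module in (pl.thread, pl.process):
        start = time.perf_counter()
        list(module.map(cpu_bound, range(20), workers=4))
        print(module.__name__, f"{time.perf_counter() - start:.2f}s")
```

On a multi-core machine the `pl.process` run should finish noticeably faster than the `pl.thread` run for this kind of workload.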
You can create a pipeline based on `asyncio.Task` workers by using the `task` module:
```python
import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random())  # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random())  # <= some slow computation
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)

data = list(stage)  # e.g. [5, 6, 9, 4, 8, 10, 7]
```
Conceptually similar, but here everything runs on a single thread and Task workers are created dynamically. If the code is already running inside an async task, you can use `await` on the stage instead to avoid blocking:
```python
import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random())  # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random())  # <= some slow computation
    return x > 3

async def main():
    data = range(10)  # [0, 1, 2, ..., 9]

    stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    data = await stage  # e.g. [5, 6, 9, 4, 8, 10, 7]

asyncio.run(main())
```
The `sync` module implements all operations using synchronous generators. This module is useful for debugging, or when you don't need to perform heavy CPU or IO tasks but still want to retain element order information that certain functions like `pl.*.ordered` rely on.
```python
import pypeln as pl
import time
from random import random

def slow_add1(x):
    return x + 1

def slow_gt3(x):
    return x > 3

data = range(10)  # [0, 1, 2, ..., 9]

stage = pl.sync.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.sync.filter(slow_gt3, stage, workers=2)

data = list(stage)  # [4, 5, 6, 7, 8, 9, 10]
```
Common arguments such as `workers` and `maxsize` are accepted by this module's functions for API compatibility purposes, but they are ignored.
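As a hedged sketch of the `ordered` operation mentioned above (assuming `pl.thread.ordered` is available in your Pypeln version), restoring source order in one of the concurrent modules looks like this:

```python
import time
import pypeln as pl
from random import random

def slow_add1(x):
    time.sleep(random() / 10)  # results become ready in unpredictable order
    return x + 1

data = range(10)

# Without ordered: elements are yielded as soon as they are ready.
unordered = list(pl.thread.map(slow_add1, data, workers=3))

# With ordered: elements are yielded in the order of the source iterable.
in_order = list(pl.thread.ordered(pl.thread.map(slow_add1, data, workers=3)))

print(unordered)  # e.g. [2, 1, 4, 3, 6, 5, 8, 7, 10, 9]
print(in_order)   # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```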
You can create pipelines that mix different worker types so that each stage uses the type best suited to its task, getting the maximum performance out of your code:
```python
data = get_iterable()
data = pl.task.map(f1, data, workers=100)
data = pl.thread.flat_map(f2, data, workers=10)
data = filter(f3, data)
data = pl.process.map(f4, data, workers=5, maxsize=200)
```
Notice that here we even used a regular Python `filter`: since stages are iterables, Pypeln integrates smoothly with any Python code; just be aware of how each stage behaves.
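As a small sketch of that point (assuming Pypeln is installed; `slow_square` is just an illustrative function), a stage can be consumed lazily with plain Python iteration:

```python
import time
import pypeln as pl

def slow_square(x):
    time.sleep(0.1)
    return x * x

stage = pl.thread.map(slow_square, range(5), workers=2)

# The stage is an ordinary iterable: results are consumed as they become
# available, without materializing the whole list first.
for i, value in enumerate(stage):
    print(f"result #{i}: {value}")
```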
In the spirit of being a true pipeline library, Pypeln also lets you create your pipelines using the pipe (`|`) operator:
```python
data = (
    range(10)
    | pl.process.map(slow_add1, workers=3, maxsize=4)
    | pl.process.filter(slow_gt3, workers=2)
    | list
)
```
A sample script is provided to run the tests in a container (either Docker or Podman is supported). To run the tests:
```bash
$ bash scripts/run-tests.sh
```
This script can also receive a Python version to run the tests against, e.g.
```bash
$ bash scripts/run-tests.sh 3.7
```
MIT