Awesome Open Source

python-stream

说明

数据流式框架, 可用作数据清洗, 数据预处理, 数据迁移等应用场景

更优雅的流式数据处理方式

安装


pip install git+https://github.com/sandabuliu/python-stream.git

or

# Install from a local clone.
git clone https://github.com/sandabuliu/python-stream.git
cd python-stream  # fixed: cloning creates 'python-stream', not 'python-agent'
python setup.py install

QuickStart


Examples

Word Count
# Word-count example: split each sentence into words, emit a (word, 1)
# pair per word, and sum the counts per word with ReducebyKey.
from pystream.executor.source import Memory
from pystream.executor.executor import Map, Iterator, ReducebyKey

# In-memory source: each list element is one input record (a sentence).
data = Memory([
    'Wikipedia is a free online encyclopedia, created and edited by volunteers around the world and hosted by the Wikimedia Foundation.',
    'Search thousands of wikis, start a free wiki, compare wiki software.',
    'The official Wikipedia Android app is designed to help you find, discover, and explore knowledge on Wikipedia.'
])
# Pipeline: sentence -> word list -> (word, 1) per word (Iterator
# presumably flattens each list into individual items -- the sample
# output below confirms per-word pairs) -> per-key sums.
p = data | Map(lambda x: x.split(' ')) | Iterator(lambda x: (x.strip('.,'), 1)) | ReducebyKey(lambda x, y: x+y)
result = {}
# Iterating the pipeline drives the stream; it yields (word, count) pairs.
for key, value in p:
    result[key] = value
print result.items()  # Python 2 print statement

执行结果

[('and', 3), ('wiki', 2), ('compare', 1), ('help', 1), ('is', 2), ('Wikipedia', 3), ('discover', 1), ('hosted', 1), ('Android', 1), ('find', 1), ('Foundation', 1), ('knowledge', 1), ('to', 1), ('by', 2), ('start', 1), ('online', 1), ('you', 1), ('thousands', 1), ('app', 1), ('edited', 1), ('Search', 1), ('around', 1), ('free', 2), ('explore', 1), ('designed', 1), ('world', 1), ('The', 1), ('the', 2), ('a', 2), ('on', 1), ('created', 1), ('Wikimedia', 1), ('official', 1), ('encyclopedia', 1), ('of', 1), ('wikis', 1), ('volunteers', 1), ('software', 1)]
计算π
from random import random
from pystream.executor.source import Faker
from pystream.executor.executor import Executor, Map, Group

class Pi(Executor):
    """Monte-Carlo pi estimator executor.

    Consumes 0/1 hit indicators and returns the running estimate
    4 * hits / samples after every item it handles.
    """

    def __init__(self, **kwargs):
        super(Pi, self).__init__(**kwargs)
        # Running totals: samples seen so far and accumulated hits.
        self.counter = 0
        self.result = 0

    def handle(self, item):
        # Fold the new sample into the totals, then emit the estimate.
        self.counter = self.counter + 1
        self.result = self.result + item
        estimate = (4.0 * self.result) / self.counter
        return estimate

# Monte-Carlo pi: Faker draws 100000 uniform [0, 1) samples, Map rescales
# each to [-1, 1), Group pairs them into (x, y) points, the second Map
# marks points inside the unit circle with 1 (else 0), and Pi keeps the
# running estimate 4 * hits / samples.
s = Faker(lambda: random(), 100000) | Map(lambda x: x*2-1) | Group(size=2) | Map(lambda x: 1 if x[0]**2+x[1]**2 <= 1 else 0) | Pi()

res = None
# Drain the stream; the last yielded value is the final estimate.
for _ in s:
    res = _
print res  # Python 2 print statement

执行结果

3.14728
排序
# Incremental sort example: judging by the sample output below, Sort
# yields the sorted prefix of everything seen so far after each new item.
from random import randint
from pystream.executor.source import Memory
from pystream.executor.executor import Sort
m = Memory([randint(0, 100) for i in range(10)]) | Sort()

for i in m:
    print list(i)  # Python 2 print; each yielded item is an iterable

执行结果

[94]
[94, 99]
[18, 94, 99]
[18, 40, 94, 99]
[18, 26, 40, 94, 99]
[18, 26, 40, 63, 94, 99]
[18, 26, 40, 63, 83, 94, 99]
[3, 18, 26, 40, 63, 83, 94, 99]
[3, 18, 26, 40, 63, 83, 83, 94, 99]
[3, 16, 18, 26, 40, 63, 83, 83, 94, 99]
在 hadoop 中使用
wordcount
mapper.py
# Hadoop-streaming mapper for word count: read lines from stdin, split
# each into words, and emit one "word\t1" line per word to stdout.
from pystream.executor.source import Stdin
from pystream.executor.executor import Map, Iterator
from pystream.executor.output import Stdout

s = Stdin() | Map(lambda x: x.strip().split()) | Iterator(lambda x: "%s\t1" % x) | Stdout()
s.start()  # run the pipeline to completion
reducer.py
# Hadoop-streaming reducer: input lines are "word\tcount" pairs already
# sorted by key (Hadoop guarantees this); ReducebySortedKey sums the
# counts per word, and the final Map formats each (word, total) pair
# back into a tab-separated output line.
from pystream.executor.source import Stdin
from pystream.executor.executor import Map, ReducebySortedKey
from pystream.executor.output import Stdout

s = Stdin() | Map(lambda x: x.strip().split('\t')) | ReducebySortedKey(lambda x, y: int(x)+int(y)) | Map(lambda x: '%s\t%s' % x) | Stdout()
s.start()
解析 NGINX 日志
# Parse an nginx access log: File streams lines from the log and Parser
# applies the built-in 'nginx' rule, yielding one dict per log line
# (see the sample records below).
from pystream.config import rule
from pystream.executor.source import File
from pystream.executor.executor import Parser
s = File('/var/log/nginx/access.log') | Parser(rule('nginx'))

for item in s:
    print item  # Python 2 print statement

执行结果

{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
{'status': '400', 'body_bytes_sent': 173, 'remote_user': '-', 'http_referer': '-', 'remote_addr': '198.35.46.20', 'request': '\\x05\\x01\\x00', 'version': None, 'http_user_agent': '-', 'time_local': datetime.datetime(2017, 2, 15, 13, 11, 3), 'path': None, 'method': None}
导出数据库数据
# Export database rows to CSV: SQL streams rows from the query result
# and Batch(Csv(...)) writes them to /tmp/output in batches.
from sqlalchemy import create_engine
from pystream.executor.source import SQL
from pystream.executor.output import Csv
from pystream.executor.wraps import Batch

# NOTE(review): credentials are hard-coded for the example only.
engine = create_engine('mysql://root:[email protected]:3306/test')  
conn = engine.connect()
s = SQL(conn, 'select * from faker') | Batch(Csv('/tmp/output'))

# Each emitted item exposes 'data' and 'exception' keys -- presumably
# the written batch and any write error; verify against Batch's docs.
for item in s:
    print item['data']
    print item['exception']
conn.close()

数据源

读取文件数据
# File-based sources: Tail follows a growing file; File and Csv accept
# glob patterns and read the matching files.
from pystream.executor.source import Tail, File, Csv
Tail('/var/log/nginx/access.log')
File('/var/log/nginx/*.log')
Csv('/tmp/test*.csv')
读取 TCP 流数据
# TCP stream sources: either a unix-domain socket path or a
# (host, port) tuple.
from pystream.executor.source import TCPClient
TCPClient('/tmp/pystream.sock')
TCPClient(('127.0.0.1', 10000))
读取 python 数据
# In-process sources: a fixed list (Memory), a Faker that calls the
# given function repeatedly, and a wrapper around a standard queue.
from Queue import Queue as Q  # Python 2 stdlib queue module
from random import randint
from pystream.executor.source import Memory, Faker, Queue
queue = Q(10)

Memory([1, 2, 3, 4])
Faker(randint, 1000)  # NOTE(review): randint takes two args; presumably Faker supplies them -- verify
Queue(queue)
读取常用模块数据
# Module-backed sources (comments translated from Chinese).
from pystream.executor.source import SQL, Kafka
SQL(conn, 'select * from faker')   # read rows from a database connection
Kafka('topic1', '127.0.0.1:9092')  # read messages from a Kafka broker

数据输出

输出到文件
# File outputs: plain text file or CSV file.
from pystream.executor.output import File, Csv
File('/tmp/output')
Csv('/tmp/output.csv')
通过HTTP输出
# HTTP output: sends each item to the given endpoint (presumably via
# POST -- verify against HTTPRequest's implementation).
from pystream.executor.output import HTTPRequest
HTTPRequest('http://127.0.0.1/api/data')
输出到kafka
# Kafka output: publish items to the given topic on the given broker.
from pystream.executor.output import Kafka
Kafka('topic', '127.0.0.1:9092')

中间件

队列
# Queue middleware: buffers items between the Tail reader and the
# Stdout writer (internal mechanics live in pystream.executor.middleware).
from pystream.executor.source import Tail
from pystream.executor.output import Stdout
from pystream.executor.middleware import Queue

s = Tail('/Users/tongbin01/PycharmProjects/python-stream/README.md') | Queue() | Stdout()
s.start()
订阅
# Pub/sub example: the producer pipeline tags each tailed line with a
# random channel key ('1' or '2') and feeds a Subscribe hub; a consumer
# pipeline then reads one channel by indexing the hub.
from random import randint
from pystream.executor.source import Tail
from pystream.executor.executor import Map
from pystream.executor.output import Stdout

from pystream.executor.middleware import Subscribe
from pystream.executor.wraps import Daemonic

sub = Tail('/var/log/messages') | Map(lambda x: (str(randint(1, 2)), x.strip())) | Subscribe()
Daemonic(sub).start()  # run the producer in the background

# sub['1'] presumably yields only items published under key '1'.
s = sub['1'] | Map(lambda x: x.strip()) | Stdout()
s.start()

TodoList

  • 订阅器(Subscribe)客户端超时处理
  • 并行计算
  • HTTP 异步输出/异步源
  • 添加其他基础输出/基础源
  • 添加对其他常用模块的支持, 如 redis, kafka, flume, log-stash, 各种数据库等

Copyright © 2017 [email protected]


Get A Weekly Email With Trending Projects For These Topics
No Spam. Unsubscribe easily at any time.
python (50,901)
data (376)
stream (229)
log (141)
structure (28)

Find Open Source By Browsing 7,000 Topics Across 59 Categories