Skip to content

Commit a767d27

Browse files
author
czheo
committed
add green threads syntax
1 parent 933c676 commit a767d27

File tree

3 files changed

+24
-3
lines changed

3 files changed

+24
-3
lines changed

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
packages=find_packages(),
1212
install_requires=[
1313
'multiprocess',
14+
'eventlet',
1415
],
1516
setup_requires=[
1617
'pytest-runner',

syntax_sugar/pipe.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from functools import partial
22
from .composable import compose, composable
33
from multiprocess.pool import ThreadPool, Pool
4+
from eventlet import GreenPool
45

56
__all__ = [
67
'END',
@@ -10,8 +11,10 @@
1011
'puts',
1112
'process_syntax',
1213
'thread_syntax',
14+
'green_thread_syntax',
1315
'p',
1416
't',
17+
'g',
1518
]
1619

1720
def puts(data, end="\n"):
@@ -49,6 +52,12 @@ class ProcessSyntax(MultiTaskSyntax):
4952
class ThreadSyntax(MultiTaskSyntax):
5053
pass
5154

55+
class GreenThreadSyntax(MultiTaskSyntax):
56+
pass
57+
58+
class EventSyntax(MultiTaskSyntax):
59+
pass
60+
5261
def multitask(fn, poolsize, data, pool_constructor):
5362
with pool_constructor(poolsize) as p:
5463
if not hasattr(data, '__iter__'):
@@ -63,8 +72,14 @@ def multiprocess(fn, poolsize, data):
6372
def multithread(fn, poolsize, data):
6473
return multitask(fn, poolsize, data, ThreadPool)
6574

75+
def multigreenthread(fn, poolsize, data):
76+
p = GreenPool(poolsize)
77+
return list(p.imap(fn, data))
78+
79+
6680
process_syntax = p = ProcessSyntax()
6781
thread_syntax = t = ThreadSyntax()
82+
green_thread_syntax = g = GreenThreadSyntax()
6883

6984
class pipe:
7085
def __init__(self, data = None):
@@ -110,6 +125,8 @@ def __or__(self, rhs):
110125
self.action = compose(partial(multiprocess, rhs.func, rhs.poolsize), self.action)
111126
elif isinstance(rhs, ThreadSyntax):
112127
self.action = compose(partial(multithread, rhs.func, rhs.poolsize), self.action)
128+
elif isinstance(rhs, GreenThreadSyntax):
129+
self.action = compose(partial(multigreenthread, rhs.func, rhs.poolsize), self.action)
113130
elif isinstance(rhs, tuple):
114131
self.action = compose(partial(*rhs), self.action)
115132
elif isinstance(rhs, pipe):

tests/test_pipe.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@ def test_pipe_connect():
3030
# not all processes in the pool are necessarily used
3131
#
3232
def test_pipe_multiprocess():
33-
assert pipe(range(10)) | p[lambda x: x**2] * 3 | END == [x ** 2 for x in range(10)]
33+
assert pipe(100) | range | p[lambda x: x**2] * 3 | sorted | END == [x ** 2 for x in range(100)]
3434

3535
def test_pipe_multithread():
36-
assert pipe(range(10)) | [lambda x: x**2] * 3 | END == [x ** 2 for x in range(10)]
37-
assert pipe(range(10)) | t[lambda x: x**2] * 3 | END == [x ** 2 for x in range(10)]
36+
assert pipe(100) | range | [lambda x: x**2] * 3 | sorted | END == [x ** 2 for x in range(100)]
37+
assert pipe(100) | range | t[lambda x: x**2] * 3 | sorted | END == [x ** 2 for x in range(100)]
38+
39+
def test_pipe_multigreenthread():
40+
assert pipe(100) | range | g[lambda x: x**2] * 3 | sorted | END == [x ** 2 for x in range(100)]
3841

3942
def test_redirect():
4043
p = pipe(range(10)) | each(str) | ''.join

0 commit comments

Comments
 (0)