Skip to content

Commit 40f4d02

Browse files
author
czheo
committed
refactoring multiprocess/multithreads
1 parent 08cb18f commit 40f4d02

File tree

1 file changed

+18
-26
lines changed

1 file changed

+18
-26
lines changed

syntax_sugar/pipe.py

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,43 +33,35 @@ class Debug:
3333

3434
DEBUG = Debug()
3535

36-
class MultiTask:
37-
def __init__(self, func):
36+
class MultiTaskSyntax:
37+
def __init__(self):
3838
self.poolsize = 1
39-
self.func = func
4039
def __mul__(self, other):
4140
self.poolsize = other
4241
return self
42+
def __getitem__(self, func):
43+
self.func = func
44+
return self
4345

44-
class MultiProcess(MultiTask):
46+
class ProcessSyntax(MultiTaskSyntax):
4547
pass
4648

47-
class MultiThread(MultiTask):
49+
class ThreadSyntax(MultiTaskSyntax):
4850
pass
4951

50-
class ProcessSyntax:
51-
def __getitem__(self, func):
52-
return MultiProcess(func)
53-
54-
class ThreadSyntax:
55-
def __getitem__(self, func):
56-
return MultiThread(func)
52+
def multitask(fn, poolsize, data, pool_constructor):
53+
with pool_constructor(poolsize) as p:
54+
if not hasattr(data, '__iter__'):
55+
data= [data]
56+
return p.map(fn, data)[0]
57+
else:
58+
return p.map(fn, data)
5759

5860
def multiprocess(fn, poolsize, data):
59-
p = Pool(poolsize)
60-
if not hasattr(data, '__iter__'):
61-
data= [data]
62-
return p.map(fn, data)[0]
63-
else:
64-
return p.map(fn, data)
61+
return multitask(fn, poolsize, data, Pool)
6562

6663
def multithread(fn, poolsize, data):
67-
p = ThreadPool(poolsize)
68-
if not hasattr(data, '__iter__'):
69-
data= [data]
70-
return p.map(fn, data)[0]
71-
else:
72-
return p.map(fn, data)
64+
return multitask(fn, poolsize, data, ThreadPool)
7365

7466
process_syntax = p = ProcessSyntax()
7567
thread_syntax = t = ThreadSyntax()
@@ -114,9 +106,9 @@ def __or__(self, rhs):
114106
poolsize = len(rhs)
115107
new_action = rhs[0]
116108
self.action = compose(partial(multithread, new_action, poolsize), self.action)
117-
elif isinstance(rhs, MultiProcess):
109+
elif isinstance(rhs, ProcessSyntax):
118110
self.action = compose(partial(multiprocess, rhs.func, rhs.poolsize), self.action)
119-
elif isinstance(rhs, MultiThread):
111+
elif isinstance(rhs, ThreadSyntax):
120112
self.action = compose(partial(multithread, rhs.func, rhs.poolsize), self.action)
121113
elif isinstance(rhs, tuple):
122114
self.action = compose(partial(*rhs), self.action)

0 commit comments

Comments
 (0)