|
1 | 1 | from functools import partial |
2 | | -from .composable import compose |
| 2 | +from .composable import compose, composable |
3 | 3 | from multiprocess.pool import ThreadPool, Pool |
4 | 4 |
|
5 | 5 | __all__ = [ |
|
15 | 15 | 'thread_syntax', |
16 | 16 | 'p', |
17 | 17 | 't', |
| 18 | + 'until', |
| 19 | + 'when', |
| 20 | + 'lazy_pipe', |
18 | 21 | ] |
19 | 22 |
|
20 | 23 | def puts(data): |
@@ -129,42 +132,51 @@ def __rshift__(self, rhs): |
129 | 132 | # lazy_pipe |
130 | 133 | ##### |
131 | 134 |
|
| 135 | +def neg(x): |
| 136 | + return not x |
| 137 | + |
132 | 138 | class until: |
133 | 139 | def __init__(self, cond): |
134 | 140 | self.cond = cond |
135 | 141 |
|
| 142 | +class when: |
| 143 | + def __init__(self, cond): |
| 144 | + self.cond = cond |
| 145 | + |
136 | 146 | class lazy_pipe: |
137 | 147 | def __init__(self, source): |
138 | 148 | self.source = source |
139 | 149 | self.func = composable(lambda x: x) |
140 | 150 |
|
141 | | - def __or__(self, left): |
142 | | - if isinstance(left, dump): |
| 151 | + def __or__(self, rhs): |
| 152 | + if isinstance(rhs, dump): |
143 | 153 | # dump termination |
144 | 154 | return self.dump() |
145 | | - elif isinstance(left, until): |
146 | | - return self.until(left.cond) |
| 155 | + elif isinstance(rhs, when): |
| 156 | + return self.when(rhs.cond) |
| 157 | + elif isinstance(rhs, until): |
| 158 | + return self.when(compose(neg, rhs.cond)) |
147 | 159 | else: |
148 | 160 | # read actions |
149 | | - self.func = composable(left) * self.func |
| 161 | + self.func = composable(rhs) * self.func |
150 | 162 | return self |
151 | 163 |
|
152 | 164 | def dump(self): |
153 | 165 | return self.func(self.source) |
154 | 166 |
|
155 | | - def until(self, cond): |
| 167 | + def when(self, cond): |
156 | 168 | if hasattr(self.source, '__call__'): |
157 | 169 | data = self.source() |
158 | | - while not cond(data): |
| 170 | + while cond(data): |
159 | 171 | self.func(data) |
160 | 172 | data = self.source() |
161 | 173 | return self |
162 | 174 | elif hasattr(self.source, '__iter__'): |
163 | 175 | for data in self.source: |
164 | 176 | if cond(data): |
165 | | - break |
166 | | - else: |
167 | 177 | self.func(data) |
| 178 | + else: |
| 179 | + break |
168 | 180 | return self |
169 | 181 | else: |
170 | 182 | raise TypeError('pipeline source need be callable or iterable with until condition') |
0 commit comments