|
| 1 | +//. # Fluture Node |
| 2 | +//. |
| 3 | +//. Common Node API's wrapped to return [Fluture][] Futures. |
| 4 | +//. |
| 5 | +//. ## API |
| 6 | + |
| 7 | +import Future from 'fluture'; |
| 8 | + |
| 9 | +//# once :: String -> EventEmitter -> Future Error a |
| 10 | +//. |
| 11 | +//. Resolve a Future with the first event emitted over |
| 12 | +//. the given event emitter under the given event name. |
| 13 | +//. |
| 14 | +//. When the Future is cancelled, it removes any trace of |
| 15 | +//. itself from the event emitter. |
| 16 | +//. |
| 17 | +//. ```js |
| 18 | +//. > const emitter = new EventEmitter (); |
| 19 | +//. > setTimeout (() => emitter.emit ('answer', 42), 100); |
| 20 | +//. > once ('answer') (emitter); |
| 21 | +//. Future.of (42); |
| 22 | +//. ``` |
| 23 | +export const once = event => emitter => Future ((rej, res) => { |
| 24 | + const removeListeners = () => { |
| 25 | + emitter.removeListener ('error', onError); |
| 26 | + emitter.removeListener (event, onEvent); |
| 27 | + }; |
| 28 | + const onError = x => { |
| 29 | + removeListeners (); |
| 30 | + rej (x); |
| 31 | + }; |
| 32 | + const onEvent = x => { |
| 33 | + removeListeners (); |
| 34 | + res (x); |
| 35 | + }; |
| 36 | + emitter.once ('error', onError); |
| 37 | + emitter.once (event, onEvent); |
| 38 | + return removeListeners; |
| 39 | +}); |
| 40 | + |
| 41 | +//# buffer :: ReadableStream a -> Future Error (Array a) |
| 42 | +//. |
| 43 | +//. Buffer all data on a Stream into a Future of an Array. |
| 44 | +//. |
| 45 | +//. When the Future is cancelled, it removes any trace of |
| 46 | +//. itself from the Stream. |
| 47 | +//. |
| 48 | +//. ```js |
| 49 | +//. > const stream = new Readable ({read: () => {}}); |
| 50 | +//. > setTimeout (() => { |
| 51 | +//. . stream.push ('hello'); |
| 52 | +//. . stream.push ('world'); |
| 53 | +//. . stream.push (null); |
| 54 | +//. . }, 100); |
| 55 | +//. > buffer (stream); |
| 56 | +//. Future.of ([Buffer.from ('hello'), Buffer.from ('world')]); |
| 57 | +//. ``` |
| 58 | +export const buffer = stream => Future ((rej, res) => { |
| 59 | + const chunks = []; |
| 60 | + const removeListeners = () => { |
| 61 | + stream.removeListener ('data', onData); |
| 62 | + stream.removeListener ('error', rej); |
| 63 | + stream.removeListener ('end', onEnd); |
| 64 | + }; |
| 65 | + const onData = d => chunks.push (d); |
| 66 | + const onEnd = () => { |
| 67 | + removeListeners (); |
| 68 | + res (chunks); |
| 69 | + }; |
| 70 | + const onError = e => { |
| 71 | + removeListeners (); |
| 72 | + rej (e); |
| 73 | + }; |
| 74 | + stream.on ('data', onData); |
| 75 | + stream.once ('error', onError); |
| 76 | + stream.once ('end', onEnd); |
| 77 | + return removeListeners; |
| 78 | +}); |
| 79 | + |
| 80 | +//. [Fluture]: https://github.com/fluture-js/Fluture |
0 commit comments