@@ -11,6 +11,15 @@ export const states = {
1111 errored : Symbol ( 'errored' ) ,
1212} ;
1313
14+ /*
15+ * A contract for a promise that requires a clean up
16+ * function be called after the promise finishes.
17+ */
18+ type PromiseWithCleanUp < T > = {
19+ promise : Promise < T > ,
20+ cleanup : ( ) => void ,
21+ }
22+
1423/**
1524 * @typedef {Object } StreamToAsyncIterator~Options
1625 * @property {number } [size] - the size of each read from the stream for each iteration
@@ -111,9 +120,22 @@ export default class StreamToAsyncIterator<TVal> {
111120 */
112121 async next ( ) : Promise < Iteration < TVal >> {
113122 if ( this . _state === states . notReadable ) {
123+ const read = this . _untilReadable ( ) ;
124+ const end = this . _untilEnd ( ) ;
125+
114126 //need to wait until the stream is readable or ended
115- await Promise . race ( [ this . _untilReadable ( ) , this . _untilEnd ( ) ] ) ;
116- return this . next ( ) ;
127+ try {
128+ await Promise . race ( [ read . promise , end . promise ] ) ;
129+ return this . next ( ) ;
130+ }
131+ catch ( e ) {
132+ throw e
133+ }
134+ finally {
135+ //need to clean up any hanging event listeners
136+ read . cleanup ( )
137+ end . cleanup ( )
138+ }
117139 } else if ( this . _state === states . ended ) {
118140 return { done : true } ;
119141 } else if (this._state === states.errored) {
@@ -138,34 +160,63 @@ export default class StreamToAsyncIterator<TVal> {
138160 * @private
139161 * @returns {Promise }
140162 */
141- _untilReadable ( ) : Promise < void > {
142- return new Promise ( ( resolve , reject ) => {
143- const handleReadable = ( ) => {
163+ _untilReadable ( ) : PromiseWithCleanUp < void > {
164+ //let is used here instead of const because the exact reference is
165+ //required to remove it, this is why it is not a curried function that
166+ //accepts resolve & reject as parameters.
167+ let eventListener = null ;
168+
169+ const promise = new Promise ( ( resolve , reject ) => {
170+ eventListener = ( ) => {
144171 this . _state = states . readable ;
145172 this . _rejections . delete ( reject ) ;
173+
174+ // we set this to null to info the clean up not to do anything
175+ eventListener = null ;
146176 resolve ( ) ;
147177 } ;
148178
149- this . _stream . once ( 'readable' , handleReadable ) ;
179+ //on is used here instead of once, because
180+ //the listener is remove afterwards anyways.
181+ this . _stream . once ( 'readable' , eventListener ) ;
150182 this . _rejections . add ( reject ) ;
151183 } ) ;
184+
185+ const cleanup = ( ) = > {
186+ if ( eventListener == null ) return ;
187+ this . _stream . removeListener ( 'readable' , eventListener ) ;
188+ } ;
189+
190+ return { cleanup , promise }
152191 }
153192
154193 /**
155194 * Waits until the stream is ended. Rejects if the stream errored out.
156195 * @private
157196 * @returns { Promise }
158197 */
159- _untilEnd(): Promise< void > {
160- return new Promise ( ( resolve , reject ) => {
161- const handleEnd = ( ) => {
198+ _untilEnd(): PromiseWithCleanUp< void > {
199+ let eventListener = null ;
200+
201+ const promise = new Promise ( ( resolve , reject ) => {
202+ eventListener = ( ) => {
162203 this . _state = states . ended ;
163204 this . _rejections . delete ( reject ) ;
205+
206+ eventListener = null
164207 resolve ( ) ;
165208 } ;
166- this . _stream . once ( 'end' , handleEnd ) ;
209+
210+ this . _stream . once ( 'end' , eventListener ) ;
167211 this . _rejections . add ( reject ) ;
168- } )
212+ } ) ;
213+
214+ const cleanup = ( ) => {
215+ if ( eventListener == null ) return ;
216+ this . _stream . removeListener ( 'end' , eventListener ) ;
217+ } ;
218+
219+ return { cleanup, promise }
169220 }
170221}
171222
0 commit comments