@@ -247,77 +247,21 @@ class GremlinClient extends EventEmitter {
247247 )
248248 }
249249
250- /**
251- * Execute the script and return a stream of distinct/single results.
252- * This method reemits a distinct data event for each returned result, which
253- * makes the stream behave as if `resultIterationBatchSize` was set to 1.
254- *
255- * If you do not wish this behavior, please use client.messageStream() instead.
256- *
257- * Even though this method uses Highland.js internally, it does not return
258- * a high level Highland readable stream so we do not risk having to deal
259- * with unexpected API breaking changes as Highland.js evolves.
260- *
261- * @return {ReadableStream } A Node.js Stream2
262- */
263- stream ( script , bindings , message ) {
264- const messageStream = this . messageStream ( script , bindings , message ) ;
265- const _ = highland ; // override lo-dash locally
266-
267- // Create a local highland 'through' pipeline so we don't expose
268- // a Highland stream to the end user, but a standard Node.js Stream2
269- const through = _ . pipeline (
270- _ . map ( ( { result : { data } } ) => data ) ,
271- _ . sequence ( ) ,
272- ) ;
273-
274- let rawStream = messageStream . pipe ( through ) ;
275-
276- messageStream . on ( 'error' , e => {
277- rawStream . emit ( 'error' , new Error ( e ) ) ;
278- } ) ;
279-
280- return rawStream ;
281- }
282-
283- /**
284- * Execute the script and return a stream of raw messages returned by Gremlin
285- * Server.
286- * This method does not reemit one distinct data event per result. It directly
287- * emits the raw messages returned by Gremlin Server as they are received.
288- *
289- * Although public, this is a low level method intended to be used for
290- * advanced usages.
291- *
292- * @public
293- * @param {String|Function } script
294- * @param {Object } bindings
295- * @param {Object } message
296- * @return {MessageStream }
297- */
298- messageStream ( script , bindings , rawMessage ) {
299- let stream = new MessageStream ( { objectMode : true } ) ;
300-
301- const command = {
302- message : this . buildMessage ( script , bindings , rawMessage ) ,
303- messageStream : stream ,
304- } ;
305-
306- this . commands$ . onNext ( command ) ;
307-
308- return stream ;
309- }
310-
311250 observable ( script , bindings , rawMessage ) {
312251 const command = {
313252 message : this . buildMessage ( script , bindings , rawMessage ) ,
314253 }
315254
255+ // This actually sends the command to Gremlin Server
316256 this . commands$ . onNext ( command ) ;
317257
258+ // Create a new Observable of of incoming messages, but filter only
259+ // incoming messages to the command we just send.
318260 const commandMessages$ = this . incomingMessages$
319261 . filter ( ( { requestId } ) => requestId === command . message . requestId ) ;
320262
263+ // Off of these messages, create new Observables for each message code
264+ // TODO: this could be a custom operator.
321265 const successMessage$ = commandMessages$
322266 . filter ( hasCode ( 200 ) )
323267 const continuationMessages$ = commandMessages$
@@ -332,6 +276,8 @@ class GremlinClient extends EventEmitter {
332276 return message ;
333277 } ) ;
334278
279+ // That Observable will ultimately emit a single object which indicates
280+ // that we should not expect any other messages;
335281 const terminationMessages$ = Rx . Observable . merge (
336282 successMessage$ , noContentMessage$
337283 ) ;
0 commit comments