@@ -8,9 +8,10 @@ import highland from 'highland';
88
99import WebSocketGremlinConnection from './WebSocketGremlinConnection' ;
1010import MessageStream from './MessageStream' ;
11- import executeHandler from './executeHandler' ;
1211import * as Utils from './utils' ;
1312
13+ import Rx from 'rx' ;
14+
1415
1516class GremlinClient extends EventEmitter {
1617 constructor ( port = 8182 , host = 'localhost' , options = { } ) {
@@ -27,7 +28,6 @@ class GremlinClient extends EventEmitter {
2728 op : 'eval' ,
2829 processor : '' ,
2930 accept : 'application/json' ,
30- executeHandler,
3131 ...options ,
3232 path : path . length && ! path . startsWith ( '/' ) ? `/${ path } ` : path
3333 }
@@ -43,22 +43,59 @@ class GremlinClient extends EventEmitter {
4343
4444 this . commands = { } ;
4545
46- this . connection = this . createConnection ( {
46+ const connection = this . createConnection ( {
4747 port,
4848 host,
4949 path : this . options . path
5050 } ) ;
51+
52+ this . commands$ = new Rx . Subject ( ) ;
53+ this . commands$ . subscribe ( ( command ) => {
54+ const { message : { requestId } } = command ;
55+ this . commands [ requestId ] = command
56+ } ) ;
57+
58+ this . registerConnection ( connection ) ;
5159 }
5260
5361 createConnection ( { port, host, path } ) {
54- const connection = new WebSocketGremlinConnection ( { port, host, path } ) ;
62+ return new WebSocketGremlinConnection ( { port, host, path } ) ;
63+ }
64+
65+ registerConnection ( connection ) {
66+ this . connection = connection ;
67+
68+ const open$ = Rx . Observable . fromEvent ( connection , 'open' ) ;
69+ const error$ = Rx . Observable . fromEvent ( connection , 'error' ) ;
70+ const incomingMessages$ = Rx . Observable . fromEvent ( connection , 'message' )
71+ . map ( ( { data } ) => {
72+ const buffer = new Buffer ( data , 'binary' ) ;
73+ const rawMessage = JSON . parse ( buffer . toString ( 'utf-8' ) ) ;
74+
75+ return rawMessage ;
76+ } ) ;
77+ const close$ = Rx . Observable . fromEvent ( connection , 'close' ) ;
78+
79+ const canSend$ = Rx . Observable . merge (
80+ open$ . map ( true ) ,
81+ error$ . map ( false ) ,
82+ close$ . map ( false )
83+ )
84+
85+ open$ . subscribe ( ( connection ) => this . onConnectionOpen ( ) ) ;
86+ error$ . subscribe ( ( error ) => this . handleError ( error ) ) ;
5587
56- connection . on ( 'open' , ( ) => this . onConnectionOpen ( ) ) ;
57- connection . on ( 'error' , ( error ) => this . handleError ( error ) ) ;
58- connection . on ( 'message' , ( message ) => this . handleProtocolMessage ( message ) ) ;
59- connection . on ( 'close' , ( event ) => this . handleDisconnection ( event ) )
6088
61- return connection ;
89+ this . incomingMessages$ = incomingMessages$ ;
90+
91+ close$ . subscribe ( ( event ) => this . handleDisconnection ( event ) ) ;
92+
93+ const outgoingMessages$ = this . commands$
94+ . map ( ( { message } ) => message )
95+ . pausableBuffered ( canSend$ ) ;
96+
97+ outgoingMessages$
98+ . subscribe ( ( message ) => this . sendMessage ( message ) ) ;
6299 }
63100
64101 handleError ( err ) {
@@ -73,35 +110,32 @@ class GremlinClient extends EventEmitter {
73110 * @param {MessageEvent } event
74111 */
75112 handleProtocolMessage ( message ) {
76- const { data } = message ;
77- const buffer = new Buffer ( data , 'binary' ) ;
78- const rawMessage = JSON . parse ( buffer . toString ( 'utf-8' ) ) ;
79113 const {
80114 requestId,
81115 status : {
82116 code : statusCode ,
83117 message : statusMessage
84118 }
85- } = rawMessage ;
119+ } = message ;
86120
87- const { messageStream } = this . commands [ requestId ] ;
121+ const { observable } = this . commands [ requestId ] ;
88122
89123 switch ( statusCode ) {
90124 case 200 : // SUCCESS
91125 delete this . commands [ requestId ] ; // TODO: optimize performance
92- messageStream . push ( rawMessage ) ;
93- messageStream . push ( null ) ;
126+ observable . onNext ( message ) ;
127+ observable . push ( null ) ;
94128 break ;
95129 case 204 : // NO_CONTENT
96130 delete this . commands [ requestId ] ;
97- messageStream . push ( null ) ;
131+ observable . push ( null ) ;
98132 break ;
99133 case 206 : // PARTIAL_CONTENT
100- messageStream . push ( rawMessage ) ;
134+ observable . push ( message ) ;
101135 break ;
102136 default :
103137 delete this . commands [ requestId ] ;
104- messageStream . emit ( 'error' , new Error ( statusMessage + ' (Error ' + statusCode + ')' ) ) ;
138+ observable . emit ( 'error' , new Error ( statusMessage + ' (Error ' + statusCode + ')' ) ) ;
105139 break ;
106140 }
107141 }
@@ -113,8 +147,6 @@ class GremlinClient extends EventEmitter {
113147 onConnectionOpen ( ) {
114148 this . connected = true ;
115149 this . emit ( 'connect' ) ;
116-
117- this . executeQueue ( ) ;
118150 } ;
119151
120152 /**
@@ -127,17 +159,6 @@ class GremlinClient extends EventEmitter {
127159 } ) ;
128160 } ;
129161
130- /**
131- * Process the current command queue, sending commands to Gremlin Server
132- * (First In, First Out).
133- */
134- executeQueue ( ) {
135- while ( this . queue . length > 0 ) {
136- let { message } = this . queue . shift ( ) ;
137- this . sendMessage ( message ) ;
138- }
139- } ;
140-
141162 /**
142163 * @param {Object } reason
143164 */
@@ -228,14 +249,13 @@ class GremlinClient extends EventEmitter {
228249 message = { } ;
229250 }
230251
231- const messageStream = this . messageStream ( script , bindings , message ) ;
232-
233- // TO CHECK: errors handling could be improved
234- // See https://groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ
235- // for an example using domains
236- const { executeHandler } = this . options ;
237-
238- executeHandler ( messageStream , callback ) ;
252+ this . observable ( script , bindings , message )
253+ . flatMap ( ( { result : { data } } ) => data )
254+ . toArray ( )
255+ . subscribe (
256+ ( results ) => callback ( null , results ) ,
257+ ( err ) => callback ( err )
258+ )
239259 }
240260
241261 /**
@@ -294,33 +314,57 @@ class GremlinClient extends EventEmitter {
294314 messageStream : stream
295315 } ;
296316
297- this . sendCommand ( command ) ; //todo improve for streams
317+ this . commands$ . onNext ( command ) ;
298318
299319 return stream ;
300320 } ;
301321
302- /**
303- * Send a command to Gremlin Server, or add it to queue if the connection
304- * is not established.
305- *
306- * @param {Object } command
307- */
308- sendCommand ( command ) {
309- const {
310- message,
311- message : {
312- requestId
313- }
314- } = command ;
322+ observable ( script , bindings , rawMessage ) {
323+ const command = {
324+ message : this . buildMessage ( script , bindings , rawMessage ) ,
325+ }
315326
316- this . commands [ requestId ] = command ;
327+ this . commands$ . onNext ( command ) ;
317328
318- if ( this . connected ) {
319- this . sendMessage ( message ) ;
320- } else {
321- this . queue . push ( command ) ;
322- }
323- } ;
329+ const commandMessages$ = this . incomingMessages$
330+ . filter ( ( { requestId } ) => requestId === command . message . requestId ) ;
331+
332+ const successMessage$ = commandMessages$
333+ . filter ( ( { status : { code } } ) => code === 200 ) ;
334+
335+ const continuationMessages$ = commandMessages$
336+ . filter ( ( { status : { code } } ) => code === 206 ) ;
337+
338+ const noContentMessage$ = commandMessages$
339+ . filter ( ( { status : { code } } ) => code === 204 )
340+ // Rewrite these in order to ensure the callback is always fired with an
341+ // Empty Array rather than a null value.
342+ // Mutating is perfectly fine here.
343+ . map ( ( message ) => {
344+ message . result . data = [ ]
345+ return message ;
346+ } ) ;
347+
348+ const terminationMessages$ = Rx . Observable . merge (
349+ successMessage$ , noContentMessage$
350+ ) ;
351+
352+ const errorMessages$ = commandMessages$
353+ . filter ( ( { status : { code } } ) => [ 200 , 204 , 206 ] . indexOf ( code ) === - 1 )
354+ . flatMap ( ( { status : { code, message } } ) => {
355+ return Rx . Observable . throw ( new Error ( message + ' (Error ' + code + ')' ) )
356+ } ) ;
357+
358+ const results$ = Rx . Observable . merge (
359+ successMessage$ ,
360+ continuationMessages$ ,
361+ noContentMessage$ ,
362+ errorMessages$
363+ )
364+ . takeUntil ( terminationMessages$ ) ;
365+
366+ return results$ ;
367+ }
324368}
325369
326370export default GremlinClient ;
0 commit comments