@@ -8,9 +8,10 @@ import highland from 'highland';
88
99import WebSocketGremlinConnection from './WebSocketGremlinConnection' ;
1010import MessageStream from './MessageStream' ;
11- import executeHandler from './executeHandler' ;
1211import * as Utils from './utils' ;
1312
13+ import Rx from 'rx' ;
14+
1415
1516class GremlinClient extends EventEmitter {
1617 constructor ( port = 8182 , host = 'localhost' , options = { } ) {
@@ -27,7 +28,6 @@ class GremlinClient extends EventEmitter {
2728 op : 'eval' ,
2829 processor : '' ,
2930 accept : 'application/json' ,
30- executeHandler,
3131 ...options ,
3232 path : path . length && ! path . startsWith ( '/' ) ? `/${ path } ` : path
3333 }
@@ -43,78 +43,73 @@ class GremlinClient extends EventEmitter {
4343
4444 this . commands = { } ;
4545
46- this . connection = this . createConnection ( {
46+ const connection = this . createConnection ( {
4747 port,
4848 host,
4949 path : this . options . path
5050 } ) ;
51+
52+ this . commands$ = new Rx . Subject ( ) ;
53+ this . commands$ . subscribe ( ( command ) => {
54+ const { message : { requestId } } = command ;
55+ this . commands [ requestId ] = command
56+ } ) ;
57+
58+ this . registerConnection ( connection ) ;
5159 }
5260
5361 createConnection ( { port, host, path } ) {
54- const connection = new WebSocketGremlinConnection ( { port, host, path } ) ;
62+ return new WebSocketGremlinConnection ( { port, host, path } ) ;
63+ }
64+
65+ registerConnection ( connection ) {
66+ this . connection = connection ;
67+
68+ const open$ = Rx . Observable . fromEvent ( connection , 'open' ) ;
69+ const error$ = Rx . Observable . fromEvent ( connection , 'error' ) ;
70+ const incomingMessages$ = Rx . Observable . fromEvent ( connection , 'message' )
71+ . map ( ( { data } ) => {
72+ const buffer = new Buffer ( data , 'binary' ) ;
73+ const rawMessage = JSON . parse ( buffer . toString ( 'utf-8' ) ) ;
74+
75+ return rawMessage ;
76+ } ) ;
77+ const close$ = Rx . Observable . fromEvent ( connection , 'close' ) ;
78+
79+ const canSend$ = Rx . Observable . merge (
80+ open$ . map ( true ) ,
81+ error$ . map ( false ) ,
82+ close$ . map ( false )
83+ )
84+
85+ open$ . subscribe ( ( connection ) => this . onConnectionOpen ( ) ) ;
86+ error$ . subscribe ( ( error ) => this . handleError ( error ) ) ;
87+
5588
56- connection . on ( 'open' , ( ) => this . onConnectionOpen ( ) ) ;
57- connection . on ( 'error' , ( error ) => this . handleError ( error ) ) ;
58- connection . on ( 'message' , ( message ) => this . handleProtocolMessage ( message ) ) ;
59- connection . on ( 'close' , ( event ) => this . handleDisconnection ( event ) )
89+ this . incomingMessages$ = incomingMessages$ ;
6090
61- return connection ;
91+ close$ . subscribe ( ( event ) => this . handleDisconnection ( event ) ) ;
92+
93+ const outgoingMessages$ = this . commands$
94+ . map ( ( { message } ) => message )
95+ . pausableBuffered ( canSend$ ) ;
96+
97+ outgoingMessages$
98+ . subscribe ( ( message ) => this . sendMessage ( message ) ) ;
6299 }
63100
64101 handleError ( err ) {
65102 this . connected = false ;
66103 this . emit ( 'error' , err ) ;
67104 }
68105
69- /**
70- * Process all incoming raw message events sent by Gremlin Server, and dispatch
71- * to the appropriate command.
72- *
73- * @param {MessageEvent } event
74- */
75- handleProtocolMessage ( message ) {
76- const { data } = message ;
77- const buffer = new Buffer ( data , 'binary' ) ;
78- const rawMessage = JSON . parse ( buffer . toString ( 'utf-8' ) ) ;
79- const {
80- requestId,
81- status : {
82- code : statusCode ,
83- message : statusMessage
84- }
85- } = rawMessage ;
86-
87- const { messageStream } = this . commands [ requestId ] ;
88-
89- switch ( statusCode ) {
90- case 200 : // SUCCESS
91- delete this . commands [ requestId ] ; // TODO: optimize performance
92- messageStream . push ( rawMessage ) ;
93- messageStream . push ( null ) ;
94- break ;
95- case 204 : // NO_CONTENT
96- delete this . commands [ requestId ] ;
97- messageStream . push ( null ) ;
98- break ;
99- case 206 : // PARTIAL_CONTENT
100- messageStream . push ( rawMessage ) ;
101- break ;
102- default :
103- delete this . commands [ requestId ] ;
104- messageStream . emit ( 'error' , new Error ( statusMessage + ' (Error ' + statusCode + ')' ) ) ;
105- break ;
106- }
107- }
108-
109106 /**
110107 * Handle the WebSocket onOpen event, flag the client as connected and
111108 * process command queue.
112109 */
113110 onConnectionOpen ( ) {
114111 this . connected = true ;
115112 this . emit ( 'connect' ) ;
116-
117- this . executeQueue ( ) ;
118113 } ;
119114
120115 /**
@@ -127,17 +122,6 @@ class GremlinClient extends EventEmitter {
127122 } ) ;
128123 } ;
129124
130- /**
131- * Process the current command queue, sending commands to Gremlin Server
132- * (First In, First Out).
133- */
134- executeQueue ( ) {
135- while ( this . queue . length > 0 ) {
136- let { message } = this . queue . shift ( ) ;
137- this . sendMessage ( message ) ;
138- }
139- } ;
140-
141125 /**
142126 * @param {Object } reason
143127 */
@@ -228,14 +212,13 @@ class GremlinClient extends EventEmitter {
228212 message = { } ;
229213 }
230214
231- const messageStream = this . messageStream ( script , bindings , message ) ;
232-
233- // TO CHECK: errors handling could be improved
234- // See https://groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ
235- // for an example using domains
236- const { executeHandler } = this . options ;
237-
238- executeHandler ( messageStream , callback ) ;
215+ this . observable ( script , bindings , message )
216+ . flatMap ( ( { result : { data } } ) => data )
217+ . toArray ( )
218+ . subscribe (
219+ ( results ) => callback ( null , results ) ,
220+ ( err ) => callback ( err )
221+ )
239222 }
240223
241224 /**
@@ -294,33 +277,57 @@ class GremlinClient extends EventEmitter {
294277 messageStream : stream
295278 } ;
296279
297- this . sendCommand ( command ) ; //todo improve for streams
280+ this . commands$ . onNext ( command ) ;
298281
299282 return stream ;
300283 } ;
301284
302- /**
303- * Send a command to Gremlin Server, or add it to queue if the connection
304- * is not established.
305- *
306- * @param {Object } command
307- */
308- sendCommand ( command ) {
309- const {
310- message,
311- message : {
312- requestId
313- }
314- } = command ;
315-
316- this . commands [ requestId ] = command ;
317-
318- if ( this . connected ) {
319- this . sendMessage ( message ) ;
320- } else {
321- this . queue . push ( command ) ;
285+ observable ( script , bindings , rawMessage ) {
286+ const command = {
287+ message : this . buildMessage ( script , bindings , rawMessage ) ,
322288 }
323- } ;
289+
290+ this . commands$ . onNext ( command ) ;
291+
292+ const commandMessages$ = this . incomingMessages$
293+ . filter ( ( { requestId } ) => requestId === command . message . requestId ) ;
294+
295+ const successMessage$ = commandMessages$
296+ . filter ( ( { status : { code } } ) => code === 200 ) ;
297+
298+ const continuationMessages$ = commandMessages$
299+ . filter ( ( { status : { code } } ) => code === 206 ) ;
300+
301+ const noContentMessage$ = commandMessages$
302+ . filter ( ( { status : { code } } ) => code === 204 )
303+ // Rewrite these in order to ensure the callback is always fired with an
304+ // Empty Array rather than a null value.
305+ // Mutating is perfectly fine here.
306+ . map ( ( message ) => {
307+ message . result . data = [ ]
308+ return message ;
309+ } ) ;
310+
311+ const terminationMessages$ = Rx . Observable . merge (
312+ successMessage$ , noContentMessage$
313+ ) ;
314+
315+ const errorMessages$ = commandMessages$
316+ . filter ( ( { status : { code } } ) => [ 200 , 204 , 206 ] . indexOf ( code ) === - 1 )
317+ . flatMap ( ( { status : { code, message } } ) => {
318+ return Rx . Observable . throw ( new Error ( message + ' (Error ' + code + ')' ) )
319+ } ) ;
320+
321+ const results$ = Rx . Observable . merge (
322+ successMessage$ ,
323+ continuationMessages$ ,
324+ noContentMessage$ ,
325+ errorMessages$
326+ )
327+ . takeUntil ( terminationMessages$ ) ;
328+
329+ return results$ ;
330+ }
324331}
325332
326333export default GremlinClient ;
0 commit comments