@@ -17,6 +17,22 @@ const hasCode = (filterCode) => ({ status: { code } }) => code === filterCode;
1717
1818const isErrorMessage = ( { status : { code } } ) => [ 200 , 204 , 206 ] . indexOf ( code ) === - 1 ;
1919
20+ const serializeToBinary = ( message , accept ) => {
21+ let serializedMessage = accept + JSON . stringify ( message ) ;
22+ serializedMessage = unescape ( encodeURIComponent ( serializedMessage ) ) ;
23+
24+ // Let's start packing the message into binary
25+ // mimeLength(1) + mimeType Length + serializedMessage Length
26+ let binaryMessage = new Uint8Array ( 1 + serializedMessage . length ) ;
27+ binaryMessage [ 0 ] = accept . length ;
28+
29+ for ( let i = 0 ; i < serializedMessage . length ; i ++ ) {
30+ binaryMessage [ i + 1 ] = serializedMessage . charCodeAt ( i ) ;
31+ }
32+
33+ return binaryMessage ;
34+ }
35+
2036class GremlinClient extends EventEmitter {
2137 constructor ( port = 8182 , host = 'localhost' , options = { } ) {
2238 super ( ) ;
@@ -47,38 +63,37 @@ class GremlinClient extends EventEmitter {
4763
4864 this . commands = { } ;
4965
50- const connection = this . createConnection ( {
51- port,
52- host,
53- path : this . options . path
54- } ) ;
5566
5667 this . commands$ = new Rx . Subject ( ) ;
5768 this . commands$ . subscribe ( ( command ) => {
5869 const { message : { requestId } } = command ;
5970 this . commands [ requestId ] = command
6071 } ) ;
6172
62- this . registerConnection ( connection ) ;
63- }
73+ const connection = this . createConnection ( {
74+ port,
75+ host,
76+ path : this . options . path
77+ } ) ;
6478
65- createConnection ( { port, host, path } ) {
66- return new WebSocketGremlinConnection ( { port, host, path } ) ;
67- }
79+ const connections$ = Rx . Observable . create ( ( observer ) => observer . next ( connection ) ) ;
6880
69- registerConnection ( connection ) {
70- this . connection = connection ;
81+ const open$ = connections$
82+ . flatMap ( ( connection ) => Rx . Observable . fromEvent ( connection , 'open' ) ) ;
7183
72- const open$ = Rx . Observable . fromEvent ( connection , 'open' ) ;
73- const error$ = Rx . Observable . fromEvent ( connection , 'error' ) ;
74- const incomingMessages$ = Rx . Observable . fromEvent ( connection , 'message' )
84+ const error$ = connections$
85+ . flatMap ( ( connection ) => Rx . Observable . fromEvent ( connection , 'error' ) ) ;
86+
87+ const incomingMessages$ = connections$
88+ . flatMap ( ( connection ) => Rx . Observable . fromEvent ( connection , 'message' ) )
7589 . map ( ( { data } ) => {
7690 const buffer = new Buffer ( data , 'binary' ) ;
7791 const rawMessage = JSON . parse ( buffer . toString ( 'utf-8' ) ) ;
7892
7993 return rawMessage ;
8094 } ) ;
81- const close$ = Rx . Observable . fromEvent ( connection , 'close' ) ;
95+ const close$ = connections$
96+ . flatMap ( ( connection ) => Rx . Observable . fromEvent ( connection , 'close' ) ) ;
8297
8398 const canSend$ = Rx . Observable . merge (
8499 open$ . map ( true ) ,
@@ -95,11 +110,18 @@ class GremlinClient extends EventEmitter {
95110 close$ . subscribe ( ( event ) => this . handleDisconnection ( event ) ) ;
96111
97112 const outgoingMessages$ = this . commands$
98- . map ( ( { message } ) => message )
99- . pausableBuffered ( canSend$ ) ;
113+ . map ( ( { message } ) => serializeToBinary ( message , this . options . accept ) )
114+ . pausableBuffered ( canSend$ )
115+ . combineLatest ( connections$ ) ;
100116
101117 outgoingMessages$
102- . subscribe ( ( message ) => this . sendMessage ( message ) ) ;
118+ . subscribe ( ( [ binaryMessage , connection ] ) =>
119+ connection . sendMessage ( binaryMessage )
120+ ) ;
121+ }
122+
123+ createConnection ( { port, host, path } ) {
124+ return new WebSocketGremlinConnection ( { port, host, path } ) ;
103125 }
104126
105127 handleError ( err ) {
@@ -177,22 +199,6 @@ class GremlinClient extends EventEmitter {
177199 return message ;
178200 } ;
179201
180- sendMessage ( message ) {
181- let serializedMessage = this . options . accept + JSON . stringify ( message ) ;
182- serializedMessage = unescape ( encodeURIComponent ( serializedMessage ) ) ;
183-
184- // Let's start packing the message into binary
185- // mimeLength(1) + mimeType Length + serializedMessage Length
186- let binaryMessage = new Uint8Array ( 1 + serializedMessage . length ) ;
187- binaryMessage [ 0 ] = this . options . accept . length ;
188-
189- for ( let i = 0 ; i < serializedMessage . length ; i ++ ) {
190- binaryMessage [ i + 1 ] = serializedMessage . charCodeAt ( i ) ;
191- }
192-
193- this . connection . sendMessage ( binaryMessage ) ;
194- } ;
195-
196202 /**
197203 * Asynchronously send a script to Gremlin Server for execution and fire
198204 * the provided callback when all results have been fetched.
0 commit comments