diff --git a/lib/Client.js b/lib/Client.js index 3f367cc..547083a 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -260,6 +260,11 @@ Client.prototype.setClientInfo = function setClientInfo(key, val) { } }; +Client.prototype.isValid = function isValid(timeout, cb) { + // Timeout is inputted in seconds, convert to milliseconds + this._connection.isValid(timeout * 1000, cb); +} + Client.prototype._execute = function _execute(command, options, cb) { var result = this._createResult(this._connection, options); this._connection.executeDirect({ diff --git a/lib/protocol/Connection.js b/lib/protocol/Connection.js index 1b998e1..2cfd87e 100644 --- a/lib/protocol/Connection.js +++ b/lib/protocol/Connection.js @@ -250,6 +250,13 @@ Connection.prototype._addListeners = function _addListeners(socket) { socket.removeListener('close', onclose); } + function clearStateTimeout() { + if (self._state.timeoutObject) { + clearTimeout(self._state.timeoutObject); + self._state.timeoutObject = undefined; + } + } + // register listerners on socket function ondata(chunk) { packet.push(chunk); @@ -258,17 +265,24 @@ Connection.prototype._addListeners = function _addListeners(socket) { self._state.sessionId = packet.header.sessionId; self._state.packetCount = -1; } - var buffer = packet.getData(); - packet.clear(); - var cb = self._state.receive; - self._state.receive = undefined; - self._state.messageType = undefined; - self.receive(buffer, cb); + // Ensure the packet corresponds to the current request + if (self._state.packetCount === -1 || self._state.packetCount === packet.header.packetCount) { + var buffer = packet.getData(); + packet.clear(); + var cb = self._state.receive; + self._state.receive = undefined; + self._state.messageType = undefined; + clearStateTimeout(); + self.receive(buffer, cb); + } else { + packet.clear(); + } } } socket.on('data', ondata); function onerror(err) { + clearStateTimeout(); var cb = self._state && self._state.receive; if (cb) { self._state.receive = null; // a callback should be called only once @@ -313,7 +327,7 @@ Connection.prototype._clearQueue = function _clearQueue(err) { } }; -Connection.prototype.send = function send(message, receive) { +Connection.prototype.send = function send(message, options, receive) { if (this._statementContext) { message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions()); } @@ -354,6 +368,14 @@ Connection.prototype.send = function send(message, receive) { if (this._socket) { this._socket.write(packet); } + var self = this; + function onTimeout() { + self._state.receive = undefined; + receive(new Error('Socket receive timeout (receive took longer than ' + options.communicationTimeout + ' ms)')); + } + if (options.communicationTimeout) { + state.timeoutObject = setTimeout(onTimeout, options.communicationTimeout); + } }; @@ -422,7 +444,7 @@ Connection.prototype.receive = function receive(buffer, cb) { } }; -Connection.prototype.enqueue = function enqueue(task, cb) { +Connection.prototype.enqueue = function enqueue(task, options, cb) { var queueable; if (!this._socket || !this._queue || this.readyState === 'closed') { @@ -439,7 +461,7 @@ Connection.prototype.enqueue = function enqueue(task, cb) { queueable.name = task.name; } else if (util.isObject(task)) { if (task instanceof request.Segment) { - queueable = this._queue.createTask(this.send.bind(this, task), cb); + queueable = this._queue.createTask(this.send.bind(this, task, options), cb); queueable.name = MessageTypeName[task.type]; } else if (util.isFunction(task.run)) { queueable = task; @@ -574,7 +596,7 @@ Connection.prototype.connect = function connect(options, cb) { clientId: self.clientId, connectOptions: self.connectOptions.getOptions(), useCesu8: self.useCesu8 - }), connReceive); + }), options, connReceive); }); } @@ -588,7 +610,7 @@ Connection.prototype.connect = function connect(options, cb) { authOptions.dbConnectInfo = true; } - this.send(request.authenticate(authOptions), authReceive); + this.send(request.authenticate(authOptions), options, authReceive); }; Connection.prototype.disconnect = function disconnect(cb) { @@ -600,7 +622,7 @@ Connection.prototype.disconnect = function disconnect(cb) { } function enqueueDisconnect() { - self.enqueue(request.disconnect(), done); + self.enqueue(request.disconnect(), {}, done); } if (this.isIdle()) { @@ -616,7 +638,7 @@ Connection.prototype.executeDirect = function executeDirect(options, cb) { scrollableCursor: this.scrollableCursor, useCesu8: this.useCesu8 }, options); - this.enqueue(request.executeDirect(options), cb); + this.enqueue(request.executeDirect(options), options, cb); }; Connection.prototype.prepare = function prepare(options, cb) { @@ -625,7 +647,7 @@ Connection.prototype.prepare = function prepare(options, cb) { scrollableCursor: this.scrollableCursor, useCesu8: this.useCesu8 }, options); - this.enqueue(request.prepare(options), cb); + this.enqueue(request.prepare(options), options, cb); }; Connection.prototype.readLob = function readLob(options, cb) { @@ -635,7 +657,7 @@ Connection.prototype.readLob = function readLob(options, cb) { }; } options.autoCommit = this.autoCommit; - this.enqueue(request.readLob(options), cb); + this.enqueue(request.readLob(options), options, cb); }; Connection.prototype.execute = function execute(options, cb) { @@ -646,7 +668,7 @@ Connection.prototype.execute = function execute(options, cb) { parameters: EMPTY_BUFFER }, options); if (options.parameters === EMPTY_BUFFER) { - return this.enqueue(request.execute(options), cb); + return this.enqueue(request.execute(options), options, cb); } this.enqueue(createExecuteTask(this, options, cb)); }; @@ -654,16 +676,16 @@ Connection.prototype.execute = function execute(options, cb) { Connection.prototype.fetchNext = function fetchNext(options, cb) { options.autoCommit = this.autoCommit; options.useCesu8 = this.useCesu8; - this.enqueue(request.fetchNext(options), cb); + this.enqueue(request.fetchNext(options), options, cb); }; Connection.prototype.closeResultSet = function closeResultSet(options, cb) { - this.enqueue(request.closeResultSet(options), cb); + this.enqueue(request.closeResultSet(options), options, cb); }; Connection.prototype.dropStatement = function dropStatement(options, cb) { options.useCesu8 = this.useCesu8; - this.enqueue(request.dropStatementId(options), cb); + this.enqueue(request.dropStatementId(options), options, cb); }; Connection.prototype.commit = function commit(options, cb) { @@ -671,7 +693,7 @@ Connection.prototype.commit = function commit(options, cb) { cb = options; options = {}; } - this.enqueue(request.commit(options), cb); + this.enqueue(request.commit(options), options, cb); }; Connection.prototype.rollback = function rollback(options, cb) { @@ -679,7 +701,7 @@ Connection.prototype.rollback = function rollback(options, cb) { cb = options; options = {}; } - this.enqueue(request.rollback(options), cb); + this.enqueue(request.rollback(options), options, cb); }; // The function doesn't use the queue. It's used before the queue starts running @@ -689,7 +711,7 @@ Connection.prototype.fetchDbConnectInfo = function (options, cb) { err.code = 'EHDBCLOSE'; return cb(err) } - this.send(request.dbConnectInfo(options), function(err, reply) { + this.send(request.dbConnectInfo(options), options, function(err, reply) { if (err) { return cb(err); } @@ -728,6 +750,21 @@ Connection.prototype.destroy = function destroy(err) { } }; +Connection.prototype.isValid = function isValid(timeout, cb) { + var options = { + command: 'SELECT 1 FROM DUMMY WHERE 1 = 0', + communicationTimeout: timeout > 0 ? timeout : undefined, + } + this.executeDirect(options, function (err, reply) { + if (err) { + // Currently, isValid will swallow all errors and indicate invalid + cb(null, false); + } else { + cb(null, true); + } + }); +} + Connection.prototype.isIdle = function isIdle() { return this._queue.empty && !this._queue.busy; }; @@ -759,6 +796,7 @@ function ConnectionState() { this.packetCount = -1; this.messageType = undefined; this.receive = undefined; + this.timeoutObject = undefined; } function Version(major, minor) { diff --git a/lib/protocol/ExecuteTask.js b/lib/protocol/ExecuteTask.js index 4df380e..163272f 100644 --- a/lib/protocol/ExecuteTask.js +++ b/lib/protocol/ExecuteTask.js @@ -203,14 +203,15 @@ ExecuteTask.prototype.sendExecute = function sendExecute(cb) { if (err) { return cb(err); } - self.connection.send(request.execute({ + var options = { autoCommit: self.autoCommit, holdCursorsOverCommit: self.holdCursorsOverCommit, scrollableCursor: self.scrollableCursor, statementId: self.statementId, parameters: parameters, useCesu8: self.connection.useCesu8 - }), cb); + }; + self.connection.send(request.execute(options), options, cb); }); }; @@ -221,24 +222,27 @@ ExecuteTask.prototype.sendWriteLobRequest = function sendWriteLobRequest(cb) { if (err) { return cb(err); } - self.connection.send(request.writeLob({ + var options = { writeLobRequest: buffer - }), cb); + }; + self.connection.send(request.writeLob(options), options, cb); }); }; ExecuteTask.prototype.sendCommit = function sendCommit(cb) { var self = this; - self.connection.send(request.commit({ + var options = { holdCursorsOverCommit: self.holdCursorsOverCommit - }), cb); + }; + self.connection.send(request.commit(options), options, cb); }; ExecuteTask.prototype.sendRollback = function sendRollback(cb) { var self = this; - self.connection.send(request.rollback({ + var options = { holdCursorsOverCommit: self.holdCursorsOverCommit - }), cb); + }; + self.connection.send(request.rollback(options), options, cb); }; function createInvalidFunctionCodeError() { diff --git a/test/acceptance/db.Events.js b/test/acceptance/db.Events.js index 03650e5..6960b5c 100644 --- a/test/acceptance/db.Events.js +++ b/test/acceptance/db.Events.js @@ -14,6 +14,8 @@ 'use strict'; var db = require('../db')(); +var RemoteDB = require('../db/RemoteDB'); +var describeRemoteDB = db instanceof RemoteDB ? describe : describe.skip; describe('db', function () { @@ -59,4 +61,52 @@ describe('db', function () { }); }); + describeRemoteDB('isValid', function () { + before(db.init.bind(db)); + after(function (done) { + if (client.readyState !== 'closed') { + db.end(done); + } else { + done(); + } + }); + var client = db.client; + + it('should be valid when connected', function (done) { + client.isValid(0, function (err, ret) { // no timeout + if (err) done(err); + ret.should.be.true(); + done(); + }) + }); + + it('should be valid with timeout when connected', function (done) { + client.isValid(1, function (err, ret) { // 1 second timeout + if (err) done(err); + ret.should.be.true(); + done(); + }); + }); + + it('should be invalid when disconnected', function (done) { + client.exec('SELECT CURRENT_CONNECTION FROM DUMMY', function (err, res) { + var connId = res[0].CURRENT_CONNECTION; + var adminDB = require('../db')(); + adminDB.init(function (err) { + if (err) done(err); + var adminClient = adminDB.client; + var disconnectSQL = "ALTER SYSTEM DISCONNECT SESSION '" + connId + "'"; + adminClient.exec(disconnectSQL, function (err) { + if (err) done(err); + client.isValid(0, function (err, ret) { // disconnected + if (err) done(err); + ret.should.be.false(); + adminDB.end(done); + }); + }); + }); + }); + }); + }); + }); diff --git a/test/hdb.Client.js b/test/hdb.Client.js index 705388c..8d6a88e 100644 --- a/test/hdb.Client.js +++ b/test/hdb.Client.js @@ -173,7 +173,7 @@ describe('hdb', function () { client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(true); client._connection.getClientInfo().getProperty("SESSVAR1").should.equal("TESTVAR1"); client._connection.getClientInfo().getProperty("SESSVAR2").should.equal("TESTVAR2"); - client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), null); + client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), {}, null); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(false); client._connection.getClientInfo().getProperty("SESSVAR1").should.equal("TESTVAR1"); client._connection.getClientInfo().getProperty("SESSVAR2").should.equal("TESTVAR2"); @@ -188,14 +188,14 @@ describe('hdb', function () { client.setClientInfo("VARKEY1", "VARVAL1"); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(true); client._connection.getClientInfo().getProperty("VARKEY1").should.equal("VARVAL1"); - client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), null); + client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), {}, null); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(false); client.setClientInfo("VARKEY2", "VARVAL2"); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(true); client._connection.getClientInfo().getProperty("VARKEY1").should.equal("VARVAL1"); client._connection.getClientInfo().getProperty("VARKEY2").should.equal("VARVAL2"); - client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), null); + client._connection.send(new lib.request.Segment(lib.common.MessageType.EXECUTE), {}, null); client._connection.getClientInfo().shouldSend(lib.common.MessageType.EXECUTE).should.eql(false); done(); }); @@ -211,7 +211,7 @@ describe('hdb', function () { } var sendCount = 0; - var mock_send = function(data, cb) { + var mock_send = function(data, options, cb) { sendCount += 1; if (sendCount === 1) { data.parts.forEach(function(part) { @@ -263,7 +263,7 @@ describe('hdb', function () { } var sendCount = 0; - var mock_send = function(data, cb) { + var mock_send = function(data, options, cb) { sendCount += 1; if (sendCount === 1) { data.parts.forEach(function(part) { @@ -313,7 +313,7 @@ describe('hdb', function () { } var sendCount = 0; - var mock_send = function(data, cb) { + var mock_send = function(data, options, cb) { sendCount += 1; if (sendCount === 1) { cb(undefined, mock_auth_reply); @@ -1104,7 +1104,7 @@ describe('hdb', function () { { name: 3, type: 3, value: 30041 } ] }; - var mock_send = function (data, cb) { + var mock_send = function (data, options, cb) { ++sendCount; if (sendCount == 1) { cb(new Error(), reply1); @@ -1224,7 +1224,7 @@ describe('hdb', function () { connectOptions: [] }; - var mock_send = function (data, cb) { + var mock_send = function (data, options, cb) { ++sendCount; if (sendCount == 1) { cb(undefined, reply1); diff --git a/test/lib.Connection.js b/test/lib.Connection.js index 3754d56..81ae210 100644 --- a/test/lib.Connection.js +++ b/test/lib.Connection.js @@ -46,7 +46,7 @@ function getAuthenticationPart(req) { }).shift().args; } -function sendAuthenticationRequest(req, done) { +function sendAuthenticationRequest(req, options, done) { var reply = { authentication: getAuthenticationPart(req) }; @@ -77,7 +77,7 @@ describe('Lib', function () { connection.open({}, function () { connection.getClientInfo().setProperty('LOCALE', 'en_US'); connection.getClientInfo().shouldSend(MessageType.EXECUTE).should.eql(true); - connection.send(new lib.request.Segment(MessageType.EXECUTE), null); + connection.send(new lib.request.Segment(MessageType.EXECUTE), {}, null); connection.getClientInfo().shouldSend(MessageType.EXECUTE).should.eql(false); connection.getClientInfo().getProperty('LOCALE').should.eql('en_US'); done(); @@ -221,7 +221,7 @@ describe('Lib', function () { it('should destroy socket after disconnect', function (done) { var connection = createConnection(); - connection.enqueue = function enqueue(msg, cb) { + connection.enqueue = function enqueue(msg, options, cb) { msg.type.should.equal(MessageType.DISCONNECT); setImmediate(function () { cb(); @@ -405,7 +405,8 @@ describe('Lib', function () { it('should report error in enqueue when connection is invalid', function (done) { var connection = createConnection(); connection._queue.pause(); - connection.enqueue(function firstTask() { }, function (err) { + var options = {}; + connection.enqueue(function firstTask() { }, options, function (err) { err.code.should.equal('EHDBCLOSE'); done(); }); @@ -413,7 +414,7 @@ describe('Lib', function () { it('should rollback a transaction', function () { var connection = createConnection(); - connection.enqueue = function enqueue(msg, done) { + connection.enqueue = function enqueue(msg, options, done) { done(msg); }; @@ -426,7 +427,7 @@ describe('Lib', function () { it('should commit a transaction', function () { var connection = createConnection(); - connection.enqueue = function enqueue(msg, done) { + connection.enqueue = function enqueue(msg, options, done) { done(msg); }; @@ -439,7 +440,7 @@ describe('Lib', function () { it('should execute a statement', function () { var connection = createConnection(); - connection.enqueue = function enqueue(msg, done) { + connection.enqueue = function enqueue(msg, options, done) { done(msg); }; @@ -457,7 +458,7 @@ describe('Lib', function () { connection._socket = { readyState: 'open' }; - connection.send = function (msg, cb) { + connection.send = function (msg, options, cb) { var segment = new lib.reply.Segment(segmentData.kind, segmentData.functionCode); var part = segmentData.parts[0]; segment.push(new lib.reply.Part(part.kind, part.attributes, part.argumentCount, part.buffer)); @@ -481,7 +482,7 @@ describe('Lib', function () { connection._socket = { readyState: 'open' }; - connection.send = function (msg, cb) { + connection.send = function (msg, options, cb) { cb(new Error('Request was not successful')); }; @@ -577,7 +578,7 @@ describe('Lib', function () { it('should fail to disconnect from the database', function (done) { var connection = createConnection(); var error = new Error('DISCONNECT_ERROR'); - connection.enqueue = function enqueue(msg, cb) { + connection.enqueue = function enqueue(msg, options, cb) { msg.type.should.equal(MessageType.DISCONNECT); setImmediate(function () { cb(error); @@ -617,6 +618,59 @@ describe('Lib', function () { }) }); + it('should receive the correct packet after timeout', function (done) { + var connection = createConnection(); + connection._parseReplySegment = function parseReplySegment(reply) { + return reply; + } + + var messageHeader = Buffer.from( + '0000000000000000' + // Session id + '00000000' + // Packet count + '01000000' + // Varpart length + '21000000' + // Varpart size + '0100' + // Number of segments + '00000000000000000000', // Extra options + 'hex'); + // Note that for simplicity the packet body is not valid + var firstReply = Buffer.concat([messageHeader, Buffer.from('01', 'hex')]); + messageHeader[8] = 1; // Update packet count + var secondReply = Buffer.concat([messageHeader, Buffer.from('02', 'hex')]); + + connection.open({}, function (err) { + (!!err).should.be.not.ok; + + // Overwrite MockSocket write + var firstWrite = true; + connection._socket.write = function write() { + if (firstWrite) { + firstWrite = false; + } else { + var self = this; + setImmediate(function () { + self.emit('data', firstReply); + self.emit('data', secondReply); + }); + } + } + + // Mimic connect + connection._queue.resume(); + connection._state.sessionId = 0; + + // 1 ms timeout + connection.isValid(1, function (err, ret) { + (!!err).should.be.not.ok; + ret.should.be.false(); // MockSocket does not write back yet + connection.executeDirect({ command: 'second sql' }, function (err, reply) { + if (err) done(err); + reply.should.eql(Buffer.from('02', 'hex')); + done(); + }); + }); + }); + }); + context('cesu-8 support', function() { it('should create a connection with a useCesu8 set correctly', function () { diff --git a/test/lib.ExecuteTask.js b/test/lib.ExecuteTask.js index c8074ad..e59bc6d 100644 --- a/test/lib.ExecuteTask.js +++ b/test/lib.ExecuteTask.js @@ -647,7 +647,7 @@ function Connection(size, sizeForLobs, replies) { this.useCesu8 = true; } -Connection.prototype.send = function (msg, cb) { +Connection.prototype.send = function (msg, options, cb) { var reply = this.replies.shift(); msg.type.should.equal(reply.type); if (typeof reply.checkMessage === 'function') {