diff --git a/lib/protocol/Writer.js b/lib/protocol/Writer.js index 28cf501..5e3d989 100644 --- a/lib/protocol/Writer.js +++ b/lib/protocol/Writer.js @@ -132,7 +132,7 @@ Writer.prototype.finializeParameters = function finializeParameters( var stream, header; this._streamErrorListeners = []; - this._lobs.forEach((stream) => { + for (var stream of this._lobs) { if (stream._readableState.errored) { cb(stream._readableState.errored); return; @@ -141,7 +141,7 @@ Writer.prototype.finializeParameters = function finializeParameters( self._streamErrorListeners.push(errorListener); // keep track so it can // be removed later stream.once('error', errorListener); - }); + } function finalize() { /* jshint bitwise:false */ @@ -164,6 +164,7 @@ Writer.prototype.finializeParameters = function finializeParameters( stream.removeListener('error', onerror); stream.removeListener('end', onend); stream.removeListener('readable', onreadable); + stream.removeListener('close', onclose); } function onerror(err) { @@ -173,6 +174,13 @@ Writer.prototype.finializeParameters = function finializeParameters( cb(err); } + function onclose() { + // close events indicate no other events will be emitted, so the data + // was not completely consumed since end was not called + cleanup(); + cb(createDestroyedStreamError()); + } + function onend() { /* jshint validthis:true */ cleanup(); @@ -234,6 +242,8 @@ Writer.prototype.finializeParameters = function finializeParameters( function next() { if (!self._lobs.length || bytesRemainingForLOBs <= 0) { return cb(null); + } else if (self._lobs[0].destroyed) { + return cb(createDestroyedStreamError()); } // set readable stream stream = self._lobs[0]; @@ -248,6 +258,7 @@ Writer.prototype.finializeParameters = function finializeParameters( stream.on('error', onerror); stream.on('end', onend); stream.on('readable', onreadable); + stream.on('close', onclose); onreadable.call(stream); } @@ -276,18 +287,19 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest( var self = this; var stream, header; - this._lobs.forEach((stream) => { + for (var stream of this._lobs) { if (stream._errored) { cb(stream._errored); return; } - }); + } function cleanup() { // remove event listeners stream.removeListener('error', onerror); stream.removeListener('end', onend); stream.removeListener('readable', onreadable); + stream.removeListener('close', onclose); } function onerror(err) { @@ -297,6 +309,12 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest( cb(err); } + function onclose() { + // close events indicate no other events will be emitted + cleanup(); + cb(createDestroyedStreamError()); + } + function finalize() { /* jshint bitwise:false */ // update lob options in header @@ -375,6 +393,8 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest( // no more lobs to write or not enough bytes remaining for next lob if (!self._lobs.length || bytesRemaining <= WRITE_LOB_REQUEST_HEADER_LENGTH) { return cb(null); + } else if (self._lobs[0].destroyed) { + return cb(createDestroyedStreamError()); } // set reabable stream stream = self._lobs[0]; @@ -397,6 +417,7 @@ Writer.prototype.finalizeWriteLobRequest = function finalizeWriteLobRequest( stream.on('error', onerror); stream.on('end', onend); stream.on('readable', onreadable); + stream.on('close', onclose); onreadable.call(stream); } @@ -1117,6 +1138,10 @@ function createReadStreamError() { return new Error('Chunk length larger than remaining bytes'); } +function createDestroyedStreamError() { + return new Error('Stream was destroyed before data could be completely consumed'); +} + function createInvalidLengthError(type) { return new Error(util.format('Invalid length or indicator value for %s type', type)); } @@ -1169,7 +1194,9 @@ util.inherits(LobTransform, Transform); // Wraps a Readable stream with a stream that is not in object mode function LobTransform(source, events, options) { + var self = this; this._source = source; + this._sourceEnded = false; Transform.call(this, options); // Forward all events indicated to the LobTransform wrapper this._proxiedEvents = []; @@ -1178,6 +1205,15 @@ function LobTransform(source, events, options) { source.on(event, listener); this._proxiedEvents.push({eventName: event, listener: listener}); } + // Destroy this transform if the source is closed before it ends + source.once('close', function () { + if (!self._sourceEnded) { + self.destroy(); + } + }); + source.once('end', function () { + self._sourceEnded = true; + }); source.pipe(this); } @@ -1192,4 +1228,5 @@ LobTransform.prototype._destroy = function _destroy() { self._source.removeListener(value.eventName, value.listener); }); this._source.unpipe(this); + this.emit('close'); } diff --git a/test/acceptance/db.Lob.js b/test/acceptance/db.Lob.js index d371c2e..2473c47 100644 --- a/test/acceptance/db.Lob.js +++ b/test/acceptance/db.Lob.js @@ -22,6 +22,8 @@ var async = require('async'); var stream = require('stream'); var db = require('../db')(); var RemoteDB = require('../db/RemoteDB'); +var common = require('../../lib/protocol/common'); +var DEFAULT_PACKET_SIZE = common.DEFAULT_PACKET_SIZE; var describeRemoteDB = db instanceof RemoteDB ? describe : describe.skip; var isRemoteDB = db instanceof RemoteDB; @@ -52,12 +54,12 @@ describe('db', function () { var client = db.client; var transaction = client._connection._transaction; + var dirname = path.join(__dirname, '..', 'fixtures', 'img'); + describe('IMAGES', function () { before(db.createImages.bind(db)); after(db.dropImages.bind(db)); - var dirname = path.join(__dirname, '..', 'fixtures', 'img'); - it('should return all images via callback', function (done) { var sql = 'select * from images order by NAME'; client.exec(sql, function (err, rows) { @@ -237,8 +239,6 @@ describe('db', function () { } }); - var dirname = path.join(__dirname, '..', 'fixtures', 'img'); - function testInsertReadableStream(inputStream, expected, done) { var statement; function prepareInsert(cb) { @@ -308,7 +308,73 @@ describe('db', function () { srcStream.pipe(transformStream); testInsertReadableStream(transformStream, expected, done); }); + }); + + describeRemoteDB('Quiet close stream', function () { + var preparedStatement; + before(function (done) { + if (isRemoteDB) { + db.createTable.bind(db)('STREAM_BLOB_TABLE', ['A BLOB'], null, function (err) { + if (err) done(err); + client.prepare('INSERT INTO STREAM_BLOB_TABLE VALUES (?)', function (err, stmt) { + if (err) done(err); + preparedStatement = stmt; + done(); + }); + }); + } else { + this.skip(); + done(); + } + }); + after(function (done) { + if (isRemoteDB) { + db.dropTable.bind(db)('STREAM_BLOB_TABLE', function () { + preparedStatement.drop(done); + }); + } else { + done(); + } + }); + + function testInsertClosingStream(streamOptions, destroyBefore, expectedErrMessage, done) { + var srcStream = fs.createReadStream(path.join(dirname, "lobby.jpg")); + var transformStream = new AbortTransform(streamOptions); + srcStream.pipe(transformStream); + if (destroyBefore) { + transformStream.destroy(); + } + preparedStatement.exec([transformStream], function (err) { + err.should.be.an.instanceof(Error); + err.message.should.equal(expectedErrMessage); + done(); + }); + } + var quietCloseErrMessage = "Stream was destroyed before data could be completely consumed"; + + it('should raise a destroyed error when given a destroyed stream', function (done) { + testInsertClosingStream({maxBytes: 50000}, true, quietCloseErrMessage, done); + }); + + it('should raise a destroyed error during an initial write lob execute', function (done) { + testInsertClosingStream({maxBytes: 50000}, false, quietCloseErrMessage, done); + }); + + it('should raise a destroyed error when stream is destroyed in between packet sends', function (done) { + testInsertClosingStream({maxBytes: DEFAULT_PACKET_SIZE + 1}, false, quietCloseErrMessage, done); + }); + + it('should raise a destroyed error when stream is destroyed during write lob request', function (done) { + testInsertClosingStream({maxBytes: DEFAULT_PACKET_SIZE + 1, throttleFlow: true}, + false, quietCloseErrMessage, done); + }); + + it('should raise custom errors provided by user stream', function (done) { + var streamError = new Error("Custom stream error"); + testInsertClosingStream({maxBytes: 50000, customError: streamError}, false, + streamError.message, done); + }); }); }); @@ -339,3 +405,36 @@ StrictMemoryTransform.prototype._transform = function _transform(chunk, encoding } tryPush(); } + +util.inherits(AbortTransform, stream.Transform); + +// Stream that will destroy itself once the maximum number of bytes are transformed +// When throttleFlow is true, the stream will avoid pushing a new chunk before the +// internal buffer is read +function AbortTransform(options) { + this._maxBytes = options.maxBytes; + this._customError = options.customError; + this._throttleFlow = options.throttleFlow; + this._currentBytes = 0; + stream.Transform.call(this, options); +} + +AbortTransform.prototype._transform = function _transform(chunk, encoding, cb) { + var self = this; + function tryPush() { + if (self.readableLength === 0 || !self._throttleFlow) { + if (chunk.length + self._currentBytes < self._maxBytes) { + self.push(chunk); + self._currentBytes += chunk.length; + } else { + self.push(chunk.slice(0, self._maxBytes - self._currentBytes)); + self._currentBytes = self._maxBytes; + self.destroy(self._customError); + } + cb(); + } else { + setImmediate(tryPush); + } + } + tryPush(); +} diff --git a/test/lib.ExecuteTask.js b/test/lib.ExecuteTask.js index cb973d1..a974703 100644 --- a/test/lib.ExecuteTask.js +++ b/test/lib.ExecuteTask.js @@ -505,6 +505,105 @@ describe('Lib', function () { }).run(next); }); + it('should run a task with two stream parameters both with read stream errors between writeLOB chunks', function (next) { + var content = new Content(200000); + var transform = new TransformWithError(150000, false); + var emptyStream = new Readable(); + transform.on('error', () => { + emptyStream.emit('error', new Error("A separate error")); + content.unpipe(); + content.destroy(); + }); + content.pipe(transform); + + var locatorId = new Buffer([1, 0, 0, 0, 0, 0, 0, 0]); + createExecuteTask({ + parameters: { + types: [TypeCode.BLOB, TypeCode.BLOB], + values: [transform, emptyStream] + }, + availableSize : 50000, + replies: [{ + type: MessageType.EXECUTE, + args: [null, { + writeLobReply: [locatorId, locatorId] + }] + }, { + type: MessageType.WRITE_LOB, + args: [null, {}] + }, { + type: MessageType.ROLLBACK, + args: [null, {}] + }] + }, function done(err) { + err.should.be.an.instanceOf(Error); + // The error should match with the first stream, since we are reading that one + err.message.should.equal("Error in transform"); + }).run(function () { + // Call next in the next iteration of the event loop to check that the done + // callback is only called once + setImmediate(function () { + next(); + }); + }); + }); + + function testStreamQuietClose(limit, next) { + var content = new Content(200000); + var transform = new TransformWithError(limit, false, true); // quietly destroy the stream + transform.on('close', () => { + content.unpipe(); + content.destroy(); + }); + content.pipe(transform); + if (limit === 0) { + transform.destroy(); + } + var locatorId = new Buffer([1, 0, 0, 0, 0, 0, 0, 0]); + + var replies; + if (limit >= 50000) { + replies = [{ + type: MessageType.EXECUTE, + args: [null, { + writeLobReply: [locatorId] + }] + }, { + type: MessageType.WRITE_LOB, + args: [null, {}] + }, { + type: MessageType.ROLLBACK, + args: [null, {}] + }]; + } else { + replies = []; + } + + createExecuteTask({ + parameters: { + types: [TypeCode.BLOB], + values: [transform] + }, + availableSize : 50000, + replies: replies + }, function done(err) { + err.should.be.an.instanceOf(Error); + err.message.should.equal('Stream was destroyed before data could be completely consumed'); + }).run(next); + } + + it('should run a task with a stream parameter already destroyed', function (next) { + testStreamQuietClose(0, next); + }); + + it('should run a task with a stream parameter that quietly closes in initial write lob execute', function (next) { + testStreamQuietClose(1, next); + }); + + it('should run a task with a stream parameter that quietly closes in between write lob chunks', function (next) { + testStreamQuietClose(150000, next); + }); + it('should accumulate rows affected', function () { var task = createExecuteTask(); task.pushReply({}); @@ -755,10 +854,11 @@ class Content extends Readable { } class TransformWithError extends Transform { - constructor(limit, errorOnFinal) { + constructor(limit, errorOnFinal, quietClose) { super(); this.limit = limit; this.errorOnFinal = Boolean(errorOnFinal); + this.quietClose = Boolean(quietClose); } _transform(chunk, encoding, callback) { @@ -766,7 +866,11 @@ class TransformWithError extends Transform { if (this.limit > 0) { callback(null, chunk); } else { - callback(new Error('Error in transform')); + if (this.quietClose) { + this.destroy(); + } else { + callback(new Error('Error in transform')); + } } }