From 2c886255166f36271eec8ffbf34575fba813b66f Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 5 Apr 2024 14:00:39 +0200 Subject: [PATCH 1/4] first working state --- lib/protocol/Connection.js | 175 +++++++++++++++++++++++--------- lib/protocol/request/Part.js | 9 +- lib/protocol/request/Segment.js | 33 +++--- lib/util/Queue.js | 2 +- 4 files changed, 157 insertions(+), 62 deletions(-) diff --git a/lib/protocol/Connection.js b/lib/protocol/Connection.js index 38643d2..52f57b3 100644 --- a/lib/protocol/Connection.js +++ b/lib/protocol/Connection.js @@ -46,10 +46,10 @@ var MAX_AVAILABLE_SIZE = MAX_PACKET_SIZE - module.exports = Connection; util.inherits(Connection, EventEmitter); - +let idCounter = 0 function Connection(settings) { EventEmitter.call(this); - + this._id = idCounter++; var self = this; // public this.connectOptions = new part.ConnectOptions(); @@ -57,11 +57,11 @@ function Connection(settings) { this.protocolVersion = undefined; // private this._clientInfo = new ClientInfo(); - for(var key in settings) { - if(key.toUpperCase().startsWith("SESSIONVARIABLE:")) { + for (var key in settings) { + if (key.toUpperCase().startsWith("SESSIONVARIABLE:")) { var sv_key = key.substring(key.indexOf(":") + 1); var sv_value = settings[key]; - if(sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { + if (sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { this._clientInfo.setProperty(sv_key, sv_value); } delete settings[key]; @@ -149,7 +149,7 @@ Object.defineProperties(Connection.prototype, { case MessageType.DISCONNECT: return 'disconnecting'; default: - // do nothing + // do nothing } if (this._state.sessionId === -1) { return 'disconnected'; @@ -229,28 +229,44 @@ Connection.prototype._addListeners = function _addListeners(socket) { // register listerners on socket function ondata(chunk) { + process.stdout.write('.') + + if (!packet.data) { + self._socketWriteQueue.shift() + self._socketWriteNext() + } + packet.push(chunk); if (packet.isReady()) { + const packetCount = packet.header.packetCount; + process.stdout.write(`<< ${self._id}:${packetCount}\n`) if (self._state.sessionId !== packet.header.sessionId) { self._state.sessionId = packet.header.sessionId; self._state.packetCount = -1; } var buffer = packet.getData(); packet.clear(); - var cb = self._state.receive; + var cb = self._state.receives[packetCount]; self._state.receive = undefined; self._state.messageType = undefined; + self._state.receives[packetCount] = undefined; self.receive(buffer, cb); } } socket.on('data', ondata); function onerror(err) { - var cb = self._state && self._state.receive; - if (cb) { - self._state.receive = null; // a callback should be called only once - cb(err); - } else if (self.listeners('error').length) { + let called = false + for (const packetCount in self._state.receives) { + const cb = self._state.receives[packetCount] + if (cb) { + self._state.receives[packetCount] = undefined + called = true + cb(err) + } + } + if (called) { return } + if (self.listeners('error').length) { self.emit('error', err); } else { debug('onerror', err); @@ -291,45 +307,111 @@ Connection.prototype._clearQueue = function _clearQueue(err) { }; Connection.prototype.send = function send(message, receive) { - if (this._statementContext) { - message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions()); - } - if (this._clientInfo.shouldSend(message.type)) { - message.add(PartKind.CLIENT_INFO, this._clientInfo.getUpdatedProperties()); + this._sendBuffer = [{ message, receive }] + return this._send(receive) + + // Not all request kinds can be included inside a batch request + const trigger = !this._sendBuffer + const buffer = this._sendBuffer ??= [] + + buffer.push({ message, receive }) + + if (trigger) { + setImmediate(() => { + this._send((err, reply) => { + const { receive } = buffer.shift() + receive(err, reply) + }) + }) } +} + +Connection.prototype._send = function send(receive) { + const sendBuffer = this._sendBuffer + this._sendBuffer = undefined + + const rawBuffer = [] + + let bufferLength = PACKET_HEADER_LENGTH + for (let i = 0; i < sendBuffer.length; i++) { + const { message } = sendBuffer[i] - debug('send', message); - trace('REQUEST', message); + if (this._statementContext) { + message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions()); + this._statementContext = undefined + } + if (this._clientInfo.shouldSend(message.type)) { + message.add(PartKind.CLIENT_INFO, this._clientInfo.getUpdatedProperties()); + } + + debug('send', message); + trace('REQUEST', message); + + const buffers = message.toBuffer(); + bufferLength += buffers.byteLength + rawBuffer[i] = buffers + } - var size = MAX_PACKET_SIZE - PACKET_HEADER_LENGTH; - var buffer = message.toBuffer(size); - var packet = new Buffer(PACKET_HEADER_LENGTH + buffer.length); - buffer.copy(packet, PACKET_HEADER_LENGTH); + var packet = Buffer.allocUnsafe(PACKET_HEADER_LENGTH); var state = this._state; - state.messageType = message.type; + + // state.messageType = message.type; state.receive = receive; + // Increase packet count state.packetCount++; + const packetCount = state.packetCount; + + state.receives[packetCount] = receive; + + process.stdout.write(`++ ${this._id}:${packetCount}\n`) + // Session identifier bignum.writeUInt64LE(packet, state.sessionId, 0); // Packet sequence number in this session // Packets with the same sequence number belong to one request / reply pair - packet.writeUInt32LE(state.packetCount, 8); + packet.writeUInt32LE(packetCount, 8); // Used space in this packet - packet.writeUInt32LE(buffer.length, 12); + packet.writeUInt32LE(bufferLength - PACKET_HEADER_LENGTH, 12); // Total space in this packet - packet.writeUInt32LE(size, 16); + packet.writeUInt32LE(bufferLength, 16); // Number of segments in this packet - packet.writeUInt16LE(1, 20); + packet.writeUInt16LE(sendBuffer.length, 20); // Filler packet.fill(0x00, 22, PACKET_HEADER_LENGTH); - // Write request packet to socket - if (this._socket) { - this._socket.write(packet); + + const socketBuffer = [packet] + + let remaining = bufferLength - PACKET_HEADER_LENGTH + for (let i = 0; i < sendBuffer.length; i++) { + const { message } = sendBuffer[i] + const curRawBuffer = rawBuffer[i] + message.updateBuffer(curRawBuffer, remaining) + remaining -= curRawBuffer.byteLength + for (let i = 0; i < curRawBuffer.length; i++) { + socketBuffer.push(curRawBuffer[i]); + } + } + + socketBuffer._id = packetCount + this._socketWriteQueue ??= [] + this._socketWriteQueue.push(socketBuffer) + if (this._socketWriteQueue.length === 1) { + this._socketWriteNext() } }; +Connection.prototype._socketWriteNext = function _socketWriteNext() { + if (this._socketWriteQueue.length === 0) return + const drain = this._socketWriteQueue[0] + process.stdout.write(`>> ${this._id}:${drain._id}\n`) + this._socket.write(Buffer.concat(drain), () => {/* + this._socketWriteQueue.shift() + this._socketWriteNext()*/ + }) +} + Connection.prototype.getClientInfo = function getClientInfo() { return this._clientInfo; @@ -390,7 +472,7 @@ Connection.prototype.receive = function receive(buffer, cb) { if (error && error.fatal) { this.destroy(error); } - if(cb) { + if (cb) { cb(error, reply); } }; @@ -428,26 +510,26 @@ Connection.prototype._createAuthenticationManager = auth.createManager; Connection.prototype.connect = function connect(options, cb) { var self = this; var manager; - for(var key in options) { - if(key.toUpperCase().startsWith("SESSIONVARIABLE:")) { + for (var key in options) { + if (key.toUpperCase().startsWith("SESSIONVARIABLE:")) { var sv_key = key.substring(key.indexOf(":") + 1); var sv_value = options[key]; - if(sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { + if (sv_key && sv_key.length > 0 && sv_value && sv_value.length > 0) { this._clientInfo.setProperty(sv_key, sv_value); } delete options[key]; } } this.connectOptions.setOptions([{ - name : common.ConnectOption.OS_USER, - value : this._clientInfo.getUser() + name: common.ConnectOption.OS_USER, + value: this._clientInfo.getUser() }]); this.clientContextOptions.setOptions([{ - name : common.ClientContextOption.CLIENT_APPLICATION_PROGRAM, - value : this._clientInfo.getApplication() + name: common.ClientContextOption.CLIENT_APPLICATION_PROGRAM, + value: this._clientInfo.getApplication() }]); - if(options["disableCloudRedirect"] == true) { - this._redirectType = common.RedirectType.REDIRECTION_DISABLED; + if (options["disableCloudRedirect"] == true) { + this._redirectType = common.RedirectType.REDIRECTION_DISABLED; } try { manager = this._createAuthenticationManager(options); @@ -477,7 +559,7 @@ Connection.prototype.connect = function connect(options, cb) { if (err) { return cb(err, reply); } - manager.initialize(reply.authentication, function(err) { + manager.initialize(reply.authentication, function (err) { if (err) return cb(err); var redirectOptions = [] if (typeof self._initialHost === 'undefined') { @@ -536,7 +618,7 @@ Connection.prototype.connect = function connect(options, cb) { useCesu8: self.useCesu8 } - if(this._redirectType == common.RedirectType.REDIRECTION_NONE) { + if (this._redirectType == common.RedirectType.REDIRECTION_NONE) { authOptions.dbConnectInfo = true; } @@ -634,14 +716,14 @@ Connection.prototype.rollback = function rollback(options, cb) { this.enqueue(request.rollback(options), cb); }; - // The function doesn't use the queue. It's used before the queue starts running +// The function doesn't use the queue. It's used before the queue starts running Connection.prototype.fetchDbConnectInfo = function (options, cb) { if (this.readyState == 'closed') { var err = new Error('Connection unexpectedly closed'); err.code = 'EHDBCLOSE'; return cb(err) } - this.send(request.dbConnectInfo(options), function(err, reply) { + this.send(request.dbConnectInfo(options), function (err, reply) { if (err) { return cb(err); } @@ -711,6 +793,7 @@ function ConnectionState() { this.packetCount = -1; this.messageType = undefined; this.receive = undefined; + this.receives = {}; } function Version(major, minor) { @@ -738,6 +821,6 @@ InitializationReply.read = function readInitializationReply(buffer) { return new InitializationReply(productVersion, protocolVersion); }; -var initializationRequestBuffer = new Buffer([ +var initializationRequestBuffer = Buffer.from([ 0xff, 0xff, 0xff, 0xff, 4, 20, 0, 4, 1, 0, 0, 1, 1, 1 ]); diff --git a/lib/protocol/request/Part.js b/lib/protocol/request/Part.js index 92756d8..e29df27 100644 --- a/lib/protocol/request/Part.js +++ b/lib/protocol/request/Part.js @@ -28,10 +28,13 @@ function Part(options) { this.buffer = undefined; } +Part.prototype.getLength = function () { + return PART_HEADER_LENGTH + util.alignLength(this.buffer.length, 8) +} -Part.prototype.toBuffer = function toBuffer(size) { +Part.prototype.toBuffer = function toBuffer(remaining) { var byteLength = util.alignLength(this.buffer.length, 8); - var buffer = new Buffer(PART_HEADER_LENGTH + byteLength); + var buffer = Buffer.allocUnsafe(PART_HEADER_LENGTH + byteLength); // Part kind, specifies nature of part data buffer.writeInt8(this.kind, 0); // Further attributes of part @@ -43,7 +46,7 @@ Part.prototype.toBuffer = function toBuffer(size) { // Length of part buffer in bytes buffer.writeInt32LE(this.buffer.length, 8); // Length in packet remaining without this part. - buffer.writeInt32LE(size, 12); + buffer.writeInt32LE(remaining, 12); // Has to be updated once the packet is constructed this.buffer.copy(buffer, PART_HEADER_LENGTH); if (this.buffer.length < byteLength) { buffer.fill(0x00, PART_HEADER_LENGTH + this.buffer.length); diff --git a/lib/protocol/request/Segment.js b/lib/protocol/request/Segment.js index d5162ac..6941453 100644 --- a/lib/protocol/request/Segment.js +++ b/lib/protocol/request/Segment.js @@ -72,19 +72,16 @@ Segment.prototype.add = function add(kind, args) { } }; -Segment.prototype.toBuffer = function toBuffer(size) { - size = size || MAX_SEGMENT_SIZE; - var remainingSize = size - SEGMENT_HEADER_LENGTH; +Segment.prototype.toBuffer = function toBuffer() { var length = SEGMENT_HEADER_LENGTH; var buffers = []; for (var i = 0; i < this.parts.length; i++) { - var buffer = partToBuffer(this.parts[i], remainingSize, this.useCesu8); - remainingSize -= buffer.length; - length += buffer.length; - buffers.push(buffer); + var part = partToBuffer(this.parts[i], this.useCesu8); + length += part.getLength(); + buffers.push(part); } - var header = new Buffer(SEGMENT_HEADER_LENGTH); + var header = Buffer.allocUnsafe(SEGMENT_HEADER_LENGTH); // Length of the segment, including the header header.writeInt32LE(length, 0); // Offset of the segment within the message buffer @@ -105,16 +102,28 @@ Segment.prototype.toBuffer = function toBuffer(size) { header.fill(0x00, 16, SEGMENT_HEADER_LENGTH); buffers.unshift(header); - return Buffer.concat(buffers, length); + buffers.byteLength = length; + // return Buffer.concat(buffers, length); + return buffers }; -function partToBuffer(pd, remainingSize, useCesu8) { +Segment.prototype.updateBuffer = function updateBuffer(buffers, remaining) { + remaining -= buffers[0].length + for (var i = 1; i < buffers.length; i++) { + const part = buffers[i] + remaining -= part.getLength() + buffers[i] = part.toBuffer(remaining) + } + return buffers +} + +function partToBuffer(pd, useCesu8) { var m = pd.module || data[pd.kind]; var part = new Part({ kind: pd.kind, useCesu8: useCesu8 }); part.argumentCount = m.getArgumentCount(pd.args); - m.write(part, pd.args, remainingSize); - return part.toBuffer(remainingSize); + m.write(part, pd.args); + return part; } diff --git a/lib/util/Queue.js b/lib/util/Queue.js index 85cb7dc..fbd857f 100644 --- a/lib/util/Queue.js +++ b/lib/util/Queue.js @@ -90,7 +90,7 @@ Queue.prototype.dequeue = function dequeue() { } function run() { - if (self.running && !self.busy) { + if (self.running/* && !self.busy*/) { // Queue is running and not busy self.busy = true; var task = self.queue.shift(); From f1552ed7f2027dbb6bcccd0abc838edcfebb4eeb Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 5 Apr 2024 19:24:58 +0200 Subject: [PATCH 2/4] Remove Queue completely --- lib/protocol/Connection.js | 139 +++++++++--------------------------- lib/protocol/ExecuteTask.js | 3 +- lib/util/Queue.js | 124 -------------------------------- lib/util/index.js | 1 - test/util.Queue.js | 94 ------------------------ 5 files changed, 35 insertions(+), 326 deletions(-) delete mode 100644 lib/util/Queue.js delete mode 100644 test/util.Queue.js diff --git a/lib/protocol/Connection.js b/lib/protocol/Connection.js index 52f57b3..9e69616 100644 --- a/lib/protocol/Connection.js +++ b/lib/protocol/Connection.js @@ -69,7 +69,7 @@ function Connection(settings) { } this._settings = settings || {}; this._socket = undefined; - this._queue = new util.Queue().pause(); + this._socketWriteQueue = []; this._state = new ConnectionState(); this._statementContext = undefined; this._transaction = new Transaction(); @@ -229,17 +229,14 @@ Connection.prototype._addListeners = function _addListeners(socket) { // register listerners on socket function ondata(chunk) { - process.stdout.write('.') - if (!packet.data) { self._socketWriteQueue.shift() - self._socketWriteNext() + self._socketWriteNext() } packet.push(chunk); if (packet.isReady()) { const packetCount = packet.header.packetCount; - process.stdout.write(`<< ${self._id}:${packetCount}\n`) if (self._state.sessionId !== packet.header.sessionId) { self._state.sessionId = packet.header.sessionId; self._state.packetCount = -1; @@ -307,52 +304,21 @@ Connection.prototype._clearQueue = function _clearQueue(err) { }; Connection.prototype.send = function send(message, receive) { - this._sendBuffer = [{ message, receive }] - return this._send(receive) - - // Not all request kinds can be included inside a batch request - const trigger = !this._sendBuffer - const buffer = this._sendBuffer ??= [] - - buffer.push({ message, receive }) - - if (trigger) { - setImmediate(() => { - this._send((err, reply) => { - const { receive } = buffer.shift() - receive(err, reply) - }) - }) + if (this._statementContext) { + message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions()); + this._statementContext = undefined + } + if (this._clientInfo.shouldSend(message.type)) { + message.add(PartKind.CLIENT_INFO, this._clientInfo.getUpdatedProperties()); } -} - -Connection.prototype._send = function send(receive) { - const sendBuffer = this._sendBuffer - this._sendBuffer = undefined - - const rawBuffer = [] - - let bufferLength = PACKET_HEADER_LENGTH - for (let i = 0; i < sendBuffer.length; i++) { - const { message } = sendBuffer[i] - - if (this._statementContext) { - message.unshift(PartKind.STATEMENT_CONTEXT, this._statementContext.getOptions()); - this._statementContext = undefined - } - if (this._clientInfo.shouldSend(message.type)) { - message.add(PartKind.CLIENT_INFO, this._clientInfo.getUpdatedProperties()); - } - debug('send', message); - trace('REQUEST', message); + debug('send', message); + trace('REQUEST', message); - const buffers = message.toBuffer(); - bufferLength += buffers.byteLength - rawBuffer[i] = buffers - } + const buffers = message.toBuffer(); + var bufferLength = PACKET_HEADER_LENGTH + buffers.byteLength - var packet = Buffer.allocUnsafe(PACKET_HEADER_LENGTH); + var packet = Buffer.allocUnsafe(bufferLength); var state = this._state; @@ -365,8 +331,6 @@ Connection.prototype._send = function send(receive) { state.receives[packetCount] = receive; - process.stdout.write(`++ ${this._id}:${packetCount}\n`) - // Session identifier bignum.writeUInt64LE(packet, state.sessionId, 0); // Packet sequence number in this session @@ -377,26 +341,21 @@ Connection.prototype._send = function send(receive) { // Total space in this packet packet.writeUInt32LE(bufferLength, 16); // Number of segments in this packet - packet.writeUInt16LE(sendBuffer.length, 20); + packet.writeUInt16LE(1, 20); // Filler packet.fill(0x00, 22, PACKET_HEADER_LENGTH); - const socketBuffer = [packet] - + let offset = PACKET_HEADER_LENGTH let remaining = bufferLength - PACKET_HEADER_LENGTH - for (let i = 0; i < sendBuffer.length; i++) { - const { message } = sendBuffer[i] - const curRawBuffer = rawBuffer[i] - message.updateBuffer(curRawBuffer, remaining) - remaining -= curRawBuffer.byteLength - for (let i = 0; i < curRawBuffer.length; i++) { - socketBuffer.push(curRawBuffer[i]); - } + + message.updateBuffer(buffers, remaining) + for (let i = 0; i < buffers.length; i++) { + const buffer = buffers[i] + buffer.copy(packet, offset) + offset += buffer.length } - socketBuffer._id = packetCount - this._socketWriteQueue ??= [] - this._socketWriteQueue.push(socketBuffer) + this._socketWriteQueue.push(packet) if (this._socketWriteQueue.length === 1) { this._socketWriteNext() } @@ -405,14 +364,9 @@ Connection.prototype._send = function send(receive) { Connection.prototype._socketWriteNext = function _socketWriteNext() { if (this._socketWriteQueue.length === 0) return const drain = this._socketWriteQueue[0] - process.stdout.write(`>> ${this._id}:${drain._id}\n`) - this._socket.write(Buffer.concat(drain), () => {/* - this._socketWriteQueue.shift() - this._socketWriteNext()*/ - }) + this._socket.write(drain) } - Connection.prototype.getClientInfo = function getClientInfo() { return this._clientInfo; }; @@ -478,9 +432,7 @@ Connection.prototype.receive = function receive(buffer, cb) { }; Connection.prototype.enqueue = function enqueue(task, cb) { - var queueable; - - if (!this._socket || !this._queue || this.readyState === 'closed') { + if (!this._socket || this.readyState === 'closed') { var err = new Error('Connection closed'); err.code = 'EHDBCLOSE'; if (cb) { @@ -490,18 +442,15 @@ Connection.prototype.enqueue = function enqueue(task, cb) { } } if (util.isFunction(task)) { - queueable = this._queue.createTask(task, cb); - queueable.name = task.name; - } else if (util.isObject(task)) { + return task(cb) + } + if (util.isObject(task)) { if (task instanceof request.Segment) { - queueable = this._queue.createTask(this.send.bind(this, task), cb); - queueable.name = MessageTypeName[task.type]; - } else if (util.isFunction(task.run)) { - queueable = task; + return this.send(task, cb) + } + if (util.isFunction(task.run)) { + return task.run() } - } - if (queueable) { - this._queue.push(queueable); } }; @@ -551,7 +500,6 @@ Connection.prototype.connect = function connect(options, cb) { if (manager.sessionCookie) { self._settings.sessionCookie = manager.sessionCookie; } - self._queue.resume(); cb(null, reply); } @@ -633,14 +581,7 @@ Connection.prototype.disconnect = function disconnect(cb) { cb(err, reply); } - function enqueueDisconnect() { - self.enqueue(request.disconnect(), done); - } - - if (this.isIdle()) { - return enqueueDisconnect(); - } - this._queue.once('drain', enqueueDisconnect); + return this.send(request.disconnect(), done) }; Connection.prototype.executeDirect = function executeDirect(options, cb) { @@ -741,19 +682,11 @@ Connection.prototype._closeSilently = function _closeSilently() { }; Connection.prototype.close = function close() { - var self = this; - - function closeConnection() { - debug('close'); - self.destroy(); - } if (this.readyState === 'closed') { return; } - if (this.isIdle()) { - return closeConnection(); - } - this._queue.once('drain', closeConnection); + debug('close'); + this.destroy(); }; Connection.prototype.destroy = function destroy(err) { @@ -762,10 +695,6 @@ Connection.prototype.destroy = function destroy(err) { } }; -Connection.prototype.isIdle = function isIdle() { - return this._queue.empty && !this._queue.busy; -}; - Connection.prototype.setAutoCommit = function setAutoCommit(autoCommit) { this._transaction.autoCommit = autoCommit; }; diff --git a/lib/protocol/ExecuteTask.js b/lib/protocol/ExecuteTask.js index 7eaff8e..fca4121 100644 --- a/lib/protocol/ExecuteTask.js +++ b/lib/protocol/ExecuteTask.js @@ -45,12 +45,11 @@ ExecuteTask.create = function createExecuteTask(connection, options, cb) { return new ExecuteTask(connection, options, cb); }; -ExecuteTask.prototype.run = function run(next) { +ExecuteTask.prototype.run = function run() { var self = this; function done(err) { self.end(err); - next(); } function finalize(err) { diff --git a/lib/util/Queue.js b/lib/util/Queue.js deleted file mode 100644 index fbd857f..0000000 --- a/lib/util/Queue.js +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2013 SAP AG. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http: //www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, -// either express or implied. See the License for the specific -// language governing permissions and limitations under the License. -'use strict'; - -var util = require('util'); -var EventEmitter = require('events').EventEmitter; - -module.exports = Queue; - -util.inherits(Queue, EventEmitter); - -function Queue(immediate) { - EventEmitter.call(this); - - this.queue = []; - this.busy = false; - this.running = !!immediate; -} - -Object.defineProperty(Queue.prototype, 'empty', { - get: function isEmpty() { - return this.queue.length === 0; - } -}); - -Queue.prototype.unshift = function unshift(task) { - this.queue.unshift(task); - if (this.running) { - this.dequeue(); - } - return this; -}; - -Queue.prototype.push = function push(task) { - this.queue.push(task); - if (this.running) { - this.dequeue(); - } - return this; -}; - -Queue.prototype.resume = function resume() { - this.running = true; - if (this.queue.length) { - this.dequeue(); - } - return this; -}; - -Queue.prototype.pause = function pause() { - this.running = false; - return this; -}; - -Queue.prototype.abort = function abort(err) { - this.queue.forEach(t => t.receive(err)) - this.queue = []; - this.busy = false; - this.running = false; - this.removeAllListeners(); - return this; -}; - -Queue.prototype.createTask = function createTask(send, receive, name) { - return new Task(send, receive, name); -}; - -Queue.prototype.dequeue = function dequeue() { - var self = this; - - function next(err, name) { - /* jshint unused:false */ - self.busy = false; - if (self.queue.length) { - run(); - } else { - self.emit('drain'); - } - } - - function run() { - if (self.running/* && !self.busy*/) { - // Queue is running and not busy - self.busy = true; - var task = self.queue.shift(); - task.run(next); - } - } - run(); -}; - -function Task(send, receive, name) { - this.send = send; - this.receive = receive; - this.name = name; -} - -Task.prototype.run = function run(next) { - var self = this; - - function receive() { - /* jshint validthis:true */ - self.receive.apply(null, arguments); - next(null, self.name); - } - try { - this.send(receive); - } catch (err) { - process.nextTick(function () { - receive(err); - }); - } -}; diff --git a/lib/util/index.js b/lib/util/index.js index b534596..fedb1fe 100644 --- a/lib/util/index.js +++ b/lib/util/index.js @@ -102,7 +102,6 @@ function exportNativeUtil(fn) { exports.bignum = require('./bignum'); exports.calendar = require('./calendar'); exports.convert = require('./convert'); -exports.Queue = require('./Queue'); extend(exports, require('./zeropad')); function extend(obj) { diff --git a/test/util.Queue.js b/test/util.Queue.js deleted file mode 100644 index f8681df..0000000 --- a/test/util.Queue.js +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2013 SAP AG. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http: //www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, -// either express or implied. See the License for the specific -// language governing permissions and limitations under the License. -'use strict'; -/* jshint expr: true */ - -var lib = require('../lib'); -var Queue = lib.util.Queue; - -function createTask(reply, cb) { - function send(cb) { - setTimeout(function () { - cb(null, reply); - }, 1); - } - return Queue.prototype.createTask(send, cb, 'standard'); -} - -function createErrorTask(message, cb) { - function send(cb) { - setTimeout(function () { - cb(new Error(message)); - }, 1); - } - return Queue.prototype.createTask(send, cb, 'error'); -} - -function createThrowTask(message, cb) { - function send() { - throw new Error(message); - } - return Queue.prototype.createTask(send, cb, 'throw'); -} - -describe('Util', function () { - - describe('#Queue', function () { - - it('should create a standard queue', function (done) { - var replies = []; - var q = new Queue(); - q.empty.should.be.true; - q.busy.should.be.false; - q.running.should.be.false; - q.push(createTask('foo', function (err, reply) { - replies.push(reply); - })); - q.push(createErrorTask('abc', function (err) { - replies.push(err.message); - })); - q.unshift(createThrowTask('def', function (err) { - replies.push(err.message); - })); - q.push(createTask('bar', function (err, reply) { - replies.push(reply); - })); - q.on('drain', function () { - replies.should.eql(['def', 'foo', 'abc', 'bar']); - done(); - }); - q.resume(); - }); - - it('should create a running queue', function (done) { - var replies = []; - var q = new Queue(true); - q.empty.should.be.true; - q.busy.should.be.false; - q.running.should.be.true; - q.push(createTask('foo', function (err, reply) { - replies.push(reply); - })); - q.unshift(createTask('bar', function (err, reply) { - replies.push(reply); - })); - q.on('drain', function () { - replies.should.eql(['foo', 'bar']); - done(); - }); - }); - - }); - -}); \ No newline at end of file From 6a4420aa8c7bc5d0269563e5e9b60d9e54ca46b2 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 5 Apr 2024 19:28:39 +0200 Subject: [PATCH 3/4] Remove connection id counter --- lib/protocol/Connection.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/protocol/Connection.js b/lib/protocol/Connection.js index 9e69616..70453fd 100644 --- a/lib/protocol/Connection.js +++ b/lib/protocol/Connection.js @@ -46,10 +46,8 @@ var MAX_AVAILABLE_SIZE = MAX_PACKET_SIZE - module.exports = Connection; util.inherits(Connection, EventEmitter); -let idCounter = 0 function Connection(settings) { EventEmitter.call(this); - this._id = idCounter++; var self = this; // public this.connectOptions = new part.ConnectOptions(); From 56e05065bb34968dcf0121f1cf4588469fe69476 Mon Sep 17 00:00:00 2001 From: Bob den Os Date: Fri, 5 Apr 2024 19:39:20 +0200 Subject: [PATCH 4/4] Remove outdated comment --- lib/protocol/request/Part.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/protocol/request/Part.js b/lib/protocol/request/Part.js index e29df27..f7192d5 100644 --- a/lib/protocol/request/Part.js +++ b/lib/protocol/request/Part.js @@ -46,7 +46,7 @@ Part.prototype.toBuffer = function toBuffer(remaining) { // Length of part buffer in bytes buffer.writeInt32LE(this.buffer.length, 8); // Length in packet remaining without this part. - buffer.writeInt32LE(remaining, 12); // Has to be updated once the packet is constructed + buffer.writeInt32LE(remaining, 12); this.buffer.copy(buffer, PART_HEADER_LENGTH); if (this.buffer.length < byteLength) { buffer.fill(0x00, PART_HEADER_LENGTH + this.buffer.length);