Skip to content

Commit 9560e40

Browse files
n-riescolgeiger
authored andcommitted
1 parent 5395f76 commit 9560e40

File tree

4 files changed

+80
-23
lines changed

4 files changed

+80
-23
lines changed

binding.cc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -434,12 +434,22 @@ namespace zmq {
434434
return;
435435
}
436436

437-
Local<Value> argv[3];
437+
Local<Value> argv[4];
438438
argv[0] = Nan::New<Integer>(event_id);
439439
argv[1] = Nan::New<Integer>(event_value);
440440
argv[2] = Nan::New<String>(event_endpoint).ToLocalChecked();
441+
switch (event_id) {
442+
case ZMQ_EVENT_BIND_FAILED:
443+
case ZMQ_EVENT_ACCEPT_FAILED:
444+
case ZMQ_EVENT_CLOSE_FAILED:
445+
argv[3] = ExceptionFromError();
446+
break;
447+
default:
448+
argv[3] = Nan::Undefined();
449+
break;
450+
}
441451

442-
Nan::MakeCallback(this->handle(), callback_v.As<Function>(), 3, argv);
452+
Nan::MakeCallback(this->handle(), callback_v.As<Function>(), 4, argv);
443453
}
444454

445455
void

lib/index.js

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -530,8 +530,8 @@ Socket.prototype.monitor = function(interval, numOfEvents) {
530530
if (zmq.ZMQ_CAN_MONITOR) {
531531
var self = this;
532532

533-
self._zmq.onMonitorEvent = function(event_id, event_value, event_endpoint_addr) {
534-
self.emit(events[event_id], event_value, event_endpoint_addr);
533+
self._zmq.onMonitorEvent = function(event_id, event_value, event_endpoint_addr, ex) {
534+
self.emit(events[event_id], event_value, event_endpoint_addr, ex);
535535
}
536536

537537
self._zmq.onMonitorError = function(error) {
@@ -623,20 +623,26 @@ Socket.prototype.send = function(msg, flags, cb) {
623623
return this;
624624
};
625625

626-
627-
Socket.prototype._flushRead = function () {
628-
var message = this._zmq.readv(); // can throw
629-
if (!message) {
630-
return false;
631-
}
632-
633-
// Handle received message immediately to prevent memory leak in driver
626+
Socket.prototype._emitMessage = function (message) {
634627
if (message.length === 1) {
635628
// hot path
636629
this.emit('message', message[0]);
637630
} else {
638631
this.emit.apply(this, ['message'].concat(message));
639632
}
633+
}
634+
635+
Socket.prototype._flushRead = function () {
636+
try {
637+
var message = this._zmq.readv(); // can throw
638+
if (!message) {
639+
return false;
640+
}
641+
// Handle received message immediately to prevent memory leak in driver
642+
this._emitMessage(message)
643+
} catch (error) {
644+
this.emit('error', error); // can throw
645+
}
640646
return true;
641647
};
642648

@@ -669,17 +675,7 @@ Socket.prototype._flushReads = function() {
669675

670676
this._isFlushingReads = true;
671677

672-
var received;
673-
674-
do {
675-
try {
676-
received = this._flushRead();
677-
} catch (error) {
678-
this._isFlushingReads = false;
679-
this.emit('error', error); // can throw
680-
return;
681-
}
682-
} while (received);
678+
while (this._flushRead());
683679

684680
this._isFlushingReads = false;
685681

test/socket.monitor.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,17 @@ describe('socket.monitor', function() {
4848
msg.toString().should.equal('world');
4949
req.close();
5050
});
51+
52+
// Test that bind errors pass an Error both to the callback
53+
// and to the monitor event
54+
var doubleRep = zmq.socket('rep');
55+
doubleRep.monitor();
56+
doubleRep.on('bind_error', function (errno, bindAddr, ex) {
57+
(ex instanceof Error).should.equal(true);
58+
});
59+
doubleRep.bind('tcp://127.0.0.1:5423', function (error) {
60+
(error instanceof Error).should.equal(true);
61+
});
5162
});
5263
});
5364

test/socket.pub-sub.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,44 @@ describe('socket.pub-sub', function(){
9191
});
9292
});
9393

94+
it('should continue to deliver messages even after error in message handler is thrown', function(done){
95+
var n = 0;
96+
97+
sub.subscribe('');
98+
var errorHandlerCalled = 0;
99+
100+
sub.on('error', function (error) {
101+
errorHandlerCalled++;
102+
});
103+
104+
sub.on('message', function (msg) {
105+
msg.should.be.an.instanceof(Buffer);
106+
switch (n++) {
107+
case 0:
108+
msg.toString().should.equal('foo');
109+
throw Error('test error');
110+
break;
111+
case 1:
112+
msg.toString().should.equal('bar');
113+
sub.close();
114+
pub.close();
115+
errorHandlerCalled.should.eql(1)
116+
done();
117+
break;
118+
}
119+
});
120+
121+
var addr = "inproc://stuff_ssps";
122+
123+
sub.bind(addr, function (error) {
124+
if (error) throw error;
125+
pub.connect(addr);
126+
127+
setTimeout(function() {
128+
pub.send('foo');
129+
pub.send('bar');
130+
}, 100.0);
131+
});
132+
});
133+
94134
});

0 commit comments

Comments
 (0)