Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions site/source/docs/api_reference/emscripten.h.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1280,6 +1280,38 @@ Functions
:param em_socket_callback callback: Pointer to a callback function. The callback returns a file descriptor and the arbitrary ``userData`` passed to this function.


.. c:type:: em_poll_callback

Function pointer type for the :c:func:`emscripten_poll_with_callback` callback,
defined as: ::

typedef void (*em_poll_callback)(int fd, int revents);

``revents`` is delivered by value (the same bitmask :c:func:`poll` would leave
in ``pollfd.revents``), so there is nothing to own or free.


.. c:function:: int emscripten_poll_with_callback(int fd, int events, int timeout, em_poll_callback callback)

Asynchronous, callback-based wait on a single fd's readiness. Registers interest
in ``events`` (``POLLIN``/``POLLOUT``/...) on ``fd`` and invokes
``callback(fd, revents)`` once any of them is ready - or ``timeout`` milliseconds
elapse, in which case ``revents`` is ``0``. Unlike :c:func:`poll`, this never
blocks the calling stack, so it works without ASYNCIFY/JSPI; it is the single-fd
dual of a blocking :c:func:`poll` (wait on a set with several of these).

Capability is a property of the descriptor type, checked before arming: only
types that announce readiness (sockets, pipes) can be waited on. An unsupported
fd is rejected with ``-EPERM`` even when it is currently ready, mirroring
:c:func:`epoll_ctl`.

:param fd: The file descriptor to wait on.
:param events: Requested event mask, as for :c:func:`poll` (``POLLIN``, ``POLLOUT``, ...).
:param timeout: Milliseconds to wait before giving up; ``-1`` waits indefinitely and ``0`` is a non-blocking probe.
:param callback: Invoked with ``fd`` and the ready ``revents`` (``0`` on timeout).
:returns: ``0`` once armed, ``-EBADF`` if ``fd`` is not open, or ``-EPERM`` if ``fd``'s descriptor type cannot deliver readiness callbacks.


Unaligned types
===============

Expand Down
36 changes: 7 additions & 29 deletions src/lib/libpipefs.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

addToLibrary({
$PIPEFS__postset: () => addAtInit('PIPEFS.root = FS.mount(PIPEFS, {}, null);'),
$PIPEFS__deps: ['$FS'],
$PIPEFS__deps: ['$FS', '$notifyNodeListeners'],
$PIPEFS: {
BUCKET_BUFFER_SIZE: 1024 * 8, // 8KiB Buffer
mount(mount) {
Expand All @@ -21,23 +21,6 @@ addToLibrary({
// able to read from the read end after write end is closed.
refcnt : 2,
timestamp: new Date(),
#if PTHREADS || ASYNCIFY
readableHandlers: [],
registerReadableHandler: (callback) => {
callback.registerCleanupFunc(() => {
const i = pipe.readableHandlers.indexOf(callback);
if (i !== -1) pipe.readableHandlers.splice(i, 1);
});
pipe.readableHandlers.push(callback);
},
notifyReadableHandlers: () => {
while (pipe.readableHandlers.length > 0) {
const cb = pipe.readableHandlers.shift();
if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
}
pipe.readableHandlers = [];
}
#endif
};

pipe.buckets.push({
Expand All @@ -53,6 +36,8 @@ addToLibrary({

rNode.pipe = pipe;
wNode.pipe = pipe;
// The read end's node carries the poll wait-queue; writes wake it.
pipe.readNode = rNode;

var readableStream = FS.createStream({
path: rName,
Expand Down Expand Up @@ -97,7 +82,8 @@ addToLibrary({
blocks: 0,
};
},
poll(stream, timeout, notifyCallback) {
pollable: true,
poll(stream) {
var pipe = stream.node.pipe;

if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) {
Expand All @@ -108,10 +94,6 @@ addToLibrary({
return ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
}
}

#if PTHREADS || ASYNCIFY
if (notifyCallback) pipe.registerReadableHandler(notifyCallback);
#endif
return 0;
},
dup(stream) {
Expand Down Expand Up @@ -233,9 +215,7 @@ addToLibrary({
if (freeBytesInCurrBuffer >= dataLen) {
currBucket.buffer.set(data, currBucket.offset);
currBucket.offset += dataLen;
#if PTHREADS || ASYNCIFY
pipe.notifyReadableHandlers();
#endif
notifyNodeListeners(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
return dataLen;
} else if (freeBytesInCurrBuffer > 0) {
currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset);
Expand Down Expand Up @@ -267,9 +247,7 @@ addToLibrary({
newBucket.buffer.set(data);
}

#if PTHREADS || ASYNCIFY
pipe.notifyReadableHandlers();
#endif
notifyNodeListeners(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
return dataLen;
},
close(stream) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/libsigs.js
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ sigs = {
emscripten_pc_get_function__sig: 'pp',
emscripten_pc_get_line__sig: 'ip',
emscripten_performance_now__sig: 'd',
emscripten_poll_with_callback__sig: 'iiiip',
emscripten_print_double__sig: 'idpi',
emscripten_promise_all__sig: 'pppp',
emscripten_promise_all_settled__sig: 'pppp',
Expand Down
18 changes: 17 additions & 1 deletion src/lib/libsockfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ addToLibrary({
$SOCKFS__postset: () => {
addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);');
},
$SOCKFS__deps: ['$FS',
$SOCKFS__deps: ['$FS', '$notifyNodeListeners',
#if NODERAWSOCKETS
'$nodeSockOps',
#endif
Expand All @@ -23,6 +23,16 @@ addToLibrary({
},
emit(event, param) {
SOCKFS.callbacks[event]?.(param);
// Bridge socket readiness into the inode wait-queue.
var fd = event === 'error' ? param[0] : param;
var flags = {
'message': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}},
'open': {{{ cDefs.POLLOUT }}},
'connection': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}},
'close': {{{ cDefs.POLLIN }}} | {{{ cDefs.POLLHUP }}},
'error': {{{ cDefs.POLLERR }}},
}[event];
if (flags) notifyNodeListeners(FS.getStream(fd)?.node, flags);
},
mount(mount) {
#if expectToReceiveOnModule('websocket')
Expand Down Expand Up @@ -114,6 +124,8 @@ addToLibrary({
},
// node and stream ops are backend agnostic
stream_ops: {
// Readiness is announced through SOCKFS.emit -> notifyNodeListeners.
pollable: true,
poll(stream) {
var sock = stream.node.sock;
return sock.sock_ops.poll(sock);
Expand All @@ -138,6 +150,8 @@ addToLibrary({
},
close(stream) {
var sock = stream.node.sock;
// The fd is going away: wake waiters with POLLNVAL so they don't hang.
notifyNodeListeners(stream.node, {{{ cDefs.POLLNVAL }}});
sock.sock_ops.close(sock);
}
},
Expand Down Expand Up @@ -555,6 +569,8 @@ addToLibrary({
// push to queue for accept to pick up
sock.pending.push(newsock);
SOCKFS.emit('connection', newsock.stream.fd);
// A queued client makes the listener readable (POLLIN).
notifyNodeListeners(sock.stream.node, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
} else {
// create a peer on the listen socket so calling sendto
// with the listen socket and an address will resolve
Expand Down
4 changes: 3 additions & 1 deletion src/lib/libsockfs_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ var NodeSockFSLibrary = {
});
},
},
$nodeSockOps__deps: ['$nodeSockHelpers', '$SOCKFS', '$ERRNO_CODES'],
$nodeSockOps__deps: ['$nodeSockHelpers', '$SOCKFS', '$ERRNO_CODES', '$notifyNodeListeners'],
$nodeSockOps__postset: `
if (!ENVIRONMENT_IS_NODE) {
throw new Error("NODERAWSOCKETS is currently only supported on Node.js environment.")
Expand Down Expand Up @@ -507,6 +507,8 @@ var NodeSockFSLibrary = {
try { conn.resume(); } catch (e) {} // paused by pauseOnConnect
sock.pending.push(newsock);
SOCKFS.emit('connection', newsock.stream.fd);
// A queued client makes the listener readable (POLLIN).
notifyNodeListeners(sock.stream.node, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}});
});
server.on('error', (e) => {
sock.error = nodeSockHelpers.nodeErrToErrno(e);
Expand Down
Loading