From 72056d2ed951feccdc0b472af99ae9a3385874d5 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Tue, 23 Jun 2026 19:40:12 -0700 Subject: [PATCH 1/3] Unify fd readiness onto one poll wait-queue; add emscripten_poll_with_callback Adds emscripten_poll_with_callback(fd, events, timeout, cb): a non-blocking single-fd poll that invokes cb(fd, revents) when the fd is ready or the timeout elapses. revents is passed by value. It does not suspend the caller, so it works without ASYNCIFY/JSPI. Returns -EBADF for a bad fd and -EPERM if the descriptor type can't deliver readiness callbacks (checked before arming, even when ready); closing an fd wakes its waiters with POLLNVAL. It is meant as an integration point for async runtimes and event loops that need to await I/O readiness without a blocking call or a stack switch: e.g. waiting for a socket to become readable/writable, or for an async-completion fd, and dispatching when ready. In ASYNCIFY/JSPI builds it complements blocking poll()/select(); in plain synchronous builds it is the only way to wait on an fd without spinning a poll loop. To support it, fd readiness now uses a single wait-queue on the file node, replacing three separate mechanisms (the socket event callbacks, the pipe readable handlers, and the blocking-poll notifier): - stream_ops.poll(stream) is now pure derivation; it no longer registers. - producers wake waiters via $notifyPollCallback(node, flags): SOCKFS.emit bridges socket events, PIPEFS writes wake the read end. - consumers register via $addPollCallback(node, cb): the async __syscall_poll registers one waiter per fd (re-deriving the set on wake), and emscripten_poll_with_callback registers a single-fd waiter. Sockets now feed the same seam, so blocking poll()/select() on a socket is woken by incoming data; previously sock_ops.poll() ignored the notifier. Tests: test_poll_callback (callback readiness, -EPERM/-EBADF gate, POLLNVAL close), test_poll_socket_blocking (blocking poll() woken by a delayed send; hangs before this change, passes after), and the core poll/ppoll/select/pipe blocking suites, including PROXY_TO_PTHREAD variants. --- .../docs/api_reference/emscripten.h.rst | 32 ++++ src/lib/libpipefs.js | 38 +--- src/lib/libsigs.js | 1 + src/lib/libsockfs.js | 22 ++- src/lib/libsyscall.js | 173 ++++++++++++------ system/include/emscripten/emscripten.h | 16 ++ test/codesize/test_codesize_hello_O0.json | 8 +- .../test_codesize_hello_dylink_all.json | 5 +- .../test_codesize_minimal_O0.expected.js | 2 + test/codesize/test_codesize_minimal_O0.json | 8 +- test/codesize/test_unoptimized_code_size.json | 12 +- test/sockets/test_poll_callback.c | 117 ++++++++++++ test/sockets/test_poll_socket_blocking.c | 78 ++++++++ test/test_sockets.py | 20 ++ 14 files changed, 425 insertions(+), 107 deletions(-) create mode 100644 test/sockets/test_poll_callback.c create mode 100644 test/sockets/test_poll_socket_blocking.c diff --git a/site/source/docs/api_reference/emscripten.h.rst b/site/source/docs/api_reference/emscripten.h.rst index fdb9567ffcd86..ae9c8a785be4c 100644 --- a/site/source/docs/api_reference/emscripten.h.rst +++ b/site/source/docs/api_reference/emscripten.h.rst @@ -1280,6 +1280,38 @@ Functions :param em_socket_callback callback: Pointer to a callback function. The callback returns a file descriptor and the arbitrary ``userData`` passed to this function. +.. c:type:: em_poll_callback + + Function pointer type for the :c:func:`emscripten_poll_with_callback` callback, + defined as: :: + + typedef void (*em_poll_callback)(int fd, int revents); + + ``revents`` is delivered by value (the same bitmask :c:func:`poll` would leave + in ``pollfd.revents``), so there is nothing to own or free. + + +.. c:function:: int emscripten_poll_with_callback(int fd, int events, int timeout, em_poll_callback callback) + + Asynchronous, callback-based wait on a single fd's readiness. Registers interest + in ``events`` (``POLLIN``/``POLLOUT``/...) on ``fd`` and invokes + ``callback(fd, revents)`` once any of them is ready - or ``timeout`` milliseconds + elapse, in which case ``revents`` is ``0``. Unlike :c:func:`poll`, this never + blocks the calling stack, so it works without ASYNCIFY/JSPI; it is the single-fd + dual of a blocking :c:func:`poll` (wait on a set with several of these). + + Capability is a property of the descriptor type, checked before arming: only + types that announce readiness (sockets, pipes) can be waited on. An unsupported + fd is rejected with ``-EPERM`` even when it is currently ready, mirroring + :c:func:`epoll_ctl`. + + :param fd: The file descriptor to wait on. + :param events: Requested event mask, as for :c:func:`poll` (``POLLIN``, ``POLLOUT``, ...). + :param timeout: Milliseconds to wait before giving up; ``-1`` waits indefinitely and ``0`` is a non-blocking probe. + :param callback: Invoked with ``fd`` and the ready ``revents`` (``0`` on timeout). + :returns: ``0`` once armed, ``-EBADF`` if ``fd`` is not open, or ``-EPERM`` if ``fd``'s descriptor type cannot deliver readiness callbacks. + + Unaligned types =============== diff --git a/src/lib/libpipefs.js b/src/lib/libpipefs.js index c99ffb7bc29b6..7d3694db2d143 100644 --- a/src/lib/libpipefs.js +++ b/src/lib/libpipefs.js @@ -6,7 +6,7 @@ addToLibrary({ $PIPEFS__postset: () => addAtInit('PIPEFS.root = FS.mount(PIPEFS, {}, null);'), - $PIPEFS__deps: ['$FS'], + $PIPEFS__deps: ['$FS', '$notifyPollCallback'], $PIPEFS: { BUCKET_BUFFER_SIZE: 1024 * 8, // 8KiB Buffer mount(mount) { @@ -21,23 +21,6 @@ addToLibrary({ // able to read from the read end after write end is closed. refcnt : 2, timestamp: new Date(), -#if PTHREADS || ASYNCIFY - readableHandlers: [], - registerReadableHandler: (callback) => { - callback.registerCleanupFunc(() => { - const i = pipe.readableHandlers.indexOf(callback); - if (i !== -1) pipe.readableHandlers.splice(i, 1); - }); - pipe.readableHandlers.push(callback); - }, - notifyReadableHandlers: () => { - while (pipe.readableHandlers.length > 0) { - const cb = pipe.readableHandlers.shift(); - if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); - } - pipe.readableHandlers = []; - } -#endif }; pipe.buckets.push({ @@ -53,6 +36,8 @@ addToLibrary({ rNode.pipe = pipe; wNode.pipe = pipe; + // The read end's node carries the poll wait-queue; writes wake it. + pipe.readNode = rNode; var readableStream = FS.createStream({ path: rName, @@ -97,7 +82,10 @@ addToLibrary({ blocks: 0, }; }, - poll(stream, timeout, notifyCallback) { + // Pure readiness derivation; registration/notification go through the + // shared node wait-queue (notifyPollCallback on write/close). + pollAsync: true, + poll(stream) { var pipe = stream.node.pipe; if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) { @@ -108,10 +96,6 @@ addToLibrary({ return ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); } } - -#if PTHREADS || ASYNCIFY - if (notifyCallback) pipe.registerReadableHandler(notifyCallback); -#endif return 0; }, dup(stream) { @@ -233,9 +217,7 @@ addToLibrary({ if (freeBytesInCurrBuffer >= dataLen) { currBucket.buffer.set(data, currBucket.offset); currBucket.offset += dataLen; -#if PTHREADS || ASYNCIFY - pipe.notifyReadableHandlers(); -#endif + notifyPollCallback(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); return dataLen; } else if (freeBytesInCurrBuffer > 0) { currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset); @@ -267,9 +249,7 @@ addToLibrary({ newBucket.buffer.set(data); } -#if PTHREADS || ASYNCIFY - pipe.notifyReadableHandlers(); -#endif + notifyPollCallback(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); return dataLen; }, close(stream) { diff --git a/src/lib/libsigs.js b/src/lib/libsigs.js index 746ee98ff5cef..14e1ce1b49651 100644 --- a/src/lib/libsigs.js +++ b/src/lib/libsigs.js @@ -722,6 +722,7 @@ sigs = { emscripten_pc_get_function__sig: 'pp', emscripten_pc_get_line__sig: 'ip', emscripten_performance_now__sig: 'd', + emscripten_poll_with_callback__sig: 'iiiip', emscripten_print_double__sig: 'idpi', emscripten_promise_all__sig: 'pppp', emscripten_promise_all_settled__sig: 'pppp', diff --git a/src/lib/libsockfs.js b/src/lib/libsockfs.js index 66bcdcb162a42..b25186ab49c66 100644 --- a/src/lib/libsockfs.js +++ b/src/lib/libsockfs.js @@ -8,7 +8,7 @@ addToLibrary({ $SOCKFS__postset: () => { addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);'); }, - $SOCKFS__deps: ['$FS', + $SOCKFS__deps: ['$FS', '$notifyPollCallback', #if NODERAWSOCKETS '$nodeSockOps', #endif @@ -23,6 +23,18 @@ addToLibrary({ }, emit(event, param) { SOCKFS.callbacks[event]?.(param); + // Bridge socket readiness into the generic poll-callback wait-queue, so + // emscripten_poll_with_callback observes every socket event the same way + // the legacy global callbacks do. + var fd = event === 'error' ? param[0] : param; + var flags = { + 'message': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}, + 'open': {{{ cDefs.POLLOUT }}}, + 'connection': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}, + 'close': {{{ cDefs.POLLIN }}} | {{{ cDefs.POLLHUP }}}, + 'error': {{{ cDefs.POLLERR }}}, + }[event]; + if (flags) notifyPollCallback(FS.getStream(fd)?.node, flags); }, mount(mount) { #if expectToReceiveOnModule('websocket') @@ -114,6 +126,11 @@ addToLibrary({ }, // node and stream ops are backend agnostic stream_ops: { + // Sockets announce readiness through SOCKFS.emit -> notifyPollCallback, so + // they can be awaited with emscripten_poll_with_callback. Listening sockets + // are excluded for now: 'connection' is emitted on the accepted fd, not the + // listener, so accept readiness isn't yet delivered through the seam. + pollAsync: (stream) => !stream.node.sock.server, poll(stream) { var sock = stream.node.sock; return sock.sock_ops.poll(sock); @@ -138,6 +155,9 @@ addToLibrary({ }, close(stream) { var sock = stream.node.sock; + // Wake any pending poll-callback waiters: the fd is going away (POLLNVAL), + // so they complete and release their keepalive rather than hang. + notifyPollCallback(stream.node, {{{ cDefs.POLLNVAL }}}); sock.sock_ops.close(sock); } }, diff --git a/src/lib/libsyscall.js b/src/lib/libsyscall.js index d38357a67eb5e..e3ca26e02e809 100644 --- a/src/lib/libsyscall.js +++ b/src/lib/libsyscall.js @@ -602,7 +602,7 @@ var SyscallsLibrary = { } #endif - var count = doPoll(fds, nfds, 0, undefined); + var count = doPoll(fds, nfds); #if ASSERTIONS if (!count && timeout != 0) warnOnce('non-zero poll() timeout not supported: ' + timeout) #endif @@ -610,79 +610,59 @@ var SyscallsLibrary = { }, #if PTHREADS || ASYNCIFY $doPollAsync__internal: true, - $doPollAsync__deps: ['$FS', '$doPoll'], + $doPollAsync__deps: ['$FS', '$doPoll', '$addPollCallback'], $doPollAsync: (fds, nfds, timeout) => { #if RUNTIME_DEBUG dbg('async poll start'); #endif - - // Enable event handlers only when the poll call is proxied from a worker. - // TODO: Could use `Promise.withResolvers` here if we know its available. var resolve; var promise = new Promise((resolve_) => { resolve = resolve_; }); - var cleanupFuncs = []; + var removers = []; + var timer; var notifyDone = false; - - function asyncPollComplete(count) { - if (notifyDone) { - return; - } + function complete(count) { + if (notifyDone) return; notifyDone = true; #if RUNTIME_DEBUG dbg('asyncPollComplete', count); #endif - cleanupFuncs.forEach(cb => cb()); + removers.forEach((r) => r()); + if (timer) clearTimeout(timer); resolve(count); } - function makeNotifyCallback(stream, pollfd) { - var cb = (flags) => { - if (notifyDone) { - return; - } -#if RUNTIME_DEBUG - dbg(`async poll notify: stream=${stream}`); -#endif - var events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}}; - flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}}; -#if ASSERTIONS - assert(flags) -#endif - {{{ makeSetValue('pollfd', C_STRUCTS.pollfd.revents, 'flags', 'i16') }}}; - asyncPollComplete(1); - } - cb.registerCleanupFunc = (f) => { - if (f) cleanupFuncs.push(f); - } - return cb; - } - if (timeout > 0) { - var t = setTimeout(() => { -#if RUNTIME_DEBUG - dbg('poll: timeout', timeout); -#endif - asyncPollComplete(0); - }, timeout); - cleanupFuncs.push(() => clearTimeout(t)); - } - // A zero timeout never registers notifications: the derivation alone - // answers, matching the non-blocking probe. - var count = doPoll(fds, nfds, timeout, makeNotifyCallback); + var count = doPoll(fds, nfds); if (count || !timeout) { - asyncPollComplete(count); + // Ready now, or a zero-timeout probe: the derivation alone answers. + complete(count); + } else { + // Suspend: register one waiter per fd on its node wait-queue. Any wake + // re-derives the whole set (the wake flags are only the trigger) and + // resolves with the full ready count, as poll() reports. Registration + // follows the derivation with no event-loop turn between, so on the + // (single) main thread there is no lost-wakeup window. + var recheck = () => { + if (notifyDone) return; + var c = doPoll(fds, nfds); + if (c) complete(c); + }; + for (var i = 0; i < nfds; i++) { + var pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i; + var stream = FS.getStream({{{ makeGetValue('pollfd', C_STRUCTS.pollfd.fd, 'i32') }}}); + if (stream) removers.push(addPollCallback(stream.node, recheck)); + } + if (timeout > 0) timer = setTimeout(() => complete(0), timeout); } return promise; }, #endif - // The shared readiness derivation: one pass over the pollfds, writing - // revents and returning the ready count. With a nonzero `timeout`, a - // readiness notification is also registered on each stream by the same - // `stream_ops.poll` call that derives it, so there is no window between - // registration and derivation; a zero `timeout` means the caller will not - // wait, so no notification is registered (the plain probe). + // The shared readiness derivation: one pure pass over the pollfds, writing + // revents and returning the ready count. Notification is not registered here - + // that is the consumer's job, on the node wait-queue (see doPollAsync and + // emscripten_poll_with_callback). $doPoll__internal: true, $doPoll__deps: ['$FS'], - $doPoll: (fds, nfds, timeout, makeNotifyCallback) => { + $doPoll: (fds, nfds) => { var count = 0; for (var i = 0; i < nfds; i++) { var pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i; @@ -691,13 +671,7 @@ var SyscallsLibrary = { var flags = {{{ cDefs.POLLNVAL }}}; var stream = FS.getStream(fd); if (stream) { - if (stream.stream_ops.poll) { - flags = timeout - ? stream.stream_ops.poll(stream, timeout, makeNotifyCallback(stream, pollfd)) - : stream.stream_ops.poll(stream, -1); - } else { - flags = {{{ cDefs.POLLIN | cDefs.POLLOUT }}}; - } + flags = stream.stream_ops.poll ? stream.stream_ops.poll(stream) : ({{{ cDefs.POLLIN | cDefs.POLLOUT }}}); } flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}}; if (flags) count++; @@ -713,7 +687,84 @@ var SyscallsLibrary = { __syscall_poll_nonblocking__proxy: 'sync', __syscall_poll_nonblocking__deps: ['$doPoll'], __syscall_poll_nonblocking: (fds, nfds) => { - return doPoll(fds, nfds, 0, undefined); + return doPoll(fds, nfds); + }, + + // The descriptor-layer readiness wait-queue. It lives on the *node* (so dup'd + // fds sharing a node share one queue) and is the single seam consumed by both + // emscripten_poll_with_callback (single-fd) and the async __syscall_poll + // (multi-fd), and fed by every producer (SOCKFS.emit, pipe writes, ...). + // `stream_ops.poll(stream)` is pure derivation; registration and notification + // are entirely here. + $notifyPollCallback: (node, flags) => { + // Copy first: a woken waiter removes itself as it completes. + node?.pollCallbacks?.slice().forEach((cb) => cb(flags)); + }, + $addPollCallback: (node, cb) => { + (node.pollCallbacks ??= []).push(cb); + return () => { + var i = node.pollCallbacks.indexOf(cb); + if (i >= 0) node.pollCallbacks.splice(i, 1); + }; + }, + + // Asynchronous, callback-based wait on a single fd. Registers interest in + // `events` on `fd` and invokes callback(fd, revents) once any are ready, or + // `timeout` ms elapse (-1 = no timeout, 0 = probe), with revents passed by + // value. Unlike poll() this never suspends the calling stack, so it works + // without ASYNCIFY/JSPI. The single-fd, by-value dual of a blocking poll() - + // waiting on a set is just several of these. + // + // Returns 0 once armed/scheduled, -EBADF if `fd` is not open, or -EPERM if the + // descriptor type cannot deliver readiness callbacks (checked first, so an + // unsupported fd is rejected even when it is currently ready - as epoll does). + emscripten_poll_with_callback__deps: ['$FS', '$callUserCallback', '$safeSetTimeout', '$addPollCallback'], + emscripten_poll_with_callback__proxy: 'sync', + emscripten_poll_with_callback: (fd, events, timeout, callback) => { + var stream = FS.getStream(fd); + if (!stream) return -{{{ cDefs.EBADF }}}; + // Capability is a property of the descriptor: only types that announce + // readiness through $notifyPollCallback can be waited on. `pollAsync` is a + // flag, or a predicate when an instance (e.g. a listening socket) can't. + var pollAsync = stream.stream_ops.pollAsync; + if (typeof pollAsync == 'function') pollAsync = pollAsync(stream); + if (!pollAsync) return -{{{ cDefs.EPERM }}}; + + // poll() always reports these regardless of the requested events. + var mask = events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}} | {{{ cDefs.POLLNVAL }}}; + var done = false, removePoll, timer; + function finish(revents) { + if (done) return; + done = true; + removePoll?.(); + if (timer) clearTimeout(timer); + // Always deliver on a fresh tick: finish() can be reached synchronously + // (ready-now, or a close() that wakes us), and the callback must never run + // - nor trigger runtime exit via maybeExit - inside the caller's stack. + safeSetTimeout(() => { + {{{ runtimeKeepalivePop() }}} + callUserCallback(() => { + {{{ makeDynCall('vii', 'callback') }}}(fd, revents); + }); + }, 0); + } + + {{{ runtimeKeepalivePush() }}} + var revents = (stream.stream_ops.poll?.(stream) ?? {{{ cDefs.POLLIN | cDefs.POLLOUT }}}) & mask; + if (revents || !timeout) { + // Ready now, or a zero-timeout probe. + finish(revents); + } else { + // Enqueue on the node wait-queue; a later notify wakes us. On wake we + // re-derive the full current readiness (the wake flags are only the edge + // that triggered us) and report it as poll() would. + removePoll = addPollCallback(stream.node, (flags) => { + var ready = ((stream.stream_ops.poll?.(stream) ?? 0) | flags) & mask; + if (ready) finish(ready); + }); + if (timeout > 0) timer = setTimeout(() => finish(0), timeout); + } + return 0; }, __syscall_getcwd__deps: ['$lengthBytesUTF8', '$stringToUTF8'], __syscall_getcwd: (buf, size) => { diff --git a/system/include/emscripten/emscripten.h b/system/include/emscripten/emscripten.h index 43d2f2899dd0e..8adf1114f5793 100644 --- a/system/include/emscripten/emscripten.h +++ b/system/include/emscripten/emscripten.h @@ -20,6 +20,8 @@ * is up at http://kripken.github.io/emscripten-site/docs/api_reference/emscripten.h.html */ +#include + #include "em_asm.h" #include "em_js.h" #include "em_macros.h" @@ -68,6 +70,20 @@ void emscripten_set_socket_connection_callback(void *userData, em_socket_callbac void emscripten_set_socket_message_callback(void *userData, em_socket_callback callback); void emscripten_set_socket_close_callback(void *userData, em_socket_callback callback); +// Asynchronous, callback-based wait on a single fd's readiness. Registers +// interest in `events` (POLLIN/POLLOUT/...) on `fd` and invokes +// callback(fd, revents) once any of them is ready - or `timeout` ms elapse, in +// which case revents is 0. revents is delivered by value, so there is nothing to +// allocate, own, or keep alive across the call. Unlike poll() this never blocks +// the calling stack, so it works without ASYNCIFY/JSPI; it is the single-fd dual +// of a blocking poll() (wait on a set with several of these). +// +// Returns 0 once armed, -EBADF if `fd` is not open, or -EPERM if `fd`'s +// descriptor type cannot deliver readiness callbacks (checked first, so an +// unsupported fd is rejected even when currently ready, as epoll_ctl does). +typedef void (*em_poll_callback)(int fd, int revents); +int emscripten_poll_with_callback(int fd, int events, int timeout, em_poll_callback callback); + void _emscripten_push_main_loop_blocker(em_arg_callback_func func, void *arg, const char *name); void _emscripten_push_uncounted_main_loop_blocker(em_arg_callback_func func, void *arg, const char *name); #define emscripten_push_main_loop_blocker(func, arg) \ diff --git a/test/codesize/test_codesize_hello_O0.json b/test/codesize/test_codesize_hello_O0.json index 35c3470b5e2e5..5362751d4b9ad 100644 --- a/test/codesize/test_codesize_hello_O0.json +++ b/test/codesize/test_codesize_hello_O0.json @@ -1,10 +1,10 @@ { - "a.out.js": 23483, - "a.out.js.gz": 8523, + "a.out.js": 23518, + "a.out.js.gz": 8538, "a.out.nodebug.wasm": 15115, "a.out.nodebug.wasm.gz": 7464, - "total": 38598, - "total_gz": 15987, + "total": 38633, + "total_gz": 16002, "sent": [ "fd_write" ], diff --git a/test/codesize/test_codesize_hello_dylink_all.json b/test/codesize/test_codesize_hello_dylink_all.json index eb27bc739ba6a..c42532de8a7b5 100644 --- a/test/codesize/test_codesize_hello_dylink_all.json +++ b/test/codesize/test_codesize_hello_dylink_all.json @@ -1,7 +1,7 @@ { - "a.out.js": 268495, + "a.out.js": 269286, "a.out.nodebug.wasm": 587520, - "total": 856015, + "total": 856806, "sent": [ "IMG_Init", "IMG_Load", @@ -839,6 +839,7 @@ "emscripten_pc_get_function", "emscripten_pc_get_line", "emscripten_performance_now", + "emscripten_poll_with_callback", "emscripten_print_double", "emscripten_promise_all", "emscripten_promise_all_settled", diff --git a/test/codesize/test_codesize_minimal_O0.expected.js b/test/codesize/test_codesize_minimal_O0.expected.js index b97d7b35a7ad0..872a36d43dacd 100644 --- a/test/codesize/test_codesize_minimal_O0.expected.js +++ b/test/codesize/test_codesize_minimal_O0.expected.js @@ -1015,6 +1015,8 @@ Module['FS_createPreloadedFile'] = FS.createPreloadedFile; 'addDays', 'getSocketFromFD', 'getSocketAddress', + 'notifyPollCallback', + 'addPollCallback', 'FS_createPreloadedFile', 'FS_preloadFile', 'FS_modeStringToFlags', diff --git a/test/codesize/test_codesize_minimal_O0.json b/test/codesize/test_codesize_minimal_O0.json index 325e0d534493a..d55fb5fc10338 100644 --- a/test/codesize/test_codesize_minimal_O0.json +++ b/test/codesize/test_codesize_minimal_O0.json @@ -1,10 +1,10 @@ { - "a.out.js": 18723, - "a.out.js.gz": 6778, + "a.out.js": 18758, + "a.out.js.gz": 6793, "a.out.nodebug.wasm": 1015, "a.out.nodebug.wasm.gz": 602, - "total": 19738, - "total_gz": 7380, + "total": 19773, + "total_gz": 7395, "sent": [], "imports": [], "exports": [ diff --git a/test/codesize/test_unoptimized_code_size.json b/test/codesize/test_unoptimized_code_size.json index 3e8dbb92eef88..507e85276c037 100644 --- a/test/codesize/test_unoptimized_code_size.json +++ b/test/codesize/test_unoptimized_code_size.json @@ -1,16 +1,16 @@ { - "hello_world.js": 55305, - "hello_world.js.gz": 17405, + "hello_world.js": 55350, + "hello_world.js.gz": 17420, "hello_world.wasm": 15115, "hello_world.wasm.gz": 7464, "no_asserts.js": 25683, "no_asserts.js.gz": 8690, "no_asserts.wasm": 12229, "no_asserts.wasm.gz": 6004, - "strict.js": 53033, - "strict.js.gz": 16620, + "strict.js": 53078, + "strict.js.gz": 16635, "strict.wasm": 15115, "strict.wasm.gz": 7461, - "total": 176480, - "total_gz": 63644 + "total": 176570, + "total_gz": 63674 } diff --git a/test/sockets/test_poll_callback.c b/test/sockets/test_poll_callback.c new file mode 100644 index 0000000000000..afa83dce08bdf --- /dev/null +++ b/test/sockets/test_poll_callback.c @@ -0,0 +1,117 @@ +/* + * Copyright 2026 The Emscripten Authors. All rights reserved. + * Emscripten is available under two separate licenses, the MIT license and the + * University of Illinois/NCSA Open Source License. Both these licenses can be + * found in the LICENSE file. + * + * Verifies emscripten_poll_with_callback against a real socket: a datagram + * arriving on a bound UDP socket wakes the callback through the SOCKFS readiness + * -> poll-callback bridge, with no select and no main loop. + * Also checks the capability gate: a descriptor type that cannot deliver + * readiness callbacks (a regular file) is rejected with -EPERM even though it is + * always "ready", and a bad fd with -EBADF. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int rx = -1, tx = -1; +static volatile int cancel_revents = -1; + +static void fail(const char* why) { + printf("POLL CALLBACK FAIL: %s\n", why); + abort(); +} + +static void on_cancel(int fd, int revents) { + cancel_revents = revents; +} + +// revents arrives by value - nothing to own or free. +static void on_readable(int fd, int revents) { + if (!(revents & POLLIN)) fail("socket reported not readable"); + char buf[4]; + ssize_t n = recv(fd, buf, sizeof(buf), 0); + if (n != 4 || memcmp(buf, "ping", 4) != 0) fail("did not receive datagram"); + // The earlier close()-cancellation must have delivered POLLNVAL by now. + if (cancel_revents != POLLNVAL) fail("close did not deliver POLLNVAL"); + close(rx); + close(tx); + printf("POLL CALLBACK PASS\n"); +} + +int main(void) { + // Capability gate: a regular file can't deliver readiness callbacks, so it is + // rejected with -EPERM even though a regular file is always ready. + int filefd = open("/tmp/poll_cb.tmp", O_CREAT | O_RDWR, 0600); + assert(filefd >= 0); + if (emscripten_poll_with_callback(filefd, POLLIN, 0, on_readable) != -EPERM) { + fail("regular file should be rejected with -EPERM"); + } + close(filefd); + + // A bad fd is -EBADF. + if (emscripten_poll_with_callback(9999, POLLIN, 0, on_readable) != -EBADF) { + fail("invalid fd should be rejected with -EBADF"); + } + + // A listening socket can't yet deliver accept readiness through the seam, so + // it is reported unpollable rather than arming a waiter that never fires. + int lfd = socket(AF_INET, SOCK_STREAM, 0); + assert(lfd >= 0); + struct sockaddr_in laddr; + memset(&laddr, 0, sizeof(laddr)); + laddr.sin_family = AF_INET; + inet_pton(AF_INET, "127.0.0.1", &laddr.sin_addr); + assert(bind(lfd, (struct sockaddr*)&laddr, sizeof(laddr)) == 0); + assert(listen(lfd, 1) == 0); + if (emscripten_poll_with_callback(lfd, POLLIN, 0, on_readable) != -EPERM) { + fail("listening socket should be rejected with -EPERM"); + } + close(lfd); + + // Closing an fd with a pending waiter delivers POLLNVAL (no leak, no hang). + // The callback fires on a later tick, so it's checked in on_readable below. + int cfd = socket(AF_INET, SOCK_DGRAM, 0); + assert(cfd >= 0); + if (emscripten_poll_with_callback(cfd, POLLIN, -1, on_cancel) != 0) { + fail("poll_with_callback did not arm for cancel"); + } + close(cfd); + + rx = socket(AF_INET, SOCK_DGRAM, 0); + tx = socket(AF_INET, SOCK_DGRAM, 0); + assert(rx >= 0 && tx >= 0); + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(0); // ephemeral + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + if (bind(rx, (struct sockaddr*)&addr, sizeof(addr)) != 0) fail("bind"); + + socklen_t l = sizeof(addr); + if (getsockname(rx, (struct sockaddr*)&addr, &l) != 0) fail("getsockname"); + + // Arm the callback before the datagram is sent: rx isn't readable yet, so this + // registers a waiter that the arriving datagram (SOCKFS 'message' emit) wakes. + if (emscripten_poll_with_callback(rx, POLLIN, -1, on_readable) != 0) { + fail("poll_with_callback did not arm"); + } + + if (sendto(tx, "ping", 4, 0, (struct sockaddr*)&addr, sizeof(addr)) != 4) { + fail("sendto"); + } + + return 0; +} diff --git a/test/sockets/test_poll_socket_blocking.c b/test/sockets/test_poll_socket_blocking.c new file mode 100644 index 0000000000000..8ebcdc4e6d0d7 --- /dev/null +++ b/test/sockets/test_poll_socket_blocking.c @@ -0,0 +1,78 @@ +/* + * Copyright 2026 The Emscripten Authors. All rights reserved. + * Emscripten is available under two separate licenses, the MIT license and the + * University of Illinois/NCSA Open Source License. Both these licenses can be + * found in the LICENSE file. + * + * A *blocking* poll() on a real socket is woken by data that arrives *after* the + * poll has already blocked. The datagram is sent on a delay (from another thread + * under -pthread, or a timer under JSPI) so poll() must suspend - the calling + * stack under JSPI, or the proxied worker under PROXY_TO_PTHREAD - and be woken + * by the arrival through the unified readiness wait-queue. Sockets gained this + * from the poll()/poll_with_callback convergence: sock_ops.poll() never + * registered a notifier before, so a blocking poll() on a socket woken purely by + * I/O could not work (it would block forever). + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __EMSCRIPTEN_PTHREADS__ +#include +#endif + +static int rx = -1, tx = -1; +static struct sockaddr_in addr; + +static void send_ping(void* arg) { + assert(sendto(tx, "ping", 4, 0, (struct sockaddr*)&addr, sizeof(addr)) == 4); +} + +#ifdef __EMSCRIPTEN_PTHREADS__ +static void* sender(void* arg) { + usleep(100000); // let poll() block first + send_ping(NULL); + return NULL; +} +#endif + +int main(void) { + rx = socket(AF_INET, SOCK_DGRAM, 0); + tx = socket(AF_INET, SOCK_DGRAM, 0); + assert(rx >= 0 && tx >= 0); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + assert(bind(rx, (struct sockaddr*)&addr, sizeof(addr)) == 0); + socklen_t l = sizeof(addr); + assert(getsockname(rx, (struct sockaddr*)&addr, &l) == 0); + + // Arrange the datagram to arrive only after poll() is already blocking, so it + // can only complete by being woken - not by the initial readiness derivation. +#ifdef __EMSCRIPTEN_PTHREADS__ + pthread_t t; + assert(pthread_create(&t, NULL, sender, NULL) == 0); +#else + emscripten_async_call(send_ping, NULL, 100); +#endif + + struct pollfd pfd = { .fd = rx, .events = POLLIN }; + int n = poll(&pfd, 1, -1); // blocks; only the arrival can wake it + assert(n == 1 && (pfd.revents & POLLIN)); + + char buf[4]; + assert(recv(rx, buf, sizeof(buf), 0) == 4 && memcmp(buf, "ping", 4) == 0); + + close(rx); + close(tx); + printf("POLL SOCKET BLOCKING PASS\n"); + return 0; +} diff --git a/test/test_sockets.py b/test/test_sockets.py index 53f5bf9d90c29..2362d529dd3df 100644 --- a/test/test_sockets.py +++ b/test/test_sockets.py @@ -475,6 +475,26 @@ def test_noderawsockets_udp_ipv6(self): self.skipTest('no IPv6 loopback available') self.do_runf('sockets/test_udp_ipv6.c', 'UDP IPV6 PASS', cflags=['-sNODERAWSOCKETS']) + @also_with_proxy_to_pthread + def test_noderawsockets_poll_callback(self): + # emscripten_poll_with_callback waits on a real socket's readiness (a UDP + # datagram arrival) via the SOCKFS emit -> poll-callback bridge, and rejects + # descriptor types that can't deliver callbacks (-EPERM) and bad fds (-EBADF). + self.do_runf('sockets/test_poll_callback.c', 'POLL CALLBACK PASS', cflags=['-sNODERAWSOCKETS', '-sFORCE_FILESYSTEM', '-sEXIT_RUNTIME']) + + def test_noderawsockets_poll_socket_blocking(self): + # A blocking poll() on a socket is woken by incoming data through the unified + # readiness wait-queue - a capability sockets gained from the poll() / + # poll_with_callback convergence (sock_ops.poll never registered notifiers). + self.do_runf('sockets/test_poll_socket_blocking.c', 'POLL SOCKET BLOCKING PASS', + cflags=['-sNODERAWSOCKETS', '-pthread', '-sPROXY_TO_PTHREAD', '-sEXIT_RUNTIME']) + + def test_noderawsockets_poll_socket_blocking_jspi(self): + # Same, but the blocking poll() suspends the wasm stack under JSPI. + self.require_jspi() + self.do_runf('sockets/test_poll_socket_blocking.c', 'POLL SOCKET BLOCKING PASS', + cflags=['-sNODERAWSOCKETS', '-sEXIT_RUNTIME']) + @also_with_proxy_to_pthread def test_noderawsockets_udp(self): # Self-contained loopback UDP echo: the server binds(:0)+getsockname for its From 39c656a56a137545ad4de7db5b1d33a59732ad5d Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Thu, 25 Jun 2026 12:35:51 -0700 Subject: [PATCH 2/3] fixup test --- test/test_sockets.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/test/test_sockets.py b/test/test_sockets.py index 2362d529dd3df..3ec43161c3b54 100644 --- a/test/test_sockets.py +++ b/test/test_sockets.py @@ -491,7 +491,16 @@ def test_noderawsockets_poll_socket_blocking(self): def test_noderawsockets_poll_socket_blocking_jspi(self): # Same, but the blocking poll() suspends the wasm stack under JSPI. - self.require_jspi() + # NODERAWSOCKETS runs under node rather than the browser, so gate JSPI on + # node's own support (v24) instead of require_jspi's browser-test path. + if 'EMTEST_SKIP_JSPI' in os.environ: + self.skipTest('skipping JSPI (EMTEST_SKIP_JSPI is set)') + if not self.try_require_node_version(24): + self.skipTest('JSPI requires node v24') + if not common.check_node_version(26): + self.node_args += ['--experimental-wasm-stack-switching'] + self.cflags += ['-Wno-experimental'] + self.set_setting('JSPI') self.do_runf('sockets/test_poll_socket_blocking.c', 'POLL SOCKET BLOCKING PASS', cflags=['-sNODERAWSOCKETS', '-sEXIT_RUNTIME']) From 5007c4e92007c762cafff1a8a9cec75ab2d55b10 Mon Sep 17 00:00:00 2001 From: Guy Bedford Date: Thu, 25 Jun 2026 14:26:16 -0700 Subject: [PATCH 3/3] pr feedback --- src/lib/libpipefs.js | 10 +- src/lib/libsockfs.js | 22 ++-- src/lib/libsockfs_node.js | 4 +- src/lib/libsyscall.js | 97 ++++++++--------- test/codesize/test_codesize_hello_O0.json | 8 +- .../test_codesize_hello_dylink_all.json | 4 +- .../test_codesize_minimal_O0.expected.js | 2 - test/codesize/test_codesize_minimal_O0.json | 8 +- test/codesize/test_unoptimized_code_size.json | 12 +-- test/sockets/test_poll_callback.c | 100 ++++++++++-------- test/test_sockets.py | 7 +- 11 files changed, 135 insertions(+), 139 deletions(-) diff --git a/src/lib/libpipefs.js b/src/lib/libpipefs.js index 7d3694db2d143..23d4f8b5c8256 100644 --- a/src/lib/libpipefs.js +++ b/src/lib/libpipefs.js @@ -6,7 +6,7 @@ addToLibrary({ $PIPEFS__postset: () => addAtInit('PIPEFS.root = FS.mount(PIPEFS, {}, null);'), - $PIPEFS__deps: ['$FS', '$notifyPollCallback'], + $PIPEFS__deps: ['$FS', '$notifyNodeListeners'], $PIPEFS: { BUCKET_BUFFER_SIZE: 1024 * 8, // 8KiB Buffer mount(mount) { @@ -82,9 +82,7 @@ addToLibrary({ blocks: 0, }; }, - // Pure readiness derivation; registration/notification go through the - // shared node wait-queue (notifyPollCallback on write/close). - pollAsync: true, + pollable: true, poll(stream) { var pipe = stream.node.pipe; @@ -217,7 +215,7 @@ addToLibrary({ if (freeBytesInCurrBuffer >= dataLen) { currBucket.buffer.set(data, currBucket.offset); currBucket.offset += dataLen; - notifyPollCallback(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); + notifyNodeListeners(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); return dataLen; } else if (freeBytesInCurrBuffer > 0) { currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset); @@ -249,7 +247,7 @@ addToLibrary({ newBucket.buffer.set(data); } - notifyPollCallback(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); + notifyNodeListeners(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); return dataLen; }, close(stream) { diff --git a/src/lib/libsockfs.js b/src/lib/libsockfs.js index b25186ab49c66..50c4c77f90ddc 100644 --- a/src/lib/libsockfs.js +++ b/src/lib/libsockfs.js @@ -8,7 +8,7 @@ addToLibrary({ $SOCKFS__postset: () => { addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);'); }, - $SOCKFS__deps: ['$FS', '$notifyPollCallback', + $SOCKFS__deps: ['$FS', '$notifyNodeListeners', #if NODERAWSOCKETS '$nodeSockOps', #endif @@ -23,9 +23,7 @@ addToLibrary({ }, emit(event, param) { SOCKFS.callbacks[event]?.(param); - // Bridge socket readiness into the generic poll-callback wait-queue, so - // emscripten_poll_with_callback observes every socket event the same way - // the legacy global callbacks do. + // Bridge socket readiness into the inode wait-queue. var fd = event === 'error' ? param[0] : param; var flags = { 'message': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}, @@ -34,7 +32,7 @@ addToLibrary({ 'close': {{{ cDefs.POLLIN }}} | {{{ cDefs.POLLHUP }}}, 'error': {{{ cDefs.POLLERR }}}, }[event]; - if (flags) notifyPollCallback(FS.getStream(fd)?.node, flags); + if (flags) notifyNodeListeners(FS.getStream(fd)?.node, flags); }, mount(mount) { #if expectToReceiveOnModule('websocket') @@ -126,11 +124,8 @@ addToLibrary({ }, // node and stream ops are backend agnostic stream_ops: { - // Sockets announce readiness through SOCKFS.emit -> notifyPollCallback, so - // they can be awaited with emscripten_poll_with_callback. Listening sockets - // are excluded for now: 'connection' is emitted on the accepted fd, not the - // listener, so accept readiness isn't yet delivered through the seam. - pollAsync: (stream) => !stream.node.sock.server, + // Readiness is announced through SOCKFS.emit -> notifyNodeListeners. + pollable: true, poll(stream) { var sock = stream.node.sock; return sock.sock_ops.poll(sock); @@ -155,9 +150,8 @@ addToLibrary({ }, close(stream) { var sock = stream.node.sock; - // Wake any pending poll-callback waiters: the fd is going away (POLLNVAL), - // so they complete and release their keepalive rather than hang. - notifyPollCallback(stream.node, {{{ cDefs.POLLNVAL }}}); + // The fd is going away: wake waiters with POLLNVAL so they don't hang. + notifyNodeListeners(stream.node, {{{ cDefs.POLLNVAL }}}); sock.sock_ops.close(sock); } }, @@ -575,6 +569,8 @@ addToLibrary({ // push to queue for accept to pick up sock.pending.push(newsock); SOCKFS.emit('connection', newsock.stream.fd); + // A queued client makes the listener readable (POLLIN). + notifyNodeListeners(sock.stream.node, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); } else { // create a peer on the listen socket so calling sendto // with the listen socket and an address will resolve diff --git a/src/lib/libsockfs_node.js b/src/lib/libsockfs_node.js index 067b435f2d75e..6ba7b712d2662 100644 --- a/src/lib/libsockfs_node.js +++ b/src/lib/libsockfs_node.js @@ -314,7 +314,7 @@ var NodeSockFSLibrary = { }); }, }, - $nodeSockOps__deps: ['$nodeSockHelpers', '$SOCKFS', '$ERRNO_CODES'], + $nodeSockOps__deps: ['$nodeSockHelpers', '$SOCKFS', '$ERRNO_CODES', '$notifyNodeListeners'], $nodeSockOps__postset: ` if (!ENVIRONMENT_IS_NODE) { throw new Error("NODERAWSOCKETS is currently only supported on Node.js environment.") @@ -507,6 +507,8 @@ var NodeSockFSLibrary = { try { conn.resume(); } catch (e) {} // paused by pauseOnConnect sock.pending.push(newsock); SOCKFS.emit('connection', newsock.stream.fd); + // A queued client makes the listener readable (POLLIN). + notifyNodeListeners(sock.stream.node, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); }); server.on('error', (e) => { sock.error = nodeSockHelpers.nodeErrToErrno(e); diff --git a/src/lib/libsyscall.js b/src/lib/libsyscall.js index e3ca26e02e809..1d5b45bf63863 100644 --- a/src/lib/libsyscall.js +++ b/src/lib/libsyscall.js @@ -610,7 +610,7 @@ var SyscallsLibrary = { }, #if PTHREADS || ASYNCIFY $doPollAsync__internal: true, - $doPollAsync__deps: ['$FS', '$doPoll', '$addPollCallback'], + $doPollAsync__deps: ['$FS', '$doPoll', '$addNodeListener'], $doPollAsync: (fds, nfds, timeout) => { #if RUNTIME_DEBUG dbg('async poll start'); @@ -626,21 +626,17 @@ var SyscallsLibrary = { #if RUNTIME_DEBUG dbg('asyncPollComplete', count); #endif - removers.forEach((r) => r()); + for (var r of removers) r(); if (timer) clearTimeout(timer); resolve(count); } var count = doPoll(fds, nfds); if (count || !timeout) { - // Ready now, or a zero-timeout probe: the derivation alone answers. complete(count); } else { - // Suspend: register one waiter per fd on its node wait-queue. Any wake - // re-derives the whole set (the wake flags are only the trigger) and - // resolves with the full ready count, as poll() reports. Registration - // follows the derivation with no event-loop turn between, so on the - // (single) main thread there is no lost-wakeup window. + // Suspend: one waiter per fd; any wake re-derives the whole set. Deriving + // and registering with no event-loop turn between avoids a lost wakeup. var recheck = () => { if (notifyDone) return; var c = doPoll(fds, nfds); @@ -649,17 +645,15 @@ var SyscallsLibrary = { for (var i = 0; i < nfds; i++) { var pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i; var stream = FS.getStream({{{ makeGetValue('pollfd', C_STRUCTS.pollfd.fd, 'i32') }}}); - if (stream) removers.push(addPollCallback(stream.node, recheck)); + if (stream) removers.push(addNodeListener(stream.node, recheck)); } if (timeout > 0) timer = setTimeout(() => complete(0), timeout); } return promise; }, #endif - // The shared readiness derivation: one pure pass over the pollfds, writing - // revents and returning the ready count. Notification is not registered here - - // that is the consumer's job, on the node wait-queue (see doPollAsync and - // emscripten_poll_with_callback). + // Pure readiness derivation: one pass writing revents, returning the ready + // count. Registration is the consumer's job (doPollAsync, poll_with_callback). $doPoll__internal: true, $doPoll__deps: ['$FS'], $doPoll: (fds, nfds) => { @@ -671,7 +665,11 @@ var SyscallsLibrary = { var flags = {{{ cDefs.POLLNVAL }}}; var stream = FS.getStream(fd); if (stream) { - flags = stream.stream_ops.poll ? stream.stream_ops.poll(stream) : ({{{ cDefs.POLLIN | cDefs.POLLOUT }}}); + if (stream.stream_ops.poll) { + flags = stream.stream_ops.poll(stream); + } else { + flags = {{{ cDefs.POLLIN | cDefs.POLLOUT }}}; + } } flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}}; if (flags) count++; @@ -679,56 +677,46 @@ var SyscallsLibrary = { } return count; }, - // libc routes zero-timeout poll() calls here: the same synchronous - // readiness derivation as __syscall_poll, but as a plain import that never - // suspends, so probes stay callable from any context (under JSPI, - // __syscall_poll is a suspending import and traps when called from a stack - // that wasn't entered through a promising export). + // Zero-timeout poll() probes route here: the same derivation as __syscall_poll + // but a non-suspending import, so they stay callable from any context (a + // suspending __syscall_poll traps off a non-promising stack under JSPI). __syscall_poll_nonblocking__proxy: 'sync', __syscall_poll_nonblocking__deps: ['$doPoll'], __syscall_poll_nonblocking: (fds, nfds) => { return doPoll(fds, nfds); }, - // The descriptor-layer readiness wait-queue. It lives on the *node* (so dup'd - // fds sharing a node share one queue) and is the single seam consumed by both - // emscripten_poll_with_callback (single-fd) and the async __syscall_poll - // (multi-fd), and fed by every producer (SOCKFS.emit, pipe writes, ...). - // `stream_ops.poll(stream)` is pure derivation; registration and notification - // are entirely here. - $notifyPollCallback: (node, flags) => { - // Copy first: a woken waiter removes itself as it completes. - node?.pollCallbacks?.slice().forEach((cb) => cb(flags)); - }, - $addPollCallback: (node, cb) => { - (node.pollCallbacks ??= []).push(cb); + // The inode readiness wait-queue: lives on the node (so dup'd fds share it), + // consumed by emscripten_poll_with_callback and async poll, fed by producers + // (SOCKFS.emit, pipe writes). + $notifyNodeListeners__internal: true, + $notifyNodeListeners: (node, flags) => { + // Copy first: a woken listener removes itself as it completes. + for (var cb of node?.listeners?.slice() ?? []) cb(flags); + }, + $addNodeListener__internal: true, + $addNodeListener: (node, cb) => { + (node.listeners ??= []).push(cb); return () => { - var i = node.pollCallbacks.indexOf(cb); - if (i >= 0) node.pollCallbacks.splice(i, 1); + var i = node.listeners.indexOf(cb); + if (i >= 0) node.listeners.splice(i, 1); }; }, - // Asynchronous, callback-based wait on a single fd. Registers interest in - // `events` on `fd` and invokes callback(fd, revents) once any are ready, or - // `timeout` ms elapse (-1 = no timeout, 0 = probe), with revents passed by - // value. Unlike poll() this never suspends the calling stack, so it works - // without ASYNCIFY/JSPI. The single-fd, by-value dual of a blocking poll() - - // waiting on a set is just several of these. + // Callback-based wait on a single fd: invokes callback(fd, revents) once any + // of `events` are ready, or `timeout` ms elapse (-1 = none, 0 = probe). Never + // suspends the stack, so it works without ASYNCIFY/JSPI. // - // Returns 0 once armed/scheduled, -EBADF if `fd` is not open, or -EPERM if the - // descriptor type cannot deliver readiness callbacks (checked first, so an - // unsupported fd is rejected even when it is currently ready - as epoll does). - emscripten_poll_with_callback__deps: ['$FS', '$callUserCallback', '$safeSetTimeout', '$addPollCallback'], + // Returns 0 once armed, -EBADF for a bad fd, or -EPERM if the descriptor type + // can't deliver readiness callbacks (checked first, as epoll does). + emscripten_poll_with_callback__deps: ['$FS', '$callUserCallback', '$safeSetTimeout', '$addNodeListener'], emscripten_poll_with_callback__proxy: 'sync', emscripten_poll_with_callback: (fd, events, timeout, callback) => { var stream = FS.getStream(fd); if (!stream) return -{{{ cDefs.EBADF }}}; - // Capability is a property of the descriptor: only types that announce - // readiness through $notifyPollCallback can be waited on. `pollAsync` is a - // flag, or a predicate when an instance (e.g. a listening socket) can't. - var pollAsync = stream.stream_ops.pollAsync; - if (typeof pollAsync == 'function') pollAsync = pollAsync(stream); - if (!pollAsync) return -{{{ cDefs.EPERM }}}; + // Capability is a property of the descriptor type: only types that announce + // readiness through $notifyNodeListeners (sockets, pipes) can be waited on. + if (!stream.stream_ops.pollable) return -{{{ cDefs.EPERM }}}; // poll() always reports these regardless of the requested events. var mask = events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}} | {{{ cDefs.POLLNVAL }}}; @@ -739,8 +727,8 @@ var SyscallsLibrary = { removePoll?.(); if (timer) clearTimeout(timer); // Always deliver on a fresh tick: finish() can be reached synchronously - // (ready-now, or a close() that wakes us), and the callback must never run - // - nor trigger runtime exit via maybeExit - inside the caller's stack. + // (ready-now, or a close() wake); the callback must not run in the + // caller's stack. safeSetTimeout(() => { {{{ runtimeKeepalivePop() }}} callUserCallback(() => { @@ -755,10 +743,9 @@ var SyscallsLibrary = { // Ready now, or a zero-timeout probe. finish(revents); } else { - // Enqueue on the node wait-queue; a later notify wakes us. On wake we - // re-derive the full current readiness (the wake flags are only the edge - // that triggered us) and report it as poll() would. - removePoll = addPollCallback(stream.node, (flags) => { + // Enqueue; a later notify wakes us and we re-derive readiness (the wake + // flags are just the trigger). + removePoll = addNodeListener(stream.node, (flags) => { var ready = ((stream.stream_ops.poll?.(stream) ?? 0) | flags) & mask; if (ready) finish(ready); }); diff --git a/test/codesize/test_codesize_hello_O0.json b/test/codesize/test_codesize_hello_O0.json index 5362751d4b9ad..35c3470b5e2e5 100644 --- a/test/codesize/test_codesize_hello_O0.json +++ b/test/codesize/test_codesize_hello_O0.json @@ -1,10 +1,10 @@ { - "a.out.js": 23518, - "a.out.js.gz": 8538, + "a.out.js": 23483, + "a.out.js.gz": 8523, "a.out.nodebug.wasm": 15115, "a.out.nodebug.wasm.gz": 7464, - "total": 38633, - "total_gz": 16002, + "total": 38598, + "total_gz": 15987, "sent": [ "fd_write" ], diff --git a/test/codesize/test_codesize_hello_dylink_all.json b/test/codesize/test_codesize_hello_dylink_all.json index c42532de8a7b5..9195a4cc3b0fd 100644 --- a/test/codesize/test_codesize_hello_dylink_all.json +++ b/test/codesize/test_codesize_hello_dylink_all.json @@ -1,7 +1,7 @@ { - "a.out.js": 269286, + "a.out.js": 269296, "a.out.nodebug.wasm": 587520, - "total": 856806, + "total": 856816, "sent": [ "IMG_Init", "IMG_Load", diff --git a/test/codesize/test_codesize_minimal_O0.expected.js b/test/codesize/test_codesize_minimal_O0.expected.js index 872a36d43dacd..b97d7b35a7ad0 100644 --- a/test/codesize/test_codesize_minimal_O0.expected.js +++ b/test/codesize/test_codesize_minimal_O0.expected.js @@ -1015,8 +1015,6 @@ Module['FS_createPreloadedFile'] = FS.createPreloadedFile; 'addDays', 'getSocketFromFD', 'getSocketAddress', - 'notifyPollCallback', - 'addPollCallback', 'FS_createPreloadedFile', 'FS_preloadFile', 'FS_modeStringToFlags', diff --git a/test/codesize/test_codesize_minimal_O0.json b/test/codesize/test_codesize_minimal_O0.json index d55fb5fc10338..325e0d534493a 100644 --- a/test/codesize/test_codesize_minimal_O0.json +++ b/test/codesize/test_codesize_minimal_O0.json @@ -1,10 +1,10 @@ { - "a.out.js": 18758, - "a.out.js.gz": 6793, + "a.out.js": 18723, + "a.out.js.gz": 6778, "a.out.nodebug.wasm": 1015, "a.out.nodebug.wasm.gz": 602, - "total": 19773, - "total_gz": 7395, + "total": 19738, + "total_gz": 7380, "sent": [], "imports": [], "exports": [ diff --git a/test/codesize/test_unoptimized_code_size.json b/test/codesize/test_unoptimized_code_size.json index 507e85276c037..3e8dbb92eef88 100644 --- a/test/codesize/test_unoptimized_code_size.json +++ b/test/codesize/test_unoptimized_code_size.json @@ -1,16 +1,16 @@ { - "hello_world.js": 55350, - "hello_world.js.gz": 17420, + "hello_world.js": 55305, + "hello_world.js.gz": 17405, "hello_world.wasm": 15115, "hello_world.wasm.gz": 7464, "no_asserts.js": 25683, "no_asserts.js.gz": 8690, "no_asserts.wasm": 12229, "no_asserts.wasm.gz": 6004, - "strict.js": 53078, - "strict.js.gz": 16635, + "strict.js": 53033, + "strict.js.gz": 16620, "strict.wasm": 15115, "strict.wasm.gz": 7461, - "total": 176570, - "total_gz": 63674 + "total": 176480, + "total_gz": 63644 } diff --git a/test/sockets/test_poll_callback.c b/test/sockets/test_poll_callback.c index afa83dce08bdf..e00fb33ef955c 100644 --- a/test/sockets/test_poll_callback.c +++ b/test/sockets/test_poll_callback.c @@ -4,12 +4,13 @@ * University of Illinois/NCSA Open Source License. Both these licenses can be * found in the LICENSE file. * - * Verifies emscripten_poll_with_callback against a real socket: a datagram - * arriving on a bound UDP socket wakes the callback through the SOCKFS readiness - * -> poll-callback bridge, with no select and no main loop. - * Also checks the capability gate: a descriptor type that cannot deliver - * readiness callbacks (a regular file) is rejected with -EPERM even though it is - * always "ready", and a bad fd with -EBADF. + * Verifies emscripten_poll_with_callback against real sockets, all through the + * SOCKFS readiness -> poll-callback bridge, with no select and no main loop: + * - a datagram arriving on a bound UDP socket wakes the callback (POLLIN); + * - a listening socket is readable when a client is queued for accept; + * - closing an fd with a pending waiter delivers POLLNVAL; + * - the capability gate rejects a regular file (-EPERM, even though it is + * always "ready") and a bad fd (-EBADF). */ #include @@ -25,8 +26,9 @@ #include #include -static int rx = -1, tx = -1; +static int rx = -1, tx = -1, lfd = -1, client = -1; static volatile int cancel_revents = -1; +static struct sockaddr_in rx_addr; static void fail(const char* why) { printf("POLL CALLBACK FAIL: %s\n", why); @@ -37,7 +39,6 @@ static void on_cancel(int fd, int revents) { cancel_revents = revents; } -// revents arrives by value - nothing to own or free. static void on_readable(int fd, int revents) { if (!(revents & POLLIN)) fail("socket reported not readable"); char buf[4]; @@ -50,6 +51,26 @@ static void on_readable(int fd, int revents) { printf("POLL CALLBACK PASS\n"); } +// Listener is readable once a client is queued; accept, then chain the +// datagram phase for deterministic ordering. +static void on_accept(int fd, int revents) { + if (!(revents & POLLIN)) fail("listener reported not readable"); + struct sockaddr_in caddr; + socklen_t clen = sizeof(caddr); + int c = accept(lfd, (struct sockaddr*)&caddr, &clen); + if (c < 0) fail("accept after readiness"); + close(c); + close(lfd); + close(client); + + if (emscripten_poll_with_callback(rx, POLLIN, -1, on_readable) != 0) { + fail("poll_with_callback did not arm"); + } + if (sendto(tx, "ping", 4, 0, (struct sockaddr*)&rx_addr, sizeof(rx_addr)) != 4) { + fail("sendto"); + } +} + int main(void) { // Capability gate: a regular file can't deliver readiness callbacks, so it is // rejected with -EPERM even though a regular file is always ready. @@ -65,23 +86,7 @@ int main(void) { fail("invalid fd should be rejected with -EBADF"); } - // A listening socket can't yet deliver accept readiness through the seam, so - // it is reported unpollable rather than arming a waiter that never fires. - int lfd = socket(AF_INET, SOCK_STREAM, 0); - assert(lfd >= 0); - struct sockaddr_in laddr; - memset(&laddr, 0, sizeof(laddr)); - laddr.sin_family = AF_INET; - inet_pton(AF_INET, "127.0.0.1", &laddr.sin_addr); - assert(bind(lfd, (struct sockaddr*)&laddr, sizeof(laddr)) == 0); - assert(listen(lfd, 1) == 0); - if (emscripten_poll_with_callback(lfd, POLLIN, 0, on_readable) != -EPERM) { - fail("listening socket should be rejected with -EPERM"); - } - close(lfd); - - // Closing an fd with a pending waiter delivers POLLNVAL (no leak, no hang). - // The callback fires on a later tick, so it's checked in on_readable below. + // Closing an fd with a pending waiter delivers POLLNVAL (checked in on_readable). int cfd = socket(AF_INET, SOCK_DGRAM, 0); assert(cfd >= 0); if (emscripten_poll_with_callback(cfd, POLLIN, -1, on_cancel) != 0) { @@ -89,29 +94,38 @@ int main(void) { } close(cfd); + // Prepare the datagram receiver up front; it is armed and fed from on_accept. rx = socket(AF_INET, SOCK_DGRAM, 0); tx = socket(AF_INET, SOCK_DGRAM, 0); assert(rx >= 0 && tx >= 0); + memset(&rx_addr, 0, sizeof(rx_addr)); + rx_addr.sin_family = AF_INET; + rx_addr.sin_port = htons(0); // ephemeral + inet_pton(AF_INET, "127.0.0.1", &rx_addr.sin_addr); + if (bind(rx, (struct sockaddr*)&rx_addr, sizeof(rx_addr)) != 0) fail("bind rx"); + socklen_t l = sizeof(rx_addr); + if (getsockname(rx, (struct sockaddr*)&rx_addr, &l) != 0) fail("getsockname rx"); - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(0); // ephemeral - inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); - if (bind(rx, (struct sockaddr*)&addr, sizeof(addr)) != 0) fail("bind"); - - socklen_t l = sizeof(addr); - if (getsockname(rx, (struct sockaddr*)&addr, &l) != 0) fail("getsockname"); - - // Arm the callback before the datagram is sent: rx isn't readable yet, so this - // registers a waiter that the arriving datagram (SOCKFS 'message' emit) wakes. - if (emscripten_poll_with_callback(rx, POLLIN, -1, on_readable) != 0) { - fail("poll_with_callback did not arm"); - } - - if (sendto(tx, "ping", 4, 0, (struct sockaddr*)&addr, sizeof(addr)) != 4) { - fail("sendto"); + // A listening socket is pollable: arm a callback, then connect a client. The + // queued connection wakes the listener with POLLIN (see on_accept). + lfd = socket(AF_INET, SOCK_STREAM, 0); + assert(lfd >= 0); + struct sockaddr_in laddr; + memset(&laddr, 0, sizeof(laddr)); + laddr.sin_family = AF_INET; + laddr.sin_port = htons(0); + inet_pton(AF_INET, "127.0.0.1", &laddr.sin_addr); + assert(bind(lfd, (struct sockaddr*)&laddr, sizeof(laddr)) == 0); + socklen_t ll = sizeof(laddr); + assert(getsockname(lfd, (struct sockaddr*)&laddr, &ll) == 0); + assert(listen(lfd, 1) == 0); + if (emscripten_poll_with_callback(lfd, POLLIN, -1, on_accept) != 0) { + fail("listening socket should arm"); } + client = socket(AF_INET, SOCK_STREAM, 0); + assert(client >= 0); + // connect is async; EINPROGRESS is expected, not an error. + connect(client, (struct sockaddr*)&laddr, sizeof(laddr)); return 0; } diff --git a/test/test_sockets.py b/test/test_sockets.py index 3ec43161c3b54..01bf5e13d907b 100644 --- a/test/test_sockets.py +++ b/test/test_sockets.py @@ -477,9 +477,10 @@ def test_noderawsockets_udp_ipv6(self): @also_with_proxy_to_pthread def test_noderawsockets_poll_callback(self): - # emscripten_poll_with_callback waits on a real socket's readiness (a UDP - # datagram arrival) via the SOCKFS emit -> poll-callback bridge, and rejects - # descriptor types that can't deliver callbacks (-EPERM) and bad fds (-EBADF). + # emscripten_poll_with_callback waits on real socket readiness via the SOCKFS + # emit -> poll-callback bridge: a UDP datagram arrival, and a listening socket + # becoming readable when a client is queued for accept. It rejects descriptor + # types that can't deliver callbacks (-EPERM) and bad fds (-EBADF). self.do_runf('sockets/test_poll_callback.c', 'POLL CALLBACK PASS', cflags=['-sNODERAWSOCKETS', '-sFORCE_FILESYSTEM', '-sEXIT_RUNTIME']) def test_noderawsockets_poll_socket_blocking(self):