diff --git a/site/source/docs/api_reference/emscripten.h.rst b/site/source/docs/api_reference/emscripten.h.rst index fdb9567ffcd86..ae9c8a785be4c 100644 --- a/site/source/docs/api_reference/emscripten.h.rst +++ b/site/source/docs/api_reference/emscripten.h.rst @@ -1280,6 +1280,38 @@ Functions :param em_socket_callback callback: Pointer to a callback function. The callback returns a file descriptor and the arbitrary ``userData`` passed to this function. +.. c:type:: em_poll_callback + + Function pointer type for the :c:func:`emscripten_poll_with_callback` callback, + defined as: :: + + typedef void (*em_poll_callback)(int fd, int revents); + + ``revents`` is delivered by value (the same bitmask :c:func:`poll` would leave + in ``pollfd.revents``), so there is nothing to own or free. + + +.. c:function:: int emscripten_poll_with_callback(int fd, int events, int timeout, em_poll_callback callback) + + Asynchronous, callback-based wait on a single fd's readiness. Registers interest + in ``events`` (``POLLIN``/``POLLOUT``/...) on ``fd`` and invokes + ``callback(fd, revents)`` once any of them is ready - or ``timeout`` milliseconds + elapse, in which case ``revents`` is ``0``. Unlike :c:func:`poll`, this never + blocks the calling stack, so it works without ASYNCIFY/JSPI; it is the single-fd + dual of a blocking :c:func:`poll` (wait on a set with several of these). + + Capability is a property of the descriptor type, checked before arming: only + types that announce readiness (sockets, pipes) can be waited on. An unsupported + fd is rejected with ``-EPERM`` even when it is currently ready, mirroring + :c:func:`epoll_ctl`. + + :param fd: The file descriptor to wait on. + :param events: Requested event mask, as for :c:func:`poll` (``POLLIN``, ``POLLOUT``, ...). + :param timeout: Milliseconds to wait before giving up; ``-1`` waits indefinitely and ``0`` is a non-blocking probe. + :param callback: Invoked with ``fd`` and the ready ``revents`` (``0`` on timeout). + :returns: ``0`` once armed, ``-EBADF`` if ``fd`` is not open, or ``-EPERM`` if ``fd``'s descriptor type cannot deliver readiness callbacks. + + Unaligned types =============== diff --git a/src/lib/libpipefs.js b/src/lib/libpipefs.js index c99ffb7bc29b6..23d4f8b5c8256 100644 --- a/src/lib/libpipefs.js +++ b/src/lib/libpipefs.js @@ -6,7 +6,7 @@ addToLibrary({ $PIPEFS__postset: () => addAtInit('PIPEFS.root = FS.mount(PIPEFS, {}, null);'), - $PIPEFS__deps: ['$FS'], + $PIPEFS__deps: ['$FS', '$notifyNodeListeners'], $PIPEFS: { BUCKET_BUFFER_SIZE: 1024 * 8, // 8KiB Buffer mount(mount) { @@ -21,23 +21,6 @@ addToLibrary({ // able to read from the read end after write end is closed. refcnt : 2, timestamp: new Date(), -#if PTHREADS || ASYNCIFY - readableHandlers: [], - registerReadableHandler: (callback) => { - callback.registerCleanupFunc(() => { - const i = pipe.readableHandlers.indexOf(callback); - if (i !== -1) pipe.readableHandlers.splice(i, 1); - }); - pipe.readableHandlers.push(callback); - }, - notifyReadableHandlers: () => { - while (pipe.readableHandlers.length > 0) { - const cb = pipe.readableHandlers.shift(); - if (cb) cb({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); - } - pipe.readableHandlers = []; - } -#endif }; pipe.buckets.push({ @@ -53,6 +36,8 @@ addToLibrary({ rNode.pipe = pipe; wNode.pipe = pipe; + // The read end's node carries the poll wait-queue; writes wake it. + pipe.readNode = rNode; var readableStream = FS.createStream({ path: rName, @@ -97,7 +82,8 @@ addToLibrary({ blocks: 0, }; }, - poll(stream, timeout, notifyCallback) { + pollable: true, + poll(stream) { var pipe = stream.node.pipe; if ((stream.flags & {{{ cDefs.O_ACCMODE }}}) === {{{ cDefs.O_WRONLY }}}) { @@ -108,10 +94,6 @@ addToLibrary({ return ({{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); } } - -#if PTHREADS || ASYNCIFY - if (notifyCallback) pipe.registerReadableHandler(notifyCallback); -#endif return 0; }, dup(stream) { @@ -233,9 +215,7 @@ addToLibrary({ if (freeBytesInCurrBuffer >= dataLen) { currBucket.buffer.set(data, currBucket.offset); currBucket.offset += dataLen; -#if PTHREADS || ASYNCIFY - pipe.notifyReadableHandlers(); -#endif + notifyNodeListeners(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); return dataLen; } else if (freeBytesInCurrBuffer > 0) { currBucket.buffer.set(data.subarray(0, freeBytesInCurrBuffer), currBucket.offset); @@ -267,9 +247,7 @@ addToLibrary({ newBucket.buffer.set(data); } -#if PTHREADS || ASYNCIFY - pipe.notifyReadableHandlers(); -#endif + notifyNodeListeners(pipe.readNode, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); return dataLen; }, close(stream) { diff --git a/src/lib/libsigs.js b/src/lib/libsigs.js index 746ee98ff5cef..14e1ce1b49651 100644 --- a/src/lib/libsigs.js +++ b/src/lib/libsigs.js @@ -722,6 +722,7 @@ sigs = { emscripten_pc_get_function__sig: 'pp', emscripten_pc_get_line__sig: 'ip', emscripten_performance_now__sig: 'd', + emscripten_poll_with_callback__sig: 'iiiip', emscripten_print_double__sig: 'idpi', emscripten_promise_all__sig: 'pppp', emscripten_promise_all_settled__sig: 'pppp', diff --git a/src/lib/libsockfs.js b/src/lib/libsockfs.js index 66bcdcb162a42..50c4c77f90ddc 100644 --- a/src/lib/libsockfs.js +++ b/src/lib/libsockfs.js @@ -8,7 +8,7 @@ addToLibrary({ $SOCKFS__postset: () => { addAtInit('SOCKFS.root = FS.mount(SOCKFS, {}, null);'); }, - $SOCKFS__deps: ['$FS', + $SOCKFS__deps: ['$FS', '$notifyNodeListeners', #if NODERAWSOCKETS '$nodeSockOps', #endif @@ -23,6 +23,16 @@ addToLibrary({ }, emit(event, param) { SOCKFS.callbacks[event]?.(param); + // Bridge socket readiness into the inode wait-queue. + var fd = event === 'error' ? param[0] : param; + var flags = { + 'message': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}, + 'open': {{{ cDefs.POLLOUT }}}, + 'connection': {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}, + 'close': {{{ cDefs.POLLIN }}} | {{{ cDefs.POLLHUP }}}, + 'error': {{{ cDefs.POLLERR }}}, + }[event]; + if (flags) notifyNodeListeners(FS.getStream(fd)?.node, flags); }, mount(mount) { #if expectToReceiveOnModule('websocket') @@ -114,6 +124,8 @@ addToLibrary({ }, // node and stream ops are backend agnostic stream_ops: { + // Readiness is announced through SOCKFS.emit -> notifyNodeListeners. + pollable: true, poll(stream) { var sock = stream.node.sock; return sock.sock_ops.poll(sock); @@ -138,6 +150,8 @@ addToLibrary({ }, close(stream) { var sock = stream.node.sock; + // The fd is going away: wake waiters with POLLNVAL so they don't hang. + notifyNodeListeners(stream.node, {{{ cDefs.POLLNVAL }}}); sock.sock_ops.close(sock); } }, @@ -555,6 +569,8 @@ addToLibrary({ // push to queue for accept to pick up sock.pending.push(newsock); SOCKFS.emit('connection', newsock.stream.fd); + // A queued client makes the listener readable (POLLIN). + notifyNodeListeners(sock.stream.node, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); } else { // create a peer on the listen socket so calling sendto // with the listen socket and an address will resolve diff --git a/src/lib/libsockfs_node.js b/src/lib/libsockfs_node.js index 067b435f2d75e..6ba7b712d2662 100644 --- a/src/lib/libsockfs_node.js +++ b/src/lib/libsockfs_node.js @@ -314,7 +314,7 @@ var NodeSockFSLibrary = { }); }, }, - $nodeSockOps__deps: ['$nodeSockHelpers', '$SOCKFS', '$ERRNO_CODES'], + $nodeSockOps__deps: ['$nodeSockHelpers', '$SOCKFS', '$ERRNO_CODES', '$notifyNodeListeners'], $nodeSockOps__postset: ` if (!ENVIRONMENT_IS_NODE) { throw new Error("NODERAWSOCKETS is currently only supported on Node.js environment.") @@ -507,6 +507,8 @@ var NodeSockFSLibrary = { try { conn.resume(); } catch (e) {} // paused by pauseOnConnect sock.pending.push(newsock); SOCKFS.emit('connection', newsock.stream.fd); + // A queued client makes the listener readable (POLLIN). + notifyNodeListeners(sock.stream.node, {{{ cDefs.POLLRDNORM }}} | {{{ cDefs.POLLIN }}}); }); server.on('error', (e) => { sock.error = nodeSockHelpers.nodeErrToErrno(e); diff --git a/src/lib/libsyscall.js b/src/lib/libsyscall.js index d38357a67eb5e..1d5b45bf63863 100644 --- a/src/lib/libsyscall.js +++ b/src/lib/libsyscall.js @@ -602,7 +602,7 @@ var SyscallsLibrary = { } #endif - var count = doPoll(fds, nfds, 0, undefined); + var count = doPoll(fds, nfds); #if ASSERTIONS if (!count && timeout != 0) warnOnce('non-zero poll() timeout not supported: ' + timeout) #endif @@ -610,79 +610,53 @@ var SyscallsLibrary = { }, #if PTHREADS || ASYNCIFY $doPollAsync__internal: true, - $doPollAsync__deps: ['$FS', '$doPoll'], + $doPollAsync__deps: ['$FS', '$doPoll', '$addNodeListener'], $doPollAsync: (fds, nfds, timeout) => { #if RUNTIME_DEBUG dbg('async poll start'); #endif - - // Enable event handlers only when the poll call is proxied from a worker. - // TODO: Could use `Promise.withResolvers` here if we know its available. var resolve; var promise = new Promise((resolve_) => { resolve = resolve_; }); - var cleanupFuncs = []; + var removers = []; + var timer; var notifyDone = false; - - function asyncPollComplete(count) { - if (notifyDone) { - return; - } + function complete(count) { + if (notifyDone) return; notifyDone = true; #if RUNTIME_DEBUG dbg('asyncPollComplete', count); #endif - cleanupFuncs.forEach(cb => cb()); + for (var r of removers) r(); + if (timer) clearTimeout(timer); resolve(count); } - function makeNotifyCallback(stream, pollfd) { - var cb = (flags) => { - if (notifyDone) { - return; - } -#if RUNTIME_DEBUG - dbg(`async poll notify: stream=${stream}`); -#endif - var events = {{{ makeGetValue('pollfd', C_STRUCTS.pollfd.events, 'i16') }}}; - flags &= events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}}; -#if ASSERTIONS - assert(flags) -#endif - {{{ makeSetValue('pollfd', C_STRUCTS.pollfd.revents, 'flags', 'i16') }}}; - asyncPollComplete(1); - } - cb.registerCleanupFunc = (f) => { - if (f) cleanupFuncs.push(f); - } - return cb; - } - if (timeout > 0) { - var t = setTimeout(() => { -#if RUNTIME_DEBUG - dbg('poll: timeout', timeout); -#endif - asyncPollComplete(0); - }, timeout); - cleanupFuncs.push(() => clearTimeout(t)); - } - // A zero timeout never registers notifications: the derivation alone - // answers, matching the non-blocking probe. - var count = doPoll(fds, nfds, timeout, makeNotifyCallback); + var count = doPoll(fds, nfds); if (count || !timeout) { - asyncPollComplete(count); + complete(count); + } else { + // Suspend: one waiter per fd; any wake re-derives the whole set. Deriving + // and registering with no event-loop turn between avoids a lost wakeup. + var recheck = () => { + if (notifyDone) return; + var c = doPoll(fds, nfds); + if (c) complete(c); + }; + for (var i = 0; i < nfds; i++) { + var pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i; + var stream = FS.getStream({{{ makeGetValue('pollfd', C_STRUCTS.pollfd.fd, 'i32') }}}); + if (stream) removers.push(addNodeListener(stream.node, recheck)); + } + if (timeout > 0) timer = setTimeout(() => complete(0), timeout); } return promise; }, #endif - // The shared readiness derivation: one pass over the pollfds, writing - // revents and returning the ready count. With a nonzero `timeout`, a - // readiness notification is also registered on each stream by the same - // `stream_ops.poll` call that derives it, so there is no window between - // registration and derivation; a zero `timeout` means the caller will not - // wait, so no notification is registered (the plain probe). + // Pure readiness derivation: one pass writing revents, returning the ready + // count. Registration is the consumer's job (doPollAsync, poll_with_callback). $doPoll__internal: true, $doPoll__deps: ['$FS'], - $doPoll: (fds, nfds, timeout, makeNotifyCallback) => { + $doPoll: (fds, nfds) => { var count = 0; for (var i = 0; i < nfds; i++) { var pollfd = fds + {{{ C_STRUCTS.pollfd.__size__ }}} * i; @@ -692,9 +666,7 @@ var SyscallsLibrary = { var stream = FS.getStream(fd); if (stream) { if (stream.stream_ops.poll) { - flags = timeout - ? stream.stream_ops.poll(stream, timeout, makeNotifyCallback(stream, pollfd)) - : stream.stream_ops.poll(stream, -1); + flags = stream.stream_ops.poll(stream); } else { flags = {{{ cDefs.POLLIN | cDefs.POLLOUT }}}; } @@ -705,15 +677,81 @@ var SyscallsLibrary = { } return count; }, - // libc routes zero-timeout poll() calls here: the same synchronous - // readiness derivation as __syscall_poll, but as a plain import that never - // suspends, so probes stay callable from any context (under JSPI, - // __syscall_poll is a suspending import and traps when called from a stack - // that wasn't entered through a promising export). + // Zero-timeout poll() probes route here: the same derivation as __syscall_poll + // but a non-suspending import, so they stay callable from any context (a + // suspending __syscall_poll traps off a non-promising stack under JSPI). __syscall_poll_nonblocking__proxy: 'sync', __syscall_poll_nonblocking__deps: ['$doPoll'], __syscall_poll_nonblocking: (fds, nfds) => { - return doPoll(fds, nfds, 0, undefined); + return doPoll(fds, nfds); + }, + + // The inode readiness wait-queue: lives on the node (so dup'd fds share it), + // consumed by emscripten_poll_with_callback and async poll, fed by producers + // (SOCKFS.emit, pipe writes). + $notifyNodeListeners__internal: true, + $notifyNodeListeners: (node, flags) => { + // Copy first: a woken listener removes itself as it completes. + for (var cb of node?.listeners?.slice() ?? []) cb(flags); + }, + $addNodeListener__internal: true, + $addNodeListener: (node, cb) => { + (node.listeners ??= []).push(cb); + return () => { + var i = node.listeners.indexOf(cb); + if (i >= 0) node.listeners.splice(i, 1); + }; + }, + + // Callback-based wait on a single fd: invokes callback(fd, revents) once any + // of `events` are ready, or `timeout` ms elapse (-1 = none, 0 = probe). Never + // suspends the stack, so it works without ASYNCIFY/JSPI. + // + // Returns 0 once armed, -EBADF for a bad fd, or -EPERM if the descriptor type + // can't deliver readiness callbacks (checked first, as epoll does). + emscripten_poll_with_callback__deps: ['$FS', '$callUserCallback', '$safeSetTimeout', '$addNodeListener'], + emscripten_poll_with_callback__proxy: 'sync', + emscripten_poll_with_callback: (fd, events, timeout, callback) => { + var stream = FS.getStream(fd); + if (!stream) return -{{{ cDefs.EBADF }}}; + // Capability is a property of the descriptor type: only types that announce + // readiness through $notifyNodeListeners (sockets, pipes) can be waited on. + if (!stream.stream_ops.pollable) return -{{{ cDefs.EPERM }}}; + + // poll() always reports these regardless of the requested events. + var mask = events | {{{ cDefs.POLLERR }}} | {{{ cDefs.POLLHUP }}} | {{{ cDefs.POLLNVAL }}}; + var done = false, removePoll, timer; + function finish(revents) { + if (done) return; + done = true; + removePoll?.(); + if (timer) clearTimeout(timer); + // Always deliver on a fresh tick: finish() can be reached synchronously + // (ready-now, or a close() wake); the callback must not run in the + // caller's stack. + safeSetTimeout(() => { + {{{ runtimeKeepalivePop() }}} + callUserCallback(() => { + {{{ makeDynCall('vii', 'callback') }}}(fd, revents); + }); + }, 0); + } + + {{{ runtimeKeepalivePush() }}} + var revents = (stream.stream_ops.poll?.(stream) ?? {{{ cDefs.POLLIN | cDefs.POLLOUT }}}) & mask; + if (revents || !timeout) { + // Ready now, or a zero-timeout probe. + finish(revents); + } else { + // Enqueue; a later notify wakes us and we re-derive readiness (the wake + // flags are just the trigger). + removePoll = addNodeListener(stream.node, (flags) => { + var ready = ((stream.stream_ops.poll?.(stream) ?? 0) | flags) & mask; + if (ready) finish(ready); + }); + if (timeout > 0) timer = setTimeout(() => finish(0), timeout); + } + return 0; }, __syscall_getcwd__deps: ['$lengthBytesUTF8', '$stringToUTF8'], __syscall_getcwd: (buf, size) => { diff --git a/system/include/emscripten/emscripten.h b/system/include/emscripten/emscripten.h index 43d2f2899dd0e..8adf1114f5793 100644 --- a/system/include/emscripten/emscripten.h +++ b/system/include/emscripten/emscripten.h @@ -20,6 +20,8 @@ * is up at http://kripken.github.io/emscripten-site/docs/api_reference/emscripten.h.html */ +#include + #include "em_asm.h" #include "em_js.h" #include "em_macros.h" @@ -68,6 +70,20 @@ void emscripten_set_socket_connection_callback(void *userData, em_socket_callbac void emscripten_set_socket_message_callback(void *userData, em_socket_callback callback); void emscripten_set_socket_close_callback(void *userData, em_socket_callback callback); +// Asynchronous, callback-based wait on a single fd's readiness. Registers +// interest in `events` (POLLIN/POLLOUT/...) on `fd` and invokes +// callback(fd, revents) once any of them is ready - or `timeout` ms elapse, in +// which case revents is 0. revents is delivered by value, so there is nothing to +// allocate, own, or keep alive across the call. Unlike poll() this never blocks +// the calling stack, so it works without ASYNCIFY/JSPI; it is the single-fd dual +// of a blocking poll() (wait on a set with several of these). +// +// Returns 0 once armed, -EBADF if `fd` is not open, or -EPERM if `fd`'s +// descriptor type cannot deliver readiness callbacks (checked first, so an +// unsupported fd is rejected even when currently ready, as epoll_ctl does). +typedef void (*em_poll_callback)(int fd, int revents); +int emscripten_poll_with_callback(int fd, int events, int timeout, em_poll_callback callback); + void _emscripten_push_main_loop_blocker(em_arg_callback_func func, void *arg, const char *name); void _emscripten_push_uncounted_main_loop_blocker(em_arg_callback_func func, void *arg, const char *name); #define emscripten_push_main_loop_blocker(func, arg) \ diff --git a/test/codesize/test_codesize_hello_dylink_all.json b/test/codesize/test_codesize_hello_dylink_all.json index eb27bc739ba6a..9195a4cc3b0fd 100644 --- a/test/codesize/test_codesize_hello_dylink_all.json +++ b/test/codesize/test_codesize_hello_dylink_all.json @@ -1,7 +1,7 @@ { - "a.out.js": 268495, + "a.out.js": 269296, "a.out.nodebug.wasm": 587520, - "total": 856015, + "total": 856816, "sent": [ "IMG_Init", "IMG_Load", @@ -839,6 +839,7 @@ "emscripten_pc_get_function", "emscripten_pc_get_line", "emscripten_performance_now", + "emscripten_poll_with_callback", "emscripten_print_double", "emscripten_promise_all", "emscripten_promise_all_settled", diff --git a/test/sockets/test_poll_callback.c b/test/sockets/test_poll_callback.c new file mode 100644 index 0000000000000..e00fb33ef955c --- /dev/null +++ b/test/sockets/test_poll_callback.c @@ -0,0 +1,131 @@ +/* + * Copyright 2026 The Emscripten Authors. All rights reserved. + * Emscripten is available under two separate licenses, the MIT license and the + * University of Illinois/NCSA Open Source License. Both these licenses can be + * found in the LICENSE file. + * + * Verifies emscripten_poll_with_callback against real sockets, all through the + * SOCKFS readiness -> poll-callback bridge, with no select and no main loop: + * - a datagram arriving on a bound UDP socket wakes the callback (POLLIN); + * - a listening socket is readable when a client is queued for accept; + * - closing an fd with a pending waiter delivers POLLNVAL; + * - the capability gate rejects a regular file (-EPERM, even though it is + * always "ready") and a bad fd (-EBADF). + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static int rx = -1, tx = -1, lfd = -1, client = -1; +static volatile int cancel_revents = -1; +static struct sockaddr_in rx_addr; + +static void fail(const char* why) { + printf("POLL CALLBACK FAIL: %s\n", why); + abort(); +} + +static void on_cancel(int fd, int revents) { + cancel_revents = revents; +} + +static void on_readable(int fd, int revents) { + if (!(revents & POLLIN)) fail("socket reported not readable"); + char buf[4]; + ssize_t n = recv(fd, buf, sizeof(buf), 0); + if (n != 4 || memcmp(buf, "ping", 4) != 0) fail("did not receive datagram"); + // The earlier close()-cancellation must have delivered POLLNVAL by now. + if (cancel_revents != POLLNVAL) fail("close did not deliver POLLNVAL"); + close(rx); + close(tx); + printf("POLL CALLBACK PASS\n"); +} + +// Listener is readable once a client is queued; accept, then chain the +// datagram phase for deterministic ordering. +static void on_accept(int fd, int revents) { + if (!(revents & POLLIN)) fail("listener reported not readable"); + struct sockaddr_in caddr; + socklen_t clen = sizeof(caddr); + int c = accept(lfd, (struct sockaddr*)&caddr, &clen); + if (c < 0) fail("accept after readiness"); + close(c); + close(lfd); + close(client); + + if (emscripten_poll_with_callback(rx, POLLIN, -1, on_readable) != 0) { + fail("poll_with_callback did not arm"); + } + if (sendto(tx, "ping", 4, 0, (struct sockaddr*)&rx_addr, sizeof(rx_addr)) != 4) { + fail("sendto"); + } +} + +int main(void) { + // Capability gate: a regular file can't deliver readiness callbacks, so it is + // rejected with -EPERM even though a regular file is always ready. + int filefd = open("/tmp/poll_cb.tmp", O_CREAT | O_RDWR, 0600); + assert(filefd >= 0); + if (emscripten_poll_with_callback(filefd, POLLIN, 0, on_readable) != -EPERM) { + fail("regular file should be rejected with -EPERM"); + } + close(filefd); + + // A bad fd is -EBADF. + if (emscripten_poll_with_callback(9999, POLLIN, 0, on_readable) != -EBADF) { + fail("invalid fd should be rejected with -EBADF"); + } + + // Closing an fd with a pending waiter delivers POLLNVAL (checked in on_readable). + int cfd = socket(AF_INET, SOCK_DGRAM, 0); + assert(cfd >= 0); + if (emscripten_poll_with_callback(cfd, POLLIN, -1, on_cancel) != 0) { + fail("poll_with_callback did not arm for cancel"); + } + close(cfd); + + // Prepare the datagram receiver up front; it is armed and fed from on_accept. + rx = socket(AF_INET, SOCK_DGRAM, 0); + tx = socket(AF_INET, SOCK_DGRAM, 0); + assert(rx >= 0 && tx >= 0); + memset(&rx_addr, 0, sizeof(rx_addr)); + rx_addr.sin_family = AF_INET; + rx_addr.sin_port = htons(0); // ephemeral + inet_pton(AF_INET, "127.0.0.1", &rx_addr.sin_addr); + if (bind(rx, (struct sockaddr*)&rx_addr, sizeof(rx_addr)) != 0) fail("bind rx"); + socklen_t l = sizeof(rx_addr); + if (getsockname(rx, (struct sockaddr*)&rx_addr, &l) != 0) fail("getsockname rx"); + + // A listening socket is pollable: arm a callback, then connect a client. The + // queued connection wakes the listener with POLLIN (see on_accept). + lfd = socket(AF_INET, SOCK_STREAM, 0); + assert(lfd >= 0); + struct sockaddr_in laddr; + memset(&laddr, 0, sizeof(laddr)); + laddr.sin_family = AF_INET; + laddr.sin_port = htons(0); + inet_pton(AF_INET, "127.0.0.1", &laddr.sin_addr); + assert(bind(lfd, (struct sockaddr*)&laddr, sizeof(laddr)) == 0); + socklen_t ll = sizeof(laddr); + assert(getsockname(lfd, (struct sockaddr*)&laddr, &ll) == 0); + assert(listen(lfd, 1) == 0); + if (emscripten_poll_with_callback(lfd, POLLIN, -1, on_accept) != 0) { + fail("listening socket should arm"); + } + client = socket(AF_INET, SOCK_STREAM, 0); + assert(client >= 0); + // connect is async; EINPROGRESS is expected, not an error. + connect(client, (struct sockaddr*)&laddr, sizeof(laddr)); + + return 0; +} diff --git a/test/sockets/test_poll_socket_blocking.c b/test/sockets/test_poll_socket_blocking.c new file mode 100644 index 0000000000000..8ebcdc4e6d0d7 --- /dev/null +++ b/test/sockets/test_poll_socket_blocking.c @@ -0,0 +1,78 @@ +/* + * Copyright 2026 The Emscripten Authors. All rights reserved. + * Emscripten is available under two separate licenses, the MIT license and the + * University of Illinois/NCSA Open Source License. Both these licenses can be + * found in the LICENSE file. + * + * A *blocking* poll() on a real socket is woken by data that arrives *after* the + * poll has already blocked. The datagram is sent on a delay (from another thread + * under -pthread, or a timer under JSPI) so poll() must suspend - the calling + * stack under JSPI, or the proxied worker under PROXY_TO_PTHREAD - and be woken + * by the arrival through the unified readiness wait-queue. Sockets gained this + * from the poll()/poll_with_callback convergence: sock_ops.poll() never + * registered a notifier before, so a blocking poll() on a socket woken purely by + * I/O could not work (it would block forever). + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __EMSCRIPTEN_PTHREADS__ +#include +#endif + +static int rx = -1, tx = -1; +static struct sockaddr_in addr; + +static void send_ping(void* arg) { + assert(sendto(tx, "ping", 4, 0, (struct sockaddr*)&addr, sizeof(addr)) == 4); +} + +#ifdef __EMSCRIPTEN_PTHREADS__ +static void* sender(void* arg) { + usleep(100000); // let poll() block first + send_ping(NULL); + return NULL; +} +#endif + +int main(void) { + rx = socket(AF_INET, SOCK_DGRAM, 0); + tx = socket(AF_INET, SOCK_DGRAM, 0); + assert(rx >= 0 && tx >= 0); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + assert(bind(rx, (struct sockaddr*)&addr, sizeof(addr)) == 0); + socklen_t l = sizeof(addr); + assert(getsockname(rx, (struct sockaddr*)&addr, &l) == 0); + + // Arrange the datagram to arrive only after poll() is already blocking, so it + // can only complete by being woken - not by the initial readiness derivation. +#ifdef __EMSCRIPTEN_PTHREADS__ + pthread_t t; + assert(pthread_create(&t, NULL, sender, NULL) == 0); +#else + emscripten_async_call(send_ping, NULL, 100); +#endif + + struct pollfd pfd = { .fd = rx, .events = POLLIN }; + int n = poll(&pfd, 1, -1); // blocks; only the arrival can wake it + assert(n == 1 && (pfd.revents & POLLIN)); + + char buf[4]; + assert(recv(rx, buf, sizeof(buf), 0) == 4 && memcmp(buf, "ping", 4) == 0); + + close(rx); + close(tx); + printf("POLL SOCKET BLOCKING PASS\n"); + return 0; +} diff --git a/test/test_sockets.py b/test/test_sockets.py index 53f5bf9d90c29..01bf5e13d907b 100644 --- a/test/test_sockets.py +++ b/test/test_sockets.py @@ -475,6 +475,36 @@ def test_noderawsockets_udp_ipv6(self): self.skipTest('no IPv6 loopback available') self.do_runf('sockets/test_udp_ipv6.c', 'UDP IPV6 PASS', cflags=['-sNODERAWSOCKETS']) + @also_with_proxy_to_pthread + def test_noderawsockets_poll_callback(self): + # emscripten_poll_with_callback waits on real socket readiness via the SOCKFS + # emit -> poll-callback bridge: a UDP datagram arrival, and a listening socket + # becoming readable when a client is queued for accept. It rejects descriptor + # types that can't deliver callbacks (-EPERM) and bad fds (-EBADF). + self.do_runf('sockets/test_poll_callback.c', 'POLL CALLBACK PASS', cflags=['-sNODERAWSOCKETS', '-sFORCE_FILESYSTEM', '-sEXIT_RUNTIME']) + + def test_noderawsockets_poll_socket_blocking(self): + # A blocking poll() on a socket is woken by incoming data through the unified + # readiness wait-queue - a capability sockets gained from the poll() / + # poll_with_callback convergence (sock_ops.poll never registered notifiers). + self.do_runf('sockets/test_poll_socket_blocking.c', 'POLL SOCKET BLOCKING PASS', + cflags=['-sNODERAWSOCKETS', '-pthread', '-sPROXY_TO_PTHREAD', '-sEXIT_RUNTIME']) + + def test_noderawsockets_poll_socket_blocking_jspi(self): + # Same, but the blocking poll() suspends the wasm stack under JSPI. + # NODERAWSOCKETS runs under node rather than the browser, so gate JSPI on + # node's own support (v24) instead of require_jspi's browser-test path. + if 'EMTEST_SKIP_JSPI' in os.environ: + self.skipTest('skipping JSPI (EMTEST_SKIP_JSPI is set)') + if not self.try_require_node_version(24): + self.skipTest('JSPI requires node v24') + if not common.check_node_version(26): + self.node_args += ['--experimental-wasm-stack-switching'] + self.cflags += ['-Wno-experimental'] + self.set_setting('JSPI') + self.do_runf('sockets/test_poll_socket_blocking.c', 'POLL SOCKET BLOCKING PASS', + cflags=['-sNODERAWSOCKETS', '-sEXIT_RUNTIME']) + @also_with_proxy_to_pthread def test_noderawsockets_udp(self): # Self-contained loopback UDP echo: the server binds(:0)+getsockname for its