From 6c399dd4483555bd8f3304aef6d01dfebfb4be0d Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Sun, 12 Apr 2026 00:41:50 +0200 Subject: [PATCH 01/21] Sketch UringSystem with new PollingSystem APIs --- .../src/main/resources/scala-native/uring.c | 19 + .../cats/effect/unsafe/UringSystem.scala | 450 ++++++++++++++++++ 2 files changed, 469 insertions(+) create mode 100644 core/native/src/main/resources/scala-native/uring.c create mode 100644 core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala diff --git a/core/native/src/main/resources/scala-native/uring.c b/core/native/src/main/resources/scala-native/uring.c new file mode 100644 index 0000000000..a115cb35b5 --- /dev/null +++ b/core/native/src/main/resources/scala-native/uring.c @@ -0,0 +1,19 @@ +#include + +struct io_uring_sqe *ce_io_uring_get_sqe(struct io_uring *ring) { + return io_uring_get_sqe(ring); +} + +void ce_io_uring_cq_advance(struct io_uring *ring, unsigned nr) { + io_uring_cq_advance(ring, nr); +} + +void ce_io_uring_prep_cancel64(struct io_uring_sqe *sqe, __u64 user_data, + int flags) { + io_uring_prep_cancel64(sqe, user_data, flags); +} + +void ce_io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, + unsigned int pollmask) { + io_uring_prep_poll_add(sqe, fd, pollmask); +} diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala new file mode 100644 index 0000000000..1ec9510752 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -0,0 +1,450 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.effect.kernel.{Cont, MonadCancelThrow} +import cats.effect.std.Mutex +import cats.effect.unsafe.metrics.PollerMetrics +import cats.syntax.all._ +import cats.~> + +import org.typelevel.scalaccompat.annotation._ + +import scala.scalanative.libc.stdlib +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.string._ +import scala.scalanative.runtime.Intrinsics +import scala.scalanative.unsafe._ +import scala.scalanative.unsigned._ + +import java.io.IOException +import java.util.{Collections, IdentityHashMap, Set} + +object UringSystem extends PollingSystem { + + import uringNative._ + import uringNativeOps._ + + private[this] final val MaxEvents = 64 + + type Api = Uring + + def close(): Unit = () + + def makeApi(ctx: PollingContext[Poller]): Api = + new UringApi(ctx) + + def makePoller(): Poller = { + val ring = stdlib.malloc(sizeof[io_uring]).asInstanceOf[Ptr[io_uring]] + if (ring == null) + throw new IOException(fromCString(strerror(errno))) + + val flags = IORING_SETUP_SUBMIT_ALL | + IORING_SETUP_COOP_TASKRUN | + IORING_SETUP_TASKRUN_FLAG | + IORING_SETUP_SINGLE_ISSUER | + IORING_SETUP_DEFER_TASKRUN + + val ret = io_uring_queue_init(MaxEvents.toUInt, ring, flags.toUInt) + if (ret < 0) { + stdlib.free(ring.asInstanceOf[Ptr[Byte]]) + throw new IOException(fromCString(strerror(-ret))) + } + + new Poller(ring) + } + + def closePoller(poller: Poller): Unit = poller.close() + + def poll(poller: Poller, nanos: Long): PollResult = + poller.poll(nanos) + + def 
processReadyEvents(poller: Poller): Boolean = + poller.processReadyEvents() + + def needsPoll(poller: Poller): Boolean = poller.needsPoll() + + def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () + + def metrics(poller: Poller): PollerMetrics = poller.metrics() + + abstract class Uring private[UringSystem] () { + def call(prep: Ptr[io_uring_sqe] => Unit): IO[Int] + + def bracket(prep: Ptr[io_uring_sqe] => Unit)( + release: Int => IO[Unit] + ): Resource[IO, Int] + } + + object Uring { + def get: IO[Uring] = + IO.pollers.flatMap { + _.collectFirst { case ring: Uring => ring } + .liftTo[IO](new RuntimeException("No UringSystem installed")) + } + } + + private final class UringApi(ctx: PollingContext[Poller]) + extends Uring + with FileDescriptorPoller { + private[this] val noopRelease: Int => IO[Unit] = _ => IO.unit + + def call(prep: Ptr[io_uring_sqe] => Unit): IO[Int] = + exec(prep)(noopRelease) + + def bracket(prep: Ptr[io_uring_sqe] => Unit)( + release: Int => IO[Unit] + ): Resource[IO, Int] = + Resource.makeFull[IO, Int](poll => poll(exec(prep)(release)))(release(_)) + + private def exec(prep: Ptr[io_uring_sqe] => Unit)( + release: Int => IO[Unit] + ): IO[Int] = + IO.cont { + new Cont[IO, Int, Int] { + def apply[F[_]]( + implicit F: MonadCancelThrow[F] + ): (Either[Throwable, Int] => Unit, F[Int], IO ~> F) => F[Int] = { + (resume, get, lift) => + F.uncancelable { poll => + val submit = IO.async_[__u64] { cb => + ctx.accessPoller { poller => + val sqe = poller.getSqe(resume) + prep(sqe) + cb(Right(sqe.user_data)) + } + } + + lift(submit) + .flatMap { addr => + F.onCancel( + poll(get), + lift(cancel(addr)).ifM( + F.unit, + get.flatMap { rtn => + if (rtn < 0) + F.raiseError(new IOException(fromCString(strerror(-rtn)))) + else lift(release(rtn)) + } + ) + ) + } + .flatTap(e => F.raiseWhen(e < 0)(new IOException(fromCString(strerror(-e))))) + } + } + } + } + + private[this] def cancel(addr: __u64): IO[Boolean] = + IO.async_[Int] { cb => + 
ctx.accessPoller { poller => + val sqe = poller.getSqe(cb) + io_uring_prep_cancel64(sqe, addr, 0) + } + }.map(_ == 0) + + def registerFileDescriptor( + fd: Int, + reads: Boolean, + writes: Boolean + ): Resource[IO, FileDescriptorPollHandle] = + Resource.eval { + (Mutex[IO], Mutex[IO]).mapN { (readMutex, writeMutex) => + new FileDescriptorPollHandle { + def pollReadRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + readMutex.lock.surround { + a.tailRecM { a => + f(a).flatTap { r => + if (r.isRight) IO.unit + else call(io_uring_prep_poll_add(_, fd, POLLIN.toUInt)).void + } + } + } + + def pollWriteRec[A, B](a: A)(f: A => IO[Either[A, B]]): IO[B] = + writeMutex.lock.surround { + a.tailRecM { a => + f(a).flatTap { r => + if (r.isRight) IO.unit + else call(io_uring_prep_poll_add(_, fd, POLLOUT.toUInt)).void + } + } + } + } + } + } + } + + final class Poller private[UringSystem] (ring: Ptr[io_uring]) { + private[this] var pendingSubmissions: Boolean = false + private[this] val callbacks: Set[Either[Throwable, Int] => Unit] = + Collections.newSetFromMap(new IdentityHashMap) + + private[UringSystem] def metrics(): PollerMetrics = PollerMetrics.noop + + private[UringSystem] def getSqe( + cb: Either[Throwable, Int] => Unit + ): Ptr[io_uring_sqe] = { + pendingSubmissions = true + val sqe = io_uring_get_sqe(ring) + io_uring_sqe_set_data(sqe, cb) + callbacks.add(cb) + sqe + } + + private[UringSystem] def close(): Unit = { + io_uring_queue_exit(ring) + stdlib.free(ring.asInstanceOf[Ptr[Byte]]) + } + + private[UringSystem] def needsPoll(): Boolean = + pendingSubmissions || !callbacks.isEmpty() + + private[UringSystem] def poll(nanos: Long): PollResult = { + + if (nanos == 0) { + if (pendingSubmissions) { + var rtn = io_uring_submit(ring) + while (rtn == -EBUSY) { + processReadyEvents() + rtn = io_uring_submit(ring) + } + pendingSubmissions = false + } + } else { + val timeoutSpec = + if (nanos == -1) null + else { + val ts = stackalloc[__kernel_timespec]() + ts.tv_sec = nanos / 
1000000000L + ts.tv_nsec = nanos % 1000000000L + ts + } + + val cqe = stackalloc[Ptr[io_uring_cqe]]() + if (pendingSubmissions) { + var rtn = + io_uring_submit_and_wait_timeout(ring, cqe, 0.toUInt, timeoutSpec, null) + while (rtn == -EBUSY) { + processReadyEvents() + rtn = io_uring_submit(ring) + } + } else { + io_uring_wait_cqe_timeout(ring, cqe, timeoutSpec) + } + pendingSubmissions = false + } + + val cqes = stackalloc[Ptr[io_uring_cqe]](MaxEvents) + val filledCount = io_uring_peek_batch_cqe(ring, cqes, MaxEvents.toUInt).toInt + + if (filledCount > 0) { + if (filledCount < MaxEvents) PollResult.Complete else PollResult.Incomplete + } else PollResult.Interrupted + } + + private[UringSystem] def processReadyEvents(): Boolean = { + val cqes = stackalloc[Ptr[io_uring_cqe]](MaxEvents) + val filledCount = io_uring_peek_batch_cqe(ring, cqes, MaxEvents.toUInt).toInt + + var i = 0 + val ptr = cqes + while (i < filledCount) { + val cqe = !(ptr + i.toLong) + val cb = io_uring_cqe_get_data[Either[Throwable, Int] => Unit](cqe) + val res = cqe.res + cb(Right(res)) + callbacks.remove(cb) + i += 1 + } + + io_uring_cq_advance(ring, filledCount.toUInt) + filledCount > 0 + } + } + + private final val POLLIN: Int = 0x001 + private final val POLLOUT: Int = 0x004 + + @nowarn212 + @extern + private object uringNative { + + final val IORING_SETUP_SUBMIT_ALL = 1 << 7 + final val IORING_SETUP_COOP_TASKRUN = 1 << 8 + final val IORING_SETUP_TASKRUN_FLAG = 1 << 9 + final val IORING_SETUP_SINGLE_ISSUER = 1 << 12 + final val IORING_SETUP_DEFER_TASKRUN = 1 << 13 + + type __u8 = CUnsignedChar + type __u16 = CUnsignedShort + type __s32 = CInt + type __u32 = CUnsignedInt + type __u64 = CUnsignedLongLong + + type __kernel_time64_t = CLongLong + type __kernel_timespec = CStruct2[__kernel_time64_t, CLongLong] + + type io_uring = CStruct9[ + io_uring_sq, + io_uring_cq, + CUnsignedInt, + CInt, + CUnsignedInt, + CInt, + __u8, + CArray[__u8, Nat._3], + CUnsignedInt + ] + + type io_uring_cq = CStruct12[ 
+ Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[io_uring_cqe], + CSize, + Ptr[Byte], + CUnsignedInt, + CUnsignedInt, + CArray[CUnsignedInt, Nat._2] + ] + + type io_uring_cqe = CStruct3[__u64, __s32, __u32] + + type io_uring_sq = CStruct15[ + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[CUnsignedInt], + Ptr[io_uring_sqe], + CUnsignedInt, + CUnsignedInt, + CSize, + Ptr[Byte], + CUnsignedInt, + CUnsignedInt, + CArray[CUnsignedInt, Nat._2] + ] + + type io_uring_sqe = CStruct10[ + __u8, + __u8, + __u16, + __s32, + __u64, + __u64, + __u32, + __u32, + __u64, + CArray[__u64, Nat._3] + ] + + def io_uring_queue_init( + entries: CUnsignedInt, + ring: Ptr[io_uring], + flags: CUnsignedInt + ): CInt = extern + + def io_uring_queue_exit(ring: Ptr[io_uring]): Unit = extern + + @name("ce_io_uring_get_sqe") + def io_uring_get_sqe(ring: Ptr[io_uring]): Ptr[io_uring_sqe] = extern + + def io_uring_submit(ring: Ptr[io_uring]): CInt = extern + + def io_uring_submit_and_wait_timeout( + ring: Ptr[io_uring], + cqe_ptr: Ptr[Ptr[io_uring_cqe]], + wait_nr: CUnsignedInt, + ts: Ptr[__kernel_timespec], + sigmask: Ptr[Byte] + ): CInt = extern + + def io_uring_wait_cqe_timeout( + ring: Ptr[io_uring], + cqe_ptr: Ptr[Ptr[io_uring_cqe]], + ts: Ptr[__kernel_timespec] + ): CInt = extern + + def io_uring_peek_batch_cqe( + ring: Ptr[io_uring], + cqes: Ptr[Ptr[io_uring_cqe]], + count: CUnsignedInt + ): CUnsignedInt = extern + + @name("ce_io_uring_cq_advance") + def io_uring_cq_advance(ring: Ptr[io_uring], nr: CUnsignedInt): Unit = extern + + @name("ce_io_uring_prep_cancel64") + def io_uring_prep_cancel64( + sqe: Ptr[io_uring_sqe], + user_data: __u64, + flags: CInt + ): Unit = extern + + @name("ce_io_uring_prep_poll_add") + def io_uring_prep_poll_add( + sqe: Ptr[io_uring_sqe], + fd: CInt, + pollmask: CUnsignedInt + ): Unit = extern + } + + 
private object uringNativeOps { + + import uringNative._ + + def io_uring_sqe_set_data[A <: AnyRef](sqe: Ptr[io_uring_sqe], data: A): Unit = + sqe.user_data = Intrinsics + .castRawPtrToLong( + Intrinsics.castObjectToRawPtr(data) + ) + .toULong + + def io_uring_cqe_get_data[A <: AnyRef](cqe: Ptr[io_uring_cqe]): A = + Intrinsics + .castRawPtrToObject( + Intrinsics.castLongToRawPtr(cqe.user_data.toLong) + ) + .asInstanceOf[A] + + implicit final class io_uring_sqeOps(val sqe: Ptr[io_uring_sqe]) extends AnyVal { + def user_data: __u64 = sqe._9 + def user_data_=(v: __u64): Unit = !sqe.at9 = v + } + + implicit final class io_uring_cqeOps(val cqe: Ptr[io_uring_cqe]) extends AnyVal { + def user_data: __u64 = cqe._1 + def res: __s32 = cqe._2 + def flags: __u32 = cqe._3 + } + + implicit final class __kernel_timespecOps(val ts: Ptr[__kernel_timespec]) extends AnyVal { + def tv_sec: __kernel_time64_t = ts._1 + def tv_sec_=(v: __kernel_time64_t): Unit = !ts.at1 = v + def tv_nsec: CLongLong = ts._2 + def tv_nsec_=(v: CLongLong): Unit = !ts.at2 = v + } + } +} From 915d36051e4db4f19726c235789db63f6a017f20 Mon Sep 17 00:00:00 2001 From: Antonio Jimenez Date: Tue, 5 May 2026 09:39:01 +0200 Subject: [PATCH 02/21] Make UringSystem multi-threaded Adapts the Native UringSystem to the multi-poller runtime that Scala Native 0.5+ enables. 
Mirrors the structure of the JVM prototype from GSoC (https://github.com/armanbilge/fs2-io_uring/pull/78) --- .../cats/effect/unsafe/UringSystem.scala | 121 ++++++++++++++---- 1 file changed, 97 insertions(+), 24 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 1ec9510752..0855ef410f 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -28,12 +28,14 @@ import org.typelevel.scalaccompat.annotation._ import scala.scalanative.libc.stdlib import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ +import scala.scalanative.posix.unistd import scala.scalanative.runtime.Intrinsics import scala.scalanative.unsafe._ import scala.scalanative.unsigned._ import java.io.IOException import java.util.{Collections, IdentityHashMap, Set} +import java.util.concurrent.ConcurrentLinkedDeque object UringSystem extends PollingSystem { @@ -66,7 +68,15 @@ object UringSystem extends PollingSystem { throw new IOException(fromCString(strerror(-ret))) } - new Poller(ring) + val pipeFds = stackalloc[CInt](2) + if (unistd.pipe(pipeFds) != 0) { + val msg = fromCString(strerror(errno)) + io_uring_queue_exit(ring) + stdlib.free(ring.asInstanceOf[Ptr[Byte]]) + throw new IOException(msg) + } + + new Poller(ring, pipeFds(0), pipeFds(1)) } def closePoller(poller: Poller): Unit = poller.close() @@ -79,7 +89,8 @@ object UringSystem extends PollingSystem { def needsPoll(poller: Poller): Boolean = poller.needsPoll() - def interrupt(targetThread: Thread, targetPoller: Poller): Unit = () + def interrupt(targetThread: Thread, targetPoller: Poller): Unit = + targetPoller.wakeup() def metrics(poller: Poller): PollerMetrics = poller.metrics() @@ -122,27 +133,29 @@ object UringSystem extends PollingSystem { ): (Either[Throwable, Int] => Unit, F[Int], IO ~> F) => F[Int] = { (resume, get, 
lift) => F.uncancelable { poll => - val submit = IO.async_[__u64] { cb => + val submit = IO.async_[(__u64, Poller)] { cb => ctx.accessPoller { poller => val sqe = poller.getSqe(resume) prep(sqe) - cb(Right(sqe.user_data)) + cb(Right((sqe.user_data, poller))) } } lift(submit) - .flatMap { addr => - F.onCancel( - poll(get), - lift(cancel(addr)).ifM( - F.unit, - get.flatMap { rtn => - if (rtn < 0) - F.raiseError(new IOException(fromCString(strerror(-rtn)))) - else lift(release(rtn)) - } + .flatMap { + case (addr, submittedPoller) => + F.onCancel( + poll(get), + lift(cancel(addr, submittedPoller)).ifM( + F.unit, + // If cannot cancel, fallback to get + get.flatMap { rtn => + if (rtn < 0) + F.raiseError(new IOException(fromCString(strerror(-rtn)))) + else lift(release(rtn)) + } + ) ) - ) } .flatTap(e => F.raiseWhen(e < 0)(new IOException(fromCString(strerror(-e))))) } @@ -150,11 +163,15 @@ object UringSystem extends PollingSystem { } } - private[this] def cancel(addr: __u64): IO[Boolean] = + private[this] def cancel(addr: __u64, submittedPoller: Poller): IO[Boolean] = IO.async_[Int] { cb => - ctx.accessPoller { poller => - val sqe = poller.getSqe(cb) - io_uring_prep_cancel64(sqe, addr, 0) + ctx.accessPoller { currentPoller => + if (currentPoller eq submittedPoller) { + val sqe = currentPoller.getSqe(cb) + io_uring_prep_cancel64(sqe, addr, 0) + } else { + submittedPoller.enqueueCancelOperation(addr, cb) + } } }.map(_ == 0) @@ -190,11 +207,23 @@ object UringSystem extends PollingSystem { } } - final class Poller private[UringSystem] (ring: Ptr[io_uring]) { + final class Poller private[UringSystem] ( + ring: Ptr[io_uring], + readEnd: CInt, + writeEnd: CInt + ) { private[this] var pendingSubmissions: Boolean = false + private[this] var listeningWakeup: Boolean = false + private[this] val callbacks: Set[Either[Throwable, Int] => Unit] = Collections.newSetFromMap(new IdentityHashMap) + private[this] val cancelOperations + : ConcurrentLinkedDeque[(__u64, Either[Throwable, Int] 
=> Unit)] = + new ConcurrentLinkedDeque + + private[this] val wakeupHandler: Either[Throwable, Int] => Unit = _ => () + private[UringSystem] def metrics(): PollerMetrics = PollerMetrics.noop private[UringSystem] def getSqe( @@ -207,16 +236,54 @@ object UringSystem extends PollingSystem { sqe } + private[UringSystem] def enqueueCancelOperation( + addr: __u64, + cb: Either[Throwable, Int] => Unit + ): Unit = { + cancelOperations.add((addr, cb)) + wakeup() + } + + private[UringSystem] def wakeup(): Unit = { + val buf = stackalloc[Byte](1) + !buf = 1.toByte + unistd.write(writeEnd, buf, sizeof[Byte]) + () + } + + private[this] def armWakeup(): Unit = { + val sqe = io_uring_get_sqe(ring) + io_uring_sqe_set_data(sqe, wakeupHandler) + io_uring_prep_poll_add(sqe, readEnd, POLLIN.toUInt) + pendingSubmissions = true + listeningWakeup = true + } + + private[this] def drainCancelQueue(): Unit = { + var op = cancelOperations.poll() + while (op ne null) { + val sqe = getSqe(op._2) + io_uring_prep_cancel64(sqe, op._1, 0) + op = cancelOperations.poll() + } + } + private[UringSystem] def close(): Unit = { io_uring_queue_exit(ring) stdlib.free(ring.asInstanceOf[Ptr[Byte]]) + unistd.close(readEnd) + unistd.close(writeEnd) + () } private[UringSystem] def needsPoll(): Boolean = - pendingSubmissions || !callbacks.isEmpty() + pendingSubmissions || !callbacks.isEmpty() || !cancelOperations.isEmpty() private[UringSystem] def poll(nanos: Long): PollResult = { + if (!listeningWakeup) armWakeup() + drainCancelQueue() + if (nanos == 0) { if (pendingSubmissions) { var rtn = io_uring_submit(ring) @@ -267,9 +334,15 @@ object UringSystem extends PollingSystem { while (i < filledCount) { val cqe = !(ptr + i.toLong) val cb = io_uring_cqe_get_data[Either[Throwable, Int] => Unit](cqe) - val res = cqe.res - cb(Right(res)) - callbacks.remove(cb) + if (cb eq wakeupHandler) { + val buf = stackalloc[Byte](1) + unistd.read(readEnd, buf, sizeof[Byte]) + listeningWakeup = false + } else { + val res = cqe.res 
+ cb(Right(res)) + callbacks.remove(cb) + } i += 1 } From d743a886d3d3dcd2d240422f9f0a5e381d6f6c91 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 17:38:21 +0200 Subject: [PATCH 03/21] Drop SINGLE_ISSUER/DEFER_TASKRUN flags incompatible with per-worker ring --- .../src/main/scala/cats/effect/unsafe/UringSystem.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 0855ef410f..09bc2ffcc3 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -58,9 +58,7 @@ object UringSystem extends PollingSystem { val flags = IORING_SETUP_SUBMIT_ALL | IORING_SETUP_COOP_TASKRUN | - IORING_SETUP_TASKRUN_FLAG | - IORING_SETUP_SINGLE_ISSUER | - IORING_SETUP_DEFER_TASKRUN + IORING_SETUP_TASKRUN_FLAG val ret = io_uring_queue_init(MaxEvents.toUInt, ring, flags.toUInt) if (ret < 0) { From 9eabab1e537771429f81ba61c69b02ccbe8dc961 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 17:39:24 +0200 Subject: [PATCH 04/21] Add @link("uring") to extern bindings --- core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 09bc2ffcc3..ed0d4d29d7 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -353,6 +353,7 @@ object UringSystem extends PollingSystem { private final val POLLOUT: Int = 0x004 @nowarn212 + @link("uring") @extern private object uringNative { From 1e21565b690c01c8f18f2b55d3c6bd34c8bf199a Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 18:50:48 +0200 Subject: [PATCH 05/21] Set wait_nr to 1
because workers park in the polling call --- core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index ed0d4d29d7..21ed2a83e7 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -304,7 +304,7 @@ object UringSystem extends PollingSystem { val cqe = stackalloc[Ptr[io_uring_cqe]]() if (pendingSubmissions) { var rtn = - io_uring_submit_and_wait_timeout(ring, cqe, 0.toUInt, timeoutSpec, null) + io_uring_submit_and_wait_timeout(ring, cqe, 1.toUInt, timeoutSpec, null) while (rtn == -EBUSY) { processReadyEvents() rtn = io_uring_submit(ring) From f0f78aac58f0500f6d956144f0687b9e6612e539 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 18:55:58 +0200 Subject: [PATCH 06/21] Add minimal UringSystemSuite testing startup --- .../cats/effect/unsafe/UringSystemSuite.scala | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala diff --git a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala new file mode 100644 index 0000000000..927b92b087 --- /dev/null +++ b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.scalanative.meta.LinktimeInfo + +class UringSystemSuite extends BaseSuite { + + private[this] var uringRuntime: IORuntime = _ + + override def runtime(): IORuntime = uringRuntime + + override def beforeAll(): Unit = + if (LinktimeInfo.isLinux) { + val (blocking, blockDown) = + IORuntime.createDefaultBlockingExecutionContext( + threadPrefix = s"io-blocking-${getClass.getName}") + val (compute, api, compDown) = + IORuntime.createWorkStealingComputeThreadPool( + threadPrefix = s"io-compute-${getClass.getName}", + blockerThreadPrefix = s"io-blocker-${getClass.getName}", + pollingSystem = UringSystem + ) + uringRuntime = IORuntime( + compute, + blocking, + compute, + List(api), + { () => + compDown() + blockDown() + }, + IORuntimeConfig() + ) + } + + override def afterAll(): Unit = + if (uringRuntime ne null) uringRuntime.shutdown() + + real("start the UringSystem") { + IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> + UringSystem.Uring.get.map(uring => assert(uring ne null)) + } +} From 1254cfbcfbaa063bfb9036ae1b0cd0ff0970cc81 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 19:28:16 +0200 Subject: [PATCH 07/21] Compile uring.c only when liburing extern is reachable --- core/native/src/main/resources/scala-native/uring.c | 4 ++++ .../src/main/scala/cats/effect/unsafe/UringSystem.scala | 1 + 2 files changed, 5 insertions(+) diff --git a/core/native/src/main/resources/scala-native/uring.c b/core/native/src/main/resources/scala-native/uring.c 
index a115cb35b5..39c8926f68 100644 --- a/core/native/src/main/resources/scala-native/uring.c +++ b/core/native/src/main/resources/scala-native/uring.c @@ -1,3 +1,5 @@ +#ifdef CATS_EFFECT_URING + #include struct io_uring_sqe *ce_io_uring_get_sqe(struct io_uring *ring) { @@ -17,3 +19,5 @@ void ce_io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, unsigned int pollmask) { io_uring_prep_poll_add(sqe, fd, pollmask); } + +#endif diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 21ed2a83e7..6e2d0b2ef4 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -354,6 +354,7 @@ object UringSystem extends PollingSystem { @nowarn212 @link("uring") + @define("CATS_EFFECT_URING") @extern private object uringNative { From 19990e068cd6cc8b6a23ec5bacb4896bfe01ec3a Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 19:29:07 +0200 Subject: [PATCH 08/21] Rename uringNative/uringNativeOps to liburing/liburingOps --- .../main/scala/cats/effect/unsafe/UringSystem.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 6e2d0b2ef4..8cbafac590 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -39,8 +39,8 @@ import java.util.concurrent.ConcurrentLinkedDeque object UringSystem extends PollingSystem { - import uringNative._ - import uringNativeOps._ + import liburing._ + import liburingOps._ private[this] final val MaxEvents = 64 @@ -356,7 +356,7 @@ object UringSystem extends PollingSystem { @link("uring") @define("CATS_EFFECT_URING") @extern - private object uringNative { + private object liburing { final val 
IORING_SETUP_SUBMIT_ALL = 1 << 7 final val IORING_SETUP_COOP_TASKRUN = 1 << 8 @@ -484,9 +484,9 @@ object UringSystem extends PollingSystem { ): Unit = extern } - private object uringNativeOps { + private object liburingOps { - import uringNative._ + import liburing._ def io_uring_sqe_set_data[A <: AnyRef](sqe: Ptr[io_uring_sqe], data: A): Unit = sqe.user_data = Intrinsics From 3b90808d3005c0466bf7fcd38da636b6e46e6acc Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 20:24:13 +0200 Subject: [PATCH 09/21] Mirror fs2-io_uring's sqe/cqe accessor and prep_rw shapes --- .../src/main/resources/scala-native/uring.c | 10 -- .../cats/effect/unsafe/UringSystem.scala | 125 ++++++++++++++---- 2 files changed, 99 insertions(+), 36 deletions(-) diff --git a/core/native/src/main/resources/scala-native/uring.c b/core/native/src/main/resources/scala-native/uring.c index 39c8926f68..866aacb732 100644 --- a/core/native/src/main/resources/scala-native/uring.c +++ b/core/native/src/main/resources/scala-native/uring.c @@ -10,14 +10,4 @@ void ce_io_uring_cq_advance(struct io_uring *ring, unsigned nr) { io_uring_cq_advance(ring, nr); } -void ce_io_uring_prep_cancel64(struct io_uring_sqe *sqe, __u64 user_data, - int flags) { - io_uring_prep_cancel64(sqe, user_data, flags); -} - -void ce_io_uring_prep_poll_add(struct io_uring_sqe *sqe, int fd, - unsigned int pollmask) { - io_uring_prep_poll_add(sqe, fd, pollmask); -} - #endif diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 8cbafac590..e3864353ae 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -356,7 +356,7 @@ object UringSystem extends PollingSystem { @link("uring") @define("CATS_EFFECT_URING") @extern - private object liburing { + private[unsafe] object liburing { final val IORING_SETUP_SUBMIT_ALL = 1 << 7 final val 
IORING_SETUP_COOP_TASKRUN = 1 << 8 @@ -364,6 +364,10 @@ object UringSystem extends PollingSystem { final val IORING_SETUP_SINGLE_ISSUER = 1 << 12 final val IORING_SETUP_DEFER_TASKRUN = 1 << 13 + final val IORING_OP_NOP = 0 + final val IORING_OP_POLL_ADD = 6 + final val IORING_OP_ASYNC_CANCEL = 14 + type __u8 = CUnsignedChar type __u16 = CUnsignedShort type __s32 = CInt @@ -373,6 +377,8 @@ object UringSystem extends PollingSystem { type __kernel_time64_t = CLongLong type __kernel_timespec = CStruct2[__kernel_time64_t, CLongLong] + type __kernel_rwf_t = CUnsignedInt + type io_uring = CStruct9[ io_uring_sq, io_uring_cq, @@ -468,54 +474,121 @@ object UringSystem extends PollingSystem { @name("ce_io_uring_cq_advance") def io_uring_cq_advance(ring: Ptr[io_uring], nr: CUnsignedInt): Unit = extern + } + + private[unsafe] object liburingOps { + + import liburing._ + + def io_uring_prep_rw( + op: Int, + sqe: Ptr[io_uring_sqe], + fd: Int, + addr: Ptr[?], + len: CUnsignedInt, + offset: __u64 + ): Unit = { + sqe.opcode = op.toUByte + sqe.flags = 0.toUByte + sqe.ioprio = 0.toUShort + sqe.fd = fd + sqe.off = offset + sqe.addr = if (addr eq null) 0.toULong else addr.toLong.toULong + sqe.len = len + sqe.rw_flags = 0.toUInt + sqe.__pad2(0) = 0.toULong + sqe.__pad2(1) = 0.toULong + sqe.__pad2(2) = 0.toULong + } + + def io_uring_prep_nop(sqe: Ptr[io_uring_sqe]): Unit = + io_uring_prep_rw(IORING_OP_NOP, sqe, -1, null, 0.toUInt, 0.toULong) - @name("ce_io_uring_prep_cancel64") def io_uring_prep_cancel64( sqe: Ptr[io_uring_sqe], user_data: __u64, flags: CInt - ): Unit = extern + ): Unit = { + io_uring_prep_rw(IORING_OP_ASYNC_CANCEL, sqe, -1, null, 0.toUInt, 0.toULong) + sqe.addr = user_data + sqe.cancel_flags = flags.toUInt + } - @name("ce_io_uring_prep_poll_add") def io_uring_prep_poll_add( sqe: Ptr[io_uring_sqe], fd: CInt, - pollmask: CUnsignedInt - ): Unit = extern - } - - private object liburingOps { - - import liburing._ + poll_mask: CUnsignedInt + ): Unit = { + 
io_uring_prep_rw(IORING_OP_POLL_ADD, sqe, fd, null, 0.toUInt, 0.toULong) + sqe.poll32_events = poll_mask + } def io_uring_sqe_set_data[A <: AnyRef](sqe: Ptr[io_uring_sqe], data: A): Unit = - sqe.user_data = Intrinsics - .castRawPtrToLong( - Intrinsics.castObjectToRawPtr(data) - ) - .toULong + sqe.user_data = Intrinsics.castRawPtrToLong(Intrinsics.castObjectToRawPtr(data)).toULong def io_uring_cqe_get_data[A <: AnyRef](cqe: Ptr[io_uring_cqe]): A = Intrinsics - .castRawPtrToObject( - Intrinsics.castLongToRawPtr(cqe.user_data.toLong) - ) + .castRawPtrToObject(Intrinsics.castLongToRawPtr(cqe.user_data.toLong)) .asInstanceOf[A] - implicit final class io_uring_sqeOps(val sqe: Ptr[io_uring_sqe]) extends AnyVal { - def user_data: __u64 = sqe._9 - def user_data_=(v: __u64): Unit = !sqe.at9 = v + implicit final class io_uring_sqeOps(val io_uring_sqe: Ptr[io_uring_sqe]) extends AnyVal { + def opcode: __u8 = io_uring_sqe._1 + def opcode_=(opcode: __u8): Unit = !io_uring_sqe.at1 = opcode + + def flags: __u8 = io_uring_sqe._2 + def flags_=(flags: __u8): Unit = !io_uring_sqe.at2 = flags + + def ioprio: __u16 = io_uring_sqe._3 + def ioprio_=(ioprio: __u16): Unit = !io_uring_sqe.at3 = ioprio + + def fd: __s32 = io_uring_sqe._4 + def fd_=(fd: __s32): Unit = !io_uring_sqe.at4 = fd + + def off: __u64 = io_uring_sqe._5 + def off_=(off: __u64): Unit = !io_uring_sqe.at5 = off + + def addr: __u64 = io_uring_sqe._6 + def addr_=(addr: __u64): Unit = !io_uring_sqe.at6 = addr + + def len: __u32 = io_uring_sqe._7 + def len_=(len: __u32): Unit = !io_uring_sqe.at7 = len + + def rw_flags: __kernel_rwf_t = io_uring_sqe._8 + def rw_flags_=(rw_flags: __kernel_rwf_t): Unit = !io_uring_sqe.at8 = rw_flags + + def poll32_events: __u32 = io_uring_sqe._8 + def poll32_events_=(poll32_events: __u32): Unit = !io_uring_sqe.at8 = poll32_events + + def msg_flags: __u32 = io_uring_sqe._8 + def msg_flags_=(msg_flags: __u32): Unit = !io_uring_sqe.at8 = msg_flags + + def accept_flags: __u32 = io_uring_sqe._8 + def 
accept_flags_=(accept_flags: __u32): Unit = !io_uring_sqe.at8 = accept_flags + + def cancel_flags: __u32 = io_uring_sqe._8 + def cancel_flags_=(cancel_flags: __u32): Unit = !io_uring_sqe.at8 = cancel_flags + + def user_data: __u64 = io_uring_sqe._9 + def user_data_=(user_data: __u64): Unit = !io_uring_sqe.at9 = user_data + + def __pad2: CArray[__u64, Nat._3] = io_uring_sqe._10 } - implicit final class io_uring_cqeOps(val cqe: Ptr[io_uring_cqe]) extends AnyVal { - def user_data: __u64 = cqe._1 - def res: __s32 = cqe._2 - def flags: __u32 = cqe._3 + implicit final class io_uring_cqeOps(val io_uring_cqe: Ptr[io_uring_cqe]) extends AnyVal { + def user_data: __u64 = io_uring_cqe._1 + def user_data_=(user_data: __u64): Unit = !io_uring_cqe.at1 = user_data + + def res: __s32 = io_uring_cqe._2 + def res_=(res: __s32): Unit = !io_uring_cqe.at2 = res + + def flags: __u32 = io_uring_cqe._3 + def flags_=(flags: __u32): Unit = !io_uring_cqe.at3 = flags } implicit final class __kernel_timespecOps(val ts: Ptr[__kernel_timespec]) extends AnyVal { def tv_sec: __kernel_time64_t = ts._1 def tv_sec_=(v: __kernel_time64_t): Unit = !ts.at1 = v + def tv_nsec: CLongLong = ts._2 def tv_nsec_=(v: CLongLong): Unit = !ts.at2 = v } From e2794add97e48123246b9a8bb5b9697d37664064 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 20:26:16 +0200 Subject: [PATCH 10/21] Add submit noop test --- .../scala/cats/effect/unsafe/UringSystemSuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala index 927b92b087..a961724916 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala @@ -17,6 +17,8 @@ package cats.effect package unsafe +import cats.effect.unsafe.UringSystem.liburingOps._ + import scala.scalanative.meta.LinktimeInfo 
class UringSystemSuite extends BaseSuite { @@ -56,4 +58,13 @@ class UringSystemSuite extends BaseSuite { IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> UringSystem.Uring.get.map(uring => assert(uring ne null)) } + + real("submit a nop SQE and resume on completion") { + IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> + UringSystem + .Uring + .get + .flatMap { uring => uring.call(io_uring_prep_nop) } + .map(rtn => assertEquals(rtn, 0)) + } } From 7e9f8a6a074a2b850c450e15abeb9e52a4c759af Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 23:19:13 +0200 Subject: [PATCH 11/21] Add cancel a pending poll_add test --- .../cats/effect/unsafe/UringSystemSuite.scala | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala index a961724916..836e048e8b 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala @@ -20,6 +20,13 @@ package unsafe import cats.effect.unsafe.UringSystem.liburingOps._ import scala.scalanative.meta.LinktimeInfo +import scala.scalanative.posix.errno._ +import scala.scalanative.posix.fcntl._ +import scala.scalanative.posix.string._ +import scala.scalanative.posix.unistd +import scala.scalanative.unsafe._ + +import java.io.IOException class UringSystemSuite extends BaseSuite { @@ -67,4 +74,54 @@ class UringSystemSuite extends BaseSuite { .flatMap { uring => uring.call(io_uring_prep_nop) } .map(rtn => assertEquals(rtn, 0)) } + + real("cancel a pending poll_add") { + IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> + pipeHandle.use { + case (readFd, _) => + Resource + .eval(FileDescriptorPoller.get) + .flatMap(_.registerFileDescriptor(readFd, true, false)) + .use { handle => + Deferred[IO, 
Unit].flatMap { entered => + val read = + handle.pollReadRec(()) { _ => entered.complete(()).void *> IO(Left(())) } + read.start.flatMap { fiber => + entered.get *> fiber.cancel *> fiber.join.map(oc => assert(oc.isCanceled)) + } + } + } + } + } + + ///////////////////////////////////////////////////////////////// + // Helpers + ///////////////////////////////////////////////////////////////// + + private def pipeHandle: Resource[IO, (Int, Int)] = + Resource + .make { + IO { + val fd = stackalloc[CInt](2) + if (unistd.pipe(fd) != 0) + throw new IOException(fromCString(strerror(errno))) + (fd(0), fd(1)) + } + } { + case (readFd, writeFd) => + IO { + unistd.close(readFd) + unistd.close(writeFd) + () + } + } + .evalTap { + case (readFd, writeFd) => + IO { + if (fcntl(readFd, F_SETFL, O_NONBLOCK) != 0) + throw new IOException(fromCString(strerror(errno))) + if (fcntl(writeFd, F_SETFL, O_NONBLOCK) != 0) + throw new IOException(fromCString(strerror(errno))) + } + } } From cb863f7bdc560ef3e0751b26c255685e4e5d99b4 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sat, 9 May 2026 23:20:17 +0200 Subject: [PATCH 12/21] Use integer ids instead of object pointers in io_uring user_data --- .../cats/effect/unsafe/UringSystem.scala | 66 ++++++++++++------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index e3864353ae..30f6effab3 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -25,16 +25,16 @@ import cats.~> import org.typelevel.scalaccompat.annotation._ +import scala.collection.mutable.{ArrayBuffer, LongMap} import scala.scalanative.libc.stdlib import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ import scala.scalanative.posix.unistd -import scala.scalanative.runtime.Intrinsics import 
scala.scalanative.unsafe._ import scala.scalanative.unsigned._ import java.io.IOException -import java.util.{Collections, IdentityHashMap, Set} +import java.util.BitSet import java.util.concurrent.ConcurrentLinkedDeque object UringSystem extends PollingSystem { @@ -213,24 +213,37 @@ object UringSystem extends PollingSystem { private[this] var pendingSubmissions: Boolean = false private[this] var listeningWakeup: Boolean = false - private[this] val callbacks: Set[Either[Throwable, Int] => Unit] = - Collections.newSetFromMap(new IdentityHashMap) + private[this] val ids: BitSet = { + val bs = new BitSet(Short.MaxValue) + bs.set(0) // reserve id 0 for the wakeup poll_add + bs + } + + private[this] val callbacks: LongMap[Either[Throwable, Int] => Unit] = + LongMap.empty private[this] val cancelOperations : ConcurrentLinkedDeque[(__u64, Either[Throwable, Int] => Unit)] = new ConcurrentLinkedDeque - private[this] val wakeupHandler: Either[Throwable, Int] => Unit = _ => () - private[UringSystem] def metrics(): PollerMetrics = PollerMetrics.noop + private[this] def nextId(): Long = { + val id = ids.nextClearBit(1) + if (id >= Short.MaxValue) + throw new IOException("io_uring callback id space exhausted") + ids.set(id) + id.toLong + } + private[UringSystem] def getSqe( cb: Either[Throwable, Int] => Unit ): Ptr[io_uring_sqe] = { pendingSubmissions = true val sqe = io_uring_get_sqe(ring) - io_uring_sqe_set_data(sqe, cb) - callbacks.add(cb) + val id = nextId() + callbacks.put(id, cb) + sqe.user_data = id.toULong sqe } @@ -251,7 +264,7 @@ object UringSystem extends PollingSystem { private[this] def armWakeup(): Unit = { val sqe = io_uring_get_sqe(ring) - io_uring_sqe_set_data(sqe, wakeupHandler) + sqe.user_data = 0L.toULong io_uring_prep_poll_add(sqe, readEnd, POLLIN.toUInt) pendingSubmissions = true listeningWakeup = true @@ -275,7 +288,7 @@ object UringSystem extends PollingSystem { } private[UringSystem] def needsPoll(): Boolean = - pendingSubmissions || !callbacks.isEmpty() 
|| !cancelOperations.isEmpty() + pendingSubmissions || callbacks.nonEmpty || !cancelOperations.isEmpty() private[UringSystem] def poll(nanos: Long): PollResult = { @@ -327,24 +340,39 @@ object UringSystem extends PollingSystem { val cqes = stackalloc[Ptr[io_uring_cqe]](MaxEvents) val filledCount = io_uring_peek_batch_cqe(ring, cqes, MaxEvents.toUInt).toInt + val toInvoke = + new ArrayBuffer[(Either[Throwable, Int] => Unit, Int)](filledCount) + var i = 0 val ptr = cqes while (i < filledCount) { val cqe = !(ptr + i.toLong) - val cb = io_uring_cqe_get_data[Either[Throwable, Int] => Unit](cqe) - if (cb eq wakeupHandler) { + val id = cqe.user_data.toLong + if (id == 0L) { + // This is the wakeup event, we just need to drain the pipe and re-arm the wakeup val buf = stackalloc[Byte](1) unistd.read(readEnd, buf, sizeof[Byte]) listeningWakeup = false } else { - val res = cqe.res - cb(Right(res)) - callbacks.remove(cb) + // Normal event, look up the callback and schedule it for invocation + val cb = callbacks.remove(id) + ids.clear(id.toInt) + if (cb.isDefined) toInvoke += ((cb.get, cqe.res)) } i += 1 } io_uring_cq_advance(ring, filledCount.toUInt) + + // Invoke callbacks + var j = 0 + val n = toInvoke.size + while (j < n) { + val pair = toInvoke(j) + pair._1(Right(pair._2)) + j += 1 + } + filledCount > 0 } } @@ -523,14 +551,6 @@ object UringSystem extends PollingSystem { sqe.poll32_events = poll_mask } - def io_uring_sqe_set_data[A <: AnyRef](sqe: Ptr[io_uring_sqe], data: A): Unit = - sqe.user_data = Intrinsics.castRawPtrToLong(Intrinsics.castObjectToRawPtr(data)).toULong - - def io_uring_cqe_get_data[A <: AnyRef](cqe: Ptr[io_uring_cqe]): A = - Intrinsics - .castRawPtrToObject(Intrinsics.castLongToRawPtr(cqe.user_data.toLong)) - .asInstanceOf[A] - implicit final class io_uring_sqeOps(val io_uring_sqe: Ptr[io_uring_sqe]) extends AnyVal { def opcode: __u8 = io_uring_sqe._1 def opcode_=(opcode: __u8): Unit = !io_uring_sqe.at1 = opcode From 
2160c7b7fcd73ac3e28d4b272366fa9fd0c8aae5 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Sun, 10 May 2026 01:06:29 +0200 Subject: [PATCH 13/21] Add submit nop SQEs in parallel and resume on completion test --- .../cats/effect/unsafe/UringSystemSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala index 836e048e8b..e3ccd93616 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala @@ -75,6 +75,26 @@ class UringSystemSuite extends BaseSuite { .map(rtn => assertEquals(rtn, 0)) } + // TODO: Test is failing when run with concurrent fibers. + // Error 1: Unrecoverable NullPointerException in user thread + // Error 2: Unhandled signal 11, si_addr=0x20, si_addr=0x100000080, si_addr=0x71558e7388 + // Error 3: + /* + [cats-effect-tests-test:548690] [ScalaNative GC|Warning] Thread id=134175356978880, stackBottom=0x7a08227d7e08, state=Managed, alive=yes + [cats-effect-tests-test:548690] [ScalaNative GC|Warning] Possible causes: + - Thread blocked in native code without @blocking annotation + - Thread crashed without cleanup + - Infinite loop in native code + - Deadlock with resource held by waiting thread + */ + real("submit nop SQEs in parallel and resume on completion") { + IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> + UringSystem.Uring.get.flatMap { uring => + val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) + op.replicateA_(200).parReplicateA_(8) + } + } + real("cancel a pending poll_add") { IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> pipeHandle.use { From c2bcd0d1ed3a16b84e4400ac2dbd6340ce266020 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 00:58:23 +0200 Subject: [PATCH 14/21] Add 
blocking annotation --- core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 30f6effab3..016d72f41a 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -480,6 +480,7 @@ object UringSystem extends PollingSystem { def io_uring_submit(ring: Ptr[io_uring]): CInt = extern + @blocking def io_uring_submit_and_wait_timeout( ring: Ptr[io_uring], cqe_ptr: Ptr[Ptr[io_uring_cqe]], @@ -488,6 +489,7 @@ object UringSystem extends PollingSystem { sigmask: Ptr[Byte] ): CInt = extern + @blocking def io_uring_wait_cqe_timeout( ring: Ptr[io_uring], cqe_ptr: Ptr[Ptr[io_uring_cqe]], From c0cc17c1e00734f053de4bfb466960be2b1be7cb Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 00:59:25 +0200 Subject: [PATCH 15/21] Handle callbacks in one pass --- .../cats/effect/unsafe/UringSystem.scala | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 016d72f41a..5e19f43463 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -25,7 +25,7 @@ import cats.~> import org.typelevel.scalaccompat.annotation._ -import scala.collection.mutable.{ArrayBuffer, LongMap} +import scala.collection.mutable.LongMap import scala.scalanative.libc.stdlib import scala.scalanative.posix.errno._ import scala.scalanative.posix.string._ @@ -328,8 +328,7 @@ object UringSystem extends PollingSystem { pendingSubmissions = false } - val cqes = stackalloc[Ptr[io_uring_cqe]](MaxEvents) - val filledCount = io_uring_peek_batch_cqe(ring, cqes, 
MaxEvents.toUInt).toInt + val filledCount = io_uring_peek_batch_cqe(ring, cqesPtr, MaxEvents.toUInt).toInt if (filledCount > 0) { if (filledCount < MaxEvents) PollResult.Complete else PollResult.Incomplete @@ -337,7 +336,7 @@ object UringSystem extends PollingSystem { } private[UringSystem] def processReadyEvents(): Boolean = { - val cqes = stackalloc[Ptr[io_uring_cqe]](MaxEvents) + val cqes = cqesPtr val filledCount = io_uring_peek_batch_cqe(ring, cqes, MaxEvents.toUInt).toInt val toInvoke = @@ -346,7 +345,7 @@ object UringSystem extends PollingSystem { var i = 0 val ptr = cqes while (i < filledCount) { - val cqe = !(ptr + i.toLong) + val cqe = !(cqes + i.toLong) val id = cqe.user_data.toLong if (id == 0L) { // This is the wakeup event, we just need to drain the pipe and re-arm the wakeup @@ -357,22 +356,12 @@ object UringSystem extends PollingSystem { // Normal event, look up the callback and schedule it for invocation val cb = callbacks.remove(id) ids.clear(id.toInt) - if (cb.isDefined) toInvoke += ((cb.get, cqe.res)) + if (cb.isDefined) cb.get(Right(cqe.res)) } i += 1 } io_uring_cq_advance(ring, filledCount.toUInt) - - // Invoke callbacks - var j = 0 - val n = toInvoke.size - while (j < n) { - val pair = toInvoke(j) - pair._1(Right(pair._2)) - j += 1 - } - filledCount > 0 } } From 2e69aa4312f87f53849ad7e369f542b9a69f9c35 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 00:59:55 +0200 Subject: [PATCH 16/21] Remove unused Array and comments --- .../src/main/scala/cats/effect/unsafe/UringSystem.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 5e19f43463..a0e44aa616 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -339,21 +339,15 @@ object UringSystem extends PollingSystem { val cqes = cqesPtr val 
filledCount = io_uring_peek_batch_cqe(ring, cqes, MaxEvents.toUInt).toInt - val toInvoke = - new ArrayBuffer[(Either[Throwable, Int] => Unit, Int)](filledCount) - var i = 0 - val ptr = cqes while (i < filledCount) { val cqe = !(cqes + i.toLong) val id = cqe.user_data.toLong if (id == 0L) { - // This is the wakeup event, we just need to drain the pipe and re-arm the wakeup val buf = stackalloc[Byte](1) unistd.read(readEnd, buf, sizeof[Byte]) listeningWakeup = false } else { - // Normal event, look up the callback and schedule it for invocation val cb = callbacks.remove(id) ids.clear(id.toInt) if (cb.isDefined) cb.get(Right(cqe.res)) From 3af960c8f871ff6c72f314b409f642e966bd1b17 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 01:01:31 +0200 Subject: [PATCH 17/21] Heap-allocate cqes pointer buffer mirroring EpollSystem.eventsArray --- .../src/main/scala/cats/effect/unsafe/UringSystem.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index a0e44aa616..5d68b16b52 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -226,6 +226,10 @@ object UringSystem extends PollingSystem { : ConcurrentLinkedDeque[(__u64, Either[Throwable, Int] => Unit)] = new ConcurrentLinkedDeque + private[this] val cqesArray: Array[Byte] = new Array[Byte](MaxEvents * 8) + @inline private[this] def cqesPtr: Ptr[Ptr[io_uring_cqe]] = + cqesArray.atUnsafe(0).asInstanceOf[Ptr[Ptr[io_uring_cqe]]] + private[UringSystem] def metrics(): PollerMetrics = PollerMetrics.noop private[this] def nextId(): Long = { From 9f358a1f32e06346327ca189891a6262e212f225 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 01:02:32 +0200 Subject: [PATCH 18/21] remove todo after fixing the problem --- .../cats/effect/unsafe/UringSystemSuite.scala | 20 
++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala index e3ccd93616..9f8d9adca5 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala @@ -75,18 +75,14 @@ class UringSystemSuite extends BaseSuite { .map(rtn => assertEquals(rtn, 0)) } - // TODO: Test is failing when run with concurrent fibers. - // Error 1: Unrecoverable NullPointerException in user thread - // Error 2: Unhandled signal 11, si_addr=0x20, si_addr=0x100000080, si_addr=0x71558e7388 - // Error 3: - /* - [cats-effect-tests-test:548690] [ScalaNative GC|Warning] Thread id=134175356978880, stackBottom=0x7a08227d7e08, state=Managed, alive=yes - [cats-effect-tests-test:548690] [ScalaNative GC|Warning] Possible causes: - - Thread blocked in native code without @blocking annotation - - Thread crashed without cleanup - - Infinite loop in native code - - Deadlock with resource held by waiting thread - */ + real("submit a nop SQE and resume on completion many times in a row") { + IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> + UringSystem.Uring.get.flatMap { uring => + val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) + op.replicateA_(20) + } + } + real("submit nop SQEs in parallel and resume on completion") { IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> UringSystem.Uring.get.flatMap { uring => From ef71039bf42fad3770a2f160aa4b25d142e306bb Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 01:03:08 +0200 Subject: [PATCH 19/21] Add submit nop SQEs in parallel and resume on completion many times in a row test --- .../scala/cats/effect/unsafe/UringSystemSuite.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git 
a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala index 9f8d9adca5..7470b57a68 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala @@ -87,7 +87,15 @@ class UringSystemSuite extends BaseSuite { IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> UringSystem.Uring.get.flatMap { uring => val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) - op.replicateA_(200).parReplicateA_(8) + op.parReplicateA_(10) + } + } + + real("submit nop SQEs in parallel and resume on completion many times in a row") { + IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> + UringSystem.Uring.get.flatMap { uring => + val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) + op.replicateA_(20).parReplicateA_(10) } } From 3491436d28eea20b7e9f23cb42c183e403943e99 Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 01:06:05 +0200 Subject: [PATCH 20/21] formatting --- .../cats/effect/unsafe/UringSystemSuite.scala | 113 +++++++++--------- 1 file changed, 54 insertions(+), 59 deletions(-) diff --git a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala index 7470b57a68..9d3c602399 100644 --- a/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala +++ b/tests/native/src/test/scala/cats/effect/unsafe/UringSystemSuite.scala @@ -30,92 +30,87 @@ import java.io.IOException class UringSystemSuite extends BaseSuite { + override def munitIgnore: Boolean = !LinktimeInfo.isLinux + private[this] var uringRuntime: IORuntime = _ override def runtime(): IORuntime = uringRuntime - override def beforeAll(): Unit = - if (LinktimeInfo.isLinux) { - val (blocking, blockDown) = - 
IORuntime.createDefaultBlockingExecutionContext( - threadPrefix = s"io-blocking-${getClass.getName}") - val (compute, api, compDown) = - IORuntime.createWorkStealingComputeThreadPool( - threadPrefix = s"io-compute-${getClass.getName}", - blockerThreadPrefix = s"io-blocker-${getClass.getName}", - pollingSystem = UringSystem - ) - uringRuntime = IORuntime( - compute, - blocking, - compute, - List(api), - { () => - compDown() - blockDown() - }, - IORuntimeConfig() + override def beforeAll(): Unit = { + val (blocking, blockDown) = + IORuntime.createDefaultBlockingExecutionContext( + threadPrefix = s"io-blocking-${getClass.getName}") + val (compute, api, compDown) = + IORuntime.createWorkStealingComputeThreadPool( + threadPrefix = s"io-compute-${getClass.getName}", + blockerThreadPrefix = s"io-blocker-${getClass.getName}", + pollingSystem = UringSystem ) - } + uringRuntime = IORuntime( + compute, + blocking, + compute, + List(api), + { () => + compDown() + blockDown() + }, + IORuntimeConfig() + ) + } override def afterAll(): Unit = if (uringRuntime ne null) uringRuntime.shutdown() real("start the UringSystem") { - IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> - UringSystem.Uring.get.map(uring => assert(uring ne null)) + UringSystem.Uring.get.map(uring => assert(uring ne null)) } real("submit a nop SQE and resume on completion") { - IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> - UringSystem - .Uring - .get - .flatMap { uring => uring.call(io_uring_prep_nop) } - .map(rtn => assertEquals(rtn, 0)) + UringSystem + .Uring + .get + .flatMap { uring => uring.call(io_uring_prep_nop) } + .map(rtn => assertEquals(rtn, 0)) } real("submit a nop SQE and resume on completion many times in a row") { - IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> - UringSystem.Uring.get.flatMap { uring => - val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) - op.replicateA_(20) - } + 
UringSystem.Uring.get.flatMap { uring => + val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) + op.replicateA_(20) + } } real("submit nop SQEs in parallel and resume on completion") { - IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> - UringSystem.Uring.get.flatMap { uring => - val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) - op.parReplicateA_(10) - } + UringSystem.Uring.get.flatMap { uring => + val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) + op.parReplicateA_(10) + } } real("submit nop SQEs in parallel and resume on completion many times in a row") { - IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> - UringSystem.Uring.get.flatMap { uring => - val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) - op.replicateA_(20).parReplicateA_(10) - } + UringSystem.Uring.get.flatMap { uring => + val op = uring.call(io_uring_prep_nop).map(rtn => assertEquals(rtn, 0)) + op.replicateA_(20).parReplicateA_(10) + } } real("cancel a pending poll_add") { - IO(assume(LinktimeInfo.isLinux, "UringSystem is only supported on Linux")) *> - pipeHandle.use { - case (readFd, _) => - Resource - .eval(FileDescriptorPoller.get) - .flatMap(_.registerFileDescriptor(readFd, true, false)) - .use { handle => - Deferred[IO, Unit].flatMap { entered => - val read = - handle.pollReadRec(()) { _ => entered.complete(()).void *> IO(Left(())) } - read.start.flatMap { fiber => - entered.get *> fiber.cancel *> fiber.join.map(oc => assert(oc.isCanceled)) - } + pipeHandle.use { + case (readFd, _) => + Resource + .eval(FileDescriptorPoller.get) + .flatMap(_.registerFileDescriptor(readFd, true, false)) + .use { handle => + Deferred[IO, Unit].flatMap { entered => + val read = + handle.pollReadRec(()) { _ => entered.complete(()).void *> IO(Left(())) } + read.start.flatMap { fiber => + entered.get *> fiber.cancel *> fiber.join.map(oc => assert(oc.isCanceled)) } } - } + 
} + } } ///////////////////////////////////////////////////////////////// From b58e9071dd834656603396088a0ba4d78075a7af Mon Sep 17 00:00:00 2001 From: "antonio.jimenez" Date: Mon, 11 May 2026 01:17:10 +0200 Subject: [PATCH 21/21] Replace MaxEvents literal with PollingTicks constant --- core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala index 5d68b16b52..204c11f766 100644 --- a/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala +++ b/core/native/src/main/scala/cats/effect/unsafe/UringSystem.scala @@ -42,7 +42,7 @@ object UringSystem extends PollingSystem { import liburing._ import liburingOps._ - private[this] final val MaxEvents = 64 + private[this] final val MaxEvents = WorkStealingThreadPoolConstants.PollingTicks type Api = Uring