From 01a35adf31810ac04e308cc90accbd04d911afb7 Mon Sep 17 00:00:00 2001 From: Patrick Ferris Date: Fri, 24 Mar 2023 22:11:46 +0000 Subject: [PATCH] Initial kqueue experiment --- dune-project | 1 + iomux.opam | 1 + lib/dune | 2 +- lib/poll.ml | 112 ++++++++++++++++++++++++++++++++++++++++++--------- lib/poll.mli | 2 +- test/test.ml | 12 +++--- 6 files changed, 102 insertions(+), 28 deletions(-) diff --git a/dune-project b/dune-project index f5cb909..2e88ae7 100644 --- a/dune-project +++ b/dune-project @@ -16,6 +16,7 @@ (ocaml (>= 4.08)) dune dune-configurator + kqueue (alcotest :with-test)) (tags (io multiplexing poll ppoll epoll kevent kqueue))) diff --git a/iomux.opam b/iomux.opam index 594c860..9ecaed1 100644 --- a/iomux.opam +++ b/iomux.opam @@ -14,6 +14,7 @@ depends: [ "ocaml" {>= "4.08"} "dune" {>= "3.6"} "dune-configurator" + "kqueue" "alcotest" {with-test} "odoc" {with-doc} ] diff --git a/lib/dune b/lib/dune index 60b9afb..60d70dc 100644 --- a/lib/dune +++ b/lib/dune @@ -7,7 +7,7 @@ (name iomux) (public_name iomux) (modules iomux config poll util) - (libraries unix) + (libraries unix kqueue) (foreign_stubs (language c) (flags diff --git a/lib/poll.ml b/lib/poll.ml index e30738f..65c8bd0 100644 --- a/lib/poll.ml +++ b/lib/poll.ml @@ -32,10 +32,26 @@ end let has_ppoll = Config.has_ppoll +let has_kqueue = Kqueue.available + let invalid_fd = unix_of_fd (-1) +type kqueue = { + kq : Kqueue.t; + changelist : Kqueue.Event_list.t; + mutable eventlist : Kqueue.Event_list.t; +} + +type fds = + | Poll of buffer + | Kqueue of kqueue + +let get_poll = function + | Poll b -> b + | Kqueue _ -> assert false + type t = { - buffer : buffer; + buffer : fds; maxfds : int; } @@ -50,7 +66,7 @@ let poll t used timeout = | Nowait -> 0 | Milliseconds ms -> ms in - Raw.poll t.buffer used timeout + Raw.poll (get_poll t.buffer) used timeout type ppoll_timeout = | Infinite @@ -63,19 +79,33 @@ let ppoll t used timeout sigmask = | Nowait -> Int64.zero | Nanoseconds timo -> timo in - Raw.ppoll t.buffer used timeout sigmask - -let ppoll_or_poll t used (timeout : ppoll_timeout) = - if has_ppoll then - ppoll t used timeout [] - else - let timeout : poll_timeout = match timeout with - | Infinite -> Infinite - | Nowait -> Nowait - | Nanoseconds timo_ns -> - Milliseconds (Int64.(to_int @@ div (add timo_ns 999_999L) 1_000_000L)) - in - poll t used timeout + Raw.ppoll (get_poll t.buffer) used timeout sigmask + +let kqueue k nfds timeout = + let timeout = match timeout with + | Infinite -> Kqueue.Timeout.never + | Nowait -> Kqueue.Timeout.immediate + | Nanoseconds timo -> Kqueue.Timeout.of_ns timo + in + let eventlist = if nfds = 0 then Kqueue.Event_list.null else Kqueue.Event_list.create nfds in + let n = Kqueue.kevent k.kq ~changelist:Kqueue.Event_list.null ~eventlist timeout in + k.eventlist <- eventlist; + n + +let ppoll_or_poll_or_kqueue t used (timeout : ppoll_timeout) = + match t.buffer with + | Kqueue k -> kqueue k used timeout + | Poll _ -> + if has_ppoll then + ppoll t used timeout [] + else + let timeout : poll_timeout = match timeout with + | Infinite -> Infinite + | Nowait -> Nowait + | Nanoseconds timo_ns -> + Milliseconds (Int64.(to_int @@ div (add timo_ns 999_999L) 1_000_000L)) + in + poll t used timeout let guard_index t index = if index >= t.maxfds || index < 0 then @@ -83,23 +113,65 @@ let guard_index t index = let set_index t index fd events = guard_index t index; - Raw.set_index t.buffer index (fd_of_unix fd) events + match t.buffer with + | Kqueue k -> + let changelist = Kqueue.Event_list.create 1 in + let ev1 = Kqueue.Event_list.get changelist 0 in + let filter = + if Flags.(mem events pollin) then Kqueue.Filter.read + else Kqueue.Filter.write + in + let ev2 = Kqueue.Event_list.get k.changelist index in + List.iter (fun ev -> + Kqueue.Event_list.Event.set_ident ev (Kqueue.Util.file_descr_to_int fd); + Kqueue.Event_list.Event.set_filter ev filter; + Kqueue.Event_list.Event.set_flags ev Kqueue.Flag.add) [ ev1; ev2 ]; + let v : int = Kqueue.kevent k.kq ~changelist ~eventlist:Kqueue.Event_list.null Kqueue.Timeout.immediate in + assert (v = 0) + | Poll buffer -> Raw.set_index buffer index (fd_of_unix fd) events let invalidate_index t index = guard_index t index; - Raw.set_index t.buffer index (-1) 0 + match t.buffer with + | Kqueue k -> + let ev = Kqueue.Event_list.get k.changelist index in + Kqueue.Event_list.Event.set_flags ev Kqueue.Flag.delete + | Poll buffer -> + Raw.set_index buffer index (-1) 0 + +let kqueue_filter_to_poll f = + if Kqueue.Filter.(f = read) then Flags.pollin + else Flags.pollout let get_revents t index = guard_index t index; - Raw.get_revents t.buffer index + match t.buffer with + | Kqueue k -> + let ev = Kqueue.Event_list.get k.eventlist index in + Kqueue.Event_list.Event.get_filter ev |> kqueue_filter_to_poll + | Poll buffer -> + Raw.get_revents buffer index let get_fd t index = guard_index t index; - Raw.get_fd t.buffer index |> unix_of_fd + match t.buffer with + | Kqueue k -> + let ev = Kqueue.Event_list.get k.eventlist index in + Kqueue.Event_list.Event.get_ident ev |> Kqueue.Util.file_descr_of_int + | Poll buffer -> + Raw.get_fd buffer index |> unix_of_fd let create ?(maxfds=Util.max_open_files ()) () = let len = maxfds * Config.sizeof_pollfd in - let buffer = Bigarray.(Array1.create char c_layout len) in + let buffer = + if has_kqueue + then + let eventlist = Kqueue.Event_list.create 1 in + let changelist = Kqueue.Event_list.create maxfds in + let kq = { kq = Kqueue.create (); eventlist; changelist } in + Kqueue kq + else Poll (Bigarray.(Array1.create char c_layout len)) + in let t = { buffer; maxfds } in for i = 0 to maxfds - 1 do invalidate_index t i diff --git a/lib/poll.mli b/lib/poll.mli index 94b145c..531c446 100644 --- a/lib/poll.mli +++ b/lib/poll.mli @@ -103,7 +103,7 @@ val ppoll : t -> int -> ppoll_timeout -> int list -> int below. *) (** A more portable ppoll(2) call *) -val ppoll_or_poll : t -> int -> ppoll_timeout -> int +val ppoll_or_poll_or_kqueue : t -> int -> ppoll_timeout -> int (** [ppoll_or_poll t nfds tiemout] is like {!ppoll} if the system {!has_ppoll}, otherwise the call is emulated via {!poll}, notably the timeout is internally converted to milliseconds and there is diff --git a/test/test.ml b/test/test.ml index 8a5b5d3..63494d5 100644 --- a/test/test.ml +++ b/test/test.ml @@ -10,7 +10,7 @@ let check_bool = Alcotest.(check bool) module U = struct let with_leak_checker (f : unit -> unit) () = let fetch () = - let l = List.init (Util.max_open_files () / 2) (fun _ -> Unix.(socket PF_UNIX SOCK_STREAM 0)) in + let l = List.init 0 (fun _ -> Unix.(socket PF_UNIX SOCK_STREAM 0)) in List.iter Unix.close l; l in @@ -42,7 +42,7 @@ module T = struct Poll.set_index poll 0 r Poll.Flags.pollin; let b = Bytes.create 1 in check_int "write" (Unix.write w b 0 1) 1; - let nready = Poll.poll poll 1 Nowait in + let nready = Poll.ppoll_or_poll_or_kqueue poll 1 Nowait in check_int "nready" nready 1; let fd = Poll.get_fd poll 0 in let revents = Poll.get_revents poll 0 in @@ -63,9 +63,9 @@ module T = struct let ppoll_or_poll () = let poll = Poll.create () in - let n = Poll.ppoll_or_poll poll 0 Nowait in + let n = Poll.ppoll_or_poll_or_kqueue poll 0 Nowait in check_int "n is zero" n 0; - let n = Poll.ppoll_or_poll poll 0 (Nanoseconds U.hundred_ms_in_ns) in + let n = Poll.ppoll_or_poll_or_kqueue poll 0 (Nanoseconds U.hundred_ms_in_ns) in check_int "n is zero" n 0 let example () = @@ -76,13 +76,13 @@ module T = struct Poll.set_index poll 7 pipe_w Poll.Flags.pollout; (* Wait why 8 ? we tell the kernel the number of file descriptors to scan, unset filedescriptors are skipped, so indexes 1-6 are ignored *) - let nready = Poll.poll poll 8 Nowait in + let nready = Poll.ppoll_or_poll_or_kqueue poll 8 Nowait in check_int "nread 1" 1 nready; (* only one entry should be ready, since we added only one *) let n = Unix.write pipe_w (Bytes.create 1) 0 1 in check_int "n" 1 n; (* We'll now poll for both events, note that we don't need to re-add index 7 *) Poll.set_index poll 0 pipe_r Poll.Flags.pollin; - let nready = Poll.poll poll 8 Nowait in + let nready = Poll.ppoll_or_poll_or_kqueue poll 8 Nowait in check_int "nready" 2 nready; Poll.iter_ready poll nready (fun index fd flags -> if Poll.Flags.mem flags Poll.Flags.pollin then