From 5f200a37f246d77e0fe4e625fd156554bee40207 Mon Sep 17 00:00:00 2001 From: Danila Poyarkov Date: Tue, 31 Mar 2026 22:28:52 +0300 Subject: [PATCH 1/2] Add Mint-backed WebSocket support --- README.md | 2 +- docs/architecture.md | 2 +- docs/javascript-api.md | 2 +- lib/quickbeam/context.ex | 49 +++- lib/quickbeam/runtime.ex | 41 ++- lib/quickbeam/web_socket.ex | 429 +++++++++++++++++++++++++++++++ mix.exs | 2 + mix.lock | 2 + priv/ts/websocket.ts | 244 +++++++++++++----- test/wpt/wpt_websocket_test.exs | 436 ++++++++++++++++++++++++++++++++ 10 files changed, 1129 insertions(+), 80 deletions(-) create mode 100644 lib/quickbeam/web_socket.ex create mode 100644 test/wpt/wpt_websocket_test.exs diff --git a/README.md b/README.md index 7cc314e..7b698fc 100644 --- a/README.md +++ b/README.md @@ -349,7 +349,7 @@ Standard browser APIs backed by BEAM primitives, not JS polyfills: | `document`, `querySelector`, `createElement` | lexbor (native C DOM) | | `URL`, `URLSearchParams` | `:uri_string` | | `EventSource` (SSE) | `:httpc` streaming | -| `WebSocket` | `:gun` | +| `WebSocket` | `Mint.WebSocket` | | `Worker` | BEAM process per worker | | `BroadcastChannel` | `:pg` (distributed) | | `navigator.locks` | GenServer + monitors | diff --git a/docs/architecture.md b/docs/architecture.md index 98049e0..b2a4293 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -105,7 +105,7 @@ original BEAM term. The runtime loads different polyfill sets based on the `:apis` option: - **`:browser`** (default) — Web APIs backed by OTP: fetch (`:httpc`), - URL (`:uri_string`), crypto.subtle (`:crypto`), WebSocket (`:gun`), + URL (`:uri_string`), crypto.subtle (`:crypto`), WebSocket (`Mint.WebSocket`), Worker (BEAM processes), BroadcastChannel (`:pg`), localStorage (ETS), navigator.locks (GenServer), DOM (lexbor), streams, events, etc. diff --git a/docs/javascript-api.md b/docs/javascript-api.md index a625bd3..a811053 100644 --- a/docs/javascript-api.md +++ b/docs/javascript-api.md @@ -177,7 +177,7 @@ Loaded by default. These are Web platform APIs backed by OTP. |---|---| | `BroadcastChannel` | `:pg` (distributed process groups) | | `MessageChannel` / `MessagePort` | — | -| `WebSocket` | `:gun` | +| `WebSocket` | `Mint.WebSocket` | | `EventSource` | `:httpc` streaming | | `Worker` | Spawns a separate BEAM GenServer with its own JS runtime | diff --git a/lib/quickbeam/context.ex b/lib/quickbeam/context.ex index 1d6da4a..adf7071 100644 --- a/lib/quickbeam/context.ex +++ b/lib/quickbeam/context.ex @@ -45,6 +45,7 @@ defmodule QuickBEAM.Context do handlers: %{}, pending: %{}, workers: %{}, + websockets: %{}, next_worker_id: 1 ] @@ -55,6 +56,7 @@ defmodule QuickBEAM.Context do handlers: map(), pending: map(), workers: map(), + websockets: map(), next_worker_id: pos_integer() } @@ -411,6 +413,28 @@ defmodule QuickBEAM.Context do {:context_worker, action} -> handle_worker_call(action, args, call_id, state) + {:with_caller, fun} -> + caller = self() + + Task.start(fn -> + try do + args = if is_list(args), do: args, else: [args] + result = fun.(args, caller) + + QuickBEAM.Native.pool_resolve_call_term(resource, context_id, call_id, result) + rescue + e -> + QuickBEAM.Native.pool_reject_call_term( + resource, + context_id, + call_id, + Exception.message(e) + ) + end + end) + + {:noreply, state} + handler -> Task.start(fn -> try do @@ -462,9 +486,26 @@ defmodule QuickBEAM.Context do {:noreply, state} end + def handle_info({:websocket_started, socket_id, pid}, state) do + ref = Process.monitor(pid) + websockets = Map.put(state.websockets, ref, {pid, socket_id}) + {:noreply, %{state | websockets: websockets}} + end + + def handle_info({:websocket_event, message}, state) do + QuickBEAM.Native.pool_send_message(state.pool_resource, state.context_id, message) + {:noreply, state} + end + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - {_worker_id, workers} = Map.pop(state.workers, ref) - {:noreply, %{state | workers: workers}} + case Map.pop(state.workers, ref) do + {nil, workers} -> + {_socket, websockets} = Map.pop(state.websockets, ref) + {:noreply, %{state | workers: workers, websockets: websockets}} + + {_worker_id, workers} -> + {:noreply, %{state | workers: workers}} + end end def handle_info({ref, result}, state) when is_reference(ref) do @@ -481,6 +522,10 @@ defmodule QuickBEAM.Context do Process.exit(pid, :shutdown) end + for {_ref, {pid, _id}} <- state.websockets do + Process.exit(pid, :shutdown) + end + QuickBEAM.Native.pool_destroy_context(state.pool_resource, state.context_id) :ok end diff --git a/lib/quickbeam/runtime.ex b/lib/quickbeam/runtime.ex index 6a88107..79951da 100644 --- a/lib/quickbeam/runtime.ex +++ b/lib/quickbeam/runtime.ex @@ -5,13 +5,14 @@ defmodule QuickBEAM.Runtime do require Logger @enforce_keys [:resource] - defstruct [:resource, handlers: %{}, monitors: %{}, workers: %{}, pending: %{}] + defstruct [:resource, handlers: %{}, monitors: %{}, workers: %{}, websockets: %{}, pending: %{}] @type t :: %__MODULE__{ resource: reference(), handlers: map(), monitors: map(), workers: map(), + websockets: map(), pending: map() } @@ -173,7 +174,10 @@ defmodule QuickBEAM.Runtime do "__storage_key" => &QuickBEAM.Storage.key/1, "__storage_length" => &QuickBEAM.Storage.length/1, "__eventsource_open" => {:with_caller, &QuickBEAM.EventSource.open/2}, - "__eventsource_close" => &QuickBEAM.EventSource.close/1 + "__eventsource_close" => &QuickBEAM.EventSource.close/1, + "__ws_connect" => {:with_caller, &QuickBEAM.WebSocket.connect/2}, + "__ws_send" => &QuickBEAM.WebSocket.send_frame/1, + "__ws_close" => &QuickBEAM.WebSocket.close/1 } @beam_handlers %{ @@ -656,6 +660,17 @@ defmodule QuickBEAM.Runtime do end end + def handle_info({:websocket_started, socket_id, pid}, state) do + ref = Process.monitor(pid) + websockets = Map.put(state.websockets, ref, {pid, socket_id}) + {:noreply, %{state | websockets: websockets}} + end + + def handle_info({:websocket_event, message}, state) do + QuickBEAM.Native.send_message(state.resource, message) + {:noreply, state} + end + def handle_info({:eventsource_open, id}, state) do QuickBEAM.Native.send_message(state.resource, ["__eventsource_open", id]) {:noreply, state} @@ -701,13 +716,19 @@ defmodule QuickBEAM.Runtime do {:noreply, %{state | workers: workers}} nil -> - case Map.pop(state.monitors, ref) do - {nil, _} -> - {:noreply, state} + case Map.pop(state.websockets, ref) do + {nil, websockets} -> + case Map.pop(state.monitors, ref) do + {nil, _} -> + {:noreply, %{state | websockets: websockets}} + + {callback_id, monitors} -> + QuickBEAM.Native.send_message(state.resource, ["__qb_down", callback_id, reason]) + {:noreply, %{state | monitors: monitors, websockets: websockets}} + end - {callback_id, monitors} -> - QuickBEAM.Native.send_message(state.resource, ["__qb_down", callback_id, reason]) - {:noreply, %{state | monitors: monitors}} + {_socket, websockets} -> + {:noreply, %{state | websockets: websockets}} end end end @@ -729,6 +750,10 @@ defmodule QuickBEAM.Runtime do @impl true def terminate(_reason, %{resource: resource} = state) do + for {_ref, {pid, _id}} <- state.websockets do + Process.exit(pid, :shutdown) + end + drain_beam_calls(resource, state.handlers) QuickBEAM.Native.stop_runtime(resource) :ok diff --git a/lib/quickbeam/web_socket.ex b/lib/quickbeam/web_socket.ex new file mode 100644 index 0000000..8b1e901 --- /dev/null +++ b/lib/quickbeam/web_socket.ex @@ -0,0 +1,429 @@ +defmodule QuickBEAM.WebSocket do + @moduledoc false + use GenServer + + defstruct [ + :id, + :owner, + :owner_ref, + :url, + :protocols, + :conn, + :request_ref, + :websocket, + :upgrade_status, + :upgrade_headers, + :protocol, + :pending_close, + closed?: false, + close_sent?: false + ] + + @type state :: %__MODULE__{ + id: String.t(), + owner: pid(), + owner_ref: reference(), + url: String.t(), + protocols: [String.t()], + conn: Mint.HTTP.t() | nil, + request_ref: reference() | nil, + websocket: term() | nil, + upgrade_status: non_neg_integer() | nil, + upgrade_headers: [{String.t(), String.t()}], + protocol: String.t(), + pending_close: {non_neg_integer(), String.t()} | nil, + closed?: boolean(), + close_sent?: boolean() + } + + @spec connect([String.t() | [String.t()]], pid()) :: String.t() + def connect([url, protocols], owner_pid) do + id = Integer.to_string(System.unique_integer([:positive])) + + {:ok, pid} = + GenServer.start_link( + __MODULE__, + %{ + id: id, + owner: owner_pid, + url: url, + protocols: List.wrap(protocols) + }, + name: {:global, registry_name(id)} + ) + + send(owner_pid, {:websocket_started, id, pid}) + id + end + + @spec send_frame([String.t() | [String.t() | binary()]]) :: nil + def send_frame([id, [kind, payload]]) do + case :global.whereis_name(registry_name(id)) do + pid when is_pid(pid) -> GenServer.cast(pid, {:send, kind, payload}) + _ -> :ok + end + + nil + end + + @spec close([String.t() | non_neg_integer() | String.t()]) :: nil + def close([id, code, reason]) do + case :global.whereis_name(registry_name(id)) do + pid when is_pid(pid) -> GenServer.cast(pid, {:close, code, reason}) + _ -> :ok + end + + nil + end + + @impl true + def init(%{id: id, owner: owner, url: url, protocols: protocols}) do + owner_ref = Process.monitor(owner) + send(self(), :connect) + + {:ok, + %__MODULE__{ + id: id, + owner: owner, + owner_ref: owner_ref, + url: url, + protocols: protocols, + upgrade_headers: [], + protocol: "" + }} + end + + @impl true + def handle_info(:connect, state) do + case open_connection(state) do + {:ok, state} -> {:noreply, state} + {:error, state, reason} -> {:stop, reason, emit_error_and_close(state, reason)} + end + end + + def handle_info({:DOWN, ref, :process, _pid, _reason}, %{owner_ref: ref} = state) do + {:stop, :normal, state} + end + + def handle_info(message, %{conn: nil} = state) do + if message == :connect do + {:noreply, state} + else + {:noreply, state} + end + end + + def handle_info(message, state) do + case Mint.WebSocket.stream(state.conn, message) do + {:ok, conn, responses} -> + state = %{state | conn: conn} + handle_responses(state, responses) + + {:error, conn, reason, responses} -> + state = %{state | conn: conn} + + state = + case handle_response_list(state, responses) do + {:ok, state} -> state + {:stop, state} -> state + end + + {:stop, reason, emit_error_and_close(state, reason)} + + :unknown -> + {:noreply, state} + end + end + + @impl true + def handle_cast({:send, _kind, _payload}, %{websocket: nil} = state) do + {:noreply, state} + end + + def handle_cast({:send, kind, payload}, state) do + frame = + case kind do + "text" -> {:text, payload} + "binary" -> {:binary, payload} + end + + case stream_frame(state, frame) do + {:ok, state} -> {:noreply, state} + {:error, state, reason} -> {:stop, reason, emit_error_and_close(state, reason)} + end + end + + def handle_cast({:close, _code, _reason}, %{closed?: true} = state) do + {:noreply, state} + end + + def handle_cast({:close, code, reason}, %{websocket: nil} = state) do + {:noreply, %{state | pending_close: {code, reason}}} + end + + def handle_cast({:close, code, reason}, state) do + case do_close(state, code, reason) do + {:ok, state} -> {:noreply, state} + {:error, state, error} -> {:stop, error, emit_error_and_close(state, error)} + end + end + + @impl true + def terminate(_reason, %{conn: conn}) do + if conn, do: Mint.HTTP.close(conn) + :ok + end + + defp handle_responses(state, responses) do + case handle_response_list(state, responses) do + {:ok, state} -> {:noreply, state} + {:stop, state} -> {:stop, :normal, state} + end + end + + defp handle_response_list(state, responses) do + Enum.reduce_while(responses, {:ok, state}, fn response, {:ok, state} -> + case handle_response(state, response) do + {:ok, state} -> {:cont, {:ok, state}} + {:stop, state} -> {:halt, {:stop, state}} + end + end) + end + + defp handle_response(state, {:status, ref, status}) when ref == state.request_ref do + {:ok, %{state | upgrade_status: status}} + end + + defp handle_response(state, {:headers, ref, headers}) when ref == state.request_ref do + headers = state.upgrade_headers ++ headers + {:ok, %{state | upgrade_headers: headers}} + end + + defp handle_response(state, {:done, ref}) when ref == state.request_ref do + case Mint.WebSocket.new(state.conn, ref, state.upgrade_status, state.upgrade_headers) do + {:ok, conn, websocket} -> + protocol = response_header(state.upgrade_headers, "sec-websocket-protocol") || "" + + state = + %{state | conn: conn, websocket: websocket, protocol: protocol, upgrade_status: nil} + |> Map.put(:upgrade_headers, []) + |> notify_open() + + case state.pending_close do + {code, reason} -> + case do_close(%{state | pending_close: nil}, code, reason) do + {:ok, state} -> {:ok, state} + {:error, state, error} -> {:stop, emit_error_and_close(state, error)} + end + + nil -> + {:ok, state} + end + + {:error, conn, reason} -> + state = %{state | conn: conn} + {:stop, emit_error_and_close(state, reason)} + end + end + + defp handle_response(state, {:data, ref, data}) + when ref == state.request_ref and not is_nil(state.websocket) do + case Mint.WebSocket.decode(state.websocket, data) do + {:ok, websocket, frames} -> + handle_frames(%{state | websocket: websocket}, frames) + + {:error, websocket, reason} -> + {:stop, emit_error_and_close(%{state | websocket: websocket}, reason)} + end + end + + defp handle_response(state, _response), do: {:ok, state} + + defp handle_frames(state, frames) do + Enum.reduce_while(frames, {:ok, state}, fn frame, {:ok, state} -> + case handle_frame(state, frame) do + {:ok, state} -> {:cont, {:ok, state}} + {:stop, state} -> {:halt, {:stop, state}} + end + end) + end + + defp handle_frame(state, {:text, text}) do + {:ok, notify_text_message(state, text)} + end + + defp handle_frame(state, {:binary, data}) do + {:ok, notify_binary_message(state, data)} + end + + defp handle_frame(state, {:ping, data}) do + case stream_frame(state, {:pong, data}) do + {:ok, state} -> {:ok, state} + {:error, state, error} -> {:stop, emit_error_and_close(state, error)} + end + end + + defp handle_frame(state, {:pong, _data}) do + {:ok, state} + end + + defp handle_frame(state, {:close, code, reason}) do + state = + if state.close_sent? do + state + else + case stream_frame(state, :close) do + {:ok, state} -> state + {:error, state, _reason} -> state + end + end + + state = emit_close(%{state | close_sent?: true}, code || 1005, reason || "", true) + {:stop, state} + end + + defp open_connection(state) do + with {:ok, %{scheme: scheme, host: host, port: port, path: path} = info} <- + parse_url(state.url), + {:ok, conn} <- Mint.HTTP.connect(http_scheme(scheme), host, port, connect_opts(info)), + {:ok, conn, ref} <- + Mint.WebSocket.upgrade( + websocket_scheme(scheme), + conn, + path, + upgrade_headers(state.protocols) + ) do + {:ok, %{state | conn: conn, request_ref: ref}} + else + {:error, %Mint.TransportError{} = reason} -> {:error, state, reason} + {:error, %Mint.HTTPError{} = reason} -> {:error, state, reason} + {:error, %Mint.WebSocketError{} = reason} -> {:error, state, reason} + {:error, conn, reason} -> {:error, %{state | conn: conn}, reason} + {:error, reason} -> {:error, state, reason} + end + end + + defp do_close(%{websocket: nil} = state, _code, _reason) do + {:ok, state} + end + + defp do_close(state, code, reason) do + frame = + case {code, reason} do + {1000, ""} -> :close + {close_code, close_reason} -> {:close, close_code, close_reason} + end + + case stream_frame(state, frame) do + {:ok, state} -> {:ok, %{state | close_sent?: true}} + {:error, state, error} -> {:error, state, error} + end + end + + defp stream_frame(state, frame) do + with {:ok, websocket, data} <- Mint.WebSocket.encode(state.websocket, frame), + {:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, state.request_ref, data) do + {:ok, %{state | conn: conn, websocket: websocket}} + else + {:error, websocket, reason} when is_struct(websocket, Mint.WebSocket) -> + {:error, %{state | websocket: websocket}, reason} + + {:error, conn, reason} -> + {:error, %{state | conn: conn}, reason} + end + end + + defp parse_url(url) do + uri = URI.parse(url) + + cond do + uri.scheme not in ["ws", "wss"] -> + {:error, ArgumentError.exception("unsupported WebSocket scheme")} + + is_nil(uri.host) or uri.host == "" -> + {:error, ArgumentError.exception("missing WebSocket host")} + + true -> + {:ok, + %{ + scheme: uri.scheme, + host: uri.host, + port: uri.port || default_port(uri.scheme), + path: path_with_query(uri) + }} + end + end + + defp path_with_query(%URI{path: path, query: nil}) when path in [nil, ""], do: "/" + defp path_with_query(%URI{path: path, query: nil}), do: path + defp path_with_query(%URI{path: path, query: query}) when path in [nil, ""], do: "/?" <> query + defp path_with_query(%URI{path: path, query: query}), do: path <> "?" <> query + + defp connect_opts(%{scheme: "ws"}), do: [protocols: [:http1]] + + defp connect_opts(%{scheme: "wss", host: host}) do + [ + protocols: [:http1], + transport_opts: [ + verify: :verify_peer, + cacerts: :public_key.cacerts_get(), + server_name_indication: String.to_charlist(host), + customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)] + ] + ] + end + + defp upgrade_headers([]), do: [] + defp upgrade_headers(protocols), do: [{"sec-websocket-protocol", Enum.join(protocols, ", ")}] + + defp http_scheme("ws"), do: :http + defp http_scheme("wss"), do: :https + + defp websocket_scheme("ws"), do: :ws + defp websocket_scheme("wss"), do: :wss + + defp default_port("ws"), do: 80 + defp default_port("wss"), do: 443 + + defp response_header(headers, name) do + Enum.find_value(headers, fn + {key, value} when is_binary(key) -> if String.downcase(key) == name, do: value + _ -> nil + end) + end + + defp notify_open(state) do + send(state.owner, {:websocket_event, ["__ws_open", state.id, state.protocol]}) + state + end + + defp notify_text_message(state, payload) do + send(state.owner, {:websocket_event, ["__ws_message", state.id, payload]}) + state + end + + defp notify_binary_message(state, payload) do + send(state.owner, {:websocket_event, ["__ws_message", state.id, {:bytes, payload}]}) + state + end + + defp emit_error_and_close(state, reason) do + state + |> notify_error(reason) + |> emit_close(1006, "", false) + end + + defp notify_error(state, reason) do + send(state.owner, {:websocket_event, ["__ws_error", state.id, Exception.message(reason)]}) + state + end + + defp emit_close(%{closed?: true} = state, _code, _reason, _was_clean), do: state + + defp emit_close(state, code, reason, was_clean) do + send(state.owner, {:websocket_event, ["__ws_close", state.id, code, reason, was_clean]}) + %{state | closed?: true} + end + + defp registry_name(id), do: {__MODULE__, id} +end diff --git a/mix.exs b/mix.exs index 55b7efe..c2d24c4 100644 --- a/mix.exs +++ b/mix.exs @@ -70,8 +70,10 @@ defmodule QuickBEAM.MixProject do {:ex_slop, "~> 0.2", only: [:dev, :test], runtime: false}, {:oxc, "~> 0.5.0"}, {:npm, "~> 0.5.1"}, + {:mint_web_socket, "~> 1.0"}, {:nimble_pool, "~> 1.1"}, {:bandit, "~> 1.0", only: :test}, + {:websock_adapter, "~> 0.5", only: :test}, {:benchee, "~> 1.3", only: :bench, runtime: false}, {:quickjs_ex, "~> 0.3.1", only: :bench, runtime: false}, {:ex_doc, "~> 0.35", only: :dev, runtime: false} diff --git a/mix.lock b/mix.lock index d10a150..bd98bb6 100644 --- a/mix.lock +++ b/mix.lock @@ -21,6 +21,7 @@ "makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"}, "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, + "mint_web_socket": {:hex, :mint_web_socket, "1.0.5", "60354efeb49b1eccf95dfb75f55b08d692e211970fe735a5eb3188b328be2a90", [:mix], [{:mint, ">= 1.4.1 and < 2.0.0-0", [hex: :mint, repo: "hexpm", optional: false]}], "hexpm", "04b35663448fc758f3356cce4d6ac067ca418bbafe6972a3805df984b5f12e61"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, @@ -39,6 +40,7 @@ "telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"}, "thousand_island": {:hex, :thousand_island, "1.4.3", "2158209580f633be38d43ec4e3ce0a01079592b9657afff9080d5d8ca149a3af", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6e4ce09b0fd761a58594d02814d40f77daff460c48a7354a15ab353bb998ea0b"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.9", "43dc3ba6d89ef5dec5b1d0a39698436a1e856d000d84bf31a3149862b01a287f", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "5534d5c9adad3c18a0f58a9371220d75a803bf0b9a3d87e6fe072faaeed76a08"}, "zig_get": {:hex, :zig_get, "0.15.2", "a6ccaa894213839ba95615bf9be2b2c9268e37ab9547a2344830202cfd6d7cc0", [:mix], [], "hexpm", "e6b0028f2d5a8da791ff8037deff5b492784017d8d241e598377a24bd765f56f"}, "zig_parser": {:hex, :zig_parser, "0.6.0", "b1296c64bf5c2592de2eb4bc8bbf79d85b1d6a3ab444c443becd7af579d85681", [:mix], [{:pegasus, "~> 0.2.4", [hex: :pegasus, repo: "hexpm", optional: false]}], "hexpm", "bb7a1523b69f7f8f6b74ba5bf9dabc83f512c0cd67d9eebba1c501a529a2f95e"}, "zigler": {:hex, :zigler, "0.15.2", "d3e8c7b6d88be2eea72c341376b7e7b0aa36248e26d5798b580cad9815311559", [:mix], [{:protoss, "~> 1.0", [hex: :protoss, repo: "hexpm", optional: false]}, {:zig_get, "0.15.2", [hex: :zig_get, repo: "hexpm", optional: false]}, {:zig_parser, "~> 0.6", [hex: :zig_parser, repo: "hexpm", optional: false]}], "hexpm", "6bf96df41e281a70147e8826e439e24945f0222e08d058fc544da84f5c7ea8dd"}, diff --git a/priv/ts/websocket.ts b/priv/ts/websocket.ts index 4b04e64..848975e 100644 --- a/priv/ts/websocket.ts +++ b/priv/ts/websocket.ts @@ -1,9 +1,61 @@ import { Blob, SYM_BYTES } from './blob' import { DOMException } from './dom-exception' -import { Event, MessageEvent, CloseEvent } from './event' +import { Event, MessageEvent, CloseEvent, ErrorEvent } from './event' import { EventTarget } from './event-target' -const SYM_HANDLE_EVENT = Symbol('handleEvent') +const websocketRegistry = new Map() +const PROTOCOL_RE = /^[!#$%&'*+\-.^_`|~0-9A-Za-z]+$/ + +function normalizeWebSocketURL(input: string): string { + let parsed: URL + + try { + parsed = new URL(input) + } catch { + throw new DOMException('The URL is invalid.', 'SyntaxError') + } + + if (parsed.hash !== '') { + throw new DOMException('The URL contains a fragment identifier.', 'SyntaxError') + } + + if (parsed.protocol === 'http:') parsed.protocol = 'ws:' + if (parsed.protocol === 'https:') parsed.protocol = 'wss:' + + if (parsed.protocol !== 'ws:' && parsed.protocol !== 'wss:') { + throw new DOMException('The URL scheme must be ws, wss, http, or https.', 'SyntaxError') + } + + return parsed.href +} + +function normalizeProtocols(protocols?: string | string[]): string[] { + const values = typeof protocols === 'string' ? [protocols] : [...(protocols ?? [])] + const seen = new Set() + + for (const protocol of values) { + if (!PROTOCOL_RE.test(protocol)) { + throw new DOMException('The subprotocol is invalid.', 'SyntaxError') + } + + const lower = protocol.toLowerCase() + if (seen.has(lower)) { + throw new DOMException('The subprotocol list contains duplicates.', 'SyntaxError') + } + + seen.add(lower) + } + + return values +} + +function isArrayBufferView(value: unknown): value is ArrayBufferView { + return ArrayBuffer.isView(value) +} + +function arrayBufferFrom(bytes: Uint8Array): ArrayBuffer { + return bytes.buffer.slice(bytes.byteOffset, bytes.byteOffset + bytes.byteLength) as ArrayBuffer +} class WebSocket extends EventTarget { static readonly CONNECTING = 0 @@ -31,120 +83,178 @@ class WebSocket extends EventTarget { constructor(url: string, protocols?: string | string[]) { super() - this.url = url - let protoArray: string[] = [] - if (typeof protocols === 'string') protoArray = [protocols] - else if (protocols) protoArray = protocols - this.#id = Beam.callSync('__ws_connect', url, protoArray) as string + const normalizedUrl = normalizeWebSocketURL(url) + const normalizedProtocols = normalizeProtocols(protocols) + + this.url = normalizedUrl + this.#id = Beam.callSync('__ws_connect', normalizedUrl, normalizedProtocols) as string + websocketRegistry.set(this.#id, this) } get readyState(): number { return this.#readyState } + get protocol(): string { return this.#protocol } + get binaryType(): BinaryType { return this.#binaryType } + set binaryType(value: BinaryType) { - this.#binaryType = value + if (value === 'blob' || value === 'arraybuffer') { + this.#binaryType = value + } } + get bufferedAmount(): number { return this.#bufferedAmount } - send(data: string | ArrayBuffer | Uint8Array | Blob): void { + send(data: string | BufferSource | Blob): void { if (this.#readyState === WebSocket.CONNECTING) { throw new DOMException( 'WebSocket is not open: readyState 0 (CONNECTING)', 'InvalidStateError' ) } + if (this.#readyState !== WebSocket.OPEN) return - let payload: string | Uint8Array + let payload: ['text', string] | ['binary', Uint8Array] + let size = 0 + if (typeof data === 'string') { - payload = data - } else if (data instanceof Uint8Array) { - payload = data - } else if (data instanceof ArrayBuffer) { - payload = new Uint8Array(data) + payload = ['text', data] + size = new TextEncoder().encode(data).byteLength } else if (data instanceof Blob) { - payload = data[SYM_BYTES]() + const bytes = data[SYM_BYTES]() + payload = ['binary', bytes] + size = bytes.byteLength + } else if (data instanceof ArrayBuffer) { + const bytes = new Uint8Array(data) + payload = ['binary', bytes] + size = bytes.byteLength + } else if (isArrayBufferView(data)) { + const bytes = new Uint8Array(data.buffer, data.byteOffset, data.byteLength) + payload = ['binary', bytes] + size = bytes.byteLength } else { - payload = JSON.stringify(data) + throw new TypeError('WebSocket.send requires a string, Blob, ArrayBuffer, or ArrayBufferView') } + this.#bufferedAmount += size + queueMicrotask(() => { + this.#bufferedAmount = Math.max(0, this.#bufferedAmount - size) + }) + void Beam.call('__ws_send', this.#id, payload) } close(code?: number, reason?: string): void { if (this.#readyState === WebSocket.CLOSING || this.#readyState === WebSocket.CLOSED) return - if (code !== undefined && code !== 1000 && (code < 3000 || code > 4999)) { + if (code === undefined && reason !== undefined) { throw new DOMException( - `The code must be either 1000, or between 3000 and 4999. ${code} is neither.`, + 'A close reason may only be given if a code is also supplied.', 'InvalidAccessError' ) } + if (code !== undefined) { + if (!Number.isInteger(code)) { + throw new DOMException('The close code must be an integer.', 'InvalidAccessError') + } + + if (code !== 1000 && (code < 3000 || code > 4999)) { + throw new DOMException( + `The code must be either 1000, or between 3000 and 4999. ${code} is neither.`, + 'InvalidAccessError' + ) + } + } + + if (reason !== undefined && new TextEncoder().encode(reason).byteLength > 123) { + throw new DOMException('The close reason must not be greater than 123 bytes.', 'SyntaxError') + } + this.#readyState = WebSocket.CLOSING void Beam.call('__ws_close', this.#id, code ?? 1000, reason ?? '') } - [SYM_HANDLE_EVENT]( - type: string, - detail?: { - data?: unknown - code?: number - reason?: string - protocol?: string - wasClean?: boolean - } - ): void { - switch (type) { - case 'open': - this.#readyState = WebSocket.OPEN - this.#protocol = detail?.protocol ?? '' - { - const ev = new Event('open') - this.onopen?.(ev) - this.dispatchEvent(ev) - } - break - - case 'message': { - let messageData: unknown = detail?.data - if (this.#binaryType === 'arraybuffer' && messageData instanceof Uint8Array) { - messageData = messageData.buffer - } - const ev = new MessageEvent('message', { data: messageData }) - this.onmessage?.(ev) - this.dispatchEvent(ev) - break - } + _onOpen(protocol: string): void { + this.#readyState = WebSocket.OPEN + this.#protocol = protocol + const event = new Event('open') + this.dispatchEvent(event) + this.onopen?.(event) + } - case 'close': { - this.#readyState = WebSocket.CLOSED - const ev = new CloseEvent('close', { - code: detail?.code ?? 1006, - reason: detail?.reason ?? '', - wasClean: detail?.wasClean ?? false - }) - this.onclose?.(ev) - this.dispatchEvent(ev) - break - } + _onMessage(data: unknown): void { + let messageData = data - case 'error': { - const ev = new Event('error') - this.onerror?.(ev) - this.dispatchEvent(ev) - break - } + if (data instanceof Uint8Array) { + messageData = + this.#binaryType === 'arraybuffer' ? arrayBufferFrom(data) : new Blob([data]) } + + const event = new MessageEvent('message', { data: messageData }) + this.dispatchEvent(event) + this.onmessage?.(event) + } + + _onError(reason: string): void { + const event = new ErrorEvent('error', { message: reason }) + this.dispatchEvent(event) + this.onerror?.(event) + } + + _onClose(code: number, reason: string, wasClean: boolean): void { + this.#readyState = WebSocket.CLOSED + websocketRegistry.delete(this.#id) + const event = new CloseEvent('close', { code, reason, wasClean }) + this.dispatchEvent(event) + this.onclose?.(event) } } +declare const __qb_register_dispatcher: (fn: (msg: unknown) => boolean) => void + +__qb_register_dispatcher((msg: unknown): boolean => { + if (!Array.isArray(msg) || msg.length < 3) return false + + const [type, id, ...rest] = msg + if (typeof id !== 'string') return false + + const websocket = websocketRegistry.get(id) + if (!websocket) return false + + switch (type) { + case '__ws_open': + websocket._onOpen((rest[0] as string) ?? '') + return true + + case '__ws_message': + websocket._onMessage(rest[0]) + return true + + case '__ws_error': + websocket._onError(String(rest[0] ?? 'WebSocket error')) + return true + + case '__ws_close': + websocket._onClose( + typeof rest[0] === 'number' ? rest[0] : 1006, + typeof rest[1] === 'string' ? rest[1] : '', + rest[2] === true + ) + return true + + default: + return false + } +}) + export { WebSocket } diff --git a/test/wpt/wpt_websocket_test.exs b/test/wpt/wpt_websocket_test.exs new file mode 100644 index 0000000..bfdf8ad --- /dev/null +++ b/test/wpt/wpt_websocket_test.exs @@ -0,0 +1,436 @@ +defmodule QuickBEAM.WPT.WebSocketTest do + @moduledoc "Ported from WPT: Create-http-urls.any.js, Create-invalid-urls.any.js, Create-valid-url-array-protocols.any.js, Create-valid-url-binaryType-blob.any.js, Create-valid-url-protocol-empty.any.js, Create-valid-url-protocol-setCorrectly.any.js, Create-protocol-with-space.any.js, Create-protocols-repeated.any.js, Create-nonAscii-protocol-string.any.js, binaryType-wrong-value.any.js, close-invalid.any.js, Close-onlyReason.any.js, Close-server-initiated-close.any.js, Close-undefined.any.js, Send-binary-arraybuffer.any.js, Send-binary-blob.any.js" + use ExUnit.Case, async: false + use Plug.Router + + plug(:match) + plug(:dispatch) + + defmodule EchoSocket do + @behaviour WebSock + + @impl true + def init(state), do: {:ok, state} + + @impl true + def handle_in({"Goodbye", opcode: :text}, state), + do: {:stop, :normal, {1000, "Goodbye"}, state} + + def handle_in({data, opcode: opcode}, state), do: {:push, {opcode, data}, state} + + @impl true + def handle_info(_msg, state), do: {:ok, state} + + @impl true + def terminate(_reason, _state), do: :ok + end + + get "/echo" do + WebSockAdapter.upgrade(conn, EchoSocket, %{}, []) + end + + get "/protocol" do + conn = + case get_req_header(conn, "sec-websocket-protocol") do + [header] -> + case negotiated_protocol(header) do + nil -> conn + protocol -> put_resp_header(conn, "sec-websocket-protocol", protocol) + end + + _ -> + conn + end + + WebSockAdapter.upgrade(conn, EchoSocket, %{}, []) + end + + match _ do + send_resp(conn, 404, "not found") + end + + setup_all do + {:ok, server} = + Bandit.start_link(plug: __MODULE__, port: 0, ip: :loopback, startup_log: false) + + {:ok, {_addr, port}} = ThousandIsland.listener_info(server) + {:ok, pool} = QuickBEAM.ContextPool.start_link() + + base_host = "127.0.0.1:#{port}" + + %{ + pool: pool, + ws_echo_url: "ws://#{base_host}/echo", + http_echo_url: "http://#{base_host}/echo", + protocol_url: "ws://#{base_host}/protocol" + } + end + + setup do + {:ok, rt} = QuickBEAM.start() + + on_exit(fn -> + try do + QuickBEAM.stop(rt) + catch + :exit, _ -> :ok + end + end) + + %{rt: rt} + end + + describe "WPT WebSocket constructor" do + test "http URLs normalize to ws", %{ + rt: rt, + http_echo_url: http_echo_url, + ws_echo_url: ws_echo_url + } do + assert {:ok, true} = + QuickBEAM.eval(rt, """ + const ws = new WebSocket(#{inspect(http_echo_url)}); + const ok = ws.url === #{inspect(ws_echo_url)}; + ws.close(); + ok; + """) + end + + test "invalid URLs throw SyntaxError", %{rt: rt, http_echo_url: http_echo_url} do + fragment_url = http_echo_url <> "#test" + + assert {:ok, [true, true, true, true, true, true]} = + QuickBEAM.eval(rt, """ + const inputs = [ + 'ws://foo bar.com/', + 'ftp://example.com/', + 'mailto:example@example.org', + 'about:blank', + #{inspect(fragment_url)}, + '#test' + ]; + + inputs.map((input) => { + try { + new WebSocket(input); + return false; + } catch (e) { + return e instanceof DOMException && e.name === 'SyntaxError'; + } + }); + """) + end + + test "protocol is empty before connection is established", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval(rt, """ + const ws = new WebSocket(#{inspect(ws_echo_url)}); + const ok = ws.protocol === ''; + ws.close(); + ok; + """) + end + + test "repeated protocols throw SyntaxError", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval(rt, """ + try { + new WebSocket(#{inspect(ws_echo_url)}, ['echo', 'echo']); + false; + } catch (e) { + e instanceof DOMException && e.name === 'SyntaxError'; + } + """) + end + + test "protocols with spaces throw SyntaxError", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval(rt, """ + try { + new WebSocket(#{inspect(ws_echo_url)}, 'ec ho'); + false; + } catch (e) { + e instanceof DOMException && e.name === 'SyntaxError'; + } + """) + end + + test "non-ascii protocols throw SyntaxError", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval(rt, """ + try { + new WebSocket(#{inspect(ws_echo_url)}, '\u0080echo'); + false; + } catch (e) { + e instanceof DOMException && e.name === 'SyntaxError'; + } + """) + end + end + + describe "WPT WebSocket connection state" do + test "binaryType defaults to blob", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.onopen = () => { + const ok = ws.binaryType === 'blob'; + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + + test "invalid binaryType assignment is ignored", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.onopen = () => { + ws.binaryType = 'notBlobOrArrayBuffer'; + const ok = ws.binaryType === 'blob'; + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + + test "string protocol is negotiated", %{rt: rt, protocol_url: protocol_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(protocol_url)}, 'echo'); + ws.onopen = () => { + const ok = ws.protocol === 'echo'; + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + + test "array protocols are accepted", %{rt: rt, protocol_url: protocol_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(protocol_url)}, ['echo', 'chat']); + ws.onopen = () => { + const ok = ws.readyState === WebSocket.OPEN && ws.protocol === 'echo'; + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + end + + describe "WPT WebSocket close" do + test "invalid close codes throw InvalidAccessError", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, [true, true, true, true, true, true]} = + QuickBEAM.eval( + rt, + """ + await Promise.all([ + 0, + 500, + NaN, + 'string', + null, + 0x10000 + 1000 + ].map((value) => new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.onopen = () => { + let ok = false; + try { + ws.close(value); + } catch (e) { + ok = e instanceof DOMException && e.name === 'InvalidAccessError'; + } + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }))); + """, + timeout: 5000 + ) + end + + test "close with only reason throws InvalidAccessError", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.onopen = () => { + let ok = false; + try { + ws.close(undefined, 'Close with only reason'); + } catch (e) { + ok = e instanceof DOMException && e.name === 'InvalidAccessError'; + } + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + + test "close(undefined) succeeds", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.onopen = () => ws.close(undefined); + ws.onclose = () => resolve(true); + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + + test "server initiated close is clean", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + let opened = false; + ws.onopen = () => { + opened = true; + ws.send('Goodbye'); + }; + ws.onclose = (event) => { + resolve(opened && ws.readyState === WebSocket.CLOSED && event.wasClean === true); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + end + + describe "WPT WebSocket binary sending" do + test "ArrayBuffer echoes as ArrayBuffer when binaryType is arraybuffer", %{ + rt: rt, + ws_echo_url: ws_echo_url + } do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.binaryType = 'arraybuffer'; + ws.onopen = () => { + const data = new ArrayBuffer(15); + ws.send(data); + if (ws.bufferedAmount !== 15) { + reject(new Error(`expected bufferedAmount 15, got ${ws.bufferedAmount}`)); + } + }; + ws.onmessage = (event) => { + const ok = event.data instanceof ArrayBuffer && event.data.byteLength === 15; + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + + test "Blob echoes as Blob by default", %{rt: rt, ws_echo_url: ws_echo_url} do + assert {:ok, true} = + QuickBEAM.eval( + rt, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.onopen = () => { + const data = new Blob([new Uint8Array(65000)]); + ws.send(data); + }; + ws.onmessage = (event) => { + const ok = event.data instanceof Blob && event.data.size === 65000; + ws.close(); + resolve(ok); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + end + + describe "Context integration" do + test "WebSocket works in contexts", %{pool: pool, ws_echo_url: ws_echo_url} do + {:ok, ctx} = QuickBEAM.Context.start_link(pool: pool) + + on_exit(fn -> + try do + QuickBEAM.Context.stop(ctx) + catch + :exit, _ -> :ok + end + end) + + assert {:ok, "context"} = + QuickBEAM.Context.eval( + ctx, + """ + await new Promise((resolve, reject) => { + const ws = new WebSocket(#{inspect(ws_echo_url)}); + ws.onopen = () => ws.send('context'); + ws.onmessage = (event) => { + ws.close(); + resolve(event.data); + }; + ws.onerror = () => reject(new Error('unexpected error')); + }); + """, + timeout: 5000 + ) + end + end + + defp negotiated_protocol(header) do + header + |> String.split(",") + |> Enum.map(&String.trim/1) + |> Enum.find(&(&1 == "echo")) + end +end From 10dde0e2995f62c41305878e59cfcb04106a6cf7 Mon Sep 17 00:00:00 2001 From: Danila Poyarkov Date: Tue, 31 Mar 2026 22:38:00 +0300 Subject: [PATCH 2/2] Propagate module eval job errors --- lib/quickbeam/worker.zig | 33 ++++++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/lib/quickbeam/worker.zig b/lib/quickbeam/worker.zig index 21e2a0c..4e11cef 100644 --- a/lib/quickbeam/worker.zig +++ b/lib/quickbeam/worker.zig @@ -99,6 +99,19 @@ pub const WorkerState = struct { } } + fn drain_jobs_or_set_error(self: *WorkerState, result: *Result) bool { + var pctx: ?*qjs.JSContext = null; + while (true) { + const ret = qjs.JS_ExecutePendingJob(self.rt, &pctx); + if (ret > 0) continue; + if (ret < 0) { + self.set_error_term_from_ctx(pctx orelse self.ctx, result); + return false; + } + return true; + } + } + pub fn next_timer_timeout_ns(self: *WorkerState) ?u64 { var min_deadline: ?i128 = null; var it = self.timers.valueIterator(); @@ -427,7 +440,10 @@ pub const WorkerState = struct { const val = qjs.JS_EvalFunction(self.ctx, func); defer qjs.JS_FreeValue(self.ctx, val); - self.drain_jobs(); + + if (!self.drain_jobs_or_set_error(result)) { + return; + } if (js.js_is_exception(val)) { self.set_error_term(result); @@ -517,7 +533,10 @@ pub const WorkerState = struct { const eval_result = qjs.JS_EvalFunction(self.ctx, val); defer qjs.JS_FreeValue(self.ctx, eval_result); - self.drain_jobs(); + + if (!self.drain_jobs_or_set_error(result)) { + return; + } if (js.js_is_exception(eval_result)) { self.set_error_term(result); @@ -664,12 +683,16 @@ pub const WorkerState = struct { } fn set_error_term(self: *WorkerState, result: *Result) void { - const exc = qjs.JS_GetException(self.ctx); - defer qjs.JS_FreeValue(self.ctx, exc); + self.set_error_term_from_ctx(self.ctx, result); + } + + fn set_error_term_from_ctx(self: *WorkerState, ctx: *qjs.JSContext, result: *Result) void { + const exc = qjs.JS_GetException(ctx); + defer qjs.JS_FreeValue(ctx, exc); const term_env = beam.alloc_env(); result.ok = false; - result.term = js_to_beam.convert_error_with_limits(self.ctx, exc, term_env, self.convert_limits()); + result.term = js_to_beam.convert_error_with_limits(ctx, exc, term_env, self.convert_limits()); result.env = term_env; }