Skip to content

Commit 14d578a

Browse files
sleipnirAdriano Santospolvalente
authored
[Feat] Error handler in streams (#451)
* chore: release new version * bump 0.10.2 -> 0.11.0 * feat: added new function to handle side-effects * chore: added doc, remove comments * feat: added error handler unary and stream pipelines * test: added many more tests * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * fix: correct return type in doc * Update lib/grpc/stream/operators.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * chore: updated after review * docs: adds a better explanation of the different types of input * mix format * test: reintroduces tests that were removed by mistake * docs: introduces documentation for error handling and side effects * Update README.md Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update README.md Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update lib/grpc/stream.ex Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update test/grpc/integration/server_test.exs Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update test/grpc/stream_test.exs Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * Update test/grpc/stream_test.exs Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com> * chore: resolve review comments * chore: remove markdown clutter --------- Co-authored-by: Adriano Santos <adriano.santos@v3.com.br> Co-authored-by: Paulo Valente <16843419+polvalente@users.noreply.github.com>
1 parent af23fd8 commit 14d578a

File tree

13 files changed

+559
-101
lines changed

13 files changed

+559
-101
lines changed

README.md

Lines changed: 106 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
- [Unary RPC using Stream API](#unary-rpc-using-stream-api)
1818
- [Server-Side Streaming](#server-side-streaming)
1919
- [Bidirectional Streaming](#bidirectional-streaming)
20+
- [Effects and Error Handling](#effects-and-error-handling)
21+
- [Side Effects](#side-effects-with-effect2)
22+
- [Recovery from errors](#recovery-from-errors)
23+
- [Unified Error Matching and Propagation](#unified-error-matching-and-propagation)
2024
- [Application Startup](#application-startup)
2125
- [Client Usage](#client-usage)
2226
- [Basic Connection and RPC](#basic-connection-and-rpc)
@@ -101,8 +105,9 @@ defmodule HelloworldStreams.Server do
101105
alias Helloworld.HelloReply
102106

103107
@spec say_unary_hello(HelloRequest.t(), GRPC.Server.Stream.t()) :: any()
104-
def say_unary_hello(request, _materializer) do
105-
GRPC.Stream.unary(request)
108+
def say_unary_hello(request, materializer) do
109+
request
110+
|> GRPC.Stream.unary(materializer: materializer)
106111
|> GRPC.Stream.map(fn %HelloReply{} = reply ->
107112
%HelloReply{message: "[Reply] #{reply.message}"}
108113
end)
@@ -144,28 +149,104 @@ def say_bid_stream_hello(request, materializer) do
144149
|> GRPC.Stream.run_with(materializer)
145150
end
146151
```
147-
The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. See the table below:
148-
149-
| Function | Description | Parameters / Options |
150-
|:---------------------------------|:-------------|:----------------------|
151-
| **`from(input, opts \\\\ [])`** | Converts a gRPC stream (or list) into a `Flow` with backpressure support. Allows joining with external `GenStage` producers. | **Parameters:**<br>• `input` — stream, list, or gRPC struct.<br>**Options:**<br>• `:join_with` — PID or name of an external `GenStage` producer.<br>• `:dispatcher` — dispatcher module (default: `GenStage.DemandDispatcher`).<br>• `:propagate_context` — if `true`, propagates the materializer context.<br>• `:materializer` — the current `%GRPC.Server.Stream{}`.<br>• Other options supported by `Flow`. |
152-
| **`unary(input, opts \\\\ [])`** | Creates a `Flow` from a single gRPC request (unary). Useful for non-streaming calls that still leverage the Flow API. | **Parameters:**<br>• `input` — single gRPC message.<br>**Options:** same as `from/2`. |
153-
| **`to_flow(stream)`** | Returns the underlying `Flow` from a `GRPC.Stream`. If uninitialized, returns `Flow.from_enumerable([])`. | **Parameters:**<br>• `stream``%GRPC.Stream{}` struct. |
154-
| **`run(stream)`** | Executes the `Flow` for a unary stream and returns the first materialized result. | **Parameters:**<br>• `stream``%GRPC.Stream{}` with `unary: true` option. |
155-
| **`run_with(stream, materializer, opts \\\\ [])`** | Executes the `Flow` and sends responses into the gRPC server stream. Supports `:dry_run` for test mode without sending messages. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `materializer``%GRPC.Server.Stream{}`.<br>**Options:**<br>• `:dry_run` — if `true`, responses are not sent. |
156-
| **`ask(stream, target, timeout \\\\ 5000)`** | Sends a request to an external process (`PID` or named process) and waits for a response (`{:response, msg}`). Returns an updated stream or an error. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `target` — PID or atom.<br>• `timeout` — in milliseconds. |
157-
| **`ask!(stream, target, timeout \\\\ 5000)`** | Same as `ask/3`, but raises an exception on failure (aborts the Flow). | Same parameters as `ask/3`. |
158-
| **`filter(stream, fun)`** | Filters items in the stream by applying a concurrent predicate function. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun` — function `(item -> boolean)`. |
159-
| **`flat_map(stream, fun)`** | Applies a function returning a list or enumerable, flattening the results. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(item -> Enumerable.t())`. |
160-
| **`map(stream, fun)`** | Applies a transformation function to each item in the stream. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(item -> term)`. |
161-
| **`map_with_context(stream, fun)`** | Applies a function to each item, passing the stream context (e.g., headers) as an additional argument. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(context, item -> term)`. |
162-
| **`partition(stream, opts \\\\ [])`** | Partitions the stream to group items by key or condition before stateful operations like `reduce/3`. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `opts` — partitioning options (`Flow.partition/2`). |
163-
| **`reduce(stream, acc_fun, reducer_fun)`** | Reduces the stream using an accumulator, useful for aggregations. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `acc_fun` — initializer function `() -> acc`.<br>• `reducer_fun``(item, acc -> acc)`. |
164-
| **`uniq(stream)`** | Emits only distinct items from the stream (no custom uniqueness criteria). | **Parameters:**<br>• `stream``%GRPC.Stream{}`. |
165-
| **`uniq_by(stream, fun)`** | Emits only unique items based on the return value of the provided function. | **Parameters:**<br>• `stream``%GRPC.Stream{}`.<br>• `fun``(item -> term)` for uniqueness determination. |
166-
| **`get_headers(stream)`** | Retrieves HTTP/2 headers from a `%GRPC.Server.Stream{}`. | **Parameters:**<br>• `stream``%GRPC.Server.Stream{}`.<br>**Returns:** `map` containing decoded headers. |
167-
168-
For a complete list of available operators see [here](lib/grpc/stream.ex).
152+
The Stream API supports composable stream transformations via `ask`, `map`, `run` and others functions, enabling clean and declarative stream pipelines. For a complete list of available operators see [here](lib/grpc/stream.ex).
153+
154+
---
155+
156+
### Effects and Error Handling
157+
158+
#### Side Effects
159+
160+
The `effect/2` operator executes user-defined functions for each element in the stream, allowing the integration of non-transformative actions such as logging, metrics, or external notifications.
161+
162+
Unlike transformation operators (e.g., `map/2`), `effect/2` does not modify or filter values — it preserves the original stream while executing the provided callback safely for each emitted element.
163+
164+
```elixir
165+
iex> parent = self()
166+
iex> stream =
167+
...> GRPC.Stream.from([1, 2, 3])
168+
...> |> GRPC.Stream.effect(fn x -> send(parent, {:seen, x * 2}) end)
169+
...> |> GRPC.Stream.to_flow()
170+
...> |> Enum.to_list()
171+
iex> assert_receive {:seen, 2}
172+
iex> assert_receive {:seen, 4}
173+
iex> assert_receive {:seen, 6}
174+
iex> stream
175+
[1, 2, 3]
176+
```
177+
178+
Key characteristics:
179+
180+
* The callback function (`effect_fun`) is invoked for each item emitted downstream.
181+
* The result of the callback is ignored, ensuring that the stream’s structure and values remain unchanged.
182+
* Execution is lazy and occurs only when the stream is materialized using run/1, run_with/3, or to_flow/1.
183+
* Exceptions raised inside the callback are captured internally, preventing interruption of the dataflow.
184+
185+
This operator is designed for observability, telemetry, auditing, and integration with external systems that must react to events flowing through the gRPC stream.
186+
187+
---
188+
189+
#### Recovery from errors
190+
191+
The `map_error/2` operator intercepts and transforms errors or exceptions emitted by previous stages in a stream pipeline.
192+
193+
It provides a unified mechanism for handling:
194+
195+
* Expected errors, such as validation or domain failures (`{:error, reason}`)
196+
* Unexpected runtime errors, including raised or thrown exceptions inside other operators.
197+
198+
```elixir
199+
iex> GRPC.Stream.from([1, 2])
200+
...> |> GRPC.Stream.map(fn
201+
...> 2 -> raise "boom"
202+
...> x -> x
203+
...> end)
204+
...> |> GRPC.Stream.map_error(fn
205+
...> {:error, {:exception, _reason}} ->
206+
...> {:error, GRPC.RPCError.exception(message: "Booomm")}
207+
...> end)
208+
```
209+
210+
In this example:
211+
212+
* The function inside `map/2` raises an exception for the value `2`.
213+
* `map_error/2` captures and transforms that error into a structured `GRPC.RPCError` response.
214+
* The stream continues processing without being interrupted.
215+
216+
This makes map_error/2 suitable for input validation, runtime fault recovery, and user-facing error translation within gRPC pipelines.
217+
218+
---
219+
220+
#### Unified Error Matching and Propagation
221+
222+
All stream operators share a unified error propagation model that guarantees consistent handling of exceptions and failures across the pipeline.
223+
224+
This ensures that user-defined functions within the stream — whether pure transformations, side effects, or external calls — always produce a predictable and recoverable result, maintaining the integrity of the dataflow even in the presence of unexpected errors.
225+
226+
```elixir
227+
def say_unary_hello(request, _materializer) do
228+
GRPCStream.unary(request)
229+
|> GRPCStream.ask(Transformer)
230+
|> GRPCStream.map(fn
231+
%HelloReply{} = reply ->
232+
%HelloReply{message: "[Reply] #{reply.message}"}
233+
234+
{:error, reason} ->
235+
{:error, GRPC.RPCError.exception(message: "error calling external process: #{inspect(reason)}")}
236+
237+
error ->
238+
Logger.error("Unknown error")
239+
error
240+
end)
241+
|> GRPCStream.run()
242+
end
243+
```
244+
245+
By normalizing all possible outcomes, `GRPC.Stream` ensures fault-tolerant, exception-safe pipelines where operators can freely raise, throw, or return tuples without breaking the flow execution.
246+
247+
This unified model allows developers to build composable and reliable streaming pipelines that gracefully recover from both domain and runtime errors.
248+
249+
>_NOTE_: In the example above, we could use `map_error/2` instead of `map/2` to handle error cases explicitly. However, since the function also performs a transformation on successful values, `map/2` remains appropriate and useful in this context.
169250
170251
---
171252

@@ -175,7 +256,7 @@ Add the server supervisor to your application's supervision tree:
175256

176257
```elixir
177258
defmodule Helloworld.Application do
178-
@ false
259+
@moduledoc false
179260
use Application
180261

181262
@impl true

examples/helloworld_streams/lib/helloworld_streams/server.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@ defmodule HelloworldStreams.Server do
1414
def say_unary_hello(request, _materializer) do
1515
GRPCStream.unary(request)
1616
|> GRPCStream.ask(Transformer)
17-
|> GRPCStream.map(fn %HelloReply{} = reply ->
18-
%HelloReply{message: "[Reply] #{reply.message}"}
17+
|> GRPCStream.map(fn
18+
%HelloReply{} = reply ->
19+
%HelloReply{message: "[Reply] #{reply.message}"}
20+
21+
{:error, reason} ->
22+
GRPC.RPCError.exception(message: "[Error] #{inspect(reason)}")
1923
end)
2024
|> GRPCStream.run()
2125
end

lib/grpc/client/connection.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
defmodule GRPC.Client.Connection do
22
@moduledoc """
3-
Connection manager for gRPC client channels, with optional **load balancing**
4-
and **name resolution** support.
3+
Connection manager for gRPC client channels, with optional load balancing
4+
and name resolution support.
55
66
A `Conn` process manages one or more underlying gRPC connections
7-
(`GRPC.Channel` structs) and exposes a **virtual channel** to be used by
7+
(`GRPC.Channel` structs) and exposes a virtual channel to be used by
88
client stubs. The orchestration process runs as a `GenServer` registered
9-
globally (via `:global`), so only one orchestrator exists **per connection**
9+
globally (via `:global`), so only one orchestrator exists per connection
1010
in a BEAM node.
1111
1212
## Overview

lib/grpc/client/resolver.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule GRPC.Client.Resolver do
22
@moduledoc """
33
Behaviour for gRPC client resolvers.
44
5-
A gRPC resolver is responsible for translating a **target string** into
5+
A gRPC resolver is responsible for translating a target string into
66
a list of connection endpoints (addresses) and an optional `ServiceConfig`.
77
88
gRPC supports multiple naming schemes, allowing clients to connect

lib/grpc/client/resolver/ipv6.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ defmodule GRPC.Client.Resolver.IPv6 do
99
1010
ipv6:[addr][:port][,[addr][:port],...]
1111
12-
- IPv6 addresses **must** be enclosed in square brackets (`[...]`).
12+
- IPv6 addresses must be enclosed in square brackets (`[...]`).
1313
- The port is optional; if not provided, the default port is `443`.
1414
- Multiple addresses can be comma-separated.
1515
- `service_config` is always `nil` as IPv6 literals do not support DNS TXT or xDS.

lib/grpc/client/resolver/unix.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ defmodule GRPC.Client.Resolver.Unix do
1010
1111
unix:///absolute/path/to/socket
1212
13-
- The scheme **must** be `unix`.
13+
- The scheme must be `unix`.
1414
- The path must be absolute (`/var/run/my.sock`).
1515
- The port is not used in Unix sockets; `:port` will be `nil`.
1616
- The socket type is indicated via `:socket => :unix`.

lib/grpc/client/service_config.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ defmodule GRPC.Client.ServiceConfig do
99
1010
According to the gRPC specification ([service_config.md](https://github.com/grpc/grpc/blob/master/doc/service_config.md)):
1111
12-
- **loadBalancingConfig**: a list of load balancing policies.
12+
- loadBalancingConfig: a list of load balancing policies.
1313
The client should pick the first policy it supports. Common values are:
1414
- `"pick_first"`: always pick the first server.
1515
- `"round_robin"`: distribute calls across servers in round-robin.
1616
17-
- **methodConfig**: a list of configurations applied to specific methods or services.
17+
- methodConfig: a list of configurations applied to specific methods or services.
1818
Each entry can include:
1919
- `"name"`: a list of `{ "service": "<service_name>", "method": "<method_name>" }`
2020
or `{ "service": "<service_name>" }` to match all methods in the service.

lib/grpc/protoc/cli.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule GRPC.Protoc.CLI do
22
@moduledoc """
33
`protoc` plugin for generating Elixir code.
44
5-
`protoc-gen-elixir` (this name is important) **must** be in `$PATH`. You are not supposed
5+
`protoc-gen-elixir` (this name is important) must be in `$PATH`. You are not supposed
66
to call it directly, but only through `protoc`.
77
88
## Examples

0 commit comments

Comments
 (0)