-
Notifications
You must be signed in to change notification settings - Fork 4
Initial rtun package #511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Initial rtun package #511
Conversation
WalkthroughAdds RTUN protobufs and generated validators; implements a framed RTUN transport (Session, virtConn, listener, closedSet), gateway server and Dialer, registry/handler with auth/metrics, presence/directory/locator/route subsystems with in-memory implementations and tests; adjusts STS URL construction; vendors gRPC health; extends metrics API with observable gauges. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Client as ReverseTunnel client
participant Handler as ReverseTunnel Handler
participant Registry as Registry
participant Session as Session
Note over Client,Handler: bidi RTUN Link established
Client->>Handler: Link(stream)
Handler->>Handler: ValidateAuth(ctx)
Client-->>Handler: Frame{Hello}
Handler->>Handler: ValidateHello(Hello)
Handler->>Session: NewSession(link)
Handler->>Registry: Register(clientID, session)
loop frames
Client-->>Handler: Frame{Syn/Data/Fin/Rst}
Handler->>Session: route frames
Session-->>Handler: frame events
Handler-->>Client: Frame{Data/Fin/Rst}
end
Client-->>Handler: disconnect
Handler->>Registry: Unregister(clientID)
sequenceDiagram
autonumber
participant Caller as Gateway Dialer
participant Gateway as Gateway Server
participant Registry as Registry
participant OwnerSession as Owner Session
Caller->>Gateway: Open(stream)
Caller->>Gateway: OpenRequest{gsid, client_id, port}
Gateway->>Registry: Lookup(client_id)
alt client not found
Gateway-->>Caller: OpenResponse{gsid, NotFound}
else client found
Gateway->>OwnerSession: Open(port)
OwnerSession-->>Gateway: net.Conn
Gateway-->>Caller: OpenResponse{gsid, Opened}
loop bridge frames
Caller-->>Gateway: Frame{Data/Fin/Rst}
Gateway->>OwnerSession: write/close
OwnerSession-->>Gateway: Frame{Data/Fin/Rst}
Gateway-->>Caller: Frame{Data/Fin/Rst}
end
end
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 12
🧹 Nitpick comments (12)
pkg/lambda/grpc/config/sts.go (1)
8-8: Good refactor: preferurl.URLfor endpoint construction.Using
url.URLwith explicitSchemeandHostfields is more idiomatic and safer than string formatting. This approach is less error-prone and aligns with Go best practices for URL construction.Note: This change is tangential to the main rtun PR but represents a worthwhile improvement to the codebase.
Also applies to: 26-29
pkg/rtun/match/route_test.go (1)
72-73: Check errors from presence operations
Presence.SetPortsandPresence.Announcecan fail (especially for future implementations); ignoring their errors will let the test pass even if the setup breaks. Userequire.NoErrorso the test fails loudly when these calls fail.pkg/rtun/match/memory/presence_test.go (1)
134-138: Assert Locations errors in isolation test.We currently drop the error returns, so a lookup failure would go unnoticed. Capture and assert the errors like the other tests.
- locs1, _ := p.Locations(ctx, "client-1") - locs2, _ := p.Locations(ctx, "client-2") + locs1, err := p.Locations(ctx, "client-1") + require.NoError(t, err) + locs2, err := p.Locations(ctx, "client-2") + require.NoError(t, err)pkg/rtun/gateway/client_conn_test.go (1)
58-59: Surface accept errors in testsIf
ln.Accept()fails, we silently push a nil conn and the subsequent test logic will panic with an opaque nil-pointer error. Capture the returned error and fail the test immediately so we get a clear diagnostic.- go func() { c, _ := ln.Accept(); accepted <- c }() + go func() { + c, err := ln.Accept() + require.NoError(t, err) + accepted <- c + }()pkg/rtun/server/handler.go (1)
137-140: Tidy comment punctuation for golint.
go-lintflagged the comment ongrpcLink; add a trailing period to silence the check.pkg/rtun/gateway/client.go (2)
146-149: Start the reader immediately to avoid inbound backpressureIf the caller writes before reading, the stream won’t be drained until the first Read, potentially stalling the remote. Start r.loop eagerly.
gc := &gatewayConn{ stream: stream, cc: cc, gsid: gsid, cancel: cancel, doneCh: doneCh, } - gc.r = newReader(stream, gsid, d.readBufferCap, doneCh) + gc.r = newReader(stream, gsid, d.readBufferCap, doneCh) + gc.r.start() gc.w = newWriter(stream, gsid, d.writeQueueCap, doneCh) return gc, nil
72-80: Validate port range earlyReject invalid ports (>65535 or ==0) before sending OpenRequest to avoid unnecessary gateway work.
func (d *Dialer) DialContext(ctx context.Context, clientID string, port uint32) (net.Conn, error) { + if port == 0 || port > 65535 { + return nil, fmt.Errorf("invalid port: %d", port) + }Also applies to: 95-104
proto/c1/connectorapi/rtun/v1/rtun.proto (1)
8-8: Request/response type lint: same message for both sidesBuf flags using the same type (Frame) for request and response. Two options:
- Option A (code change): Introduce wrappers and update RPC. Bigger ripple across code.
- Option B (config): Suppress this rule for this RPC in buf.yaml.
Option A (proto change):
service ReverseTunnelService { - rpc Link(stream Frame) returns (stream Frame); + rpc Link(stream ReverseTunnelLinkRequest) returns (stream ReverseTunnelLinkResponse); } +message ReverseTunnelLinkRequest { Frame frame = 1; } +message ReverseTunnelLinkResponse { Frame frame = 1; }Option B (buf config): Add a lint ignore for this RPC (request/response unique) to allow symmetric streaming without wrappers. I can draft the buf.yaml update if you prefer.
pkg/rtun/transport/conn.go (2)
154-176: Decrement active-SID gauge on local CloseMetrics gauge only decrements on remote FIN/RST. On local Close, decrement to keep gauge accurate.
func (c *virtConn) Close() error { c.writeMu.Lock() if c.writeClosed { c.writeMu.Unlock() return nil } c.writeClosed = true c.writeMu.Unlock() // send FIN (ack=false) _ = c.mux.link.Send(&rtunpb.Frame{Sid: c.sid, Kind: &rtunpb.Frame_Fin{Fin: &rtunpb.Fin{Ack: false}}}) // Unblock any pending Read by closing the read side c.closeReadOnce.Do(func() { c.readMu.Lock() if c.readErr == nil { c.readErr = io.EOF } c.readMu.Unlock() close(c.readCh) }) c.stopIdleTimer() c.mux.removeConn(c.sid) + if c.mux.m != nil { + c.mux.m.incSidsActive(c.mux.link.Context(), -1) + } return nil }
120-152: Write deadline cannot bound Link.Send blockingSetWriteDeadline only checks before each chunk; it cannot abort a blocked Send since Link lacks context. Consider extending Link with SendContext to honor write deadlines.
I can draft a minimal interface addition (SendContext/RecvContext) and thread ctx from deadlines if you want.
pkg/rtun/transport/session.go (2)
18-29: Remove dead pending-buffer plumbing (and option) or implement itFields and options for DATA-before-SYN buffering exist but aren’t used (pending map always empty; maxPendingSIDs unused; maxPendingBufferSize unused). This adds cognitive load.
Suggested cleanup:
- Remove options.WithMaxPendingSIDs and options.maxPendingSIDs.
- Drop Session.pending and related drain block.
- Remove maxPendingBufferSize constant.
- Update NewSession to not set maxPendingSIDs.
I can provide a full patch if you choose to trim this now.
Also applies to: 54-60, 96-101, 126-136, 196-201, 260-266
143-155: Listener opts are unusedListen(ctx, port, opts ...Option) ignores opts. Either wire them (per-listener config) or drop the parameter.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
pb/c1/connectorapi/rtun/v1/gateway.pb.gois excluded by!**/*.pb.gopb/c1/connectorapi/rtun/v1/gateway_grpc.pb.gois excluded by!**/*.pb.gopb/c1/connectorapi/rtun/v1/rtun.pb.gois excluded by!**/*.pb.gopb/c1/connectorapi/rtun/v1/rtun_grpc.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (45)
pb/c1/connectorapi/rtun/v1/gateway.pb.validate.go(1 hunks)pb/c1/connectorapi/rtun/v1/rtun.pb.validate.go(1 hunks)pkg/lambda/grpc/config/sts.go(2 hunks)pkg/rtun/gateway/client.go(1 hunks)pkg/rtun/gateway/client_conn_test.go(1 hunks)pkg/rtun/gateway/errors.go(1 hunks)pkg/rtun/gateway/grpc_options.go(1 hunks)pkg/rtun/gateway/integration_test.go(1 hunks)pkg/rtun/gateway/metrics.go(1 hunks)pkg/rtun/gateway/options.go(1 hunks)pkg/rtun/gateway/server.go(1 hunks)pkg/rtun/match/directory.go(1 hunks)pkg/rtun/match/errors.go(1 hunks)pkg/rtun/match/locator.go(1 hunks)pkg/rtun/match/locator_test.go(1 hunks)pkg/rtun/match/memory/directory.go(1 hunks)pkg/rtun/match/memory/directory_test.go(1 hunks)pkg/rtun/match/memory/presence.go(1 hunks)pkg/rtun/match/memory/presence_test.go(1 hunks)pkg/rtun/match/presence.go(1 hunks)pkg/rtun/match/route.go(1 hunks)pkg/rtun/match/route_test.go(1 hunks)pkg/rtun/server/auth.go(1 hunks)pkg/rtun/server/grpc_options.go(1 hunks)pkg/rtun/server/handler.go(1 hunks)pkg/rtun/server/metrics.go(1 hunks)pkg/rtun/server/options.go(1 hunks)pkg/rtun/server/registry.go(1 hunks)pkg/rtun/server/server_integration_test.go(1 hunks)pkg/rtun/transport/closedset.go(1 hunks)pkg/rtun/transport/closedset_test.go(1 hunks)pkg/rtun/transport/conn.go(1 hunks)pkg/rtun/transport/conn_test.go(1 hunks)pkg/rtun/transport/errors.go(1 hunks)pkg/rtun/transport/listener.go(1 hunks)pkg/rtun/transport/session.go(1 hunks)pkg/rtun/transport/session_race_test.go(1 hunks)pkg/rtun/transport/session_test.go(1 hunks)proto/c1/connectorapi/rtun/v1/gateway.proto(1 hunks)proto/c1/connectorapi/rtun/v1/rtun.proto(1 hunks)vendor/google.golang.org/grpc/health/client.go(1 hunks)vendor/google.golang.org/grpc/health/logging.go(1 hunks)vendor/google.golang.org/grpc/health/producer.go(1 hunks)vendor/google.golang.org/grpc/health/server.go(1 hunks)vendor/modules.txt(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (33)
pkg/rtun/server/auth.go (1)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (3)
Hello(217-223)Hello(236-236)Hello(251-253)
pkg/rtun/match/locator.go (4)
pkg/rtun/match/memory/presence.go (1)
Presence(12-16)pkg/rtun/match/presence.go (1)
Presence(15-21)pkg/rtun/match/directory.go (1)
ErrNotImplemented(9-9)pkg/rtun/match/errors.go (1)
ErrClientOffline(6-6)
pkg/rtun/gateway/grpc_options.go (2)
pkg/rtun/server/grpc_options.go (1)
RecommendedGRPCServerOptions(12-28)vendor/google.golang.org/grpc/server.go (2)
KeepaliveParams(294-303)KeepaliveEnforcementPolicy(306-310)
pkg/rtun/transport/conn_test.go (3)
pkg/rtun/transport/session.go (1)
NewSession(108-141)pb/c1/connectorapi/rtun/v1/rtun.pb.go (14)
Frame(79-92)Frame(105-105)Frame(120-122)Frame_Syn(191-193)Frame_Syn(209-209)Syn(269-274)Syn(287-287)Syn(302-304)Frame_Rst(203-205)Frame_Rst(215-215)Rst(401-406)Rst(419-419)Rst(434-436)RstCode_RST_CODE_INTERNAL(31-31)pkg/rtun/transport/errors.go (2)
ErrClosed(7-7)ErrConnReset(6-6)
pkg/rtun/gateway/metrics.go (2)
pkg/rtun/server/handler.go (1)
Handler(15-26)pkg/metrics/metrics.go (1)
Dimensionless(29-29)
pkg/rtun/match/locator_test.go (4)
pkg/rtun/match/memory/presence.go (2)
NewPresence(18-23)Presence(12-16)pkg/rtun/match/locator.go (1)
Locator(8-10)pkg/rtun/match/presence.go (1)
Presence(15-21)pkg/rtun/match/errors.go (1)
ErrClientOffline(6-6)
pkg/rtun/server/grpc_options.go (2)
pkg/rtun/gateway/grpc_options.go (1)
RecommendedGRPCServerOptions(12-25)vendor/google.golang.org/grpc/server.go (5)
MaxRecvMsgSize(407-411)MaxSendMsgSize(415-419)MaxConcurrentStreams(423-430)KeepaliveParams(294-303)KeepaliveEnforcementPolicy(306-310)
pkg/rtun/match/memory/directory.go (1)
pkg/rtun/match/directory.go (1)
Directory(11-15)
vendor/google.golang.org/grpc/health/server.go (1)
vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go (3)
HealthCheckResponse_SERVING(45-45)HealthCheckResponse_SERVICE_UNKNOWN(47-47)HealthCheckResponse_NOT_SERVING(46-46)
pkg/rtun/match/memory/presence_test.go (1)
pkg/rtun/match/memory/presence.go (1)
NewPresence(18-23)
pkg/rtun/transport/listener.go (2)
pkg/rtun/transport/session.go (1)
Session(85-105)pkg/rtun/transport/errors.go (1)
ErrClosed(7-7)
vendor/google.golang.org/grpc/health/producer.go (4)
vendor/google.golang.org/grpc/internal/internal.go (2)
RegisterClientHealthCheckListener(37-37)HealthCheckFunc(33-33)vendor/google.golang.org/grpc/clientconn.go (1)
ClientConnInterface(589-595)vendor/google.golang.org/grpc/balancer/subconn.go (2)
SubConn(51-92)SubConnState(108-117)vendor/google.golang.org/grpc/stream.go (1)
StreamDesc(65-76)
pkg/rtun/gateway/options.go (3)
pkg/rtun/server/handler.go (1)
Handler(15-26)pkg/rtun/server/options.go (2)
WithMetricsHandler(22-24)Option(15-15)pkg/rtun/transport/session.go (2)
WithMetricsHandler(70-74)Option(16-16)
pkg/rtun/server/options.go (3)
pkg/rtun/server/handler.go (1)
Handler(15-26)pkg/rtun/gateway/options.go (2)
WithMetricsHandler(14-16)Option(7-7)pkg/rtun/transport/session.go (2)
WithMetricsHandler(70-74)Option(16-16)
pkg/rtun/match/memory/presence.go (1)
pkg/rtun/match/presence.go (1)
Presence(15-21)
pkg/rtun/match/route.go (4)
pkg/rtun/match/locator.go (1)
Locator(8-10)pkg/rtun/match/directory.go (1)
Directory(11-15)vendor/google.golang.org/grpc/dialoptions.go (3)
DialOption(102-104)WithContextDialer(463-467)WithTransportCredentials(404-408)pkg/rtun/server/registry.go (1)
Registry(14-18)
pkg/rtun/server/metrics.go (2)
pkg/rtun/server/handler.go (1)
Handler(15-26)pkg/metrics/metrics.go (1)
Dimensionless(29-29)
vendor/google.golang.org/grpc/health/client.go (3)
vendor/google.golang.org/grpc/internal/internal.go (1)
HealthCheckFunc(33-33)vendor/google.golang.org/grpc/stream.go (1)
ClientStream(94-146)vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go (1)
HealthCheckResponse_SERVING(45-45)
pkg/rtun/server/registry.go (2)
pkg/rtun/transport/session.go (2)
Session(85-105)Option(16-16)pkg/rtun/server/options.go (1)
Option(15-15)
pkg/rtun/gateway/client_conn_test.go (6)
pkg/rtun/server/registry.go (1)
NewRegistry(20-30)pkg/rtun/server/handler.go (1)
NewHandler(28-41)pkg/rtun/gateway/server.go (2)
NewServer(29-45)Server(20-26)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-141)pkg/rtun/gateway/client.go (1)
NewDialer(56-70)pkg/rtun/transport/errors.go (1)
ErrClosed(7-7)
pkg/rtun/transport/session.go (2)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (27)
Frame(79-92)Frame(105-105)Frame(120-122)Frame_Syn(191-193)Frame_Syn(209-209)Syn(269-274)Syn(287-287)Syn(302-304)Frame_Rst(203-205)Frame_Rst(215-215)Rst(401-406)Rst(419-419)Rst(434-436)RstCode_RST_CODE_NO_LISTENER(28-28)Frame_Data(195-197)Frame_Data(211-211)Data(313-318)Data(331-331)Data(346-348)RstCode_RST_CODE_INTERNAL(31-31)Frame_Fin(199-201)Frame_Fin(213-213)Fin(357-362)Fin(375-375)Fin(390-392)Frame_Hello(187-189)Frame_Hello(207-207)pkg/metrics/metrics.go (1)
Dimensionless(29-29)
pkg/rtun/transport/session_test.go (3)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (21)
Frame(79-92)Frame(105-105)Frame(120-122)Frame_Syn(191-193)Frame_Syn(209-209)Syn(269-274)Syn(287-287)Syn(302-304)Frame_Data(195-197)Frame_Data(211-211)Data(313-318)Data(331-331)Data(346-348)Frame_Fin(199-201)Frame_Fin(213-213)Fin(357-362)Fin(375-375)Fin(390-392)Frame_Rst(203-205)Frame_Rst(215-215)RstCode_RST_CODE_NO_LISTENER(28-28)pkg/rtun/transport/session.go (1)
NewSession(108-141)pkg/rtun/transport/errors.go (1)
ErrTimeout(8-8)
pkg/rtun/transport/session_race_test.go (2)
pkg/rtun/transport/session.go (1)
NewSession(108-141)pb/c1/connectorapi/rtun/v1/rtun.pb.go (13)
Frame(79-92)Frame(105-105)Frame(120-122)Frame_Syn(191-193)Frame_Syn(209-209)Syn(269-274)Syn(287-287)Syn(302-304)Frame_Data(195-197)Frame_Data(211-211)Data(313-318)Data(331-331)Data(346-348)
pkg/rtun/match/memory/directory_test.go (1)
pkg/rtun/match/memory/directory.go (2)
NewDirectory(23-27)ErrServerNotFound(10-10)
pkg/rtun/match/route_test.go (8)
pkg/rtun/match/memory/presence.go (2)
NewPresence(18-23)Presence(12-16)pkg/rtun/match/memory/directory.go (2)
NewDirectory(23-27)Directory(13-16)pkg/rtun/server/registry.go (1)
NewRegistry(20-30)pkg/rtun/server/handler.go (1)
NewHandler(28-41)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-141)pkg/rtun/match/route.go (2)
OwnerRouter(16-20)LocalReverseDial(45-62)pkg/rtun/match/presence.go (1)
Presence(15-21)pkg/rtun/match/directory.go (1)
Directory(11-15)
pkg/rtun/gateway/server.go (5)
pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (4)
UnimplementedReverseDialerServer(71-71)UnimplementedReverseDialerServer(73-75)UnimplementedReverseDialerServer(76-76)ReverseDialer_OpenServer(101-101)pkg/rtun/server/registry.go (1)
Registry(14-18)pkg/rtun/gateway/options.go (1)
Option(7-7)pb/c1/connectorapi/rtun/v1/gateway.pb.go (24)
GatewayRequest_OpenReq(95-97)GatewayRequest_OpenReq(103-103)GatewayResponse(108-117)GatewayResponse(130-130)GatewayResponse(145-147)GatewayResponse_Frame(182-184)GatewayResponse_Frame(188-188)GatewayResponse_OpenResp(178-180)GatewayResponse_OpenResp(186-186)OpenResponse(253-263)OpenResponse(276-276)OpenResponse(291-293)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)NotFound(343-347)NotFound(360-360)NotFound(375-377)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)Opened(379-383)Opened(396-396)Opened(411-413)GatewayRequest_Frame(99-101)GatewayRequest_Frame(105-105)pb/c1/connectorapi/rtun/v1/rtun.pb.go (19)
Frame(79-92)Frame(105-105)Frame(120-122)Frame_Rst(203-205)Frame_Rst(215-215)Rst(401-406)Rst(419-419)Rst(434-436)RstCode_RST_CODE_INTERNAL(31-31)Frame_Data(195-197)Frame_Data(211-211)Data(313-318)Data(331-331)Data(346-348)Frame_Fin(199-201)Frame_Fin(213-213)Fin(357-362)Fin(375-375)Fin(390-392)
pkg/rtun/server/server_integration_test.go (5)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (8)
Hello(217-223)Hello(236-236)Hello(251-253)Frame(79-92)Frame(105-105)Frame(120-122)Frame_Hello(187-189)Frame_Hello(207-207)pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (2)
ReverseTunnel_LinkClient(51-51)NewReverseTunnelClient(36-38)pkg/rtun/server/registry.go (1)
NewRegistry(20-30)pkg/rtun/server/handler.go (1)
NewHandler(28-41)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-141)
pkg/rtun/gateway/integration_test.go (9)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (8)
Hello(217-223)Hello(236-236)Hello(251-253)Frame(79-92)Frame(105-105)Frame(120-122)Frame_Hello(187-189)Frame_Hello(207-207)pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (3)
ReverseTunnel_LinkClient(51-51)RegisterReverseTunnelServer(79-88)NewReverseTunnelClient(36-38)pkg/rtun/server/registry.go (1)
NewRegistry(20-30)pkg/rtun/server/handler.go (1)
NewHandler(28-41)pkg/rtun/gateway/server.go (1)
NewServer(29-45)pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (1)
RegisterReverseDialerServer(85-94)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-141)pkg/rtun/gateway/client.go (1)
NewDialer(56-70)pkg/rtun/gateway/errors.go (1)
ErrNotFound(6-6)
pkg/rtun/transport/conn.go (3)
pkg/rtun/transport/session.go (1)
Session(85-105)pkg/rtun/transport/errors.go (2)
ErrTimeout(8-8)ErrClosed(7-7)pb/c1/connectorapi/rtun/v1/rtun.pb.go (19)
Frame(79-92)Frame(105-105)Frame(120-122)Frame_Data(195-197)Frame_Data(211-211)Data(313-318)Data(331-331)Data(346-348)Frame_Fin(199-201)Frame_Fin(213-213)Fin(357-362)Fin(375-375)Fin(390-392)Frame_Rst(203-205)Frame_Rst(215-215)Rst(401-406)Rst(419-419)Rst(434-436)RstCode_RST_CODE_INTERNAL(31-31)
pb/c1/connectorapi/rtun/v1/rtun.pb.validate.go (1)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (28)
Frame(79-92)Frame(105-105)Frame(120-122)Frame_Hello(187-189)Frame_Hello(207-207)Frame_Syn(191-193)Frame_Syn(209-209)Frame_Data(195-197)Frame_Data(211-211)Frame_Fin(199-201)Frame_Fin(213-213)Frame_Rst(203-205)Frame_Rst(215-215)Hello(217-223)Hello(236-236)Hello(251-253)Syn(269-274)Syn(287-287)Syn(302-304)Data(313-318)Data(331-331)Data(346-348)Fin(357-362)Fin(375-375)Fin(390-392)Rst(401-406)Rst(419-419)Rst(434-436)
pkg/rtun/gateway/client.go (7)
vendor/google.golang.org/grpc/clientconn.go (1)
DialContext(239-318)vendor/google.golang.org/grpc/dialoptions.go (1)
WithTransportCredentials(404-408)pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (2)
NewReverseDialerClient(39-41)ReverseDialer_OpenClient(54-54)pb/c1/connectorapi/rtun/v1/gateway.pb.go (14)
GatewayRequest(25-34)GatewayRequest(47-47)GatewayRequest(62-64)GatewayRequest_OpenReq(95-97)GatewayRequest_OpenReq(103-103)OpenRequest(192-199)OpenRequest(212-212)OpenRequest(227-229)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)GatewayRequest_Frame(99-101)GatewayRequest_Frame(105-105)pkg/rtun/gateway/errors.go (2)
ErrProtocol(8-8)ErrNotFound(6-6)pb/c1/connectorapi/rtun/v1/rtun.pb.go (18)
Frame_Data(195-197)Frame_Data(211-211)Data(313-318)Data(331-331)Data(346-348)Frame_Fin(199-201)Frame_Fin(213-213)Frame_Rst(203-205)Frame_Rst(215-215)Rst(401-406)Rst(419-419)Rst(434-436)Frame(79-92)Frame(105-105)Frame(120-122)Fin(357-362)Fin(375-375)Fin(390-392)pkg/rtun/transport/errors.go (1)
ErrClosed(7-7)
pkg/rtun/server/handler.go (6)
pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (5)
UnimplementedReverseTunnelServer(65-65)UnimplementedReverseTunnelServer(67-69)UnimplementedReverseTunnelServer(70-70)ReverseTunnelServer(56-58)ReverseTunnel_LinkServer(95-95)pkg/rtun/server/registry.go (1)
Registry(14-18)pkg/rtun/server/auth.go (1)
TokenValidator(10-16)pkg/rtun/server/options.go (4)
Option(15-15)ErrProtocol(11-11)ErrHelloTimeout(12-12)WithMetricsHandler(22-24)pkg/rtun/transport/session.go (5)
Option(16-16)Link(78-82)WithAllowedPorts(39-52)WithMetricsHandler(70-74)NewSession(108-141)pb/c1/connectorapi/rtun/v1/rtun.pb.go (6)
Frame(79-92)Frame(105-105)Frame(120-122)Hello(217-223)Hello(236-236)Hello(251-253)
pb/c1/connectorapi/rtun/v1/gateway.pb.validate.go (1)
pb/c1/connectorapi/rtun/v1/gateway.pb.go (30)
GatewayRequest(25-34)GatewayRequest(47-47)GatewayRequest(62-64)GatewayRequest_OpenReq(95-97)GatewayRequest_OpenReq(103-103)GatewayRequest_Frame(99-101)GatewayRequest_Frame(105-105)GatewayResponse(108-117)GatewayResponse(130-130)GatewayResponse(145-147)GatewayResponse_OpenResp(178-180)GatewayResponse_OpenResp(186-186)GatewayResponse_Frame(182-184)GatewayResponse_Frame(188-188)OpenRequest(192-199)OpenRequest(212-212)OpenRequest(227-229)OpenResponse(253-263)OpenResponse(276-276)OpenResponse(291-293)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)NotFound(343-347)NotFound(360-360)NotFound(375-377)Opened(379-383)Opened(396-396)Opened(411-413)
🪛 GitHub Check: buf-lint-and-breaking-change-detection
proto/c1/connectorapi/rtun/v1/gateway.proto
[failure] 12-12:
RPC response type "GatewayResponse" should be named "OpenResponse" or "ReverseDialerOpenResponse".
[failure] 12-12:
RPC request type "GatewayRequest" should be named "OpenRequest" or "ReverseDialerOpenRequest".
[failure] 11-11:
Service name "ReverseDialer" should be suffixed with "Service".
proto/c1/connectorapi/rtun/v1/rtun.proto
[failure] 8-8:
RPC response type "Frame" should be named "LinkResponse" or "ReverseTunnelLinkResponse".
[failure] 8-8:
RPC request type "Frame" should be named "LinkRequest" or "ReverseTunnelLinkRequest".
[failure] 8-8:
RPC "Link" has the same type "c1.connectorapi.rtun.v1.Frame" for the request and response.
[failure] 7-7:
Service name "ReverseTunnel" should be suffixed with "Service".
🪛 GitHub Check: go-lint
pkg/rtun/transport/conn_test.go
[failure] 44-44:
comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
pkg/rtun/gateway/server.go
[failure] 273-273:
comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
[failure] 138-138:
comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
pkg/rtun/gateway/client.go
[failure] 150-150:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 131-131:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 123-123:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 117-117:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 109-109:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 100-100:
Error return value of stream.CloseSend is not checked (errcheck)
pkg/rtun/server/handler.go
[failure] 137-137:
Comment should end in a period (godot)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (5)
pkg/rtun/match/errors.go (1)
5-7: Sentinel looks good.Nice to have a package-scoped
ErrClientOfflinewith a contextual message for upstream matching.pkg/rtun/transport/errors.go (1)
5-8: Consistent transport error surface.The trio of sentinel errors with rtun-prefixed messages is clear and ready for callers to match.
pkg/rtun/match/presence.go (1)
8-21: Interface shape is clear.Docstring and method set give implementations precise guidance on lease + port responsibilities.
vendor/google.golang.org/grpc/health/logging.go (1)
19-23: Logger wiring matches upstream.Initializing the health component logger keeps vendor code consistent with gRPC defaults.
pkg/rtun/match/memory/presence.go (1)
25-33: LGTM: Announce TTL handling and locking look correctStraightforward in‑mem presence; copies ports; prunes expired leases lazily. No issues found.
pkg/rtun/gateway/grpc_options.go
Outdated
| // RecommendedGRPCServerOptions returns server options enabling basic keepalive and | ||
| // reasonable message size limits suitable for the gateway service. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update the docstring to reflect actual behavior.
The comment mentions "reasonable message size limits" but the function does not set MaxRecvMsgSize, MaxSendMsgSize, or MaxConcurrentStreams, unlike the analogous function in pkg/rtun/server/grpc_options.go (lines 14-16). If message size limits are intentionally omitted for the gateway, update the docstring to remove this claim. Otherwise, consider adding them.
Apply this diff if limits should be added:
func RecommendedGRPCServerOptions() []grpc.ServerOption {
return []grpc.ServerOption{
+ grpc.MaxRecvMsgSize(4 * 1024 * 1024),
+ grpc.MaxSendMsgSize(4 * 1024 * 1024),
+ grpc.MaxConcurrentStreams(250),
grpc.KeepaliveParams(keepalive.ServerParameters{Or update the docstring:
-// RecommendedGRPCServerOptions returns server options enabling basic keepalive and
-// reasonable message size limits suitable for the gateway service.
+// RecommendedGRPCServerOptions returns server options enabling basic keepalive
+// suitable for the gateway service.🤖 Prompt for AI Agents
In pkg/rtun/gateway/grpc_options.go around lines 10-11, the function docstring
wrongly claims it sets "reasonable message size limits" even though
MaxRecvMsgSize, MaxSendMsgSize, and MaxConcurrentStreams are not configured;
update the comment to accurately describe behavior (e.g., state it returns
server options enabling basic keepalive settings without altering message size
or concurrent stream limits) or alternatively, if you intended to enforce
limits, add the same MaxRecvMsgSize/MaxSendMsgSize/MaxConcurrentStreams options
as used in pkg/rtun/server/grpc_options.go so the comment matches
implementation.
| time.Sleep(50 * time.Millisecond) | ||
|
|
||
| // Remote caller: use gateway.Dialer to get net.Conn to client | ||
| dialer := NewDialer(lA.Addr().String(), insecure.NewCredentials(), nil) | ||
| gwConn, err := dialer.DialContext(ctx, "client-123", 1) | ||
| require.NoError(t, err) | ||
| defer gwConn.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retry dial instead of sleeping
The hard-coded 50 ms pause makes the test timing fragile. Use require.Eventually to retry DialContext until registration is ready, eliminating flakiness from slow environments.
Apply this diff:
- // Wait for registration
- time.Sleep(50 * time.Millisecond)
-
- // Remote caller: use gateway.Dialer to get net.Conn to client
- dialer := NewDialer(lA.Addr().String(), insecure.NewCredentials(), nil)
- gwConn, err := dialer.DialContext(ctx, "client-123", 1)
- require.NoError(t, err)
+ // Remote caller: wait for registration by retrying through the gateway dialer
+ dialer := NewDialer(lA.Addr().String(), insecure.NewCredentials(), nil)
+ var gwConn net.Conn
+ require.Eventually(t, func() bool {
+ conn, err := dialer.DialContext(ctx, "client-123", 1)
+ if err != nil {
+ return false
+ }
+ gwConn = conn
+ return true
+ }, time.Second, 10*time.Millisecond)
+ require.NotNil(t, gwConn)
defer gwConn.Close()Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In pkg/rtun/gateway/integration_test.go around lines 82 to 88, replace the
hard-coded time.Sleep(50 * time.Millisecond) and single DialContext call with a
retry using require.Eventually: repeatedly call dialer.DialContext(ctx,
"client-123", 1) until it returns a non-nil connection and no error (or until a
sensible timeout like a few seconds) and inside the closure assert err == nil
and gwConn != nil, then assign and defer Close after the require.Eventually
succeeds; use a short poll interval (e.g., 10–50ms) and a longer overall timeout
to avoid flakiness in slow environments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/rtun/transport/conn.go (1)
430-433: Synchronizesidscounter updates.
incSidsActivemutatesm.sidsfrom multiple goroutines (Open, FIN/RST handling) without any synchronization, which is a data race. Guard the counter with a mutex or atomics before observing the gauge.type transportMetrics struct { framesRx sdkmetrics.Int64Counter @@ - sids int64 + mu sync.Mutex + sids int64 } @@ func (m *transportMetrics) incSidsActive(ctx context.Context, delta int64) { - m.sids += delta - m.sidsGauge.Observe(ctx, m.sids, nil) + m.mu.Lock() + m.sids += delta + cur := m.sids + m.mu.Unlock() + m.sidsGauge.Observe(ctx, cur, nil) }
♻️ Duplicate comments (2)
pkg/rtun/gateway/client.go (1)
95-154: HandleCloseSenderrorsWe ignore
CloseSendin every failure path, trippingerrcheckand potentially masking transport errors. Assign the result to_(or handle it) at each call site so the linter passes and we don’t drop a real error.@@ - stream.CloseSend() + _ = stream.CloseSend() @@ - stream.CloseSend() + _ = stream.CloseSend() @@ - stream.CloseSend() + _ = stream.CloseSend() @@ - stream.CloseSend() + _ = stream.CloseSend() @@ - stream.CloseSend() + _ = stream.CloseSend() @@ - stream.CloseSend() + _ = stream.CloseSend()pkg/rtun/transport/conn_test.go (1)
14-45: Useerrors.Isfor wrapped errorsDirect
==misses wrapped EOF/closed errors and failserrorlint. Importerrorsand switch toerrors.Isso the assertion survives wrapping and unblocks lint.@@ -import ( - "context" - "io" - "net" - "testing" - "time" +import ( + "context" + "errors" + "io" + "net" + "testing" + "time" @@ - _, err = c.Read(make([]byte, 1)) - require.True(t, err == io.EOF || err == net.ErrClosed) + _, err = c.Read(make([]byte, 1)) + require.True(t, errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (4)
pb/c1/connectorapi/rtun/v1/gateway.pb.gois excluded by!**/*.pb.gopb/c1/connectorapi/rtun/v1/gateway_grpc.pb.gois excluded by!**/*.pb.gopb/c1/connectorapi/rtun/v1/rtun.pb.gois excluded by!**/*.pb.gopb/c1/connectorapi/rtun/v1/rtun_grpc.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (16)
pb/c1/connectorapi/rtun/v1/gateway.pb.validate.go(1 hunks)pb/c1/connectorapi/rtun/v1/rtun.pb.validate.go(1 hunks)pkg/rtun/gateway/client.go(1 hunks)pkg/rtun/gateway/client_conn_test.go(1 hunks)pkg/rtun/gateway/integration_test.go(1 hunks)pkg/rtun/gateway/server.go(1 hunks)pkg/rtun/match/route_test.go(1 hunks)pkg/rtun/server/handler.go(1 hunks)pkg/rtun/server/server_integration_test.go(1 hunks)pkg/rtun/transport/conn.go(1 hunks)pkg/rtun/transport/conn_test.go(1 hunks)pkg/rtun/transport/errors.go(1 hunks)pkg/rtun/transport/listener.go(1 hunks)pkg/rtun/transport/session.go(1 hunks)proto/c1/connectorapi/rtun/v1/gateway.proto(1 hunks)proto/c1/connectorapi/rtun/v1/rtun.proto(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- pkg/rtun/server/server_integration_test.go
- pkg/rtun/gateway/integration_test.go
- pkg/rtun/gateway/client_conn_test.go
🧰 Additional context used
🧬 Code graph analysis (10)
pkg/rtun/transport/conn_test.go (3)
pkg/rtun/transport/session.go (1)
NewSession(108-141)pb/c1/connectorapi/rtun/v1/rtun.pb.go (14)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_INTERNAL(31-31)pkg/rtun/transport/errors.go (1)
ErrConnReset(6-6)
pkg/rtun/transport/listener.go (1)
pkg/rtun/transport/session.go (1)
Session(85-105)
pkg/rtun/gateway/client.go (6)
vendor/google.golang.org/grpc/clientconn.go (1)
DialContext(239-318)vendor/google.golang.org/grpc/dialoptions.go (1)
WithTransportCredentials(404-408)pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (2)
NewReverseDialerServiceClient(39-41)ReverseDialerService_OpenClient(54-54)pb/c1/connectorapi/rtun/v1/gateway.pb.go (14)
ReverseDialerServiceOpenRequest(25-34)ReverseDialerServiceOpenRequest(47-47)ReverseDialerServiceOpenRequest(62-64)ReverseDialerServiceOpenRequest_OpenReq(95-97)ReverseDialerServiceOpenRequest_OpenReq(103-103)OpenRequest(192-199)OpenRequest(212-212)OpenRequest(227-229)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)ReverseDialerServiceOpenRequest_Frame(99-101)ReverseDialerServiceOpenRequest_Frame(105-105)pkg/rtun/gateway/errors.go (2)
ErrProtocol(8-8)ErrNotFound(6-6)pb/c1/connectorapi/rtun/v1/rtun.pb.go (18)
Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)Frame(167-180)Frame(193-193)Frame(208-210)Fin(445-450)Fin(463-463)Fin(478-480)
pkg/rtun/transport/conn.go (3)
pkg/rtun/transport/session.go (1)
Session(85-105)pkg/rtun/transport/errors.go (1)
ErrTimeout(7-7)pb/c1/connectorapi/rtun/v1/rtun.pb.go (19)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_INTERNAL(31-31)
pkg/rtun/match/route_test.go (11)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (11)
Hello(305-311)Hello(324-324)Hello(339-341)Frame(167-180)Frame(193-193)Frame(208-210)Frame_Hello(275-277)Frame_Hello(295-295)ReverseTunnelServiceLinkRequest(79-84)ReverseTunnelServiceLinkRequest(97-97)ReverseTunnelServiceLinkRequest(112-114)pkg/rtun/match/memory/presence.go (2)
NewPresence(18-23)Presence(12-16)pkg/rtun/match/memory/directory.go (2)
NewDirectory(23-27)Directory(13-16)pkg/rtun/server/registry.go (1)
NewRegistry(20-30)pkg/rtun/server/handler.go (1)
NewHandler(28-41)pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (3)
RegisterReverseTunnelServiceServer(79-88)NewReverseTunnelServiceClient(36-38)ReverseTunnelService_LinkClient(51-51)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-141)vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go (1)
HealthCheckResponse_SERVING(45-45)pkg/rtun/match/route.go (2)
OwnerRouter(16-20)LocalReverseDial(45-62)pkg/rtun/match/presence.go (1)
Presence(15-21)pkg/rtun/match/directory.go (1)
Directory(11-15)
pkg/rtun/transport/session.go (4)
pkg/rtun/server/handler.go (1)
Handler(15-26)pb/c1/connectorapi/rtun/v1/rtun.pb.go (27)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_NO_LISTENER(28-28)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)RstCode_RST_CODE_INTERNAL(31-31)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Hello(275-277)Frame_Hello(295-295)pkg/rtun/transport/errors.go (1)
ErrConnReset(6-6)pkg/metrics/metrics.go (1)
Dimensionless(29-29)
pkg/rtun/gateway/server.go (4)
pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (4)
UnimplementedReverseDialerServiceServer(71-71)UnimplementedReverseDialerServiceServer(73-75)UnimplementedReverseDialerServiceServer(76-76)ReverseDialerService_OpenServer(101-101)pkg/rtun/server/registry.go (1)
Registry(14-18)pb/c1/connectorapi/rtun/v1/gateway.pb.go (24)
ReverseDialerServiceOpenRequest_OpenReq(95-97)ReverseDialerServiceOpenRequest_OpenReq(103-103)ReverseDialerServiceOpenResponse(108-117)ReverseDialerServiceOpenResponse(130-130)ReverseDialerServiceOpenResponse(145-147)ReverseDialerServiceOpenResponse_Frame(182-184)ReverseDialerServiceOpenResponse_Frame(188-188)ReverseDialerServiceOpenResponse_OpenResp(178-180)ReverseDialerServiceOpenResponse_OpenResp(186-186)OpenResponse(253-263)OpenResponse(276-276)OpenResponse(291-293)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)NotFound(343-347)NotFound(360-360)NotFound(375-377)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)Opened(379-383)Opened(396-396)Opened(411-413)ReverseDialerServiceOpenRequest_Frame(99-101)ReverseDialerServiceOpenRequest_Frame(105-105)pb/c1/connectorapi/rtun/v1/rtun.pb.go (19)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_INTERNAL(31-31)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)
pkg/rtun/server/handler.go (5)
pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (5)
UnimplementedReverseTunnelServiceServer(65-65)UnimplementedReverseTunnelServiceServer(67-69)UnimplementedReverseTunnelServiceServer(70-70)ReverseTunnelServiceServer(56-58)ReverseTunnelService_LinkServer(95-95)pkg/rtun/server/registry.go (1)
Registry(14-18)pkg/rtun/server/auth.go (1)
TokenValidator(10-16)pkg/rtun/transport/session.go (5)
Option(16-16)Link(78-82)WithAllowedPorts(39-52)WithMetricsHandler(70-74)NewSession(108-141)pkg/rtun/server/options.go (4)
Option(15-15)ErrProtocol(11-11)ErrHelloTimeout(12-12)WithMetricsHandler(22-24)
pb/c1/connectorapi/rtun/v1/rtun.pb.validate.go (1)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (34)
ReverseTunnelServiceLinkRequest(79-84)ReverseTunnelServiceLinkRequest(97-97)ReverseTunnelServiceLinkRequest(112-114)ReverseTunnelServiceLinkResponse(123-128)ReverseTunnelServiceLinkResponse(141-141)ReverseTunnelServiceLinkResponse(156-158)Frame(167-180)Frame(193-193)Frame(208-210)Frame_Hello(275-277)Frame_Hello(295-295)Frame_Syn(279-281)Frame_Syn(297-297)Frame_Data(283-285)Frame_Data(299-299)Frame_Fin(287-289)Frame_Fin(301-301)Frame_Rst(291-293)Frame_Rst(303-303)Hello(305-311)Hello(324-324)Hello(339-341)Syn(357-362)Syn(375-375)Syn(390-392)Data(401-406)Data(419-419)Data(434-436)Fin(445-450)Fin(463-463)Fin(478-480)Rst(489-494)Rst(507-507)Rst(522-524)
pb/c1/connectorapi/rtun/v1/gateway.pb.validate.go (1)
pb/c1/connectorapi/rtun/v1/gateway.pb.go (30)
ReverseDialerServiceOpenRequest(25-34)ReverseDialerServiceOpenRequest(47-47)ReverseDialerServiceOpenRequest(62-64)ReverseDialerServiceOpenRequest_OpenReq(95-97)ReverseDialerServiceOpenRequest_OpenReq(103-103)ReverseDialerServiceOpenRequest_Frame(99-101)ReverseDialerServiceOpenRequest_Frame(105-105)ReverseDialerServiceOpenResponse(108-117)ReverseDialerServiceOpenResponse(130-130)ReverseDialerServiceOpenResponse(145-147)ReverseDialerServiceOpenResponse_OpenResp(178-180)ReverseDialerServiceOpenResponse_OpenResp(186-186)ReverseDialerServiceOpenResponse_Frame(182-184)ReverseDialerServiceOpenResponse_Frame(188-188)OpenRequest(192-199)OpenRequest(212-212)OpenRequest(227-229)OpenResponse(253-263)OpenResponse(276-276)OpenResponse(291-293)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)NotFound(343-347)NotFound(360-360)NotFound(375-377)Opened(379-383)Opened(396-396)Opened(411-413)
🪛 GitHub Check: go-lint
pkg/rtun/transport/conn_test.go
[failure] 44-44:
comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
pkg/rtun/gateway/client.go
[failure] 150-150:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 131-131:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 123-123:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 117-117:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 109-109:
Error return value of stream.CloseSend is not checked (errcheck)
[failure] 100-100:
Error return value of stream.CloseSend is not checked (errcheck)
pkg/rtun/transport/session.go
[failure] 402-402:
Comment should end in a period (godot)
pkg/rtun/server/handler.go
[failure] 137-137:
Comment should end in a period (godot)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
pkg/rtun/gateway/client.go (1)
100-100: Past review issue addressed.The unchecked
stream.CloseSend()errors flagged in the past review have been fixed with explicit error discards (_ =).Also applies to: 109-109, 117-117, 123-123, 131-131, 150-150
🧹 Nitpick comments (3)
pkg/rtun/match/locator.go (1)
15-15: Consider removing unused named returns.The named returns
serverID,ports, anderrare declared but never assigned to directly (you use explicitreturnstatements). Removing them would simplify the signature without changing behavior.Apply this diff:
-func (l *Locator) OwnerOf(ctx context.Context, clientID string) (serverID string, ports []uint32, err error) { +func (l *Locator) OwnerOf(ctx context.Context, clientID string) (string, []uint32, error) {pkg/rtun/match/memory/directory.go (1)
45-45: Consider removing unused named returns.The named returns
addranderrare declared but never assigned to directly. Removing them would simplify the signature.Apply this diff:
-func (d *Directory) Resolve(ctx context.Context, serverID string) (addr string, err error) { +func (d *Directory) Resolve(ctx context.Context, serverID string) (string, error) {pkg/rtun/transport/session.go (1)
95-95: Consider removing or documenting the unusedpendingbuffer.The
pendingmap is declared, initialized (line 131), and drained (lines 261-266) but never populated. The DATA handler (lines 287-295) explicitly sends RST for DATA-before-SYN instead of buffering. Either remove this dead code or document its intended future use.Apply this diff to remove the unused field and related code:
listeners map[uint32]*rtunListener nextSID uint32 - pending map[uint32][][]byte // queued DATA before SYN processed closed closedSet // closed SIDs for late-frame detections := &Session{ link: link, conns: make(map[uint32]*virtConn), listeners: make(map[uint32]*rtunListener), nextSID: 1, - pending: make(map[uint32][][]byte), logger: logger,s.conns[sid] = vc vc.startIdleTimer() // capture new connection count under lock for gauge update newCount := len(s.conns) - // Drain any pending data queued before SYN - if q := s.pending[sid]; len(q) > 0 { - for _, p := range q { - vc.feedData(p) - } - delete(s.pending, sid) - } s.mu.Unlock()Alternatively, if buffering DATA-before-SYN is planned, document this in a TODO comment.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
pkg/rtun/gateway/client.go(1 hunks)pkg/rtun/match/directory.go(1 hunks)pkg/rtun/match/errors.go(1 hunks)pkg/rtun/match/locator.go(1 hunks)pkg/rtun/match/memory/directory.go(1 hunks)pkg/rtun/match/memory/presence.go(1 hunks)pkg/rtun/match/route_test.go(1 hunks)pkg/rtun/server/handler.go(1 hunks)pkg/rtun/server/options.go(1 hunks)pkg/rtun/server/registry.go(1 hunks)pkg/rtun/transport/conn.go(1 hunks)pkg/rtun/transport/conn_test.go(1 hunks)pkg/rtun/transport/errors.go(1 hunks)pkg/rtun/transport/listener.go(1 hunks)pkg/rtun/transport/session.go(1 hunks)pkg/rtun/transport/session_race_test.go(1 hunks)pkg/rtun/transport/session_test.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- pkg/rtun/match/memory/presence.go
- pkg/rtun/transport/listener.go
- pkg/rtun/server/registry.go
- pkg/rtun/transport/errors.go
- pkg/rtun/server/options.go
🧰 Additional context used
🧬 Code graph analysis (10)
pkg/rtun/transport/session_test.go (3)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (21)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Rst(291-293)Frame_Rst(303-303)RstCode_RST_CODE_NO_LISTENER(28-28)pkg/rtun/transport/session.go (1)
NewSession(108-141)pkg/rtun/transport/errors.go (1)
ErrTimeout(9-9)
pkg/rtun/transport/conn_test.go (3)
pkg/rtun/transport/session.go (1)
NewSession(108-141)pb/c1/connectorapi/rtun/v1/rtun.pb.go (14)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_INTERNAL(31-31)pkg/rtun/transport/errors.go (1)
ErrConnReset(7-7)
pkg/rtun/transport/session_race_test.go (2)
pkg/rtun/transport/session.go (1)
NewSession(108-141)pb/c1/connectorapi/rtun/v1/rtun.pb.go (13)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)
pkg/rtun/gateway/client.go (6)
vendor/google.golang.org/grpc/clientconn.go (1)
DialContext(239-318)vendor/google.golang.org/grpc/dialoptions.go (1)
WithTransportCredentials(404-408)pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (2)
NewReverseDialerServiceClient(39-41)ReverseDialerService_OpenClient(54-54)pb/c1/connectorapi/rtun/v1/gateway.pb.go (14)
ReverseDialerServiceOpenRequest(25-34)ReverseDialerServiceOpenRequest(47-47)ReverseDialerServiceOpenRequest(62-64)ReverseDialerServiceOpenRequest_OpenReq(95-97)ReverseDialerServiceOpenRequest_OpenReq(103-103)OpenRequest(192-199)OpenRequest(212-212)OpenRequest(227-229)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)ReverseDialerServiceOpenRequest_Frame(99-101)ReverseDialerServiceOpenRequest_Frame(105-105)pkg/rtun/gateway/errors.go (2)
ErrProtocol(8-8)ErrNotFound(6-6)pb/c1/connectorapi/rtun/v1/rtun.pb.go (18)
Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)Frame(167-180)Frame(193-193)Frame(208-210)Fin(445-450)Fin(463-463)Fin(478-480)
pkg/rtun/match/locator.go (4)
pkg/rtun/match/memory/presence.go (1)
Presence(12-16)pkg/rtun/match/presence.go (1)
Presence(15-21)pkg/rtun/match/directory.go (1)
ErrNotImplemented(10-10)pkg/rtun/match/errors.go (1)
ErrClientOffline(7-7)
pkg/rtun/transport/conn.go (3)
pkg/rtun/transport/session.go (1)
Session(85-105)pkg/rtun/transport/errors.go (1)
ErrTimeout(9-9)pb/c1/connectorapi/rtun/v1/rtun.pb.go (19)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_INTERNAL(31-31)
pkg/rtun/match/memory/directory.go (1)
pkg/rtun/match/directory.go (1)
Directory(13-20)
pkg/rtun/transport/session.go (2)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (27)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_NO_LISTENER(28-28)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)RstCode_RST_CODE_INTERNAL(31-31)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Hello(275-277)Frame_Hello(295-295)pkg/metrics/metrics.go (1)
Dimensionless(29-29)
pkg/rtun/server/handler.go (5)
pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (5)
UnimplementedReverseTunnelServiceServer(65-65)UnimplementedReverseTunnelServiceServer(67-69)UnimplementedReverseTunnelServiceServer(70-70)ReverseTunnelServiceServer(56-58)ReverseTunnelService_LinkServer(95-95)pkg/rtun/server/registry.go (1)
Registry(15-19)pkg/rtun/server/auth.go (1)
TokenValidator(10-16)pkg/rtun/server/options.go (4)
Option(19-19)ErrProtocol(13-13)ErrHelloTimeout(15-15)WithMetricsHandler(26-28)pkg/rtun/transport/session.go (5)
Option(16-16)Link(78-82)WithAllowedPorts(39-52)WithMetricsHandler(70-74)NewSession(108-141)
pkg/rtun/match/route_test.go (9)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (11)
Hello(305-311)Hello(324-324)Hello(339-341)Frame(167-180)Frame(193-193)Frame(208-210)Frame_Hello(275-277)Frame_Hello(295-295)ReverseTunnelServiceLinkRequest(79-84)ReverseTunnelServiceLinkRequest(97-97)ReverseTunnelServiceLinkRequest(112-114)pkg/rtun/match/memory/presence.go (2)
NewPresence(19-24)Presence(12-16)pkg/rtun/match/memory/directory.go (2)
NewDirectory(25-29)Directory(14-17)pkg/rtun/server/registry.go (1)
NewRegistry(22-32)pkg/rtun/server/handler.go (1)
NewHandler(30-43)pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (2)
NewReverseTunnelServiceClient(36-38)ReverseTunnelService_LinkClient(51-51)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-141)pkg/rtun/match/route.go (2)
OwnerRouter(16-20)LocalReverseDial(45-62)pkg/rtun/match/presence.go (1)
Presence(15-21)
🪛 GitHub Check: go-lint
pkg/rtun/gateway/client.go
[failure] 78-78:
SA1019: grpc.DialContext is deprecated: use NewClient instead. Will be supported throughout 1.x. (staticcheck)
pkg/rtun/match/locator.go
[failure] 15-15:
named return "serverID" with type "string" found (nonamedreturns)
pkg/rtun/match/memory/directory.go
[failure] 45-45:
named return "addr" with type "string" found (nonamedreturns)
pkg/rtun/match/route_test.go
[failure] 40-40:
net.Listen must not be called. use (*net.ListenConfig).Listen (noctx)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (33)
pkg/rtun/match/errors.go (1)
1-8: LGTM!The error sentinel is well-defined with clear documentation. The naming follows Go conventions and accurately describes the offline-client condition.
pkg/rtun/match/directory.go (1)
1-20: LGTM!The Directory interface is well-designed with context-aware methods and clear documentation. The TTL-based advertisement model aligns with the presence/routing architecture described in the PR objectives.
pkg/rtun/match/locator.go (2)
16-32: LGTM!The owner selection logic is correct. The nil checks, early-return error handling, and deterministic rendezvous hashing ensure consistent and safe owner resolution.
34-51: LGTM!The rendezvous hashing implementation is correct and efficient. FNV-1a provides fast, deterministic selection, and the delimiter prevents hash collisions. The
haveflag properly handles the edge case of an empty server list.pkg/rtun/match/memory/directory.go (6)
1-11: LGTM!The error sentinel is properly defined with clear documentation.
13-22: LGTM!The data structures are appropriate for an in-memory directory. The RWMutex provides efficient concurrent access.
24-29: LGTM!The constructor properly initializes the servers map.
31-36: LGTM!The Advertise method correctly stores the server record with proper locking and TTL calculation.
38-43: LGTM!The Revoke method correctly removes the server with proper locking.
46-58: LGTM!The Resolve method correctly handles TTL expiration with proper eager cleanup and appropriate locking (write lock needed for deletion).
pkg/rtun/match/route_test.go (7)
20-24: LGTM!The test validator provides minimal, appropriate behavior for integration testing.
29-45: LGTM!The server setup is appropriate for an integration test. The timeout context and proper error checking on
Advertiseensure the test fails fast if setup fails.
47-62: LGTM!The client session setup correctly establishes the RTUN link, sends the HELLO frame, and obtains a listener. Error handling is appropriate.
64-73: LGTM!The health service setup is correct, and presence operations now properly check errors (addressing the previous review comment).
75-98: LGTM!The routing and reverse dial logic correctly demonstrates the end-to-end flow. The health check confirms successful connectivity through the RTUN reverse tunnel.
100-108: LGTM!The cleanup sequence is correct, ensuring resources are released in the proper order.
111-128: LGTM!The
clientLinkadapter correctly bridges the gRPC stream to the transport.Link interface.pkg/rtun/gateway/client.go (9)
19-23: LGTM: Reasonable buffer and chunk size defaults.The default buffer capacities (16) and chunk size (32 KB) are appropriate for typical reverse-dial scenarios.
25-70: LGTM: Clean options pattern with validation.The
Dialerconfiguration follows idiomatic Go patterns. Options properly validate positive capacities before applying changes.
129-154: LGTM: Proper OpenResponse handling with cleanup.The switch correctly handles NotFound, Opened, and unexpected responses. Error paths properly cancel context and close resources before returning.
159-179: Past review issue resolved: deadline race condition fixed.The race condition on deadline fields flagged in the past review has been properly addressed. The implementation uses separate mutexes (
readMuforrdDeadline,writeMuforwrDeadline) with correct locking inRead(),Write(), and theSet*Deadline()methods. This approach is valid and upholds thenet.Connconcurrency contract.Also applies to: 393-395, 448-450, 510-532
186-275: LGTM: Sound reader implementation with proper synchronization.The reader correctly uses
sync.Onceto ensure a single receive goroutine, copies payloads to prevent data races (line 236), and properly handles EOF via Fin frames and errors via Rst frames.
441-473: LGTM: Write properly synchronized and chunked.The
Writemethod correctly holdswriteMufor the duration, snapshots the deadline, chunks data to respect frame size limits, and copies data before enqueueing to prevent data races. Partial write tracking is handled properly.
477-505: LGTM: Idempotent Close with documented best-effort Fin.The
Closemethod correctly usessync.Oncefor idempotency and implements intentional best-effort Fin sending (line 482 comment). Ignoring Send errors during teardown is reasonable since resources are released regardless and thenet.Conncontract doesn't require reliable Close error propagation in this scenario.
346-379: LGTM: Thorough deadline handling in enqueue.The
enqueuemethod properly handles write deadlines with fast-fail for expired deadlines, timer cleanup, and context cancellation awareness. The dual select structure correctly handles both deadline and no-deadline cases.
381-428: LGTM: Read correctly handles deadlines and partial data.The
Readmethod properly:
- Consumes remainder buffer under lock
- Snapshots deadline to prevent races
- Classifies errors (deadline/canceled/EOF vs connection closed vs other)
- Handles partial reads by storing remainder
pkg/rtun/transport/session_race_test.go (1)
12-96: LGTM! Excellent concurrency test coverage.The race tests provide solid coverage of concurrent operations, late-frame handling, and context cancellation scenarios. The tests are appropriately designed to catch panics, deadlocks, and race conditions rather than validating specific protocol behaviors.
pkg/rtun/transport/session_test.go (1)
14-308: LGTM! Comprehensive session behavior coverage.The test suite thoroughly validates the transport session's lifecycle, deadline handling, half-close semantics, fragmentation, and backpressure behavior. The testLink harness provides a clean mock implementation for testing frame flows.
pkg/rtun/transport/conn_test.go (1)
15-94: LGTM! Tests correctly validate connection lifecycle.The virtual connection tests properly cover close idempotency, write-after-close behavior, RST propagation, and address handling. The past review comment regarding
errors.Isusage has been addressed correctly on line 45.pkg/rtun/server/handler.go (1)
14-156: LGTM! Handler correctly implements the ReverseTunnel service.The implementation properly validates authentication before HELLO, enforces a 15-second HELLO timeout using a goroutine, validates port limits, and manages session lifecycle with appropriate cleanup. The past review comment regarding the doc comment period has been addressed.
pkg/rtun/transport/conn.go (1)
15-310: LGTM! Virtual connection implementation is correct.The virtConn properly implements net.Conn with comprehensive deadline handling, idle timer management, and half-close semantics. The past review comment regarding write-side closure during overflow has been addressed—feedData now delegates to handleRst (line 231), which properly closes both read and write sides (line 255).
pkg/rtun/transport/session.go (2)
200-329: LGTM! Session recv loop correctly implements protocol handling.The frame processing logic properly handles SYN (with duplicate detection), DATA, FIN, and RST frames. The protocol enforcement is solid: SID 0 validation, duplicate SYN detection as fatal error, proper listener resolution, and late-frame detection via the closed set.
334-385: LGTM! Open method correctly implements reverse dialing.The reverse connection initiation properly checks context cancellation, enforces port allowlists, allocates SIDs sequentially, sends SYN with cleanup on failure, and starts idle timers with metrics tracking.
| if msg.fin { | ||
| _ = w.stream.Send(&rtunpb.ReverseDialerServiceOpenRequest{Kind: &rtunpb.ReverseDialerServiceOpenRequest_Frame{ | ||
| Frame: &rtunpb.Frame{Sid: w.gsid, Kind: &rtunpb.Frame_Fin{Fin: &rtunpb.Fin{}}}, | ||
| }}) | ||
| continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle or document ignored Fin send error.
The writer loop ignores the error when sending a Fin frame, unlike Data frames which properly handle errors. If this is intentional best-effort behavior, add a comment explaining why. Otherwise, handle the error consistently.
Apply this diff to handle the error:
case msg := <-w.ch:
if msg.fin {
- _ = w.stream.Send(&rtunpb.ReverseDialerServiceOpenRequest{Kind: &rtunpb.ReverseDialerServiceOpenRequest_Frame{
+ if err := w.stream.Send(&rtunpb.ReverseDialerServiceOpenRequest{Kind: &rtunpb.ReverseDialerServiceOpenRequest_Frame{
Frame: &rtunpb.Frame{Sid: w.gsid, Kind: &rtunpb.Frame_Fin{Fin: &rtunpb.Fin{}}},
- }})
- continue
+ }}); err != nil {
+ w.setErr(err)
+ return
+ }
+ return // Fin is final, exit loop
}Or, if best-effort is intentional, add:
if msg.fin {
+ // Best-effort Fin: ignore errors during teardown
_ = w.stream.Send(&rtunpb.ReverseDialerServiceOpenRequest{Kind: &rtunpb.ReverseDialerServiceOpenRequest_Frame{📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if msg.fin { | |
| _ = w.stream.Send(&rtunpb.ReverseDialerServiceOpenRequest{Kind: &rtunpb.ReverseDialerServiceOpenRequest_Frame{ | |
| Frame: &rtunpb.Frame{Sid: w.gsid, Kind: &rtunpb.Frame_Fin{Fin: &rtunpb.Fin{}}}, | |
| }}) | |
| continue | |
| case msg := <-w.ch: | |
| if msg.fin { | |
| if err := w.stream.Send(&rtunpb.ReverseDialerServiceOpenRequest{ | |
| Kind: &rtunpb.ReverseDialerServiceOpenRequest_Frame{ | |
| Frame: &rtunpb.Frame{Sid: w.gsid, Kind: &rtunpb.Frame_Fin{Fin: &rtunpb.Fin{}}}, | |
| }, | |
| }); err != nil { | |
| w.setErr(err) | |
| return | |
| } | |
| return // Fin is final, exit loop | |
| } |
🤖 Prompt for AI Agents
In pkg/rtun/gateway/client.go around lines 326-330 the Send call for the Fin
frame ignores its returned error; either handle it the same way Data-frame send
errors are handled (capture the error, log/assign it to the writer state, clean
up/close the writer and break/return from the writer loop) or, if this is
intentionally best-effort, replace the suppressed error with a clear comment
explaining why failures can be ignored and what invariants make that safe
(include expected consequences and that no further recovery is required). Ensure
the action you choose matches the surrounding error-handling pattern for Data
frames.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
♻️ Duplicate comments (2)
pkg/rtun/server/server_integration_test.go (1)
84-88: Replace the fixed sleep withrequire.EventuallyA hard-coded 50 ms pause makes this integration test race-prone on slow CI. Retry the reverse dial with
require.Eventually, storing the first successful connection and asserting it’s non-nil afterward.- // Wait briefly for registration to finish - time.Sleep(50 * time.Millisecond) - rconn, err := reg.DialContext(context.Background(), "rtun://client-123:1") - require.NoError(t, err) - defer rconn.Close() + var rconn net.Conn + require.Eventually(t, func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + conn, err := reg.DialContext(ctx, "rtun://client-123:1") + if err != nil { + return false + } + rconn = conn + return true + }, time.Second, 10*time.Millisecond) + require.NotNil(t, rconn) + defer rconn.Close()pkg/rtun/gateway/integration_test.go (1)
91-99: Retry the gateway dial instead of sleepingThe 50 ms sleep can flake under scheduler variance. Repeatedly attempt the gateway dial with
require.Eventually, keeping the first successful connection for the rest of the test.- // Wait for registration - time.Sleep(50 * time.Millisecond) - - // Remote caller: use gateway.Dialer to get net.Conn to client - dialer := NewDialer(lA.Addr().String(), insecure.NewCredentials(), nil) - gwConn, err := dialer.DialContext(ctx, "client-123", 1) - require.NoError(t, err) - defer gwConn.Close() + // Remote caller: wait for registration by retrying the gateway dial + dialer := NewDialer(lA.Addr().String(), insecure.NewCredentials(), nil) + var gwConn net.Conn + require.Eventually(t, func() bool { + attemptCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + conn, err := dialer.DialContext(attemptCtx, "client-123", 1) + if err != nil { + return false + } + gwConn = conn + return true + }, time.Second, 10*time.Millisecond) + require.NotNil(t, gwConn) + defer gwConn.Close()
🧹 Nitpick comments (1)
pkg/rtun/match/route_test.go (1)
26-109: Excellent end-to-end test coverage.This integration test comprehensively exercises the two-server routing scenario: Server A owns a client, Server B discovers and dials A via OwnerRouter, then reverse-dials the client through A's registry. The test correctly validates the health check over the reverse connection and cleans up resources in proper order.
Optional: Replace sleep with explicit readiness check.
Line 82 uses
time.Sleep(50 * time.Millisecond)to wait for registration. While acceptable for in-process tests, consider using an explicit readiness check or retry loop to eliminate potential flakiness in slow CI environments:- // Wait briefly for registration - time.Sleep(50 * time.Millisecond) + // Wait for registration with retry + var ownerConn *grpc.ClientConn + var ownerID string + require.Eventually(t, func() bool { + var err error + ownerConn, ownerID, err = router.DialOwner(ctx, "client-123") + return err == nil + }, 2*time.Second, 10*time.Millisecond, "owner should be discoverable") + require.Equal(t, "server-a", ownerID)Then remove the separate DialOwner call at lines 84-86.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
pkg/metrics/metrics.go(1 hunks)pkg/metrics/noop.go(1 hunks)pkg/metrics/otel.go(1 hunks)pkg/rtun/gateway/client.go(1 hunks)pkg/rtun/gateway/client_conn_test.go(1 hunks)pkg/rtun/gateway/integration_test.go(1 hunks)pkg/rtun/match/directory.go(1 hunks)pkg/rtun/match/locator.go(1 hunks)pkg/rtun/match/memory/directory.go(1 hunks)pkg/rtun/match/route.go(1 hunks)pkg/rtun/match/route_test.go(1 hunks)pkg/rtun/server/server_integration_test.go(1 hunks)pkg/rtun/transport/session.go(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/rtun/match/memory/directory.go
- pkg/rtun/match/locator.go
🧰 Additional context used
🧬 Code graph analysis (9)
pkg/metrics/noop.go (1)
pkg/metrics/metrics.go (1)
Unit(30-30)
pkg/metrics/otel.go (4)
pkg/metrics/metrics.go (1)
Unit(30-30)vendor/go.opentelemetry.io/otel/metric/asyncint64.go (1)
Int64ObservableGauge(146-153)vendor/go.opentelemetry.io/otel/metric/instrument.go (3)
WithDescription(112-112)WithUnit(189-189)WithAttributes(364-368)vendor/go.opentelemetry.io/otel/metric/meter.go (1)
Observer(249-260)
pkg/rtun/gateway/client.go (6)
vendor/google.golang.org/grpc/clientconn.go (1)
DialContext(239-318)vendor/google.golang.org/grpc/dialoptions.go (1)
WithTransportCredentials(404-408)pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (2)
NewReverseDialerServiceClient(39-41)ReverseDialerService_OpenClient(54-54)pb/c1/connectorapi/rtun/v1/gateway.pb.go (14)
ReverseDialerServiceOpenRequest(25-34)ReverseDialerServiceOpenRequest(47-47)ReverseDialerServiceOpenRequest(62-64)ReverseDialerServiceOpenRequest_OpenReq(95-97)ReverseDialerServiceOpenRequest_OpenReq(103-103)OpenRequest(192-199)OpenRequest(212-212)OpenRequest(227-229)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)ReverseDialerServiceOpenRequest_Frame(99-101)ReverseDialerServiceOpenRequest_Frame(105-105)pkg/rtun/gateway/errors.go (2)
ErrProtocol(8-8)ErrNotFound(6-6)pb/c1/connectorapi/rtun/v1/rtun.pb.go (18)
Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)Frame(167-180)Frame(193-193)Frame(208-210)Fin(445-450)Fin(463-463)Fin(478-480)
pkg/rtun/match/route.go (5)
pkg/rtun/match/locator.go (1)
Locator(9-11)pkg/rtun/match/directory.go (1)
Directory(13-20)vendor/google.golang.org/grpc/dialoptions.go (3)
DialOption(102-104)WithContextDialer(463-467)WithTransportCredentials(404-408)pkg/rtun/server/registry.go (1)
Registry(15-19)vendor/google.golang.org/grpc/clientconn.go (1)
DialContext(239-318)
pkg/rtun/match/route_test.go (10)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (11)
Hello(305-311)Hello(324-324)Hello(339-341)Frame(167-180)Frame(193-193)Frame(208-210)Frame_Hello(275-277)Frame_Hello(295-295)ReverseTunnelServiceLinkRequest(79-84)ReverseTunnelServiceLinkRequest(97-97)ReverseTunnelServiceLinkRequest(112-114)pkg/rtun/match/memory/presence.go (2)
NewPresence(19-24)Presence(12-16)pkg/rtun/match/memory/directory.go (2)
NewDirectory(25-29)Directory(14-17)pkg/rtun/server/registry.go (1)
NewRegistry(22-32)pkg/rtun/server/handler.go (1)
NewHandler(30-43)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-146)pkg/rtun/match/route.go (2)
OwnerRouter(16-20)LocalReverseDial(45-62)pkg/rtun/match/locator.go (1)
Locator(9-11)pkg/rtun/match/presence.go (1)
Presence(15-21)pkg/rtun/match/directory.go (1)
Directory(13-20)
pkg/rtun/gateway/client_conn_test.go (5)
pkg/rtun/server/registry.go (1)
NewRegistry(22-32)pkg/rtun/server/handler.go (1)
NewHandler(30-43)pkg/rtun/gateway/server.go (2)
NewServer(30-46)Server(21-27)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-146)pkg/rtun/gateway/client.go (1)
NewDialer(56-70)
pkg/rtun/server/server_integration_test.go (7)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (11)
Hello(305-311)Hello(324-324)Hello(339-341)Frame(167-180)Frame(193-193)Frame(208-210)ReverseTunnelServiceLinkRequest(79-84)ReverseTunnelServiceLinkRequest(97-97)ReverseTunnelServiceLinkRequest(112-114)Frame_Hello(275-277)Frame_Hello(295-295)pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (2)
ReverseTunnelService_LinkClient(51-51)NewReverseTunnelServiceClient(36-38)pkg/rtun/server/registry.go (1)
NewRegistry(22-32)pkg/rtun/server/handler.go (1)
NewHandler(30-43)vendor/google.golang.org/grpc/health/server.go (1)
NewServer(46-51)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-146)vendor/google.golang.org/grpc/health/grpc_health_v1/health.pb.go (1)
HealthCheckResponse_SERVING(45-45)
pkg/rtun/gateway/integration_test.go (8)
pb/c1/connectorapi/rtun/v1/rtun.pb.go (11)
Hello(305-311)Hello(324-324)Hello(339-341)Frame(167-180)Frame(193-193)Frame(208-210)ReverseTunnelServiceLinkRequest(79-84)ReverseTunnelServiceLinkRequest(97-97)ReverseTunnelServiceLinkRequest(112-114)Frame_Hello(275-277)Frame_Hello(295-295)pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (2)
ReverseTunnelService_LinkClient(51-51)NewReverseTunnelServiceClient(36-38)pkg/rtun/server/registry.go (1)
NewRegistry(22-32)pkg/rtun/server/handler.go (1)
NewHandler(30-43)pkg/rtun/gateway/server.go (1)
NewServer(30-46)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(108-146)pkg/rtun/gateway/client.go (1)
NewDialer(56-70)pkg/rtun/gateway/errors.go (1)
ErrNotFound(6-6)
pkg/rtun/transport/session.go (3)
pkg/metrics/metrics.go (4)
Handler(7-16)Int64Counter(18-20)Dimensionless(33-33)Bytes(34-34)pb/c1/connectorapi/rtun/v1/rtun.pb.go (27)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_NO_LISTENER(28-28)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)RstCode_RST_CODE_INTERNAL(31-31)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Hello(275-277)Frame_Hello(295-295)pkg/rtun/transport/errors.go (1)
ErrConnReset(7-7)
🪛 GitHub Check: go-lint
pkg/rtun/transport/session.go
[failure] 203-203:
const maxPendingBufferSize is unused (unused)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (8)
pkg/rtun/match/directory.go (1)
12-20: LGTM! Clean interface design.The Directory interface provides a clear contract for server advertisement and resolution with TTL support. The method signatures are idiomatic and the documentation clearly explains the intended behavior.
pkg/rtun/match/route.go (2)
15-20: LGTM! Well-structured routing helper.The OwnerRouter design cleanly separates the concerns of locating owners (Locator), resolving addresses (Directory), and configuring gRPC dial options. This makes it easy to test and compose.
22-40: Note: Connection is established lazily.
grpc.NewClientreturns immediately without blocking; the actual connection happens on first RPC or when explicitly awaited. The function name "DialOwner" might imply synchronous dialing, but this is the standard non-blocking gRPC pattern. Callers should be aware that network errors may surface later during RPC invocation rather than at dial time.Consider documenting this behavior in the function's godoc if you expect callers to need immediate connection validation. Otherwise, the current design is idiomatic for gRPC.
pkg/rtun/match/route_test.go (2)
20-24: LGTM! Clean test validator.The testValidator stub appropriately returns fixed values for integration testing without external dependencies.
111-128: LGTM! Correct adapter implementation.The clientLink type correctly bridges the gRPC bidi stream to the transport.Link interface. The Send/Recv methods properly wrap/unwrap frames, and Context delegation is appropriate.
pkg/rtun/gateway/client.go (2)
78-80: LGTM: Deprecated API replaced correctly.The code now uses
grpc.NewClientwith thepassthrough:///scheme prefix instead of the deprecatedgrpc.DialContext. This non-blocking client creation is appropriate for the reverse-dial use case, where the actual connection attempt happens duringclient.Open(streamCtx).
177-179: LGTM: Clever deadline synchronization strategy.The implementation protects deadline fields using the existing
readMuforrdDeadlineandwriteMuforwrDeadline, rather than introducing a separate deadline mutex. This is actually better than a dedicated lock:
Read()snapshotsrdDeadlineunderreadMu(lines 393-395)Write()snapshotswrDeadlineunderwriteMuand holds it for the write duration (lines 448-450)SetReadDeadlineacquiresreadMu(lines 521-523)SetWriteDeadlineacquireswriteMu(lines 528-530)SetDeadlineacquires both locks sequentially (lines 511-516)This approach avoids data races while minimizing lock contention and eliminating the need for an additional mutex.
Also applies to: 393-395, 448-448, 510-532
pkg/rtun/transport/session.go (1)
289-295: Defensive programming choice: DATA-before-SYN rejection.The implementation treats DATA-before-SYN as a protocol error and immediately sends RST rather than buffering the data. This is a sound defensive choice that enforces strict protocol ordering and avoids the complexity and potential DoS vectors of pre-SYN buffering.
This aligns with the principle of fail-fast for protocol violations and simplifies the state machine by requiring SYN to establish the connection before any DATA can be processed.
| // LocalReverseDial is a helper to be called ON the owner server process. | ||
| // It uses the local Registry to open a reverse connection to the client. | ||
| // clientID must be URL-safe; use url.PathEscape if it contains special characters. | ||
| func LocalReverseDial(ctx context.Context, reg *server.Registry, clientID string, port uint32) (*grpc.ClientConn, error) { | ||
| u := url.URL{ | ||
| Scheme: "rtun", | ||
| Host: net.JoinHostPort( | ||
| clientID, | ||
| strconv.FormatUint(uint64(port), 10), | ||
| ), | ||
| } | ||
| addr := u.String() | ||
| conn, err := grpc.NewClient("passthrough:///"+addr, | ||
| grpc.WithContextDialer(reg.DialContext), | ||
| grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("rtun: reverse dial: %w", err) | ||
| } | ||
| return conn, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
Verify callers properly escape clientID.
The function documents that clientID must be URL-safe (line 44), but the code at lines 48-50 uses clientID directly in net.JoinHostPort. If a caller passes an unescaped clientID containing ":" (e.g., "user:org"), the resulting host becomes ambiguous ("user:org:1"). While the comment places responsibility on the caller, this is error-prone.
Run the following script to check if callers are properly escaping clientID before calling LocalReverseDial:
Consider either:
- Enforcing validation: Add a check that returns an error if clientID contains ":" or other reserved characters.
- Auto-escaping: Apply
url.PathEscape(clientID)within the function and document that callers pass raw IDs. - Stronger documentation: If neither option is feasible, add a clear example to the godoc showing the escaping pattern.
🏁 Script executed:
#!/bin/bash
# Description: Find all calls to LocalReverseDial and check if clientID is escaped
# Expected: Calls should use url.PathEscape or already-safe identifiers
rg -n -A2 -B2 'LocalReverseDial\(' --type=goLength of output: 826
Auto-escape or validate clientID in LocalReverseDial
Use url.PathEscape(clientID) (e.g., escID := url.PathEscape(clientID)) before building the URL, or return an error if clientID contains reserved characters (like :). Update the godoc to state that raw IDs are accepted and will be escaped or validated.
🤖 Prompt for AI Agents
In pkg/rtun/match/route.go around lines 42 to 62, LocalReverseDial currently
inserts clientID directly into a URL which can break for reserved characters;
update the function to call url.PathEscape(clientID) (e.g., escID :=
url.PathEscape(clientID)) and use escID when building the Host/addr so the URL
is safe, and update the godoc comment to state that raw client IDs are accepted
and will be URL-escaped; alternatively, if you prefer strict validation, detect
reserved characters (like ':') and return a clear error instead of proceeding.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
pkg/rtun/server/handler.go (2)
27-40: Consider validating required parameters in the constructor.The constructor accepts
tv TokenValidatorwithout validating it's non-nil, butLinkchecks for nil at line 48. For fail-fast behavior and clearer contracts, validate required parameters in the constructor.func NewHandler(reg *Registry, serverID string, tv TokenValidator, opts ...Option) rtunpb.ReverseTunnelServiceServer { + if reg == nil || tv == nil { + panic("rtun/server: Registry and TokenValidator are required") + } var o options for _, opt := range opts {Then remove the redundant nil check in
Link:func (h *Handler) Link(stream rtunpb.ReverseTunnelService_LinkServer) error { // Wrap the gRPC stream as transport.Link l := &grpcLink{srv: stream} // Authenticate and determine clientID via TokenValidator BEFORE waiting for HELLO. - if h.tv == nil { - return ErrProtocol - } clientID, err := h.tv.ValidateAuth(stream.Context())
68-68: Consider extracting magic numbers to constants.The HELLO timeout (15 seconds) and port limit (2500) are hardcoded. Extracting these to package-level constants improves discoverability and maintainability.
At package level:
const ( defaultHelloTimeout = 15 * time.Second maxHelloPorts = 2500 )Then use them in the code:
- helloTimeout := 15 * time.Second + helloTimeout := defaultHelloTimeout- if len(hello.GetPorts()) > 2500 { + if len(hello.GetPorts()) > maxHelloPorts {Also applies to: 85-85
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
pkg/rtun/server/handler.go(1 hunks)pkg/rtun/transport/session.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pkg/rtun/transport/session.go (3)
pkg/metrics/metrics.go (4)
Handler(7-16)Int64Counter(18-20)Dimensionless(33-33)Bytes(34-34)pb/c1/connectorapi/rtun/v1/rtun.pb.go (27)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_NO_LISTENER(28-28)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)RstCode_RST_CODE_INTERNAL(31-31)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Hello(275-277)Frame_Hello(295-295)pkg/rtun/transport/errors.go (1)
ErrConnReset(7-7)
pkg/rtun/server/handler.go (6)
pb/c1/connectorapi/rtun/v1/rtun_grpc.pb.go (4)
UnimplementedReverseTunnelServiceServer(65-65)UnimplementedReverseTunnelServiceServer(67-69)UnimplementedReverseTunnelServiceServer(70-70)ReverseTunnelServiceServer(56-58)pkg/rtun/server/registry.go (1)
Registry(15-19)pkg/rtun/server/auth.go (1)
TokenValidator(10-16)pkg/rtun/transport/session.go (5)
Option(16-16)Link(78-82)WithAllowedPorts(39-52)WithMetricsHandler(70-74)NewSession(108-146)pkg/rtun/server/options.go (4)
Option(19-19)ErrProtocol(13-13)ErrHelloTimeout(15-15)WithMetricsHandler(26-28)pb/c1/connectorapi/rtun/v1/rtun.pb.go (9)
Frame(167-180)Frame(193-193)Frame(208-210)Hello(305-311)Hello(324-324)Hello(339-341)ReverseTunnelServiceLinkResponse(123-128)ReverseTunnelServiceLinkResponse(141-141)ReverseTunnelServiceLinkResponse(156-158)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (3)
pkg/rtun/server/handler.go (3)
1-11: LGTM: Clean importsThe imports are well-organized and all necessary for the implementation.
106-134: LGTM: Clean session lifecycle management.The session creation, registration, and cleanup flow is well-structured:
- Options built from validated HELLO ports
- Metrics passed through to transport layer
- Proper defer stacking ensures cleanup in the correct order (metrics unregister before registry unregister)
- Blocking on stream context keeps the handler alive until client disconnect or shutdown
136-153: LGTM: Correct transport.Link adapter.The grpcLink adapter correctly bridges the gRPC bidi stream to the transport.Link interface, with proper frame wrapping/unwrapping and context delegation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/metrics/otel.go (1)
25-36: GuarddefaultAttrsreaders to finish the race fix
WithTagsnow writesh.defaultAttrsbehinddefaultAttrsMtx, but the reads inInt64Histogram,Int64Counter, andInt64Gaugestill access it without the matching lock (c.setDefaultAttrs(h.defaultAttrs)), so the data race survives: one goroutine writes under the mutex, another reads without it. Please take the read-sideRLockbefore copying the pointer in those methods.@@ func (h *otelHandler) Int64Histogram(name string, description string, unit Unit) Int64Histogram { - c.setDefaultAttrs(h.defaultAttrs) + h.defaultAttrsMtx.RLock() + defaultAttrs := h.defaultAttrs + h.defaultAttrsMtx.RUnlock() + c.setDefaultAttrs(defaultAttrs) @@ func (h *otelHandler) Int64Counter(name string, description string, unit Unit) Int64Counter { - c.setDefaultAttrs(h.defaultAttrs) + h.defaultAttrsMtx.RLock() + defaultAttrs := h.defaultAttrs + h.defaultAttrsMtx.RUnlock() + c.setDefaultAttrs(defaultAttrs) @@ func (h *otelHandler) Int64Gauge(name string, description string, unit Unit) Int64Gauge { - c.setDefaultAttrs(h.defaultAttrs) + h.defaultAttrsMtx.RLock() + defaultAttrs := h.defaultAttrs + h.defaultAttrsMtx.RUnlock() + c.setDefaultAttrs(defaultAttrs)
🧹 Nitpick comments (2)
pkg/rtun/gateway/client.go (1)
326-338: Document or handle ignored Fin send error.The Fin send error at line 331 is ignored with
_ =, unlike Data frame send errors (lines 347-356) which properly capture the error and return. If this is intentional best-effort behavior during connection teardown, add a comment explaining why errors can be safely ignored. Otherwise, handle the error consistently with Data frames.Apply this diff to add documentation:
case msg := <-w.ch: if msg.fin { + // Best-effort Fin during teardown: ignore errors as connection is closing finFrame := &rtunpb.Frame{Or handle the error consistently:
case msg := <-w.ch: if msg.fin { finFrame := &rtunpb.Frame{ Sid: w.gsid, Kind: &rtunpb.Frame_Fin{Fin: &rtunpb.Fin{}}, } - _ = w.stream.Send( + if err := w.stream.Send( &rtunpb.ReverseDialerServiceOpenRequest{ Kind: &rtunpb.ReverseDialerServiceOpenRequest_Frame{ Frame: finFrame, }, }, - ) - continue + ); err != nil { + w.setErr(err) + } + return // Fin is final, exit loop }pkg/rtun/transport/session.go (1)
370-387: Clarify misleading comment.The comment at line 371 states "It must be called outside of s.mu (we only use it here where no lock is held)" but this might confuse readers since the method itself acquires the lock at line 374. Consider rephrasing to: "Must be called without holding s.mu; acquires the lock internally."
-// failLocked closes all session resources and marks the session as closing. -// It must be called outside of s.mu (we only use it here where no lock is held), -// but it acquires the lock internally for safety. +// failLocked closes all session resources and marks the session as closing. +// Must be called without holding s.mu; acquires the lock internally. func (s *Session) failLocked(err error) {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
pkg/metrics/otel.go(3 hunks)pkg/rtun/gateway/client.go(1 hunks)pkg/rtun/gateway/client_conn_test.go(1 hunks)pkg/rtun/transport/session.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
pkg/rtun/transport/session.go (3)
pkg/metrics/metrics.go (4)
Handler(7-16)Int64Counter(18-20)Dimensionless(33-33)Bytes(34-34)pb/c1/connectorapi/rtun/v1/rtun.pb.go (27)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_NO_LISTENER(28-28)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)RstCode_RST_CODE_INTERNAL(31-31)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Hello(275-277)Frame_Hello(295-295)pkg/rtun/transport/errors.go (1)
ErrConnReset(7-7)
pkg/rtun/gateway/client_conn_test.go (5)
pkg/rtun/server/registry.go (1)
NewRegistry(22-32)pkg/rtun/server/handler.go (1)
NewHandler(27-40)pkg/rtun/gateway/server.go (2)
NewServer(30-46)Server(21-27)pkg/rtun/transport/session.go (2)
Link(78-82)NewSession(107-144)pkg/rtun/gateway/client.go (1)
NewDialer(56-70)
pkg/rtun/gateway/client.go (4)
pb/c1/connectorapi/rtun/v1/gateway_grpc.pb.go (2)
NewReverseDialerServiceClient(39-41)ReverseDialerService_OpenClient(54-54)pb/c1/connectorapi/rtun/v1/gateway.pb.go (14)
ReverseDialerServiceOpenRequest(25-34)ReverseDialerServiceOpenRequest(47-47)ReverseDialerServiceOpenRequest(62-64)ReverseDialerServiceOpenRequest_OpenReq(95-97)ReverseDialerServiceOpenRequest_OpenReq(103-103)OpenRequest(192-199)OpenRequest(212-212)OpenRequest(227-229)OpenResponse_NotFound(331-333)OpenResponse_NotFound(339-339)OpenResponse_Opened(335-337)OpenResponse_Opened(341-341)ReverseDialerServiceOpenRequest_Frame(99-101)ReverseDialerServiceOpenRequest_Frame(105-105)pkg/rtun/gateway/errors.go (2)
ErrProtocol(8-8)ErrNotFound(6-6)pb/c1/connectorapi/rtun/v1/rtun.pb.go (18)
Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)Frame_Fin(287-289)Frame_Fin(301-301)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)Frame(167-180)Frame(193-193)Frame(208-210)Fin(445-450)Fin(463-463)Fin(478-480)
pkg/metrics/otel.go (3)
pkg/metrics/metrics.go (2)
Unit(30-30)Handler(7-16)vendor/go.opentelemetry.io/otel/metric/instrument.go (3)
WithDescription(112-112)WithUnit(189-189)WithAttributes(364-368)vendor/go.opentelemetry.io/otel/metric/meter.go (2)
Observer(249-260)Meter(41-228)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (10)
pkg/rtun/gateway/client_conn_test.go (2)
27-78: LGTM: Well-structured test environment setup.The
setupGatewayfunction properly establishes a comprehensive integration test environment with appropriate cleanup. The dual mode (silent vs health-server) provides flexibility for different test scenarios.
136-171: LGTM: Bidirectional I/O test properly usesio.ReadFull.The test correctly uses
io.ReadFullat line 161 to ensure the full message is received, addressing the previous review concern about partial reads from a singleReadcall. The test comprehensively validates write, remote receive, and EOF propagation on close.pkg/rtun/gateway/client.go (3)
72-155: LGTM: DialContext properly addresses previous review concerns.The implementation correctly uses
grpc.NewClientinstead of the deprecatedgrpc.DialContext, and allstream.CloseSend()errors are explicitly acknowledged with_ =to satisfy errcheck. The OpenResponse handling logic (NotFound vs Opened) is correct, and cleanup on error paths is appropriate.
400-551: LGTM: Deadline race condition properly addressed.The implementation correctly protects deadline fields with locks:
rdDeadlineis protected byreadMu(snapshots at lines 412-414, updates at lines 530-532, 540-542)wrDeadlineis protected bywriteMu(snapshots at line 467 while holding lock from 461, updates at lines 533-535, 547-549)This approach eliminates the data race concern from the previous review and upholds the
net.Connconcurrency contract.
186-398: LGTM: Reader and writer components are well-implemented.The reader properly handles frame types (Data, Fin, Rst) with appropriate error propagation. The writer's enqueue method correctly handles deadlines, backpressure, and context cancellation. Lazy initialization via
once.Doand shutdown signaling viadoneChare appropriately managed.pkg/rtun/transport/session.go (5)
76-82: LGTM: Clean Link interface design.The minimal Link interface appropriately decouples from gRPC specifics and simplifies testing.
106-144: LGTM: Robust Session initialization.The constructor applies appropriate defaults (10m idle timeout, nop logger fallback) and correctly registers the observable gauge callback for active SID monitoring.
201-313: LGTM: Comprehensive frame dispatch with correct protocol enforcement.The recvLoop correctly implements the protocol state machine:
- sid==0 validation treats SYN as fatal, drops other frames (lines 224-231)
- Duplicate SYN detection prevents SID reuse (lines 247-252)
- DATA-before-SYN rejection as protocol error (lines 278-284)
- Late frame handling via closed set (lines 265-268)
- Proper cleanup on FIN/RST with closed set tracking (lines 286-310)
318-361: LGTM: Context-aware Open with appropriate safeguards.The method correctly checks context before locking (lines 323-327), enforces port policy (lines 330-334), and handles SYN send failures with cleanup (lines 351-355).
389-452: LGTM: Well-structured metrics instrumentation.The metrics implementation correctly:
- Uses observable gauge for active SID count with safe locking in callback (lines 409, 136-141)
- Records frame/byte counters with appropriate dimensionality (kind/code tags)
- Separates concerns with dedicated helper methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (4)
pkg/rtun/transport/session.go (4)
301-344: Start the recv loop on Open() for ergonomics (or document requirement).Without Start(), reverse-dial replies won’t be processed. Safer to auto-start here too.
Apply this diff:
func (s *Session) Open(ctx context.Context, port uint32) (net.Conn, error) { @@ if s.allowedPorts != nil { if !s.allowedPorts[port] { return nil, errors.New("rtun: port not allowed by HELLO policy") } } + // Ensure recv loop is running so replies are processed. + s.startOnce() + s.mu.Lock()If explicit Start() is required by design, please add a note to Open()’s doc comment.
223-227: Metrics consistency: count RST as frames_tx and use enum strings for code.Add frames_tx for RST and tag rstSent with the enum string.
Apply this diff:
- _ = s.link.Send(&rtunpb.Frame{Sid: sid, Kind: &rtunpb.Frame_Rst{Rst: &rtunpb.Rst{Code: rtunpb.RstCode_RST_CODE_NO_LISTENER}}}) - if s.m != nil { - s.m.recordRstSent(s.link.Context(), "no_listener") - } + _ = s.link.Send(&rtunpb.Frame{Sid: sid, Kind: &rtunpb.Frame_Rst{Rst: &rtunpb.Rst{Code: rtunpb.RstCode_RST_CODE_NO_LISTENER}}}) + if s.m != nil { + s.m.recordFrameTx(s.link.Context(), "RST") + s.m.recordRstSent(s.link.Context(), rtunpb.RstCode_RST_CODE_NO_LISTENER.String()) + }
246-269: Shorten critical section in DATA path; avoid metrics and payload copy under lock (and count RST as frames_tx).Reduces contention and avoids calling external handlers while holding s.mu.
Apply this diff:
- case *rtunpb.Frame_Data: - s.mu.Lock() - // Ignore if SID is closed (late frame) - if s.closed.IsClosed(sid) { - s.mu.Unlock() - continue - } - c := s.conns[sid] - payload := append([]byte(nil), k.Data.GetPayload()...) - if s.m != nil { - s.m.recordBytesRx(s.link.Context(), int64(len(payload))) - } - if c != nil { - s.mu.Unlock() - c.feedData(payload) - } else { - // defensive programming decision: DATA-before-SYN is a protocol error. RST and do not buffer. - s.mu.Unlock() - _ = s.link.Send(&rtunpb.Frame{Sid: sid, Kind: &rtunpb.Frame_Rst{Rst: &rtunpb.Rst{Code: rtunpb.RstCode_RST_CODE_INTERNAL}}}) - if s.m != nil { - s.m.recordRstSent(s.link.Context(), "protocol_violation") - } - continue - } + case *rtunpb.Frame_Data: + s.mu.Lock() + // Ignore if SID is closed (late frame) + if s.closed.IsClosed(sid) { + s.mu.Unlock() + continue + } + c := s.conns[sid] + s.mu.Unlock() + + payload := append([]byte(nil), k.Data.GetPayload()...) + if s.m != nil { + s.m.recordBytesRx(s.link.Context(), int64(len(payload))) + } + if c != nil { + c.feedData(payload) + } else { + // DATA-before-SYN is a protocol error. RST and do not buffer. + _ = s.link.Send(&rtunpb.Frame{Sid: sid, Kind: &rtunpb.Frame_Rst{Rst: &rtunpb.Rst{Code: rtunpb.RstCode_RST_CODE_INTERNAL}}}) + if s.m != nil { + s.m.recordFrameTx(s.link.Context(), "RST") + s.m.recordRstSent(s.link.Context(), rtunpb.RstCode_RST_CODE_INTERNAL.String()) + } + continue + }
312-317: Validate port 0 semantics for Open().Should port 0 be rejected? If reserved/invalid, add an early check.
Example:
- if s.allowedPorts != nil { + if port == 0 { + return nil, errors.New("rtun: port 0 is invalid") + } + if s.allowedPorts != nil {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
pkg/rtun/transport/session.go(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pkg/rtun/transport/session.go (3)
pkg/metrics/metrics.go (4)
Handler(7-16)Int64Counter(18-20)Dimensionless(33-33)Bytes(34-34)pb/c1/connectorapi/rtun/v1/rtun.pb.go (27)
Frame(167-180)Frame(193-193)Frame(208-210)Frame_Syn(279-281)Frame_Syn(297-297)Syn(357-362)Syn(375-375)Syn(390-392)Frame_Rst(291-293)Frame_Rst(303-303)Rst(489-494)Rst(507-507)Rst(522-524)RstCode_RST_CODE_NO_LISTENER(28-28)Frame_Data(283-285)Frame_Data(299-299)Data(401-406)Data(419-419)Data(434-436)RstCode_RST_CODE_INTERNAL(31-31)Frame_Fin(287-289)Frame_Fin(301-301)Fin(445-450)Fin(463-463)Fin(478-480)Frame_Hello(275-277)Frame_Hello(295-295)pkg/rtun/transport/errors.go (1)
ErrConnReset(7-7)
🔇 Additional comments (1)
pkg/rtun/transport/session.go (1)
85-86: closedSet zero-value safety confirmed.IsClosedandClosehandlehighClosed=0and a nilrangesslice without panicking; no initializer required.
| s.mu.Lock() | ||
| for _, c := range s.conns { | ||
| c.handleRst(err) | ||
| } | ||
| for _, l := range s.listeners { | ||
| l.closeWithErr(err) | ||
| } | ||
| s.closing = true | ||
| s.mu.Unlock() | ||
| return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix potential deadlocks: don’t call into conns/listeners while holding s.mu (also refactor failLocked).
Both the Recv error path and failLocked call c.handleRst/closeWithErr under s.mu, risking deadlocks if those paths reenter Session. Snapshot under lock, then invoke without the lock.
Apply these diffs:
@@
- if err != nil {
- s.mu.Lock()
- for _, c := range s.conns {
- c.handleRst(err)
- }
- for _, l := range s.listeners {
- l.closeWithErr(err)
- }
- s.closing = true
- s.mu.Unlock()
- return
- }
+ if err != nil {
+ s.failLocked(err)
+ return
+ }
@@
-func (s *Session) failLocked(err error) {
- s.mu.Lock()
- if s.closing {
- s.mu.Unlock()
- return
- }
- for _, c := range s.conns {
- c.handleRst(err)
- }
- for _, l := range s.listeners {
- l.closeWithErr(err)
- }
- s.closing = true
- s.mu.Unlock()
-}
+func (s *Session) failLocked(err error) {
+ s.mu.Lock()
+ if s.closing {
+ s.mu.Unlock()
+ return
+ }
+ conns := make([]*virtConn, 0, len(s.conns))
+ for _, c := range s.conns {
+ conns = append(conns, c)
+ }
+ listeners := make([]*rtunListener, 0, len(s.listeners))
+ for _, l := range s.listeners {
+ listeners = append(listeners, l)
+ }
+ s.closing = true
+ s.mu.Unlock()
+
+ for _, c := range conns {
+ c.handleRst(err)
+ }
+ for _, l := range listeners {
+ l.closeWithErr(err)
+ }
+}Also applies to: 357-370
🤖 Prompt for AI Agents
In pkg/rtun/transport/session.go around lines 188-197 and 357-370, the code
calls c.handleRst and l.closeWithErr while holding s.mu which can deadlock if
those methods reenter Session; change both sites to: under s.mu take
snapshots/copies of s.conns and s.listeners (and set s.closing=true) then
release s.mu and iterate the snapshots to call handleRst/closeWithErr outside
the lock; also refactor failLocked so it only performs state changes under the
lock and defers any calls into connections/listeners to a follow-up step
executed after unlocking.
| Sid: w.gsid, | ||
| Kind: &rtunpb.Frame_Fin{Fin: &rtunpb.Fin{}}, | ||
| } | ||
| _ = w.stream.Send( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log?
Introduce rtun: reverse-tunnel subsystem (gateway, server, transport, match)
Summary
Adds the initial rtun subsystem to enable reverse gRPC to clients behind NATs/firewalls. The design tries cleanly separates concerns into small, testable packages.
Core ideas
RegistryandHandlerfor reverse dialing; integrates with gRPC via a context dialer; lifecycle-safe shutdown.rtun://<clientID>:<port>addressing, context-aware dialing, error propagation.Things for reviewers to note
gateway,server,transport(keep idiomatic; small and composable).rtun://<clientID>:<port>is the canonical reverse-dial format.Non-goals (for this PR)
Follow-ups
Summary by CodeRabbit
New Features
Chores
Tests