diff --git a/cmd/atecontroller/internal/controllers/workerpool_apply.go b/cmd/atecontroller/internal/controllers/workerpool_apply.go index faa94f572..fb3fea2dc 100644 --- a/cmd/atecontroller/internal/controllers/workerpool_apply.go +++ b/cmd/atecontroller/internal/controllers/workerpool_apply.go @@ -15,12 +15,16 @@ package controllers import ( + "os" + "strconv" + corev1 "k8s.io/api/core/v1" appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1" corev1ac "k8s.io/client-go/applyconfigurations/core/v1" metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" "github.com/agent-substrate/substrate/internal/ateompath" + "github.com/agent-substrate/substrate/internal/egresscapture" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" ) @@ -44,7 +48,9 @@ func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.Deployment WithValueFrom(corev1ac.EnvVarSource(). WithFieldRef(corev1ac.ObjectFieldSelector(). WithFieldPath("metadata.uid"))), - ). + ) + containerAC.WithEnv(egressCaptureEnvFromController()...) + containerAC. WithVolumeMounts(corev1ac.VolumeMount(). WithName("run-ateom"). WithMountPath(ateompath.BasePath)) @@ -82,6 +88,37 @@ func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.Deployment WithSpec(podSpecAC))) } +func egressCaptureEnvFromController() []*corev1ac.EnvVarApplyConfiguration { + enabled, _ := strconv.ParseBool(os.Getenv(egresscapture.EnvCaptureEnabled)) + if !enabled { + return nil + } + + env := []*corev1ac.EnvVarApplyConfiguration{ + corev1ac.EnvVar(). + WithName(egresscapture.EnvCaptureEnabled). + WithValue("true"), + } + if v := os.Getenv(egresscapture.EnvPEPAddress); v != "" { + env = append(env, corev1ac.EnvVar(). + WithName(egresscapture.EnvPEPAddress). + WithValue(v)) + } + if v := os.Getenv(egresscapture.EnvTunnelProtocol); v != "" { + env = append(env, corev1ac.EnvVar(). + WithName(egresscapture.EnvTunnelProtocol). + WithValue(v)) + } + for _, name := range egresscapture.OptionalEnvNames { + if v := os.Getenv(name); v != "" { + env = append(env, corev1ac.EnvVar(). + WithName(name). + WithValue(v)) + } + } + return env +} + // maybeApplyMicroVMPodShape adds the /dev/kvm device and node placement a // micro-VM (kata + cloud-hypervisor) worker pool needs, on top of any // pod-template settings. No-op unless sandboxClass is the micro-VM class. diff --git a/cmd/atecontroller/internal/controllers/workerpool_apply_test.go b/cmd/atecontroller/internal/controllers/workerpool_apply_test.go index 1cdc972d6..f93a6d74b 100644 --- a/cmd/atecontroller/internal/controllers/workerpool_apply_test.go +++ b/cmd/atecontroller/internal/controllers/workerpool_apply_test.go @@ -26,6 +26,7 @@ import ( metav1ac "k8s.io/client-go/applyconfigurations/meta/v1" "github.com/agent-substrate/substrate/internal/ateompath" + "github.com/agent-substrate/substrate/internal/egresscapture" atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1" ) @@ -261,6 +262,43 @@ func TestMicroVMPodShape(t *testing.T) { } } +func TestWorkerPoolEgressCaptureEnvPropagation(t *testing.T) { + t.Setenv(egresscapture.EnvCaptureEnabled, "1") + t.Setenv(egresscapture.EnvPEPAddress, "ate-egress.agentgateway-system.svc.cluster.local:15008") + t.Setenv(egresscapture.EnvTunnelProtocol, egresscapture.TunnelProtocolConnectTLS) + t.Setenv(egresscapture.EnvConnectTLSServerName, "ate-egress.agentgateway-system.svc.cluster.local") + t.Setenv(egresscapture.EnvConnectTLSCAFile, "/run/egress-ca/ca.crt") + t.Setenv(egresscapture.EnvConnectTLSInsecureSkipVerify, "true") + + wp := testWorkerPoolApplyConfig(nil) + deployment := buildDeploymentApplyConfig(wp) + containers := deployment.Spec.Template.Spec.Containers + if len(containers) != 1 { + t.Fatalf("containers length = %d, want 1", len(containers)) + } + + got := map[string]string{} + for _, env := range containers[0].Env { + if env.Name != nil && env.Value != nil { + got[*env.Name] = *env.Value + } + } + + want := map[string]string{ + egresscapture.EnvCaptureEnabled: "true", + egresscapture.EnvPEPAddress: "ate-egress.agentgateway-system.svc.cluster.local:15008", + egresscapture.EnvTunnelProtocol: egresscapture.TunnelProtocolConnectTLS, + egresscapture.EnvConnectTLSServerName: "ate-egress.agentgateway-system.svc.cluster.local", + egresscapture.EnvConnectTLSCAFile: "/run/egress-ca/ca.crt", + egresscapture.EnvConnectTLSInsecureSkipVerify: "true", + } + for name, value := range want { + if got[name] != value { + t.Errorf("env %s = %q, want %q", name, got[name], value) + } + } +} + func testWorkerPoolApplyConfig(tmpl *atev1alpha1.WorkerPoolPodTemplate) *atev1alpha1.WorkerPool { return &atev1alpha1.WorkerPool{ ObjectMeta: metav1.ObjectMeta{Name: "pool", Namespace: "default", UID: "uid"}, diff --git a/cmd/ateom-gvisor/egress_capture.go b/cmd/ateom-gvisor/egress_capture.go new file mode 100644 index 000000000..83e648e72 --- /dev/null +++ b/cmd/ateom-gvisor/egress_capture.go @@ -0,0 +1,140 @@ +//go:build linux + +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/binary" + "fmt" + "net" + "syscall" + "unsafe" + + "github.com/agent-substrate/substrate/internal/egresscapture" + "github.com/google/nftables" + "github.com/google/nftables/binaryutil" + "github.com/google/nftables/expr" + "golang.org/x/sys/unix" +) + +const ( + egressCaptureHTTPPort = uint16(15001) + egressCaptureHTTPSPort = uint16(15002) + egressOriginalHTTPPort = uint16(80) + egressOriginalHTTPSPort = uint16(443) +) + +var defaultEgressCaptureRedirects = []struct { + originalPort uint16 + capturePort uint16 +}{ + {originalPort: egressOriginalHTTPPort, capturePort: egressCaptureHTTPPort}, + {originalPort: egressOriginalHTTPSPort, capturePort: egressCaptureHTTPSPort}, +} + +var defaultEgressCaptureListeners = []egresscapture.Listener{ + {Port: egressCaptureHTTPPort}, + {Port: egressCaptureHTTPSPort}, +} + +func (s *AteomService) startEgressCaptureIfEnabled(ctx context.Context, identity egresscapture.ActorIdentity) error { + if !egresscapture.EnabledFromEnv() { + return nil + } + cfg, err := egresscapture.ConfigFromEnv(defaultEgressCaptureListeners) + if err != nil { + return err + } + capture, err := egresscapture.Start(ctx, identity, cfg, originalDestination) + if err != nil { + return fmt.Errorf("while starting actor egress capture: %w", err) + } + s.egressCapture = capture + return nil +} + +func addEgressCaptureRedirectRules(c *nftables.Conn, table *nftables.Table, prerouting *nftables.Chain, sourceIP string) { + for _, redirect := range defaultEgressCaptureRedirects { + c.AddRule(&nftables.Rule{ + Table: table, + Chain: prerouting, + Exprs: tcpRedirectExprs(sourceIP, redirect.originalPort, redirect.capturePort), + }) + } +} + +func tcpRedirectExprs(sourceIP string, originalPort, capturePort uint16) []expr.Any { + exprs := append(ipSourceEqual(sourceIP), tcpDestinationPortEqual(originalPort)...) + exprs = append(exprs, + &expr.Immediate{ + Register: 1, + Data: binaryutil.BigEndian.PutUint16(capturePort), + }, + &expr.Redir{ + RegisterProtoMin: 1, + }, + ) + return exprs +} + +func originalDestination(conn net.Conn) (net.Addr, error) { + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return nil, fmt.Errorf("captured connection is %T, not *net.TCPConn", conn) + } + + rawConn, err := tcpConn.SyscallConn() + if err != nil { + return nil, err + } + + var addr *net.TCPAddr + var controlErr error + if err := rawConn.Control(func(fd uintptr) { + addr, controlErr = originalDstFromFD(int(fd)) + }); err != nil { + return nil, err + } + if controlErr != nil { + return nil, controlErr + } + return addr, nil +} + +func originalDstFromFD(fd int) (*net.TCPAddr, error) { + var raw unix.RawSockaddrInet4 + size := uint32(unsafe.Sizeof(raw)) + _, _, errno := unix.Syscall6( + unix.SYS_GETSOCKOPT, + uintptr(fd), + uintptr(unix.SOL_IP), + uintptr(unix.SO_ORIGINAL_DST), + uintptr(unsafe.Pointer(&raw)), + uintptr(unsafe.Pointer(&size)), + 0, + ) + if errno != 0 { + return nil, errno + } + if raw.Family != syscall.AF_INET { + return nil, fmt.Errorf("SO_ORIGINAL_DST returned address family %d", raw.Family) + } + return &net.TCPAddr{ + IP: net.IPv4(raw.Addr[0], raw.Addr[1], raw.Addr[2], raw.Addr[3]), + Port: int(binary.BigEndian.Uint16((*[2]byte)(unsafe.Pointer(&raw.Port))[:])), + }, nil +} diff --git a/cmd/ateom-gvisor/main.go b/cmd/ateom-gvisor/main.go index 7e543c7ac..cabe6225f 100644 --- a/cmd/ateom-gvisor/main.go +++ b/cmd/ateom-gvisor/main.go @@ -32,6 +32,7 @@ import ( "github.com/agent-substrate/substrate/internal/ateinterceptors" "github.com/agent-substrate/substrate/internal/ateompath" "github.com/agent-substrate/substrate/internal/contextlogging" + "github.com/agent-substrate/substrate/internal/egresscapture" "github.com/agent-substrate/substrate/internal/proto/ateompb" "github.com/agent-substrate/substrate/internal/serverboot" "github.com/agent-substrate/substrate/internal/version" @@ -163,6 +164,7 @@ type AteomService struct { interiorNetNS netns.NsHandle actorLogger *actorlog.ActorLogger + egressCapture *egresscapture.Capture } var _ ateompb.AteomServer = (*AteomService)(nil) @@ -187,7 +189,7 @@ func (s *AteomService) RunWorkload(ctx context.Context, req *ateompb.RunWorkload // * Correct runsc version is downloaded and placed on disk. // * All OCI bundles are set up, including for "pause" container. - if err := s.setupActorNetwork(ctx); err != nil { + if err := s.setupActorNetwork(ctx, actorIdentityFromRun(req)); err != nil { return nil, fmt.Errorf("while setting up actor network: %w", err) } defer func() { @@ -341,7 +343,7 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore // * All OCI bundles are set up, including for "pause" container. // * Checkpoint downloaded and placed on disk - if err := s.setupActorNetwork(ctx); err != nil { + if err := s.setupActorNetwork(ctx, actorIdentityFromRestore(req)); err != nil { return nil, fmt.Errorf("while setting up actor network: %w", err) } defer func() { @@ -388,7 +390,7 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore return &ateompb.RestoreWorkloadResponse{}, nil } -func (s *AteomService) setupActorNetwork(ctx context.Context) (retErr error) { +func (s *AteomService) setupActorNetwork(ctx context.Context, identity egresscapture.ActorIdentity) (retErr error) { // Build a fresh point-to-point network between the worker pod netns and the // gVisor interior netns. The worker side keeps the pod's real eth0, creates // ateom0 as the gateway, and moves only the veth peer into the actor netns. @@ -456,7 +458,11 @@ func (s *AteomService) setupActorNetwork(ctx context.Context) (retErr error) { if err := enableIPv4Forwarding(); err != nil { return err } - if err := installActorNftablesRules(podIP); err != nil { + if err := s.startEgressCaptureIfEnabled(ctx, identity); err != nil { + return err + } + + if err := installActorNftablesRules(podIP, s.egressCapture != nil); err != nil { return err } @@ -540,6 +546,12 @@ func (s *AteomService) cleanupActorNetwork(ctx context.Context) error { if err := removeActorNftablesRules(); err != nil { return err } + if s.egressCapture != nil { + if err := s.egressCapture.Close(); err != nil { + slog.WarnContext(ctx, "Failed to close actor egress capture", "err", err) + } + s.egressCapture = nil + } var cleanupErr error if link, err := netlink.LinkByName(hostVethName); err == nil { @@ -621,7 +633,7 @@ func enableIPv4Forwarding() error { return nil } -func installActorNftablesRules(podIP net.IP) error { +func installActorNftablesRules(podIP net.IP, egressCapture bool) error { // Install a dedicated nftables table for the active actor. Keeping all // rules in an ateom-owned table makes cleanup simple and avoids mutating // Kubernetes or CNI-managed chains directly. @@ -659,6 +671,13 @@ func installActorNftablesRules(podIP net.IP) error { Hooknum: nftables.ChainHookPrerouting, Priority: nftables.ChainPriorityNATDest, }) + if egressCapture { + addEgressCaptureRedirectRules(c, table, prerouting, actorVethIP) + } + // TODO: Support optional DNS capture for hostname recovery for non-SNI, + // non-HTTP, or DNS-policy egress. The current HTTP/HTTPS path derives + // authority from TLS SNI or HTTP Host, so redirecting UDP/TCP 53 would + // add potential DNS proxy/cache/TTL/search-domain failures // TODO: Support inbound UDP DNAT for actors that expose UDP protocols such // as QUIC. // TODO: Replace the hard-coded HTTP port with the actor's configured @@ -790,6 +809,22 @@ func tcpDestinationPortEqual(port uint16) []expr.Any { } } +func actorIdentityFromRun(req *ateompb.RunWorkloadRequest) egresscapture.ActorIdentity { + return egresscapture.ActorIdentity{ + Namespace: req.GetActorTemplateNamespace(), + Template: req.GetActorTemplateName(), + ActorID: req.GetActorId(), + } +} + +func actorIdentityFromRestore(req *ateompb.RestoreWorkloadRequest) egresscapture.ActorIdentity { + return egresscapture.ActorIdentity{ + Namespace: req.GetActorTemplateNamespace(), + Template: req.GetActorTemplateName(), + ActorID: req.GetActorId(), + } +} + func createNetNSWithoutSwitching(ctx context.Context, name string) (netns.NsHandle, error) { runtime.LockOSThread() defer runtime.UnlockOSThread() diff --git a/cmd/ateom-microvm/egress_capture.go b/cmd/ateom-microvm/egress_capture.go new file mode 100644 index 000000000..3fa6cd4ea --- /dev/null +++ b/cmd/ateom-microvm/egress_capture.go @@ -0,0 +1,157 @@ +//go:build linux + +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/binary" + "fmt" + "net" + "syscall" + "unsafe" + + "github.com/agent-substrate/substrate/internal/egresscapture" + "github.com/agent-substrate/substrate/internal/proto/ateompb" + "github.com/google/nftables" + "github.com/google/nftables/binaryutil" + "github.com/google/nftables/expr" + "golang.org/x/sys/unix" +) + +const ( + egressCaptureHTTPPort = uint16(15001) + egressCaptureHTTPSPort = uint16(15002) + egressOriginalHTTPPort = uint16(80) + egressOriginalHTTPSPort = uint16(443) +) + +var defaultEgressCaptureRedirects = []struct { + originalPort uint16 + capturePort uint16 +}{ + {originalPort: egressOriginalHTTPPort, capturePort: egressCaptureHTTPPort}, + {originalPort: egressOriginalHTTPSPort, capturePort: egressCaptureHTTPSPort}, +} + +var defaultEgressCaptureListeners = []egresscapture.Listener{ + {Port: egressCaptureHTTPPort}, + {Port: egressCaptureHTTPSPort}, +} + +func (s *AteomService) startEgressCaptureIfEnabled(ctx context.Context, identity egresscapture.ActorIdentity) error { + if !egresscapture.EnabledFromEnv() { + return nil + } + cfg, err := egresscapture.ConfigFromEnv(defaultEgressCaptureListeners) + if err != nil { + return err + } + capture, err := egresscapture.Start(ctx, identity, cfg, originalDestination) + if err != nil { + return fmt.Errorf("while starting actor egress capture: %w", err) + } + s.egressCapture = capture + return nil +} + +func addEgressCaptureRedirectRules(c *nftables.Conn, table *nftables.Table, prerouting *nftables.Chain, sourceIP string) { + for _, redirect := range defaultEgressCaptureRedirects { + c.AddRule(&nftables.Rule{ + Table: table, + Chain: prerouting, + Exprs: tcpRedirectExprs(sourceIP, redirect.originalPort, redirect.capturePort), + }) + } +} + +func tcpRedirectExprs(sourceIP string, originalPort, capturePort uint16) []expr.Any { + exprs := append(ipSourceEqual(sourceIP), tcpDestinationPortEqual(originalPort)...) + exprs = append(exprs, + &expr.Immediate{ + Register: 1, + Data: binaryutil.BigEndian.PutUint16(capturePort), + }, + &expr.Redir{ + RegisterProtoMin: 1, + }, + ) + return exprs +} + +func originalDestination(conn net.Conn) (net.Addr, error) { + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return nil, fmt.Errorf("captured connection is %T, not *net.TCPConn", conn) + } + + rawConn, err := tcpConn.SyscallConn() + if err != nil { + return nil, err + } + + var addr *net.TCPAddr + var controlErr error + if err := rawConn.Control(func(fd uintptr) { + addr, controlErr = originalDstFromFD(int(fd)) + }); err != nil { + return nil, err + } + if controlErr != nil { + return nil, controlErr + } + return addr, nil +} + +func originalDstFromFD(fd int) (*net.TCPAddr, error) { + var raw unix.RawSockaddrInet4 + size := uint32(unsafe.Sizeof(raw)) + _, _, errno := unix.Syscall6( + unix.SYS_GETSOCKOPT, + uintptr(fd), + uintptr(unix.SOL_IP), + uintptr(unix.SO_ORIGINAL_DST), + uintptr(unsafe.Pointer(&raw)), + uintptr(unsafe.Pointer(&size)), + 0, + ) + if errno != 0 { + return nil, errno + } + if raw.Family != syscall.AF_INET { + return nil, fmt.Errorf("SO_ORIGINAL_DST returned address family %d", raw.Family) + } + return &net.TCPAddr{ + IP: net.IPv4(raw.Addr[0], raw.Addr[1], raw.Addr[2], raw.Addr[3]), + Port: int(binary.BigEndian.Uint16((*[2]byte)(unsafe.Pointer(&raw.Port))[:])), + }, nil +} + +func actorIdentityFromRun(req *ateompb.RunWorkloadRequest) egresscapture.ActorIdentity { + return egresscapture.ActorIdentity{ + Namespace: req.GetActorTemplateNamespace(), + Template: req.GetActorTemplateName(), + ActorID: req.GetActorId(), + } +} + +func actorIdentityFromRestore(req *ateompb.RestoreWorkloadRequest) egresscapture.ActorIdentity { + return egresscapture.ActorIdentity{ + Namespace: req.GetActorTemplateNamespace(), + Template: req.GetActorTemplateName(), + ActorID: req.GetActorId(), + } +} diff --git a/cmd/ateom-microvm/main.go b/cmd/ateom-microvm/main.go index 1bb77cfd4..2a17b45c4 100644 --- a/cmd/ateom-microvm/main.go +++ b/cmd/ateom-microvm/main.go @@ -36,6 +36,7 @@ import ( "github.com/agent-substrate/substrate/internal/actorlog" "github.com/agent-substrate/substrate/internal/ateinterceptors" "github.com/agent-substrate/substrate/internal/ateompath" + "github.com/agent-substrate/substrate/internal/egresscapture" "github.com/agent-substrate/substrate/internal/proto/ateompb" "github.com/agent-substrate/substrate/internal/serverboot" "github.com/agent-substrate/substrate/internal/version" @@ -205,6 +206,10 @@ type AteomService struct { // with ateom-gvisor). actorLogger *actorlog.ActorLogger + // egressCapture owns per-activation capture listeners when egress capture is + // enabled for this worker pod. + egressCapture *egresscapture.Capture + // running maps actor id -> the live micro-VM, kept so CheckpointWorkload can // pause+snapshot+teardown the same sandbox (and RestoreWorkload can track the // CH it relaunched). diff --git a/cmd/ateom-microvm/net.go b/cmd/ateom-microvm/net.go index e74db2bd3..bb77fd5a5 100644 --- a/cmd/ateom-microvm/net.go +++ b/cmd/ateom-microvm/net.go @@ -48,6 +48,7 @@ import ( "github.com/vishvananda/netns" "golang.org/x/sys/unix" + "github.com/agent-substrate/substrate/internal/egresscapture" "github.com/agent-substrate/substrate/internal/serverboot" ) @@ -120,7 +121,7 @@ func mustParseIP(s string) net.IP { // pod netns and the kata interior netns (see the package comment). Idempotent // via cleanup-before-setup; also sweeps stale kata taps out of the interior // netns so the sandbox always builds on a clean slate. -func (s *AteomService) setupActorNetwork(ctx context.Context) (retErr error) { +func (s *AteomService) setupActorNetwork(ctx context.Context, identity egresscapture.ActorIdentity) (retErr error) { s.cleanupActorNetworkOrExit(ctx, "Failed to clean up stale actor network before setup") defer func() { if retErr != nil { @@ -190,7 +191,10 @@ func (s *AteomService) setupActorNetwork(ctx context.Context) (retErr error) { if err := enableIPv4Forwarding(); err != nil { return err } - if err := installActorNftablesRules(podIP); err != nil { + if err := s.startEgressCaptureIfEnabled(ctx, identity); err != nil { + return err + } + if err := installActorNftablesRules(podIP, s.egressCapture != nil); err != nil { return err } @@ -249,6 +253,12 @@ func (s *AteomService) cleanupActorNetwork(ctx context.Context) error { cleanupErr = errors.Join(cleanupErr, fmt.Errorf("while removing actor nftables rules: %w", err)) slog.WarnContext(ctx, "Failed to remove actor nftables rules; continuing actor netns cleanup", slog.Any("err", err)) } + if s.egressCapture != nil { + if err := s.egressCapture.Close(); err != nil { + slog.WarnContext(ctx, "Failed to close actor egress capture", slog.Any("err", err)) + } + s.egressCapture = nil + } if link, err := netlink.LinkByName(hostVethName); err == nil { if err := netlink.LinkDel(link); err != nil { @@ -333,7 +343,7 @@ func enableIPv4Forwarding() error { return nil } -func installActorNftablesRules(podIP net.IP) error { +func installActorNftablesRules(podIP net.IP, egressCapture bool) error { // Dedicated ateom-owned IPv4 table (cheap cleanup, no CNI chain mutation): // * postrouting: masquerade actor egress (169.254.17.2) behind the pod IP. // * prerouting: DNAT pod-IP:80/tcp to the actor veth IP. @@ -357,6 +367,9 @@ func installActorNftablesRules(podIP net.IP) error { Hooknum: nftables.ChainHookPrerouting, Priority: nftables.ChainPriorityNATDest, }) + if egressCapture { + addEgressCaptureRedirectRules(c, table, prerouting, actorVethIP) + } preroutingExprs := append(ipDestinationEqual(podIP.String()), tcpDestinationPortEqual(80)...) preroutingExprs = append(preroutingExprs, &expr.Immediate{ diff --git a/cmd/ateom-microvm/restore.go b/cmd/ateom-microvm/restore.go index 1c7c82b84..cc64b74c3 100644 --- a/cmd/ateom-microvm/restore.go +++ b/cmd/ateom-microvm/restore.go @@ -122,7 +122,7 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore // Networking: rebuild the per-activation veth + tap; the snapshot's virtio-net // is fd-backed, so CH needs fresh tap FDs (net_fds) on restore. - if err := s.setupActorNetwork(ctx); err != nil { + if err := s.setupActorNetwork(ctx, actorIdentityFromRestore(req)); err != nil { return nil, fmt.Errorf("while setting up actor network: %w", err) } defer func() { diff --git a/cmd/ateom-microvm/run.go b/cmd/ateom-microvm/run.go index 2acdd57bd..c2a1b0d63 100644 --- a/cmd/ateom-microvm/run.go +++ b/cmd/ateom-microvm/run.go @@ -219,7 +219,7 @@ func (s *AteomService) RunWorkload(ctx context.Context, req *ateompb.RunWorkload // Networking (host side): per-activation veth into the interior netns. The // tap + TC mirror is built below (after the VM exists) so its FDs are fresh. - if err := s.setupActorNetwork(ctx); err != nil { + if err := s.setupActorNetwork(ctx, actorIdentityFromRun(req)); err != nil { return nil, fmt.Errorf("while setting up actor network: %w", err) } defer func() { diff --git a/demos/counter/counter.go b/demos/counter/counter.go index 6b99d5db2..53aa58ff7 100644 --- a/demos/counter/counter.go +++ b/demos/counter/counter.go @@ -26,6 +26,7 @@ import ( "log/slog" "net" "net/http" + "net/url" "os" "sync/atomic" "time" @@ -35,6 +36,8 @@ import ( var requestCount uint64 +const defaultEgressURL = "https://httpbin.org/get" + func main() { pflag.Parse() ctx := context.Background() @@ -51,6 +54,7 @@ func main() { w.WriteHeader(http.StatusOK) w.Write([]byte(response)) }) + defaultMux.HandleFunc("/egress", handleEgress) go func() { slog.InfoContext(ctx, "Starting counter server on port 80") @@ -79,6 +83,70 @@ func main() { } } +func handleEgress(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + targetURL, err := egressTargetURL(r) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, targetURL, nil) + if err != nil { + http.Error(w, fmt.Sprintf("invalid egress target %q: %v", targetURL, err), http.StatusBadRequest) + return + } + + start := time.Now() + resp, err := http.DefaultClient.Do(req) + if err != nil { + slog.ErrorContext(ctx, "Egress request failed", slog.String("target", targetURL), slog.Any("err", err)) + http.Error(w, fmt.Sprintf("egress request to %s failed: %v\n", targetURL, err), http.StatusBadGateway) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) + if err != nil { + slog.ErrorContext(ctx, "Failed reading egress response", slog.String("target", targetURL), slog.Any("err", err)) + http.Error(w, fmt.Sprintf("reading egress response from %s failed: %v\n", targetURL, err), http.StatusBadGateway) + return + } + + slog.InfoContext(ctx, "Egress request completed", + slog.String("target", targetURL), + slog.Int("upstream_status", resp.StatusCode), + slog.Duration("duration", time.Since(start))) + + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "egress target: %s\n", targetURL) + fmt.Fprintf(w, "upstream status: %s\n", resp.Status) + fmt.Fprintf(w, "body bytes read: %d\n", len(body)) + fmt.Fprintf(w, "body:\n%s\n", body) +} + +func egressTargetURL(r *http.Request) (string, error) { + targetURL := r.URL.Query().Get("url") + if targetURL == "" { + targetURL = defaultEgressURL + } + + parsed, err := url.Parse(targetURL) + if err != nil { + return "", fmt.Errorf("invalid egress target %q: %w", targetURL, err) + } + if parsed.Scheme != "http" && parsed.Scheme != "https" { + return "", fmt.Errorf("invalid egress target %q: scheme must be http or https", targetURL) + } + if parsed.Host == "" { + return "", fmt.Errorf("invalid egress target %q: host is required", targetURL) + } + return targetURL, nil +} + func writeRandomFile() error { rf, err := os.Create("/random-content-file") if err != nil { diff --git a/demos/counter/counter_test.go b/demos/counter/counter_test.go new file mode 100644 index 000000000..e5e008574 --- /dev/null +++ b/demos/counter/counter_test.go @@ -0,0 +1,67 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "net/http/httptest" + "testing" +) + +func TestEgressTargetURL(t *testing.T) { + for _, tc := range []struct { + name string + path string + want string + wantErr bool + }{ + { + name: "default", + path: "/egress", + want: defaultEgressURL, + }, + { + name: "custom url", + path: "/egress?url=https%3A%2F%2Fhttpbin.org%2Fheaders", + want: "https://httpbin.org/headers", + }, + { + name: "reject missing host", + path: "/egress?url=https%3A%2F%2F", + wantErr: true, + }, + { + name: "reject unsupported scheme", + path: "/egress?url=ftp%3A%2F%2Fexample.com%2Ffile", + wantErr: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + req := httptest.NewRequest("POST", tc.path, nil) + got, err := egressTargetURL(req) + if tc.wantErr { + if err == nil { + t.Fatal("egressTargetURL() returned nil error, want error") + } + return + } + if err != nil { + t.Fatalf("egressTargetURL() returned error: %v", err) + } + if got != tc.want { + t.Fatalf("egressTargetURL() = %q, want %q", got, tc.want) + } + }) + } +} diff --git a/docs/egress-capture-testing.md b/docs/egress-capture-testing.md new file mode 100644 index 000000000..7a32c043f --- /dev/null +++ b/docs/egress-capture-testing.md @@ -0,0 +1,476 @@ +# Egress Capture + +This documents the demo install path and the ateom capture setup using +the existing counter demo. The counter demo is useful because an external curl to +the router resumes a real gVisor actor, and its `/egress` path makes that actor +open an outbound HTTPS request to `https://httpbin.org/get` by default. + +## Architecture + +When egress capture is enabled, each worker pod gets `ATE_EGRESS_*` +configuration from the controller. The reusable capture core lives in +`internal/egresscapture`: it owns environment parsing, capture listeners, +authority derivation, CONNECT tunnel transports, and byte proxying. The +runtime-specific `ateom` implementation supplies the original-destination lookup +and packet-capture rules. + +The current gVisor implementation starts local capture listeners and installs +actor-network redirects for TCP/80 and TCP/443. From the actor's point of view +it still opens a normal HTTP or HTTPS connection to the original destination. +MicroVM or future hypervisor implementations should reuse `internal/egresscapture` +for the proxy/tunnel path, but provide their own network setup for redirecting +actor traffic and recovering the original destination. + +The redirected connection lands on `ateom`, which records the original +destination and derives a stable CONNECT authority from the first bytes of the +actor connection: + +| Actor traffic | Authority source | Example CONNECT authority | +| --- | --- | --- | +| HTTPS / TCP 443 | TLS ClientHello SNI | `httpbin.org:443` | +| Plaintext HTTP / TCP 80 | HTTP `Host` header | `example.com:80` | + +`ateom` then opens a tunnel selected by `ATE_EGRESS_TUNNEL_PROTOCOL`. The +default `connect` transport opens a plaintext HTTP/2 CONNECT stream to the +agentgateway data plane at +`ate-egress.agentgateway-system.svc.cluster.local:15008`. The transport registry +also supports TLS CONNECT variants and is intended to allow future transports +such as HBONE without changing the runtime-specific capture setup. Agentgateway +maps the CONNECT authority to its configured TCP listener and routes the tunnel +to a Kubernetes Service backed by an EndpointSlice. + +The demo setup configures only `httpbin.org:443` for egress. +Other hosts or plaintext HTTP destinations need their own agentgateway +Service, EndpointSlice, listener, and route. For HTTPS, TLS is still end-to-end +between the actor and the external service; agentgateway only routes the +encrypted bytes after CONNECT succeeds. + +Enabling egress on an already-running ATE system creates or updates the +`ate-egress-capture` ConfigMap, but that ConfigMap does not currently force an +`ate-controller` restart. If egress variables are missing from worker pods after +enabling capture, restart `ate-controller` so it rereads the config and +reconciles WorkerPool deployments. + +## Prerequisites + +- A working Kubernetes cluster and kubeconfig. +- `kubectl`, `helm`, `jq`, and `curl`. +- `kubectl ate` available from this repo, for example: + +```bash +go install ./cmd/kubectl-ate +``` + +## Install with capture enabled + +For a normal cluster: + +```bash +./hack/install-ate.sh --egress --deploy-ate-system +``` + +For kind: + +```bash +./hack/install-ate-kind.sh --egress --deploy-ate-system +``` + +This deploys agentgateway with a static `httpbin.org:443` egress route, creates +the `ate-system/ate-egress-capture` config map, and deploys the ATE system. +When `ATE_EGRESS_CAPTURE_ENABLED=true`, `ATE_EGRESS_PEP_ADDRESS` is required; +the install script sets it to the in-cluster `ate-egress` Service by default. + +The install script resolves `httpbin.org` during install and creates the +`httpbin-egress` Service and EndpointSlice for those IPs. `ateom` derives the +CONNECT authority from SNI for this HTTPS demo. + +Verify the egress config: + +```bash +kubectl get configmap -n ate-system ate-egress-capture -o yaml +``` + +Expected values: + +```yaml +ATE_EGRESS_CAPTURE_ENABLED: "true" +ATE_EGRESS_PEP_ADDRESS: ate-egress.agentgateway-system.svc.cluster.local:15008 +ATE_EGRESS_TUNNEL_PROTOCOL: connect +``` + +Verify the static agentgateway resources: + +```bash +kubectl get gateway -n agentgateway-system ate-egress +kubectl get tcproute -n agentgateway-system httpbin-egress +kubectl get agentgatewaypolicy -n agentgateway-system ate-egress-connect +kubectl get service -n agentgateway-system httpbin-egress +kubectl get endpointslice -n agentgateway-system httpbin-egress +``` + +Expected resources include: + +```text +gateway.gateway.networking.k8s.io/ate-egress +tcproute.gateway.networking.k8s.io/httpbin-egress +agentgatewaypolicy.agentgateway.dev/ate-egress-connect +service/httpbin-egress +endpointslice.discovery.k8s.io/httpbin-egress +``` + +## Deploy and call the counter actor + +Deploy the existing counter demo: + +```bash +./hack/install-ate.sh --deploy-demo-counter +``` + +Create an actor: + +```bash +kubectl ate create actor my-counter-1 --template ate-demo-counter/counter +``` + +Forward the router locally: + +```bash +kubectl port-forward -n ate-system svc/atenet-router 8000:80 +``` + +From another terminal, send an external request through the router to prove +normal ingress still works: + +```bash +curl -i -X POST \ + -H "Host: my-counter-1.actors.resources.substrate.ate.dev" \ + http://localhost:8000 +``` + +Expected response: + +```text +HTTP/1.1 200 OK +hello from: 169.254.17.2 | preserved memory count: 1 +``` + +The exact count can differ. The important part is that the actor reaches +`STATUS_RUNNING` and is assigned to an ateom pod. + +Now send an external request to the demo's egress path: + +```bash +curl -i -X POST \ + -H "Host: my-counter-1.actors.resources.substrate.ate.dev" \ + http://localhost:8000/egress +``` + +Expected response: + +```text +HTTP/1.1 200 OK +egress target: https://httpbin.org/get +upstream status: 200 OK +body bytes read: ... +``` + +The response must name `https://httpbin.org/get`. That proves the actor opened a +TCP connection to `httpbin.org:443` from inside the sandbox. + +To test a different `httpbin.org` path, pass it as the `url` query parameter: + +```bash +curl -i -X POST \ + -H "Host: my-counter-1.actors.resources.substrate.ate.dev" \ + "http://localhost:8000/egress?url=https%3A%2F%2Fhttpbin.org%2Fheaders" +``` + +Do not use this query parameter for a different host unless you also update the +agentgateway route. `ateom` will derive the new host from SNI, but the demo +agentgateway config only routes `httpbin.org:443`. + +## Run the microVM counter demo with egress enabled + +The microVM demo uses the same counter container and `/egress` handler, but runs +it with `sandboxClass: microvm` on `ateom-microvm`. This is useful for checking +that egress configuration reaches microVM worker pods and for exercising the +demo request path. + +When egress capture is enabled, `ateom-microvm` starts the same reusable +`internal/egresscapture` listeners as the gVisor runtime and installs +microVM-specific redirect rules in the worker pod network namespace. HTTP and +HTTPS egress from the guest is redirected to `ateom`, which recovers +`SO_ORIGINAL_DST`, derives CONNECT authority from HTTP `Host` or TLS SNI, and +opens the configured tunnel to the egress PEP. + +For kind, deploy the ATE system first so the in-cluster rustfs service exists +before the microVM demo stages runtime assets. Then run the microVM bring-up, +enable the egress resources, and restart microVM workers so the controller's +egress environment is copied into fresh worker pods: + +```bash +KIND_CLUSTER_NAME="${KIND_CLUSTER_NAME:-kind}" ./hack/install-ate-kind.sh --deploy-ate-system +KIND_CLUSTER_NAME="${KIND_CLUSTER_NAME:-kind}" ./hack/run-microvm-demo-kind.sh +KIND_CLUSTER_NAME="${KIND_CLUSTER_NAME:-kind}" ./hack/install-ate-kind.sh --egress --deploy-ate-controller +``` + +For kind, the node must expose `/dev/kvm` to the kind node and carry the +`ate.dev/sandboxClass=microvm` label. `hack/create-kind-cluster.sh` does both +when `/dev/kvm` is available in the Docker environment. Without that label, the +microVM worker pods remain `Pending`, and router requests fail with: + +```text +actor "my-counter-microvm-1" unavailable: no free workers available +``` + +For a normal cluster, run: + +```bash +./hack/run-microvm-demo.sh +./hack/install-ate.sh --egress --deploy-ate-controller +``` + +Before creating the actor, verify that the microVM worker pods are scheduled and +available: + +```bash +kubectl get nodes -L ate.dev/sandboxClass +kubectl get pods -n ate-demo-counter-microvm -o wide + +kubectl wait --for=condition=Available \ + deployment/counter-microvm-deployment -n ate-demo-counter-microvm --timeout=300s + +kubectl wait --for=condition=Ready \ + actortemplate/counter-microvm -n ate-demo-counter-microvm --timeout=600s +``` + +Expected node output includes `microvm` in the `ATE.DEV/SANDBOXCLASS` column, +and the `counter-microvm` pods should be `Running`. If the pods are `Pending`, +inspect the scheduling reason: + +```bash +kubectl describe pod -n ate-demo-counter-microvm \ + -l ate.dev/worker-pool=counter-microvm +``` + +If `/dev/kvm` is mounted but the label is missing, add the label and recreate the +worker pods: + +```bash +kubectl label node kind-control-plane ate.dev/sandboxClass=microvm --overwrite +kubectl rollout restart deployment/counter-microvm-deployment -n ate-demo-counter-microvm +kubectl rollout status deployment/counter-microvm-deployment -n ate-demo-counter-microvm +``` + +If `/dev/kvm` is not mounted into the node, recreate the kind cluster with +`hack/create-kind-cluster.sh` on a host or Docker VM that has KVM available. + +After the worker deployment and golden snapshot are ready, create a microVM actor +and call it through the router: + +```bash +kubectl ate create actor my-counter-microvm-1 \ + --template ate-demo-counter-microvm/counter-microvm + +kubectl port-forward -n ate-system svc/atenet-router 8000:80 +``` + +From another terminal, verify the normal counter path: + +```bash +curl -i -X POST \ + -H "Host: my-counter-microvm-1.actors.resources.substrate.ate.dev" \ + http://localhost:8000 +``` + +Then exercise the demo egress path: + +```bash +curl -i -X POST \ + -H "Host: my-counter-microvm-1.actors.resources.substrate.ate.dev" \ + http://localhost:8000/egress +``` + +Verify that the microVM worker pod received the egress configuration: + +```bash +actor_json=$(kubectl ate get actor my-counter-microvm-1 -o json) +ateom_ns=$(jq -r '.actors[0].ateomPodNamespace' <<<"${actor_json}") +ateom_pod=$(jq -r '.actors[0].ateomPodName' <<<"${actor_json}") + +kubectl get pod -n "${ateom_ns}" "${ateom_pod}" \ + -o jsonpath='{range .spec.containers[?(@.name=="ateom")].env[*]}{.name}={.value}{"\n"}{end}' \ + | grep ATE_EGRESS +``` + +Expected output includes: + +```text +ATE_EGRESS_CAPTURE_ENABLED=true +ATE_EGRESS_PEP_ADDRESS=ate-egress.agentgateway-system.svc.cluster.local:15008 +ATE_EGRESS_TUNNEL_PROTOCOL=connect +``` + +Check that the microVM runtime installed capture listeners: + +```bash +kubectl logs -n "${ateom_ns}" "${ateom_pod}" -c ateom | grep "Started actor egress capture listener" +``` + +Expected output includes two log lines, one for each local capture listener: + +```text +Started actor egress capture listener ... "port":15001 ... "protocol":"connect" +Started actor egress capture listener ... "port":15002 ... "protocol":"connect" +``` + +After the `/egress` request, the logs should also show the captured stream: + +```bash +kubectl logs -n "${ateom_ns}" "${ateom_pod}" -c ateom | grep "Proxying captured actor egress" +``` + +Expected output includes: + +```text +Proxying captured actor egress ... "originalDestination":"...:443" ... "connectAuthority":"httpbin.org:443" +``` + +## Verify capture was installed + +Find the worker pod hosting the actor: + +```bash +actor_json=$(kubectl ate get actor my-counter-1 -o json) +ateom_ns=$(jq -r '.actors[0].ateomPodNamespace' <<<"${actor_json}") +ateom_pod=$(jq -r '.actors[0].ateomPodName' <<<"${actor_json}") + +echo "${ateom_ns}/${ateom_pod}" +``` + +Check that the ateom pod received capture configuration: + +```bash +kubectl get pod -n "${ateom_ns}" "${ateom_pod}" \ + -o jsonpath='{range .spec.containers[?(@.name=="ateom")].env[*]}{.name}={.value}{"\n"}{end}' \ + | grep ATE_EGRESS +``` + +Expected output includes: + +```text +ATE_EGRESS_CAPTURE_ENABLED=true +ATE_EGRESS_PEP_ADDRESS=ate-egress.agentgateway-system.svc.cluster.local:15008 +ATE_EGRESS_TUNNEL_PROTOCOL=connect +``` + +Check the ateom logs: + +```bash +kubectl logs -n "${ateom_ns}" "${ateom_pod}" -c ateom | grep "Started actor egress capture listener" +``` + +Expected output includes two log lines, one for each local capture listener: + +```text +Started actor egress capture listener ... "port":15001 ... "protocol":"connect" +Started actor egress capture listener ... "port":15002 ... "protocol":"connect" +``` + +After the `/egress` request, the logs should also show the captured stream: + +```bash +kubectl logs -n "${ateom_ns}" "${ateom_pod}" -c ateom | grep "Proxying captured actor egress" +``` + +Expected output includes: + +```text +Proxying captured actor egress ... "originalDestination":"...:443" ... "connectAuthority":"httpbin.org:443" +``` + +## Check agentgateway logs + +The `ate-egress` Gateway creates an agentgateway dataplane pod in the +`agentgateway-system` namespace. Check dataplane logs with: + +```bash +kubectl logs -n agentgateway-system \ + -l gateway.networking.k8s.io/gateway-name=ate-egress \ + --all-containers --tail=200 +``` + +After a successful `/egress` request, dataplane logs should include a TCP route +entry similar to: + +```text +request gateway=agentgateway-system/ate-egress listener=https route=agentgateway-system/httpbin-egress ... protocol=tcp +``` + +If the Gateway, TCPRoute, or policy is not being programmed, check the +agentgateway controller logs: + +```bash +kubectl logs -n agentgateway-system deploy/agentgateway --tail=200 +``` + +## Clean up + +```bash +kubectl ate suspend actor my-counter-1 +kubectl ate delete actor my-counter-1 +./hack/install-ate.sh --delete-demo-counter +``` + +## Troubleshooting + +If the `ATE_EGRESS_*` variables are missing from the worker pod, restart the +controller and recreate the counter WorkerPool pods after creating the config +map: + +```bash +kubectl rollout restart deployment/ate-controller -n ate-system +kubectl rollout status deployment/ate-controller -n ate-system +kubectl rollout restart deployment/counter-deployment -n ate-demo-counter +kubectl rollout status deployment/counter-deployment -n ate-demo-counter +``` + +If the capture listener logs are missing, confirm that the actor is running on a +fresh worker pod created after egress was enabled: + +```bash +kubectl ate get actor my-counter-1 +kubectl get pods -n ate-demo-counter -l ate.dev/worker-pool=counter +``` + +If the microVM curl returns `503 Service Unavailable` with `no free workers +available`, the request reached the router and ate-api, but no eligible microVM +worker was available for assignment. Check the worker pods and node label: + +```bash +kubectl get nodes -L ate.dev/sandboxClass +kubectl get pods -n ate-demo-counter-microvm -o wide +kubectl describe pod -n ate-demo-counter-microvm \ + -l ate.dev/worker-pool=counter-microvm +``` + +The microVM WorkerPool pods require `nodeSelector: +ate.dev/sandboxClass=microvm` and `/dev/kvm`. For kind, recreate the cluster with +`hack/create-kind-cluster.sh`, or label the node if KVM is already mounted: + +```bash +kubectl label node kind-control-plane ate.dev/sandboxClass=microvm --overwrite +kubectl rollout restart deployment/counter-microvm-deployment -n ate-demo-counter-microvm +kubectl rollout status deployment/counter-microvm-deployment -n ate-demo-counter-microvm +``` + +If `/egress` fails after changing the `url` host, remember that this demo only +configures agentgateway for `httpbin.org:443`. Add matching static agentgateway +backend resources for the new destination: + +- HTTPS: Service, EndpointSlice, TCP listener on `443`, and TCPRoute. +- Plaintext HTTP: Service, EndpointSlice, TCP listener on `80`, and TCPRoute. + +Traffic without SNI or a plaintext HTTP `Host` header falls back to the captured +original destination IP and port, which requires matching agentgateway routing +for that address. diff --git a/hack/install-ate.sh b/hack/install-ate.sh index bc8200c06..b35a2eb54 100755 --- a/hack/install-ate.sh +++ b/hack/install-ate.sh @@ -49,6 +49,16 @@ source "${ROOT}"/hack/install-demo-multi-template.sh COLOR_CYAN='\033[1;36m' COLOR_RESET='\033[0m' +ATE_EGRESS_CAPTURE="${ATE_EGRESS_CAPTURE:-false}" +ATE_EGRESS_PEP_ADDRESS="${ATE_EGRESS_PEP_ADDRESS:-ate-egress.agentgateway-system.svc.cluster.local:15008}" +ATE_EGRESS_TUNNEL_PROTOCOL="${ATE_EGRESS_TUNNEL_PROTOCOL:-connect}" +ATE_EGRESS_CAPTURE_ENABLED_ENV="ATE_EGRESS_CAPTURE_ENABLED" +ATE_EGRESS_PEP_ADDRESS_ENV="ATE_EGRESS_PEP_ADDRESS" +ATE_EGRESS_TUNNEL_PROTOCOL_ENV="ATE_EGRESS_TUNNEL_PROTOCOL" +ATE_EGRESS_CONNECT_TLS_SERVER_NAME_ENV="ATE_EGRESS_CONNECT_TLS_SERVER_NAME" +ATE_EGRESS_CONNECT_TLS_CA_FILE_ENV="ATE_EGRESS_CONNECT_TLS_CA_FILE" +ATE_EGRESS_CONNECT_TLS_INSECURE_SKIP_VERIFY_ENV="ATE_EGRESS_CONNECT_TLS_INSECURE_SKIP_VERIFY" + function log_step() { local step_name="$1" echo -e "${COLOR_CYAN}[step]: ${step_name}${COLOR_RESET}" @@ -68,7 +78,9 @@ function usage() { echo "" echo " --deploy-atelet Deploy atelet only" echo " --deploy-ate-apiserver Deploy ate-api-server only" + echo " --deploy-ate-controller Deploy ate-controller only" echo " --deploy-atenet Deploy atenet only" + echo " --egress Enable actor egress capture via agentgateway" echo "" echo "To create individual resources used by ate-system (Note: These are" echo "called automatically by --deploy-ate-system):" @@ -96,6 +108,12 @@ run_kubectl() { "$@" } +run_helm() { + helm \ + ${KUBECTL_CONTEXT:+--kube-context=${KUBECTL_CONTEXT}} \ + "$@" +} + run_kubectl_ate() { go run ./cmd/kubectl-ate \ ${KUBECTL_CONTEXT:+--context=${KUBECTL_CONTEXT}} \ @@ -116,6 +134,218 @@ run_ko() { esac } +resolve_ipv4_addresses() { + local host="$1" + if command -v python3 >/dev/null 2>&1; then + python3 - "${host}" <<'PY' +import socket +import sys + +host = sys.argv[1] +addresses = sorted({ + result[4][0] + for result in socket.getaddrinfo(host, None, socket.AF_INET, socket.SOCK_STREAM) +}) +print(" ".join(addresses)) +PY + return + fi + if command -v python >/dev/null 2>&1; then + python - "${host}" <<'PY' +import socket +import sys + +host = sys.argv[1] +addresses = sorted(set( + result[4][0] + for result in socket.getaddrinfo(host, None, socket.AF_INET, socket.SOCK_STREAM) +)) +print(" ".join(addresses)) +PY + return + fi + if command -v dig >/dev/null 2>&1; then + dig +short A "${host}" | tr '\n' ' ' + return + fi + if command -v nslookup >/dev/null 2>&1; then + nslookup "${host}" | awk '/^Address: / { print $2 }' | tr '\n' ' ' + return + fi + echo "unable to resolve ${host}: python3, python, dig, or nslookup is required" >&2 + return 1 +} + +ensure_gateway_api() { + log_step "ensure_gateway_api" + if run_kubectl get crd \ + gatewayclasses.gateway.networking.k8s.io \ + gateways.gateway.networking.k8s.io \ + tcproutes.gateway.networking.k8s.io >/dev/null 2>&1; then + return + fi + + run_kubectl apply --server-side -f https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.5.0/experimental-install.yaml +} + +deploy_agentgateway() { + log_step "deploy_agentgateway" + local httpbin_ips="${HTTPBIN_EGRESS_IPS:-}" + if [[ -z "${httpbin_ips}" ]]; then + httpbin_ips="$(resolve_ipv4_addresses httpbin.org)" + fi + if [[ -z "${httpbin_ips}" ]]; then + echo "failed to resolve httpbin.org IPv4 addresses" >&2 + exit 1 + fi + + local httpbin_endpoints="" + local ip="" + for ip in ${httpbin_ips}; do + httpbin_endpoints+=" - addresses:\n - ${ip}\n" + done + + ensure_gateway_api + run_helm upgrade -i --create-namespace \ + --namespace agentgateway-system \ + --version v1.3.1 agentgateway-crds oci://cr.agentgateway.dev/charts/agentgateway-crds + run_helm upgrade -i -n agentgateway-system agentgateway oci://cr.agentgateway.dev/charts/agentgateway \ + --version v1.3.1 + run_kubectl delete deployment -n agentgateway-system ate-egress --ignore-not-found >/dev/null 2>&1 || true + run_kubectl delete service -n agentgateway-system ate-egress --ignore-not-found >/dev/null 2>&1 || true + run_kubectl delete service -n agentgateway-system httpbin-egress --ignore-not-found >/dev/null 2>&1 || true + run_kubectl delete endpointslice -n agentgateway-system httpbin-egress --ignore-not-found >/dev/null 2>&1 || true + run_kubectl delete gateway -n agentgateway-system ate-egress --ignore-not-found >/dev/null 2>&1 || true + run_kubectl delete agentgatewaypolicy -n agentgateway-system ate-egress-connect --ignore-not-found >/dev/null 2>&1 || true + printf "%b" "$(cat </dev/null || true)" + if [[ -z "${deployments}" ]]; then + echo "No worker deployments found to restart for egress." + return + fi + + local ns name + while read -r ns name; do + if [[ -z "${ns}" || -z "${name}" ]]; then + continue + fi + run_kubectl rollout restart "deployment/${name}" -n "${ns}" + run_kubectl rollout status "deployment/${name}" -n "${ns}" --timeout=120s + done <<< "${deployments}" +} + create_valkey_ca_certs_secret() { log_step "create_valkey_ca_certs_secret" local ca_certs="" @@ -218,6 +448,11 @@ deploy_ate_system() { log_step "deploy_ate_system" ensure_crds + if [[ "${ATE_EGRESS_CAPTURE}" == "true" ]]; then + deploy_agentgateway + create_egress_capture_config + fi + # Enforce per-class SandboxConfig asset requirements (applied before any # SandboxConfig so the defaults below are validated too). run_kubectl apply -f manifests/ate-install/sandboxconfig-validation.yaml @@ -315,6 +550,19 @@ deploy_atelet() { run_kubectl rollout status daemonset/atelet -n ate-system --timeout=120s } +deploy_ate_controller() { + log_step "deploy_ate_controller" + if [[ "${ATE_EGRESS_CAPTURE}" == "true" ]]; then + deploy_agentgateway + create_egress_capture_config + fi + run_ko apply -f manifests/ate-install/ate-controller.yaml + run_kubectl rollout status deployment/ate-controller -n ate-system --timeout=120s + if [[ "${ATE_EGRESS_CAPTURE}" == "true" ]]; then + rollout_worker_deployments_for_egress + fi +} + deploy_atenet() { log_step "deploy_atenet" ensure_crds @@ -462,6 +710,9 @@ for arg in "$@"; do usage exit 0 ;; + --egress) + ATE_EGRESS_CAPTURE="true" + ;; esac done @@ -485,9 +736,11 @@ while [[ "$#" -gt 0 ]]; do --deploy-atelet) deploy_atelet ;; --deploy-ate-apiserver) deploy_ate_apiserver ;; + --deploy-ate-controller) deploy_ate_controller ;; --deploy-atenet) deploy_atenet ;; --delete-atenet) delete_atenet ;; + --egress) ;; --create-jwt-authority-pool-secret) create_jwt_authority_pool_secret ;; --create-session-id-ca-pool-secret) create_session_id_ca_pool_secret ;; diff --git a/internal/egresscapture/capture.go b/internal/egresscapture/capture.go new file mode 100644 index 000000000..c9673d8d2 --- /dev/null +++ b/internal/egresscapture/capture.go @@ -0,0 +1,621 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package egresscapture + +import ( + "context" + "crypto/tls" + "crypto/x509" + "encoding/binary" + "errors" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "sync" + "time" + + "golang.org/x/net/http2" +) + +type ActorIdentity struct { + Namespace string + Template string + ActorID string + // TODO: Include worker_uid once egress identity is modeled as a signed + // first-class Substrate identity rather than plain actor metadata headers. +} + +type Config struct { + PEPAddress string + Protocol string + TLS TLSConfig + Listeners []Listener +} + +type TLSConfig struct { + ServerName string + CAFile string + InsecureSkipVerify bool +} + +type Listener struct { + Port uint16 +} + +type OriginalDestinationFunc func(net.Conn) (net.Addr, error) + +type Capture struct { + cancel context.CancelFunc + listeners []net.Listener + wg sync.WaitGroup +} + +type TunnelTransport interface { + Open(ctx context.Context, identity ActorIdentity, originalDst net.Addr, authority string) (io.ReadWriteCloser, error) +} + +type TunnelTransportFactory func(Config) (TunnelTransport, error) + +var tunnelTransportFactories = map[string]TunnelTransportFactory{} + +func init() { + // Keep tunnel protocol support behind factories so additional transports + // such as HBONE can plug in without changing capture/listener logic. + RegisterTunnelTransport(TunnelProtocolConnect, newPlaintextCONNECTTunnelTransport) + RegisterTunnelTransport(TunnelProtocolPlaintext, newPlaintextCONNECTTunnelTransport) + RegisterTunnelTransport(TunnelProtocolH2C, newPlaintextCONNECTTunnelTransport) + RegisterTunnelTransport(TunnelProtocolConnectTLS, newTLSCONNECTTunnelTransport) + RegisterTunnelTransport(TunnelProtocolHTTPSConnect, newTLSCONNECTTunnelTransport) + RegisterTunnelTransport(TunnelProtocolTLSConnect, newTLSCONNECTTunnelTransport) +} + +func RegisterTunnelTransport(protocol string, factory TunnelTransportFactory) { + tunnelTransportFactories[strings.ToLower(strings.TrimSpace(protocol))] = factory +} + +func EnabledFromEnv() bool { + enabled, _ := strconv.ParseBool(os.Getenv(EnvCaptureEnabled)) + return enabled +} + +func ConfigFromEnv(listeners []Listener) (Config, error) { + pepAddress := strings.TrimSpace(os.Getenv(EnvPEPAddress)) + if pepAddress == "" { + return Config{}, fmt.Errorf("%s must be set when egress capture is enabled", EnvPEPAddress) + } + cfg := Config{ + PEPAddress: pepAddress, + Protocol: DefaultTunnelProtocol, + TLS: TLSConfig{ + ServerName: os.Getenv(EnvConnectTLSServerName), + CAFile: os.Getenv(EnvConnectTLSCAFile), + InsecureSkipVerify: boolEnv(EnvConnectTLSInsecureSkipVerify), + }, + Listeners: listeners, + } + if v := os.Getenv(EnvTunnelProtocol); v != "" { + cfg.Protocol = v + } + return cfg, nil +} + +func boolEnv(name string) bool { + enabled, _ := strconv.ParseBool(os.Getenv(name)) + return enabled +} + +func Start(ctx context.Context, identity ActorIdentity, cfg Config, originalDestination OriginalDestinationFunc) (*Capture, error) { + if originalDestination == nil { + return nil, errors.New("original destination resolver must be set") + } + transport, err := NewTunnelTransport(cfg) + if err != nil { + return nil, err + } + + ctx, cancel := newCaptureContext(ctx) + capture := &Capture{cancel: cancel} + for _, listenerCfg := range cfg.Listeners { + lis, err := net.Listen("tcp4", net.JoinHostPort("0.0.0.0", strconv.Itoa(int(listenerCfg.Port)))) + if err != nil { + capture.Close() + return nil, fmt.Errorf("while listening for captured egress on port %d: %w", listenerCfg.Port, err) + } + + capture.listeners = append(capture.listeners, lis) + capture.wg.Add(1) + go capture.serve(ctx, lis, identity, transport, originalDestination) + slog.InfoContext(ctx, "Started actor egress capture listener", + "port", listenerCfg.Port, + "pepAddress", cfg.PEPAddress, + "protocol", cfg.Protocol) + } + return capture, nil +} + +func newCaptureContext(ctx context.Context) (context.Context, context.CancelFunc) { + // The setup request context can be cancelled after the actor is running, but + // egress capture must keep serving until actor network cleanup closes it. + return context.WithCancel(context.WithoutCancel(ctx)) +} + +func (c *Capture) Close() error { + if c.cancel != nil { + c.cancel() + } + + var err error + for _, lis := range c.listeners { + if closeErr := lis.Close(); closeErr != nil && !errors.Is(closeErr, net.ErrClosed) { + err = errors.Join(err, closeErr) + } + } + c.wg.Wait() + return err +} + +func (c *Capture) serve(ctx context.Context, lis net.Listener, identity ActorIdentity, transport TunnelTransport, originalDestination OriginalDestinationFunc) { + defer c.wg.Done() + for { + conn, err := lis.Accept() + if err != nil { + if ctx.Err() != nil || errors.Is(err, net.ErrClosed) { + return + } + slog.WarnContext(ctx, "Failed to accept captured egress connection", "err", err) + continue + } + c.wg.Add(1) + go func() { + defer c.wg.Done() + handleCapturedEgress(ctx, conn, identity, transport, originalDestination) + }() + } +} + +func handleCapturedEgress(ctx context.Context, actorConn net.Conn, identity ActorIdentity, transport TunnelTransport, originalDestination OriginalDestinationFunc) { + stopActorClose := context.AfterFunc(ctx, func() { + _ = actorConn.Close() + }) + defer stopActorClose() + defer actorConn.Close() + + originalDst, err := originalDestination(actorConn) + if err != nil { + slog.WarnContext(ctx, "Failed to resolve captured egress original destination", "err", err) + return + } + + authority, initialBytes := deriveConnectAuthority(ctx, actorConn, originalDst) + tunnel, err := transport.Open(ctx, identity, originalDst, authority) + if err != nil { + slog.WarnContext(ctx, "Failed to open egress tunnel", + "originalDestination", originalDst.String(), + "connectAuthority", authority, + "err", err) + return + } + defer tunnel.Close() + + slog.InfoContext(ctx, "Proxying captured actor egress", + "actorID", identity.ActorID, + "actorTemplateNamespace", identity.Namespace, + "actorTemplateName", identity.Template, + "originalDestination", originalDst.String(), + "connectAuthority", authority) + + proxyByteStream(ctx, actorConn, tunnel, initialBytes) +} + +func proxyByteStream(ctx context.Context, actorConn net.Conn, tunnel io.ReadWriteCloser, initialBytes []byte) { + stopProxyClose := context.AfterFunc(ctx, func() { + _ = actorConn.Close() + _ = tunnel.Close() + }) + defer stopProxyClose() + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + if len(initialBytes) > 0 { + if _, err := tunnel.Write(initialBytes); err != nil { + _ = tunnel.Close() + return + } + } + _, _ = io.Copy(tunnel, actorConn) + _ = tunnel.Close() + }() + go func() { + defer wg.Done() + _, _ = io.Copy(actorConn, tunnel) + if tcpConn, ok := actorConn.(*net.TCPConn); ok { + _ = tcpConn.CloseWrite() + } + }() + wg.Wait() +} + +func NewTunnelTransport(cfg Config) (TunnelTransport, error) { + protocol := strings.ToLower(strings.TrimSpace(cfg.Protocol)) + if protocol == "" { + protocol = DefaultTunnelProtocol + } + factory, ok := tunnelTransportFactories[protocol] + if !ok { + return nil, fmt.Errorf("unsupported egress tunnel protocol %q", cfg.Protocol) + } + return factory(cfg) +} + +type PlaintextCONNECTTunnelTransport struct { + PEPAddress string +} + +func newPlaintextCONNECTTunnelTransport(cfg Config) (TunnelTransport, error) { + return &PlaintextCONNECTTunnelTransport{PEPAddress: cfg.PEPAddress}, nil +} + +func (t *PlaintextCONNECTTunnelTransport) Open(ctx context.Context, identity ActorIdentity, originalDst net.Addr, authority string) (io.ReadWriteCloser, error) { + req, pr, pw := newConnectRequest(ctx, "http", identity, originalDst, authority) + transport := &http2.Transport{ + AllowHTTP: true, + DialTLSContext: func(ctx context.Context, network, _ string, _ *tls.Config) (net.Conn, error) { + var dialer net.Dialer + return dialer.DialContext(ctx, network, t.PEPAddress) + }, + } + return roundTripConnect(transport, req, pr, pw, authority, t.PEPAddress) +} + +type TLSCONNECTTunnelTransport struct { + PEPAddress string + TLS TLSConfig +} + +func newTLSCONNECTTunnelTransport(cfg Config) (TunnelTransport, error) { + return &TLSCONNECTTunnelTransport{PEPAddress: cfg.PEPAddress, TLS: cfg.TLS}, nil +} + +func (t *TLSCONNECTTunnelTransport) Open(ctx context.Context, identity ActorIdentity, originalDst net.Addr, authority string) (io.ReadWriteCloser, error) { + req, pr, pw := newConnectRequest(ctx, "https", identity, originalDst, authority) + tlsConfig, err := t.tlsConfig() + if err != nil { + _ = pr.CloseWithError(err) + _ = pw.CloseWithError(err) + return nil, err + } + transport := &http2.Transport{ + DialTLSContext: func(ctx context.Context, network, _ string, _ *tls.Config) (net.Conn, error) { + var dialer net.Dialer + conn, err := dialer.DialContext(ctx, network, t.PEPAddress) + if err != nil { + return nil, err + } + tlsConn := tls.Client(conn, tlsConfig) + if err := tlsConn.HandshakeContext(ctx); err != nil { + _ = conn.Close() + return nil, err + } + return tlsConn, nil + }, + } + return roundTripConnect(transport, req, pr, pw, authority, t.PEPAddress) +} + +func (t *TLSCONNECTTunnelTransport) tlsConfig() (*tls.Config, error) { + cfg := &tls.Config{ + NextProtos: []string{"h2"}, + ServerName: t.TLS.ServerName, + InsecureSkipVerify: t.TLS.InsecureSkipVerify, + } + if t.TLS.CAFile == "" { + return cfg, nil + } + rootsPEM, err := os.ReadFile(t.TLS.CAFile) + if err != nil { + return nil, fmt.Errorf("while reading CONNECT TLS CA file %q: %w", t.TLS.CAFile, err) + } + roots := x509.NewCertPool() + if !roots.AppendCertsFromPEM(rootsPEM) { + return nil, fmt.Errorf("CONNECT TLS CA file %q contains no certificates", t.TLS.CAFile) + } + cfg.RootCAs = roots + return cfg, nil +} + +func deriveConnectAuthority(ctx context.Context, actorConn net.Conn, originalDst net.Addr) (string, []byte) { + if tcpAddr, ok := originalDst.(*net.TCPAddr); ok && tcpAddr.Port == 443 { + authority, initialBytes := deriveTLSConnectAuthority(ctx, actorConn, tcpAddr) + return authority, initialBytes + } + if tcpAddr, ok := originalDst.(*net.TCPAddr); ok && tcpAddr.Port == 80 { + authority, initialBytes := deriveHTTPConnectAuthority(ctx, actorConn, tcpAddr) + return authority, initialBytes + } + return originalDst.String(), nil +} + +func deriveTLSConnectAuthority(ctx context.Context, actorConn net.Conn, originalDst *net.TCPAddr) (string, []byte) { + const maxClientHelloBytes = 16 * 1024 + _ = actorConn.SetReadDeadline(time.Now().Add(2 * time.Second)) + defer actorConn.SetReadDeadline(time.Time{}) + + var initialBytes []byte + buf := make([]byte, 2048) + for len(initialBytes) < maxClientHelloBytes { + n, err := actorConn.Read(buf) + if n > 0 { + initialBytes = append(initialBytes, buf[:n]...) + if sni, ok, needMore := tlsClientHelloSNI(initialBytes); ok { + return net.JoinHostPort(sni, strconv.Itoa(originalDst.Port)), initialBytes + } else if !needMore { + break + } + } + if err != nil { + if ctx.Err() != nil { + return originalDst.String(), initialBytes + } + break + } + } + return originalDst.String(), initialBytes +} + +func deriveHTTPConnectAuthority(ctx context.Context, actorConn net.Conn, originalDst *net.TCPAddr) (string, []byte) { + const maxHeaderBytes = 16 * 1024 + _ = actorConn.SetReadDeadline(time.Now().Add(2 * time.Second)) + defer actorConn.SetReadDeadline(time.Time{}) + + var initialBytes []byte + buf := make([]byte, 2048) + for len(initialBytes) < maxHeaderBytes { + n, err := actorConn.Read(buf) + if n > 0 { + initialBytes = append(initialBytes, buf[:n]...) + if host, ok, needMore := httpHostHeader(initialBytes); ok { + return authorityWithDefaultPort(host, originalDst.Port), initialBytes + } else if !needMore { + break + } + } + if err != nil { + if ctx.Err() != nil { + return originalDst.String(), initialBytes + } + break + } + } + return originalDst.String(), initialBytes +} + +func httpHostHeader(data []byte) (string, bool, bool) { + headers := string(data) + headerEnd := strings.Index(headers, "\r\n\r\n") + separator := "\r\n" + if headerEnd == -1 { + headerEnd = strings.Index(headers, "\n\n") + separator = "\n" + } + if headerEnd == -1 { + return "", false, len(data) < 16*1024 + } + + lines := strings.Split(headers[:headerEnd], separator) + if len(lines) == 0 || !strings.Contains(lines[0], " ") { + return "", false, false + } + for _, line := range lines[1:] { + name, value, ok := strings.Cut(line, ":") + if !ok { + continue + } + if strings.EqualFold(strings.TrimSpace(name), "host") { + host := strings.TrimSpace(value) + return host, host != "", false + } + } + return "", false, false +} + +func authorityWithDefaultPort(host string, port int) string { + host = strings.TrimSpace(host) + if host == "" { + return "" + } + if _, _, err := net.SplitHostPort(host); err == nil { + return host + } + return net.JoinHostPort(strings.Trim(host, "[]"), strconv.Itoa(port)) +} + +func tlsClientHelloSNI(data []byte) (string, bool, bool) { + if len(data) < 5 { + return "", false, true + } + if data[0] != 0x16 { + return "", false, false + } + recordLen := int(binary.BigEndian.Uint16(data[3:5])) + if len(data) < 5+recordLen { + return "", false, true + } + + record := data[5 : 5+recordLen] + if len(record) < 4 || record[0] != 0x01 { + return "", false, false + } + handshakeLen := int(record[1])<<16 | int(record[2])<<8 | int(record[3]) + if len(record) < 4+handshakeLen { + return "", false, false + } + clientHello := record[4 : 4+handshakeLen] + if len(clientHello) < 34 { + return "", false, false + } + + offset := 34 + if len(clientHello) < offset+1 { + return "", false, false + } + sessionIDLen := int(clientHello[offset]) + offset++ + if len(clientHello) < offset+sessionIDLen+2 { + return "", false, false + } + offset += sessionIDLen + + cipherSuitesLen := int(binary.BigEndian.Uint16(clientHello[offset : offset+2])) + offset += 2 + if len(clientHello) < offset+cipherSuitesLen+1 { + return "", false, false + } + offset += cipherSuitesLen + + compressionMethodsLen := int(clientHello[offset]) + offset++ + if len(clientHello) < offset+compressionMethodsLen+2 { + return "", false, false + } + offset += compressionMethodsLen + + extensionsLen := int(binary.BigEndian.Uint16(clientHello[offset : offset+2])) + offset += 2 + if len(clientHello) < offset+extensionsLen { + return "", false, false + } + extensions := clientHello[offset : offset+extensionsLen] + for len(extensions) >= 4 { + extensionType := binary.BigEndian.Uint16(extensions[0:2]) + extensionLen := int(binary.BigEndian.Uint16(extensions[2:4])) + extensions = extensions[4:] + if len(extensions) < extensionLen { + return "", false, false + } + extensionData := extensions[:extensionLen] + extensions = extensions[extensionLen:] + if extensionType != 0 { + continue + } + if len(extensionData) < 2 { + return "", false, false + } + serverNameListLen := int(binary.BigEndian.Uint16(extensionData[0:2])) + if len(extensionData) < 2+serverNameListLen { + return "", false, false + } + serverNames := extensionData[2 : 2+serverNameListLen] + for len(serverNames) >= 3 { + nameType := serverNames[0] + nameLen := int(binary.BigEndian.Uint16(serverNames[1:3])) + serverNames = serverNames[3:] + if len(serverNames) < nameLen { + return "", false, false + } + name := string(serverNames[:nameLen]) + serverNames = serverNames[nameLen:] + if nameType == 0 && name != "" { + return name, true, false + } + } + return "", false, false + } + return "", false, false +} + +func newConnectRequest(ctx context.Context, scheme string, identity ActorIdentity, originalDst net.Addr, authority string) (*http.Request, *io.PipeReader, *io.PipeWriter) { + pr, pw := io.Pipe() + req := &http.Request{ + Method: http.MethodConnect, + URL: &url.URL{Scheme: scheme, Host: authority}, + Host: authority, + Header: make(http.Header), + Body: pr, + ContentLength: -1, + } + req = req.WithContext(ctx) + // TODO: Replace these plain identity headers with a signed short-lived actor + // identity token for the PEP. The signed claims should include sub, aud, exp, + // iat, worker_uid, and the original destination so policy is evaluated over + // verified request identity rather than unsigned metadata. + req.Header.Set("x-ate-actor-id", identity.ActorID) + req.Header.Set("x-ate-actor-template", identity.Template) + req.Header.Set("x-ate-actor-template-namespace", identity.Namespace) + req.Header.Set("x-ate-original-destination", originalDst.String()) + if authority != originalDst.String() { + req.Header.Set("x-ate-connect-authority", authority) + } + return req, pr, pw +} + +func roundTripConnect( + transport *http2.Transport, + req *http.Request, + pr *io.PipeReader, + pw *io.PipeWriter, + connectAuthority string, + pepAddress string, +) (io.ReadWriteCloser, error) { + resp, err := transport.RoundTrip(req) + if err != nil { + _ = pr.CloseWithError(err) + _ = pw.CloseWithError(err) + transport.CloseIdleConnections() + return nil, err + } + if resp.StatusCode < 200 || resp.StatusCode > 299 { + _ = resp.Body.Close() + err := fmt.Errorf("CONNECT to %s through %s returned %s", connectAuthority, pepAddress, resp.Status) + _ = pr.CloseWithError(err) + _ = pw.CloseWithError(err) + transport.CloseIdleConnections() + return nil, err + } + return &hboneStream{ + requestWriter: pw, + responseBody: resp.Body, + closeIdle: transport.CloseIdleConnections, + }, nil +} + +type hboneStream struct { + requestWriter *io.PipeWriter + responseBody io.ReadCloser + closeIdle func() +} + +func (s *hboneStream) Read(p []byte) (int, error) { + return s.responseBody.Read(p) +} + +func (s *hboneStream) Write(p []byte) (int, error) { + return s.requestWriter.Write(p) +} + +func (s *hboneStream) Close() error { + err := errors.Join(s.requestWriter.Close(), s.responseBody.Close()) + if s.closeIdle != nil { + s.closeIdle() + } + return err +} diff --git a/internal/egresscapture/capture_test.go b/internal/egresscapture/capture_test.go new file mode 100644 index 000000000..3ede130da --- /dev/null +++ b/internal/egresscapture/capture_test.go @@ -0,0 +1,265 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package egresscapture + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "strings" + "testing" + "time" +) + +type contextKey string + +func TestNewCaptureContextIgnoresParentCancellation(t *testing.T) { + parent, parentCancel := context.WithCancel(context.WithValue(context.Background(), contextKey("trace"), "value")) + parentCancel() + + ctx, cancel := newCaptureContext(parent) + defer cancel() + + if err := ctx.Err(); err != nil { + t.Fatalf("capture context is cancelled by parent: %v", err) + } + if got := ctx.Value(contextKey("trace")); got != "value" { + t.Fatalf("capture context did not preserve values: got %v", got) + } + + cancel() + if err := ctx.Err(); err == nil { + t.Fatal("capture context was not cancelled by its own cancel func") + } +} + +func TestConfigFromEnvRequiresPEPAddress(t *testing.T) { + t.Setenv(EnvPEPAddress, "") + + _, err := ConfigFromEnv(nil) + if err == nil { + t.Fatal("ConfigFromEnv() returned nil error, want error") + } + if !strings.Contains(err.Error(), EnvPEPAddress) { + t.Fatalf("ConfigFromEnv() error = %v, want %s", err, EnvPEPAddress) + } +} + +func TestConfigFromEnv(t *testing.T) { + t.Setenv(EnvPEPAddress, "ate-egress.example:15008") + t.Setenv(EnvTunnelProtocol, TunnelProtocolConnectTLS) + t.Setenv(EnvConnectTLSServerName, "ate-egress.example") + t.Setenv(EnvConnectTLSCAFile, "/run/egress-ca/ca.crt") + t.Setenv(EnvConnectTLSInsecureSkipVerify, "true") + + listeners := []Listener{{Port: 15001}} + cfg, err := ConfigFromEnv(listeners) + if err != nil { + t.Fatalf("ConfigFromEnv() returned error: %v", err) + } + if cfg.PEPAddress != "ate-egress.example:15008" { + t.Fatalf("cfg.PEPAddress = %q, want ate-egress.example:15008", cfg.PEPAddress) + } + if cfg.Protocol != TunnelProtocolConnectTLS { + t.Fatalf("cfg.Protocol = %q, want %s", cfg.Protocol, TunnelProtocolConnectTLS) + } + if cfg.TLS.ServerName != "ate-egress.example" { + t.Fatalf("cfg.TLS.ServerName = %q, want ate-egress.example", cfg.TLS.ServerName) + } + if cfg.TLS.CAFile != "/run/egress-ca/ca.crt" { + t.Fatalf("cfg.TLS.CAFile = %q, want /run/egress-ca/ca.crt", cfg.TLS.CAFile) + } + if !cfg.TLS.InsecureSkipVerify { + t.Fatal("cfg.TLS.InsecureSkipVerify = false, want true") + } + if len(cfg.Listeners) != 1 || cfg.Listeners[0].Port != 15001 { + t.Fatalf("cfg.Listeners = %+v, want port 15001", cfg.Listeners) + } +} + +func TestNewTunnelTransportUsesRegisteredFactories(t *testing.T) { + for _, tc := range []struct { + name string + protocol string + wantType any + }{ + { + name: "default connect", + protocol: "", + wantType: &PlaintextCONNECTTunnelTransport{}, + }, + { + name: "plaintext alias", + protocol: TunnelProtocolPlaintext, + wantType: &PlaintextCONNECTTunnelTransport{}, + }, + { + name: "tls connect alias", + protocol: TunnelProtocolTLSConnect, + wantType: &TLSCONNECTTunnelTransport{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + got, err := NewTunnelTransport(Config{PEPAddress: "ate-egress.example:15008", Protocol: tc.protocol}) + if err != nil { + t.Fatalf("NewTunnelTransport() returned error: %v", err) + } + if fmt.Sprintf("%T", got) != fmt.Sprintf("%T", tc.wantType) { + t.Fatalf("NewTunnelTransport() = %T, want %T", got, tc.wantType) + } + }) + } + + if _, err := NewTunnelTransport(Config{Protocol: "does-not-exist"}); err == nil { + t.Fatal("NewTunnelTransport() returned nil error for unsupported protocol") + } +} + +func TestNewConnectRequestUsesConfiguredAuthority(t *testing.T) { + originalDst := &net.TCPAddr{IP: net.ParseIP("203.0.113.10"), Port: 443} + req, pr, pw := newConnectRequest(context.Background(), "http", ActorIdentity{ + Namespace: "default", + Template: "counter", + ActorID: "my-counter-1", + }, originalDst, "httpbin.org:443") + defer pr.Close() + defer pw.Close() + + if req.Host != "httpbin.org:443" { + t.Fatalf("req.Host = %q, want httpbin.org:443", req.Host) + } + if req.URL.Host != "httpbin.org:443" { + t.Fatalf("req.URL.Host = %q, want httpbin.org:443", req.URL.Host) + } + if got := req.Header.Get("x-ate-original-destination"); got != originalDst.String() { + t.Fatalf("x-ate-original-destination = %q, want %q", got, originalDst.String()) + } + if got := req.Header.Get("x-ate-connect-authority"); got != "httpbin.org:443" { + t.Fatalf("x-ate-connect-authority = %q, want httpbin.org:443", got) + } +} + +func TestDeriveConnectAuthorityFromTLSClientHelloSNI(t *testing.T) { + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + errCh := make(chan error, 1) + go func() { + tlsConn := tls.Client(clientConn, &tls.Config{ + ServerName: "httpbin.org", + InsecureSkipVerify: true, + }) + errCh <- tlsConn.Handshake() + }() + + originalDst := &net.TCPAddr{IP: net.ParseIP("203.0.113.10"), Port: 443} + authority, initialBytes := deriveConnectAuthority(context.Background(), serverConn, originalDst) + if authority != "httpbin.org:443" { + t.Fatalf("deriveConnectAuthority() authority = %q, want httpbin.org:443", authority) + } + if len(initialBytes) == 0 { + t.Fatal("deriveConnectAuthority() returned no initial bytes") + } + if _, ok, _ := tlsClientHelloSNI(initialBytes); !ok { + t.Fatal("initial bytes do not contain a parseable TLS ClientHello SNI") + } + + _ = clientConn.Close() + if err := <-errCh; err == nil { + t.Fatal("TLS handshake unexpectedly succeeded") + } else if err != io.ErrClosedPipe && !strings.Contains(err.Error(), "closed") { + t.Fatalf("TLS handshake error = %v, want closed connection", err) + } +} + +func TestDeriveConnectAuthorityFromHTTPHost(t *testing.T) { + clientConn, serverConn := net.Pipe() + defer clientConn.Close() + defer serverConn.Close() + + errCh := make(chan error, 1) + go func() { + _, err := clientConn.Write([]byte("GET /get HTTP/1.1\r\nHost: httpbin.org\r\nUser-Agent: test\r\n\r\n")) + errCh <- err + }() + + originalDst := &net.TCPAddr{IP: net.ParseIP("203.0.113.10"), Port: 80} + authority, initialBytes := deriveConnectAuthority(context.Background(), serverConn, originalDst) + if authority != "httpbin.org:80" { + t.Fatalf("deriveConnectAuthority() authority = %q, want httpbin.org:80", authority) + } + if string(initialBytes) != "GET /get HTTP/1.1\r\nHost: httpbin.org\r\nUser-Agent: test\r\n\r\n" { + t.Fatalf("initial bytes = %q", string(initialBytes)) + } + if err := <-errCh; err != nil { + t.Fatalf("client write returned error: %v", err) + } +} + +func TestProxyByteStreamStopsWhenContextCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + actorConn, actorPeer := net.Pipe() + defer actorPeer.Close() + tunnelConn, tunnelPeer := net.Pipe() + defer tunnelPeer.Close() + + done := make(chan struct{}) + go func() { + proxyByteStream(ctx, actorConn, tunnelConn, nil) + close(done) + }() + + cancel() + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("proxyByteStream did not stop after context cancellation") + } +} + +func TestHBONEStreamCloseClosesIdleTransportConnections(t *testing.T) { + pr, pw := io.Pipe() + defer pr.Close() + + called := false + stream := &hboneStream{ + requestWriter: pw, + responseBody: io.NopCloser(strings.NewReader("")), + closeIdle: func() { + called = true + }, + } + + if err := stream.Close(); err != nil { + t.Fatalf("stream.Close() returned error: %v", err) + } + if !called { + t.Fatal("stream.Close() did not close idle transport connections") + } +} + +func TestHTTPHostHeaderWithPort(t *testing.T) { + host, ok, needMore := httpHostHeader([]byte("GET / HTTP/1.1\r\nHost: example.com:8080\r\n\r\n")) + if !ok || needMore { + t.Fatalf("httpHostHeader() ok=%t needMore=%t, want ok=true needMore=false", ok, needMore) + } + if got := authorityWithDefaultPort(host, 80); got != "example.com:8080" { + t.Fatalf("authorityWithDefaultPort() = %q, want example.com:8080", got) + } +} diff --git a/internal/egresscapture/env.go b/internal/egresscapture/env.go new file mode 100644 index 000000000..211e81092 --- /dev/null +++ b/internal/egresscapture/env.go @@ -0,0 +1,38 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package egresscapture + +const ( + EnvCaptureEnabled = "ATE_EGRESS_CAPTURE_ENABLED" + EnvPEPAddress = "ATE_EGRESS_PEP_ADDRESS" + EnvTunnelProtocol = "ATE_EGRESS_TUNNEL_PROTOCOL" + EnvConnectTLSServerName = "ATE_EGRESS_CONNECT_TLS_SERVER_NAME" + EnvConnectTLSCAFile = "ATE_EGRESS_CONNECT_TLS_CA_FILE" + EnvConnectTLSInsecureSkipVerify = "ATE_EGRESS_CONNECT_TLS_INSECURE_SKIP_VERIFY" + DefaultTunnelProtocol = TunnelProtocolConnect + TunnelProtocolConnect = "connect" + TunnelProtocolPlaintext = "plaintext" + TunnelProtocolH2C = "h2c" + TunnelProtocolConnectTLS = "connect-tls" + TunnelProtocolHTTPSConnect = "https-connect" + TunnelProtocolTLSConnect = "tls-connect" + FutureTunnelProtocolHBONE = "hbone" +) + +var OptionalEnvNames = []string{ + EnvConnectTLSServerName, + EnvConnectTLSCAFile, + EnvConnectTLSInsecureSkipVerify, +} diff --git a/manifests/ate-install/ate-controller.yaml b/manifests/ate-install/ate-controller.yaml index 1f2756a27..7ced83a32 100644 --- a/manifests/ate-install/ate-controller.yaml +++ b/manifests/ate-install/ate-controller.yaml @@ -79,6 +79,10 @@ spec: containers: - name: ate-controller image: ko://github.com/agent-substrate/substrate/cmd/atecontroller + envFrom: + - configMapRef: + name: ate-egress-capture + optional: true ports: - name: metrics containerPort: 8080