Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion cmd/atecontroller/internal/controllers/workerpool_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package controllers

import (
"os"
"strconv"

corev1 "k8s.io/api/core/v1"
appsv1ac "k8s.io/client-go/applyconfigurations/apps/v1"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
metav1ac "k8s.io/client-go/applyconfigurations/meta/v1"

"github.com/agent-substrate/substrate/internal/ateompath"
"github.com/agent-substrate/substrate/internal/egresscapture"
atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
)

Expand All @@ -44,7 +48,9 @@ func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.Deployment
WithValueFrom(corev1ac.EnvVarSource().
WithFieldRef(corev1ac.ObjectFieldSelector().
WithFieldPath("metadata.uid"))),
).
)
containerAC.WithEnv(egressCaptureEnvFromController()...)
containerAC.
WithVolumeMounts(corev1ac.VolumeMount().
WithName("run-ateom").
WithMountPath(ateompath.BasePath))
Expand Down Expand Up @@ -82,6 +88,37 @@ func buildDeploymentApplyConfig(wp *atev1alpha1.WorkerPool) *appsv1ac.Deployment
WithSpec(podSpecAC)))
}

// egressCaptureEnvFromController builds the egress-capture environment
// variables to copy from the controller's own process environment onto the
// worker container.
//
// It returns nil unless the capture-enabled variable parses as true. When
// enabled, the result starts with the enabled flag (always written as
// "true"), followed by the PEP address, the tunnel protocol, and every name in
// egresscapture.OptionalEnvNames, each included only when set to a non-empty
// value.
func egressCaptureEnvFromController() []*corev1ac.EnvVarApplyConfiguration {
	// ParseBool reports false alongside its error, so an unset or malformed
	// flag is deliberately treated the same as "disabled".
	if on, _ := strconv.ParseBool(os.Getenv(egresscapture.EnvCaptureEnabled)); !on {
		return nil
	}

	vars := []*corev1ac.EnvVarApplyConfiguration{
		corev1ac.EnvVar().
			WithName(egresscapture.EnvCaptureEnabled).
			WithValue("true"),
	}

	// forward copies key from the controller environment when it is non-empty.
	forward := func(key string) {
		val := os.Getenv(key)
		if val == "" {
			return
		}
		vars = append(vars, corev1ac.EnvVar().
			WithName(key).
			WithValue(val))
	}

	forward(egresscapture.EnvPEPAddress)
	forward(egresscapture.EnvTunnelProtocol)
	for _, key := range egresscapture.OptionalEnvNames {
		forward(key)
	}
	return vars
}

// maybeApplyMicroVMPodShape adds the /dev/kvm device and node placement a
// micro-VM (kata + cloud-hypervisor) worker pool needs, on top of any
// pod-template settings. No-op unless sandboxClass is the micro-VM class.
Expand Down
38 changes: 38 additions & 0 deletions cmd/atecontroller/internal/controllers/workerpool_apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
metav1ac "k8s.io/client-go/applyconfigurations/meta/v1"

"github.com/agent-substrate/substrate/internal/ateompath"
"github.com/agent-substrate/substrate/internal/egresscapture"
atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
)

Expand Down Expand Up @@ -261,6 +262,43 @@ func TestMicroVMPodShape(t *testing.T) {
}
}

// TestWorkerPoolEgressCaptureEnvPropagation sets the egress-capture variables
// in the test process environment and checks that buildDeploymentApplyConfig
// copies them onto the single worker container.
func TestWorkerPoolEgressCaptureEnvPropagation(t *testing.T) {
	const (
		pepAddress = "ate-egress.agentgateway-system.svc.cluster.local:15008"
		serverName = "ate-egress.agentgateway-system.svc.cluster.local"
		caFile     = "/run/egress-ca/ca.crt"
	)

	// The flag is set to "1" here but is expected to be emitted as "true".
	t.Setenv(egresscapture.EnvCaptureEnabled, "1")
	t.Setenv(egresscapture.EnvPEPAddress, pepAddress)
	t.Setenv(egresscapture.EnvTunnelProtocol, egresscapture.TunnelProtocolConnectTLS)
	t.Setenv(egresscapture.EnvConnectTLSServerName, serverName)
	t.Setenv(egresscapture.EnvConnectTLSCAFile, caFile)
	t.Setenv(egresscapture.EnvConnectTLSInsecureSkipVerify, "true")

	deployment := buildDeploymentApplyConfig(testWorkerPoolApplyConfig(nil))
	containers := deployment.Spec.Template.Spec.Containers
	if n := len(containers); n != 1 {
		t.Fatalf("containers length = %d, want 1", n)
	}

	// Collect only literal name/value pairs; entries missing either are skipped.
	actual := make(map[string]string)
	for _, e := range containers[0].Env {
		if e.Name == nil || e.Value == nil {
			continue
		}
		actual[*e.Name] = *e.Value
	}

	expected := map[string]string{
		egresscapture.EnvCaptureEnabled:               "true",
		egresscapture.EnvPEPAddress:                   pepAddress,
		egresscapture.EnvTunnelProtocol:               egresscapture.TunnelProtocolConnectTLS,
		egresscapture.EnvConnectTLSServerName:         serverName,
		egresscapture.EnvConnectTLSCAFile:             caFile,
		egresscapture.EnvConnectTLSInsecureSkipVerify: "true",
	}
	for key, wantValue := range expected {
		if gotValue := actual[key]; gotValue != wantValue {
			t.Errorf("env %s = %q, want %q", key, gotValue, wantValue)
		}
	}
}

func testWorkerPoolApplyConfig(tmpl *atev1alpha1.WorkerPoolPodTemplate) *atev1alpha1.WorkerPool {
return &atev1alpha1.WorkerPool{
ObjectMeta: metav1.ObjectMeta{Name: "pool", Namespace: "default", UID: "uid"},
Expand Down
140 changes: 140 additions & 0 deletions cmd/ateom-gvisor/egress_capture.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//go:build linux

// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"encoding/binary"
"fmt"
"net"
"syscall"
"unsafe"

"github.com/agent-substrate/substrate/internal/egresscapture"
"github.com/google/nftables"
"github.com/google/nftables/binaryutil"
"github.com/google/nftables/expr"
"golang.org/x/sys/unix"
)

const (
egressCaptureHTTPPort = uint16(15001)
egressCaptureHTTPSPort = uint16(15002)
Comment on lines +35 to +36

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to capture HTTP and HTTPS separately? A single listener could handle both, since SO_ORIGINAL_DST is already used to look up the original port in deriveConnectAuthority.

egressOriginalHTTPPort = uint16(80)
egressOriginalHTTPSPort = uint16(443)
)

// defaultEgressCaptureRedirects pairs each original TCP destination port with
// the capture port that traffic to it is redirected to.
// addEgressCaptureRedirectRules adds one nftables rule per entry.
var defaultEgressCaptureRedirects = []struct {
originalPort uint16
capturePort uint16
}{
{originalPort: egressOriginalHTTPPort, capturePort: egressCaptureHTTPPort},
{originalPort: egressOriginalHTTPSPort, capturePort: egressCaptureHTTPSPort},
}

// defaultEgressCaptureListeners is the listener set passed to
// egresscapture.ConfigFromEnv. Its ports are the same capture ports used as
// redirect targets in defaultEgressCaptureRedirects.
var defaultEgressCaptureListeners = []egresscapture.Listener{
{Port: egressCaptureHTTPPort},
{Port: egressCaptureHTTPSPort},
}

// startEgressCaptureIfEnabled starts actor egress capture for identity when
// egresscapture.EnabledFromEnv reports it is enabled, and stores the resulting
// capture on s.egressCapture.
//
// It is a no-op returning nil when capture is disabled. A configuration error
// is returned as-is; a start error is wrapped with context. s.egressCapture is
// only assigned on success.
func (s *AteomService) startEgressCaptureIfEnabled(ctx context.Context, identity egresscapture.ActorIdentity) error {
	if enabled := egresscapture.EnabledFromEnv(); !enabled {
		return nil
	}

	captureCfg, cfgErr := egresscapture.ConfigFromEnv(defaultEgressCaptureListeners)
	if cfgErr != nil {
		return cfgErr
	}

	// originalDestination lets the capture look up where each redirected
	// connection was originally headed.
	started, startErr := egresscapture.Start(ctx, identity, captureCfg, originalDestination)
	if startErr != nil {
		return fmt.Errorf("while starting actor egress capture: %w", startErr)
	}

	s.egressCapture = started
	return nil
}

// addEgressCaptureRedirectRules queues, on connection c, one rule in the given
// table and prerouting chain for every entry of defaultEgressCaptureRedirects.
// Each rule's expressions come from tcpRedirectExprs for sourceIP and the
// entry's original/capture port pair.
func addEgressCaptureRedirectRules(c *nftables.Conn, table *nftables.Table, prerouting *nftables.Chain, sourceIP string) {
	for i := range defaultEgressCaptureRedirects {
		mapping := defaultEgressCaptureRedirects[i]
		rule := &nftables.Rule{
			Table: table,
			Chain: prerouting,
			Exprs: tcpRedirectExprs(sourceIP, mapping.originalPort, mapping.capturePort),
		}
		c.AddRule(rule)
	}
}

// tcpRedirectExprs returns the nftables expression list for a redirect rule:
// the match expressions from ipSourceEqual(sourceIP) and
// tcpDestinationPortEqual(originalPort), followed by a load of capturePort
// (big-endian) into register 1 and a redirect that reads its port from that
// register.
func tcpRedirectExprs(sourceIP string, originalPort, capturePort uint16) []expr.Any {
	match := ipSourceEqual(sourceIP)
	match = append(match, tcpDestinationPortEqual(originalPort)...)

	loadCapturePort := &expr.Immediate{
		Register: 1,
		Data:     binaryutil.BigEndian.PutUint16(capturePort),
	}
	redirect := &expr.Redir{
		RegisterProtoMin: 1,
	}
	return append(match, loadCapturePort, redirect)
}

// originalDestination returns the SO_ORIGINAL_DST address of a captured
// connection, as reported by originalDstFromFD on the connection's file
// descriptor.
//
// conn must be a *net.TCPConn; any other type yields an error. Failures from
// SyscallConn, from Control, or from the socket-option lookup itself are
// returned unchanged.
func originalDestination(conn net.Conn) (net.Addr, error) {
	tcp, isTCP := conn.(*net.TCPConn)
	if !isTCP {
		return nil, fmt.Errorf("captured connection is %T, not *net.TCPConn", conn)
	}

	sc, err := tcp.SyscallConn()
	if err != nil {
		return nil, err
	}

	var (
		dst       *net.TCPAddr
		lookupErr error
	)
	// Control only reports whether the callback could run; the lookup's own
	// result is carried out through the captured variables.
	ctlErr := sc.Control(func(fd uintptr) {
		dst, lookupErr = originalDstFromFD(int(fd))
	})
	switch {
	case ctlErr != nil:
		return nil, ctlErr
	case lookupErr != nil:
		return nil, lookupErr
	}
	return dst, nil
}

// originalDstFromFD issues getsockopt(fd, SOL_IP, SO_ORIGINAL_DST) and converts
// the returned IPv4 socket address into a *net.TCPAddr.
//
// It returns the raw errno when the syscall fails, and an error when the
// reported address family is not AF_INET.
func originalDstFromFD(fd int) (*net.TCPAddr, error) {
	var sa unix.RawSockaddrInet4
	saLen := uint32(unsafe.Sizeof(sa))

	// The unsafe.Pointer -> uintptr conversions must stay inside the call
	// expression so the pointed-to values are kept alive for the syscall.
	if _, _, errno := unix.Syscall6(
		unix.SYS_GETSOCKOPT,
		uintptr(fd),
		uintptr(unix.SOL_IP),
		uintptr(unix.SO_ORIGINAL_DST),
		uintptr(unsafe.Pointer(&sa)),
		uintptr(unsafe.Pointer(&saLen)),
		0,
	); errno != 0 {
		return nil, errno
	}

	if sa.Family != syscall.AF_INET {
		return nil, fmt.Errorf("SO_ORIGINAL_DST returned address family %d", sa.Family)
	}

	// Read the port field's two bytes in memory order and decode them as
	// big-endian, independent of host endianness.
	portBytes := (*[2]byte)(unsafe.Pointer(&sa.Port))
	ip := net.IPv4(sa.Addr[0], sa.Addr[1], sa.Addr[2], sa.Addr[3])
	return &net.TCPAddr{
		IP:   ip,
		Port: int(binary.BigEndian.Uint16(portBytes[:])),
	}, nil
}
45 changes: 40 additions & 5 deletions cmd/ateom-gvisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/agent-substrate/substrate/internal/ateinterceptors"
"github.com/agent-substrate/substrate/internal/ateompath"
"github.com/agent-substrate/substrate/internal/contextlogging"
"github.com/agent-substrate/substrate/internal/egresscapture"
"github.com/agent-substrate/substrate/internal/proto/ateompb"
"github.com/agent-substrate/substrate/internal/serverboot"
"github.com/agent-substrate/substrate/internal/version"
Expand Down Expand Up @@ -163,6 +164,7 @@ type AteomService struct {

interiorNetNS netns.NsHandle
actorLogger *actorlog.ActorLogger
egressCapture *egresscapture.Capture
}

var _ ateompb.AteomServer = (*AteomService)(nil)
Expand All @@ -187,7 +189,7 @@ func (s *AteomService) RunWorkload(ctx context.Context, req *ateompb.RunWorkload
// * Correct runsc version is downloaded and placed on disk.
// * All OCI bundles are set up, including for "pause" container.

if err := s.setupActorNetwork(ctx); err != nil {
if err := s.setupActorNetwork(ctx, actorIdentityFromRun(req)); err != nil {
return nil, fmt.Errorf("while setting up actor network: %w", err)
}
defer func() {
Expand Down Expand Up @@ -341,7 +343,7 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore
// * All OCI bundles are set up, including for "pause" container.
// * Checkpoint downloaded and placed on disk

if err := s.setupActorNetwork(ctx); err != nil {
if err := s.setupActorNetwork(ctx, actorIdentityFromRestore(req)); err != nil {
return nil, fmt.Errorf("while setting up actor network: %w", err)
}
defer func() {
Expand Down Expand Up @@ -388,7 +390,7 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore
return &ateompb.RestoreWorkloadResponse{}, nil
}

func (s *AteomService) setupActorNetwork(ctx context.Context) (retErr error) {
func (s *AteomService) setupActorNetwork(ctx context.Context, identity egresscapture.ActorIdentity) (retErr error) {
// Build a fresh point-to-point network between the worker pod netns and the
// gVisor interior netns. The worker side keeps the pod's real eth0, creates
// ateom0 as the gateway, and moves only the veth peer into the actor netns.
Expand Down Expand Up @@ -456,7 +458,11 @@ func (s *AteomService) setupActorNetwork(ctx context.Context) (retErr error) {
if err := enableIPv4Forwarding(); err != nil {
return err
}
if err := installActorNftablesRules(podIP); err != nil {
if err := s.startEgressCaptureIfEnabled(ctx, identity); err != nil {
return err
}

if err := installActorNftablesRules(podIP, s.egressCapture != nil); err != nil {
return err
}

Expand Down Expand Up @@ -540,6 +546,12 @@ func (s *AteomService) cleanupActorNetwork(ctx context.Context) error {
if err := removeActorNftablesRules(); err != nil {
return err
}
if s.egressCapture != nil {
if err := s.egressCapture.Close(); err != nil {
slog.WarnContext(ctx, "Failed to close actor egress capture", "err", err)
}
s.egressCapture = nil
}

var cleanupErr error
if link, err := netlink.LinkByName(hostVethName); err == nil {
Expand Down Expand Up @@ -621,7 +633,7 @@ func enableIPv4Forwarding() error {
return nil
}

func installActorNftablesRules(podIP net.IP) error {
func installActorNftablesRules(podIP net.IP, egressCapture bool) error {
// Install a dedicated nftables table for the active actor. Keeping all
// rules in an ateom-owned table makes cleanup simple and avoids mutating
// Kubernetes or CNI-managed chains directly.
Expand Down Expand Up @@ -659,6 +671,13 @@ func installActorNftablesRules(podIP net.IP) error {
Hooknum: nftables.ChainHookPrerouting,
Priority: nftables.ChainPriorityNATDest,
})
if egressCapture {
addEgressCaptureRedirectRules(c, table, prerouting, actorVethIP)
}
// TODO: Support optional DNS capture for hostname recovery for non-SNI,
// non-HTTP, or DNS-policy egress. The current HTTP/HTTPS path derives
// authority from TLS SNI or HTTP Host, so redirecting UDP/TCP 53 would
// add potential DNS proxy/cache/TTL/search-domain failures
// TODO: Support inbound UDP DNAT for actors that expose UDP protocols such
// as QUIC.
// TODO: Replace the hard-coded HTTP port with the actor's configured
Expand Down Expand Up @@ -790,6 +809,22 @@ func tcpDestinationPortEqual(port uint16) []expr.Any {
}
}

// actorIdentityFromRun builds the egress-capture actor identity from the
// template namespace, template name, and actor ID of a RunWorkload request.
func actorIdentityFromRun(req *ateompb.RunWorkloadRequest) egresscapture.ActorIdentity {
	var id egresscapture.ActorIdentity
	id.Namespace = req.GetActorTemplateNamespace()
	id.Template = req.GetActorTemplateName()
	id.ActorID = req.GetActorId()
	return id
}

// actorIdentityFromRestore builds the egress-capture actor identity from the
// template namespace, template name, and actor ID of a RestoreWorkload request.
func actorIdentityFromRestore(req *ateompb.RestoreWorkloadRequest) egresscapture.ActorIdentity {
	var id egresscapture.ActorIdentity
	id.Namespace = req.GetActorTemplateNamespace()
	id.Template = req.GetActorTemplateName()
	id.ActorID = req.GetActorId()
	return id
}

func createNetNSWithoutSwitching(ctx context.Context, name string) (netns.NsHandle, error) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
Expand Down
Loading
Loading