diff --git a/cmd/kubectl-ate/README.md b/cmd/kubectl-ate/README.md index b45679ae2..eba292517 100644 --- a/cmd/kubectl-ate/README.md +++ b/cmd/kubectl-ate/README.md @@ -131,11 +131,19 @@ kubectl ate delete actor my-actor `kubectl ate logs` requires a resource-type subcommand; running `kubectl ate logs ` on its own prints help. The only supported resource type is `actors`: ```bash -# Stream logs for an actor (follows by default; aggregated across worker -# reassignments so the same actor is queryable as it teleports between pods). +# Stream logs for an actor (aggregated across worker reassignments so the same +# actor is queryable as it teleports between pods). kubectl ate logs actors my-actor + +# Show logs from a single container within the actor (-c for short). +kubectl ate logs actors my-actor --container my-container + +# Show only the ateom supervisor (lifecycle) logs, e.g. "Actor started". +kubectl ate logs actors my-actor --supervisor ``` +By default all of the actor's logs are shown. Use `-c`/`--container <name>` to restrict output to a single container, or `--supervisor` to show only ateom's lifecycle logs. The two flags are mutually exclusive. + Logs are streamable only while the actor is bound to a worker (i.e., `STATUS_RUNNING`). For history across worker migrations, route through a centralized log backend (Cloud Logging, Loki, etc.); see `docs/observability.md`. 
### Administration & Setup diff --git a/cmd/kubectl-ate/internal/cmd/logs_actors.go b/cmd/kubectl-ate/internal/cmd/logs_actors.go index 2b8187870..93c1cb1a9 100644 --- a/cmd/kubectl-ate/internal/cmd/logs_actors.go +++ b/cmd/kubectl-ate/internal/cmd/logs_actors.go @@ -38,7 +38,11 @@ import ( "k8s.io/client-go/kubernetes" ) -var followLogs bool +var ( + followLogs bool + containerFlag string + supervisorFlag bool +) var logsActorsCmd = &cobra.Command{ Use: "actors ", @@ -50,6 +54,8 @@ var logsActorsCmd = &cobra.Command{ func init() { logsActorsCmd.Flags().BoolVarP(&followLogs, "follow", "f", false, "Specify if the logs should be streamed.") + logsActorsCmd.Flags().StringVarP(&containerFlag, "container", "c", "", "Show logs only from the named container within the actor. Mutually exclusive with --supervisor.") + logsActorsCmd.Flags().BoolVar(&supervisorFlag, "supervisor", false, "Show only the ateom supervisor (lifecycle) logs. Mutually exclusive with --container.") logsCmd.AddCommand(logsActorsCmd) } @@ -80,6 +86,8 @@ type LogsActorRunner struct { stdout io.Writer stderr io.Writer follow bool + container string + supervisor bool pollInterval time.Duration reconnectInterval time.Duration tickerInterval time.Duration @@ -98,6 +106,9 @@ func (r *LogsActorRunner) Run(ctx context.Context, actorID string) error { } defer r.apiClient.Close() + if err := validateLogFilterFlags(r.container, r.supervisor); err != nil { + return err + } if r.follow { return r.runFollow(ctx, actorID) } @@ -128,12 +139,13 @@ func (r *LogsActorRunner) runOneShot(ctx context.Context, actorID string) error } defer stream.Close() + filter := logLineFilter{actorID: actorID, container: r.container, supervisor: r.supervisor} scanner := bufio.NewScanner(stream) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) // Support up to 1MB lines for scanner.Scan() { line := scanner.Text() - filterAndDisplayLogLine(line, actorID, r.stdout) + filterAndDisplayLogLine(line, filter, r.stdout) } if err := 
scanner.Err(); err != nil { return fmt.Errorf("error reading log stream: %w", err) @@ -206,12 +218,13 @@ func (r *LogsActorRunner) runFollow(ctx context.Context, actorID string) error { var wg sync.WaitGroup r.startMigrationMonitor(streamCtx, streamCancel, &wg, actorID, podName) + filter := logLineFilter{actorID: actorID, container: r.container, supervisor: r.supervisor} scanner := bufio.NewScanner(stream) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) // Support up to 1MB lines for scanner.Scan() { line := scanner.Text() - logTime, _ := filterAndDisplayLogLine(line, actorID, r.stdout) + logTime, _ := filterAndDisplayLogLine(line, filter, r.stdout) if !logTime.IsZero() { lastSeenTime = logTime } @@ -274,10 +287,23 @@ func (r *LogsActorRunner) startMigrationMonitor( }() } +// validateLogFilterFlags rejects the one unsupported filter combination: a +// container filter and the supervisor filter cannot be requested together. +func validateLogFilterFlags(container string, supervisor bool) error { + if container != "" && supervisor { + return fmt.Errorf("--container and --supervisor are mutually exclusive; specify only one") + } + return nil +} + func runLogsActor(cmd *cobra.Command, args []string) error { ctx := cmd.Context() actorID := args[0] + if err := validateLogFilterFlags(containerFlag, supervisorFlag); err != nil { + return err + } + apiClient, err := ateclient.NewClient(ctx, kubeconfig, k8sContext, endpoint, traceEnabled) if err != nil { return fmt.Errorf("failed to connect to ate-api-server: %w", err) @@ -295,6 +321,8 @@ func runLogsActor(cmd *cobra.Command, args []string) error { stdout: os.Stdout, stderr: os.Stderr, follow: followLogs, + container: containerFlag, + supervisor: supervisorFlag, pollInterval: 2 * time.Second, reconnectInterval: 1 * time.Second, tickerInterval: 2 * time.Second, @@ -303,7 +331,61 @@ func runLogsActor(cmd *cobra.Command, args []string) error { return runner.Run(ctx, actorID) } -func filterAndDisplayLogLine(line, 
targetActorID string, w io.Writer) (time.Time, bool) { +// logLineFilter selects which of an actor's log lines are displayed. container +// and supervisor are mutually exclusive; zero values show all of the lines. +type logLineFilter struct { + actorID string + container string // when non-empty, show only lines from this container + supervisor bool // when true, show only supervisor (non-container) lines +} + +// matchesSource reports whether a line from the given container (empty for +// supervisor lines) passes the filter's source selection. +func (f logLineFilter) matchesSource(containerName string) bool { + switch { + case f.supervisor: + return containerName == "" + case f.container != "": + return containerName == f.container + default: + return true + } +} + +// Reserved Substrate log labels, written under one of logActorLabelKeys. +const ( + ateLabelPrefix = "ate.dev/" + ateActorIDLabel = ateLabelPrefix + "actor_id" + ateContainerNameLabel = ateLabelPrefix + "container_name" +) + +// logActorLabelKeys are the keys a line may carry labels under, GCE key first. +var logActorLabelKeys = []string{"logging.googleapis.com/labels", "labels"} + +// actorSource returns the actor_id and container_name from the first recognized +// label map that carries a non-empty ate.dev/actor_id. Both are read from that +// same map so a line's source is never split across the two keys; actor_id is "" +// when no map identifies an actor. +func actorSource(m map[string]any) (actorID, containerName string) { + for _, key := range logActorLabelKeys { + labels, ok := m[key].(map[string]any) + if !ok { + continue + } + if id, _ := labels[ateActorIDLabel].(string); id != "" { + containerName, _ = labels[ateContainerNameLabel].(string) + return id, containerName + } + } + return "", "" +} + +// filterAndDisplayLogLine writes the cleaned line (ate.dev labels stripped) to w +// when it belongs to the target actor and passes the filter. 
It returns the +// line's timestamp only for displayed lines (zero otherwise), so follow mode +// never advances its resume cursor past a filtered-out or foreign line and skips +// logs on reconnect. The bool reports whether the line was written. +func filterAndDisplayLogLine(line string, filter logLineFilter, w io.Writer) (time.Time, bool) { var m map[string]any dec := json.NewDecoder(strings.NewReader(line)) dec.UseNumber() @@ -320,30 +402,21 @@ func filterAndDisplayLogLine(line, targetActorID string, w io.Writer) (time.Time } } - var actorID string - for _, labelKey := range []string{"logging.googleapis.com/labels", "labels"} { - if labelsAny, ok := m[labelKey]; ok { - if labels, ok := labelsAny.(map[string]any); ok { - if id, ok := labels["ate.dev/actor_id"].(string); ok && id != "" { - actorID = id - break - } - } - } + actorID, containerName := actorSource(m) + if actorID == "" || actorID != filter.actorID { + return time.Time{}, false } - matched := (actorID != "" && actorID == targetActorID) - - if !matched { - return logTime, false + if !filter.matchesSource(containerName) { + return time.Time{}, false } // remove actor labels from CLI output - for _, labelKey := range []string{"logging.googleapis.com/labels", "labels"} { + for _, labelKey := range logActorLabelKeys { if labelsAny, ok := m[labelKey]; ok { if labels, ok := labelsAny.(map[string]any); ok { for k := range labels { - if strings.HasPrefix(k, "ate.dev/") { + if strings.HasPrefix(k, ateLabelPrefix) { delete(labels, k) } } @@ -363,7 +436,7 @@ func filterAndDisplayLogLine(line, targetActorID string, w io.Writer) (time.Time enc := json.NewEncoder(&buf) enc.SetEscapeHTML(false) if err := enc.Encode(m); err != nil { - return logTime, false + return time.Time{}, false } encodedStr := strings.TrimSpace(buf.String()) diff --git a/cmd/kubectl-ate/internal/cmd/logs_actors_test.go b/cmd/kubectl-ate/internal/cmd/logs_actors_test.go index 97ce505e8..a977cb08a 100644 --- 
a/cmd/kubectl-ate/internal/cmd/logs_actors_test.go +++ b/cmd/kubectl-ate/internal/cmd/logs_actors_test.go @@ -36,6 +36,8 @@ func TestFilterAndDisplayLogLine(t *testing.T) { name string line string targetActorID string + container string + supervisor bool wantMatched bool wantTime string wantOutput string @@ -77,7 +79,7 @@ func TestFilterAndDisplayLogLine(t *testing.T) { line: `{"time":"2026-05-16T01:03:38Z","message":"Hello world","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-2"}}`, targetActorID: "act-1", wantMatched: false, - wantTime: "2026-05-16T01:03:38Z", + wantTime: "", // zero time: another actor's line must not advance follow's resume cursor wantOutput: "", }, { @@ -120,12 +122,137 @@ func TestFilterAndDisplayLogLine(t *testing.T) { wantTime: "2026-05-16T01:03:38Z", wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","logging.googleapis.com/labels":{"app":"my-app"},"msg":"Hello"}`, }, + { + name: "container filter matches the named container", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + container: "counter", + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi"}`, + }, + { + name: "container filter excludes a different container", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"sidecar"}}`, + targetActorID: "act-1", + container: "counter", + wantMatched: false, + wantTime: "", // filtered out: only displayed lines may advance follow's resume cursor + wantOutput: "", + }, + { + name: "supervisor filter matches a lifecycle line without container_name", + line: `{"time":"2026-05-16T01:03:38Z","message":"Actor started","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1"}}`, + targetActorID: "act-1", + supervisor: 
true, + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","message":"Actor started"}`, + }, + { + name: "supervisor filter excludes a container line", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + supervisor: true, + wantMatched: false, + wantTime: "", // filtered out: only displayed lines may advance follow's resume cursor + wantOutput: "", + }, + { + name: "default filter shows a container line", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi"}`, + }, + { + // Non-GCE shape: actor metadata under "labels", app labels under the GCE key. + name: "metadata under labels key, app data under GCE key", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"app":"my-app"},"labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + container: "counter", + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","logging.googleapis.com/labels":{"app":"my-app"},"msg":"hi"}`, + }, + { + // Both keys carry metadata: GCE key wins; container read from its map (counter). 
+ name: "both keys carry metadata, GCE key wins atomically", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"},"labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"sidecar"}}`, + targetActorID: "act-1", + container: "counter", + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi"}`, + }, + { + // Same line: the non-authoritative key's container (sidecar) is ignored. + name: "both keys carry metadata, non-GCE container is ignored", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"},"labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"sidecar"}}`, + targetActorID: "act-1", + container: "sidecar", + wantMatched: false, + wantTime: "", + wantOutput: "", + }, + { + // Empty actor_id under the GCE key must not shadow the labels key. + name: "empty actor_id under GCE key falls through to labels key", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":""},"labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + container: "counter", + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi"}`, + }, + { + // Non-string actor_id under the GCE key fails the type assertion and falls through. 
+ name: "non-string actor_id under GCE key falls through to labels key", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":123},"labels":{"ate.dev/actor_id":"act-1","ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + container: "counter", + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi"}`, + }, + { + // An empty actor_id with no other identifying key is not our actor. + name: "empty actor_id under the only key is dropped", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":""}}`, + targetActorID: "act-1", + wantMatched: false, + wantTime: "", + wantOutput: "", + }, + { + // actor_id is under the GCE key (which carries no container_name); + // container_name lives only under the labels key. Filtering by container + // must not borrow it from the non-selected map, so the line is not matched. + name: "container_name only under non-selected key is not borrowed", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1"},"labels":{"ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + container: "counter", + wantMatched: false, + wantTime: "", + wantOutput: "", + }, + { + // Same line, default filter: the line is the actor's and is shown, and the + // stray container_name under the labels key is stripped, not adopted. 
+ name: "container_name under non-selected key, default filter shows the line", + line: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-1"},"labels":{"ate.dev/container_name":"counter"}}`, + targetActorID: "act-1", + wantMatched: true, + wantTime: "2026-05-16T01:03:38Z", + wantOutput: `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"hi"}`, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var buf bytes.Buffer - logTime, matched := filterAndDisplayLogLine(tc.line, tc.targetActorID, &buf) + logTime, matched := filterAndDisplayLogLine(tc.line, logLineFilter{actorID: tc.targetActorID, container: tc.container, supervisor: tc.supervisor}, &buf) if matched != tc.wantMatched { t.Errorf("got matched = %v, want %v", matched, tc.wantMatched) @@ -239,6 +366,171 @@ func TestLogsActorRunner_Run_OneShotSuccess(t *testing.T) { } } +func TestLogsActorRunner_Run_OneShot_ContainerFilter(t *testing.T) { + actorID := "act-123" + podName := "pod-xyz" + namespace := "ns-abc" + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: podName, + AteomPodNamespace: namespace, + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + }, + } + + counterLine := `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"from counter","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-123","ate.dev/container_name":"counter"}}` + sidecarLine := `{"time":"2026-05-16T01:03:39Z","level":"info","msg":"from sidecar","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-123","ate.dev/container_name":"sidecar"}}` + supervisorLine := `{"time":"2026-05-16T01:03:40Z","message":"Actor started","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-123"}}` + + mockStreamer := &mockPodLogsStreamer{ + 
StreamLogsFunc: func(ctx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(counterLine + "\n" + sidecarLine + "\n" + supervisorLine + "\n")), nil + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: false, + container: "counter", + } + + if err := runner.Run(context.Background(), actorID); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + gotOutput := strings.TrimSpace(stdout.String()) + wantOutput := `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"from counter"}` + if gotOutput != wantOutput { + t.Errorf("got stdout %q, want only the counter line %q", gotOutput, wantOutput) + } +} + +func TestLogsActorRunner_Run_OneShot_SupervisorFilter(t *testing.T) { + actorID := "act-123" + podName := "pod-xyz" + namespace := "ns-abc" + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: podName, + AteomPodNamespace: namespace, + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + }, + } + + counterLine := `{"time":"2026-05-16T01:03:38Z","level":"info","msg":"from counter","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-123","ate.dev/container_name":"counter"}}` + supervisorLine := `{"time":"2026-05-16T01:03:40Z","message":"Actor started","logging.googleapis.com/labels":{"ate.dev/actor_id":"act-123"}}` + + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(ctx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + return io.NopCloser(strings.NewReader(counterLine + "\n" + supervisorLine + "\n")), nil + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + 
streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + follow: false, + supervisor: true, + } + + if err := runner.Run(context.Background(), actorID); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + gotOutput := strings.TrimSpace(stdout.String()) + wantOutput := `{"time":"2026-05-16T01:03:40Z","message":"Actor started"}` + if gotOutput != wantOutput { + t.Errorf("got stdout %q, want only the supervisor line %q", gotOutput, wantOutput) + } +} + +func TestValidateLogFilterFlags(t *testing.T) { + tests := []struct { + name string + container string + supervisor bool + wantErr bool + }{ + {"neither", "", false, false}, + {"container only", "counter", false, false}, + {"supervisor only", "", true, false}, + {"both", "counter", true, true}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := validateLogFilterFlags(tc.container, tc.supervisor) + if tc.wantErr { + if err == nil { + t.Fatal("expected an error, got nil") + } + if !strings.Contains(err.Error(), "mutually exclusive") { + t.Errorf("error %q missing substring %q", err, "mutually exclusive") + } + } else if err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + } +} + +// TestLogsActorRunner_Run_RejectsContainerAndSupervisor verifies the runner +// enforces its own invariant when constructed directly (not only via the CLI), +// failing fast before any control-plane or pod calls. 
+func TestLogsActorRunner_Run_RejectsContainerAndSupervisor(t *testing.T) { + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + t.Error("GetActor should not be called when filters are mutually exclusive") + return nil, fmt.Errorf("should not be called") + }, + } + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(ctx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + t.Error("StreamLogs should not be called when filters are mutually exclusive") + return nil, fmt.Errorf("should not be called") + }, + } + + var stdout, stderr bytes.Buffer + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &stdout, + stderr: &stderr, + container: "counter", + supervisor: true, + } + + err := runner.Run(context.Background(), "act-1") + if err == nil { + t.Fatal("expected an error when both --container and --supervisor are set, got nil") + } + if !strings.Contains(err.Error(), "mutually exclusive") { + t.Errorf("unexpected error: %v (want substring %q)", err, "mutually exclusive") + } + if mockAPI.CloseCalls != 1 { + t.Errorf("expected Close to be called once, got %d", mockAPI.CloseCalls) + } +} + func TestLogsActorRunner_Run_OneShot_ActorNotRunning(t *testing.T) { actorID := "act-123" @@ -381,6 +673,98 @@ func TestLogsActorRunner_Run_Follow_SuspendedToRunning(t *testing.T) { } } +// TestLogsActorRunner_Run_Follow_ForeignActorLineDoesNotAdvanceCursor is a +// regression test: a worker pod's log stream can contain lines from a different +// actor that previously ran on it. Those foreign lines must never advance the +// follow resume cursor (SinceTime); otherwise a reconnect could skip the target +// actor's own logs. 
+func TestLogsActorRunner_Run_Follow_ForeignActorLineDoesNotAdvanceCursor(t *testing.T) { + actorID := "act-123" + podName := "pod-xyz" + namespace := "ns-abc" + + mockAPI := &mockAteAPIClient{ + GetActorFunc: func(ctx context.Context, in *ateapipb.GetActorRequest, opts ...grpc.CallOption) (*ateapipb.GetActorResponse, error) { + return &ateapipb.GetActorResponse{ + Actor: &ateapipb.Actor{ + ActorId: actorID, + AteomPodName: podName, + AteomPodNamespace: namespace, + Status: ateapipb.Actor_STATUS_RUNNING, + }, + }, nil + }, + } + + // A line belonging to a DIFFERENT actor that shares the worker pod's stream. + // It carries a parseable timestamp but must not move the resume cursor. + foreignLine := `{"time":"2026-05-16T01:03:38Z","message":"not mine","logging.googleapis.com/labels":{"ate.dev/actor_id":"other"}}` + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var mu sync.Mutex + var streamCalls int + var reconnectHadSinceTime bool + secondCall := make(chan struct{}) + var once sync.Once + + mockStreamer := &mockPodLogsStreamer{ + StreamLogsFunc: func(streamCtx context.Context, ns, name string, opts *corev1.PodLogOptions) (io.ReadCloser, error) { + mu.Lock() + streamCalls++ + call := streamCalls + mu.Unlock() + + if call == 1 { + // First connection: emit one foreign line, then EOF to force a reconnect. + return io.NopCloser(strings.NewReader(foreignLine + "\n")), nil + } + + // Reconnection: record whether a resume cursor was carried over. 
+ mu.Lock() + reconnectHadSinceTime = opts.SinceTime != nil + mu.Unlock() + once.Do(func() { close(secondCall) }) + <-streamCtx.Done() + return nil, streamCtx.Err() + }, + } + + runner := &LogsActorRunner{ + apiClient: mockAPI, + streamer: mockStreamer, + stdout: &bytes.Buffer{}, + stderr: &bytes.Buffer{}, + follow: true, + pollInterval: 1 * time.Millisecond, + reconnectInterval: 1 * time.Millisecond, + tickerInterval: time.Hour, // keep the migration monitor out of the way + } + + done := make(chan struct{}) + go func() { + _ = runner.Run(ctx, actorID) + close(done) + }() + + select { + case <-secondCall: + case <-time.After(2 * time.Second): + cancel() + t.Fatal("reconnect (second StreamLogs call) never happened") + } + + cancel() + <-done + + mu.Lock() + defer mu.Unlock() + if reconnectHadSinceTime { + t.Error("reconnect carried a SinceTime cursor; a foreign actor's line must not advance the follow cursor") + } +} + func TestLogsActorRunner_Run_Follow_NotFoundActor(t *testing.T) { actorID := "act-notfound" diff --git a/docs/observability.md b/docs/observability.md index fc8a40152..ee03ee976 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -24,16 +24,16 @@ Agent Substrate captures container standard output/error, wraps them into struct For quick, on-demand debugging of an active actor, use the Agent Substrate CLI: ```bash -kubectl ate logs [--follow / -f] +kubectl ate logs actors <actor-id> [--follow / -f] ``` -> **Note:** By default, `kubectl ate logs` queries the Kubernetes API of the worker pod where the actor is *currently* running. It is designed for immediate inspection of active actors. To view historical logs across past worker pods and suspension cycles, use a centralized logging backend. +> **Note:** By default, `kubectl ate logs actors` queries the Kubernetes API of the worker pod where the actor is *currently* running. It is designed for immediate inspection of active actors. 
To view historical logs across past worker pods and suspension cycles, use a centralized logging backend. #### Example 1: Actor Not Currently Running If an actor is suspended or not assigned to a worker pod, the CLI informs you immediately: ```bash -$ kubectl ate logs test +$ kubectl ate logs actors test Error: actor test is not currently running on any worker pod ``` @@ -41,7 +41,7 @@ Error: actor test is not currently running on any worker pod When an active actor is assigned to a worker pod, the CLI outputs clean, uniform JSON lines stripped of Substrate metadata, perfectly matching standard `kubectl logs` behavior: ```bash -$ kubectl ate logs test +$ kubectl ate logs actors test {"time":"2026-05-22T21:49:15.23700774Z","message":"Actor started"} {"time":"2026-05-22T21:49:15.23700774Z","level":"INFO","msg":"Starting counter server on port 80"} {"time":"2026-05-22T21:49:15.255765354Z","count":0,"fshash":"mCY7G4S318ztOUojPTF2NA/W+ZSmWyr+T5K3udFuP50","level":"INFO","msg":"Count"} @@ -52,7 +52,7 @@ $ kubectl ate logs test To stream actor logs in real-time, append the `--follow` (or `-f`) flag. 
The CLI is fully actor-aware, automatically resuming the stream if the actor is suspended or migrates to a different worker pod: ```bash -$ kubectl ate logs test -f +$ kubectl ate logs actors test -f Actor is currently running on pod ate-demo-counter/counter-deployment-d8f99-m7d96 {"time":"2026-05-22T21:49:15.255765354Z","count":0,"fshash":"mCY7...","level":"INFO","msg":"Count"} {"time":"2026-05-22T21:49:25.263744806Z","count":1,"fshash":"mCY7...","level":"INFO","msg":"Count"} @@ -60,6 +60,18 @@ Actor is currently running on pod ate-demo-counter/counter-deployment-ab123-x4y5 {"time":"2026-05-22T21:50:02.123456789Z","count":2,"fshash":"mCY7...","level":"INFO","msg":"Count"} ``` +#### Example 4: Filtering by Container or Supervisor +An actor may run multiple containers, and Agent Substrate also emits synthetic supervisor lifecycle events (e.g., `Actor started`, `Actor checkpointing`). By default `kubectl ate logs actors` shows all of an actor's log lines. You can narrow the output to a single container or to just the supervisor: + +```bash +# Only logs from a specific container within the actor (-c for short). +kubectl ate logs actors <actor-id> --container <container-name> + +# Only the ateom supervisor (lifecycle) logs. +kubectl ate logs actors <actor-id> --supervisor +``` + +`--container` and `--supervisor` are mutually exclusive. Container log lines are identified by the `ate.dev/container_name` label; supervisor lifecycle lines do not carry that label. --- diff --git a/internal/actorlog/logger.go b/internal/actorlog/logger.go index abe09e863..57d19b5f6 100644 --- a/internal/actorlog/logger.go +++ b/internal/actorlog/logger.go @@ -25,10 +25,17 @@ import ( "errors" "io" "os" + "strings" "sync" "time" ) +const ateLabelPrefix = "ate.dev/" + +// recognizedLabelKeys are the keys a log line may carry labels under +// ("logging.googleapis.com/labels" on GCE, "labels" elsewhere). 
+var recognizedLabelKeys = []string{"labels", "logging.googleapis.com/labels"} + // SyncedWriter wraps an io.Writer and synchronizes writes across goroutines. type SyncedWriter struct { mu sync.Mutex @@ -142,6 +149,17 @@ func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName if _, ok := m["time"]; !ok { m["time"] = time.Now().Format(time.RFC3339Nano) } + // Strip app-provided ate.dev/* labels from every recognized key + // (not just the one we write) so apps can't spoof Substrate metadata. + for _, key := range recognizedLabelKeys { + if existing, ok := m[key].(map[string]any); ok { + for k := range existing { + if strings.HasPrefix(k, ateLabelPrefix) { + delete(existing, k) + } + } + } + } labels, ok := m[al.labelsKey].(map[string]any) if !ok { labels = make(map[string]any) diff --git a/internal/actorlog/logger_test.go b/internal/actorlog/logger_test.go index ab2b3240f..931c017bc 100644 --- a/internal/actorlog/logger_test.go +++ b/internal/actorlog/logger_test.go @@ -224,6 +224,49 @@ func TestWrapContainerLogs_LabelCollision(t *testing.T) { } } +func TestWrapContainerLogs_CrossKeyLabelSpoofing(t *testing.T) { + // Non-GCE logger writes under "labels"; an app must not be able to spoof + // metadata under the other recognized key and have a consumer trust it. + input := `{"level":"info","msg":"App log","logging.googleapis.com/labels":{"ate.dev/actor_id":"victim","ate.dev/container_name":"spoofed","app":"keep-me"}}` + "\n" + rdr := strings.NewReader(input) + + var buf bytes.Buffer + al := NewActorLogger(&buf, false) // non-GCE => authoritative key is "labels" + al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1") + + var m map[string]any + if err := json.Unmarshal(buf.Bytes(), &m); err != nil { + t.Fatalf("failed to parse JSON output: %v", err) + } + + // Substrate's real metadata lives under the environment key ("labels"). 
+ authoritative, ok := m["labels"].(map[string]any) + if !ok { + t.Fatal("missing authoritative labels group") + } + if authoritative["ate.dev/actor_id"] != "act-1" { + t.Errorf("got actor_id = %v, want 'act-1'", authoritative["ate.dev/actor_id"]) + } + if authoritative["ate.dev/container_name"] != "ctr-1" { + t.Errorf("got container_name = %v, want 'ctr-1'", authoritative["ate.dev/container_name"]) + } + + // The spoofed ate.dev/* labels are stripped; the app's own labels are kept. + spoofed, ok := m["logging.googleapis.com/labels"].(map[string]any) + if !ok { + t.Fatal("expected the application's logging.googleapis.com/labels map to survive") + } + if _, present := spoofed["ate.dev/actor_id"]; present { + t.Error("spoofed ate.dev/actor_id survived under logging.googleapis.com/labels") + } + if _, present := spoofed["ate.dev/container_name"]; present { + t.Error("spoofed ate.dev/container_name survived under logging.googleapis.com/labels") + } + if spoofed["app"] != "keep-me" { + t.Errorf("non-reserved app label not preserved: got %v", spoofed["app"]) + } +} + func TestWrapContainerLogs_TrailingGarbage(t *testing.T) { input := `{"count": 1} garbage` + "\n" rdr := strings.NewReader(input)