Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cmd/kubectl-ate/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,19 @@ kubectl ate delete actor my-actor
`kubectl ate logs` requires a resource-type subcommand; running `kubectl ate logs <id>` on its own prints help. The only supported resource type is `actors`:

```bash
# Stream logs for an actor (follows by default; aggregated across worker
# reassignments so the same actor is queryable as it teleports between pods).
# Stream logs for an actor (aggregated across worker reassignments so the same
# actor is queryable as it teleports between pods).
kubectl ate logs actors my-actor

# Show logs from a single container within the actor (-c for short).
kubectl ate logs actors my-actor --container my-container

# Show only the ateom supervisor (lifecycle) logs, e.g. "Actor started".
kubectl ate logs actors my-actor --supervisor
```

By default all of the actor's logs are shown. Use `-c`/`--container <name>` to restrict output to a single container, or `--supervisor` to show only ateom's lifecycle logs. The two flags are mutually exclusive.

Logs are streamable only while the actor is bound to a worker (i.e., `STATUS_RUNNING`). For history across worker migrations, route through a centralized log backend (Cloud Logging, Loki, etc.); see `docs/observability.md`.

### Administration & Setup
Expand Down
115 changes: 94 additions & 21 deletions cmd/kubectl-ate/internal/cmd/logs_actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ import (
"k8s.io/client-go/kubernetes"
)

var followLogs bool
// Package-level storage for the `kubectl ate logs actors` flags. The values are
// bound to the command in init and copied into a LogsActorRunner by runLogsActor.
var (
// followLogs holds the -f/--follow flag (default false): stream logs
// instead of printing them once.
followLogs bool
// containerFlag holds the -c/--container flag (default ""): when non-empty,
// only lines from the named container are shown.
containerFlag string
// supervisorFlag holds the --supervisor flag (default false): when true,
// only the ateom supervisor (lifecycle) lines are shown.
supervisorFlag bool
)

var logsActorsCmd = &cobra.Command{
Use: "actors <actor-id>",
Expand All @@ -50,6 +54,8 @@ var logsActorsCmd = &cobra.Command{

func init() {
logsActorsCmd.Flags().BoolVarP(&followLogs, "follow", "f", false, "Specify if the logs should be streamed.")
logsActorsCmd.Flags().StringVarP(&containerFlag, "container", "c", "", "Show logs only from the named container within the actor. Mutually exclusive with --supervisor.")
logsActorsCmd.Flags().BoolVar(&supervisorFlag, "supervisor", false, "Show only the ateom supervisor (lifecycle) logs. Mutually exclusive with --container.")

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make --container and --supervisor mutually exclusive? They seem orthogonal to me.

I was expecting:

  • --supervisor (boolean, defaults to false): whether to include supervisor logs or not
  • --container (string, defaults to ""): if set, restricts to the logs of this container.

WDYT?

logsCmd.AddCommand(logsActorsCmd)
}

Expand Down Expand Up @@ -80,6 +86,8 @@ type LogsActorRunner struct {
stdout io.Writer
stderr io.Writer
follow bool
container string
supervisor bool
pollInterval time.Duration
reconnectInterval time.Duration
tickerInterval time.Duration
Expand All @@ -98,6 +106,9 @@ func (r *LogsActorRunner) Run(ctx context.Context, actorID string) error {
}

defer r.apiClient.Close()
if err := validateLogFilterFlags(r.container, r.supervisor); err != nil {
return err
}
if r.follow {
return r.runFollow(ctx, actorID)
}
Expand Down Expand Up @@ -128,12 +139,13 @@ func (r *LogsActorRunner) runOneShot(ctx context.Context, actorID string) error
}
defer stream.Close()

filter := logLineFilter{actorID: actorID, container: r.container, supervisor: r.supervisor}
scanner := bufio.NewScanner(stream)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024) // Support up to 1MB lines
for scanner.Scan() {
line := scanner.Text()
filterAndDisplayLogLine(line, actorID, r.stdout)
filterAndDisplayLogLine(line, filter, r.stdout)
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("error reading log stream: %w", err)
Expand Down Expand Up @@ -206,12 +218,13 @@ func (r *LogsActorRunner) runFollow(ctx context.Context, actorID string) error {
var wg sync.WaitGroup
r.startMigrationMonitor(streamCtx, streamCancel, &wg, actorID, podName)

filter := logLineFilter{actorID: actorID, container: r.container, supervisor: r.supervisor}
scanner := bufio.NewScanner(stream)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024) // Support up to 1MB lines
for scanner.Scan() {
line := scanner.Text()
logTime, _ := filterAndDisplayLogLine(line, actorID, r.stdout)
logTime, _ := filterAndDisplayLogLine(line, filter, r.stdout)
if !logTime.IsZero() {
lastSeenTime = logTime
}
Expand Down Expand Up @@ -274,10 +287,23 @@ func (r *LogsActorRunner) startMigrationMonitor(
}()
}

// validateLogFilterFlags checks the source-filter flags for the single
// combination that is not supported: naming a container while also asking for
// supervisor-only output. Every other combination (neither, or exactly one of
// the two) is accepted and yields a nil error.
func validateLogFilterFlags(container string, supervisor bool) error {
	// At most one of the two filters is active: nothing to reject.
	if !supervisor || container == "" {
		return nil
	}
	return fmt.Errorf("--container and --supervisor are mutually exclusive; specify only one")
}

func runLogsActor(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
actorID := args[0]

if err := validateLogFilterFlags(containerFlag, supervisorFlag); err != nil {
return err
}

apiClient, err := ateclient.NewClient(ctx, kubeconfig, k8sContext, endpoint, traceEnabled)
if err != nil {
return fmt.Errorf("failed to connect to ate-api-server: %w", err)
Expand All @@ -295,6 +321,8 @@ func runLogsActor(cmd *cobra.Command, args []string) error {
stdout: os.Stdout,
stderr: os.Stderr,
follow: followLogs,
container: containerFlag,
supervisor: supervisorFlag,
pollInterval: 2 * time.Second,
reconnectInterval: 1 * time.Second,
tickerInterval: 2 * time.Second,
Expand All @@ -303,7 +331,61 @@ func runLogsActor(cmd *cobra.Command, args []string) error {
return runner.Run(ctx, actorID)
}

func filterAndDisplayLogLine(line, targetActorID string, w io.Writer) (time.Time, bool) {
// logLineFilter describes which log lines of one actor should be displayed.
// The container and supervisor selectors are not meant to be combined; with
// both left at their zero values every line of the actor is shown.
type logLineFilter struct {
	actorID    string // actor whose lines are wanted
	container  string // if set, keep only lines emitted by this container
	supervisor bool   // if set, keep only supervisor lines (those without a container name)
}

// matchesSource tells whether a line attributed to containerName is selected
// by the filter. Supervisor lines are identified by an empty containerName.
// The supervisor selector is consulted before the container selector.
func (f logLineFilter) matchesSource(containerName string) bool {
	if f.supervisor {
		return containerName == ""
	}
	if f.container == "" {
		// No source restriction at all.
		return true
	}
	return containerName == f.container
}

// Label names reserved for Substrate metadata. They appear inside whichever
// label map (see logActorLabelKeys) a log line carries.
const (
	ateLabelPrefix        = "ate.dev/"
	ateActorIDLabel       = ateLabelPrefix + "actor_id"
	ateContainerNameLabel = ateLabelPrefix + "container_name"
)

// logActorLabelKeys lists the top-level JSON keys under which a log line may
// hold its label map. Order matters: the GCE key is consulted first.
var logActorLabelKeys = []string{"logging.googleapis.com/labels", "labels"}

// actorSource extracts the actor_id and container_name of a decoded log line.
// The label maps are tried in logActorLabelKeys order and the first one whose
// ate.dev/actor_id is a non-empty string wins; container_name is taken from
// that very map so the two values always come from the same place. When no map
// names an actor, both results are empty.
func actorSource(m map[string]any) (actorID, containerName string) {
	for _, key := range logActorLabelKeys {
		if labels, isMap := m[key].(map[string]any); isMap {
			id, _ := labels[ateActorIDLabel].(string)
			if id == "" {
				// This map does not identify an actor; try the next key.
				continue
			}
			name, _ := labels[ateContainerNameLabel].(string)
			return id, name
		}
	}
	return "", ""
}

// filterAndDisplayLogLine writes the cleaned line (ate.dev labels stripped) to w
// when it belongs to the target actor and passes the filter. It returns the
// line's timestamp only for displayed lines (zero otherwise), so follow mode
// never advances its resume cursor past a filtered-out or foreign line and skips
// logs on reconnect. The bool reports whether the line was written.
func filterAndDisplayLogLine(line string, filter logLineFilter, w io.Writer) (time.Time, bool) {
var m map[string]any
dec := json.NewDecoder(strings.NewReader(line))
dec.UseNumber()
Expand All @@ -320,30 +402,21 @@ func filterAndDisplayLogLine(line, targetActorID string, w io.Writer) (time.Time
}
}

var actorID string
for _, labelKey := range []string{"logging.googleapis.com/labels", "labels"} {
if labelsAny, ok := m[labelKey]; ok {
if labels, ok := labelsAny.(map[string]any); ok {
if id, ok := labels["ate.dev/actor_id"].(string); ok && id != "" {
actorID = id
break
}
}
}
actorID, containerName := actorSource(m)
if actorID == "" || actorID != filter.actorID {
return time.Time{}, false
}

matched := (actorID != "" && actorID == targetActorID)

if !matched {
return logTime, false
if !filter.matchesSource(containerName) {
return time.Time{}, false
}

// remove actor labels from CLI output
for _, labelKey := range []string{"logging.googleapis.com/labels", "labels"} {
for _, labelKey := range logActorLabelKeys {
if labelsAny, ok := m[labelKey]; ok {
if labels, ok := labelsAny.(map[string]any); ok {
for k := range labels {
if strings.HasPrefix(k, "ate.dev/") {
if strings.HasPrefix(k, ateLabelPrefix) {
delete(labels, k)
}
}
Expand All @@ -363,7 +436,7 @@ func filterAndDisplayLogLine(line, targetActorID string, w io.Writer) (time.Time
enc := json.NewEncoder(&buf)
enc.SetEscapeHTML(false)
if err := enc.Encode(m); err != nil {
return logTime, false
return time.Time{}, false
}

encodedStr := strings.TrimSpace(buf.String())
Expand Down
Loading