diff --git a/cmd/ateom-gvisor/internal/ateom/logger.go b/cmd/ateom-gvisor/internal/ateom/logger.go index 7e3775a78..ea5a31901 100644 --- a/cmd/ateom-gvisor/internal/ateom/logger.go +++ b/cmd/ateom-gvisor/internal/ateom/logger.go @@ -78,21 +78,26 @@ func (al *ActorLogger) EmitLifecycleLog(msg, actorID, actorTemplateName, actorTe } } -// StartJSONLogPipe intercepts container raw stdout/stderr streams and pipes them through the logger. -func (al *ActorLogger) StartJSONLogPipe(actorID, actorTemplateName, actorTemplateNamespace string) (io.WriteCloser, error) { +// StartJSONLogPipe intercepts container raw stdout/stderr streams and pipes them +// through the logger. containerName tags every line with the originating container; +// callers that multiplex multiple containers should give each its own pipe so the +// tag is meaningful. +func (al *ActorLogger) StartJSONLogPipe(actorID, actorTemplateName, actorTemplateNamespace, containerName string) (io.WriteCloser, error) { pr, pw, err := os.Pipe() if err != nil { return nil, err } go func() { - al.WrapContainerLogs(pr, actorID, actorTemplateName, actorTemplateNamespace) + al.WrapContainerLogs(pr, actorID, actorTemplateName, actorTemplateNamespace, containerName) pr.Close() }() return pw, nil } -// WrapContainerLogs reads log lines from r, parses them, and logs them in a unified structured format. -func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName, actorTemplateNamespace string) { +// WrapContainerLogs reads log lines from r, parses them, and logs them in a unified +// structured format. containerName is added as the ate.dev/container_name label so +// multi-container actors can be demultiplexed. +func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName, actorTemplateNamespace, containerName string) { rdr := bufio.NewReader(r) for { lineBytes, err := rdr.ReadBytes('\n') @@ -118,14 +123,16 @@ func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName } if unmarshalErr != nil { + labels := map[string]string{ + "ate.dev/actor_id": actorID, + "ate.dev/actor_template_name": actorTemplateName, + "ate.dev/actor_template_namespace": actorTemplateNamespace, + "ate.dev/container_name": containerName, + } envelope = map[string]any{ - "time": time.Now().Format(time.RFC3339Nano), - "message": string(lineBytes), - al.labelsKey: map[string]string{ - "ate.dev/actor_id": actorID, - "ate.dev/actor_template_name": actorTemplateName, - "ate.dev/actor_template_namespace": actorTemplateNamespace, - }, + "time": time.Now().Format(time.RFC3339Nano), + "message": string(lineBytes), + al.labelsKey: labels, } } else { if _, ok := m["time"]; !ok { @@ -139,6 +146,7 @@ func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName labels["ate.dev/actor_id"] = actorID labels["ate.dev/actor_template_name"] = actorTemplateName labels["ate.dev/actor_template_namespace"] = actorTemplateNamespace + labels["ate.dev/container_name"] = containerName envelope = m } diff --git a/cmd/ateom-gvisor/internal/ateom/logger_test.go b/cmd/ateom-gvisor/internal/ateom/logger_test.go index 5566beff5..c76ffc7fc 100644 --- a/cmd/ateom-gvisor/internal/ateom/logger_test.go +++ b/cmd/ateom-gvisor/internal/ateom/logger_test.go @@ -28,7 +28,7 @@ func TestWrapContainerLogs(t *testing.T) { var buf bytes.Buffer al := NewActorLogger(&buf, false) - al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default") + al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1") var m map[string]any if err := json.Unmarshal(buf.Bytes(), &m); err != nil { @@ -63,6 +63,9 @@ func TestWrapContainerLogs(t *testing.T) { if labels["ate.dev/actor_template_namespace"] != "default" { t.Errorf("got actor_template_namespace = %v, want 'default'", labels["ate.dev/actor_template_namespace"]) } + if labels["ate.dev/container_name"] != "ctr-1" { + t.Errorf("got container_name = %v, want 'ctr-1'", labels["ate.dev/container_name"]) + } } func TestWrapContainerLogs_JSONInput(t *testing.T) { @@ -72,7 +75,7 @@ func TestWrapContainerLogs_JSONInput(t *testing.T) { var buf bytes.Buffer al := NewActorLogger(&buf, false) - al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default") + al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1") dec := json.NewDecoder(&buf) dec.UseNumber() @@ -161,7 +164,7 @@ func TestWrapContainerLogs_MergeLabels(t *testing.T) { var buf bytes.Buffer al := NewActorLogger(&buf, false) // labelsKey will be "labels" - al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default") + al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1") var m map[string]any if err := json.Unmarshal(buf.Bytes(), &m); err != nil { @@ -186,6 +189,9 @@ func TestWrapContainerLogs_MergeLabels(t *testing.T) { if labels["ate.dev/actor_id"] != "act-1" { t.Errorf("got actor_id = %v, want 'act-1'", labels["ate.dev/actor_id"]) } + if labels["ate.dev/container_name"] != "ctr-1" { + t.Errorf("got container_name = %v, want 'ctr-1'", labels["ate.dev/container_name"]) + } } func TestWrapContainerLogs_LabelCollision(t *testing.T) { @@ -194,7 +200,7 @@ func TestWrapContainerLogs_LabelCollision(t *testing.T) { var buf bytes.Buffer al := NewActorLogger(&buf, false) - al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default") + al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1") var m map[string]any if err := json.Unmarshal(buf.Bytes(), &m); err != nil { @@ -224,7 +230,7 @@ func TestWrapContainerLogs_TrailingGarbage(t *testing.T) { var buf bytes.Buffer al := NewActorLogger(&buf, false) - al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default") + al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1") var m map[string]any if err := json.Unmarshal(buf.Bytes(), &m); err != nil { diff --git a/cmd/ateom-gvisor/main.go b/cmd/ateom-gvisor/main.go index 365447fca..389db5b71 100644 --- a/cmd/ateom-gvisor/main.go +++ b/cmd/ateom-gvisor/main.go @@ -210,14 +210,14 @@ func (s *AteomService) RunWorkload(ctx context.Context, req *ateompb.RunWorkload return nil, fmt.Errorf("while starting pause container: %w", err) } - pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace()) - if err != nil { - return nil, fmt.Errorf("while starting json log pipe: %w", err) - } - defer pw.Close() - - // Create and start each application container + // Create and start each application container, each with its own log pipe so + // every line is tagged with the originating container (ate.dev/container_name). for _, ac := range req.GetSpec().GetContainers() { + pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace(), ac.GetName()) + if err != nil { + return nil, fmt.Errorf("while starting json log pipe for %q: %w", ac.GetName(), err) + } + defer pw.Close() if err := rcmd.cmdCreate(ctx, pw, ac.GetName()); err != nil { return nil, fmt.Errorf("while creating %q application container: %w", ac.GetName(), err) } @@ -342,14 +342,14 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore return nil, fmt.Errorf("while starting pause container: %w", err) } - pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace()) - if err != nil { - return nil, fmt.Errorf("while starting json log pipe: %w", err) - } - defer pw.Close() - - // Create and restore each application container + // Create and restore each application container, each with its own log pipe so + // every line is tagged with the originating container (ate.dev/container_name). for _, ac := range req.GetSpec().GetContainers() { + pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace(), ac.GetName()) + if err != nil { + return nil, fmt.Errorf("while starting json log pipe for %q: %w", ac.GetName(), err) + } + defer pw.Close() if err := rcmd.cmdCreate(ctx, pw, ac.GetName()); err != nil { return nil, fmt.Errorf("while creating %q application container: %w", ac.GetName(), err) } diff --git a/docs/observability.md b/docs/observability.md index 5d05a4ce3..fc8a40152 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -10,6 +10,7 @@ To make underlying infrastructure transitions transparent, Agent Substrate estab * `ate.dev/actor_id`: The unique identifier of the actor (e.g., `my-counter-1` or `test`). * `ate.dev/actor_template_name`: The name of the actor's ActorTemplate (e.g., `counter`). * `ate.dev/actor_template_namespace`: The Kubernetes namespace of the actor's ActorTemplate (e.g., `ate-demo-counter`). +* `ate.dev/container_name`: The name of the container within the actor that produced the log line (e.g., `counter`), so a multi-container actor's logs can be demultiplexed by container. Currently, Agent Substrate automatically wraps container output and injects these metadata labels into **container logs**. For metrics and distributed tracing, Agent Substrate provides foundational system telemetry and on-demand request tracing, with roadmap plans to fully integrate actor-level correlation.