Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions cmd/ateom-gvisor/internal/ateom/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,26 @@ func (al *ActorLogger) EmitLifecycleLog(msg, actorID, actorTemplateName, actorTe
}
}

// StartJSONLogPipe intercepts container raw stdout/stderr streams and pipes them through the logger.
func (al *ActorLogger) StartJSONLogPipe(actorID, actorTemplateName, actorTemplateNamespace string) (io.WriteCloser, error) {
// StartJSONLogPipe intercepts container raw stdout/stderr streams and pipes them
// through the logger. containerName tags every line with the originating container;
// callers that multiplex multiple containers should give each its own pipe so the
// tag is meaningful.
func (al *ActorLogger) StartJSONLogPipe(actorID, actorTemplateName, actorTemplateNamespace, containerName string) (io.WriteCloser, error) {
pr, pw, err := os.Pipe()
if err != nil {
return nil, err
}
go func() {
al.WrapContainerLogs(pr, actorID, actorTemplateName, actorTemplateNamespace)
al.WrapContainerLogs(pr, actorID, actorTemplateName, actorTemplateNamespace, containerName)
pr.Close()
}()
return pw, nil
}

// WrapContainerLogs reads log lines from r, parses them, and logs them in a unified structured format.
func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName, actorTemplateNamespace string) {
// WrapContainerLogs reads log lines from r, parses them, and logs them in a unified
// structured format. containerName is added as the ate.dev/container_name label so
// multi-container actors can be demultiplexed.
func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName, actorTemplateNamespace, containerName string) {
rdr := bufio.NewReader(r)
for {
lineBytes, err := rdr.ReadBytes('\n')
Expand All @@ -118,14 +123,16 @@ func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName
}

if unmarshalErr != nil {
labels := map[string]string{
"ate.dev/actor_id": actorID,
"ate.dev/actor_template_name": actorTemplateName,
"ate.dev/actor_template_namespace": actorTemplateNamespace,
"ate.dev/container_name": containerName,
}
envelope = map[string]any{
"time": time.Now().Format(time.RFC3339Nano),
"message": string(lineBytes),
al.labelsKey: map[string]string{
"ate.dev/actor_id": actorID,
"ate.dev/actor_template_name": actorTemplateName,
"ate.dev/actor_template_namespace": actorTemplateNamespace,
},
"time": time.Now().Format(time.RFC3339Nano),
"message": string(lineBytes),
al.labelsKey: labels,
}
} else {
if _, ok := m["time"]; !ok {
Expand All @@ -139,6 +146,7 @@ func (al *ActorLogger) WrapContainerLogs(r io.Reader, actorID, actorTemplateName
labels["ate.dev/actor_id"] = actorID
labels["ate.dev/actor_template_name"] = actorTemplateName
labels["ate.dev/actor_template_namespace"] = actorTemplateNamespace
labels["ate.dev/container_name"] = containerName
envelope = m
}

Expand Down
16 changes: 11 additions & 5 deletions cmd/ateom-gvisor/internal/ateom/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestWrapContainerLogs(t *testing.T) {

var buf bytes.Buffer
al := NewActorLogger(&buf, false)
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default")
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1")

var m map[string]any
if err := json.Unmarshal(buf.Bytes(), &m); err != nil {
Expand Down Expand Up @@ -63,6 +63,9 @@ func TestWrapContainerLogs(t *testing.T) {
if labels["ate.dev/actor_template_namespace"] != "default" {
t.Errorf("got actor_template_namespace = %v, want 'default'", labels["ate.dev/actor_template_namespace"])
}
if labels["ate.dev/container_name"] != "ctr-1" {
t.Errorf("got container_name = %v, want 'ctr-1'", labels["ate.dev/container_name"])
}
}

func TestWrapContainerLogs_JSONInput(t *testing.T) {
Expand All @@ -72,7 +75,7 @@ func TestWrapContainerLogs_JSONInput(t *testing.T) {

var buf bytes.Buffer
al := NewActorLogger(&buf, false)
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default")
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1")

dec := json.NewDecoder(&buf)
dec.UseNumber()
Expand Down Expand Up @@ -161,7 +164,7 @@ func TestWrapContainerLogs_MergeLabels(t *testing.T) {

var buf bytes.Buffer
al := NewActorLogger(&buf, false) // labelsKey will be "labels"
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default")
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1")

var m map[string]any
if err := json.Unmarshal(buf.Bytes(), &m); err != nil {
Expand All @@ -186,6 +189,9 @@ func TestWrapContainerLogs_MergeLabels(t *testing.T) {
if labels["ate.dev/actor_id"] != "act-1" {
t.Errorf("got actor_id = %v, want 'act-1'", labels["ate.dev/actor_id"])
}
if labels["ate.dev/container_name"] != "ctr-1" {
t.Errorf("got container_name = %v, want 'ctr-1'", labels["ate.dev/container_name"])
}
}

func TestWrapContainerLogs_LabelCollision(t *testing.T) {
Expand All @@ -194,7 +200,7 @@ func TestWrapContainerLogs_LabelCollision(t *testing.T) {

var buf bytes.Buffer
al := NewActorLogger(&buf, false)
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default")
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1")

var m map[string]any
if err := json.Unmarshal(buf.Bytes(), &m); err != nil {
Expand Down Expand Up @@ -224,7 +230,7 @@ func TestWrapContainerLogs_TrailingGarbage(t *testing.T) {

var buf bytes.Buffer
al := NewActorLogger(&buf, false)
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default")
al.WrapContainerLogs(rdr, "act-1", "tmpl-1", "default", "ctr-1")

var m map[string]any
if err := json.Unmarshal(buf.Bytes(), &m); err != nil {
Expand Down
28 changes: 14 additions & 14 deletions cmd/ateom-gvisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,14 @@ func (s *AteomService) RunWorkload(ctx context.Context, req *ateompb.RunWorkload
return nil, fmt.Errorf("while starting pause container: %w", err)
}

pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace())
if err != nil {
return nil, fmt.Errorf("while starting json log pipe: %w", err)
}
defer pw.Close()

// Create and start each application container
// Create and start each application container, each with its own log pipe so
// every line is tagged with the originating container (ate.dev/container_name).
for _, ac := range req.GetSpec().GetContainers() {
pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace(), ac.GetName())
if err != nil {
return nil, fmt.Errorf("while starting json log pipe for %q: %w", ac.GetName(), err)
}
defer pw.Close()
if err := rcmd.cmdCreate(ctx, pw, ac.GetName()); err != nil {
return nil, fmt.Errorf("while creating %q application container: %w", ac.GetName(), err)
}
Expand Down Expand Up @@ -342,14 +342,14 @@ func (s *AteomService) RestoreWorkload(ctx context.Context, req *ateompb.Restore
return nil, fmt.Errorf("while starting pause container: %w", err)
}

pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace())
if err != nil {
return nil, fmt.Errorf("while starting json log pipe: %w", err)
}
defer pw.Close()

// Create and restore each application container
// Create and restore each application container, each with its own log pipe so
// every line is tagged with the originating container (ate.dev/container_name).
for _, ac := range req.GetSpec().GetContainers() {
pw, err := s.actorLogger.StartJSONLogPipe(req.GetActorId(), req.GetActorTemplateName(), req.GetActorTemplateNamespace(), ac.GetName())
if err != nil {
return nil, fmt.Errorf("while starting json log pipe for %q: %w", ac.GetName(), err)
}
defer pw.Close()
if err := rcmd.cmdCreate(ctx, pw, ac.GetName()); err != nil {
return nil, fmt.Errorf("while creating %q application container: %w", ac.GetName(), err)
}
Expand Down
1 change: 1 addition & 0 deletions docs/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ To make underlying infrastructure transitions transparent, Agent Substrate estab
* `ate.dev/actor_id`: The unique identifier of the actor (e.g., `my-counter-1` or `test`).
* `ate.dev/actor_template_name`: The name of the actor's ActorTemplate (e.g., `counter`).
* `ate.dev/actor_template_namespace`: The Kubernetes namespace of the actor's ActorTemplate (e.g., `ate-demo-counter`).
* `ate.dev/container_name`: The name of the container within the actor that produced the log line (e.g., `counter`), so a multi-container actor's logs can be demultiplexed by container.

Currently, Agent Substrate automatically wraps container output and injects these metadata labels into **container logs**. For metrics and distributed tracing, Agent Substrate provides foundational system telemetry and on-demand request tracing, with roadmap plans to fully integrate actor-level correlation.

Expand Down
Loading