Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions cmd/ateapi/internal/controlapi/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controlapi

import (
"log/slog"

"github.com/agent-substrate/substrate/internal/proto/ateletpb"
atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
)

// convert atev1alpha1.SnapshotScope to ateletpb.SnapshotScope
func toAteletSnapshotScope(in atev1alpha1.SnapshotScope) ateletpb.SnapshotScope {
switch in {
case atev1alpha1.SnapshotScopeFull:
return ateletpb.SnapshotScope_SNAPSHOT_SCOPE_FULL
case atev1alpha1.SnapshotScopeData:
return ateletpb.SnapshotScope_SNAPSHOT_SCOPE_DATA
default:
slog.Warn("unknown SnapshotScope; falling back to Full", "scope", string(in))
return ateletpb.SnapshotScope_SNAPSHOT_SCOPE_FULL
}
}
60 changes: 60 additions & 0 deletions cmd/ateapi/internal/controlapi/converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controlapi

import (
"testing"

"github.com/agent-substrate/substrate/internal/proto/ateletpb"
atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
)

func TestToAteletSnapshotScope(t *testing.T) {
tests := []struct {
name string
in atev1alpha1.SnapshotScope
expected ateletpb.SnapshotScope
}{
{
name: "Full scope",
in: atev1alpha1.SnapshotScopeFull,
expected: ateletpb.SnapshotScope_SNAPSHOT_SCOPE_FULL,
},
{
name: "Data scope",
in: atev1alpha1.SnapshotScopeData,
expected: ateletpb.SnapshotScope_SNAPSHOT_SCOPE_DATA,
},
{
name: "Default scope (empty)",
in: "",
expected: ateletpb.SnapshotScope_SNAPSHOT_SCOPE_FULL,
},
{
name: "Default scope (unknown)",
in: "unknown",
expected: ateletpb.SnapshotScope_SNAPSHOT_SCOPE_FULL,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := toAteletSnapshotScope(tt.in)
if result != tt.expected {
t.Errorf("toAteletSnapshotScope(%q) = %v, want %v", tt.in, result, tt.expected)
}
})
}
}
32 changes: 9 additions & 23 deletions cmd/ateapi/internal/controlapi/workflow_pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (s *CallAteletPauseStep) Execute(ctx context.Context, input *PauseInput, st
}
client := ateletpb.NewAteomHerderClient(ateletConn)

workloadSpec, err := workloadSpecFromActorTemplate(ctx, nil, nil, state.ActorTemplate)
if err != nil {
return err
}

// Checkpoint does not carry the sandbox config: atelet uses the version the
// actor is currently running (recorded on-node at Run/Restore) and pins it
// into the snapshot manifest.
Expand All @@ -122,35 +127,16 @@ func (s *CallAteletPauseStep) Execute(ctx context.Context, input *PauseInput, st
ActorTemplateNamespace: state.Actor.GetActorTemplateNamespace(),
ActorTemplateName: state.Actor.GetActorTemplateName(),
ActorId: state.Actor.GetActorId(),
Spec: &ateletpb.WorkloadSpec{
PauseImage: state.ActorTemplate.Spec.PauseImage,
},
Type: ateletpb.CheckpointType_CHECKPOINT_TYPE_LOCAL,
Spec: workloadSpec,
Type: ateletpb.CheckpointType_CHECKPOINT_TYPE_LOCAL,
Config: &ateletpb.CheckpointRequest_LocalConfig{
LocalConfig: &ateletpb.LocalCheckpointConfiguration{
SnapshotPrefix: state.Actor.InProgressSnapshot,
},
},
Scope: toAteletSnapshotScope(state.ActorTemplate.Spec.SnapshotsConfig.OnPause),
}
for _, ctr := range state.ActorTemplate.Spec.Containers {
ateletCtr := &ateletpb.Container{
Name: ctr.Name,
Image: ctr.Image,
Command: ctr.Command,
}
for _, env := range ctr.Env {
var val string
if env.Value != nil {
val = *env.Value
}
ateletEnv := &ateletpb.EnvEntry{
Name: env.Name,
Value: val,
}
ateletCtr.Env = append(ateletCtr.Env, ateletEnv)
}
req.Spec.Containers = append(req.Spec.Containers, ateletCtr)
}

_, err = client.Checkpoint(ctx, req)
if err != nil {
return fmt.Errorf("while checkpointing workload: %w", err)
Expand Down
3 changes: 3 additions & 0 deletions cmd/ateapi/internal/controlapi/workflow_resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,15 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,
SnapshotPrefix: state.Actor.GetLatestSnapshotInfo().GetLocal().SnapshotPrefix,
},
}
req.Scope = toAteletSnapshotScope(state.ActorTemplate.Spec.SnapshotsConfig.OnPause)
case ateapipb.SnapshotType_SNAPSHOT_TYPE_EXTERNAL:
req.Type = ateletpb.CheckpointType_CHECKPOINT_TYPE_EXTERNAL
req.Config = &ateletpb.RestoreRequest_ExternalConfig{
ExternalConfig: &ateletpb.ExternalCheckpointConfiguration{
SnapshotUriPrefix: state.Actor.GetLatestSnapshotInfo().GetExternal().SnapshotUriPrefix,
},
}
req.Scope = toAteletSnapshotScope(state.ActorTemplate.Spec.SnapshotsConfig.OnCommit)
default:
return fmt.Errorf("unsupported snapshot type: %v", state.Actor.GetLatestSnapshotInfo().GetType())
}
Expand All @@ -308,6 +310,7 @@ func (s *CallAteletRestoreStep) Execute(ctx context.Context, input *ResumeInput,
SnapshotUriPrefix: snapshot,
},
},
Scope: toAteletSnapshotScope(state.ActorTemplate.Spec.SnapshotsConfig.OnCommit),
}
_, err = client.Restore(ctx, req)
if err != nil {
Expand Down
32 changes: 9 additions & 23 deletions cmd/ateapi/internal/controlapi/workflow_suspend.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (s *CallAteletSuspendStep) Execute(ctx context.Context, input *SuspendInput
}
client := ateletpb.NewAteomHerderClient(ateletConn)

workloadSpec, err := workloadSpecFromActorTemplate(ctx, nil, nil, state.ActorTemplate)
if err != nil {
return err
}

// Checkpoint does not carry the sandbox config: atelet uses the version the
// actor is currently running (recorded on-node at Run/Restore) and pins it
// into the snapshot manifest.
Expand All @@ -124,35 +129,16 @@ func (s *CallAteletSuspendStep) Execute(ctx context.Context, input *SuspendInput
ActorTemplateNamespace: state.Actor.GetActorTemplateNamespace(),
ActorTemplateName: state.Actor.GetActorTemplateName(),
ActorId: state.Actor.GetActorId(),
Spec: &ateletpb.WorkloadSpec{
PauseImage: state.ActorTemplate.Spec.PauseImage,
},
Type: ateletpb.CheckpointType_CHECKPOINT_TYPE_EXTERNAL,
Spec: workloadSpec,
Type: ateletpb.CheckpointType_CHECKPOINT_TYPE_EXTERNAL,
Config: &ateletpb.CheckpointRequest_ExternalConfig{
ExternalConfig: &ateletpb.ExternalCheckpointConfiguration{
SnapshotUriPrefix: state.Actor.GetInProgressSnapshot(),
},
},
Scope: toAteletSnapshotScope(state.ActorTemplate.Spec.SnapshotsConfig.OnCommit),
}
for _, ctr := range state.ActorTemplate.Spec.Containers {
ateletCtr := &ateletpb.Container{
Name: ctr.Name,
Image: ctr.Image,
Command: ctr.Command,
}
for _, env := range ctr.Env {
var val string
if env.Value != nil {
val = *env.Value
}
ateletEnv := &ateletpb.EnvEntry{
Name: env.Name,
Value: val,
}
ateletCtr.Env = append(ateletCtr.Env, ateletEnv)
}
req.Spec.Containers = append(req.Spec.Containers, ateletCtr)
}

_, err = client.Checkpoint(ctx, req)
if err != nil {
return fmt.Errorf("while checkpointing workload: %w", err)
Expand Down
23 changes: 23 additions & 0 deletions cmd/ateapi/internal/controlapi/workload_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ func workloadSpecFromActorTemplate(ctx context.Context, kubeClient kubernetes.In
workloadSpec := &ateletpb.WorkloadSpec{
PauseImage: actorTemplate.Spec.PauseImage,
}

// add volumes
for _, vol := range actorTemplate.Spec.Volumes {
// volume is durable-dir type
if vol.VolumeSource.DurableDir != nil {
ateletVol := &ateletpb.Volume{
Name: vol.Name,
Type: ateletpb.VolumeType_VOLUME_TYPE_DURABLE_DIR,
Source: &ateletpb.Volume_DurableDir{
DurableDir: &ateletpb.DurableDirVolume{},
},
}
workloadSpec.Volumes = append(workloadSpec.Volumes, ateletVol)
}
}

resolver := envResolver{
kubeClient: kubeClient,
namespace: actorTemplate.Namespace,
Expand All @@ -57,6 +73,13 @@ func workloadSpecFromActorTemplate(ctx context.Context, kubeClient kubernetes.In
ateletCtr.Env = append(ateletCtr.Env, ateletEnv)
}
}
for _, mount := range ctr.VolumeMounts {
ateletCtr.VolumeMounts = append(ateletCtr.VolumeMounts, &ateletpb.VolumeMount{
Name: mount.Name,
MountPath: mount.MountPath,
})
}

workloadSpec.Containers = append(workloadSpec.Containers, ateletCtr)
}

Expand Down
Loading
Loading