diff --git a/pkg/compose/containers.go b/pkg/compose/containers.go index f1a54fa1a7..bbe5ea3820 100644 --- a/pkg/compose/containers.go +++ b/pkg/compose/containers.go @@ -56,6 +56,20 @@ func (s *composeService) getContainers(ctx context.Context, project string, oneO return containers, nil } +// getContainersByService returns all non-oneoff containers for the project, grouped by service name. +func (s *composeService) getContainersByService(ctx context.Context, projectName string) (map[string]Containers, error) { + all, err := s.getContainers(ctx, projectName, oneOffExclude, true) + if err != nil { + return nil, err + } + result := map[string]Containers{} + for _, c := range all.filter(isNotOneOff) { + svc := c.Labels[api.ServiceLabel] + result[svc] = append(result[svc], c) + } + return result, nil +} + func getDefaultFilters(projectName string, oneOff oneOff, selectedServices ...string) client.Filters { f := projectFilter(projectName) if len(selectedServices) == 1 { @@ -158,14 +172,6 @@ func (containers Containers) filter(predicates ...containerPredicate) Containers return filtered } -func (containers Containers) names() []string { - var names []string - for _, c := range containers { - names = append(names, getCanonicalContainerName(c)) - } - return names -} - func (containers Containers) forEach(fn func(container.Summary)) { for _, c := range containers { fn(c) diff --git a/pkg/compose/convergence.go b/pkg/compose/convergence.go index 609f803949..d237debdc3 100644 --- a/pkg/compose/convergence.go +++ b/pkg/compose/convergence.go @@ -21,8 +21,6 @@ import ( "errors" "fmt" "maps" - "slices" - "sort" "strconv" "strings" "sync" @@ -31,18 +29,13 @@ import ( "github.com/compose-spec/compose-go/v2/types" "github.com/containerd/platforms" "github.com/moby/moby/api/types/container" - mmount "github.com/moby/moby/api/types/mount" "github.com/moby/moby/client" "github.com/moby/moby/client/pkg/versions" specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" - "github.com/docker/compose/v5/internal/tracing" "github.com/docker/compose/v5/pkg/api" - "github.com/docker/compose/v5/pkg/utils" ) const ( @@ -56,208 +49,6 @@ const ( // re-creating container, adding or removing replicas, or starting stopped containers. // Cross services dependencies are managed by creating services in expected order and updating `service:xx` reference // when a service has converged, so dependent ones can be managed with resolved containers references. 
-type convergence struct { - compose *composeService - services map[string]Containers - networks map[string]string - volumes map[string]string - stateMutex sync.Mutex -} - -func (c *convergence) getObservedState(serviceName string) Containers { - c.stateMutex.Lock() - defer c.stateMutex.Unlock() - return c.services[serviceName] -} - -func (c *convergence) setObservedState(serviceName string, containers Containers) { - c.stateMutex.Lock() - defer c.stateMutex.Unlock() - c.services[serviceName] = containers -} - -func newConvergence(services []string, state Containers, networks map[string]string, volumes map[string]string, s *composeService) *convergence { - observedState := map[string]Containers{} - for _, s := range services { - observedState[s] = Containers{} - } - for _, c := range state.filter(isNotOneOff) { - service := c.Labels[api.ServiceLabel] - observedState[service] = append(observedState[service], c) - } - return &convergence{ - compose: s, - services: observedState, - networks: networks, - volumes: volumes, - } -} - -func (c *convergence) apply(ctx context.Context, project *types.Project, options api.CreateOptions) error { - return InDependencyOrder(ctx, project, func(ctx context.Context, name string) error { - service, err := project.GetService(name) - if err != nil { - return err - } - - // Skip provider services when the caller opted out (e.g. watch rebuild), - // since providers were already set up during initial "up". - if service.Provider != nil && options.SkipProviders { - return nil - } - - return tracing.SpanWrapFunc("service/apply", tracing.ServiceOptions(service), func(ctx context.Context) error { - strategy := options.RecreateDependencies - if slices.Contains(options.Services, name) { - strategy = options.Recreate - } - return c.ensureService(ctx, project, service, strategy, options.Inherit, options.Timeout) - })(ctx) - }) -} - -func (c *convergence) ensureService(ctx context.Context, project *types.Project, service types.ServiceConfig, recreate string, inherit bool, timeout *time.Duration) error { //nolint:gocyclo - if service.Provider != nil { - return c.compose.runPlugin(ctx, project, service, "up") - } - expected, err := getScale(service) - if err != nil { - return err - } - containers := c.getObservedState(service.Name) - actual := len(containers) - updated := make(Containers, expected) - - eg, ctx := errgroup.WithContext(ctx) - - err = c.resolveServiceReferences(&service) - if err != nil { - return err - } - - sort.Slice(containers, func(i, j int) bool { - // select obsolete containers first, so they get removed as we scale down - if obsolete, _ := c.mustRecreate(service, containers[i], recreate); obsolete { - // i is obsolete, so must be first in the list - return true - } - if obsolete, _ := c.mustRecreate(service, containers[j], recreate); obsolete { - // j is obsolete, so must be first in the list - return false - } - - // For up-to-date containers, sort by container number to preserve low-values in container numbers - ni, erri := strconv.Atoi(containers[i].Labels[api.ContainerNumberLabel]) - nj, errj := strconv.Atoi(containers[j].Labels[api.ContainerNumberLabel]) - if erri == nil && errj == nil { - return ni > nj - } - - // If we don't get a container number (?) 
just sort by creation date - return containers[i].Created < containers[j].Created - }) - - slices.Reverse(containers) - for i, ctr := range containers { - if i >= expected { - // Scale Down - // As we sorted containers, obsolete ones and/or highest number will be removed - ctr := ctr - traceOpts := append(tracing.ServiceOptions(service), tracing.ContainerOptions(ctr)...) - eg.Go(tracing.SpanWrapFuncForErrGroup(ctx, "service/scale/down", traceOpts, func(ctx context.Context) error { - return c.compose.stopAndRemoveContainer(ctx, ctr, &service, timeout, false) - })) - continue - } - - mustRecreate, err := c.mustRecreate(service, ctr, recreate) - if err != nil { - return err - } - if mustRecreate { - err := c.stopDependentContainers(ctx, project, service) - if err != nil { - return err - } - - i, ctr := i, ctr - eg.Go(tracing.SpanWrapFuncForErrGroup(ctx, "container/recreate", tracing.ContainerOptions(ctr), func(ctx context.Context) error { - recreated, err := c.compose.recreateContainer(ctx, project, service, ctr, inherit, timeout) - updated[i] = recreated - return err - })) - continue - } - - // Enforce non-diverged containers are running - name := getContainerProgressName(ctr) - switch ctr.State { - case container.StateRunning: - c.compose.events.On(runningEvent(name)) - case container.StateCreated: - case container.StateRestarting: - case container.StateExited: - default: - ctr := ctr - eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "service/start", tracing.ContainerOptions(ctr), func(ctx context.Context) error { - return c.compose.startContainer(ctx, ctr) - })) - } - updated[i] = ctr - } - - next := nextContainerNumber(containers) - for i := 0; i < expected-actual; i++ { - // Scale UP - number := next + i - name := getContainerName(project.Name, service, number) - eventOpts := tracing.SpanOptions{trace.WithAttributes(attribute.String("container.name", name))} - eg.Go(tracing.EventWrapFuncForErrGroup(ctx, "service/scale/up", eventOpts, func(ctx context.Context) error { - opts := createOptions{ - AutoRemove: false, - AttachStdin: false, - UseNetworkAliases: true, - Labels: mergeLabels(service.Labels, service.CustomLabels), - } - ctr, err := c.compose.createContainer(ctx, project, service, name, number, opts) - updated[actual+i] = ctr - return err - })) - continue - } - - err = eg.Wait() - c.setObservedState(service.Name, updated) - return err -} - -func (c *convergence) stopDependentContainers(ctx context.Context, project *types.Project, service types.ServiceConfig) error { - // Stop dependent containers, so they will be restarted after service is re-created - dependents := project.GetDependentsForService(service, func(dependency types.ServiceDependency) bool { - return dependency.Restart - }) - if len(dependents) == 0 { - return nil - } - err := c.compose.stop(ctx, project.Name, api.StopOptions{ - Services: dependents, - Project: project, - }, nil) - if err != nil { - return err - } - - for _, name := range dependents { - dependentStates := c.getObservedState(name) - for i, dependent := range dependentStates { - dependent.State = container.StateExited - dependentStates[i] = dependent - } - c.setObservedState(name, dependentStates) - } - return nil -} - func getScale(config types.ServiceConfig) (int, error) { scale := config.GetScale() if scale > 1 && config.ContainerName != "" { @@ -268,21 +59,18 @@ func getScale(config types.ServiceConfig) (int, error) { return scale, nil } -// resolveServiceReferences replaces reference to another service with reference to an actual container -func (c 
*convergence) resolveServiceReferences(service *types.ServiceConfig) error { - err := c.resolveVolumeFrom(service) - if err != nil { - return err - } - - err = c.resolveSharedNamespaces(service) - if err != nil { +// resolveServiceReferences replaces references to other services with references +// to actual container IDs. It resolves VolumesFrom, NetworkMode, IPC and PID +// shared namespaces. The containersByService map provides the observed containers +// grouped by service name. +func resolveServiceReferences(service *types.ServiceConfig, containersByService map[string]Containers) error { + if err := resolveVolumeFrom(service, containersByService); err != nil { return err } - return nil + return resolveSharedNamespaces(service, containersByService) } -func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error { +func resolveVolumeFrom(service *types.ServiceConfig, containersByService map[string]Containers) error { for i, vol := range service.VolumesFrom { spec := strings.Split(vol, ":") if len(spec) == 0 { @@ -293,7 +81,7 @@ func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error { continue } name := spec[0] - dependencies := c.getObservedState(name) + dependencies := containersByService[name] if len(dependencies) == 0 { return fmt.Errorf("cannot share volume with service %s: container missing", name) } @@ -302,28 +90,25 @@ func (c *convergence) resolveVolumeFrom(service *types.ServiceConfig) error { return nil } -func (c *convergence) resolveSharedNamespaces(service *types.ServiceConfig) error { - str := service.NetworkMode - if name := getDependentServiceFromMode(str); name != "" { - dependencies := c.getObservedState(name) +func resolveSharedNamespaces(service *types.ServiceConfig, containersByService map[string]Containers) error { + if name := getDependentServiceFromMode(service.NetworkMode); name != "" { + dependencies := containersByService[name] if len(dependencies) == 0 { return fmt.Errorf("cannot share network namespace with service %s: container missing", name) } service.NetworkMode = types.ContainerPrefix + dependencies.sorted()[0].ID } - str = service.Ipc - if name := getDependentServiceFromMode(str); name != "" { - dependencies := c.getObservedState(name) + if name := getDependentServiceFromMode(service.Ipc); name != "" { + dependencies := containersByService[name] if len(dependencies) == 0 { return fmt.Errorf("cannot share IPC namespace with service %s: container missing", name) } service.Ipc = types.ContainerPrefix + dependencies.sorted()[0].ID } - str = service.Pid - if name := getDependentServiceFromMode(str); name != "" { - dependencies := c.getObservedState(name) + if name := getDependentServiceFromMode(service.Pid); name != "" { + dependencies := containersByService[name] if len(dependencies) == 0 { return fmt.Errorf("cannot share PID namespace with service %s: container missing", name) } @@ -333,89 +118,6 @@ func (c *convergence) resolveSharedNamespaces(service *types.ServiceConfig) erro return nil } -func (c *convergence) mustRecreate(expected types.ServiceConfig, actual container.Summary, policy string) (bool, error) { - if policy == api.RecreateNever { - return false, nil - } - if policy == api.RecreateForce { - return true, nil - } - configHash, err := ServiceHash(expected) - if err != nil { - return false, err - } - configChanged := actual.Labels[api.ConfigHashLabel] != configHash - imageUpdated := actual.Labels[api.ImageDigestLabel] != expected.CustomLabels[api.ImageDigestLabel] - if configChanged || imageUpdated { - 
return true, nil - } - - if c.networks != nil && actual.State == "running" { - if checkExpectedNetworks(expected, actual, c.networks) { - return true, nil - } - } - - if c.volumes != nil { - if checkExpectedVolumes(expected, actual, c.volumes) { - return true, nil - } - } - - return false, nil -} - -func checkExpectedNetworks(expected types.ServiceConfig, actual container.Summary, networks map[string]string) bool { - // check the networks container is connected to are the expected ones - for net := range expected.Networks { - id := networks[net] - if id == "swarm" { - // corner-case : swarm overlay network isn't visible until a container is attached - continue - } - found := false - for _, settings := range actual.NetworkSettings.Networks { - if settings.NetworkID == id { - found = true - break - } - } - if !found { - // config is up-to-date but container is not connected to network - return true - } - } - return false -} - -func checkExpectedVolumes(expected types.ServiceConfig, actual container.Summary, volumes map[string]string) bool { - // check container's volume mounts and search for the expected ones - for _, vol := range expected.Volumes { - if vol.Type != string(mmount.TypeVolume) { - continue - } - if vol.Source == "" { - continue - } - id := volumes[vol.Source] - found := false - for _, mount := range actual.Mounts { - if mount.Type != mmount.TypeVolume { - continue - } - if mount.Name == id { - found = true - break - } - } - if !found { - // config is up-to-date but container doesn't have volume mounted - return true - } - } - return false -} - func getContainerName(projectName string, service types.ServiceConfig, number int) string { name := getDefaultContainerName(projectName, service.Name, strconv.Itoa(number)) if service.ContainerName != "" { @@ -621,85 +323,9 @@ func (s *composeService) createContainer(ctx context.Context, project *types.Pro return ctr, nil } -func (s *composeService) recreateContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, - replaced container.Summary, inherit bool, timeout *time.Duration, -) (created container.Summary, err error) { - eventName := getContainerProgressName(replaced) - s.events.On(newEvent(eventName, api.Working, "Recreate")) - defer func() { - if err != nil && ctx.Err() == nil { - s.events.On(api.Resource{ - ID: eventName, - Status: api.Error, - Text: err.Error(), - }) - } - }() - - number, err := strconv.Atoi(replaced.Labels[api.ContainerNumberLabel]) - if err != nil { - return created, err - } - - var inherited *container.Summary - if inherit { - inherited = &replaced - } - - replacedContainerName := service.ContainerName - if replacedContainerName == "" { - replacedContainerName = service.Name + api.Separator + strconv.Itoa(number) - } - name := getContainerName(project.Name, service, number) - tmpName := fmt.Sprintf("%s_%s", replaced.ID[:12], name) - opts := createOptions{ - AutoRemove: false, - AttachStdin: false, - UseNetworkAliases: true, - Labels: mergeLabels(service.Labels, service.CustomLabels).Add(api.ContainerReplaceLabel, replacedContainerName), - } - created, err = s.createMobyContainer(ctx, project, service, tmpName, number, inherited, opts) - if err != nil { - return created, err - } - - timeoutInSecond := utils.DurationSecondToInt(timeout) - _, err = s.apiClient().ContainerStop(ctx, replaced.ID, client.ContainerStopOptions{Timeout: timeoutInSecond}) - if err != nil { - return created, err - } - - _, err = s.apiClient().ContainerRemove(ctx, replaced.ID, client.ContainerRemoveOptions{}) - 
if err != nil { - return created, err - } - - _, err = s.apiClient().ContainerRename(ctx, tmpName, client.ContainerRenameOptions{ - NewName: name, - }) - if err != nil { - return created, err - } - - s.events.On(newEvent(eventName, api.Done, "Recreated")) - return created, err -} - // force sequential calls to ContainerStart to prevent race condition in engine assigning ports from ranges var startMx sync.Mutex -func (s *composeService) startContainer(ctx context.Context, ctr container.Summary) error { - s.events.On(newEvent(getContainerProgressName(ctr), api.Working, "Restart")) - startMx.Lock() - defer startMx.Unlock() - _, err := s.apiClient().ContainerStart(ctx, ctr.ID, client.ContainerStartOptions{}) - if err != nil { - return err - } - s.events.On(newEvent(getContainerProgressName(ctr), api.Done, "Restarted")) - return nil -} - func (s *composeService) createMobyContainer(ctx context.Context, project *types.Project, service types.ServiceConfig, name string, number int, inherit *container.Summary, opts createOptions, ) (container.Summary, error) { diff --git a/pkg/compose/create.go b/pkg/compose/create.go index cc0b4afbce..97c12f6532 100644 --- a/pkg/compose/create.go +++ b/pkg/compose/create.go @@ -97,33 +97,36 @@ func (s *composeService) create(ctx context.Context, project *types.Project, opt return err } - var observedState Containers - observedState, err = s.getContainers(ctx, project.Name, oneOffInclude, true) + // Temporary implementation of use_api_socket until we get actual support inside docker engine + project, err = s.useAPISocket(project) if err != nil { return err } - orphans := observedState.filter(isOrphaned(project)) - if len(orphans) > 0 && !options.IgnoreOrphans { - if options.RemoveOrphans { - err := s.removeContainers(ctx, orphans, nil, nil, false) - if err != nil { - return err - } - } else { - logrus.Warnf("Found orphan containers (%s) for this project. If "+ - "you removed or renamed this service in your compose "+ - "file, you can run this command with the "+ - "--remove-orphans flag to clean it up.", orphans.names()) - } + + observed, err := s.collectObservedState(ctx, project) + if err != nil { + return err } + observed.setResolvedNetworks(networks, project) + observed.setResolvedVolumes(volumes) - // Temporary implementation of use_api_socket until we get actual support inside docker engine - project, err = s.useAPISocket(project) + if len(observed.Orphans) > 0 && !options.IgnoreOrphans && !options.RemoveOrphans { + logrus.Warnf("Found orphan containers (%s) for this project. If "+ + "you removed or renamed this service in your compose "+ + "file, you can run this command with the "+ + "--remove-orphans flag to clean it up.", observed.orphanNames()) + } + + plan, err := reconcile(ctx, project, observed, toReconcileOptions(options), s.prompt) if err != nil { return err } - return newConvergence(options.Services, observedState, networks, volumes, s).apply(ctx, project, options) + // Emit "Running" events for containers that are already up-to-date, + // matching the previous convergence behavior for progress display. 
+ emitRunningEvents(observed, plan, s.events) + + return s.executePlan(ctx, project, plan) } func prepareNetworks(project *types.Project) { diff --git a/pkg/compose/executor.go b/pkg/compose/executor.go new file mode 100644 index 0000000000..8fe3bc46cb --- /dev/null +++ b/pkg/compose/executor.go @@ -0,0 +1,450 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "context" + "fmt" + "sync" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/moby/moby/client" + "golang.org/x/sync/errgroup" + + "github.com/docker/compose/v5/pkg/api" + "github.com/docker/compose/v5/pkg/utils" +) + +// planExecutor executes a reconciliation Plan by walking the DAG and performing +// each atomic operation via the Docker API. It carries no decision logic — all +// decisions were made by the reconciler when building the plan. +type planExecutor struct { + compose *composeService + project *types.Project + pctx *reconciliationContext +} + +// reconciliationContext holds results produced by completed nodes so that downstream +// nodes can reference them (e.g. a RenameContainer node needs the container ID +// created by a prior CreateContainer node). +type reconciliationContext struct { + mu sync.Mutex + results map[int]operationResult +} + +type operationResult struct { + ContainerID string + ContainerName string +} + +func (pc *reconciliationContext) set(nodeID int, r operationResult) { + pc.mu.Lock() + defer pc.mu.Unlock() + pc.results[nodeID] = r +} + +func (pc *reconciliationContext) get(nodeID int) operationResult { + pc.mu.Lock() + defer pc.mu.Unlock() + return pc.results[nodeID] +} + +// executePlan walks the plan DAG, executing nodes in parallel where possible +// while respecting dependency edges. It emits progress events and handles +// group-based event aggregation for composite operations like recreate. 
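In isolation, the dependency handling used by executePlan below is the classic done-channel pattern: every node gets a channel that is closed once the node finishes, and each worker goroutine first waits on the channels of its prerequisites. A minimal standalone sketch of that pattern, using a simplified Node type rather than the actual PlanNode (all names here are illustrative only):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Node is a stand-in for PlanNode: an ID, prerequisite nodes, and the work to run.
type Node struct {
	ID        int
	DependsOn []*Node
	Run       func(ctx context.Context) error
}

// walk runs all nodes concurrently while respecting dependency edges.
func walk(ctx context.Context, nodes []*Node) error {
	done := make(map[int]chan struct{}, len(nodes))
	for _, n := range nodes {
		done[n.ID] = make(chan struct{})
	}
	eg, ctx := errgroup.WithContext(ctx)
	for _, n := range nodes {
		n := n
		eg.Go(func() error {
			for _, dep := range n.DependsOn {
				select {
				case <-done[dep.ID]: // prerequisite finished
				case <-ctx.Done(): // another node failed or the caller cancelled
					return ctx.Err()
				}
			}
			err := n.Run(ctx)
			// Close unconditionally, as executePlan does: on error the errgroup
			// cancels ctx, so unblocked dependents bail out via ctx.Done().
			close(done[n.ID])
			return err
		})
	}
	return eg.Wait()
}

func main() {
	network := &Node{ID: 1, Run: func(context.Context) error { fmt.Println("create network"); return nil }}
	ctr := &Node{ID: 2, DependsOn: []*Node{network}, Run: func(context.Context) error { fmt.Println("create container"); return nil }}
	fmt.Println(walk(context.Background(), []*Node{network, ctr}))
}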
+func (s *composeService) executePlan(ctx context.Context, project *types.Project, plan *Plan) error { + if plan.IsEmpty() { + return nil + } + + exec := &planExecutor{ + compose: s, + project: project, + pctx: &reconciliationContext{results: map[int]operationResult{}}, + } + + // Build a done-channel per node so dependents can wait + done := make(map[int]chan struct{}, len(plan.Nodes)) + for _, node := range plan.Nodes { + done[node.ID] = make(chan struct{}) + } + + // Track group event state: first node emits Working, last emits Done + groups := exec.buildGroupTracker(plan) + + eg, ctx := errgroup.WithContext(ctx) + for _, node := range plan.Nodes { + eg.Go(func() error { + // Wait for all dependencies + for _, dep := range node.DependsOn { + select { + case <-done[dep.ID]: + case <-ctx.Done(): + return ctx.Err() + } + } + + // Emit group start event if this is the first node of a group + groups.onNodeStart(node, s.events) + + err := exec.executeNode(ctx, node) + + if err == nil { + // Emit group done event if this is the last node of a group + groups.onNodeDone(node, s.events) + } else if ctx.Err() == nil { + groups.onNodeError(node, s.events, err) + } + + close(done[node.ID]) + return err + }) + } + + return eg.Wait() +} + +// executeNode dispatches a single plan node to the appropriate API call. +func (exec *planExecutor) executeNode(ctx context.Context, node *PlanNode) error { + op := node.Operation + switch op.Type { + case OpCreateNetwork: + return exec.execCreateNetwork(ctx, op) + case OpRemoveNetwork: + return exec.execRemoveNetwork(ctx, op) + case OpDisconnectNetwork: + return exec.execDisconnectNetwork(ctx, op) + case OpConnectNetwork: + return exec.execConnectNetwork(ctx, op) + case OpCreateVolume: + return exec.execCreateVolume(ctx, op) + case OpRemoveVolume: + return exec.execRemoveVolume(ctx, op) + case OpCreateContainer: + return exec.execCreateContainer(ctx, node) + case OpStartContainer: + return exec.execStartContainer(ctx, op) + case OpStopContainer: + return exec.execStopContainer(ctx, op) + case OpRemoveContainer: + return exec.execRemoveContainer(ctx, op) + case OpRenameContainer: + return exec.execRenameContainer(ctx, node) + case OpRunProvider: + return exec.compose.runPlugin(ctx, exec.project, *op.Service, "up") + default: + return fmt.Errorf("unknown operation type: %s", op.Type) + } +} + +// --- Network operations --- + +func (exec *planExecutor) execCreateNetwork(ctx context.Context, op Operation) error { + _, err := exec.compose.ensureNetwork(ctx, exec.project, op.Name, op.Network) + return err +} + +func (exec *planExecutor) execRemoveNetwork(ctx context.Context, op Operation) error { + _, err := exec.compose.apiClient().NetworkRemove(ctx, op.Name, client.NetworkRemoveOptions{}) + return err +} + +func (exec *planExecutor) execDisconnectNetwork(ctx context.Context, op Operation) error { + _, err := exec.compose.apiClient().NetworkDisconnect(ctx, op.Name, client.NetworkDisconnectOptions{ + Container: op.Container.ID, + Force: true, + }) + return err +} + +func (exec *planExecutor) execConnectNetwork(ctx context.Context, op Operation) error { + _, err := exec.compose.apiClient().NetworkConnect(ctx, op.Name, client.NetworkConnectOptions{ + Container: op.Container.ID, + }) + return err +} + +// --- Volume operations --- + +func (exec *planExecutor) execCreateVolume(ctx context.Context, op Operation) error { + return exec.compose.createVolume(ctx, *op.Volume) +} + +func (exec *planExecutor) execRemoveVolume(ctx context.Context, op Operation) error { + _, err := 
exec.compose.apiClient().VolumeRemove(ctx, op.Name, client.VolumeRemoveOptions{Force: true}) + return err +} + +// --- Container operations --- + +func (exec *planExecutor) execCreateContainer(ctx context.Context, node *PlanNode) error { + op := node.Operation + service := *op.Service + + // Resolve service references (network_mode, ipc, pid, volumes_from) to actual + // container IDs. This must happen at execution time because the referenced + // containers may have just been created by earlier plan nodes. + containersByService, err := exec.compose.getContainersByService(ctx, exec.project.Name) + if err != nil { + return err + } + if err := resolveServiceReferences(&service, containersByService); err != nil { + return err + } + + labels := mergeLabels(service.Labels, service.CustomLabels) + if op.Inherited != nil { + // This is a recreate: add the replace label + replacedName := op.Service.ContainerName + if replacedName == "" { + replacedName = fmt.Sprintf("%s%s%d", op.Service.Name, api.Separator, op.Number) + } + labels = labels.Add(api.ContainerReplaceLabel, replacedName) + } + + opts := createOptions{ + AutoRemove: false, + AttachStdin: false, + UseNetworkAliases: true, + Labels: labels, + } + ctr, err := exec.compose.createMobyContainer(ctx, exec.project, service, op.Name, op.Number, op.Inherited, opts) + if err != nil { + return err + } + + exec.pctx.set(node.ID, operationResult{ + ContainerID: ctr.ID, + ContainerName: op.Name, + }) + return nil +} + +func (exec *planExecutor) execStartContainer(ctx context.Context, op Operation) error { + startMx.Lock() + defer startMx.Unlock() + _, err := exec.compose.apiClient().ContainerStart(ctx, op.Container.ID, client.ContainerStartOptions{}) + return err +} + +func (exec *planExecutor) execStopContainer(ctx context.Context, op Operation) error { + _, err := exec.compose.apiClient().ContainerStop(ctx, op.Container.ID, client.ContainerStopOptions{ + Timeout: utils.DurationSecondToInt(op.Timeout), + }) + return err +} + +func (exec *planExecutor) execRemoveContainer(ctx context.Context, op Operation) error { + _, err := exec.compose.apiClient().ContainerRemove(ctx, op.Container.ID, client.ContainerRemoveOptions{Force: true}) + return err +} + +func (exec *planExecutor) execRenameContainer(ctx context.Context, node *PlanNode) error { + // Find the CreateContainer node in our dependencies to get the created container ID + var createdID string + for _, dep := range node.DependsOn { + r := exec.pctx.get(dep.ID) + if r.ContainerID != "" { + createdID = r.ContainerID + break + } + // Also check transitive: the create node may not be a direct dep + // Walk up the group chain + } + // Fallback: walk all results in the group + if createdID == "" { + for _, n := range node.DependsOn { + createdID = exec.findCreatedIDInChain(n) + if createdID != "" { + break + } + } + } + if createdID == "" { + return fmt.Errorf("rename: could not find created container ID for node #%d", node.ID) + } + + _, err := exec.compose.apiClient().ContainerRename(ctx, createdID, client.ContainerRenameOptions{ + NewName: node.Operation.Name, + }) + return err +} + +// findCreatedIDInChain walks the dependency chain to find a CreateContainer result. 
+func (exec *planExecutor) findCreatedIDInChain(node *PlanNode) string { + r := exec.pctx.get(node.ID) + if r.ContainerID != "" { + return r.ContainerID + } + for _, dep := range node.DependsOn { + if id := exec.findCreatedIDInChain(dep); id != "" { + return id + } + } + return "" +} + +// --- Group event tracking --- + +// groupTracker manages event emission for grouped nodes (e.g. recreate). +// The first node starting emits Working, the last finishing emits Done. +type groupTracker struct { + mu sync.Mutex + groups map[string]*groupState +} + +type groupState struct { + eventName string // e.g. "Container myproject-web-1" + total int // total nodes in this group + started int // nodes that have started + done int // nodes that have completed +} + +func (exec *planExecutor) buildGroupTracker(plan *Plan) *groupTracker { + gt := &groupTracker{groups: map[string]*groupState{}} + for _, node := range plan.Nodes { + if node.Group == "" { + continue + } + if _, ok := gt.groups[node.Group]; !ok { + gt.groups[node.Group] = &groupState{} + } + gt.groups[node.Group].total++ + // Pick the event name from a node that has the existing container reference + if gt.groups[node.Group].eventName == "" && node.Operation.Container != nil { + gt.groups[node.Group].eventName = getContainerProgressName(*node.Operation.Container) + } + } + // Fallback for groups where no node had a Container (shouldn't happen for recreate) + for name, gs := range gt.groups { + if gs.eventName == "" { + gs.eventName = name + } + } + return gt +} + +func (gt *groupTracker) onNodeStart(node *PlanNode, events api.EventProcessor) { + if node.Group == "" { + // Ungrouped: emit individual event + emitStartEvent(node, events) + return + } + gt.mu.Lock() + defer gt.mu.Unlock() + gs := gt.groups[node.Group] + gs.started++ + if gs.started == 1 { + events.On(newEvent(gs.eventName, api.Working, "Recreate")) + } +} + +func (gt *groupTracker) onNodeDone(node *PlanNode, events api.EventProcessor) { + if node.Group == "" { + emitDoneEvent(node, events) + return + } + gt.mu.Lock() + defer gt.mu.Unlock() + gs := gt.groups[node.Group] + gs.done++ + if gs.done == gs.total { + events.On(newEvent(gs.eventName, api.Done, "Recreated")) + } +} + +func (gt *groupTracker) onNodeError(node *PlanNode, events api.EventProcessor, err error) { + if node.Group == "" { + emitErrorEvent(node, events, err) + return + } + gt.mu.Lock() + defer gt.mu.Unlock() + gs := gt.groups[node.Group] + events.On(api.Resource{ + ID: gs.eventName, + Status: api.Error, + Text: err.Error(), + }) +} + +// emitStartEvent emits the appropriate Working event for an ungrouped node. +func emitStartEvent(node *PlanNode, events api.EventProcessor) { + op := node.Operation + switch op.Type { + case OpCreateContainer: + events.On(creatingEvent("Container " + op.Name)) + case OpStartContainer: + events.On(startingEvent(getContainerProgressName(*op.Container))) + case OpStopContainer: + events.On(stoppingEvent(getContainerProgressName(*op.Container))) + case OpRemoveContainer: + events.On(removingEvent(getContainerProgressName(*op.Container))) + case OpCreateNetwork: + events.On(creatingEvent("Network " + op.Name)) + case OpRemoveNetwork: + events.On(removingEvent("Network " + op.Name)) + case OpCreateVolume: + events.On(creatingEvent("Volume " + op.Name)) + case OpRemoveVolume: + events.On(removingEvent("Volume " + op.Name)) + } +} + +// emitDoneEvent emits the appropriate Done event for an ungrouped node. 
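For grouped nodes, progress reporting is collapsed: a recreate is four plan nodes (create, stop, remove, rename), but the user should see a single Recreate/Recreated pair for the container. A toy, self-contained model of that aggregation, with plain prints standing in for the real api events (the container name is hypothetical):

package main

import "fmt"

// groupProgress mimics groupTracker's per-group counters: the first node to
// start reports "Recreate", the last node to finish reports "Recreated".
type groupProgress struct {
	total, started, done int
}

func (g *groupProgress) onStart(name string) {
	g.started++
	if g.started == 1 {
		fmt.Println(name, "-> Recreate (working)")
	}
}

func (g *groupProgress) onDone(name string) {
	g.done++
	if g.done == g.total {
		fmt.Println(name, "-> Recreated (done)")
	}
}

func main() {
	g := &groupProgress{total: 4} // create, stop, remove, rename
	for i := 0; i < 4; i++ {
		g.onStart("Container demo-web-1")
		g.onDone("Container demo-web-1")
	}
	// Prints exactly one "Recreate" line and one "Recreated" line,
	// no matter how many nodes the group contains.
}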
+func emitDoneEvent(node *PlanNode, events api.EventProcessor) { + op := node.Operation + switch op.Type { + case OpCreateContainer: + events.On(createdEvent("Container " + op.Name)) + case OpStartContainer: + events.On(startedEvent(getContainerProgressName(*op.Container))) + case OpStopContainer: + events.On(stoppedEvent(getContainerProgressName(*op.Container))) + case OpRemoveContainer: + events.On(removedEvent(getContainerProgressName(*op.Container))) + case OpCreateNetwork: + events.On(createdEvent("Network " + op.Name)) + case OpRemoveNetwork: + events.On(removedEvent("Network " + op.Name)) + case OpCreateVolume: + events.On(createdEvent("Volume " + op.Name)) + case OpRemoveVolume: + events.On(removedEvent("Volume " + op.Name)) + } +} + +// emitErrorEvent emits an error event for an ungrouped node. +func emitErrorEvent(node *PlanNode, events api.EventProcessor, err error) { + op := node.Operation + var id string + switch { + case op.Container != nil: + id = getContainerProgressName(*op.Container) + default: + id = op.ResourceID + } + events.On(api.Resource{ + ID: id, + Status: api.Error, + Text: err.Error(), + }) +} + +// getContainerProgressName returns the display name for a container in progress output. +// Re-exported from convergence.go for use by the executor. +var _ = getContainerProgressName // ensure it stays accessible diff --git a/pkg/compose/executor_test.go b/pkg/compose/executor_test.go new file mode 100644 index 0000000000..21a3c783ab --- /dev/null +++ b/pkg/compose/executor_test.go @@ -0,0 +1,127 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "context" + "testing" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/moby/moby/api/types/container" + "github.com/moby/moby/client" + "go.uber.org/mock/gomock" + "gotest.tools/v3/assert" + + "github.com/docker/compose/v5/pkg/api" + "github.com/docker/compose/v5/pkg/mocks" +) + +// noopEventProcessor discards all events. 
+type noopEventProcessor struct{} + +func (noopEventProcessor) Start(_ context.Context, _ string) {} +func (noopEventProcessor) On(_ ...api.Resource) {} +func (noopEventProcessor) Done(_ string, _ bool) {} + +func newTestService(t *testing.T) (*composeService, *mocks.MockAPIClient) { + t.Helper() + mockCtrl := gomock.NewController(t) + cli := mocks.NewMockCli(mockCtrl) + apiClient := mocks.NewMockAPIClient(mockCtrl) + cli.EXPECT().Client().Return(apiClient).AnyTimes() + + svc, err := NewComposeService(cli, WithEventProcessor(noopEventProcessor{})) + assert.NilError(t, err) + return svc.(*composeService), apiClient +} + +func TestExecutePlanEmpty(t *testing.T) { + svc, _ := newTestService(t) + err := svc.executePlan(t.Context(), &types.Project{Name: "test"}, &Plan{}) + assert.NilError(t, err) +} + +func TestExecutePlanCreateNetwork(t *testing.T) { + svc, apiClient := newTestService(t) + + nw := types.NetworkConfig{Name: "test_default"} + project := &types.Project{ + Name: "test", + Networks: types.Networks{"default": nw}, + } + + // ensureNetwork: inspect → not found, list → empty, create + apiClient.EXPECT().NetworkInspect(gomock.Any(), "test_default", gomock.Any()). + Return(client.NetworkInspectResult{}, notFoundError{}) + apiClient.EXPECT().NetworkList(gomock.Any(), gomock.Any()). + Return(client.NetworkListResult{}, nil) + apiClient.EXPECT().NetworkCreate(gomock.Any(), "test_default", gomock.Any()). + Return(client.NetworkCreateResult{ID: "net1"}, nil) + + plan := &Plan{} + plan.addNode(Operation{ + Type: OpCreateNetwork, + ResourceID: "network:default", + Cause: "not found", + Name: nw.Name, + Network: &nw, + }, "") + + err := svc.executePlan(t.Context(), project, plan) + assert.NilError(t, err) +} + +func TestExecutePlanStopRemoveContainer(t *testing.T) { + svc, apiClient := newTestService(t) + + ctr := container.Summary{ + ID: "c1", + Names: []string{"/test-web-1"}, + Labels: map[string]string{ + api.ServiceLabel: "web", + api.ContainerNumberLabel: "1", + }, + } + + apiClient.EXPECT().ContainerStop(gomock.Any(), "c1", gomock.Any()). + Return(client.ContainerStopResult{}, nil) + apiClient.EXPECT().ContainerRemove(gomock.Any(), "c1", gomock.Any()). + Return(client.ContainerRemoveResult{}, nil) + + plan := &Plan{} + stopNode := plan.addNode(Operation{ + Type: OpStopContainer, + ResourceID: "service:web:1", + Cause: "scale down", + Container: &ctr, + }, "") + plan.addNode(Operation{ + Type: OpRemoveContainer, + ResourceID: "service:web:1", + Cause: "scale down", + Container: &ctr, + }, "", stopNode) + + err := svc.executePlan(t.Context(), &types.Project{Name: "test"}, plan) + assert.NilError(t, err) +} + +// notFoundError implements the errdefs.ErrNotFound interface for test mocks. +type notFoundError struct{} + +func (notFoundError) Error() string { return "not found" } +func (notFoundError) NotFound() {} diff --git a/pkg/compose/observed_state.go b/pkg/compose/observed_state.go new file mode 100644 index 0000000000..988c04e5e4 --- /dev/null +++ b/pkg/compose/observed_state.go @@ -0,0 +1,241 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "context" + "strconv" + "strings" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/moby/moby/api/types/container" + "github.com/moby/moby/client" + + "github.com/docker/compose/v5/pkg/api" +) + +// ObservedState captures the current state of all Docker resources belonging to +// a Compose project. It is a snapshot taken before reconciliation so that the +// reconciler can compare desired state (types.Project) with reality without +// making any further API calls. +type ObservedState struct { + ProjectName string + Containers map[string][]ObservedContainer // service name → containers + Orphans []ObservedContainer // containers with no matching service + Networks map[string]ObservedNetwork // compose network key → observed + Volumes map[string]ObservedVolume // compose volume key → observed +} + +// ObservedContainer holds the relevant state extracted from a running or stopped +// container, with label values pre-parsed for efficient comparison. +type ObservedContainer struct { + ID string + Name string + State container.ContainerState // "running", "exited", "created", "restarting", etc. + ConfigHash string // label com.docker.compose.config-hash + ImageDigest string // label com.docker.compose.image + Number int // label com.docker.compose.container-number + + // ConnectedNetworks maps network IDs found in the container's network + // settings. Key is the network name as seen by Docker, value is the + // network ID. + ConnectedNetworks map[string]string + + // Raw summary kept for the executor which needs it to call Moby APIs. + Summary container.Summary +} + +// ObservedNetwork holds the state of a Docker network that belongs to the +// project, identified by the com.docker.compose.network label. +type ObservedNetwork struct { + ID string + Name string + ConfigHash string // label com.docker.compose.config-hash + ProjectName string // label com.docker.compose.project +} + +// ObservedVolume holds the state of a Docker volume that belongs to the +// project, identified by the com.docker.compose.volume label. +type ObservedVolume struct { + Name string + ConfigHash string // label com.docker.compose.config-hash + ProjectName string // label com.docker.compose.project + Driver string +} + +// collectObservedState queries the Docker daemon for all resources belonging to +// the given project and returns a structured snapshot. +// The project model is used to classify containers by service and to identify +// orphans, and to scope network/volume queries to declared resources. +func (s *composeService) collectObservedState(ctx context.Context, project *types.Project) (*ObservedState, error) { + state := &ObservedState{ + ProjectName: project.Name, + Containers: map[string][]ObservedContainer{}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + // --- Containers --- + // Use oneOffInclude to detect orphaned one-off containers (matching the + // previous behavior of create() which used oneOffInclude + isOrphaned). 
+ raw, err := s.getContainers(ctx, project.Name, oneOffInclude, true) + if err != nil { + return nil, err + } + + knownServices := map[string]bool{} + for _, svc := range project.Services { + knownServices[svc.Name] = true + state.Containers[svc.Name] = nil // ensure key exists even if empty + } + for _, ds := range project.DisabledServices { + knownServices[ds.Name] = true + } + + for _, c := range raw { + svcName := c.Labels[api.ServiceLabel] + if isNotOneOff(c) && knownServices[svcName] { + state.Containers[svcName] = append(state.Containers[svcName], toObservedContainer(c)) + } else if isOrphaned(project)(c) { + state.Orphans = append(state.Orphans, toObservedContainer(c)) + } + } + + // --- Networks --- + nwList, err := s.apiClient().NetworkList(ctx, client.NetworkListOptions{ + Filters: projectFilter(project.Name), + }) + if err != nil { + return nil, err + } + for _, nw := range nwList.Items { + key := nw.Labels[api.NetworkLabel] + if key == "" { + continue + } + state.Networks[key] = ObservedNetwork{ + ID: nw.ID, + Name: nw.Name, + ConfigHash: nw.Labels[api.ConfigHashLabel], + ProjectName: nw.Labels[api.ProjectLabel], + } + } + + // --- Volumes --- + volList, err := s.apiClient().VolumeList(ctx, client.VolumeListOptions{ + Filters: projectFilter(project.Name), + }) + if err != nil { + return nil, err + } + for _, vol := range volList.Items { + key := vol.Labels[api.VolumeLabel] + if key == "" { + continue + } + state.Volumes[key] = ObservedVolume{ + Name: vol.Name, + ConfigHash: vol.Labels[api.ConfigHashLabel], + ProjectName: vol.Labels[api.ProjectLabel], + Driver: vol.Driver, + } + } + + return state, nil +} + +// toObservedContainer extracts the relevant fields from a container.Summary, +// parsing labels into typed values. +func toObservedContainer(c container.Summary) ObservedContainer { + number, _ := strconv.Atoi(c.Labels[api.ContainerNumberLabel]) + + networks := map[string]string{} + if c.NetworkSettings != nil { + for name, settings := range c.NetworkSettings.Networks { + networks[name] = settings.NetworkID + } + } + + return ObservedContainer{ + ID: c.ID, + Name: getCanonicalContainerName(c), + State: c.State, + ConfigHash: c.Labels[api.ConfigHashLabel], + ImageDigest: c.Labels[api.ImageDigestLabel], + Number: number, + ConnectedNetworks: networks, + Summary: c, + } +} + +// setResolvedNetworks injects network IDs already resolved by ensureNetworks +// into the observed state, so the reconciler can compare container connections +// against actual network IDs. +func (s *ObservedState) setResolvedNetworks(networks map[string]string, project *types.Project) { + for key, id := range networks { + if obs, exists := s.Networks[key]; exists { + obs.ID = id + s.Networks[key] = obs + } else { + nw := project.Networks[key] + s.Networks[key] = ObservedNetwork{ID: id, Name: nw.Name} + } + } +} + +// setResolvedVolumes injects volume names already resolved by ensureProjectVolumes +// into the observed state. +func (s *ObservedState) setResolvedVolumes(volumes map[string]string) { + for key, id := range volumes { + if obs, exists := s.Volumes[key]; exists { + obs.Name = id + s.Volumes[key] = obs + } else { + s.Volumes[key] = ObservedVolume{Name: id} + } + } +} + +// emitRunningEvents emits "Running" progress events for containers that are already +// running and have no operations planned for them. This matches the previous behavior +// where convergence.ensureService emitted runningEvent for up-to-date containers. 
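The pre-parsed ConfigHash and ImageDigest fields above let the reconciler make the same divergence decision that the removed convergence.mustRecreate made on raw container labels. A self-contained sketch of that comparison, using stand-in types rather than the real ObservedContainer (the actual check lives in the reconciler, whose diff is not shown in full here):

package main

import "fmt"

// observedState carries the fields ObservedContainer pre-parses from labels;
// desiredState carries what the reconciler derives from the compose model
// (ServiceHash plus the expected image digest label).
type observedState struct{ configHash, imageDigest string }
type desiredState struct{ configHash, imageDigest string }

// diverged reports whether a container must be recreated, and why.
func diverged(o observedState, d desiredState) (bool, string) {
	if o.configHash != d.configHash {
		return true, "config hash changed"
	}
	if o.imageDigest != d.imageDigest {
		return true, "image digest changed"
	}
	return false, ""
}

func main() {
	o := observedState{configHash: "sha256:aaa", imageDigest: "sha256:bbb"}
	d := desiredState{configHash: "sha256:aaa", imageDigest: "sha256:ccc"}
	changed, cause := diverged(o, d)
	fmt.Println(changed, cause) // true image digest changed
}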
+func emitRunningEvents(observed *ObservedState, plan *Plan, events api.EventProcessor) { + // Collect all container IDs that appear in the plan + planned := map[string]bool{} + for _, node := range plan.Nodes { + if node.Operation.Container != nil { + planned[node.Operation.Container.ID] = true + } + } + + for _, containers := range observed.Containers { + for _, oc := range containers { + if oc.State == container.StateRunning && !planned[oc.ID] { + events.On(newEvent("Container "+oc.Name, api.Done, api.StatusRunning)) + } + } + } +} + +// orphanNames returns the names of orphaned containers as a comma-separated string. +func (s *ObservedState) orphanNames() string { + names := make([]string, len(s.Orphans)) + for i, o := range s.Orphans { + names[i] = o.Name + } + return strings.Join(names, ", ") +} diff --git a/pkg/compose/observed_state_test.go b/pkg/compose/observed_state_test.go new file mode 100644 index 0000000000..d75b97eacf --- /dev/null +++ b/pkg/compose/observed_state_test.go @@ -0,0 +1,203 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "testing" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/moby/moby/api/types/container" + "github.com/moby/moby/api/types/network" + "github.com/moby/moby/api/types/volume" + "github.com/moby/moby/client" + "go.uber.org/mock/gomock" + "gotest.tools/v3/assert" + + "github.com/docker/compose/v5/pkg/api" + "github.com/docker/compose/v5/pkg/mocks" +) + +func TestToObservedContainer(t *testing.T) { + c := container.Summary{ + ID: "abc123", + Names: []string{"/testProject-web-1"}, + State: container.StateRunning, + Labels: map[string]string{ + api.ServiceLabel: "web", + api.ConfigHashLabel: "sha256:aaa", + api.ImageDigestLabel: "sha256:bbb", + api.ContainerNumberLabel: "1", + api.ProjectLabel: "testproject", + }, + NetworkSettings: &container.NetworkSettingsSummary{ + Networks: map[string]*network.EndpointSettings{ + "mynet": {NetworkID: "net123"}, + }, + }, + } + + oc := toObservedContainer(c) + + assert.Equal(t, oc.ID, "abc123") + assert.Equal(t, oc.Name, "testProject-web-1") + assert.Equal(t, oc.State, container.StateRunning) + assert.Equal(t, oc.ConfigHash, "sha256:aaa") + assert.Equal(t, oc.ImageDigest, "sha256:bbb") + assert.Equal(t, oc.Number, 1) + assert.Equal(t, oc.ConnectedNetworks["mynet"], "net123") + assert.Equal(t, oc.Summary.ID, "abc123") +} + +func TestToObservedContainerNoNetworkSettings(t *testing.T) { + c := container.Summary{ + ID: "def456", + Names: []string{"/testProject-db-1"}, + State: container.StateExited, + Labels: map[string]string{}, + } + + oc := toObservedContainer(c) + + assert.Equal(t, oc.ID, "def456") + assert.Equal(t, oc.Number, 0) + assert.Equal(t, oc.ConfigHash, "") + assert.Equal(t, oc.ImageDigest, "") + assert.Equal(t, len(oc.ConnectedNetworks), 0) +} + +func TestCollectObservedState(t *testing.T) { + mockCtrl := gomock.NewController(t) + + apiClient := mocks.NewMockAPIClient(mockCtrl) + cli := mocks.NewMockCli(mockCtrl) + 
tested, err := NewComposeService(cli) + assert.NilError(t, err) + cli.EXPECT().Client().Return(apiClient).AnyTimes() + + project := &types.Project{ + Name: "myproject", + Services: types.Services{ + "web": {Name: "web"}, + "db": {Name: "db"}, + }, + Networks: types.Networks{ + "frontend": {Name: "myproject_frontend"}, + }, + Volumes: types.Volumes{ + "data": {Name: "myproject_data"}, + }, + } + + // Mock ContainerList + apiClient.EXPECT().ContainerList(gomock.Any(), gomock.Any()).Return(client.ContainerListResult{ + Items: []container.Summary{ + { + ID: "c1", + Names: []string{"/myproject-web-1"}, + State: container.StateRunning, + Labels: map[string]string{ + api.ServiceLabel: "web", + api.ProjectLabel: "myproject", + api.ConfigHashLabel: "hash1", + api.ContainerNumberLabel: "1", + api.OneoffLabel: "False", + }, + }, + { + ID: "c2", + Names: []string{"/myproject-db-1"}, + State: container.StateRunning, + Labels: map[string]string{ + api.ServiceLabel: "db", + api.ProjectLabel: "myproject", + api.ConfigHashLabel: "hash2", + api.ContainerNumberLabel: "1", + api.OneoffLabel: "False", + }, + }, + { + ID: "c3", + Names: []string{"/myproject-old-1"}, + State: container.StateExited, + Labels: map[string]string{ + api.ServiceLabel: "old", + api.ProjectLabel: "myproject", + api.ConfigHashLabel: "hash3", + api.ContainerNumberLabel: "1", + api.OneoffLabel: "False", + }, + }, + }, + }, nil) + + // Mock NetworkList + apiClient.EXPECT().NetworkList(gomock.Any(), gomock.Any()).Return(client.NetworkListResult{ + Items: []network.Summary{ + {Network: network.Network{ + ID: "net1", + Name: "myproject_frontend", + Labels: map[string]string{ + api.NetworkLabel: "frontend", + api.ProjectLabel: "myproject", + api.ConfigHashLabel: "nethash1", + }, + }}, + }, + }, nil) + + // Mock VolumeList + apiClient.EXPECT().VolumeList(gomock.Any(), gomock.Any()).Return(client.VolumeListResult{ + Items: []volume.Volume{ + { + Name: "myproject_data", + Driver: "local", + Labels: map[string]string{ + api.VolumeLabel: "data", + api.ProjectLabel: "myproject", + api.ConfigHashLabel: "volhash1", + }, + }, + }, + }, nil) + + state, err := tested.(*composeService).collectObservedState(t.Context(), project) + assert.NilError(t, err) + + // Containers classified by service + assert.Equal(t, len(state.Containers["web"]), 1) + assert.Equal(t, state.Containers["web"][0].ID, "c1") + assert.Equal(t, len(state.Containers["db"]), 1) + assert.Equal(t, state.Containers["db"][0].ID, "c2") + + // Orphan container (service "old" not in project) + assert.Equal(t, len(state.Orphans), 1) + assert.Equal(t, state.Orphans[0].ID, "c3") + + // Networks + assert.Equal(t, len(state.Networks), 1) + nw := state.Networks["frontend"] + assert.Equal(t, nw.ID, "net1") + assert.Equal(t, nw.Name, "myproject_frontend") + assert.Equal(t, nw.ConfigHash, "nethash1") + + // Volumes + assert.Equal(t, len(state.Volumes), 1) + vol := state.Volumes["data"] + assert.Equal(t, vol.Name, "myproject_data") + assert.Equal(t, vol.Driver, "local") + assert.Equal(t, vol.ConfigHash, "volhash1") +} diff --git a/pkg/compose/plan.go b/pkg/compose/plan.go new file mode 100644 index 0000000000..0755f05f70 --- /dev/null +++ b/pkg/compose/plan.go @@ -0,0 +1,175 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/moby/moby/api/types/container" +) + +// OperationType identifies the kind of atomic operation in a reconciliation plan. +// Each operation maps to exactly one Docker API call. +type OperationType int + +const ( + // Network operations + OpCreateNetwork OperationType = iota + OpRemoveNetwork + OpDisconnectNetwork + OpConnectNetwork + + // Volume operations + OpCreateVolume + OpRemoveVolume + + // Container operations + OpCreateContainer + OpStartContainer + OpStopContainer + OpRemoveContainer + OpRenameContainer + + // Provider operations + OpRunProvider +) + +// String returns the human-readable name of an OperationType. +func (o OperationType) String() string { + switch o { + case OpCreateNetwork: + return "CreateNetwork" + case OpRemoveNetwork: + return "RemoveNetwork" + case OpDisconnectNetwork: + return "DisconnectNetwork" + case OpConnectNetwork: + return "ConnectNetwork" + case OpCreateVolume: + return "CreateVolume" + case OpRemoveVolume: + return "RemoveVolume" + case OpCreateContainer: + return "CreateContainer" + case OpStartContainer: + return "StartContainer" + case OpStopContainer: + return "StopContainer" + case OpRemoveContainer: + return "RemoveContainer" + case OpRenameContainer: + return "RenameContainer" + case OpRunProvider: + return "RunProvider" + default: + return fmt.Sprintf("Unknown(%d)", int(o)) + } +} + +// Operation describes a single atomic action to be performed by the executor. +// It carries all the data needed to execute the operation without further +// decision-making. +type Operation struct { + Type OperationType + ResourceID string // e.g. "service:web:1", "network:backend", "volume:data" + Cause string // why this operation is needed + + // Resource-specific data (only the relevant fields are set per operation type) + Service *types.ServiceConfig // for container operations + Container *container.Summary // existing container (for stop/remove) + Inherited *container.Summary // container to inherit anonymous volumes from (for create-as-replacement) + Number int // container replica number (for create) + Name string // target container/resource name + Network *types.NetworkConfig // for network operations + Volume *types.VolumeConfig // for volume operations + Timeout *time.Duration // for stop operations +} + +// PlanNode is a single node in the reconciliation DAG. It represents one +// atomic operation and its dependencies on other nodes. +type PlanNode struct { + ID int // numeric identifier (#1, #2, ...) + Operation Operation + DependsOn []*PlanNode // prerequisite operations + Group string // event grouping key (e.g. "recreate:web:1"); empty for ungrouped nodes +} + +// Plan is a directed acyclic graph of operations produced by the reconciler. +// Nodes are stored in topological order (dependencies before dependents). +type Plan struct { + Nodes []*PlanNode + nextID int +} + +// addNode appends a new node to the plan and returns it. 
+func (p *Plan) addNode(op Operation, group string, deps ...*PlanNode) *PlanNode { + p.nextID++ + node := &PlanNode{ + ID: p.nextID, + Operation: op, + DependsOn: deps, + Group: group, + } + p.Nodes = append(p.Nodes, node) + return node +} + +// String renders the plan as a human-readable graph for testing and debugging. +// +// Format per line: [dep1,dep2] -> #id resource, operation, cause [group] +// +// Examples: +// +// [] -> #1 network:default, CreateNetwork, not found +// [1] -> #2 service:web:1, CreateContainer, no existing container +// [2] -> #3 service:web:1, StopContainer, replaced by #2 [recreate:web:1] +func (p *Plan) String() string { + var sb strings.Builder + for _, node := range p.Nodes { + depIDs := make([]int, len(node.DependsOn)) + for i, d := range node.DependsOn { + depIDs[i] = d.ID + } + sort.Ints(depIDs) + deps := make([]string, len(depIDs)) + for i, id := range depIDs { + deps[i] = strconv.Itoa(id) + } + fmt.Fprintf(&sb, "[%s] -> #%d %s, %s, %s", + strings.Join(deps, ","), + node.ID, + node.Operation.ResourceID, + node.Operation.Type, + node.Operation.Cause, + ) + if node.Group != "" { + fmt.Fprintf(&sb, " [%s]", node.Group) + } + sb.WriteByte('\n') + } + return sb.String() +} + +// IsEmpty returns true if the plan contains no operations. +func (p *Plan) IsEmpty() bool { + return len(p.Nodes) == 0 +} diff --git a/pkg/compose/plan_test.go b/pkg/compose/plan_test.go new file mode 100644 index 0000000000..d0e9999c90 --- /dev/null +++ b/pkg/compose/plan_test.go @@ -0,0 +1,139 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package compose + +import ( + "testing" + + "gotest.tools/v3/assert" +) + +func TestOperationTypeString(t *testing.T) { + tests := []struct { + op OperationType + want string + }{ + {OpCreateNetwork, "CreateNetwork"}, + {OpRemoveNetwork, "RemoveNetwork"}, + {OpDisconnectNetwork, "DisconnectNetwork"}, + {OpConnectNetwork, "ConnectNetwork"}, + {OpCreateVolume, "CreateVolume"}, + {OpRemoveVolume, "RemoveVolume"}, + {OpCreateContainer, "CreateContainer"}, + {OpStartContainer, "StartContainer"}, + {OpStopContainer, "StopContainer"}, + {OpRemoveContainer, "RemoveContainer"}, + {OpRenameContainer, "RenameContainer"}, + {OperationType(999), "Unknown(999)"}, + } + for _, tt := range tests { + assert.Equal(t, tt.op.String(), tt.want) + } +} + +func TestPlanStringEmpty(t *testing.T) { + p := &Plan{} + assert.Equal(t, p.String(), "") + assert.Assert(t, p.IsEmpty()) +} + +func TestPlanStringNoDeps(t *testing.T) { + p := &Plan{} + p.addNode(Operation{ + Type: OpCreateNetwork, + ResourceID: "network:default", + Cause: "not found", + }, "") + p.addNode(Operation{ + Type: OpCreateVolume, + ResourceID: "volume:data", + Cause: "not found", + }, "") + + expected := "[] -> #1 network:default, CreateNetwork, not found\n" + + "[] -> #2 volume:data, CreateVolume, not found\n" + assert.Equal(t, p.String(), expected) + assert.Assert(t, !p.IsEmpty()) +} + +func TestPlanStringWithDeps(t *testing.T) { + p := &Plan{} + nw := p.addNode(Operation{ + Type: OpCreateNetwork, + ResourceID: "network:default", + Cause: "not found", + }, "") + vol := p.addNode(Operation{ + Type: OpCreateVolume, + ResourceID: "volume:data", + Cause: "not found", + }, "") + p.addNode(Operation{ + Type: OpCreateContainer, + ResourceID: "service:db:1", + Cause: "no existing container", + }, "", nw, vol) + + expected := "[] -> #1 network:default, CreateNetwork, not found\n" + + "[] -> #2 volume:data, CreateVolume, not found\n" + + "[1,2] -> #3 service:db:1, CreateContainer, no existing container\n" + assert.Equal(t, p.String(), expected) +} + +func TestPlanStringWithGroup(t *testing.T) { + p := &Plan{} + create := p.addNode(Operation{ + Type: OpCreateContainer, + ResourceID: "service:web:1", + Cause: "config hash changed (tmpName)", + }, "recreate:web:1") + stop := p.addNode(Operation{ + Type: OpStopContainer, + ResourceID: "service:web:1", + Cause: "replaced by #1", + }, "recreate:web:1", create) + remove := p.addNode(Operation{ + Type: OpRemoveContainer, + ResourceID: "service:web:1", + Cause: "replaced by #1", + }, "recreate:web:1", stop) + p.addNode(Operation{ + Type: OpRenameContainer, + ResourceID: "service:web:1", + Cause: "finalize recreate", + }, "recreate:web:1", remove) + + expected := "[] -> #1 service:web:1, CreateContainer, config hash changed (tmpName) [recreate:web:1]\n" + + "[1] -> #2 service:web:1, StopContainer, replaced by #1 [recreate:web:1]\n" + + "[2] -> #3 service:web:1, RemoveContainer, replaced by #1 [recreate:web:1]\n" + + "[3] -> #4 service:web:1, RenameContainer, finalize recreate [recreate:web:1]\n" + assert.Equal(t, p.String(), expected) +} + +func TestPlanAddNodeAutoIncrements(t *testing.T) { + p := &Plan{} + n1 := p.addNode(Operation{Type: OpCreateNetwork, ResourceID: "a", Cause: "x"}, "") + n2 := p.addNode(Operation{Type: OpCreateVolume, ResourceID: "b", Cause: "y"}, "") + n3 := p.addNode(Operation{Type: OpCreateContainer, ResourceID: "c", Cause: "z"}, "", n1, n2) + + assert.Equal(t, n1.ID, 1) + assert.Equal(t, n2.ID, 2) + assert.Equal(t, n3.ID, 3) + assert.Equal(t, len(n3.DependsOn), 2) + assert.Equal(t, 
n3.DependsOn[0].ID, 1) + assert.Equal(t, n3.DependsOn[1].ID, 2) +} diff --git a/pkg/compose/progress.go b/pkg/compose/progress.go index 26f9b5d859..d753c52f03 100644 --- a/pkg/compose/progress.go +++ b/pkg/compose/progress.go @@ -82,11 +82,6 @@ func restartingEvent(id string) api.Resource { return newEvent(id, api.Working, api.StatusRestarting) } -// runningEvent creates a new Running in progress Resource -func runningEvent(id string) api.Resource { - return newEvent(id, api.Done, api.StatusRunning) -} - // createdEvent creates a new Created (done) Resource func createdEvent(id string) api.Resource { return newEvent(id, api.Done, api.StatusCreated) diff --git a/pkg/compose/reconcile.go b/pkg/compose/reconcile.go new file mode 100644 index 0000000000..aab5c2b097 --- /dev/null +++ b/pkg/compose/reconcile.go @@ -0,0 +1,750 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "context" + "fmt" + "slices" + "sort" + "time" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/moby/moby/api/types/container" + mmount "github.com/moby/moby/api/types/mount" + + "github.com/docker/compose/v5/pkg/api" +) + +// toReconcileOptions maps api.CreateOptions to ReconcileOptions. +func toReconcileOptions(options api.CreateOptions) ReconcileOptions { + return ReconcileOptions{ + Services: options.Services, + Recreate: options.Recreate, + RecreateDependencies: options.RecreateDependencies, + Inherit: options.Inherit, + Timeout: options.Timeout, + RemoveOrphans: options.RemoveOrphans, + SkipProviders: options.SkipProviders, + } +} + +// ReconcileOptions controls how the reconciler compares desired and observed state. +type ReconcileOptions struct { + Services []string // targeted services (empty = all) + Recreate string // "diverged", "force", "never" for targeted services + RecreateDependencies string // same for non-targeted services + Inherit bool // inherit anonymous volumes on recreate + Timeout *time.Duration // for stop operations + RemoveOrphans bool + SkipProviders bool +} + +// reconciler compares a types.Project (desired state) with an ObservedState +// (actual state) and produces a Plan — a DAG of atomic operations. +type reconciler struct { + project *types.Project + observed *ObservedState + options ReconcileOptions + prompt Prompt + plan *Plan + + // networkNodes and volumeNodes track the last plan node for each + // network/volume, so container creation nodes can depend on them. + networkNodes map[string]*PlanNode // compose network key → create node + volumeNodes map[string]*PlanNode // compose volume key → create node + // serviceNodes tracks the last plan node per service, so dependent + // services can order their operations after dependencies. + serviceNodes map[string]*PlanNode +} + +// reconcile is the main entry point: it builds a Plan from desired vs observed state. +// The prompt function is called for interactive decisions (e.g. volume divergence). 
+func reconcile(_ context.Context, project *types.Project, observed *ObservedState, options ReconcileOptions, prompt Prompt) (*Plan, error) { + r := &reconciler{ + project: project, + observed: observed, + options: options, + prompt: prompt, + plan: &Plan{}, + networkNodes: map[string]*PlanNode{}, + volumeNodes: map[string]*PlanNode{}, + serviceNodes: map[string]*PlanNode{}, + } + + if err := r.reconcileNetworks(); err != nil { + return nil, err + } + + if err := r.reconcileVolumes(); err != nil { + return nil, err + } + + if err := r.reconcileContainers(); err != nil { + return nil, err + } + + if r.options.RemoveOrphans { + r.reconcileOrphans() + } + + return r.plan, nil +} + +// reconcileNetworks adds plan nodes for network creation or recreation. +func (r *reconciler) reconcileNetworks() error { + for _, key := range sortedKeys(r.project.Networks) { + desired := r.project.Networks[key] + if desired.External { + continue + } + observed, exists := r.observed.Networks[key] + if !exists { + r.planCreateNetwork(key, &desired) + continue + } + + expectedHash, err := NetworkHash(&desired) + if err != nil { + return err + } + if observed.ConfigHash != "" && observed.ConfigHash != expectedHash { + if err := r.planRecreateNetwork(key, &desired); err != nil { + return err + } + } + // else: network exists and config matches, nothing to do + } + return nil +} + +// planCreateNetwork adds a single CreateNetwork node and records it for dependency tracking. +func (r *reconciler) planCreateNetwork(key string, nw *types.NetworkConfig) *PlanNode { + node := r.plan.addNode(Operation{ + Type: OpCreateNetwork, + ResourceID: fmt.Sprintf("network:%s", key), + Cause: "not found", + Name: nw.Name, + Network: nw, + }, "") + r.networkNodes[key] = node + return node +} + +// planRecreateNetwork adds the full sequence for a diverged network: +// stop affected containers → disconnect → remove network → create network. +func (r *reconciler) planRecreateNetwork(key string, nw *types.NetworkConfig) error { + observed := r.observed.Networks[key] + affectedServices := r.servicesUsingNetwork(key) + affectedContainers := r.containersForServices(affectedServices) + + // Stop all affected containers + var stopNodes []*PlanNode + for i := range affectedContainers { + oc := &affectedContainers[i] + node := r.plan.addNode(Operation{ + Type: OpStopContainer, + ResourceID: fmt.Sprintf("service:%s:%d", oc.Summary.Labels[serviceLabel], oc.Number), + Cause: fmt.Sprintf("network %s config changed", key), + Container: &oc.Summary, + }, "") + stopNodes = append(stopNodes, node) + } + + // Disconnect all affected containers from the *observed* network (each depends on its own stop) + var disconnectNodes []*PlanNode + for i, oc := range affectedContainers { + node := r.plan.addNode(Operation{ + Type: OpDisconnectNetwork, + ResourceID: fmt.Sprintf("service:%s:%d", oc.Summary.Labels[serviceLabel], oc.Number), + Cause: fmt.Sprintf("network %s recreate", key), + Container: &affectedContainers[i].Summary, + Name: observed.Name, + }, "", stopNodes[i]) + disconnectNodes = append(disconnectNodes, node) + } + + // Remove the *observed* network (depends on all disconnects) + removeNode := r.plan.addNode(Operation{ + Type: OpRemoveNetwork, + ResourceID: fmt.Sprintf("network:%s", key), + Cause: "config hash diverged", + Name: observed.Name, + }, "", disconnectNodes...) 
+ + // Create network (depends on remove) + createNode := r.plan.addNode(Operation{ + Type: OpCreateNetwork, + ResourceID: fmt.Sprintf("network:%s", key), + Cause: "recreate after config change", + Name: nw.Name, + Network: nw, + }, "", removeNode) + r.networkNodes[key] = createNode + + return nil +} + +// reconcileVolumes adds plan nodes for volume creation or recreation. +func (r *reconciler) reconcileVolumes() error { + for _, key := range sortedKeys(r.project.Volumes) { + desired := r.project.Volumes[key] + if desired.External { + continue + } + observed, exists := r.observed.Volumes[key] + if !exists { + r.planCreateVolume(key, &desired) + continue + } + + expectedHash, err := VolumeHash(desired) + if err != nil { + return err + } + if observed.ConfigHash != "" && observed.ConfigHash != expectedHash { + confirmed, err := r.prompt( + fmt.Sprintf("Volume %q exists but doesn't match configuration in compose file. Recreate (data will be lost)?", desired.Name), + false, + ) + if err != nil { + return err + } + if confirmed { + r.planRecreateVolume(key, &desired) + } + } + // else: volume exists and config matches, nothing to do + } + return nil +} + +// planCreateVolume adds a single CreateVolume node and records it for dependency tracking. +func (r *reconciler) planCreateVolume(key string, vol *types.VolumeConfig) *PlanNode { + node := r.plan.addNode(Operation{ + Type: OpCreateVolume, + ResourceID: fmt.Sprintf("volume:%s", key), + Cause: "not found", + Name: vol.Name, + Volume: vol, + }, "") + r.volumeNodes[key] = node + return node +} + +// planRecreateVolume adds the full sequence for a diverged volume: +// stop affected containers → remove containers → remove volume → create volume. +// Containers must be removed (not just stopped) because Docker does not allow +// removing a volume that is referenced by any container, even a stopped one. +func (r *reconciler) planRecreateVolume(key string, vol *types.VolumeConfig) { + observed := r.observed.Volumes[key] + affectedServices := r.servicesUsingVolume(key) + affectedContainers := r.containersForServices(affectedServices) + + // Stop all affected containers + var stopNodes []*PlanNode + for i := range affectedContainers { + oc := &affectedContainers[i] + node := r.plan.addNode(Operation{ + Type: OpStopContainer, + ResourceID: fmt.Sprintf("service:%s:%d", oc.Summary.Labels[serviceLabel], oc.Number), + Cause: fmt.Sprintf("volume %s config changed", key), + Container: &oc.Summary, + }, "") + stopNodes = append(stopNodes, node) + } + + // Remove all affected containers (each depends on its own stop) + var removeNodes []*PlanNode + for i, oc := range affectedContainers { + node := r.plan.addNode(Operation{ + Type: OpRemoveContainer, + ResourceID: fmt.Sprintf("service:%s:%d", oc.Summary.Labels[serviceLabel], oc.Number), + Cause: fmt.Sprintf("volume %s config changed", key), + Container: &affectedContainers[i].Summary, + }, "", stopNodes[i]) + removeNodes = append(removeNodes, node) + } + + // Remove the *observed* volume (depends on all container removals) + removeVolNode := r.plan.addNode(Operation{ + Type: OpRemoveVolume, + ResourceID: fmt.Sprintf("volume:%s", key), + Cause: "config hash diverged", + Name: observed.Name, + }, "", removeNodes...) 
+ + // Create volume (depends on remove) + createNode := r.plan.addNode(Operation{ + Type: OpCreateVolume, + ResourceID: fmt.Sprintf("volume:%s", key), + Cause: "recreate after config change", + Name: vol.Name, + Volume: vol, + }, "", removeVolNode) + r.volumeNodes[key] = createNode +} + +// servicesUsingNetwork returns the names of services that reference the given +// compose network key, sorted for deterministic plan output. +func (r *reconciler) servicesUsingNetwork(networkKey string) []string { + var names []string + for _, key := range sortedKeys(r.project.Services) { + svc := r.project.Services[key] + if _, ok := svc.Networks[networkKey]; ok { + names = append(names, svc.Name) + } + } + return names +} + +// servicesUsingVolume returns the names of services that mount the given +// compose volume key, sorted for deterministic plan output. +func (r *reconciler) servicesUsingVolume(volumeKey string) []string { + var names []string + for _, key := range sortedKeys(r.project.Services) { + svc := r.project.Services[key] + for _, v := range svc.Volumes { + if v.Source == volumeKey { + names = append(names, svc.Name) + break + } + } + } + return names +} + +// containersForServices returns all observed containers belonging to the given +// service names. +func (r *reconciler) containersForServices(services []string) []ObservedContainer { + var result []ObservedContainer + for _, svc := range services { + result = append(result, r.observed.Containers[svc]...) + } + return result +} + +// reconcileContainers processes each service in dependency order, comparing +// the desired scale and configuration with observed containers. +func (r *reconciler) reconcileContainers() error { + // Build dependency graph and process in order + graph, err := NewGraph(r.project, ServiceStopped) + if err != nil { + return err + } + + // Visit in dependency order (leaves first = services with no deps) + return r.visitInDependencyOrder(graph) +} + +// visitInDependencyOrder processes services from leaves to roots so that +// dependencies are reconciled before the services that depend on them. +func (r *reconciler) visitInDependencyOrder(g *Graph) error { + visited := map[string]bool{} + // Sort vertex keys for deterministic plan output in tests + keys := sortedKeys(g.Vertices) + for { + // Find a vertex whose all children are visited + var next *Vertex + for _, k := range keys { + v := g.Vertices[k] + if visited[v.Key] { + continue + } + allChildrenVisited := true + for _, child := range v.Children { + if !visited[child.Key] { + allChildrenVisited = false + break + } + } + if allChildrenVisited { + next = v + break + } + } + if next == nil { + break // all visited + } + visited[next.Key] = true + + service, err := r.project.GetService(next.Service) + if err != nil { + return err + } + if err := r.reconcileService(service); err != nil { + return err + } + } + return nil +} + +// reconcileService handles a single service: scale down, recreate diverged, +// start stopped, scale up. +func (r *reconciler) reconcileService(service types.ServiceConfig) error { + if service.Provider != nil && r.options.SkipProviders { + return nil + } + if service.Provider != nil { + svc := service + deps := r.infrastructureDeps(service) + node := r.plan.addNode(Operation{ + Type: OpRunProvider, + ResourceID: fmt.Sprintf("provider:%s", service.Name), + Cause: "provider service", + Service: &svc, + }, "", deps...) 
+ r.serviceNodes[service.Name] = node + return nil + } + + expected, err := getScale(service) + if err != nil { + return err + } + + containers := r.observed.Containers[service.Name] + actual := len(containers) + + strategy := r.options.RecreateDependencies + if slices.Contains(r.options.Services, service.Name) || len(r.options.Services) == 0 { + strategy = r.options.Recreate + } + + // Sort containers: obsolete first, then by number descending, then reverse + // to get the same ordering as the existing convergence code. + r.sortContainers(containers, service, strategy) + + // Collect dependency nodes that container creation should depend on + infraDeps := r.infrastructureDeps(service) + + var lastNode *PlanNode + + // Process existing containers + for i, oc := range containers { + if i >= expected { + // Scale down: stop + remove excess containers + stopNode := r.plan.addNode(Operation{ + Type: OpStopContainer, + ResourceID: fmt.Sprintf("service:%s:%d", service.Name, oc.Number), + Cause: "scale down", + Container: &containers[i].Summary, + Timeout: r.options.Timeout, + }, "") + r.plan.addNode(Operation{ + Type: OpRemoveContainer, + ResourceID: fmt.Sprintf("service:%s:%d", service.Name, oc.Number), + Cause: "scale down", + Container: &containers[i].Summary, + }, "", stopNode) + continue + } + + recreate, err := r.mustRecreate(service, oc, strategy) + if err != nil { + return err + } + if recreate { + lastNode = r.planRecreateContainer(service, &containers[i], infraDeps) + continue + } + + // Container is up-to-date + switch oc.State { + case container.StateRunning, container.StateCreated, container.StateRestarting, container.StateExited: + // Nothing to do (exited containers are left as-is, matching convergence.go behavior) + default: + // Stopped/exited container that needs starting + lastNode = r.plan.addNode(Operation{ + Type: OpStartContainer, + ResourceID: fmt.Sprintf("service:%s:%d", service.Name, oc.Number), + Cause: "not running", + Container: &containers[i].Summary, + }, "", infraDeps...) + } + } + + // Scale up: create new containers + nextNum := nextContainerNumber(r.observedSummaries(service.Name)) + for i := 0; i < expected-actual; i++ { + number := nextNum + i + name := getContainerName(r.project.Name, service, number) + svc := service // copy for pointer stability + lastNode = r.plan.addNode(Operation{ + Type: OpCreateContainer, + ResourceID: fmt.Sprintf("service:%s:%d", service.Name, number), + Cause: "no existing container", + Service: &svc, + Number: number, + Name: name, + }, "", infraDeps...) + } + + if lastNode != nil { + r.serviceNodes[service.Name] = lastNode + } + return nil +} + +// mustRecreate mirrors the existing convergence.mustRecreate logic. +func (r *reconciler) mustRecreate(expected types.ServiceConfig, oc ObservedContainer, policy string) (bool, error) { + if policy == api.RecreateNever { + return false, nil + } + if policy == api.RecreateForce { + return true, nil + } + configHash, err := ServiceHash(expected) + if err != nil { + return false, err + } + if oc.ConfigHash != configHash { + return true, nil + } + if oc.ImageDigest != expected.CustomLabels[api.ImageDigestLabel] { + return true, nil + } + + if oc.State == container.StateRunning && r.hasNetworkMismatch(expected, oc) { + return true, nil + } + if r.hasVolumeMismatch(expected, oc) { + return true, nil + } + + return false, nil +} + +// hasNetworkMismatch checks if the container is not connected to all expected networks. 
+func (r *reconciler) hasNetworkMismatch(expected types.ServiceConfig, oc ObservedContainer) bool { + for _, net := range sortedKeys(expected.Networks) { + expectedID := "" + if obs, ok := r.observed.Networks[net]; ok { + expectedID = obs.ID + } + if expectedID == "" || expectedID == "swarm" { + continue + } + found := false + for _, netID := range oc.ConnectedNetworks { + if netID == expectedID { + found = true + break + } + } + if !found { + return true + } + } + return false +} + +// hasVolumeMismatch checks if the container is missing any expected volume mounts. +func (r *reconciler) hasVolumeMismatch(expected types.ServiceConfig, oc ObservedContainer) bool { + for _, vol := range expected.Volumes { + if vol.Type != string(mmount.TypeVolume) || vol.Source == "" { + continue + } + expectedName := "" + if obs, ok := r.observed.Volumes[vol.Source]; ok { + expectedName = obs.Name + } + if expectedName == "" { + continue + } + found := false + for _, m := range oc.Summary.Mounts { + if m.Type == mmount.TypeVolume && m.Name == expectedName { + found = true + break + } + } + if !found { + return true + } + } + return false +} + +// planRecreateContainer decomposes container recreation into 4 atomic operations: +// CreateContainer(tmpName) → StopContainer → RemoveContainer → RenameContainer +func (r *reconciler) planRecreateContainer(service types.ServiceConfig, oc *ObservedContainer, infraDeps []*PlanNode) *PlanNode { + resID := fmt.Sprintf("service:%s:%d", service.Name, oc.Number) + group := fmt.Sprintf("recreate:%s:%d", service.Name, oc.Number) + tmpName := fmt.Sprintf("%s_%s", oc.ID[:min(12, len(oc.ID))], getContainerName(r.project.Name, service, oc.Number)) + svc := service // copy for pointer stability + + // Stop dependents first + depStopNodes := r.planStopDependents(service) + + // All deps: infrastructure + dependent stops + allDeps := append(slices.Clone(infraDeps), depStopNodes...) + + var inherited *container.Summary + if r.options.Inherit { + inherited = &oc.Summary + } + + // 1. Create new container with temporary name + createNode := r.plan.addNode(Operation{ + Type: OpCreateContainer, + ResourceID: resID, + Cause: "config changed (tmpName)", + Service: &svc, + Inherited: inherited, + Number: oc.Number, + Name: tmpName, + }, group, allDeps...) + + // 2. Stop old container + stopNode := r.plan.addNode(Operation{ + Type: OpStopContainer, + ResourceID: resID, + Cause: fmt.Sprintf("replaced by #%d", createNode.ID), + Container: &oc.Summary, + Timeout: r.options.Timeout, + }, group, createNode) + + // 3. Remove old container + removeNode := r.plan.addNode(Operation{ + Type: OpRemoveContainer, + ResourceID: resID, + Cause: fmt.Sprintf("replaced by #%d", createNode.ID), + Container: &oc.Summary, + }, group, stopNode) + + // 4. Rename to final name + finalName := getContainerName(r.project.Name, service, oc.Number) + renameNode := r.plan.addNode(Operation{ + Type: OpRenameContainer, + ResourceID: resID, + Cause: "finalize recreate", + Name: finalName, + }, group, removeNode) + + return renameNode +} + +// planStopDependents plans stop operations for containers of services that +// depend on the given service with restart: true. 
+func (r *reconciler) planStopDependents(service types.ServiceConfig) []*PlanNode { + dependents := r.project.GetDependentsForService(service, func(dep types.ServiceDependency) bool { + return dep.Restart + }) + var nodes []*PlanNode + for _, depName := range dependents { + for i, oc := range r.observed.Containers[depName] { + node := r.plan.addNode(Operation{ + Type: OpStopContainer, + ResourceID: fmt.Sprintf("service:%s:%d", depName, oc.Number), + Cause: fmt.Sprintf("dependency %s being recreated", service.Name), + Container: &r.observed.Containers[depName][i].Summary, + Timeout: r.options.Timeout, + }, "") + nodes = append(nodes, node) + } + } + return nodes +} + +// infrastructureDeps returns the plan nodes that a container creation for this +// service should depend on: network creates and volume creates that the service +// references, plus the last node of dependency services. +func (r *reconciler) infrastructureDeps(service types.ServiceConfig) []*PlanNode { + var deps []*PlanNode + // Sort map keys for deterministic plan output in tests + for _, net := range sortedKeys(service.Networks) { + if node, ok := r.networkNodes[net]; ok { + deps = append(deps, node) + } + } + for _, vol := range service.Volumes { + if vol.Type == string(mmount.TypeVolume) && vol.Source != "" { + if node, ok := r.volumeNodes[vol.Source]; ok { + deps = append(deps, node) + } + } + } + for _, depName := range sortedKeys(service.DependsOn) { + if node, ok := r.serviceNodes[depName]; ok && node != nil { + deps = append(deps, node) + } + } + return deps +} + +// sortContainers sorts containers the same way as convergence.go:138-160: +// obsolete first, then by container number descending, then reversed. +func (r *reconciler) sortContainers(containers []ObservedContainer, service types.ServiceConfig, policy string) { + sort.Slice(containers, func(i, j int) bool { + obsi, _ := r.mustRecreate(service, containers[i], policy) + obsj, _ := r.mustRecreate(service, containers[j], policy) + if obsi != obsj { + return obsi // obsolete first + } + // preserve low container numbers + if containers[i].Number != containers[j].Number { + return containers[i].Number > containers[j].Number + } + return containers[i].Summary.Created < containers[j].Summary.Created + }) + slices.Reverse(containers) +} + +// reconcileOrphans plans stop + remove for orphaned containers. +func (r *reconciler) reconcileOrphans() { + for i, oc := range r.observed.Orphans { + stopNode := r.plan.addNode(Operation{ + Type: OpStopContainer, + ResourceID: fmt.Sprintf("orphan:%s", oc.Name), + Cause: "orphaned container", + Container: &r.observed.Orphans[i].Summary, + Timeout: r.options.Timeout, + }, "") + r.plan.addNode(Operation{ + Type: OpRemoveContainer, + ResourceID: fmt.Sprintf("orphan:%s", oc.Name), + Cause: "orphaned container", + Container: &r.observed.Orphans[i].Summary, + }, "", stopNode) + } +} + +// observedSummaries returns the raw container.Summary list for a service, +// needed by nextContainerNumber which expects []container.Summary. +func (r *reconciler) observedSummaries(serviceName string) []container.Summary { + ocs := r.observed.Containers[serviceName] + result := make([]container.Summary, len(ocs)) + for i, oc := range ocs { + result[i] = oc.Summary + } + return result +} + +// sortedKeys returns the keys of a map sorted alphabetically. +// This ensures deterministic iteration order for reproducible plan output in tests. 
+func sortedKeys[V any](m map[string]V) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +// serviceLabel is a package-level shorthand for the service label key. +const serviceLabel = "com.docker.compose.service" diff --git a/pkg/compose/reconcile_test.go b/pkg/compose/reconcile_test.go new file mode 100644 index 0000000000..ff0c8516ae --- /dev/null +++ b/pkg/compose/reconcile_test.go @@ -0,0 +1,668 @@ +/* + Copyright 2020 Docker Compose CLI authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package compose + +import ( + "strings" + "testing" + + "github.com/compose-spec/compose-go/v2/types" + "github.com/moby/moby/api/types/container" + "gotest.tools/v3/assert" + + "github.com/docker/compose/v5/pkg/api" +) + +// noPrompt is a Prompt that should never be called in tests that don't expect it. +func noPrompt(msg string, _ bool) (bool, error) { + panic("unexpected prompt call: " + msg) +} + +func alwaysYesPrompt(string, bool) (bool, error) { return true, nil } +func alwaysNoPrompt(string, bool) (bool, error) { return false, nil } + +func defaultReconcileOptions() ReconcileOptions { + return ReconcileOptions{ + Recreate: api.RecreateDiverged, + RecreateDependencies: api.RecreateDiverged, + Inherit: true, + } +} + +// --- Network tests --- + +func TestReconcileNetworks_CreateMissing(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Networks: types.Networks{ + "frontend": {Name: "myproject_frontend"}, + "backend": {Name: "myproject_backend"}, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + // Sorted alphabetically: backend before frontend + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 network:backend, CreateNetwork, not found +[] -> #2 network:frontend, CreateNetwork, not found +`)+"\n") +} + +func TestReconcileNetworks_ExistingMatch(t *testing.T) { + nw := types.NetworkConfig{Name: "myproject_frontend"} + hash, err := NetworkHash(&nw) + assert.NilError(t, err) + + project := &types.Project{ + Name: "myproject", + Networks: types.Networks{"frontend": nw}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{}, + Networks: map[string]ObservedNetwork{ + "frontend": {ID: "net1", Name: "myproject_frontend", ConfigHash: hash}, + }, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + assert.Assert(t, plan.IsEmpty()) +} + +func TestReconcileNetworks_ExternalSkipped(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Networks: types.Networks{ + "ext": {Name: "external_net", External: true}, + }, + } + observed := &ObservedState{ + 
ProjectName: "myproject", + Containers: map[string][]ObservedContainer{}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + assert.Assert(t, plan.IsEmpty()) +} + +func TestReconcileNetworks_Diverged(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Networks: types.Networks{ + "frontend": {Name: "myproject_frontend", Driver: "overlay"}, + }, + Services: types.Services{ + "web": { + Name: "web", + Scale: intPtr(1), + Networks: map[string]*types.ServiceNetworkConfig{"frontend": {}}, + }, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1aabbccddee", Number: 1, State: container.StateRunning, + Summary: container.Summary{ + ID: "c1aabbccddee", + Labels: map[string]string{ + api.ServiceLabel: "web", + api.ContainerNumberLabel: "1", + }, + }, + }}, + }, + Networks: map[string]ObservedNetwork{ + "frontend": {ID: "net1", Name: "myproject_frontend", ConfigHash: "oldhash"}, + }, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 service:web:1, StopContainer, network frontend config changed +[1] -> #2 service:web:1, DisconnectNetwork, network frontend recreate +[2] -> #3 network:frontend, RemoveNetwork, config hash diverged +[3] -> #4 network:frontend, CreateNetwork, recreate after config change +[4] -> #5 service:web:1, CreateContainer, config changed (tmpName) [recreate:web:1] +[5] -> #6 service:web:1, StopContainer, replaced by #5 [recreate:web:1] +[6] -> #7 service:web:1, RemoveContainer, replaced by #5 [recreate:web:1] +[7] -> #8 service:web:1, RenameContainer, finalize recreate [recreate:web:1] +`)+"\n") +} + +func TestReconcileNetworks_DivergedMultipleServices(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Networks: types.Networks{ + "frontend": {Name: "myproject_frontend", Driver: "overlay"}, + }, + Services: types.Services{ + "web": { + Name: "web", + Scale: intPtr(1), + Networks: map[string]*types.ServiceNetworkConfig{"frontend": {}}, + }, + "api": { + Name: "api", + Scale: intPtr(1), + Networks: map[string]*types.ServiceNetworkConfig{"frontend": {}}, + }, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1aabbccddee", Number: 1, State: container.StateRunning, + Summary: container.Summary{ID: "c1aabbccddee", Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1"}}, + }}, + "api": {{ + ID: "c2aabbccddee", Number: 1, State: container.StateRunning, + Summary: container.Summary{ID: "c2aabbccddee", Labels: map[string]string{api.ServiceLabel: "api", api.ContainerNumberLabel: "1"}}, + }}, + }, + Networks: map[string]ObservedNetwork{ + "frontend": {ID: "net1", Name: "myproject_frontend", ConfigHash: "oldhash"}, + }, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + // Services sorted alphabetically: api before web + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 service:api:1, StopContainer, network frontend config changed +[] -> #2 service:web:1, StopContainer, network frontend config changed +[1] -> #3 
service:api:1, DisconnectNetwork, network frontend recreate +[2] -> #4 service:web:1, DisconnectNetwork, network frontend recreate +[3,4] -> #5 network:frontend, RemoveNetwork, config hash diverged +[5] -> #6 network:frontend, CreateNetwork, recreate after config change +[6] -> #7 service:api:1, CreateContainer, config changed (tmpName) [recreate:api:1] +[7] -> #8 service:api:1, StopContainer, replaced by #7 [recreate:api:1] +[8] -> #9 service:api:1, RemoveContainer, replaced by #7 [recreate:api:1] +[9] -> #10 service:api:1, RenameContainer, finalize recreate [recreate:api:1] +[6] -> #11 service:web:1, CreateContainer, config changed (tmpName) [recreate:web:1] +[11] -> #12 service:web:1, StopContainer, replaced by #11 [recreate:web:1] +[12] -> #13 service:web:1, RemoveContainer, replaced by #11 [recreate:web:1] +[13] -> #14 service:web:1, RenameContainer, finalize recreate [recreate:web:1] +`)+"\n") +} + +// --- Volume tests --- + +func TestReconcileVolumes_CreateMissing(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Volumes: types.Volumes{"data": {Name: "myproject_data"}}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 volume:data, CreateVolume, not found +`)+"\n") +} + +func TestReconcileVolumes_ExistingMatch(t *testing.T) { + vol := types.VolumeConfig{Name: "myproject_data", Driver: "local"} + hash, err := VolumeHash(vol) + assert.NilError(t, err) + + project := &types.Project{ + Name: "myproject", + Volumes: types.Volumes{"data": vol}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{ + "data": {Name: "myproject_data", ConfigHash: hash}, + }, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + assert.Assert(t, plan.IsEmpty()) +} + +func TestReconcileVolumes_ExternalSkipped(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Volumes: types.Volumes{"ext": {Name: "external_vol", External: true}}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + assert.Assert(t, plan.IsEmpty()) +} + +func TestReconcileVolumes_DivergedConfirmed(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Volumes: types.Volumes{"data": {Name: "myproject_data", Driver: "local"}}, + Services: types.Services{ + "db": { + Name: "db", + Scale: intPtr(1), + Volumes: []types.ServiceVolumeConfig{ + {Source: "data", Type: "volume"}, + }, + }, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "db": {{ + ID: "c1aabbccddee", Number: 1, State: container.StateRunning, + Summary: container.Summary{ + ID: "c1aabbccddee", + Labels: map[string]string{ + api.ServiceLabel: "db", + api.ContainerNumberLabel: "1", + }, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{ + 
"data": {Name: "myproject_data", ConfigHash: "oldhash"}, + }, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), alwaysYesPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 service:db:1, StopContainer, volume data config changed +[1] -> #2 service:db:1, RemoveContainer, volume data config changed +[2] -> #3 volume:data, RemoveVolume, config hash diverged +[3] -> #4 volume:data, CreateVolume, recreate after config change +[4] -> #5 service:db:1, CreateContainer, config changed (tmpName) [recreate:db:1] +[5] -> #6 service:db:1, StopContainer, replaced by #5 [recreate:db:1] +[6] -> #7 service:db:1, RemoveContainer, replaced by #5 [recreate:db:1] +[7] -> #8 service:db:1, RenameContainer, finalize recreate [recreate:db:1] +`)+"\n") +} + +func TestReconcileVolumes_DivergedDeclined(t *testing.T) { + vol := types.VolumeConfig{Name: "myproject_data", Driver: "local"} + + project := &types.Project{ + Name: "myproject", + Volumes: types.Volumes{"data": {Name: "myproject_data", Driver: "local"}}, + Services: types.Services{ + "db": { + Name: "db", + Scale: intPtr(1), + Volumes: []types.ServiceVolumeConfig{ + {Source: "data", Type: "volume"}, + }, + }, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "db": {{ + ID: "c1", Number: 1, State: container.StateRunning, + ConfigHash: mustServiceHash(t, project.Services["db"]), + Summary: container.Summary{ + ID: "c1", + State: container.StateRunning, + Labels: map[string]string{ + api.ServiceLabel: "db", + api.ContainerNumberLabel: "1", + api.ConfigHashLabel: mustServiceHash(t, project.Services["db"]), + }, + Mounts: []container.MountPoint{{Type: "volume", Name: vol.Name}}, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{ + "data": {Name: vol.Name, ConfigHash: "oldhash"}, + }, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), alwaysNoPrompt) + assert.NilError(t, err) + assert.Assert(t, plan.IsEmpty()) +} + +// --- Container tests --- + +func TestReconcileContainers_NewProject(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Services: types.Services{ + "web": {Name: "web", Scale: intPtr(1)}, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{"web": {}}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 service:web:1, CreateContainer, no existing container +`)+"\n") +} + +func TestReconcileContainers_AlreadyRunning(t *testing.T) { + svc := types.ServiceConfig{Name: "web", Scale: intPtr(1)} + hash := mustServiceHash(t, svc) + + project := &types.Project{ + Name: "myproject", + Services: types.Services{"web": svc}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1", Number: 1, State: container.StateRunning, ConfigHash: hash, + Summary: container.Summary{ + ID: "c1", State: container.StateRunning, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1", api.ConfigHashLabel: hash}, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), 
project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + assert.Assert(t, plan.IsEmpty()) +} + +func TestReconcileContainers_ConfigChanged(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Services: types.Services{ + "web": {Name: "web", Scale: intPtr(1)}, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1aabbccddee", Number: 1, State: container.StateRunning, ConfigHash: "oldhash", + Summary: container.Summary{ + ID: "c1aabbccddee", State: container.StateRunning, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1", api.ConfigHashLabel: "oldhash"}, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 service:web:1, CreateContainer, config changed (tmpName) [recreate:web:1] +[1] -> #2 service:web:1, StopContainer, replaced by #1 [recreate:web:1] +[2] -> #3 service:web:1, RemoveContainer, replaced by #1 [recreate:web:1] +[3] -> #4 service:web:1, RenameContainer, finalize recreate [recreate:web:1] +`)+"\n") +} + +func TestReconcileContainers_ScaleUp(t *testing.T) { + svc := types.ServiceConfig{Name: "web", Scale: intPtr(3)} + hash := mustServiceHash(t, svc) + + project := &types.Project{ + Name: "myproject", + Services: types.Services{"web": svc}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1", Number: 1, State: container.StateRunning, ConfigHash: hash, + Summary: container.Summary{ + ID: "c1", State: container.StateRunning, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1", api.ConfigHashLabel: hash}, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 service:web:2, CreateContainer, no existing container +[] -> #2 service:web:3, CreateContainer, no existing container +`)+"\n") +} + +func TestReconcileContainers_ScaleDown(t *testing.T) { + svc := types.ServiceConfig{Name: "web", Scale: intPtr(1)} + hash := mustServiceHash(t, svc) + + project := &types.Project{ + Name: "myproject", + Services: types.Services{"web": svc}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": { + { + ID: "c1", Number: 1, State: container.StateRunning, ConfigHash: hash, + Summary: container.Summary{ + ID: "c1", State: container.StateRunning, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1", api.ConfigHashLabel: hash}, + }, + }, + { + ID: "c2", Number: 2, State: container.StateRunning, ConfigHash: hash, + Summary: container.Summary{ + ID: "c2", State: container.StateRunning, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "2", api.ConfigHashLabel: hash}, + }, + }, + }, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 
service:web:2, StopContainer, scale down +[1] -> #2 service:web:2, RemoveContainer, scale down +`)+"\n") +} + +func TestReconcileContainers_ForceRecreate(t *testing.T) { + svc := types.ServiceConfig{Name: "web", Scale: intPtr(1)} + hash := mustServiceHash(t, svc) + + project := &types.Project{ + Name: "myproject", + Services: types.Services{"web": svc}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1aabbccddee", Number: 1, State: container.StateRunning, ConfigHash: hash, + Summary: container.Summary{ + ID: "c1aabbccddee", State: container.StateRunning, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1", api.ConfigHashLabel: hash}, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + opts := defaultReconcileOptions() + opts.Recreate = api.RecreateForce + + plan, err := reconcile(t.Context(), project, observed, opts, noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 service:web:1, CreateContainer, config changed (tmpName) [recreate:web:1] +[1] -> #2 service:web:1, StopContainer, replaced by #1 [recreate:web:1] +[2] -> #3 service:web:1, RemoveContainer, replaced by #1 [recreate:web:1] +[3] -> #4 service:web:1, RenameContainer, finalize recreate [recreate:web:1] +`)+"\n") +} + +func TestReconcileContainers_NeverRecreate(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Services: types.Services{ + "web": {Name: "web", Scale: intPtr(1)}, + }, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1", Number: 1, State: container.StateRunning, ConfigHash: "oldhash", + Summary: container.Summary{ + ID: "c1", State: container.StateRunning, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1", api.ConfigHashLabel: "oldhash"}, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + opts := defaultReconcileOptions() + opts.Recreate = api.RecreateNever + + plan, err := reconcile(t.Context(), project, observed, opts, noPrompt) + assert.NilError(t, err) + assert.Assert(t, plan.IsEmpty()) +} + +func TestReconcileContainers_ExitedIsNoop(t *testing.T) { + svc := types.ServiceConfig{Name: "web", Scale: intPtr(1)} + hash := mustServiceHash(t, svc) + + project := &types.Project{ + Name: "myproject", + Services: types.Services{"web": svc}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{ + "web": {{ + ID: "c1", Number: 1, State: container.StateExited, ConfigHash: hash, + Summary: container.Summary{ + ID: "c1", State: container.StateExited, + Labels: map[string]string{api.ServiceLabel: "web", api.ContainerNumberLabel: "1", api.ConfigHashLabel: hash}, + }, + }}, + }, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + plan, err := reconcile(t.Context(), project, observed, defaultReconcileOptions(), noPrompt) + assert.NilError(t, err) + // Exited containers are left as-is, matching convergence.go:199 behavior + assert.Assert(t, plan.IsEmpty()) +} + +func TestReconcileOrphans(t *testing.T) { + project := &types.Project{ + Name: "myproject", + Services: types.Services{}, + } + observed := &ObservedState{ + ProjectName: "myproject", + Containers: map[string][]ObservedContainer{}, + Orphans: []ObservedContainer{{ + ID: "orphan1", Number: 1, 
Name: "myproject-old-1", + Summary: container.Summary{ID: "orphan1"}, + }}, + Networks: map[string]ObservedNetwork{}, + Volumes: map[string]ObservedVolume{}, + } + + opts := defaultReconcileOptions() + opts.RemoveOrphans = true + + plan, err := reconcile(t.Context(), project, observed, opts, noPrompt) + assert.NilError(t, err) + + assert.Equal(t, plan.String(), strings.TrimSpace(` +[] -> #1 orphan:myproject-old-1, StopContainer, orphaned container +[1] -> #2 orphan:myproject-old-1, RemoveContainer, orphaned container +`)+"\n") +} + +// --- Helpers --- + +func mustServiceHash(t *testing.T, svc types.ServiceConfig) string { + t.Helper() + h, err := ServiceHash(svc) + assert.NilError(t, err) + return h +} diff --git a/pkg/compose/run.go b/pkg/compose/run.go index 7ca080264d..007c10dcd2 100644 --- a/pkg/compose/run.go +++ b/pkg/compose/run.go @@ -181,8 +181,7 @@ func (s *composeService) prepareRun(ctx context.Context, project *types.Project, Labels: mergeLabels(service.Labels, service.CustomLabels), } - err = newConvergence(project.ServiceNames(), observedState, nil, nil, s).resolveServiceReferences(&service) - if err != nil { + if err := s.resolveRunServiceReferences(ctx, project.Name, &service); err != nil { return prepareRunResult{}, err } @@ -269,6 +268,14 @@ func applyRunOptions(project *types.Project, service *types.ServiceConfig, opts } } +func (s *composeService) resolveRunServiceReferences(ctx context.Context, projectName string, service *types.ServiceConfig) error { + containersByService, err := s.getContainersByService(ctx, projectName) + if err != nil { + return err + } + return resolveServiceReferences(service, containersByService) +} + func (s *composeService) startDependencies(ctx context.Context, project *types.Project, options api.RunOptions) error { project = project.WithServicesDisabled(options.Service) diff --git a/reconciliation.md b/reconciliation.md new file mode 100644 index 0000000000..a4b54fa1e7 --- /dev/null +++ b/reconciliation.md @@ -0,0 +1,361 @@ +# Reconciliation Engine pour Docker Compose + +## 1. Architecture + +Le pipeline remplace l'ancien systeme `convergence` (convergence.go) par 3 etapes : + +``` +ensureImagesExists -> ensureNetworks/Volumes -> collectObservedState -> reconcile -> executePlan + (inchange) (inchange) (1) (2) (3) +``` + +**Etape 1 -- ObservedState** (`observed_state.go`) : Collecte l'etat de toutes les +ressources Docker du projet (conteneurs, networks, volumes) en un snapshot immutable. + +**Etape 2 -- Reconciler** (`reconcile.go`) : Compare l'ObservedState au modele compose-go +(`types.Project`) et produit un `Plan` -- un DAG d'operations atomiques avec dependances. + +**Etape 3 -- Executor** (`executor.go`) : Parcourt le DAG en parallele et execute chaque +operation via l'API Moby, sans logique decisionnelle. + +### Ce qui n'a PAS change + +- `ensureImagesExists()` reste appele avant le pipeline (build/pull inchange) +- `ensureNetworks()` et `ensureProjectVolumes()` restent appeles avant le pipeline + (gestion des external networks, retries sur conflit, warnings). Leurs resultats (maps + `network_key -> ID` et `volume_key -> name`) sont injectes dans l'ObservedState via + `setResolvedNetworks()`/`setResolvedVolumes()`. +- `start()` reste separe -- il gere le demarrage avec `waitDependencies`, injection de + secrets/configs, et post-start hooks +- `down()` n'est pas modifie +- Le reconciler ne gere PAS la creation/recreation des networks et volumes au niveau API. 
+  It detects network/volume divergence in the ObservedState and plans the associated
+  container operations (stop, disconnect, remove, recreate), but the actual operations
+  on networks/volumes are delegated to `ensureNetworks`/`ensureProjectVolumes`.
+
+---
+
+## 2. Files and responsibilities
+
+| File | Role |
+|------|------|
+| `pkg/compose/observed_state.go` | Types `ObservedState`, `ObservedContainer`, `ObservedNetwork`, `ObservedVolume`. Function `collectObservedState()` (method on `composeService`). Helpers `setResolvedNetworks()`, `setResolvedVolumes()`, `orphanNames()`, `emitRunningEvents()`. |
+| `pkg/compose/plan.go` | Types `Plan`, `PlanNode`, `Operation`, `OperationType`. Methods `addNode()`, `String()`, `IsEmpty()`. `String()` sorts dependencies by ID for deterministic output. |
+| `pkg/compose/reconcile.go` | `reconciler` type with `ReconcileOptions`. Function `reconcile()`. Reconciliation logic: networks, volumes (with prompt), containers (mustRecreate, scale, atomic recreation, dependents, orphans, providers). Helpers `toReconcileOptions()`, `sortedKeys()`. |
+| `pkg/compose/reconcile_test.go` | 19 tests with exact `plan.String()` comparisons. Helpers: `noPrompt`, `alwaysYesPrompt`, `alwaysNoPrompt`, `mustServiceHash()`. |
+| `pkg/compose/executor.go` | `planExecutor` type with `reconciliationContext` (result propagation between nodes). `executePlan()` method on `composeService`. Handlers for each `OperationType`. `groupTracker` for event aggregation during recreations. |
+| `pkg/compose/executor_test.go` | 3 tests with API mocks (empty plan, create network, stop+remove container). |
+| `pkg/compose/convergence.go` | **Cleaned up** -- the `convergence` struct has been removed. Remaining: `resolveServiceReferences()` (standalone function), `getScale()`, `getContainerName()`, `nextContainerNumber()`, `waitDependencies()`, `createMobyContainer()`, `createContainer()`, `startService()`, `mergeLabels()`, and the event helpers. |
+| `pkg/compose/containers.go` | Added `getContainersByService()`. Removed `Containers.names()`. |
+| `pkg/compose/create.go` | `create()` refactored: calls `collectObservedState -> reconcile -> executePlan`. `emitRunningEvents()` emits "Running" events for unchanged containers. |
+
+---
+
+## 3. Operation types
+
+All operations are **atomic** -- each one maps to exactly one Moby API call (except
+`OpCreateContainer`, which performs `ContainerCreate` + `ContainerInspect`).
+
+```go
+OpCreateNetwork     // NetworkCreate (via ensureNetwork)
+OpRemoveNetwork     // NetworkRemove
+OpDisconnectNetwork // NetworkDisconnect
+OpConnectNetwork    // NetworkConnect
+OpCreateVolume      // VolumeCreate (via createVolume)
+OpRemoveVolume      // VolumeRemove
+OpCreateContainer   // ContainerCreate + ContainerInspect (via createMobyContainer)
+OpStartContainer    // ContainerStart (with the startMx mutex)
+OpStopContainer     // ContainerStop
+OpRemoveContainer   // ContainerRemove
+OpRenameContainer   // ContainerRename
+OpRunProvider       // runPlugin(ctx, project, service, "up")
+```
+
+---
+
+## 4. `Plan.String()` format
+
+```
+[deps] -> #id resource, operation, cause [group]
+```
+
+`String()` sorts dependency IDs (`sort.Ints`) to guarantee deterministic output. The
+reconciler also sorts map keys (`sortedKeys()`) to guarantee a deterministic node
+insertion order.
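+
+For illustration, here is a sketch that builds the recreate group shown in the first
+example below, using the `Plan` and `Operation` types from `pkg/compose/plan.go` (the
+snippet has to live inside package `compose`, since `addNode` is unexported; the causes
+mirror what the reconciler emits):
+
+```go
+p := &Plan{}
+create := p.addNode(Operation{
+	Type: OpCreateContainer, ResourceID: "service:web:1",
+	Cause: "config changed (tmpName)",
+}, "recreate:web:1")
+stop := p.addNode(Operation{
+	Type: OpStopContainer, ResourceID: "service:web:1",
+	Cause: fmt.Sprintf("replaced by #%d", create.ID),
+}, "recreate:web:1", create)
+remove := p.addNode(Operation{
+	Type: OpRemoveContainer, ResourceID: "service:web:1",
+	Cause: fmt.Sprintf("replaced by #%d", create.ID),
+}, "recreate:web:1", stop)
+p.addNode(Operation{
+	Type: OpRenameContainer, ResourceID: "service:web:1",
+	Cause: "finalize recreate",
+}, "recreate:web:1", remove)
+fmt.Print(p.String()) // prints the four "recreate:web:1" lines shown below
+```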
+
+**Example -- container recreation:**
+```
+[] -> #1 service:web:1, CreateContainer, config changed (tmpName) [recreate:web:1]
+[1] -> #2 service:web:1, StopContainer, replaced by #1 [recreate:web:1]
+[2] -> #3 service:web:1, RemoveContainer, replaced by #1 [recreate:web:1]
+[3] -> #4 service:web:1, RenameContainer, finalize recreate [recreate:web:1]
+```
+
+**Example -- diverged network shared by 2 services:**
+```
+[] -> #1 service:api:1, StopContainer, network frontend config changed
+[] -> #2 service:web:1, StopContainer, network frontend config changed
+[1] -> #3 service:api:1, DisconnectNetwork, network frontend recreate
+[2] -> #4 service:web:1, DisconnectNetwork, network frontend recreate
+[3,4] -> #5 network:frontend, RemoveNetwork, config hash diverged
+[5] -> #6 network:frontend, CreateNetwork, recreate after config change
+[6] -> #7 service:api:1, CreateContainer, config changed (tmpName) [recreate:api:1]
+...
+```
+
+---
+
+## 5. Reconciler logic
+
+The reconciler (`reconcile.go`) proceeds in this order:
+
+### 5.1 Networks
+
+For each project network (iterated by sorted key):
+- External -> skip
+- Missing from the ObservedState -> `OpCreateNetwork`
+- Config hash diverged -> sequence: StopContainers -> DisconnectNetwork -> RemoveNetwork -> CreateNetwork
+
+### 5.2 Volumes
+
+For each project volume (iterated by sorted key):
+- External -> skip
+- Missing -> `OpCreateVolume`
+- Config hash diverged -> **call `prompt`** and, if confirmed: StopContainers -> RemoveContainers -> RemoveVolume -> CreateVolume
+
+### 5.3 Containers
+
+For each service in dependency order (`visitInDependencyOrder` via `NewGraph`):
+- Provider with `SkipProviders` -> skip
+- Provider -> `OpRunProvider` (recorded in `serviceNodes` for chaining)
+- Compute the expected scale
+- Sort containers (obsolete first, then descending number)
+- Scale down -> `OpStopContainer` + `OpRemoveContainer`
+- `mustRecreate` = true -> decomposed into 4 atomic nodes (Create tmpName -> Stop -> Remove -> Rename) with an event group
+- Exited/Created/Running/Restarting -> noop (no operation)
+- Any other state -> `OpStartContainer`
+- Scale up -> `OpCreateContainer`
+
+### 5.4 Orphans
+
+If `RemoveOrphans` -> `OpStopContainer` + `OpRemoveContainer` for each orphan.
+
+### 5.5 Dependencies between nodes
+
+- `CreateContainer` depends on the referenced network/volume nodes + the last node of each `depends_on` service
+- For a recreate, dependents with `restart: true` are stopped first
+- The `serviceNodes` field in the reconciler tracks the last node per service
+
+---
+
+## 6. Executor
+
+### 6.1 DAG traversal
+
+`executePlan()` creates one `done[nodeID]` channel per node. Each node runs in its own
+goroutine (`errgroup`), waits for its dependencies via `<-done[dep.ID]`, executes the
+operation, then closes its channel.
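+
+A minimal sketch of that scheduling pattern (not the actual `executor.go`, which also
+threads a `reconciliationContext` and progress events; `runOperation` here is a
+hypothetical placeholder for the per-`OperationType` handlers):
+
+```go
+import (
+	"context"
+
+	"golang.org/x/sync/errgroup"
+)
+
+func runPlan(ctx context.Context, plan *Plan, runOperation func(context.Context, Operation) error) error {
+	// One channel per node, closed once that node has completed successfully.
+	done := make(map[int]chan struct{}, len(plan.Nodes))
+	for _, n := range plan.Nodes {
+		done[n.ID] = make(chan struct{})
+	}
+	eg, ctx := errgroup.WithContext(ctx)
+	for _, node := range plan.Nodes {
+		// Go 1.22+ loop variables are per-iteration, so no explicit copy is needed (see 7.10).
+		eg.Go(func() error {
+			// Wait for every dependency, or bail out if another node already failed
+			// (errgroup cancels ctx on the first error).
+			for _, dep := range node.DependsOn {
+				select {
+				case <-done[dep.ID]:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+			if err := runOperation(ctx, node.Operation); err != nil {
+				return err
+			}
+			close(done[node.ID])
+			return nil
+		})
+	}
+	return eg.Wait()
+}
+```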
+
+### 6.2 ReconciliationContext
+
+The `reconciliationContext` propagates results between nodes:
+- `OpCreateContainer` stores the created `ContainerID`
+- `OpRenameContainer` retrieves the `ContainerID` by walking back up the dependency chain
+
+### 6.3 Event grouping
+
+The `groupTracker` drives the progress output for recreations:
+- First node of a group -> emits "Recreate" (Working)
+- Last node of a group -> emits "Recreated" (Done)
+- Intermediate nodes -> silent
+- The event name is derived from the `Container` of the `StopContainer` node (not from the
+  `CreateContainer` node, which only has a tmpName)
+
+### 6.4 Service reference resolution
+
+`resolveServiceReferences()` is called in `execCreateContainer` **at execution time**
+(not at reconciliation time), because the referenced containers
+(`service:xxx` -> `container:ID` for network_mode, ipc, pid, volumes_from) may have
+been created by earlier nodes of the plan.
+
+### 6.5 "Running" events
+
+`emitRunningEvents()` is called in `create()` after reconciliation and before
+execution. It emits "Running" events for existing containers that have no operation
+in the plan. This is required for the progress output (the e2e tests check that
+unchanged containers show "Running").
+
+---
+
+## 7. Lessons learned during implementation
+
+### 7.1 Service reference resolution cannot live in the reconciler
+
+The `service:shareable` -> `container:ID` references for IPC/PID/NetworkMode and
+`volumes_from` must be resolved **at execution time**, not at planning time. The
+reconciler does not have the IDs of containers that will be created by earlier nodes.
+
+### 7.2 "Running" events are required for compatibility
+
+The old code emitted `runningEvent()` for every container that was already running and
+compliant. The e2e tests (`TestScaleBasicCases`, `TestRestartWithDependencies`) check
+that the output contains "Running" for those containers. Without `emitRunningEvents()`,
+these tests fail.
+
+### 7.3 Providers must be chained into the DAG
+
+Provider services (`OpRunProvider`) must be recorded in `serviceNodes` and depend on
+`infrastructureDeps`, otherwise the services that depend on them will not receive the
+environment variables injected by the provider.
+
+### 7.4 `serviceNodes` must not contain nil
+
+When all containers of a service are up to date, `lastNode` stays nil. Storing nil in
+`serviceNodes` makes dependent services crash (nil pointer dereference on
+`done[dep.ID]` in the executor). Fix: only store non-nil nodes.
+
+### 7.5 The group event name must come from the existing container
+
+`buildGroupTracker` must look up the event name on a node that has a `Container`
+(typically `StopContainer`), not on the first node of the group (`CreateContainer`
+with a tmpName), which has no `Container`.
+
+### 7.6 Orphans include one-offs
+
+`collectObservedState` must use `oneOffInclude` and `isOrphaned()` to detect orphaned
+one-off containers (exited/dead), reproducing the behavior of the old `create()`.
+
+### 7.7 Plan determinism for tests
+
+Go map iteration is non-deterministic. For exact `String()` comparisons in the tests,
+map keys must be sorted everywhere: `reconcileNetworks`, `reconcileVolumes`,
+`visitInDependencyOrder`, `servicesUsingNetwork`, `servicesUsingVolume`,
+`infrastructureDeps`, `hasNetworkMismatch`. The generic helper
+`sortedKeys[V any](map[string]V)` centralizes this pattern. The plan's `String()` also
+sorts the dependency IDs.
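+
+A minimal sketch of such a generic helper (the exact implementation in `reconcile.go`
+may differ slightly):
+
+```go
+package compose
+
+import "sort"
+
+// sortedKeys returns the keys of a string-keyed map in ascending order so that
+// callers can iterate deterministically. Sketch only.
+func sortedKeys[V any](m map[string]V) []string {
+    keys := make([]string, 0, len(m))
+    for k := range m {
+        keys = append(keys, k)
+    }
+    sort.Strings(keys)
+    return keys
+}
+```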
+
+### 7.8 Disconnect/Remove use the observed names, not the desired ones
+
+When a network or volume has diverged, the disconnect and remove operations must target
+the **observed** resource (the old one), not the **desired** resource (the new config).
+If the name changed between the two, using the desired name for the remove would fail
+or delete the wrong resource. Only `CreateNetwork`/`CreateVolume` use the desired name.
+
+### 7.9 The ContainerReplaceLabel label needs a reliable recreate signal
+
+In `execCreateContainer`, the `api.ContainerReplaceLabel` label must be added to
+containers created as a replacement for another one (recreate). The reliable signal is
+`op.Inherited != nil` (the container whose anonymous volumes are inherited), not
+`op.Container != nil` (which is not set on the recreate's CreateContainer node).
+
+### 7.10 The `for range` loop in Go 1.22+
+
+Since Go 1.22, `for _, x := range` loop variables are copied on each iteration. The
+`x := x` pattern in goroutines is no longer necessary. Docker Compose uses Go 1.25+.
+If the project ever had to go back to a version earlier than 1.22, the closures in
+`executePlan` would need to capture the loop variable explicitly.
+
+---
+
+## 8. Recommended implementation strategy
+
+The implementation should be split into additive steps. Each step produces code that
+compiles, passes lint and tests, and can be committed independently. The existing code
+is not modified before the switch-over step.
+
+### Step 1 -- ObservedState (additive)
+Create `observed_state.go` + `observed_state_test.go`. Types and `collectObservedState`.
+
+### Step 2 -- Plan (additive)
+Create `plan.go` + `plan_test.go`. Types, `String()`, `addNode()`.
+
+### Step 3 -- Reconciler for networks and volumes (additive)
+Create `reconcile.go` + `reconcile_test.go`. Reconciliation of networks and volumes.
+
+### Step 4 -- Reconciler for containers (additive)
+Extend `reconcile.go` and `reconcile_test.go`. Container reconciliation:
+mustRecreate, scale, atomic recreation, dependents, orphans, providers.
+Add `sortedKeys` for determinism.
+
+### Step 5 -- Executor (additive)
+Create `executor.go` + `executor_test.go`. Parallel DAG, reconciliationContext,
+groupTracker, handlers.
+
+### Step 6 -- Switch-over (modifies existing code)
+Modify `create()` to use the new pipeline (a sketch of the wiring appears at the end
+of this section). Add `emitRunningEvents()`, `setResolvedNetworks()`,
+`setResolvedVolumes()`, `orphanNames()`.
+Add `resolveServiceReferences` in the executor.
+Remove the dead code (convergence struct, mustRecreate, checkExpectedNetworks,
+checkExpectedVolumes, recreateContainer, startContainer, etc.).
+Extract `resolveServiceReferences` as a standalone function.
+Add `getContainersByService()`.
+
+**Critical point**: this step changes behavior. All e2e tests must pass. Fixes
+identified during our implementation:
+- `resolveServiceReferences` in the executor (not the reconciler)
+- `emitRunningEvents` for unchanged containers
+- `OpRunProvider` chained into `serviceNodes`
+- `serviceNodes` does not store nil
+- `groupTracker` looks up the event name on the `StopContainer` node
+- `oneOffInclude` + `isOrphaned` to detect one-off orphans
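+
+As an illustration of the step 6 wiring, a minimal sketch of how `create()` could chain
+the three phases; the function names come from this document, but the signatures and
+argument lists are assumptions, not the real ones in `create.go`/`reconcile.go`:
+
+```go
+// Sketch only: signatures are simplified/assumed.
+func (s *composeService) create(ctx context.Context, project *types.Project, options api.CreateOptions) error {
+    observed, err := s.collectObservedState(ctx, project) // 1. observe
+    if err != nil {
+        return err
+    }
+    plan, err := reconcile(ctx, project, observed, toReconcileOptions(options)) // 2. plan
+    if err != nil {
+        return err
+    }
+    // progress output for containers that end up with no operation in the plan
+    s.emitRunningEvents(ctx, observed, plan)
+    return s.executePlan(ctx, project, plan) // 3. execute
+}
+```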
+
+---
+
+## 9. Future evolutions (out of scope)
+
+### 9.1 Move the network/volume logic into the reconciler
+
+Today `ensureNetworks`/`ensureProjectVolumes` handle both divergence detection AND
+creation/recreation. Eventually, divergence detection could move entirely into the
+reconciler, and `ensureNetworks`/`ensureProjectVolumes` would become simple
+"find or create" helpers with no divergence logic.
+
+### 9.2 Apply the pattern to `down()`
+
+The `down()` flow (InReverseDependencyOrder + stop + remove + cleanup) could produce a
+reconciliation plan.
+
+### 9.3 `docker compose plan` command
+
+Expose the plan through a CLI command to visualize the operations before execution.
+
+### 9.4 Build/pull integration
+
+`ensureImagesExists()` (build.go) could be partially integrated into the pipeline.
+Local image discovery fits into the ObservedState. Pull is atomic and could become an
+`OpPullImage` node in the plan. Build (Bake) is a batch operation that does not split
+cleanly into atomic nodes -- it would remain a separate phase.
+See the details in the dedicated section below.
+
+### 9.5 Watch mode
+
+The watch code calls `create()` with `SkipProviders: true`. The pipeline already
+supports this via `ReconcileOptions.SkipProviders`.
+
+---
+
+## 10. Details on the future build/pull integration
+
+> **Out of scope for the current refactoring.**
+
+### The Bake problem
+
+Bake (`buildWithBake`, build_bake.go) groups all services to build into a single
+`buildx bake` call. That is incompatible with the "1 node = 1 API call" model:
+batch, multi-result, fallback to the classic builder, pull-then-build.
+
+### Approaches
+
+- **Option A**: a batch `OpBuildImages` node (exception to the atomic model)
+- **Option B**: a separate `resolveImages()` phase whose result feeds the reconciler
+- **Option C**: hybrid (pulls in the plan, build outside the plan)
+
+Recommendation: Option B as a first step, then Option C.
+
+### Current state
+
+`ensureImagesExists()` is called as-is before the pipeline. Image digests live in
+`service.CustomLabels[ImageDigestLabel]`. The reconciler compares them in
+`mustRecreate()` against `oc.ImageDigest`.
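+
+For illustration, the digest check described above could look roughly like the sketch
+below; `imageDiverged`, its parameters, and the `ImageDigest` field name are
+assumptions, and the real `mustRecreate()` also checks the config hash and the
+recreate policy:
+
+```go
+// imageDiverged is a hypothetical sketch of the digest comparison: the desired
+// digest resolved by ensureImagesExists() (stored as a custom label) against the
+// digest recorded on the existing container captured in the ObservedState.
+func imageDiverged(service types.ServiceConfig, oc ObservedContainer) bool {
+    return service.CustomLabels[api.ImageDigestLabel] != oc.ImageDigest
+}
+```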