Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions database/common/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type InstanceStore interface {
DeleteInstance(ctx context.Context, poolID string, instanceNameOrID string) error
DeleteInstanceByName(ctx context.Context, instanceName string) error
UpdateInstance(ctx context.Context, instanceNameOrID string, param params.UpdateInstanceParams) (params.Instance, error)
// ForceUpdateInstance functions identically to UpdateInstance with one important distinction. It does not
// validate the agent ID or instance status or runner status transitions. This function must only be used in
// recovery scenarios, where GARM crashed or was restarted while runners were going through a transitory
// state like "creating", "deleting", etc. In such cases, we cannot simply pick up where we left off, and
// we must set the instance in "pending_delete" or some other state that allows us to recover.
// An instance in "creating" state is currently being serviced by a call by GARM to the external provider.
// If GARM is just starting up and we see one in "creating", it means that the process that was creating it
// was most likely interrupted before it finished and we must clean it up.
ForceUpdateInstance(ctx context.Context, instanceName string, param params.UpdateInstanceParams) (params.Instance, error)

// Probably a bad idea without some king of filter or at least pagination
//
Expand Down
29 changes: 20 additions & 9 deletions database/sql/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,23 +395,34 @@ func (s *sqlDatabase) applyInstanceUpdates(instance *Instance, param params.Upda
return nil
}

func (s *sqlDatabase) ForceUpdateInstance(ctx context.Context, instanceName string, param params.UpdateInstanceParams) (params.Instance, error) {
return s.updateInstance(ctx, instanceName, param, true)
}

func (s *sqlDatabase) UpdateInstance(ctx context.Context, instanceName string, param params.UpdateInstanceParams) (params.Instance, error) {
return s.updateInstance(ctx, instanceName, param, false)
}

func (s *sqlDatabase) updateInstance(ctx context.Context, instanceName string, param params.UpdateInstanceParams, force bool) (params.Instance, error) {
var rowsAffected int64
err := s.conn.Transaction(func(tx *gorm.DB) error {
instance, err := s.getInstance(ctx, tx, instanceName, "Pool", "ScaleSet")
if err != nil {
return fmt.Errorf("error updating instance: %w", err)
}

// Validate transitions
if err := s.validateAgentID(instance.AgentID, param.AgentID); err != nil {
return err
}
if err := s.validateRunnerStatusTransition(instance.RunnerStatus, param.RunnerStatus); err != nil {
return err
}
if err := s.validateInstanceStatusTransition(instance.Status, param.Status); err != nil {
return err
if !force {
// Validate transitions
if err := s.validateAgentID(instance.AgentID, param.AgentID); err != nil {
return err
}

if err := s.validateRunnerStatusTransition(instance.RunnerStatus, param.RunnerStatus); err != nil {
return err
}
if err := s.validateInstanceStatusTransition(instance.Status, param.Status); err != nil {
return err
}
}

// Apply updates
Expand Down
10 changes: 5 additions & 5 deletions workers/scaleset/scaleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (w *Worker) Start() (err error) {
// it appears that it finished booting and is now running.
//
// NOTE: if the instance was in creating and it managed to boot, there
// is a high chance that the we do not have a provider ID for the runner
// is a high chance that we do not have a provider ID for the runner
// inside our database. When removing the runner, the provider will attempt
// to use the instance name instead of the provider ID, the same as when
// creation of the instance fails and we try to clean up any lingering resources
Expand All @@ -251,7 +251,7 @@ func (w *Worker) Start() (err error) {
runnerUpdateParams := params.UpdateInstanceParams{
Status: instanceState,
}
instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
instance, err = w.store.ForceUpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
locking.Unlock(instance.Name, false)
Expand All @@ -268,7 +268,7 @@ func (w *Worker) Start() (err error) {
runnerUpdateParams := params.UpdateInstanceParams{
Status: commonParams.InstancePendingDelete,
}
instance, err = w.store.UpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
instance, err = w.store.ForceUpdateInstance(w.ctx, instance.Name, runnerUpdateParams)
if err != nil {
if !errors.Is(err, runnerErrors.ErrNotFound) {
locking.Unlock(instance.Name, false)
Expand Down Expand Up @@ -566,15 +566,16 @@ func (w *Worker) consolidateRunnerState(runners []params.RunnerReference) error
slog.DebugContext(w.ctx, "runner is locked; skipping", "runner_name", runner.Name)
continue
}
defer locking.Unlock(runner.Name, false)

if _, ok := providerRunnersByName[runner.Name]; !ok {
// The runner is not in the provider anymore. Remove it from the DB.
slog.InfoContext(w.ctx, "runner does not exist in provider; removing from database", "runner_name", runner.Name)
if err := w.removeRunnerFromGithubAndSetPendingDelete(runner.Name, runner.AgentID); err != nil {
locking.Unlock(runner.Name, false)
return fmt.Errorf("removing runner %s: %w", runner.Name, err)
}
}
locking.Unlock(runner.Name, false)
}

return nil
Expand Down Expand Up @@ -893,7 +894,6 @@ func (w *Worker) handleScaleDown() {
switch runner.RunnerStatus {
case params.RunnerTerminated, params.RunnerActive:
slog.DebugContext(w.ctx, "runner is not in a valid state; skipping", "runner_name", runner.Name, "runner_status", runner.RunnerStatus)
locking.Unlock(runner.Name, false)
continue
}
locked := locking.TryLock(runner.Name, w.consumerID)
Expand Down
Loading