Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 125 additions & 45 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,16 @@ func RepsetDiffCLI(ctx *cli.Context) error {
return scheduler.RunSingleJob(runCtx, job)
}

// StartSchedulerCLI starts the ACE scheduler and/or API server according to the CLI
// component selection and manages lifecycle and reloads.
//
// StartSchedulerCLI validates that a configuration is loaded, then starts the API
// server, the scheduler, or both depending on the "component" flag ("scheduler",
// "api", or "all"). The API server is started once and runs until shutdown. The
// scheduler runs under a reload loop that listens for SIGHUP to perform an in-place
// config reload; SIGINT or SIGTERM triggers a graceful shutdown of all running
// components. Returns an error if configuration is missing, the component flag is
// invalid, or the API server cannot be initialized when requested.
func StartSchedulerCLI(ctx *cli.Context) error {
if config.Cfg == nil {
return fmt.Errorf("configuration not loaded; run inside a directory with ace.yaml or set ACE_CONFIG")
Expand All @@ -1418,77 +1428,147 @@ func StartSchedulerCLI(ctx *cli.Context) error {
return fmt.Errorf("invalid component %q (expected scheduler, api, or all)", component)
}

jobs, err := scheduler.BuildJobsFromConfig(config.Cfg)
if err != nil {
return err
}

// runCtx is canceled on SIGINT or SIGTERM – triggers a full shutdown.
runCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

type runner struct {
name string
run func(context.Context) error
}

var runners []runner

if runScheduler {
if len(jobs) == 0 {
logger.Info("scheduler: no enabled jobs found in configuration")
} else {
for _, job := range jobs {
logger.Info("scheduler: registering job %s", job.Name)
}
runners = append(runners, runner{
name: "scheduler",
run: func(ctx context.Context) error {
return scheduler.RunJobs(ctx, jobs)
},
})
}
}
// sighupCh receives SIGHUP signals for in-place config reload.
sighupCh := make(chan os.Signal, 1)
signal.Notify(sighupCh, syscall.SIGHUP)
defer signal.Stop(sighupCh)

// Start the API server once. It does not need to restart on reload because
// it handles on-demand requests rather than reading scheduled job config.
if runAPI {
if ok, apiErr := canStartAPIServer(config.Cfg); ok {
apiServer, err := server.New(config.Cfg)
if err != nil {
return fmt.Errorf("api server init failed: %w", err)
}
runners = append(runners, runner{
name: "api-server",
run: func(ctx context.Context) error {
return apiServer.Run(ctx)
},
})
go func() {
if err := apiServer.Run(runCtx); err != nil && !errors.Is(err, context.Canceled) {
logger.Error("api server error: %v", err)
stop()
}
}()
} else if component == "api" {
return fmt.Errorf("api server requested but cannot start: %w", apiErr)
} else {
logger.Info("api server not started: %v", apiErr)
}
}

if len(runners) == 0 {
if !runScheduler {
// API-only mode: just wait for shutdown.
<-runCtx.Done()
return nil
}

errCh := make(chan error, len(runners))
for _, r := range runners {
go func(r runner) {
errCh <- r.run(runCtx)
}(r)
}
// schedulerReloadLoop runs the scheduler and restarts it on each valid
// SIGHUP. It returns only when runCtx is canceled (SIGINT/SIGTERM).
return schedulerReloadLoop(runCtx, sighupCh)
}

for i := 0; i < len(runners); i++ {
if err := <-errCh; err != nil && !errors.Is(err, context.Canceled) {
stop()
// schedulerReloadLoop is the heart of the SIGHUP feature.
//
// Design:
// 1. Build jobs from the current config and start the gocron scheduler.
// 2. Block waiting for either a shutdown signal (runCtx.Done) or SIGHUP.
// 3. On SIGHUP:
// a. Load and validate the new config (parse YAML + dry-run job build).
// b. If invalid: log the error and keep the current scheduler running.
// c. If valid: cancel the per-iteration scheduler context, which causes
// gocron.Shutdown() to drain all in-flight jobs before returning.
// Then atomically swap in the new config and loop back to step 1.
// schedulerReloadLoop runs scheduler jobs from the current configuration and watches for SIGHUP to perform validated, atomic config reloads.
// It drains in-flight jobs before applying a new configuration and responds to runCtx cancellation by draining and exiting.
// The provided sighupCh is used to receive SIGHUP notifications that trigger a reload attempt.
// It returns nil on graceful shutdown, or an error if building or running jobs fails irrecoverably.
func schedulerReloadLoop(
runCtx context.Context,
sighupCh <-chan os.Signal,
) error {
for {
currentCfg := config.Get()

jobs, err := scheduler.BuildJobsFromConfig(currentCfg)
if err != nil {
return err
}
}

return nil
if len(jobs) == 0 {
logger.Info("scheduler: no enabled jobs found in configuration")
} else {
for _, job := range jobs {
logger.Info("scheduler: registering job %s", job.Name)
}
}

// Per-iteration context: canceled to trigger graceful scheduler drain.
schedCtx, schedCancel := context.WithCancel(runCtx)
schedDone := make(chan error, 1)

go func() {
if len(jobs) == 0 {
// No jobs: park until the context is canceled.
<-schedCtx.Done()
schedDone <- nil
return
}
schedDone <- scheduler.RunJobs(schedCtx, jobs)
}()

// Inner loop: handle repeated SIGHUPs without restarting for invalid configs.
reloaded := false
for !reloaded {
select {
case <-runCtx.Done():
// SIGINT / SIGTERM: drain in-flight ops and exit.
schedCancel()
if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil

case <-sighupCh:
logger.Info("scheduler: received SIGHUP – reloading configuration from %s", config.CfgPath)

newCfg, loadErr := config.Reload(config.CfgPath)
if loadErr != nil {
logger.Error("scheduler: config reload failed (keeping current config): %v", loadErr)
continue // wait for next signal
}

// Validate the new config by doing a dry-run job build.
if _, buildErr := scheduler.BuildJobsFromConfig(newCfg); buildErr != nil {
logger.Error("scheduler: new config rejected (keeping current config): %v", buildErr)
continue // wait for next signal
}

// New config is valid. Drain in-flight jobs, then swap.
logger.Info("scheduler: new configuration valid – draining in-flight operations")
schedCancel()
if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) {
logger.Error("scheduler: error while draining for reload: %v", err)
// The scheduler exited with a real error; propagate it.
return err
}

// Atomic config swap.
config.Set(newCfg)
logger.Info("scheduler: configuration reloaded successfully")
reloaded = true // break inner loop → outer loop restarts scheduler
}
}
// schedCancel is always called before reaching here (either inside the
// SIGTERM branch, which returns, or inside the SIGHUP branch above).
schedCancel()
}
}

// StartAPIServerCLI starts the ACE HTTP API server using the currently loaded configuration.
//
// The function requires a loaded configuration with the server TLS and certificate-authority settings configured; it returns an error if those prerequisites are not satisfied or if the server fails to initialize. The call blocks until the server exits or the process receives an interrupt/terminate signal, and it returns the server's error (if any).
func StartAPIServerCLI(ctx *cli.Context) error {
if config.Cfg == nil {
return fmt.Errorf("configuration not loaded; run inside a directory with ace.yaml or set ACE_CONFIG")
Expand Down
59 changes: 55 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package config
import (
"os"
"strings"
"sync"

"gopkg.in/yaml.v3"
)
Expand Down Expand Up @@ -113,9 +114,20 @@ type CertAuthConfig struct {
}

// Package-level configuration state shared by the whole application.
var (
	// Cfg holds the loaded config for the whole app.
	// Code that can run concurrently with a reload should go through Get/Set,
	// which take cfgMu, rather than touching Cfg directly.
	Cfg *Config

	// CfgPath is the path from which the current config was loaded.
	// It is set by Init and used by the SIGHUP reload handler.
	CfgPath string

	// cfgMu protects Cfg and CfgPath during live reloads.
	cfgMu sync.RWMutex
)

// Load reads the YAML file at the given path and unmarshals it into a Config.
// It performs no mutation of package-level state; callers must call Set to apply
// the loaded configuration. It returns the parsed Config or an error if the
// file cannot be read or the YAML cannot be decoded.
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
Expand All @@ -128,20 +140,59 @@ func Load(path string) (*Config, error) {
return &c, nil
}

// Init loads the configuration file at path and, on success, records both the
// parsed Config (Cfg) and the path it came from (CfgPath) while holding the
// write lock. On failure the package state is left untouched and the error
// from Load is returned.
func Init(path string) error {
	loaded, err := Load(path)
	if err != nil {
		return err
	}

	cfgMu.Lock()
	defer cfgMu.Unlock()
	Cfg, CfgPath = loaded, path
	return nil
}

// Reload parses the configuration file at path and returns the result without
// applying it; the active configuration (Cfg) is not modified, so the caller
// must call Set to apply a config it has validated.
//
// If path is empty, the most recently recorded path (CfgPath) is read under
// the read lock and used instead. It returns the parsed *Config, or the error
// reported by Load.
func Reload(path string) (*Config, error) {
	if path != "" {
		return Load(path)
	}

	cfgMu.RLock()
	lastPath := CfgPath
	cfgMu.RUnlock()
	return Load(lastPath)
}

// Set installs c as the package's active configuration while holding the
// write lock. Passing nil clears the active configuration.
func Set(c *Config) {
	cfgMu.Lock()
	defer cfgMu.Unlock()
	Cfg = c
}

// Get returns the currently active Config, read under the read lock.
// Prefer Get over reading Cfg directly in code that may run concurrently with
// Set or Init. The result is nil when no configuration has been set.
func Get() *Config {
	cfgMu.RLock()
	active := Cfg
	cfgMu.RUnlock()
	return active
}

// DefaultCluster returns the trimmed default cluster name from the loaded config.
// If no configuration is loaded, it returns the empty string.
func DefaultCluster() string {
if Cfg == nil {
c := Get()
if c == nil {
return ""
}
return strings.TrimSpace(Cfg.DefaultCluster)
return strings.TrimSpace(c.DefaultCluster)
}
Loading