From 6be6182684aced3958c5b3066efb1585dc92dac1 Mon Sep 17 00:00:00 2001 From: "coderabbitai[bot]" <136622811+coderabbitai[bot]@users.noreply.github.com> Date: Thu, 5 Mar 2026 09:23:24 +0000 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9D=20Add=20docstrings=20to=20`ace-160?= =?UTF-8?q?`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Docstrings generation was requested by @danolivo. * https://github.com/pgEdge/ace/pull/84#issuecomment-4003541509 The following files were modified: * `internal/cli/cli.go` * `pkg/config/config.go` --- internal/cli/cli.go | 170 +++++++++++++++++++++++++++++++------------ pkg/config/config.go | 59 ++++++++++++++- 2 files changed, 180 insertions(+), 49 deletions(-) diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 1e10639..9298a3a 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -1398,6 +1398,16 @@ func RepsetDiffCLI(ctx *cli.Context) error { return scheduler.RunSingleJob(runCtx, job) } +// StartSchedulerCLI starts the ACE scheduler and/or API server according to the CLI +// component selection and manages lifecycle and reloads. +// +// StartSchedulerCLI validates that a configuration is loaded, then starts the API +// server, the scheduler, or both depending on the "component" flag ("scheduler", +// "api", or "all"). The API server is started once and runs until shutdown. The +// scheduler runs under a reload loop that listens for SIGHUP to perform an in-place +// config reload; SIGINT or SIGTERM triggers a graceful shutdown of all running +// components. Returns an error if configuration is missing, the component flag is +// invalid, or the API server cannot be initialized when requested. func StartSchedulerCLI(ctx *cli.Context) error { if config.Cfg == nil { return fmt.Errorf("configuration not loaded; run inside a directory with ace.yaml or set ACE_CONFIG") @@ -1418,49 +1428,29 @@ func StartSchedulerCLI(ctx *cli.Context) error { return fmt.Errorf("invalid component %q (expected scheduler, api, or all)", component) } - jobs, err := scheduler.BuildJobsFromConfig(config.Cfg) - if err != nil { - return err - } - + // runCtx is canceled on SIGINT or SIGTERM – triggers a full shutdown. runCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer stop() - type runner struct { - name string - run func(context.Context) error - } - - var runners []runner - - if runScheduler { - if len(jobs) == 0 { - logger.Info("scheduler: no enabled jobs found in configuration") - } else { - for _, job := range jobs { - logger.Info("scheduler: registering job %s", job.Name) - } - runners = append(runners, runner{ - name: "scheduler", - run: func(ctx context.Context) error { - return scheduler.RunJobs(ctx, jobs) - }, - }) - } - } + // sighupCh receives SIGHUP signals for in-place config reload. + sighupCh := make(chan os.Signal, 1) + signal.Notify(sighupCh, syscall.SIGHUP) + defer signal.Stop(sighupCh) + // Start the API server once. It does not need to restart on reload because + // it handles on-demand requests rather than reading scheduled job config. if runAPI { if ok, apiErr := canStartAPIServer(config.Cfg); ok { apiServer, err := server.New(config.Cfg) if err != nil { return fmt.Errorf("api server init failed: %w", err) } - runners = append(runners, runner{ - name: "api-server", - run: func(ctx context.Context) error { - return apiServer.Run(ctx) - }, - }) + go func() { + if err := apiServer.Run(runCtx); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("api server error: %v", err) + stop() + } + }() } else if component == "api" { return fmt.Errorf("api server requested but cannot start: %w", apiErr) } else { @@ -1468,27 +1458,117 @@ func StartSchedulerCLI(ctx *cli.Context) error { } } - if len(runners) == 0 { + if !runScheduler { + // API-only mode: just wait for shutdown. + <-runCtx.Done() return nil } - errCh := make(chan error, len(runners)) - for _, r := range runners { - go func(r runner) { - errCh <- r.run(runCtx) - }(r) - } + // schedulerReloadLoop runs the scheduler and restarts it on each valid + // SIGHUP. It returns only when runCtx is canceled (SIGINT/SIGTERM). + return schedulerReloadLoop(runCtx, sighupCh) +} - for i := 0; i < len(runners); i++ { - if err := <-errCh; err != nil && !errors.Is(err, context.Canceled) { - stop() +// schedulerReloadLoop is the heart of the SIGHUP feature. +// +// Design: +// 1. Build jobs from the current config and start the gocron scheduler. +// 2. Block waiting for either a shutdown signal (runCtx.Done) or SIGHUP. +// 3. On SIGHUP: +// a. Load and validate the new config (parse YAML + dry-run job build). +// b. If invalid: log the error and keep the current scheduler running. +// c. If valid: cancel the per-iteration scheduler context, which causes +// gocron.Shutdown() to drain all in-flight jobs before returning. +// Then atomically swap in the new config and loop back to step 1. +// schedulerReloadLoop runs scheduler jobs from the current configuration and watches for SIGHUP to perform validated, atomic config reloads. +// It drains in-flight jobs before applying a new configuration and responds to runCtx cancellation by draining and exiting. +// The provided sighupCh is used to receive SIGHUP notifications that trigger a reload attempt. +// It returns nil on graceful shutdown, or an error if building or running jobs fails irrecoverably. +func schedulerReloadLoop( + runCtx context.Context, + sighupCh <-chan os.Signal, +) error { + for { + currentCfg := config.Get() + + jobs, err := scheduler.BuildJobsFromConfig(currentCfg) + if err != nil { return err } - } - return nil + if len(jobs) == 0 { + logger.Info("scheduler: no enabled jobs found in configuration") + } else { + for _, job := range jobs { + logger.Info("scheduler: registering job %s", job.Name) + } + } + + // Per-iteration context: canceled to trigger graceful scheduler drain. + schedCtx, schedCancel := context.WithCancel(runCtx) + schedDone := make(chan error, 1) + + go func() { + if len(jobs) == 0 { + // No jobs: park until the context is canceled. + <-schedCtx.Done() + schedDone <- nil + return + } + schedDone <- scheduler.RunJobs(schedCtx, jobs) + }() + + // Inner loop: handle repeated SIGHUPs without restarting for invalid configs. + reloaded := false + for !reloaded { + select { + case <-runCtx.Done(): + // SIGINT / SIGTERM: drain in-flight ops and exit. + schedCancel() + if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) { + return err + } + return nil + + case <-sighupCh: + logger.Info("scheduler: received SIGHUP – reloading configuration from %s", config.CfgPath) + + newCfg, loadErr := config.Reload(config.CfgPath) + if loadErr != nil { + logger.Error("scheduler: config reload failed (keeping current config): %v", loadErr) + continue // wait for next signal + } + + // Validate the new config by doing a dry-run job build. + if _, buildErr := scheduler.BuildJobsFromConfig(newCfg); buildErr != nil { + logger.Error("scheduler: new config rejected (keeping current config): %v", buildErr) + continue // wait for next signal + } + + // New config is valid. Drain in-flight jobs, then swap. + logger.Info("scheduler: new configuration valid – draining in-flight operations") + schedCancel() + if err := <-schedDone; err != nil && !errors.Is(err, context.Canceled) { + logger.Error("scheduler: error while draining for reload: %v", err) + // The scheduler exited with a real error; propagate it. + return err + } + + // Atomic config swap. + config.Set(newCfg) + logger.Info("scheduler: configuration reloaded successfully") + reloaded = true // break inner loop → outer loop restarts scheduler + } + } + // schedCancel is always called before reaching here (either inside the + // SIGTERM branch, which returns, or inside the SIGHUP branch above). + schedCancel() + } } +// StartAPIServerCLI starts the ACE HTTP API server using the currently loaded configuration. +// +// The function requires a loaded configuration with the server TLS and certificate-authority settings configured; it returns an error if those prerequisites are not satisfied or if the server fails to initialize. The call blocks until the server exits or the process receives an interrupt/terminate signal, and it returns the server's error (if any). func StartAPIServerCLI(ctx *cli.Context) error { if config.Cfg == nil { return fmt.Errorf("configuration not loaded; run inside a directory with ace.yaml or set ACE_CONFIG") diff --git a/pkg/config/config.go b/pkg/config/config.go index 32c42f0..55db864 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -14,6 +14,7 @@ package config import ( "os" "strings" + "sync" "gopkg.in/yaml.v3" ) @@ -113,9 +114,20 @@ type CertAuthConfig struct { } // Cfg holds the loaded config for the whole app. +// Reads may use Cfg directly; concurrent reloads must go through Set/Get. var Cfg *Config -// Load reads and parses path into a Config. +// CfgPath is the path from which the current config was loaded. +// It is set by Init and used by the SIGHUP reload handler. +var CfgPath string + +// cfgMu protects Cfg and CfgPath during live reloads. +var cfgMu sync.RWMutex + +// Load reads the YAML file at the given path and unmarshals it into a Config. +// It performs no mutation of package-level state; callers must call Set to apply +// the loaded configuration. It returns the parsed Config or an error if the +// file cannot be read or the YAML cannot be decoded. func Load(path string) (*Config, error) { data, err := os.ReadFile(path) if err != nil { @@ -128,20 +140,59 @@ func Load(path string) (*Config, error) { return &c, nil } -// Init loads the config and assigns it to the package variable. +// Init loads the configuration from the file at path and sets the package-level +// Cfg and CfgPath under a write lock to enable safe concurrent access. +// It returns any error encountered while loading or parsing the configuration. func Init(path string) error { c, err := Load(path) if err != nil { return err } + cfgMu.Lock() Cfg = c + CfgPath = path + cfgMu.Unlock() return nil } +// Reload loads a new Config from path and returns it for validation. +// The caller is responsible for calling Set to apply the new config. +// Reload attempts to parse the configuration file at the given path without applying it. +// If path is empty, Reload uses the most recently loaded configuration path (CfgPath) while holding a read lock. +// It does not modify the active package configuration (Cfg). +// It returns the parsed *Config on success, or an error if loading or parsing fails. +func Reload(path string) (*Config, error) { + if path == "" { + cfgMu.RLock() + path = CfgPath + cfgMu.RUnlock() + } + return Load(path) +} + +// Set replaces the package's active configuration with c atomically. +// Passing nil sets the active configuration to nil. +func Set(c *Config) { + cfgMu.Lock() + Cfg = c + cfgMu.Unlock() +} + +// Get returns the active configuration under a read lock. +// Prefer Get() over reading Cfg directly in code that runs concurrently with +// Get returns the currently active Config instance and is safe for concurrent use. +func Get() *Config { + cfgMu.RLock() + defer cfgMu.RUnlock() + return Cfg +} + // DefaultCluster returns the trimmed default cluster name from the loaded config. +// If no configuration is loaded, it returns the empty string. func DefaultCluster() string { - if Cfg == nil { + c := Get() + if c == nil { return "" } - return strings.TrimSpace(Cfg.DefaultCluster) + return strings.TrimSpace(c.DefaultCluster) }