diff --git a/Makefile b/Makefile index a351bd34..46b728e4 100644 --- a/Makefile +++ b/Makefile @@ -159,7 +159,7 @@ GOLANGCI_LINT = $(LOCALBIN)/golangci-lint ## Tool Versions KUSTOMIZE_VERSION ?= v5.4.3 -CONTROLLER_TOOLS_VERSION ?= v0.16.1 +CONTROLLER_TOOLS_VERSION ?= v0.19.0 ENVTEST_VERSION ?= release-0.19 GOLANGCI_LINT_VERSION ?= v1.64.8 diff --git a/api/v1/objectstore_types.go b/api/v1/objectstore_types.go index f179af9b..3d5cfeb9 100644 --- a/api/v1/objectstore_types.go +++ b/api/v1/objectstore_types.go @@ -94,6 +94,9 @@ type RecoveryWindow struct { // The last failed backup time LastFailedBackupTime *metav1.Time `json:"lastFailedBackupTime,omitempty"` + + // The last time a WAL file was successfully archived by this plugin + LastArchivedWALTime *metav1.Time `json:"lastArchivedWALTime,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 05f7e61b..2e9ca7ca 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -169,6 +169,10 @@ func (in *RecoveryWindow) DeepCopyInto(out *RecoveryWindow) { in, out := &in.LastFailedBackupTime, &out.LastFailedBackupTime *out = (*in).DeepCopy() } + if in.LastArchivedWALTime != nil { + in, out := &in.LastArchivedWALTime, &out.LastArchivedWALTime + *out = (*in).DeepCopy() + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RecoveryWindow. diff --git a/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml b/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml index a141948e..3715b4f4 100644 --- a/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml +++ b/config/crd/bases/barmancloud.cnpg.io_objectstores.yaml @@ -671,6 +671,11 @@ spec: restored. format: date-time type: string + lastArchivedWALTime: + description: The last time a WAL file was successfully archived + by this plugin + format: date-time + type: string lastFailedBackupTime: description: The last failed backup time format: date-time diff --git a/internal/cnpgi/common/wal.go b/internal/cnpgi/common/wal.go index 8e58cb42..6aebf7e2 100644 --- a/internal/cnpgi/common/wal.go +++ b/internal/cnpgi/common/wal.go @@ -25,6 +25,7 @@ import ( "fmt" "os" "path" + "sync" "time" "github.com/cloudnative-pg/barman-cloud/pkg/archiver" @@ -38,7 +39,10 @@ import ( walUtils "github.com/cloudnative-pg/machinery/pkg/fileutils/wals" "github.com/cloudnative-pg/machinery/pkg/log" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" @@ -67,6 +71,11 @@ func (e *SpoolManagementError) Unwrap() error { return e.err } +const ( + // walStatusUpdateThrottle is the minimum time between status updates for WAL archiving + walStatusUpdateThrottle = 5 * time.Minute +) + // WALServiceImplementation is the implementation of the WAL Service type WALServiceImplementation struct { wal.UnimplementedWALServer @@ -75,6 +84,9 @@ type WALServiceImplementation struct { SpoolDirectory string PGDataPath string PGWALPath string + // LastStatusUpdate tracks the last time we updated the status for each ObjectStore+ServerName + // Key format: "namespace/objectStoreName/serverName" + LastStatusUpdate *sync.Map } // GetCapabilities implements the WALService interface @@ -102,6 +114,37 @@ func (w WALServiceImplementation) GetCapabilities( }, nil } +// shouldUpdateStatus checks if we should update the status based on the throttle. +// It returns true if walStatusUpdateThrottle minutes have passed since the last update, or if this is the first update. +func (w WALServiceImplementation) shouldUpdateStatus(objectStoreKey client.ObjectKey, serverName string) bool { + if w.LastStatusUpdate == nil { + return true + } + + key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName) + lastUpdate, ok := w.LastStatusUpdate.Load(key) + if !ok { + return true + } + + lastUpdateTime, ok := lastUpdate.(time.Time) + if !ok { + return true + } + + return time.Since(lastUpdateTime) >= walStatusUpdateThrottle +} + +// recordStatusUpdate records that we just updated the status for a given ObjectStore and server. +func (w WALServiceImplementation) recordStatusUpdate(objectStoreKey client.ObjectKey, serverName string) { + if w.LastStatusUpdate == nil { + return + } + + key := fmt.Sprintf("%s/%s", objectStoreKey.String(), serverName) + w.LastStatusUpdate.Store(key, time.Now()) +} + // Archive implements the WALService interface func (w WALServiceImplementation) Archive( ctx context.Context, @@ -220,6 +263,28 @@ func (w WALServiceImplementation) Archive( } } + // Update the last archived WAL time in the ObjectStore status + // Only update if walStatusUpdateThrottle minutes have passed since the last update to avoid hitting the API server too often + objectStoreKey := configuration.GetBarmanObjectKey() + if w.shouldUpdateStatus(objectStoreKey, configuration.ServerName) { + contextLogger.Debug("Updating last archived WAL time", "serverName", configuration.ServerName) + if err := setLastArchivedWALTime( + ctx, + w.Client, + objectStoreKey, + configuration.ServerName, + time.Now(), + ); err != nil { + // Log the error but don't fail the archive operation + contextLogger.Error(err, "Error updating last archived WAL time in ObjectStore status") + } else { + contextLogger.Debug("Successfully updated last archived WAL time") + w.recordStatusUpdate(objectStoreKey, configuration.ServerName) + } + } else { + contextLogger.Debug("Skipping status update due to throttle", "serverName", configuration.ServerName) + } + return &wal.WALArchiveResult{}, nil } @@ -509,3 +574,30 @@ func isEndOfWALStream(results []barmanRestorer.Result) bool { return false } + +// SetLastArchivedWALTime sets the last archived WAL time in the +// passed object store, for the passed server name. +func setLastArchivedWALTime( + ctx context.Context, + c client.Client, + objectStoreKey client.ObjectKey, + serverName string, + lastArchivedWALTime time.Time, +) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + var objectStore barmancloudv1.ObjectStore + + if err := c.Get(ctx, objectStoreKey, &objectStore); err != nil { + return err + } + recoveryWindow := objectStore.Status.ServerRecoveryWindow[serverName] + recoveryWindow.LastArchivedWALTime = ptr.To(metav1.NewTime(lastArchivedWALTime)) + + if objectStore.Status.ServerRecoveryWindow == nil { + objectStore.Status.ServerRecoveryWindow = make(map[string]barmancloudv1.RecoveryWindow) + } + objectStore.Status.ServerRecoveryWindow[serverName] = recoveryWindow + + return c.Status().Update(ctx, &objectStore) + }) +} diff --git a/internal/cnpgi/instance/metrics.go b/internal/cnpgi/instance/metrics.go index d21a49c0..a58c02fa 100644 --- a/internal/cnpgi/instance/metrics.go +++ b/internal/cnpgi/instance/metrics.go @@ -51,6 +51,7 @@ var ( firstRecoverabilityPointMetricName = buildFqName("first_recoverability_point") lastAvailableBackupTimestampMetricName = buildFqName("last_available_backup_timestamp") lastFailedBackupTimestampMetricName = buildFqName("last_failed_backup_timestamp") + lastArchivedWALTimestampMetricName = buildFqName("last_archived_wal_timestamp") ) func (m metricsImpl) GetCapabilities( @@ -97,6 +98,11 @@ func (m metricsImpl) Define( Help: "The last failed backup as a unix timestamp", ValueType: &metrics.MetricType{Type: metrics.MetricType_TYPE_GAUGE}, }, + { + FqName: lastArchivedWALTimestampMetricName, + Help: "The last archived WAL timestamp as a unix timestamp", + ValueType: &metrics.MetricType{Type: metrics.MetricType_TYPE_GAUGE}, + }, }, }, nil } @@ -136,6 +142,10 @@ func (m metricsImpl) Collect( FqName: lastFailedBackupTimestampMetricName, Value: 0, }, + { + FqName: lastArchivedWALTimestampMetricName, + Value: 0, + }, }, }, nil } @@ -143,6 +153,7 @@ func (m metricsImpl) Collect( var firstRecoverabilityPoint float64 var lastAvailableBackup float64 var lastFailedBackup float64 + var lastArchivedWAL float64 if x.FirstRecoverabilityPoint != nil { firstRecoverabilityPoint = float64(x.FirstRecoverabilityPoint.Unix()) } @@ -152,6 +163,9 @@ func (m metricsImpl) Collect( if x.LastFailedBackupTime != nil { lastFailedBackup = float64(x.LastFailedBackupTime.Unix()) } + if x.LastArchivedWALTime != nil { + lastArchivedWAL = float64(x.LastArchivedWALTime.Unix()) + } return &metrics.CollectMetricsResult{ Metrics: []*metrics.CollectMetric{ @@ -167,6 +181,10 @@ func (m metricsImpl) Collect( FqName: lastFailedBackupTimestampMetricName, Value: lastFailedBackup, }, + { + FqName: lastArchivedWALTimestampMetricName, + Value: lastArchivedWAL, + }, }, }, nil } diff --git a/internal/cnpgi/instance/metrics_test.go b/internal/cnpgi/instance/metrics_test.go index 963332f1..c362a215 100644 --- a/internal/cnpgi/instance/metrics_test.go +++ b/internal/cnpgi/instance/metrics_test.go @@ -22,10 +22,11 @@ package instance import ( "context" "encoding/json" + "time" + cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" "github.com/cloudnative-pg/plugin-barman-cloud/internal/cnpgi/metadata" "k8s.io/utils/ptr" - "time" "github.com/cloudnative-pg/cnpg-i/pkg/metrics" barmancloudv1 "github.com/cloudnative-pg/plugin-barman-cloud/api/v1" @@ -117,7 +118,7 @@ var _ = Describe("Metrics Collect method", func() { res, err := m.Collect(ctx, req) Expect(err).ToNot(HaveOccurred()) Expect(res).ToNot(BeNil()) - Expect(res.Metrics).To(HaveLen(3)) + Expect(res.Metrics).To(HaveLen(4)) // Verify the metrics metricsMap := make(map[string]float64) @@ -131,6 +132,13 @@ var _ = Describe("Metrics Collect method", func() { expectedLastBackup, _ := metricsMap[lastAvailableBackupTimestampMetricName] Expect(expectedLastBackup).To(BeNumerically("~", float64(objectStore.Status.ServerRecoveryWindow[clusterName].LastSuccessfulBackupTime.Unix()), 1)) + + // Check that unset timestamps are 0 + expectedLastFailedBackup, _ := metricsMap[lastFailedBackupTimestampMetricName] + Expect(expectedLastFailedBackup).To(BeZero()) + + expectedLastArchivedWAL, _ := metricsMap[lastArchivedWALTimestampMetricName] + Expect(expectedLastArchivedWAL).To(BeZero()) }) It("should return an error if the object store is not found", func() { diff --git a/internal/cnpgi/instance/start.go b/internal/cnpgi/instance/start.go index b222653e..ffb205b3 100644 --- a/internal/cnpgi/instance/start.go +++ b/internal/cnpgi/instance/start.go @@ -21,6 +21,7 @@ package instance import ( "context" + "sync" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" "github.com/cloudnative-pg/cnpg-i/pkg/backup" @@ -47,11 +48,12 @@ type CNPGI struct { func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, common.WALServiceImplementation{ - InstanceName: c.InstanceName, - Client: c.Client, - SpoolDirectory: c.SpoolDirectory, - PGDataPath: c.PGDataPath, - PGWALPath: c.PGWALPath, + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: c.PGWALPath, + LastStatusUpdate: &sync.Map{}, }) backup.RegisterBackupServer(server, BackupServiceImplementation{ Client: c.Client, diff --git a/internal/cnpgi/restore/start.go b/internal/cnpgi/restore/start.go index efb7828c..f3777151 100644 --- a/internal/cnpgi/restore/start.go +++ b/internal/cnpgi/restore/start.go @@ -22,6 +22,7 @@ package restore import ( "context" "path" + "sync" "github.com/cloudnative-pg/cnpg-i-machinery/pkg/pluginhelper/http" restore "github.com/cloudnative-pg/cnpg-i/pkg/restore/job" @@ -49,11 +50,12 @@ func (c *CNPGI) Start(ctx context.Context) error { enrich := func(server *grpc.Server) error { wal.RegisterWALServer(server, common.WALServiceImplementation{ - InstanceName: c.InstanceName, - Client: c.Client, - SpoolDirectory: c.SpoolDirectory, - PGDataPath: c.PGDataPath, - PGWALPath: path.Join(c.PGDataPath, "pg_wal"), + InstanceName: c.InstanceName, + Client: c.Client, + SpoolDirectory: c.SpoolDirectory, + PGDataPath: c.PGDataPath, + PGWALPath: path.Join(c.PGDataPath, "pg_wal"), + LastStatusUpdate: &sync.Map{}, }) restore.RegisterRestoreJobHooksServer(server, &JobHookImpl{