From 03e5696b53ce917961e684bd9eced41920d08044 Mon Sep 17 00:00:00 2001 From: Scalytics Date: Thu, 4 Jun 2026 19:36:04 +0200 Subject: [PATCH 1/2] fix(operator): enable etcd auto-compaction and raise backend quota The operator-managed etcd takes one revision per offset update (one per produce). Without compaction the MVCC store reaches the default 2 GiB quota under sustained load and the broker then fails produce with 'mvcc: database space exceeded'. Add periodic auto-compaction (5m retention) and raise the backend quota to 4 GiB so bursts do not trip the quota between compactions. Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/operator/etcd_resources.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/operator/etcd_resources.go b/pkg/operator/etcd_resources.go index 80714437..18186b70 100644 --- a/pkg/operator/etcd_resources.go +++ b/pkg/operator/etcd_resources.go @@ -406,6 +406,16 @@ func etcdArgs(cluster *kafscalev1alpha1.KafscaleCluster) []string { "--initial-cluster=" + initialCluster, "--initial-cluster-state=new", "--initial-cluster-token=" + cluster.Name + "-etcd", + // Periodic auto-compaction. KafScale writes one etcd revision per + // offset update (one per produce), so without compaction the DB fills + // to the default 2 GiB quota under load and the broker starts rejecting + // produce with `mvcc: database space exceeded`. 5 minutes of retention + // keeps recovery/audit data while the GC keeps up with the write rate. + "--auto-compaction-mode=periodic", + "--auto-compaction-retention=5m", + // Headroom above the default 2 GiB so a burst cannot exceed the quota + // between compactions; just raises the soft cap inside etcd. + "--quota-backend-bytes=4294967296", } } From 4aa09c7565c6c37588166745af7977a93a651468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mirko=20K=C3=A4mpf?= Date: Tue, 16 Jun 2026 00:04:58 +0200 Subject: [PATCH 2/2] fix(operator): make etcd space-management configurable + guard memory mode Build on the periodic auto-compaction + raised backend quota change. Compaction reclaims revisions logically; it does not shrink the physical bbolt file, so this reduces NOSPACE frequency rather than eliminating it. Physical reclaim (defrag) stays a tracked follow-up. - Expose mode, retention, and quota as env-configurable knobs following the existing KAFSCALE_OPERATOR_ETCD_* pattern, with periodic / 5m / 4 GiB as safe defaults. Each parser validates and falls back to the default on empty or garbage input, so an override can never disable the quota or pick an invalid mode. - Memory-mode guard: when the data dir is a tmpfs emptyDir, set a memory request (= quota) and limit (= quota + 512 MiB headroom) on the etcd container so a large quota cannot drive unbounded node RAM and OOM the node. Disk-backed (PVC) mode is unchanged. - Table-driven test for etcdArgs() asserting the three flags emit the defaults and honour env overrides, plus tests for the memory-mode limit and the absence of container resources in disk mode. Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/operator/etcd_resources.go | 126 ++++++++++++++++++++++----- pkg/operator/etcd_resources_test.go | 129 ++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+), 22 deletions(-) diff --git a/pkg/operator/etcd_resources.go b/pkg/operator/etcd_resources.go index 18186b70..73d9e8d5 100644 --- a/pkg/operator/etcd_resources.go +++ b/pkg/operator/etcd_resources.go @@ -53,6 +53,14 @@ const ( operatorEtcdSnapshotProtectBucketEnv = "KAFSCALE_OPERATOR_ETCD_SNAPSHOT_PROTECT_BUCKET" operatorEtcdStorageMemoryEnv = "KAFSCALE_OPERATOR_ETCD_STORAGE_MEMORY" + // etcd space-management knobs. KafScale writes one MVCC revision per + // offset update (one per produce), so the store grows fast under load. + // These three control periodic compaction and the backend quota; all + // have safe defaults and only need an override on high-write clusters. + operatorEtcdQuotaBackendBytesEnv = "KAFSCALE_OPERATOR_ETCD_QUOTA_BACKEND_BYTES" + operatorEtcdAutoCompactionRetentionEnv = "KAFSCALE_OPERATOR_ETCD_AUTO_COMPACTION_RETENTION" + operatorEtcdAutoCompactionModeEnv = "KAFSCALE_OPERATOR_ETCD_AUTO_COMPACTION_MODE" + defaultEtcdImage = "kubesphere/etcd:3.6.4-0" defaultEtcdctlImage = "ghcr.io/kafscale/kafscale-etcd-tools:dev" defaultEtcdStorageSize = "10Gi" @@ -62,6 +70,16 @@ const ( defaultSnapshotSchedule = "0 * * * *" defaultSnapshotImage = "amazon/aws-cli:2.15.0" defaultSnapshotStaleAfterSeconds = 2 * 60 * 60 + + // 4 GiB backend quota: headroom above etcd's 2 GiB default so a write + // burst cannot exceed the quota between compaction cycles. + defaultEtcdQuotaBackendBytes = int64(4 * 1024 * 1024 * 1024) + // 5 minutes of retained revisions keeps recovery/audit history while the + // compactor keeps pace with the broker write rate. + defaultEtcdAutoCompactionRetention = "5m" + // "periodic" compacts on a wall-clock interval (the retention value); + // "revision" is the only other valid mode. + defaultEtcdAutoCompactionMode = "periodic" ) type EtcdResolution struct { @@ -367,25 +385,46 @@ func reconcileEtcdStatefulSet(ctx context.Context, c client.Client, scheme *runt }, } } - sts.Spec.Template.Spec.Containers = []corev1.Container{ - { - Name: "etcd", - Image: image, - Ports: []corev1.ContainerPort{ - {Name: "client", ContainerPort: 2379}, - {Name: "peer", ContainerPort: 2380}, - }, - Env: []corev1.EnvVar{ - {Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}}, - {Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}}, + etcdContainer := corev1.Container{ + Name: "etcd", + Image: image, + Ports: []corev1.ContainerPort{ + {Name: "client", ContainerPort: 2379}, + {Name: "peer", ContainerPort: 2380}, + }, + Env: []corev1.EnvVar{ + {Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}}, + {Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}}, + }, + Command: []string{"etcd"}, + Args: etcdArgs(cluster), + VolumeMounts: []corev1.VolumeMount{ + {Name: "data", MountPath: "/var/lib/etcd"}, + }, + } + // Memory-mode guard. With storage-memory mode the data dir is a tmpfs + // emptyDir, so every byte etcd writes (up to the backend quota) is + // resident node RAM. A tmpfs has no size cap of its own, so a 4 GiB + // quota could drive ~4 GiB of node memory and risk an OOM that takes + // the node down. The tmpfs allocation counts against this container's + // memory cgroup, so a memory limit bounds it: the kernel reclaims/kills + // inside the container before the node is starved. The limit is the + // quota plus headroom for etcd's own heap and bbolt mmap pages; the + // request matches the quota so the scheduler reserves real capacity. + // Disk-backed (PVC) mode is unchanged: bytes land on the volume, not RAM. + if useMemory { + quota := etcdQuotaBackendBytes() + memLimit := quota + (512 * 1024 * 1024) + etcdContainer.Resources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: *resource.NewQuantity(quota, resource.BinarySI), }, - Command: []string{"etcd"}, - Args: etcdArgs(cluster), - VolumeMounts: []corev1.VolumeMount{ - {Name: "data", MountPath: "/var/lib/etcd"}, + Limits: corev1.ResourceList{ + corev1.ResourceMemory: *resource.NewQuantity(memLimit, resource.BinarySI), }, - }, + } } + sts.Spec.Template.Spec.Containers = []corev1.Container{etcdContainer} return controllerutil.SetControllerReference(cluster, sts, scheme) }) return err @@ -409,13 +448,16 @@ func etcdArgs(cluster *kafscalev1alpha1.KafscaleCluster) []string { // Periodic auto-compaction. KafScale writes one etcd revision per // offset update (one per produce), so without compaction the DB fills // to the default 2 GiB quota under load and the broker starts rejecting - // produce with `mvcc: database space exceeded`. 5 minutes of retention - // keeps recovery/audit data while the GC keeps up with the write rate. - "--auto-compaction-mode=periodic", - "--auto-compaction-retention=5m", + // produce with `mvcc: database space exceeded`. Compaction reclaims + // revisions logically (bbolt pages become reusable); it does not shrink + // the physical file, so this reduces NOSPACE frequency rather than + // eliminating it. Physical reclaim (defrag) is a separate follow-up. + // Mode + retention + quota are env-configurable for high-write clusters. + "--auto-compaction-mode=" + etcdAutoCompactionMode(), + "--auto-compaction-retention=" + etcdAutoCompactionRetention(), // Headroom above the default 2 GiB so a burst cannot exceed the quota - // between compactions; just raises the soft cap inside etcd. - "--quota-backend-bytes=4294967296", + // between compactions; raises the soft cap inside etcd. + "--quota-backend-bytes=" + strconv.FormatInt(etcdQuotaBackendBytes(), 10), } } @@ -732,3 +774,43 @@ func stringPtrOrNil(val string) *string { } return &val } + +// etcdQuotaBackendBytes returns the configured backend quota in bytes, or the +// default. Empty or non-positive / unparseable values fall back to the default +// so a garbage override never disables the quota. +func etcdQuotaBackendBytes() int64 { + raw := strings.TrimSpace(os.Getenv(operatorEtcdQuotaBackendBytesEnv)) + if raw == "" { + return defaultEtcdQuotaBackendBytes + } + parsed, err := strconv.ParseInt(raw, 10, 64) + if err != nil || parsed <= 0 { + return defaultEtcdQuotaBackendBytes + } + return parsed +} + +// etcdAutoCompactionRetention returns the configured retention window, or the +// default. The value is passed verbatim to etcd, which interprets it per the +// compaction mode (a duration like "5m" for periodic, a count for revision). +func etcdAutoCompactionRetention() string { + raw := strings.TrimSpace(os.Getenv(operatorEtcdAutoCompactionRetentionEnv)) + if raw == "" { + return defaultEtcdAutoCompactionRetention + } + return raw +} + +// etcdAutoCompactionMode returns the configured compaction mode, restricted to +// the two values etcd accepts ("periodic", "revision"); anything else falls +// back to the default. +func etcdAutoCompactionMode() string { + switch strings.ToLower(strings.TrimSpace(os.Getenv(operatorEtcdAutoCompactionModeEnv))) { + case "periodic": + return "periodic" + case "revision": + return "revision" + default: + return defaultEtcdAutoCompactionMode + } +} diff --git a/pkg/operator/etcd_resources_test.go b/pkg/operator/etcd_resources_test.go index 68f7b2e7..cd5e8f46 100644 --- a/pkg/operator/etcd_resources_test.go +++ b/pkg/operator/etcd_resources_test.go @@ -17,6 +17,7 @@ package operator import ( "context" + "strings" "testing" appsv1 "k8s.io/api/apps/v1" @@ -188,6 +189,134 @@ func TestSnapshotStaleAfterEnv(t *testing.T) { } } +// argValue returns the value of a `--flag=value` argument, or "" if the flag +// is not present in args. +func argValue(args []string, flag string) string { + prefix := flag + "=" + for _, a := range args { + if strings.HasPrefix(a, prefix) { + return strings.TrimPrefix(a, prefix) + } + } + return "" +} + +func TestEtcdArgsSpaceManagement(t *testing.T) { + cluster := testCluster("compaction", nil) + + cases := []struct { + name string + env map[string]string + wantMode string + wantRetention string + wantQuota string + }{ + { + name: "defaults", + env: nil, + wantMode: "periodic", + wantRetention: "5m", + wantQuota: "4294967296", // 4 GiB + }, + { + name: "env overrides", + env: map[string]string{ + operatorEtcdAutoCompactionModeEnv: "revision", + operatorEtcdAutoCompactionRetentionEnv: "10000", + operatorEtcdQuotaBackendBytesEnv: "8589934592", // 8 GiB + }, + wantMode: "revision", + wantRetention: "10000", + wantQuota: "8589934592", + }, + { + name: "garbage falls back to defaults", + env: map[string]string{ + operatorEtcdAutoCompactionModeEnv: "bogus", + operatorEtcdAutoCompactionRetentionEnv: "", // empty -> default + operatorEtcdQuotaBackendBytesEnv: "-1", + }, + wantMode: "periodic", + wantRetention: "5m", + wantQuota: "4294967296", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + for k, v := range tc.env { + t.Setenv(k, v) + } + args := etcdArgs(cluster) + if got := argValue(args, "--auto-compaction-mode"); got != tc.wantMode { + t.Errorf("auto-compaction-mode = %q, want %q", got, tc.wantMode) + } + if got := argValue(args, "--auto-compaction-retention"); got != tc.wantRetention { + t.Errorf("auto-compaction-retention = %q, want %q", got, tc.wantRetention) + } + if got := argValue(args, "--quota-backend-bytes"); got != tc.wantQuota { + t.Errorf("quota-backend-bytes = %q, want %q", got, tc.wantQuota) + } + }) + } +} + +func TestEtcdMemoryModeSetsContainerMemoryLimit(t *testing.T) { + t.Setenv(operatorEtcdEndpointsEnv, "") + t.Setenv(operatorEtcdStorageMemoryEnv, "true") + cluster := testCluster("mem-guard", nil) + scheme := testScheme(t) + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster).Build() + + if _, err := EnsureEtcd(context.Background(), c, scheme, cluster); err != nil { + t.Fatalf("EnsureEtcd: %v", err) + } + + sts := &appsv1.StatefulSet{} + assertFound(t, c, sts, cluster.Namespace, cluster.Name+"-etcd") + if len(sts.Spec.VolumeClaimTemplates) != 0 { + t.Fatalf("memory mode should not declare a PVC template, got %d", len(sts.Spec.VolumeClaimTemplates)) + } + if len(sts.Spec.Template.Spec.Containers) == 0 { + t.Fatalf("expected etcd container") + } + res := sts.Spec.Template.Spec.Containers[0].Resources + memReq, okReq := res.Requests[corev1.ResourceMemory] + memLim, okLim := res.Limits[corev1.ResourceMemory] + if !okReq || !okLim { + t.Fatalf("memory mode must set memory request and limit, got requests=%v limits=%v", res.Requests, res.Limits) + } + // request == quota (4 GiB), limit == quota + 512 MiB headroom. + if got := memReq.Value(); got != defaultEtcdQuotaBackendBytes { + t.Errorf("memory request = %d, want %d", got, defaultEtcdQuotaBackendBytes) + } + if got := memLim.Value(); got != defaultEtcdQuotaBackendBytes+(512*1024*1024) { + t.Errorf("memory limit = %d, want %d", got, defaultEtcdQuotaBackendBytes+(512*1024*1024)) + } +} + +func TestEtcdDiskModeHasNoContainerMemoryLimit(t *testing.T) { + t.Setenv(operatorEtcdEndpointsEnv, "") + // storage-memory mode explicitly off (the default); disk-backed PVC path. + cluster := testCluster("disk-mode", nil) + scheme := testScheme(t) + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster).Build() + + if _, err := EnsureEtcd(context.Background(), c, scheme, cluster); err != nil { + t.Fatalf("EnsureEtcd: %v", err) + } + + sts := &appsv1.StatefulSet{} + assertFound(t, c, sts, cluster.Namespace, cluster.Name+"-etcd") + if len(sts.Spec.VolumeClaimTemplates) == 0 { + t.Fatalf("disk mode must declare a PVC template") + } + res := sts.Spec.Template.Spec.Containers[0].Resources + if len(res.Requests) != 0 || len(res.Limits) != 0 { + t.Fatalf("disk mode must not set container resources, got requests=%v limits=%v", res.Requests, res.Limits) + } +} + func envValue(env []corev1.EnvVar, key string) string { for _, entry := range env { if entry.Name == key {