diff --git a/pkg/operator/etcd_resources.go b/pkg/operator/etcd_resources.go index 80714437..73d9e8d5 100644 --- a/pkg/operator/etcd_resources.go +++ b/pkg/operator/etcd_resources.go @@ -53,6 +53,14 @@ const ( operatorEtcdSnapshotProtectBucketEnv = "KAFSCALE_OPERATOR_ETCD_SNAPSHOT_PROTECT_BUCKET" operatorEtcdStorageMemoryEnv = "KAFSCALE_OPERATOR_ETCD_STORAGE_MEMORY" + // etcd space-management knobs. KafScale writes one MVCC revision per + // offset update (one per produce), so the store grows fast under load. + // These three control periodic compaction and the backend quota; all + // have safe defaults and only need an override on high-write clusters. + operatorEtcdQuotaBackendBytesEnv = "KAFSCALE_OPERATOR_ETCD_QUOTA_BACKEND_BYTES" + operatorEtcdAutoCompactionRetentionEnv = "KAFSCALE_OPERATOR_ETCD_AUTO_COMPACTION_RETENTION" + operatorEtcdAutoCompactionModeEnv = "KAFSCALE_OPERATOR_ETCD_AUTO_COMPACTION_MODE" + defaultEtcdImage = "kubesphere/etcd:3.6.4-0" defaultEtcdctlImage = "ghcr.io/kafscale/kafscale-etcd-tools:dev" defaultEtcdStorageSize = "10Gi" @@ -62,6 +70,16 @@ const ( defaultSnapshotSchedule = "0 * * * *" defaultSnapshotImage = "amazon/aws-cli:2.15.0" defaultSnapshotStaleAfterSeconds = 2 * 60 * 60 + + // 4 GiB backend quota: headroom above etcd's 2 GiB default so a write + // burst cannot exceed the quota between compaction cycles. + defaultEtcdQuotaBackendBytes = int64(4 * 1024 * 1024 * 1024) + // 5 minutes of retained revisions keeps recovery/audit history while the + // compactor keeps pace with the broker write rate. + defaultEtcdAutoCompactionRetention = "5m" + // "periodic" compacts on a wall-clock interval (the retention value); + // "revision" is the only other valid mode. + defaultEtcdAutoCompactionMode = "periodic" ) type EtcdResolution struct { @@ -367,25 +385,46 @@ func reconcileEtcdStatefulSet(ctx context.Context, c client.Client, scheme *runt }, } } - sts.Spec.Template.Spec.Containers = []corev1.Container{ - { - Name: "etcd", - Image: image, - Ports: []corev1.ContainerPort{ - {Name: "client", ContainerPort: 2379}, - {Name: "peer", ContainerPort: 2380}, - }, - Env: []corev1.EnvVar{ - {Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}}, - {Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}}, + etcdContainer := corev1.Container{ + Name: "etcd", + Image: image, + Ports: []corev1.ContainerPort{ + {Name: "client", ContainerPort: 2379}, + {Name: "peer", ContainerPort: 2380}, + }, + Env: []corev1.EnvVar{ + {Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}}, + {Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}}, + }, + Command: []string{"etcd"}, + Args: etcdArgs(cluster), + VolumeMounts: []corev1.VolumeMount{ + {Name: "data", MountPath: "/var/lib/etcd"}, + }, + } + // Memory-mode guard. With storage-memory mode the data dir is a tmpfs + // emptyDir, so every byte etcd writes (up to the backend quota) is + // resident node RAM. A tmpfs has no size cap of its own, so a 4 GiB + // quota could drive ~4 GiB of node memory and risk an OOM that takes + // the node down. The tmpfs allocation counts against this container's + // memory cgroup, so a memory limit bounds it: the kernel reclaims/kills + // inside the container before the node is starved. The limit is the + // quota plus headroom for etcd's own heap and bbolt mmap pages; the + // request matches the quota so the scheduler reserves real capacity. + // Disk-backed (PVC) mode is unchanged: bytes land on the volume, not RAM. + if useMemory { + quota := etcdQuotaBackendBytes() + memLimit := quota + (512 * 1024 * 1024) + etcdContainer.Resources = corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceMemory: *resource.NewQuantity(quota, resource.BinarySI), }, - Command: []string{"etcd"}, - Args: etcdArgs(cluster), - VolumeMounts: []corev1.VolumeMount{ - {Name: "data", MountPath: "/var/lib/etcd"}, + Limits: corev1.ResourceList{ + corev1.ResourceMemory: *resource.NewQuantity(memLimit, resource.BinarySI), }, - }, + } } + sts.Spec.Template.Spec.Containers = []corev1.Container{etcdContainer} return controllerutil.SetControllerReference(cluster, sts, scheme) }) return err @@ -406,6 +445,19 @@ func etcdArgs(cluster *kafscalev1alpha1.KafscaleCluster) []string { "--initial-cluster=" + initialCluster, "--initial-cluster-state=new", "--initial-cluster-token=" + cluster.Name + "-etcd", + // Periodic auto-compaction. KafScale writes one etcd revision per + // offset update (one per produce), so without compaction the DB fills + // to the default 2 GiB quota under load and the broker starts rejecting + // produce with `mvcc: database space exceeded`. Compaction reclaims + // revisions logically (bbolt pages become reusable); it does not shrink + // the physical file, so this reduces NOSPACE frequency rather than + // eliminating it. Physical reclaim (defrag) is a separate follow-up. + // Mode + retention + quota are env-configurable for high-write clusters. + "--auto-compaction-mode=" + etcdAutoCompactionMode(), + "--auto-compaction-retention=" + etcdAutoCompactionRetention(), + // Headroom above the default 2 GiB so a burst cannot exceed the quota + // between compactions; raises the soft cap inside etcd. + "--quota-backend-bytes=" + strconv.FormatInt(etcdQuotaBackendBytes(), 10), } } @@ -722,3 +774,43 @@ func stringPtrOrNil(val string) *string { } return &val } + +// etcdQuotaBackendBytes returns the configured backend quota in bytes, or the +// default. Empty or non-positive / unparseable values fall back to the default +// so a garbage override never disables the quota. +func etcdQuotaBackendBytes() int64 { + raw := strings.TrimSpace(os.Getenv(operatorEtcdQuotaBackendBytesEnv)) + if raw == "" { + return defaultEtcdQuotaBackendBytes + } + parsed, err := strconv.ParseInt(raw, 10, 64) + if err != nil || parsed <= 0 { + return defaultEtcdQuotaBackendBytes + } + return parsed +} + +// etcdAutoCompactionRetention returns the configured retention window, or the +// default. The value is passed verbatim to etcd, which interprets it per the +// compaction mode (a duration like "5m" for periodic, a count for revision). +func etcdAutoCompactionRetention() string { + raw := strings.TrimSpace(os.Getenv(operatorEtcdAutoCompactionRetentionEnv)) + if raw == "" { + return defaultEtcdAutoCompactionRetention + } + return raw +} + +// etcdAutoCompactionMode returns the configured compaction mode, restricted to +// the two values etcd accepts ("periodic", "revision"); anything else falls +// back to the default. +func etcdAutoCompactionMode() string { + switch strings.ToLower(strings.TrimSpace(os.Getenv(operatorEtcdAutoCompactionModeEnv))) { + case "periodic": + return "periodic" + case "revision": + return "revision" + default: + return defaultEtcdAutoCompactionMode + } +} diff --git a/pkg/operator/etcd_resources_test.go b/pkg/operator/etcd_resources_test.go index 68f7b2e7..cd5e8f46 100644 --- a/pkg/operator/etcd_resources_test.go +++ b/pkg/operator/etcd_resources_test.go @@ -17,6 +17,7 @@ package operator import ( "context" + "strings" "testing" appsv1 "k8s.io/api/apps/v1" @@ -188,6 +189,134 @@ func TestSnapshotStaleAfterEnv(t *testing.T) { } } +// argValue returns the value of a `--flag=value` argument, or "" if the flag +// is not present in args. +func argValue(args []string, flag string) string { + prefix := flag + "=" + for _, a := range args { + if strings.HasPrefix(a, prefix) { + return strings.TrimPrefix(a, prefix) + } + } + return "" +} + +func TestEtcdArgsSpaceManagement(t *testing.T) { + cluster := testCluster("compaction", nil) + + cases := []struct { + name string + env map[string]string + wantMode string + wantRetention string + wantQuota string + }{ + { + name: "defaults", + env: nil, + wantMode: "periodic", + wantRetention: "5m", + wantQuota: "4294967296", // 4 GiB + }, + { + name: "env overrides", + env: map[string]string{ + operatorEtcdAutoCompactionModeEnv: "revision", + operatorEtcdAutoCompactionRetentionEnv: "10000", + operatorEtcdQuotaBackendBytesEnv: "8589934592", // 8 GiB + }, + wantMode: "revision", + wantRetention: "10000", + wantQuota: "8589934592", + }, + { + name: "garbage falls back to defaults", + env: map[string]string{ + operatorEtcdAutoCompactionModeEnv: "bogus", + operatorEtcdAutoCompactionRetentionEnv: "", // empty -> default + operatorEtcdQuotaBackendBytesEnv: "-1", + }, + wantMode: "periodic", + wantRetention: "5m", + wantQuota: "4294967296", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + for k, v := range tc.env { + t.Setenv(k, v) + } + args := etcdArgs(cluster) + if got := argValue(args, "--auto-compaction-mode"); got != tc.wantMode { + t.Errorf("auto-compaction-mode = %q, want %q", got, tc.wantMode) + } + if got := argValue(args, "--auto-compaction-retention"); got != tc.wantRetention { + t.Errorf("auto-compaction-retention = %q, want %q", got, tc.wantRetention) + } + if got := argValue(args, "--quota-backend-bytes"); got != tc.wantQuota { + t.Errorf("quota-backend-bytes = %q, want %q", got, tc.wantQuota) + } + }) + } +} + +func TestEtcdMemoryModeSetsContainerMemoryLimit(t *testing.T) { + t.Setenv(operatorEtcdEndpointsEnv, "") + t.Setenv(operatorEtcdStorageMemoryEnv, "true") + cluster := testCluster("mem-guard", nil) + scheme := testScheme(t) + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster).Build() + + if _, err := EnsureEtcd(context.Background(), c, scheme, cluster); err != nil { + t.Fatalf("EnsureEtcd: %v", err) + } + + sts := &appsv1.StatefulSet{} + assertFound(t, c, sts, cluster.Namespace, cluster.Name+"-etcd") + if len(sts.Spec.VolumeClaimTemplates) != 0 { + t.Fatalf("memory mode should not declare a PVC template, got %d", len(sts.Spec.VolumeClaimTemplates)) + } + if len(sts.Spec.Template.Spec.Containers) == 0 { + t.Fatalf("expected etcd container") + } + res := sts.Spec.Template.Spec.Containers[0].Resources + memReq, okReq := res.Requests[corev1.ResourceMemory] + memLim, okLim := res.Limits[corev1.ResourceMemory] + if !okReq || !okLim { + t.Fatalf("memory mode must set memory request and limit, got requests=%v limits=%v", res.Requests, res.Limits) + } + // request == quota (4 GiB), limit == quota + 512 MiB headroom. + if got := memReq.Value(); got != defaultEtcdQuotaBackendBytes { + t.Errorf("memory request = %d, want %d", got, defaultEtcdQuotaBackendBytes) + } + if got := memLim.Value(); got != defaultEtcdQuotaBackendBytes+(512*1024*1024) { + t.Errorf("memory limit = %d, want %d", got, defaultEtcdQuotaBackendBytes+(512*1024*1024)) + } +} + +func TestEtcdDiskModeHasNoContainerMemoryLimit(t *testing.T) { + t.Setenv(operatorEtcdEndpointsEnv, "") + // storage-memory mode explicitly off (the default); disk-backed PVC path. + cluster := testCluster("disk-mode", nil) + scheme := testScheme(t) + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster).Build() + + if _, err := EnsureEtcd(context.Background(), c, scheme, cluster); err != nil { + t.Fatalf("EnsureEtcd: %v", err) + } + + sts := &appsv1.StatefulSet{} + assertFound(t, c, sts, cluster.Namespace, cluster.Name+"-etcd") + if len(sts.Spec.VolumeClaimTemplates) == 0 { + t.Fatalf("disk mode must declare a PVC template") + } + res := sts.Spec.Template.Spec.Containers[0].Resources + if len(res.Requests) != 0 || len(res.Limits) != 0 { + t.Fatalf("disk mode must not set container resources, got requests=%v limits=%v", res.Requests, res.Limits) + } +} + func envValue(env []corev1.EnvVar, key string) string { for _, entry := range env { if entry.Name == key {