Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 108 additions & 16 deletions pkg/operator/etcd_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ const (
operatorEtcdSnapshotProtectBucketEnv = "KAFSCALE_OPERATOR_ETCD_SNAPSHOT_PROTECT_BUCKET"
operatorEtcdStorageMemoryEnv = "KAFSCALE_OPERATOR_ETCD_STORAGE_MEMORY"

// etcd space-management knobs. KafScale writes one MVCC revision per
// offset update (one per produce), so the store grows fast under load.
// These three control periodic compaction and the backend quota; all
// have safe defaults and only need an override on high-write clusters.
operatorEtcdQuotaBackendBytesEnv = "KAFSCALE_OPERATOR_ETCD_QUOTA_BACKEND_BYTES"
operatorEtcdAutoCompactionRetentionEnv = "KAFSCALE_OPERATOR_ETCD_AUTO_COMPACTION_RETENTION"
operatorEtcdAutoCompactionModeEnv = "KAFSCALE_OPERATOR_ETCD_AUTO_COMPACTION_MODE"

defaultEtcdImage = "kubesphere/etcd:3.6.4-0"
defaultEtcdctlImage = "ghcr.io/kafscale/kafscale-etcd-tools:dev"
defaultEtcdStorageSize = "10Gi"
Expand All @@ -62,6 +70,16 @@ const (
defaultSnapshotSchedule = "0 * * * *"
defaultSnapshotImage = "amazon/aws-cli:2.15.0"
defaultSnapshotStaleAfterSeconds = 2 * 60 * 60

// 4 GiB backend quota: headroom above etcd's 2 GiB default so a write
// burst cannot exceed the quota between compaction cycles.
defaultEtcdQuotaBackendBytes = int64(4 * 1024 * 1024 * 1024)
// 5 minutes of retained revisions keeps recovery/audit history while the
// compactor keeps pace with the broker write rate.
defaultEtcdAutoCompactionRetention = "5m"
// "periodic" compacts on a wall-clock interval (the retention value);
// "revision" is the only other valid mode.
defaultEtcdAutoCompactionMode = "periodic"
)

type EtcdResolution struct {
Expand Down Expand Up @@ -367,25 +385,46 @@ func reconcileEtcdStatefulSet(ctx context.Context, c client.Client, scheme *runt
},
}
}
sts.Spec.Template.Spec.Containers = []corev1.Container{
{
Name: "etcd",
Image: image,
Ports: []corev1.ContainerPort{
{Name: "client", ContainerPort: 2379},
{Name: "peer", ContainerPort: 2380},
},
Env: []corev1.EnvVar{
{Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}},
etcdContainer := corev1.Container{
Name: "etcd",
Image: image,
Ports: []corev1.ContainerPort{
{Name: "client", ContainerPort: 2379},
{Name: "peer", ContainerPort: 2380},
},
Env: []corev1.EnvVar{
{Name: "POD_NAME", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}}},
{Name: "POD_NAMESPACE", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}}},
},
Command: []string{"etcd"},
Args: etcdArgs(cluster),
VolumeMounts: []corev1.VolumeMount{
{Name: "data", MountPath: "/var/lib/etcd"},
},
}
// Memory-mode guard. With storage-memory mode the data dir is a tmpfs
// emptyDir, so every byte etcd writes (up to the backend quota) is
// resident node RAM. A tmpfs has no size cap of its own, so a 4 GiB
// quota could drive ~4 GiB of node memory and risk an OOM that takes
// the node down. The tmpfs allocation counts against this container's
// memory cgroup, so a memory limit bounds it: the kernel reclaims/kills
// inside the container before the node is starved. The limit is the
// quota plus headroom for etcd's own heap and bbolt mmap pages; the
// request matches the quota so the scheduler reserves real capacity.
// Disk-backed (PVC) mode is unchanged: bytes land on the volume, not RAM.
if useMemory {
quota := etcdQuotaBackendBytes()
memLimit := quota + (512 * 1024 * 1024)
etcdContainer.Resources = corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceMemory: *resource.NewQuantity(quota, resource.BinarySI),
},
Command: []string{"etcd"},
Args: etcdArgs(cluster),
VolumeMounts: []corev1.VolumeMount{
{Name: "data", MountPath: "/var/lib/etcd"},
Limits: corev1.ResourceList{
corev1.ResourceMemory: *resource.NewQuantity(memLimit, resource.BinarySI),
},
},
}
}
sts.Spec.Template.Spec.Containers = []corev1.Container{etcdContainer}
return controllerutil.SetControllerReference(cluster, sts, scheme)
})
return err
Expand All @@ -406,6 +445,19 @@ func etcdArgs(cluster *kafscalev1alpha1.KafscaleCluster) []string {
"--initial-cluster=" + initialCluster,
"--initial-cluster-state=new",
"--initial-cluster-token=" + cluster.Name + "-etcd",
// Periodic auto-compaction. KafScale writes one etcd revision per
// offset update (one per produce), so without compaction the DB fills
// to the default 2 GiB quota under load and the broker starts rejecting
// produce with `mvcc: database space exceeded`. Compaction reclaims
// revisions logically (bbolt pages become reusable); it does not shrink
// the physical file, so this reduces NOSPACE frequency rather than
// eliminating it. Physical reclaim (defrag) is a separate follow-up.
// Mode + retention + quota are env-configurable for high-write clusters.
"--auto-compaction-mode=" + etcdAutoCompactionMode(),
"--auto-compaction-retention=" + etcdAutoCompactionRetention(),
// Headroom above the default 2 GiB so a burst cannot exceed the quota
// between compactions; raises the soft cap inside etcd.
"--quota-backend-bytes=" + strconv.FormatInt(etcdQuotaBackendBytes(), 10),
}
}

Expand Down Expand Up @@ -722,3 +774,43 @@ func stringPtrOrNil(val string) *string {
}
return &val
}

// etcdQuotaBackendBytes returns the configured backend quota in bytes, or the
// default. Empty or non-positive / unparseable values fall back to the default
// so a garbage override never disables the quota.
func etcdQuotaBackendBytes() int64 {
raw := strings.TrimSpace(os.Getenv(operatorEtcdQuotaBackendBytesEnv))
if raw == "" {
return defaultEtcdQuotaBackendBytes
}
parsed, err := strconv.ParseInt(raw, 10, 64)
if err != nil || parsed <= 0 {
return defaultEtcdQuotaBackendBytes
}
return parsed
}

// etcdAutoCompactionRetention returns the configured retention window, or the
// default. The value is passed verbatim to etcd, which interprets it per the
// compaction mode (a duration like "5m" for periodic, a count for revision).
func etcdAutoCompactionRetention() string {
raw := strings.TrimSpace(os.Getenv(operatorEtcdAutoCompactionRetentionEnv))
if raw == "" {
return defaultEtcdAutoCompactionRetention
}
return raw
}

// etcdAutoCompactionMode returns the configured compaction mode, restricted to
// the two values etcd accepts ("periodic", "revision"); anything else falls
// back to the default.
func etcdAutoCompactionMode() string {
switch strings.ToLower(strings.TrimSpace(os.Getenv(operatorEtcdAutoCompactionModeEnv))) {
case "periodic":
return "periodic"
case "revision":
return "revision"
default:
return defaultEtcdAutoCompactionMode
}
}
129 changes: 129 additions & 0 deletions pkg/operator/etcd_resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package operator

import (
"context"
"strings"
"testing"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -188,6 +189,134 @@ func TestSnapshotStaleAfterEnv(t *testing.T) {
}
}

// argValue returns the value of a `--flag=value` argument, or "" if the flag
// is not present in args.
func argValue(args []string, flag string) string {
prefix := flag + "="
for _, a := range args {
if strings.HasPrefix(a, prefix) {
return strings.TrimPrefix(a, prefix)
}
}
return ""
}

func TestEtcdArgsSpaceManagement(t *testing.T) {
cluster := testCluster("compaction", nil)

cases := []struct {
name string
env map[string]string
wantMode string
wantRetention string
wantQuota string
}{
{
name: "defaults",
env: nil,
wantMode: "periodic",
wantRetention: "5m",
wantQuota: "4294967296", // 4 GiB
},
{
name: "env overrides",
env: map[string]string{
operatorEtcdAutoCompactionModeEnv: "revision",
operatorEtcdAutoCompactionRetentionEnv: "10000",
operatorEtcdQuotaBackendBytesEnv: "8589934592", // 8 GiB
},
wantMode: "revision",
wantRetention: "10000",
wantQuota: "8589934592",
},
{
name: "garbage falls back to defaults",
env: map[string]string{
operatorEtcdAutoCompactionModeEnv: "bogus",
operatorEtcdAutoCompactionRetentionEnv: "", // empty -> default
operatorEtcdQuotaBackendBytesEnv: "-1",
},
wantMode: "periodic",
wantRetention: "5m",
wantQuota: "4294967296",
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
for k, v := range tc.env {
t.Setenv(k, v)
}
args := etcdArgs(cluster)
if got := argValue(args, "--auto-compaction-mode"); got != tc.wantMode {
t.Errorf("auto-compaction-mode = %q, want %q", got, tc.wantMode)
}
if got := argValue(args, "--auto-compaction-retention"); got != tc.wantRetention {
t.Errorf("auto-compaction-retention = %q, want %q", got, tc.wantRetention)
}
if got := argValue(args, "--quota-backend-bytes"); got != tc.wantQuota {
t.Errorf("quota-backend-bytes = %q, want %q", got, tc.wantQuota)
}
})
}
}

func TestEtcdMemoryModeSetsContainerMemoryLimit(t *testing.T) {
t.Setenv(operatorEtcdEndpointsEnv, "")
t.Setenv(operatorEtcdStorageMemoryEnv, "true")
cluster := testCluster("mem-guard", nil)
scheme := testScheme(t)
c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster).Build()

if _, err := EnsureEtcd(context.Background(), c, scheme, cluster); err != nil {
t.Fatalf("EnsureEtcd: %v", err)
}

sts := &appsv1.StatefulSet{}
assertFound(t, c, sts, cluster.Namespace, cluster.Name+"-etcd")
if len(sts.Spec.VolumeClaimTemplates) != 0 {
t.Fatalf("memory mode should not declare a PVC template, got %d", len(sts.Spec.VolumeClaimTemplates))
}
if len(sts.Spec.Template.Spec.Containers) == 0 {
t.Fatalf("expected etcd container")
}
res := sts.Spec.Template.Spec.Containers[0].Resources
memReq, okReq := res.Requests[corev1.ResourceMemory]
memLim, okLim := res.Limits[corev1.ResourceMemory]
if !okReq || !okLim {
t.Fatalf("memory mode must set memory request and limit, got requests=%v limits=%v", res.Requests, res.Limits)
}
// request == quota (4 GiB), limit == quota + 512 MiB headroom.
if got := memReq.Value(); got != defaultEtcdQuotaBackendBytes {
t.Errorf("memory request = %d, want %d", got, defaultEtcdQuotaBackendBytes)
}
if got := memLim.Value(); got != defaultEtcdQuotaBackendBytes+(512*1024*1024) {
t.Errorf("memory limit = %d, want %d", got, defaultEtcdQuotaBackendBytes+(512*1024*1024))
}
}

func TestEtcdDiskModeHasNoContainerMemoryLimit(t *testing.T) {
t.Setenv(operatorEtcdEndpointsEnv, "")
// storage-memory mode explicitly off (the default); disk-backed PVC path.
cluster := testCluster("disk-mode", nil)
scheme := testScheme(t)
c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cluster).Build()

if _, err := EnsureEtcd(context.Background(), c, scheme, cluster); err != nil {
t.Fatalf("EnsureEtcd: %v", err)
}

sts := &appsv1.StatefulSet{}
assertFound(t, c, sts, cluster.Namespace, cluster.Name+"-etcd")
if len(sts.Spec.VolumeClaimTemplates) == 0 {
t.Fatalf("disk mode must declare a PVC template")
}
res := sts.Spec.Template.Spec.Containers[0].Resources
if len(res.Requests) != 0 || len(res.Limits) != 0 {
t.Fatalf("disk mode must not set container resources, got requests=%v limits=%v", res.Requests, res.Limits)
}
}

func envValue(env []corev1.EnvVar, key string) string {
for _, entry := range env {
if entry.Name == key {
Expand Down
Loading