From 254cbf045eb625aaa7108742eef8935dee17860e Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Tue, 16 Dec 2025 11:05:26 +0100 Subject: [PATCH 01/10] CSPL-4360 Secret reference added for Bus CR --- api/v4/bus_types.go | 4 + api/v4/zz_generated.deepcopy.go | 13 +- .../bases/enterprise.splunk.com_buses.yaml | 33 +++++ ...enterprise.splunk.com_indexerclusters.yaml | 34 +++++ ...nterprise.splunk.com_ingestorclusters.yaml | 34 +++++ .../templates/enterprise_v4_buses.yaml | 4 + .../01-assert.yaml | 133 +----------------- .../01-create-se-secret.yaml | 7 + .../02-assert.yaml | 111 ++++++++++++++- ...stall-setup.yaml => 02-install-setup.yaml} | 0 .../03-assert.yaml | 33 +++++ ...ingestor.yaml => 03-scaleup-ingestor.yaml} | 0 ...all-setup.yaml => 04-uninstall-setup.yaml} | 0 .../splunk_index_ingest_sep.yaml | 3 + pkg/splunk/enterprise/indexercluster.go | 32 +++-- pkg/splunk/enterprise/indexercluster_test.go | 15 +- pkg/splunk/enterprise/ingestorcluster.go | 29 +++- pkg/splunk/enterprise/ingestorcluster_test.go | 11 +- pkg/splunk/enterprise/util.go | 19 +++ .../index_and_ingestion_separation_test.go | 20 +++ test/testenv/remote_index_utils.go | 8 ++ 21 files changed, 384 insertions(+), 159 deletions(-) create mode 100644 kuttl/tests/helm/index-and-ingest-separation/01-create-se-secret.yaml rename kuttl/tests/helm/index-and-ingest-separation/{01-install-setup.yaml => 02-install-setup.yaml} (100%) create mode 100644 kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml rename kuttl/tests/helm/index-and-ingest-separation/{02-scaleup-ingestor.yaml => 03-scaleup-ingestor.yaml} (100%) rename kuttl/tests/helm/index-and-ingest-separation/{03-uninstall-setup.yaml => 04-uninstall-setup.yaml} (100%) diff --git a/api/v4/bus_types.go b/api/v4/bus_types.go index 4d9cd3a42..a45be59d6 100644 --- a/api/v4/bus_types.go +++ b/api/v4/bus_types.go @@ -61,6 +61,10 @@ type SQSSpec struct { // +kubebuilder:validation:Pattern=`^https://sqs(?:-fips)?\.[a-z]+-[a-z]+(?:-[a-z]+)?-\d+\.amazonaws\.com(?:\.cn)?(?:/[A-Za-z0-9._-]+(?:/[A-Za-z0-9._-]+)*)?$` // Amazon SQS Service endpoint Endpoint string `json:"endpoint"` + + // +optional + // List of remote storage volumes + VolList []VolumeSpec `json:"volumes,omitempty"` } // BusStatus defines the observed state of Bus diff --git a/api/v4/zz_generated.deepcopy.go b/api/v4/zz_generated.deepcopy.go index dc19b7f10..eb142f146 100644 --- a/api/v4/zz_generated.deepcopy.go +++ b/api/v4/zz_generated.deepcopy.go @@ -185,7 +185,7 @@ func (in *Bus) DeepCopyInto(out *Bus) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) in.Status.DeepCopyInto(&out.Status) } @@ -234,7 +234,7 @@ func (in *BusList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BusSpec) DeepCopyInto(out *BusSpec) { *out = *in - out.SQS = in.SQS + in.SQS.DeepCopyInto(&out.SQS) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BusSpec. @@ -637,7 +637,7 @@ func (in *IndexerClusterStatus) DeepCopyInto(out *IndexerClusterStatus) { if in.Bus != nil { in, out := &in.Bus, &out.Bus *out = new(BusSpec) - **out = **in + (*in).DeepCopyInto(*out) } if in.LargeMessageStore != nil { in, out := &in.LargeMessageStore, &out.LargeMessageStore @@ -740,7 +740,7 @@ func (in *IngestorClusterStatus) DeepCopyInto(out *IngestorClusterStatus) { if in.Bus != nil { in, out := &in.Bus, &out.Bus *out = new(BusSpec) - **out = **in + (*in).DeepCopyInto(*out) } if in.LargeMessageStore != nil { in, out := &in.LargeMessageStore, &out.LargeMessageStore @@ -1104,6 +1104,11 @@ func (in *S3Spec) DeepCopy() *S3Spec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SQSSpec) DeepCopyInto(out *SQSSpec) { *out = *in + if in.VolList != nil { + in, out := &in.VolList, &out.VolList + *out = make([]VolumeSpec, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SQSSpec. diff --git a/config/crd/bases/enterprise.splunk.com_buses.yaml b/config/crd/bases/enterprise.splunk.com_buses.yaml index 54d498834..db62f351c 100644 --- a/config/crd/bases/enterprise.splunk.com_buses.yaml +++ b/config/crd/bases/enterprise.splunk.com_buses.yaml @@ -78,6 +78,39 @@ spec: description: Region of the resources pattern: ^(?:us|ap|eu|me|af|sa|ca|cn|il)(?:-[a-z]+){1,3}-\d$ type: string + volumes: + description: List of remote storage volumes + items: + description: VolumeSpec defines remote volume config + properties: + endpoint: + description: Remote volume URI + type: string + name: + description: Remote volume name + type: string + path: + description: Remote volume path + type: string + provider: + description: 'App Package Remote Store provider. Supported + values: aws, minio, azure, gcp.' + type: string + region: + description: Region of the remote storage volume where apps + reside. Used for aws, if provided. Not used for minio + and azure. + type: string + secretRef: + description: Secret object name + type: string + storageType: + description: 'Remote Storage type. Supported values: s3, + blob, gcs. s3 works with aws or minio providers, whereas + blob works with azure provider, gcs works for gcp.' + type: string + type: object + type: array required: - dlq - name diff --git a/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml b/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml index 67e1021f6..3389a98d5 100644 --- a/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml +++ b/config/crd/bases/enterprise.splunk.com_indexerclusters.yaml @@ -8368,6 +8368,40 @@ spec: description: Region of the resources pattern: ^(?:us|ap|eu|me|af|sa|ca|cn|il)(?:-[a-z]+){1,3}-\d$ type: string + volumes: + description: List of remote storage volumes + items: + description: VolumeSpec defines remote volume config + properties: + endpoint: + description: Remote volume URI + type: string + name: + description: Remote volume name + type: string + path: + description: Remote volume path + type: string + provider: + description: 'App Package Remote Store provider. Supported + values: aws, minio, azure, gcp.' + type: string + region: + description: Region of the remote storage volume where + apps reside. Used for aws, if provided. Not used for + minio and azure. + type: string + secretRef: + description: Secret object name + type: string + storageType: + description: 'Remote Storage type. Supported values: + s3, blob, gcs. s3 works with aws or minio providers, + whereas blob works with azure provider, gcs works + for gcp.' + type: string + type: object + type: array required: - dlq - name diff --git a/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml b/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml index 4ecaa8d32..5b065baa5 100644 --- a/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml +++ b/config/crd/bases/enterprise.splunk.com_ingestorclusters.yaml @@ -4618,6 +4618,40 @@ spec: description: Region of the resources pattern: ^(?:us|ap|eu|me|af|sa|ca|cn|il)(?:-[a-z]+){1,3}-\d$ type: string + volumes: + description: List of remote storage volumes + items: + description: VolumeSpec defines remote volume config + properties: + endpoint: + description: Remote volume URI + type: string + name: + description: Remote volume name + type: string + path: + description: Remote volume path + type: string + provider: + description: 'App Package Remote Store provider. Supported + values: aws, minio, azure, gcp.' + type: string + region: + description: Region of the remote storage volume where + apps reside. Used for aws, if provided. Not used for + minio and azure. + type: string + secretRef: + description: Secret object name + type: string + storageType: + description: 'Remote Storage type. Supported values: + s3, blob, gcs. s3 works with aws or minio providers, + whereas blob works with azure provider, gcs works + for gcp.' + type: string + type: object + type: array required: - dlq - name diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_buses.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_buses.yaml index bbf162332..e5b881717 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_buses.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_buses.yaml @@ -29,6 +29,10 @@ spec: {{- if .region }} region: {{ .region | quote }} {{- end }} + {{- if .volumes }} + volumes: + {{ toYaml . | indent 4 }} + {{- end }} {{- end }} {{- end }} {{- end }} \ No newline at end of file diff --git a/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml index f34dd2e6c..1a4e4a60a 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/01-assert.yaml @@ -1,136 +1,5 @@ --- -# assert for bus custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: Bus -metadata: - name: bus -spec: - provider: sqs - sqs: - name: sqs-test - region: us-west-2 - endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test -status: - phase: Ready - ---- -# assert for large message store custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: LargeMessageStore -metadata: - name: lms -spec: - provider: s3 - s3: - endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test -status: - phase: Ready - ---- -# assert for cluster manager custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: ClusterManager -metadata: - name: cm -status: - phase: Ready - ---- -# check if stateful sets are created -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: splunk-cm-cluster-manager -status: - replicas: 1 - ---- -# check if secret object are created -apiVersion: v1 -kind: Secret -metadata: - name: splunk-cm-cluster-manager-secret-v1 - ---- -# assert for indexer cluster custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: IndexerCluster -metadata: - name: indexer -spec: - replicas: 3 - busRef: - name: bus -status: - phase: Ready - bus: - provider: sqs - sqs: - name: sqs-test - region: us-west-2 - endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test - largeMessageStore: - provider: s3 - s3: - endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test - ---- -# check for stateful set and replicas as configured -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: splunk-indexer-indexer -status: - replicas: 3 - ---- -# check if secret object are created -apiVersion: v1 -kind: Secret -metadata: - name: splunk-indexer-indexer-secret-v1 - ---- -# assert for indexer cluster custom resource to be ready -apiVersion: enterprise.splunk.com/v4 -kind: IngestorCluster -metadata: - name: ingestor -spec: - replicas: 3 - busRef: - name: bus -status: - phase: Ready - bus: - provider: sqs - sqs: - name: sqs-test - region: us-west-2 - endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test - largeMessageStore: - provider: s3 - s3: - endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test - ---- -# check for stateful set and replicas as configured -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: splunk-ingestor-ingestor -status: - replicas: 3 - ---- -# check if secret object are created apiVersion: v1 kind: Secret metadata: - name: splunk-ingestor-ingestor-secret-v1 \ No newline at end of file + name: s3-secret \ No newline at end of file diff --git a/kuttl/tests/helm/index-and-ingest-separation/01-create-se-secret.yaml b/kuttl/tests/helm/index-and-ingest-separation/01-create-se-secret.yaml new file mode 100644 index 000000000..8f1b1b95f --- /dev/null +++ b/kuttl/tests/helm/index-and-ingest-separation/01-create-se-secret.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl create secret generic s3-secret --from-literal=s3_access_key=$AWS_ACCESS_KEY_ID --from-literal=s3_secret_key=$AWS_SECRET_ACCESS_KEY --namespace $NAMESPACE + background: false + skipLogOutput: true \ No newline at end of file diff --git a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml index 291eddeba..f34dd2e6c 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml @@ -1,11 +1,107 @@ --- -# assert for ingestor cluster custom resource to be ready +# assert for bus custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: Bus +metadata: + name: bus +spec: + provider: sqs + sqs: + name: sqs-test + region: us-west-2 + endpoint: https://sqs.us-west-2.amazonaws.com + dlq: sqs-dlq-test +status: + phase: Ready + +--- +# assert for large message store custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: LargeMessageStore +metadata: + name: lms +spec: + provider: s3 + s3: + endpoint: https://s3.us-west-2.amazonaws.com + path: s3://ingestion/smartbus-test +status: + phase: Ready + +--- +# assert for cluster manager custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: ClusterManager +metadata: + name: cm +status: + phase: Ready + +--- +# check if stateful sets are created +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: splunk-cm-cluster-manager +status: + replicas: 1 + +--- +# check if secret object are created +apiVersion: v1 +kind: Secret +metadata: + name: splunk-cm-cluster-manager-secret-v1 + +--- +# assert for indexer cluster custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: IndexerCluster +metadata: + name: indexer +spec: + replicas: 3 + busRef: + name: bus +status: + phase: Ready + bus: + provider: sqs + sqs: + name: sqs-test + region: us-west-2 + endpoint: https://sqs.us-west-2.amazonaws.com + dlq: sqs-dlq-test + largeMessageStore: + provider: s3 + s3: + endpoint: https://s3.us-west-2.amazonaws.com + path: s3://ingestion/smartbus-test + +--- +# check for stateful set and replicas as configured +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: splunk-indexer-indexer +status: + replicas: 3 + +--- +# check if secret object are created +apiVersion: v1 +kind: Secret +metadata: + name: splunk-indexer-indexer-secret-v1 + +--- +# assert for indexer cluster custom resource to be ready apiVersion: enterprise.splunk.com/v4 kind: IngestorCluster metadata: name: ingestor spec: - replicas: 4 + replicas: 3 busRef: name: bus status: @@ -24,10 +120,17 @@ status: path: s3://ingestion/smartbus-test --- -# check for stateful sets and replicas updated +# check for stateful set and replicas as configured apiVersion: apps/v1 kind: StatefulSet metadata: name: splunk-ingestor-ingestor status: - replicas: 4 + replicas: 3 + +--- +# check if secret object are created +apiVersion: v1 +kind: Secret +metadata: + name: splunk-ingestor-ingestor-secret-v1 \ No newline at end of file diff --git a/kuttl/tests/helm/index-and-ingest-separation/01-install-setup.yaml b/kuttl/tests/helm/index-and-ingest-separation/02-install-setup.yaml similarity index 100% rename from kuttl/tests/helm/index-and-ingest-separation/01-install-setup.yaml rename to kuttl/tests/helm/index-and-ingest-separation/02-install-setup.yaml diff --git a/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml new file mode 100644 index 000000000..291eddeba --- /dev/null +++ b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml @@ -0,0 +1,33 @@ +--- +# assert for ingestor cluster custom resource to be ready +apiVersion: enterprise.splunk.com/v4 +kind: IngestorCluster +metadata: + name: ingestor +spec: + replicas: 4 + busRef: + name: bus +status: + phase: Ready + bus: + provider: sqs + sqs: + name: sqs-test + region: us-west-2 + endpoint: https://sqs.us-west-2.amazonaws.com + dlq: sqs-dlq-test + largeMessageStore: + provider: s3 + s3: + endpoint: https://s3.us-west-2.amazonaws.com + path: s3://ingestion/smartbus-test + +--- +# check for stateful sets and replicas updated +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: splunk-ingestor-ingestor +status: + replicas: 4 diff --git a/kuttl/tests/helm/index-and-ingest-separation/02-scaleup-ingestor.yaml b/kuttl/tests/helm/index-and-ingest-separation/03-scaleup-ingestor.yaml similarity index 100% rename from kuttl/tests/helm/index-and-ingest-separation/02-scaleup-ingestor.yaml rename to kuttl/tests/helm/index-and-ingest-separation/03-scaleup-ingestor.yaml diff --git a/kuttl/tests/helm/index-and-ingest-separation/03-uninstall-setup.yaml b/kuttl/tests/helm/index-and-ingest-separation/04-uninstall-setup.yaml similarity index 100% rename from kuttl/tests/helm/index-and-ingest-separation/03-uninstall-setup.yaml rename to kuttl/tests/helm/index-and-ingest-separation/04-uninstall-setup.yaml diff --git a/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml b/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml index a73c51ac2..f75668cf1 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml @@ -14,6 +14,9 @@ bus: region: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com dlq: sqs-dlq-test + volumes: + - name: helm-bus-secret-ref-test + secretRef: s3-secret largeMessageStore: enabled: true diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index 2170e914a..88b75af70 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -295,9 +295,8 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // If bus is updated if cr.Spec.BusRef.Name != "" { - if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) { + if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) { mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) - err = mgr.handlePullBusChange(ctx, cr, busCopy, lmsCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Bus/Pipeline config change after pod creation: %s", err.Error())) @@ -618,9 +617,8 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // If bus is updated if cr.Spec.BusRef.Name != "" { - if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) { + if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) { mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) - err = mgr.handlePullBusChange(ctx, cr, busCopy, lmsCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Bus/Pipeline config change after pod creation: %s", err.Error())) @@ -1328,7 +1326,21 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne afterDelete = true } - busChangedFieldsInputs, busChangedFieldsOutputs, pipelineChangedFields := getChangedBusFieldsForIndexer(&bus, &lms, newCR, afterDelete) + // Secret reference + s3AccessKey, s3SecretKey := "", "" + if bus.Spec.Provider == "sqs" { + for _, vol := range bus.Spec.SQS.VolList { + if vol.SecretRef != "" { + s3AccessKey, s3SecretKey, err = GetBusRemoteVolumeSecrets(ctx, vol, k8s, newCR) + if err != nil { + scopedLog.Error(err, "Failed to get bus remote volume secrets") + return err + } + } + } + } + + busChangedFieldsInputs, busChangedFieldsOutputs, pipelineChangedFields := getChangedBusFieldsForIndexer(&bus, &lms, newCR, afterDelete, s3AccessKey, s3SecretKey) for _, pbVal := range busChangedFieldsOutputs { if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", bus.Spec.SQS.Name), [][]string{pbVal}); err != nil { @@ -1354,7 +1366,7 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne } // getChangedBusFieldsForIndexer returns a list of changed bus and pipeline fields for indexer pods -func getChangedBusFieldsForIndexer(bus *enterpriseApi.Bus, lms *enterpriseApi.LargeMessageStore, busIndexerStatus *enterpriseApi.IndexerCluster, afterDelete bool) (busChangedFieldsInputs, busChangedFieldsOutputs, pipelineChangedFields [][]string) { +func getChangedBusFieldsForIndexer(bus *enterpriseApi.Bus, lms *enterpriseApi.LargeMessageStore, busIndexerStatus *enterpriseApi.IndexerCluster, afterDelete bool, s3AccessKey, s3SecretKey string) (busChangedFieldsInputs, busChangedFieldsOutputs, pipelineChangedFields [][]string) { // Compare bus fields oldPB := busIndexerStatus.Status.Bus if oldPB == nil { @@ -1369,7 +1381,7 @@ func getChangedBusFieldsForIndexer(bus *enterpriseApi.Bus, lms *enterpriseApi.La newLMS := lms.Spec // Push all bus fields - busChangedFieldsInputs, busChangedFieldsOutputs = pullBusChanged(oldPB, &newPB, oldLMS, &newLMS, afterDelete) + busChangedFieldsInputs, busChangedFieldsOutputs = pullBusChanged(oldPB, &newPB, oldLMS, &newLMS, afterDelete, s3AccessKey, s3SecretKey) // Always set all pipeline fields, not just changed ones pipelineChangedFields = pipelineConfig(true) @@ -1387,7 +1399,7 @@ func imageUpdatedTo9(previousImage string, currentImage string) bool { return strings.HasPrefix(previousVersion, "8") && strings.HasPrefix(currentVersion, "9") } -func pullBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enterpriseApi.LargeMessageStoreSpec, afterDelete bool) (inputs, outputs [][]string) { +func pullBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enterpriseApi.LargeMessageStoreSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (inputs, outputs [][]string) { busProvider := "" if newBus.Provider == "sqs" { busProvider = "sqs_smartbus" @@ -1400,6 +1412,10 @@ func pullBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enter if oldBus.Provider != newBus.Provider || afterDelete { inputs = append(inputs, []string{"remote_queue.type", busProvider}) } + if !reflect.DeepEqual(oldBus.SQS.VolList, newBus.SQS.VolList) || afterDelete { + inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.access_key", busProvider), s3AccessKey}) + inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.secret_key", busProvider), s3SecretKey}) + } if oldBus.SQS.Region != newBus.SQS.Region || afterDelete { inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.auth_region", busProvider), newBus.SQS.Region}) } diff --git a/pkg/splunk/enterprise/indexercluster_test.go b/pkg/splunk/enterprise/indexercluster_test.go index ff10e453d..da3f1dfe2 100644 --- a/pkg/splunk/enterprise/indexercluster_test.go +++ b/pkg/splunk/enterprise/indexercluster_test.go @@ -2063,6 +2063,9 @@ func TestGetChangedBusFieldsForIndexer(t *testing.T) { Region: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", DLQ: "sqs-dlq-test", + VolList: []enterpriseApi.VolumeSpec{ + {SecretRef: "secret"}, + }, }, }, } @@ -2095,10 +2098,14 @@ func TestGetChangedBusFieldsForIndexer(t *testing.T) { }, } - busChangedFieldsInputs, busChangedFieldsOutputs, pipelineChangedFields := getChangedBusFieldsForIndexer(&bus, &lms, newCR, false) - assert.Equal(t, 8, len(busChangedFieldsInputs)) + key := "key" + secret := "secret" + busChangedFieldsInputs, busChangedFieldsOutputs, pipelineChangedFields := getChangedBusFieldsForIndexer(&bus, &lms, newCR, false, key, secret) + assert.Equal(t, 10, len(busChangedFieldsInputs)) assert.Equal(t, [][]string{ {"remote_queue.type", provider}, + {fmt.Sprintf("remote_queue.%s.access_key", provider), key}, + {fmt.Sprintf("remote_queue.%s.secret_key", provider), secret}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), bus.Spec.SQS.Region}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), bus.Spec.SQS.Endpoint}, {fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", provider), lms.Spec.S3.Endpoint}, @@ -2108,9 +2115,11 @@ func TestGetChangedBusFieldsForIndexer(t *testing.T) { {fmt.Sprintf("remote_queue.%s.retry_policy", provider), "max_count"}, }, busChangedFieldsInputs) - assert.Equal(t, 10, len(busChangedFieldsOutputs)) + assert.Equal(t, 12, len(busChangedFieldsOutputs)) assert.Equal(t, [][]string{ {"remote_queue.type", provider}, + {fmt.Sprintf("remote_queue.%s.access_key", provider), key}, + {fmt.Sprintf("remote_queue.%s.secret_key", provider), secret}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), bus.Spec.SQS.Region}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), bus.Spec.SQS.Endpoint}, {fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", provider), lms.Spec.S3.Endpoint}, diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 524f183b5..5582166b9 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -259,9 +259,8 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } // If bus is updated - if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) { + if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) { mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) - err = mgr.handlePushBusChange(ctx, cr, busCopy, lmsCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIngestorCluster", fmt.Sprintf("Failed to update conf file for Bus/Pipeline config change after pod creation: %s", err.Error())) @@ -370,7 +369,21 @@ func (mgr *ingestorClusterPodManager) handlePushBusChange(ctx context.Context, n afterDelete = true } - busChangedFields, pipelineChangedFields := getChangedBusFieldsForIngestor(&bus, &lms, newCR, afterDelete) + // Secret reference + s3AccessKey, s3SecretKey := "", "" + if bus.Spec.Provider == "sqs" { + for _, vol := range bus.Spec.SQS.VolList { + if vol.SecretRef != "" { + s3AccessKey, s3SecretKey, err = GetBusRemoteVolumeSecrets(ctx, vol, k8s, newCR) + if err != nil { + scopedLog.Error(err, "Failed to get bus remote volume secrets") + return err + } + } + } + } + + busChangedFields, pipelineChangedFields := getChangedBusFieldsForIngestor(&bus, &lms, newCR, afterDelete, s3AccessKey, s3SecretKey) for _, pbVal := range busChangedFields { if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", bus.Spec.SQS.Name), [][]string{pbVal}); err != nil { @@ -390,7 +403,7 @@ func (mgr *ingestorClusterPodManager) handlePushBusChange(ctx context.Context, n } // getChangedBusFieldsForIngestor returns a list of changed bus and pipeline fields for ingestor pods -func getChangedBusFieldsForIngestor(bus *enterpriseApi.Bus, lms *enterpriseApi.LargeMessageStore, busIngestorStatus *enterpriseApi.IngestorCluster, afterDelete bool) (busChangedFields, pipelineChangedFields [][]string) { +func getChangedBusFieldsForIngestor(bus *enterpriseApi.Bus, lms *enterpriseApi.LargeMessageStore, busIngestorStatus *enterpriseApi.IngestorCluster, afterDelete bool, s3AccessKey, s3SecretKey string) (busChangedFields, pipelineChangedFields [][]string) { oldPB := busIngestorStatus.Status.Bus if oldPB == nil { oldPB = &enterpriseApi.BusSpec{} @@ -404,7 +417,7 @@ func getChangedBusFieldsForIngestor(bus *enterpriseApi.Bus, lms *enterpriseApi.L newLMS := &lms.Spec // Push changed bus fields - busChangedFields = pushBusChanged(oldPB, newPB, oldLMS, newLMS, afterDelete) + busChangedFields = pushBusChanged(oldPB, newPB, oldLMS, newLMS, afterDelete, s3AccessKey, s3SecretKey) // Always changed pipeline fields pipelineChangedFields = pipelineConfig(false) @@ -443,7 +456,7 @@ func pipelineConfig(isIndexer bool) (output [][]string) { return output } -func pushBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enterpriseApi.LargeMessageStoreSpec, afterDelete bool) (output [][]string) { +func pushBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enterpriseApi.LargeMessageStoreSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (output [][]string) { busProvider := "" if newBus.Provider == "sqs" { busProvider = "sqs_smartbus" @@ -456,6 +469,10 @@ func pushBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enter if oldBus.Provider != newBus.Provider || afterDelete { output = append(output, []string{"remote_queue.type", busProvider}) } + if !reflect.DeepEqual(oldBus.SQS.VolList, newBus.SQS.VolList) || afterDelete { + output = append(output, []string{fmt.Sprintf("remote_queue.%s.access_key", busProvider), s3AccessKey}) + output = append(output, []string{fmt.Sprintf("remote_queue.%s.secret_key", busProvider), s3SecretKey}) + } if oldBus.SQS.Region != newBus.SQS.Region || afterDelete { output = append(output, []string{fmt.Sprintf("remote_queue.%s.auth_region", busProvider), newBus.SQS.Region}) } diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go index 75cc14ec5..6136b3f2f 100644 --- a/pkg/splunk/enterprise/ingestorcluster_test.go +++ b/pkg/splunk/enterprise/ingestorcluster_test.go @@ -434,6 +434,9 @@ func TestGetChangedBusFieldsForIngestor(t *testing.T) { Region: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", DLQ: "sqs-dlq-test", + VolList: []enterpriseApi.VolumeSpec{ + {SecretRef: "secret"}, + }, }, }, } @@ -467,11 +470,15 @@ func TestGetChangedBusFieldsForIngestor(t *testing.T) { Status: enterpriseApi.IngestorClusterStatus{}, } - busChangedFields, pipelineChangedFields := getChangedBusFieldsForIngestor(&bus, &lms, newCR, false) + key := "key" + secret := "secret" + busChangedFields, pipelineChangedFields := getChangedBusFieldsForIngestor(&bus, &lms, newCR, false, key, secret) - assert.Equal(t, 10, len(busChangedFields)) + assert.Equal(t, 12, len(busChangedFields)) assert.Equal(t, [][]string{ {"remote_queue.type", provider}, + {fmt.Sprintf("remote_queue.%s.access_key", provider), key}, + {fmt.Sprintf("remote_queue.%s.secret_key", provider), secret}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), bus.Spec.SQS.Region}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), bus.Spec.SQS.Endpoint}, {fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", provider), lms.Spec.S3.Endpoint}, diff --git a/pkg/splunk/enterprise/util.go b/pkg/splunk/enterprise/util.go index e8f0736b3..c68b2ca71 100644 --- a/pkg/splunk/enterprise/util.go +++ b/pkg/splunk/enterprise/util.go @@ -417,6 +417,25 @@ func GetSmartstoreRemoteVolumeSecrets(ctx context.Context, volume enterpriseApi. return accessKey, secretKey, namespaceScopedSecret.ResourceVersion, nil } +// GetBusRemoteVolumeSecrets is used to retrieve access key and secrete key for Index & Ingestion separation +func GetBusRemoteVolumeSecrets(ctx context.Context, volume enterpriseApi.VolumeSpec, client splcommon.ControllerClient, cr splcommon.MetaObject) (string, string, error) { + namespaceScopedSecret, err := splutil.GetSecretByName(ctx, client, cr.GetNamespace(), cr.GetName(), volume.SecretRef) + if err != nil { + return "", "", err + } + + accessKey := string(namespaceScopedSecret.Data[s3AccessKey]) + secretKey := string(namespaceScopedSecret.Data[s3SecretKey]) + + if accessKey == "" { + return "", "", errors.New("access Key is missing") + } else if secretKey == "" { + return "", "", errors.New("secret Key is missing") + } + + return accessKey, secretKey, nil +} + // getLocalAppFileName generates the local app file name // For e.g., if the app package name is sample_app.tgz // and etag is "abcd1234", then it will be downloaded locally as sample_app.tgz_abcd1234 diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go index 1b3d27c70..6868dd168 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go @@ -79,6 +79,11 @@ var _ = Describe("indingsep test", func() { testcaseEnvInst.Log.Info("Create Service Account") testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + bus.SQS.VolList = volumeSpec + // Deploy Bus testcaseEnvInst.Log.Info("Deploy Bus") b, err := deployment.DeployBus(ctx, "bus", bus) @@ -152,6 +157,11 @@ var _ = Describe("indingsep test", func() { testcaseEnvInst.Log.Info("Create Service Account") testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + bus.SQS.VolList = volumeSpec + // Deploy Bus testcaseEnvInst.Log.Info("Deploy Bus") bc, err := deployment.DeployBus(ctx, "bus", bus) @@ -256,6 +266,11 @@ var _ = Describe("indingsep test", func() { testcaseEnvInst.Log.Info("Create Service Account") testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + bus.SQS.VolList = volumeSpec + // Deploy Bus testcaseEnvInst.Log.Info("Deploy Bus") bc, err := deployment.DeployBus(ctx, "bus", bus) @@ -363,6 +378,11 @@ var _ = Describe("indingsep test", func() { testcaseEnvInst.Log.Info("Create Service Account") testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ + // Secret reference + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + bus.SQS.VolList = volumeSpec + // Deploy Bus testcaseEnvInst.Log.Info("Deploy Bus") bc, err := deployment.DeployBus(ctx, "bus", bus) diff --git a/test/testenv/remote_index_utils.go b/test/testenv/remote_index_utils.go index 0eb2b485c..84e5c0709 100644 --- a/test/testenv/remote_index_utils.go +++ b/test/testenv/remote_index_utils.go @@ -86,6 +86,14 @@ func RollHotToWarm(ctx context.Context, deployment *Deployment, podName string, return true } +// GeneratBusVolumeSpec return VolumeSpec struct with given values +func GenerateBusVolumeSpec(name, secretRef string) enterpriseApi.VolumeSpec { + return enterpriseApi.VolumeSpec{ + Name: name, + SecretRef: secretRef, + } +} + // GenerateIndexVolumeSpec return VolumeSpec struct with given values func GenerateIndexVolumeSpec(volumeName string, endpoint string, secretRef string, provider string, storageType string, region string) enterpriseApi.VolumeSpec { return enterpriseApi.VolumeSpec{ From f992c40483f60cb3426d2639a5e84c71973ca2e6 Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Tue, 16 Dec 2025 12:44:42 +0100 Subject: [PATCH 02/10] CSPL-4360 Fix failing tests --- .../templates/enterprise_v4_indexercluster.yaml | 1 + .../index_and_ingestion_separation_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml index 0e6a96673..62497d0e6 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml @@ -169,6 +169,7 @@ items: {{- if .namespace }} namespace: {{ .namespace }} {{- end }} + {{- end }} {{- with $.Values.indexerCluster.largeMessageStoreRef }} largeMessageStoreRef: name: {{ .name }} diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go index 6868dd168..17ab5903b 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go @@ -382,6 +382,7 @@ var _ = Describe("indingsep test", func() { // Secret reference volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} bus.SQS.VolList = volumeSpec + updateBus.SQS.VolList = volumeSpec // Deploy Bus testcaseEnvInst.Log.Info("Deploy Bus") From 143dbe0917e34256ce09fbf51a060e15e3f13f19 Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Tue, 16 Dec 2025 12:54:28 +0100 Subject: [PATCH 03/10] CSPL-4360 Add Splunk restart --- docs/IndexIngestionSeparation.md | 18 ++++++++-- pkg/splunk/enterprise/indexercluster.go | 18 ++++++++++ pkg/splunk/enterprise/ingestorcluster.go | 36 +++++++++++++++++-- pkg/splunk/enterprise/ingestorcluster_test.go | 12 +++---- pkg/splunk/enterprise/util_test.go | 5 +++ .../index_and_ingestion_separation_test.go | 8 ----- 6 files changed, 77 insertions(+), 20 deletions(-) diff --git a/docs/IndexIngestionSeparation.md b/docs/IndexIngestionSeparation.md index e8c6211d7..195338c7d 100644 --- a/docs/IndexIngestionSeparation.md +++ b/docs/IndexIngestionSeparation.md @@ -1,3 +1,9 @@ +--- +title: Index and Ingestion Separation +parent: Deploy & Configure +nav_order: 6 +--- + # Background Separation between ingestion and indexing services within Splunk Operator for Kubernetes enables the operator to independently manage the ingestion service while maintaining seamless integration with the indexing service. @@ -10,7 +16,7 @@ This separation enables: # Important Note > [!WARNING] -> **As of now, only brand new deployments are supported for Index and Ingestion Separation. No migration path is implemented, described or tested for existing deployments to move from a standard model to Index & Ingestion separation model.** +> **For customers deploying SmartBus on CMP, the Splunk Operator for Kubernetes (SOK) manages the configuration and lifecycle of the ingestor tier. The following SOK guide provides implementation details for setting up ingestion separation and integrating with existing indexers. This reference is primarily intended for CMP users leveraging SOK-managed ingestors.** # Document Variables @@ -38,7 +44,7 @@ SQS message bus inputs can be found in the table below. | endpoint | string | [Optional, if not provided formed based on region] AWS SQS Service endpoint | dlq | string | [Required] Name of the dead letter queue | -Change of any of the bus inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. +**First provisioning or update of any of the bus inputs requires Ingestor Cluster and Indexer Cluster Splunkd restart, but this restart is implemented automatically and done by SOK.** ## Example ``` @@ -425,6 +431,14 @@ In the following example, the dashboard presents ingestion and indexing data in - [kube-prometheus-stack](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack) +# App Installation for Ingestor Cluster Instances + +Application installation is supported for Ingestor Cluster instances. However, as of now, applications are installed using local scope and if any application requires Splunk restart, there is no automated way to detect it and trigger automatically via Splunk Operator. + +Therefore, to be able to enforce Splunk restart for each of the Ingestor Cluster pods, it is recommended to add/update IngestorCluster CR annotations/labels and apply the new configuration which will trigger the rolling restart of Splunk pods for Ingestor Cluster. + +We are under the investigation on how to make it fully automated. What is more, ideally, update of annotations and labels should not trigger pod restart at all and we are investigating on how to fix this behaviour eventually. + # Example 1. Install CRDs and Splunk Operator for Kubernetes. diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index 88b75af70..d22b7008e 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -305,6 +305,15 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } cr.Status.Bus = &bus.Spec + + for i := int32(0); i < cr.Spec.Replicas; i++ { + idxcClient := mgr.getClient(ctx, i) + err = idxcClient.RestartSplunk() + if err != nil { + return result, err + } + scopedLog.Info("Restarted splunk", "indexer", i) + } } } @@ -627,6 +636,15 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, } cr.Status.Bus = &bus.Spec + + for i := int32(0); i < cr.Spec.Replicas; i++ { + idxcClient := mgr.getClient(ctx, i) + err = idxcClient.RestartSplunk() + if err != nil { + return result, err + } + scopedLog.Info("Restarted splunk", "indexer", i) + } } } diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 5582166b9..94d51a8f7 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -260,7 +260,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // If bus is updated if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) { - mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) + mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePushBusChange(ctx, cr, busCopy, lmsCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIngestorCluster", fmt.Sprintf("Failed to update conf file for Bus/Pipeline config change after pod creation: %s", err.Error())) @@ -269,6 +269,15 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } cr.Status.Bus = &bus.Spec + + for i := int32(0); i < cr.Spec.Replicas; i++ { + ingClient := mgr.getClient(ctx, i) + err = ingClient.RestartSplunk() + if err != nil { + return result, err + } + scopedLog.Info("Restarted splunk", "ingestor", i) + } } // Upgrade fron automated MC to MC CRD @@ -311,6 +320,27 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr return result, nil } +// getClient for ingestorClusterPodManager returns a SplunkClient for the member n +func (mgr *ingestorClusterPodManager) getClient(ctx context.Context, n int32) *splclient.SplunkClient { + reqLogger := log.FromContext(ctx) + scopedLog := reqLogger.WithName("ingestorClusterPodManager.getClient").WithValues("name", mgr.cr.GetName(), "namespace", mgr.cr.GetNamespace()) + + // Get Pod Name + memberName := GetSplunkStatefulsetPodName(SplunkIngestor, mgr.cr.GetName(), n) + + // Get Fully Qualified Domain Name + fqdnName := splcommon.GetServiceFQDN(mgr.cr.GetNamespace(), + fmt.Sprintf("%s.%s", memberName, GetSplunkServiceName(SplunkIngestor, mgr.cr.GetName(), true))) + + // Retrieve admin password from Pod + adminPwd, err := splutil.GetSpecificSecretTokenFromPod(ctx, mgr.c, memberName, mgr.cr.GetNamespace(), "password") + if err != nil { + scopedLog.Error(err, "Couldn't retrieve the admin password from pod") + } + + return mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", adminPwd) +} + // validateIngestorClusterSpec checks validity and makes default updates to a IngestorClusterSpec and returns error if something is wrong func validateIngestorClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IngestorCluster) error { // We cannot have 0 replicas in IngestorCluster spec since this refers to number of ingestion pods in an ingestor cluster @@ -426,6 +456,7 @@ func getChangedBusFieldsForIngestor(bus *enterpriseApi.Bus, lms *enterpriseApi.L } type ingestorClusterPodManager struct { + c splcommon.ControllerClient log logr.Logger cr *enterpriseApi.IngestorCluster secrets *corev1.Secret @@ -433,12 +464,13 @@ type ingestorClusterPodManager struct { } // newIngestorClusterPodManager function to create pod manager this is added to write unit test case -var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) ingestorClusterPodManager { +var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { return ingestorClusterPodManager{ log: log, cr: cr, secrets: secret, newSplunkClient: newSplunkClient, + c: c, } } diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go index 6136b3f2f..a72179453 100644 --- a/pkg/splunk/enterprise/ingestorcluster_test.go +++ b/pkg/splunk/enterprise/ingestorcluster_test.go @@ -25,15 +25,14 @@ import ( "github.com/go-logr/logr" enterpriseApi "github.com/splunk/splunk-operator/api/v4" splclient "github.com/splunk/splunk-operator/pkg/splunk/client" + splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" spltest "github.com/splunk/splunk-operator/pkg/splunk/test" splutil "github.com/splunk/splunk-operator/pkg/splunk/util" "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client/fake" ) func init() { @@ -56,11 +55,7 @@ func TestApplyIngestorCluster(t *testing.T) { ctx := context.TODO() - scheme := runtime.NewScheme() - _ = enterpriseApi.AddToScheme(scheme) - _ = corev1.AddToScheme(scheme) - _ = appsv1.AddToScheme(scheme) - c := fake.NewClientBuilder().WithScheme(scheme).Build() + c := spltest.NewMockClient() // Object definitions provider := "sqs_smartbus" @@ -273,8 +268,9 @@ func TestApplyIngestorCluster(t *testing.T) { // outputs.conf origNew := newIngestorClusterPodManager mockHTTPClient := &spltest.MockHTTPClient{} - newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc) ingestorClusterPodManager { + newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { return ingestorClusterPodManager{ + c: c, log: l, cr: cr, secrets: secret, newSplunkClient: func(uri, user, pass string) *splclient.SplunkClient { return &splclient.SplunkClient{ManagementURI: uri, Username: user, Password: pass, Client: mockHTTPClient} diff --git a/pkg/splunk/enterprise/util_test.go b/pkg/splunk/enterprise/util_test.go index f5405b2cf..6ea7b021e 100644 --- a/pkg/splunk/enterprise/util_test.go +++ b/pkg/splunk/enterprise/util_test.go @@ -2624,6 +2624,9 @@ func TestUpdateCRStatus(t *testing.T) { WithStatusSubresource(&enterpriseApi.Standalone{}). WithStatusSubresource(&enterpriseApi.MonitoringConsole{}). WithStatusSubresource(&enterpriseApi.IndexerCluster{}). + WithStatusSubresource(&enterpriseApi.Bus{}). + WithStatusSubresource(&enterpriseApi.LargeMessageStore{}). + WithStatusSubresource(&enterpriseApi.IngestorCluster{}). WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}) c := builder.Build() ctx := context.TODO() @@ -3304,6 +3307,8 @@ func TestGetCurrentImage(t *testing.T) { WithStatusSubresource(&enterpriseApi.MonitoringConsole{}). WithStatusSubresource(&enterpriseApi.IndexerCluster{}). WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}). + WithStatusSubresource(&enterpriseApi.Bus{}). + WithStatusSubresource(&enterpriseApi.LargeMessageStore{}). WithStatusSubresource(&enterpriseApi.IngestorCluster{}) client := builder.Build() client.Create(ctx, ¤t) diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go index 17ab5903b..a21146e11 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go @@ -433,14 +433,6 @@ var _ = Describe("indingsep test", func() { err = deployment.UpdateCR(ctx, bus) Expect(err).To(Succeed(), "Unable to deploy Bus with updated CR") - // Ensure that Ingestor Cluster has not been restarted - testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster has not been restarted") - testenv.IngestorReady(ctx, deployment, testcaseEnvInst) - - // Ensure that Indexer Cluster has not been restarted - testcaseEnvInst.Log.Info("Ensure that Indexer Cluster has not been restarted") - testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst) - // Get instance of current Ingestor Cluster CR with latest config testcaseEnvInst.Log.Info("Get instance of current Ingestor Cluster CR with latest config") ingest := &enterpriseApi.IngestorCluster{} From 3c7b2d7c2ae00e126903579ab7d797ab853e4c6f Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Tue, 16 Dec 2025 16:05:02 +0100 Subject: [PATCH 04/10] CSPL-4360 Fix failing tests --- pkg/splunk/enterprise/indexercluster.go | 8 ++-- pkg/splunk/enterprise/indexercluster_test.go | 27 ++++++++++- pkg/splunk/enterprise/ingestorcluster.go | 8 ++-- pkg/splunk/enterprise/ingestorcluster_test.go | 47 ++++++++----------- 4 files changed, 54 insertions(+), 36 deletions(-) diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index d22b7008e..a5ebdbaa1 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -1346,7 +1346,7 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne // Secret reference s3AccessKey, s3SecretKey := "", "" - if bus.Spec.Provider == "sqs" { + if bus.Spec.Provider == "sqs" && newCR.Spec.ServiceAccount == "" { for _, vol := range bus.Spec.SQS.VolList { if vol.SecretRef != "" { s3AccessKey, s3SecretKey, err = GetBusRemoteVolumeSecrets(ctx, vol, k8s, newCR) @@ -1431,8 +1431,10 @@ func pullBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enter inputs = append(inputs, []string{"remote_queue.type", busProvider}) } if !reflect.DeepEqual(oldBus.SQS.VolList, newBus.SQS.VolList) || afterDelete { - inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.access_key", busProvider), s3AccessKey}) - inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.secret_key", busProvider), s3SecretKey}) + if s3AccessKey != "" && s3SecretKey != "" { + inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.access_key", busProvider), s3AccessKey}) + inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.secret_key", busProvider), s3SecretKey}) + } } if oldBus.SQS.Region != newBus.SQS.Region || afterDelete { inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.auth_region", busProvider), newBus.SQS.Region}) diff --git a/pkg/splunk/enterprise/indexercluster_test.go b/pkg/splunk/enterprise/indexercluster_test.go index da3f1dfe2..00f20656f 100644 --- a/pkg/splunk/enterprise/indexercluster_test.go +++ b/pkg/splunk/enterprise/indexercluster_test.go @@ -2404,7 +2404,7 @@ func TestApplyIndexerClusterManager_Bus_Success(t *testing.T) { c := fake.NewClientBuilder().WithScheme(scheme).Build() // Object definitions - bus := enterpriseApi.Bus{ + bus := &enterpriseApi.Bus{ TypeMeta: metav1.TypeMeta{ Kind: "Bus", APIVersion: "enterprise.splunk.com/v4", @@ -2423,7 +2423,26 @@ func TestApplyIndexerClusterManager_Bus_Success(t *testing.T) { }, }, } - c.Create(ctx, &bus) + c.Create(ctx, bus) + + lms := &enterpriseApi.LargeMessageStore{ + TypeMeta: metav1.TypeMeta{ + Kind: "LargeMessageStore", + APIVersion: "enterprise.splunk.com/v4", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "lms", + Namespace: "test", + }, + Spec: enterpriseApi.LargeMessageStoreSpec{ + Provider: "s3", + S3: enterpriseApi.S3Spec{ + Endpoint: "https://s3.us-west-2.amazonaws.com", + Path: "s3://bucket/key", + }, + }, + } + c.Create(ctx, lms) cm := &enterpriseApi.ClusterManager{ TypeMeta: metav1.TypeMeta{Kind: "ClusterManager"}, @@ -2449,6 +2468,10 @@ func TestApplyIndexerClusterManager_Bus_Success(t *testing.T) { Name: bus.Name, Namespace: bus.Namespace, }, + LargeMessageStoreRef: corev1.ObjectReference{ + Name: lms.Name, + Namespace: lms.Namespace, + }, CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{ ClusterManagerRef: corev1.ObjectReference{ Name: "cm", diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 94d51a8f7..90c067494 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -401,7 +401,7 @@ func (mgr *ingestorClusterPodManager) handlePushBusChange(ctx context.Context, n // Secret reference s3AccessKey, s3SecretKey := "", "" - if bus.Spec.Provider == "sqs" { + if bus.Spec.Provider == "sqs" && newCR.Spec.ServiceAccount == "" { for _, vol := range bus.Spec.SQS.VolList { if vol.SecretRef != "" { s3AccessKey, s3SecretKey, err = GetBusRemoteVolumeSecrets(ctx, vol, k8s, newCR) @@ -502,8 +502,10 @@ func pushBusChanged(oldBus, newBus *enterpriseApi.BusSpec, oldLMS, newLMS *enter output = append(output, []string{"remote_queue.type", busProvider}) } if !reflect.DeepEqual(oldBus.SQS.VolList, newBus.SQS.VolList) || afterDelete { - output = append(output, []string{fmt.Sprintf("remote_queue.%s.access_key", busProvider), s3AccessKey}) - output = append(output, []string{fmt.Sprintf("remote_queue.%s.secret_key", busProvider), s3SecretKey}) + if s3AccessKey != "" && s3SecretKey != "" { + output = append(output, []string{fmt.Sprintf("remote_queue.%s.access_key", busProvider), s3AccessKey}) + output = append(output, []string{fmt.Sprintf("remote_queue.%s.secret_key", busProvider), s3SecretKey}) + } } if oldBus.SQS.Region != newBus.SQS.Region || afterDelete { output = append(output, []string{fmt.Sprintf("remote_queue.%s.auth_region", busProvider), newBus.SQS.Region}) diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go index a72179453..0f5fae8fa 100644 --- a/pkg/splunk/enterprise/ingestorcluster_test.go +++ b/pkg/splunk/enterprise/ingestorcluster_test.go @@ -32,7 +32,8 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client/fake" ) func init() { @@ -55,7 +56,11 @@ func TestApplyIngestorCluster(t *testing.T) { ctx := context.TODO() - c := spltest.NewMockClient() + scheme := runtime.NewScheme() + _ = enterpriseApi.AddToScheme(scheme) + _ = corev1.AddToScheme(scheme) + _ = appsv1.AddToScheme(scheme) + c := fake.NewClientBuilder().WithScheme(scheme).Build() // Object definitions provider := "sqs_smartbus" @@ -81,7 +86,7 @@ func TestApplyIngestorCluster(t *testing.T) { } c.Create(ctx, bus) - lms := enterpriseApi.LargeMessageStore{ + lms := &enterpriseApi.LargeMessageStore{ TypeMeta: metav1.TypeMeta{ Kind: "LargeMessageStore", APIVersion: "enterprise.splunk.com/v4", @@ -98,7 +103,7 @@ func TestApplyIngestorCluster(t *testing.T) { }, }, } - c.Create(ctx, &lms) + c.Create(ctx, lms) cr := &enterpriseApi.IngestorCluster{ TypeMeta: metav1.TypeMeta{ @@ -112,7 +117,8 @@ func TestApplyIngestorCluster(t *testing.T) { Spec: enterpriseApi.IngestorClusterSpec{ Replicas: 3, CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{ - Mock: true, + Mock: true, + ServiceAccount: "sa", }, BusRef: corev1.ObjectReference{ Name: bus.Name, @@ -242,29 +248,6 @@ func TestApplyIngestorCluster(t *testing.T) { assert.True(t, result.Requeue) assert.NotEqual(t, enterpriseApi.PhaseError, cr.Status.Phase) - // Ensure stored StatefulSet status reflects readiness after any reconcile modifications - fetched := &appsv1.StatefulSet{} - _ = c.Get(ctx, types.NamespacedName{Name: "splunk-test-ingestor", Namespace: "test"}, fetched) - fetched.Status.Replicas = replicas - fetched.Status.ReadyReplicas = replicas - fetched.Status.UpdatedReplicas = replicas - if fetched.Status.UpdateRevision == "" { - fetched.Status.UpdateRevision = "v1" - } - c.Update(ctx, fetched) - - // Guarantee all pods have matching revision label - for _, pn := range []string{"splunk-test-ingestor-0", "splunk-test-ingestor-1", "splunk-test-ingestor-2"} { - p := &corev1.Pod{} - if err := c.Get(ctx, types.NamespacedName{Name: pn, Namespace: "test"}, p); err == nil { - if p.Labels == nil { - p.Labels = map[string]string{} - } - p.Labels["controller-revision-hash"] = fetched.Status.UpdateRevision - c.Update(ctx, p) - } - } - // outputs.conf origNew := newIngestorClusterPodManager mockHTTPClient := &spltest.MockHTTPClient{} @@ -280,6 +263,7 @@ func TestApplyIngestorCluster(t *testing.T) { defer func() { newIngestorClusterPodManager = origNew }() propertyKVList := [][]string{ + {"remote_queue.type", provider}, {fmt.Sprintf("remote_queue.%s.encoding_format", provider), "s2s"}, {fmt.Sprintf("remote_queue.%s.auth_region", provider), bus.Spec.SQS.Region}, {fmt.Sprintf("remote_queue.%s.endpoint", provider), bus.Spec.SQS.Endpoint}, @@ -318,6 +302,13 @@ func TestApplyIngestorCluster(t *testing.T) { } } + for i := 0; i < int(cr.Status.ReadyReplicas); i++ { + podName := fmt.Sprintf("splunk-test-ingestor-%d", i) + baseURL := fmt.Sprintf("https://%s.splunk-%s-ingestor-headless.%s.svc.cluster.local:8089/services/server/control/restart", podName, cr.GetName(), cr.GetNamespace()) + req, _ := http.NewRequest("POST", baseURL, nil) + mockHTTPClient.AddHandler(req, 200, "", nil) + } + // Second reconcile should now yield Ready cr.Status.TelAppInstalled = true result, err = ApplyIngestorCluster(ctx, c, cr) From e4e083a981061529ca4d948105997901879a1355 Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Wed, 17 Dec 2025 11:37:38 +0100 Subject: [PATCH 05/10] CSPL-4360 Fix failing tests --- .../enterprise_v4_largemessagestores.yaml | 16 ++++++++-------- ...se-secret.yaml => 01-create-s3-secret.yaml} | 0 .../index-and-ingest-separation/02-assert.yaml | 4 ++++ .../index-and-ingest-separation/03-assert.yaml | 2 ++ pkg/splunk/enterprise/indexercluster.go | 4 ++++ pkg/splunk/enterprise/ingestorcluster.go | 2 ++ .../index_and_ingestion_separation_test.go | 18 ++++++++++++++---- 7 files changed, 34 insertions(+), 12 deletions(-) rename kuttl/tests/helm/index-and-ingest-separation/{01-create-se-secret.yaml => 01-create-s3-secret.yaml} (100%) diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_largemessagestores.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_largemessagestores.yaml index 77ef09e69..1e4e9b5db 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_largemessagestores.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_largemessagestores.yaml @@ -1,21 +1,21 @@ -{{- if .Values.largemessagestore }} -{{- if .Values.largemessagestore.enabled }} +{{- if .Values.largeMessageStore }} +{{- if .Values.largeMessageStore.enabled }} apiVersion: enterprise.splunk.com/v4 kind: LargeMessageStore metadata: - name: {{ .Values.largemessagestore.name }} - namespace: {{ default .Release.Namespace .Values.largemessagestore.namespaceOverride }} - {{- with .Values.largemessagestore.additionalLabels }} + name: {{ .Values.largeMessageStore.name }} + namespace: {{ default .Release.Namespace .Values.largeMessageStore.namespaceOverride }} + {{- with .Values.largeMessageStore.additionalLabels }} labels: {{ toYaml . | nindent 4 }} {{- end }} - {{- with .Values.largemessagestore.additionalAnnotations }} + {{- with .Values.largeMessageStore.additionalAnnotations }} annotations: {{ toYaml . | nindent 4 }} {{- end }} spec: - provider: {{ .Values.largemessagestore.provider | quote }} - {{- with .Values.largemessagestore.s3 }} + provider: {{ .Values.largeMessageStore.provider | quote }} + {{- with .Values.largeMessageStore.s3 }} s3: {{- if .endpoint }} endpoint: {{ .endpoint | quote }} diff --git a/kuttl/tests/helm/index-and-ingest-separation/01-create-se-secret.yaml b/kuttl/tests/helm/index-and-ingest-separation/01-create-s3-secret.yaml similarity index 100% rename from kuttl/tests/helm/index-and-ingest-separation/01-create-se-secret.yaml rename to kuttl/tests/helm/index-and-ingest-separation/01-create-s3-secret.yaml diff --git a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml index f34dd2e6c..42e003418 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml @@ -63,6 +63,8 @@ spec: replicas: 3 busRef: name: bus + largeMessageStoreRef: + name: lms status: phase: Ready bus: @@ -104,6 +106,8 @@ spec: replicas: 3 busRef: name: bus + largeMessageStoreRef: + name: lms status: phase: Ready bus: diff --git a/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml index 291eddeba..819620baa 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml @@ -8,6 +8,8 @@ spec: replicas: 4 busRef: name: bus + largeMessageStoreRef: + name: lms status: phase: Ready bus: diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index a5ebdbaa1..4acbc3d11 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -79,6 +79,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError if cr.Status.Replicas < cr.Spec.Replicas { cr.Status.Bus = &enterpriseApi.BusSpec{} + cr.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) @@ -305,6 +306,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } cr.Status.Bus = &bus.Spec + cr.Status.LargeMessageStore = &lms.Spec for i := int32(0); i < cr.Spec.Replicas; i++ { idxcClient := mgr.getClient(ctx, i) @@ -407,6 +409,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError if cr.Status.Replicas < cr.Spec.Replicas { cr.Status.Bus = &enterpriseApi.BusSpec{} + cr.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) @@ -636,6 +639,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, } cr.Status.Bus = &bus.Spec + cr.Status.LargeMessageStore = &lms.Spec for i := int32(0); i < cr.Spec.Replicas; i++ { idxcClient := mgr.getClient(ctx, i) diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 90c067494..1a1dcd428 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -74,6 +74,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr if cr.Status.Replicas < cr.Spec.Replicas { cr.Status.Bus = &enterpriseApi.BusSpec{} + cr.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} } cr.Status.Replicas = cr.Spec.Replicas @@ -269,6 +270,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } cr.Status.Bus = &bus.Spec + cr.Status.LargeMessageStore = &lms.Spec for i := int32(0); i < cr.Spec.Replicas; i++ { ingClient := mgr.getClient(ctx, i) diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go index a21146e11..4b90db6bd 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go @@ -83,6 +83,7 @@ var _ = Describe("indingsep test", func() { // Secret reference volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} bus.SQS.VolList = volumeSpec + updateBus.SQS.VolList = volumeSpec // Deploy Bus testcaseEnvInst.Log.Info("Deploy Bus") @@ -161,6 +162,7 @@ var _ = Describe("indingsep test", func() { // Secret reference volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} bus.SQS.VolList = volumeSpec + updateBus.SQS.VolList = volumeSpec // Deploy Bus testcaseEnvInst.Log.Info("Deploy Bus") @@ -316,7 +318,7 @@ var _ = Describe("indingsep test", func() { // Verify Ingestor Cluster Status testcaseEnvInst.Log.Info("Verify Ingestor Cluster Status") - Expect(ingest.Status.Bus).To(Equal(bus), "Ingestor bus status is not the same as provided as input") + Expect(*ingest.Status.Bus).To(Equal(bus), "Ingestor bus status is not the same as provided as input") // Get instance of current Indexer Cluster CR with latest config testcaseEnvInst.Log.Info("Get instance of current Indexer Cluster CR with latest config") @@ -326,7 +328,7 @@ var _ = Describe("indingsep test", func() { // Verify Indexer Cluster Status testcaseEnvInst.Log.Info("Verify Indexer Cluster Status") - Expect(index.Status.Bus).To(Equal(bus), "Indexer bus status is not the same as provided as input") + Expect(*index.Status.Bus).To(Equal(bus), "Indexer bus status is not the same as provided as input") // Verify conf files testcaseEnvInst.Log.Info("Verify conf files") @@ -433,6 +435,10 @@ var _ = Describe("indingsep test", func() { err = deployment.UpdateCR(ctx, bus) Expect(err).To(Succeed(), "Unable to deploy Bus with updated CR") + // Ensure that Ingestor Cluster is in Ready phase + testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster is in Ready phase") + testenv.IngestorReady(ctx, deployment, testcaseEnvInst) + // Get instance of current Ingestor Cluster CR with latest config testcaseEnvInst.Log.Info("Get instance of current Ingestor Cluster CR with latest config") ingest := &enterpriseApi.IngestorCluster{} @@ -441,7 +447,11 @@ var _ = Describe("indingsep test", func() { // Verify Ingestor Cluster Status testcaseEnvInst.Log.Info("Verify Ingestor Cluster Status") - Expect(ingest.Status.Bus).To(Equal(updateBus), "Ingestor bus status is not the same as provided as input") + Expect(*ingest.Status.Bus).To(Equal(updateBus), "Ingestor bus status is not the same as provided as input") + + // Ensure that Indexer Cluster is in Ready phase + testcaseEnvInst.Log.Info("Ensure that Indexer Cluster is in Ready phase") + testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst) // Get instance of current Indexer Cluster CR with latest config testcaseEnvInst.Log.Info("Get instance of current Indexer Cluster CR with latest config") @@ -451,7 +461,7 @@ var _ = Describe("indingsep test", func() { // Verify Indexer Cluster Status testcaseEnvInst.Log.Info("Verify Indexer Cluster Status") - Expect(index.Status.Bus).To(Equal(updateBus), "Indexer bus status is not the same as provided as input") + Expect(*index.Status.Bus).To(Equal(updateBus), "Indexer bus status is not the same as provided as input") // Verify conf files testcaseEnvInst.Log.Info("Verify conf files") From 3cb9148536edda1fbfd229c009bb6b7dd1ef9ba4 Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Wed, 17 Dec 2025 13:25:54 +0100 Subject: [PATCH 06/10] CSPL-4360 Fix errors with failing validation on status --- pkg/splunk/enterprise/indexercluster.go | 31 +++++++++---------- pkg/splunk/enterprise/ingestorcluster.go | 25 +++++++-------- pkg/splunk/enterprise/ingestorcluster_test.go | 5 ++- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index 4acbc3d11..b9b644599 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -77,10 +77,6 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // updates status after function completes cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError - if cr.Status.Replicas < cr.Spec.Replicas { - cr.Status.Bus = &enterpriseApi.BusSpec{} - cr.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} - } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) if cr.Status.Peers == nil { @@ -296,7 +292,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // If bus is updated if cr.Spec.BusRef.Name != "" { - if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) { + if cr.Status.Bus == nil || cr.Status.LargeMessageStore == nil || !reflect.DeepEqual(*cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(*cr.Status.LargeMessageStore, lms.Spec) { mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) err = mgr.handlePullBusChange(ctx, cr, busCopy, lmsCopy, client) if err != nil { @@ -305,9 +301,6 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller return result, err } - cr.Status.Bus = &bus.Spec - cr.Status.LargeMessageStore = &lms.Spec - for i := int32(0); i < cr.Spec.Replicas; i++ { idxcClient := mgr.getClient(ctx, i) err = idxcClient.RestartSplunk() @@ -316,6 +309,9 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } scopedLog.Info("Restarted splunk", "indexer", i) } + + cr.Status.Bus = &bus.Spec + cr.Status.LargeMessageStore = &lms.Spec } } @@ -407,10 +403,6 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // updates status after function completes cr.Status.Phase = enterpriseApi.PhaseError cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError - if cr.Status.Replicas < cr.Spec.Replicas { - cr.Status.Bus = &enterpriseApi.BusSpec{} - cr.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} - } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) if cr.Status.Peers == nil { @@ -629,7 +621,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // If bus is updated if cr.Spec.BusRef.Name != "" { - if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) { + if cr.Status.Bus == nil || cr.Status.LargeMessageStore == nil || !reflect.DeepEqual(*cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(*cr.Status.LargeMessageStore, lms.Spec) { mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) err = mgr.handlePullBusChange(ctx, cr, busCopy, lmsCopy, client) if err != nil { @@ -638,9 +630,6 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, return result, err } - cr.Status.Bus = &bus.Spec - cr.Status.LargeMessageStore = &lms.Spec - for i := int32(0); i < cr.Spec.Replicas; i++ { idxcClient := mgr.getClient(ctx, i) err = idxcClient.RestartSplunk() @@ -649,6 +638,9 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, } scopedLog.Info("Restarted splunk", "indexer", i) } + + cr.Status.Bus = &bus.Spec + cr.Status.LargeMessageStore = &lms.Spec } } @@ -1336,6 +1328,13 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne } splunkClient := newSplunkClientForBusPipeline(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd)) + if newCR.Status.Bus == nil { + newCR.Status.Bus = &enterpriseApi.BusSpec{} + } + if newCR.Status.LargeMessageStore == nil { + newCR.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} + } + afterDelete := false if (bus.Spec.SQS.Name != "" && newCR.Status.Bus.SQS.Name != "" && bus.Spec.SQS.Name != newCR.Status.Bus.SQS.Name) || (bus.Spec.Provider != "" && newCR.Status.Bus.Provider != "" && bus.Spec.Provider != newCR.Status.Bus.Provider) { diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 1a1dcd428..f87a1eaa7 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -72,10 +72,6 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // Update the CR Status defer updateCRStatus(ctx, client, cr, &err) - if cr.Status.Replicas < cr.Spec.Replicas { - cr.Status.Bus = &enterpriseApi.BusSpec{} - cr.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} - } cr.Status.Replicas = cr.Spec.Replicas // If needed, migrate the app framework status @@ -260,7 +256,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } // If bus is updated - if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) { + if cr.Status.Bus == nil || cr.Status.LargeMessageStore == nil || !reflect.DeepEqual(*cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(*cr.Status.LargeMessageStore, lms.Spec) { mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePushBusChange(ctx, cr, busCopy, lmsCopy, client) if err != nil { @@ -269,9 +265,6 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr return result, err } - cr.Status.Bus = &bus.Spec - cr.Status.LargeMessageStore = &lms.Spec - for i := int32(0); i < cr.Spec.Replicas; i++ { ingClient := mgr.getClient(ctx, i) err = ingClient.RestartSplunk() @@ -280,6 +273,9 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } scopedLog.Info("Restarted splunk", "ingestor", i) } + + cr.Status.Bus = &bus.Spec + cr.Status.LargeMessageStore = &lms.Spec } // Upgrade fron automated MC to MC CRD @@ -392,6 +388,13 @@ func (mgr *ingestorClusterPodManager) handlePushBusChange(ctx context.Context, n } splunkClient := mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd)) + if newCR.Status.Bus == nil { + newCR.Status.Bus = &enterpriseApi.BusSpec{} + } + if newCR.Status.LargeMessageStore == nil { + newCR.Status.LargeMessageStore = &enterpriseApi.LargeMessageStoreSpec{} + } + afterDelete := false if (bus.Spec.SQS.Name != "" && newCR.Status.Bus.SQS.Name != "" && bus.Spec.SQS.Name != newCR.Status.Bus.SQS.Name) || (bus.Spec.Provider != "" && newCR.Status.Bus.Provider != "" && bus.Spec.Provider != newCR.Status.Bus.Provider) { @@ -437,15 +440,9 @@ func (mgr *ingestorClusterPodManager) handlePushBusChange(ctx context.Context, n // getChangedBusFieldsForIngestor returns a list of changed bus and pipeline fields for ingestor pods func getChangedBusFieldsForIngestor(bus *enterpriseApi.Bus, lms *enterpriseApi.LargeMessageStore, busIngestorStatus *enterpriseApi.IngestorCluster, afterDelete bool, s3AccessKey, s3SecretKey string) (busChangedFields, pipelineChangedFields [][]string) { oldPB := busIngestorStatus.Status.Bus - if oldPB == nil { - oldPB = &enterpriseApi.BusSpec{} - } newPB := &bus.Spec oldLMS := busIngestorStatus.Status.LargeMessageStore - if oldLMS == nil { - oldLMS = &enterpriseApi.LargeMessageStoreSpec{} - } newLMS := &lms.Spec // Push changed bus fields diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go index 0f5fae8fa..63d94facb 100644 --- a/pkg/splunk/enterprise/ingestorcluster_test.go +++ b/pkg/splunk/enterprise/ingestorcluster_test.go @@ -454,7 +454,10 @@ func TestGetChangedBusFieldsForIngestor(t *testing.T) { Name: lms.Name, }, }, - Status: enterpriseApi.IngestorClusterStatus{}, + Status: enterpriseApi.IngestorClusterStatus{ + Bus: &enterpriseApi.BusSpec{}, + LargeMessageStore: &enterpriseApi.LargeMessageStoreSpec{}, + }, } key := "key" From fafed270b1c068601a18f4bfeb4c073e625b2fa9 Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Fri, 19 Dec 2025 11:03:36 +0100 Subject: [PATCH 07/10] CSPL-4360 Fixing tests after merge --- pkg/splunk/enterprise/indexercluster_test.go | 2 ++ pkg/splunk/enterprise/util.go | 4 ++-- pkg/splunk/enterprise/util_test.go | 8 ++++---- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/splunk/enterprise/indexercluster_test.go b/pkg/splunk/enterprise/indexercluster_test.go index 503f8beab..4f788d31a 100644 --- a/pkg/splunk/enterprise/indexercluster_test.go +++ b/pkg/splunk/enterprise/indexercluster_test.go @@ -2111,6 +2111,8 @@ func TestGetChangedQueueFieldsForIndexer(t *testing.T) { {fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", provider), os.Spec.S3.Endpoint}, {fmt.Sprintf("remote_queue.%s.large_message_store.path", provider), os.Spec.S3.Path}, {fmt.Sprintf("remote_queue.%s.dead_letter_queue.name", provider), queue.Spec.SQS.DLQ}, + {fmt.Sprintf("remote_queue.%s.max_count.max_retries_per_part", provider), "4"}, + {fmt.Sprintf("remote_queue.%s.retry_policy", provider), "max_count"}, }, queueChangedFieldsInputs) assert.Equal(t, 12, len(queueChangedFieldsOutputs)) diff --git a/pkg/splunk/enterprise/util.go b/pkg/splunk/enterprise/util.go index bdc5d16ab..882a96ff3 100644 --- a/pkg/splunk/enterprise/util.go +++ b/pkg/splunk/enterprise/util.go @@ -417,8 +417,8 @@ func GetSmartstoreRemoteVolumeSecrets(ctx context.Context, volume enterpriseApi. return accessKey, secretKey, namespaceScopedSecret.ResourceVersion, nil } -// GetBusRemoteVolumeSecrets is used to retrieve access key and secrete key for Index & Ingestion separation -func GetBusRemoteVolumeSecrets(ctx context.Context, volume enterpriseApi.VolumeSpec, client splcommon.ControllerClient, cr splcommon.MetaObject) (string, string, error) { +// GetQueueRemoteVolumeSecrets is used to retrieve access key and secrete key for Index & Ingestion separation +func GetQueueRemoteVolumeSecrets(ctx context.Context, volume enterpriseApi.VolumeSpec, client splcommon.ControllerClient, cr splcommon.MetaObject) (string, string, error) { namespaceScopedSecret, err := splutil.GetSecretByName(ctx, client, cr.GetNamespace(), cr.GetName(), volume.SecretRef) if err != nil { return "", "", err diff --git a/pkg/splunk/enterprise/util_test.go b/pkg/splunk/enterprise/util_test.go index 6ea7b021e..35523a028 100644 --- a/pkg/splunk/enterprise/util_test.go +++ b/pkg/splunk/enterprise/util_test.go @@ -2624,8 +2624,8 @@ func TestUpdateCRStatus(t *testing.T) { WithStatusSubresource(&enterpriseApi.Standalone{}). WithStatusSubresource(&enterpriseApi.MonitoringConsole{}). WithStatusSubresource(&enterpriseApi.IndexerCluster{}). - WithStatusSubresource(&enterpriseApi.Bus{}). - WithStatusSubresource(&enterpriseApi.LargeMessageStore{}). + WithStatusSubresource(&enterpriseApi.Queue{}). + WithStatusSubresource(&enterpriseApi.ObjectStorage{}). WithStatusSubresource(&enterpriseApi.IngestorCluster{}). WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}) c := builder.Build() @@ -3307,8 +3307,8 @@ func TestGetCurrentImage(t *testing.T) { WithStatusSubresource(&enterpriseApi.MonitoringConsole{}). WithStatusSubresource(&enterpriseApi.IndexerCluster{}). WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}). - WithStatusSubresource(&enterpriseApi.Bus{}). - WithStatusSubresource(&enterpriseApi.LargeMessageStore{}). + WithStatusSubresource(&enterpriseApi.Queue{}). + WithStatusSubresource(&enterpriseApi.ObjectStorage{}). WithStatusSubresource(&enterpriseApi.IngestorCluster{}) client := builder.Build() client.Create(ctx, ¤t) From e0a10ba9fc5b8993f55d5dae4e1e1f189f76c47f Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Fri, 19 Dec 2025 13:20:33 +0100 Subject: [PATCH 08/10] CSPL-4360 Fix validation that fails for status --- pkg/splunk/enterprise/indexercluster.go | 38 +++++++------------ pkg/splunk/enterprise/indexercluster_test.go | 6 ++- pkg/splunk/enterprise/ingestorcluster.go | 28 ++++++-------- pkg/splunk/enterprise/ingestorcluster_test.go | 2 +- 4 files changed, 32 insertions(+), 42 deletions(-) diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index 88b6a31d0..37e81afd4 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -1327,20 +1327,22 @@ func (mgr *indexerClusterPodManager) handlePullQueueChange(ctx context.Context, } splunkClient := newSplunkClientForQueuePipeline(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd)) - if newCR.Status.Queue == nil { - newCR.Status.Queue = &enterpriseApi.QueueSpec{} + newCrStatusQueue := newCR.Status.Queue + if newCrStatusQueue == nil { + newCrStatusQueue = &enterpriseApi.QueueSpec{} } - if newCR.Status.ObjectStorage == nil { - newCR.Status.ObjectStorage = &enterpriseApi.ObjectStorageSpec{} + newCrStatusObjectStorage := newCR.Status.ObjectStorage + if newCrStatusObjectStorage == nil { + newCrStatusObjectStorage = &enterpriseApi.ObjectStorageSpec{} } afterDelete := false - if (queue.Spec.SQS.Name != "" && newCR.Status.Queue.SQS.Name != "" && queue.Spec.SQS.Name != newCR.Status.Queue.SQS.Name) || - (queue.Spec.Provider != "" && newCR.Status.Queue.Provider != "" && queue.Spec.Provider != newCR.Status.Queue.Provider) { - if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.Queue.SQS.Name)); err != nil { + if (queue.Spec.SQS.Name != "" && newCrStatusQueue.SQS.Name != "" && queue.Spec.SQS.Name != newCrStatusQueue.SQS.Name) || + (queue.Spec.Provider != "" && newCrStatusQueue.Provider != "" && queue.Spec.Provider != newCrStatusQueue.Provider) { + if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCrStatusQueue.SQS.Name)); err != nil { updateErr = err } - if err := splunkClient.DeleteConfFileProperty(scopedLog, "inputs", fmt.Sprintf("remote_queue:%s", newCR.Status.Queue.SQS.Name)); err != nil { + if err := splunkClient.DeleteConfFileProperty(scopedLog, "inputs", fmt.Sprintf("remote_queue:%s", newCrStatusQueue.SQS.Name)); err != nil { updateErr = err } afterDelete = true @@ -1360,7 +1362,7 @@ func (mgr *indexerClusterPodManager) handlePullQueueChange(ctx context.Context, } } - queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCR, afterDelete, s3AccessKey, s3SecretKey) + queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCrStatusQueue, newCrStatusObjectStorage, afterDelete, s3AccessKey, s3SecretKey) for _, pbVal := range queueChangedFieldsOutputs { if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", queue.Spec.SQS.Name), [][]string{pbVal}); err != nil { @@ -1386,22 +1388,10 @@ func (mgr *indexerClusterPodManager) handlePullQueueChange(ctx context.Context, } // getChangedQueueFieldsForIndexer returns a list of changed queue and pipeline fields for indexer pods -func getChangedQueueFieldsForIndexer(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueIndexerStatus *enterpriseApi.IndexerCluster, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields [][]string) { - // Compare queue fields - oldQueue := queueIndexerStatus.Status.Queue - if oldQueue == nil { - oldQueue = &enterpriseApi.QueueSpec{} - } - newQueue := queue.Spec - - oldOS := queueIndexerStatus.Status.ObjectStorage - if oldOS == nil { - oldOS = &enterpriseApi.ObjectStorageSpec{} - } - newOS := os.Spec - +func getChangedQueueFieldsForIndexer(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueStatus *enterpriseApi.QueueSpec, osStatus *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields [][]string) { // Push all queue fields - queueChangedFieldsInputs, queueChangedFieldsOutputs = pullQueueChanged(oldQueue, &newQueue, oldOS, &newOS, afterDelete, s3AccessKey, s3SecretKey) + queueChangedFieldsInputs, queueChangedFieldsOutputs = pullQueueChanged(queueStatus, &queue.Spec, osStatus, &os.Spec, afterDelete, s3AccessKey, s3SecretKey) + // Always set all pipeline fields, not just changed ones pipelineChangedFields = pipelineConfig(true) diff --git a/pkg/splunk/enterprise/indexercluster_test.go b/pkg/splunk/enterprise/indexercluster_test.go index 4f788d31a..c891f1dd4 100644 --- a/pkg/splunk/enterprise/indexercluster_test.go +++ b/pkg/splunk/enterprise/indexercluster_test.go @@ -2096,11 +2096,15 @@ func TestGetChangedQueueFieldsForIndexer(t *testing.T) { Name: os.Name, }, }, + Status: enterpriseApi.IndexerClusterStatus{ + Queue: &enterpriseApi.QueueSpec{}, + ObjectStorage: &enterpriseApi.ObjectStorageSpec{}, + }, } key := "key" secret := "secret" - queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCR, false, key, secret) + queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields := getChangedQueueFieldsForIndexer(&queue, &os, newCR.Status.Queue, newCR.Status.ObjectStorage, false, key, secret) assert.Equal(t, 10, len(queueChangedFieldsInputs)) assert.Equal(t, [][]string{ {"remote_queue.type", provider}, diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index f3db2a1fa..5aa41dd45 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -388,17 +388,19 @@ func (mgr *ingestorClusterPodManager) handlePushQueueChange(ctx context.Context, } splunkClient := mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd)) - if newCR.Status.Queue == nil { - newCR.Status.Queue = &enterpriseApi.QueueSpec{} + newCrStatusQueue := newCR.Status.Queue + if newCrStatusQueue == nil { + newCrStatusQueue = &enterpriseApi.QueueSpec{} } - if newCR.Status.ObjectStorage == nil { - newCR.Status.ObjectStorage = &enterpriseApi.ObjectStorageSpec{} + newCrStatusObjectStorage := newCR.Status.ObjectStorage + if newCrStatusObjectStorage == nil { + newCrStatusObjectStorage = &enterpriseApi.ObjectStorageSpec{} } afterDelete := false - if (queue.Spec.SQS.Name != "" && newCR.Status.Queue.SQS.Name != "" && queue.Spec.SQS.Name != newCR.Status.Queue.SQS.Name) || - (queue.Spec.Provider != "" && newCR.Status.Queue.Provider != "" && queue.Spec.Provider != newCR.Status.Queue.Provider) { - if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.Queue.SQS.Name)); err != nil { + if (queue.Spec.SQS.Name != "" && newCrStatusQueue.SQS.Name != "" && queue.Spec.SQS.Name != newCrStatusQueue.SQS.Name) || + (queue.Spec.Provider != "" && newCrStatusQueue.Provider != "" && queue.Spec.Provider != newCrStatusQueue.Provider) { + if err := splunkClient.DeleteConfFileProperty(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", newCrStatusQueue.SQS.Name)); err != nil { updateErr = err } afterDelete = true @@ -418,7 +420,7 @@ func (mgr *ingestorClusterPodManager) handlePushQueueChange(ctx context.Context, } } - queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCR, afterDelete, s3AccessKey, s3SecretKey) + queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCrStatusQueue, newCrStatusObjectStorage,afterDelete, s3AccessKey, s3SecretKey) for _, pbVal := range queueChangedFields { if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", queue.Spec.SQS.Name), [][]string{pbVal}); err != nil { @@ -438,15 +440,9 @@ func (mgr *ingestorClusterPodManager) handlePushQueueChange(ctx context.Context, } // getChangedBusFieldsForIngestor returns a list of changed bus and pipeline fields for ingestor pods -func getChangedQueueFieldsForIngestor(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueIngestorStatus *enterpriseApi.IngestorCluster, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFields, pipelineChangedFields [][]string) { - oldQueue := queueIngestorStatus.Status.Queue - newQueue := &queue.Spec - - oldOS := queueIngestorStatus.Status.ObjectStorage - newOS := &os.Spec - +func getChangedQueueFieldsForIngestor(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueStatus *enterpriseApi.QueueSpec, osStatus *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFields, pipelineChangedFields [][]string) { // Push changed bus fields - queueChangedFields = pushQueueChanged(oldQueue, newQueue, oldOS, newOS, afterDelete, s3AccessKey, s3SecretKey) + queueChangedFields = pushQueueChanged(queueStatus, &queue.Spec, osStatus, &os.Spec, afterDelete, s3AccessKey, s3SecretKey) // Always changed pipeline fields pipelineChangedFields = pipelineConfig(false) diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go index 448929572..995e52ff8 100644 --- a/pkg/splunk/enterprise/ingestorcluster_test.go +++ b/pkg/splunk/enterprise/ingestorcluster_test.go @@ -462,7 +462,7 @@ func TestGetChangedQueueFieldsForIngestor(t *testing.T) { key := "key" secret := "secret" - queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCR, false, key, secret) + queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCR.Status.Queue, newCR.Status.ObjectStorage, false, key, secret) assert.Equal(t, 12, len(queueChangedFields)) assert.Equal(t, [][]string{ From 155b21a49fda387472a95a93391c27865d16cf1b Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Fri, 19 Dec 2025 14:58:10 +0100 Subject: [PATCH 09/10] CSPL-4360 Fix failing to get k8s secret --- pkg/splunk/enterprise/indexercluster.go | 17 +++++++++-------- pkg/splunk/enterprise/indexercluster_test.go | 3 ++- pkg/splunk/enterprise/ingestorcluster.go | 4 ++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index 37e81afd4..558f862b1 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -115,7 +115,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError } - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) // Check if we have configured enough number(<= RF) of replicas if mgr.cr.Status.ClusterManagerPhase == enterpriseApi.PhaseReady { err = VerifyRFPeers(ctx, mgr, client) @@ -248,7 +248,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller if cr.Spec.QueueRef.Namespace != "" { ns = cr.Spec.QueueRef.Namespace } - err = client.Get(context.Background(), types.NamespacedName{ + err = client.Get(ctx, types.NamespacedName{ Name: cr.Spec.QueueRef.Name, Namespace: ns, }, &queue) @@ -272,7 +272,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller if cr.Spec.ObjectStorageRef.Namespace != "" { ns = cr.Spec.ObjectStorageRef.Namespace } - err = client.Get(context.Background(), types.NamespacedName{ + err = client.Get(ctx, types.NamespacedName{ Name: cr.Spec.ObjectStorageRef.Name, Namespace: ns, }, &os) @@ -292,7 +292,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // If bus is updated if cr.Spec.QueueRef.Name != "" { if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePullQueueChange(ctx, cr, queueCopy, osCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) @@ -443,7 +443,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError } - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) // Check if we have configured enough number(<= RF) of replicas if mgr.cr.Status.ClusterMasterPhase == enterpriseApi.PhaseReady { err = VerifyRFPeers(ctx, mgr, client) @@ -621,7 +621,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // If bus is updated if cr.Spec.QueueRef.Name != "" { if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient) + mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePullQueueChange(ctx, cr, queueCopy, osCopy, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) @@ -722,12 +722,13 @@ type indexerClusterPodManager struct { } // newIndexerClusterPodManager function to create pod manager this is added to write unit test case -var newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) indexerClusterPodManager { +var newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { return indexerClusterPodManager{ log: log, cr: cr, secrets: secret, newSplunkClient: newSplunkClient, + c: c, } } @@ -1391,7 +1392,7 @@ func (mgr *indexerClusterPodManager) handlePullQueueChange(ctx context.Context, func getChangedQueueFieldsForIndexer(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueStatus *enterpriseApi.QueueSpec, osStatus *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFieldsInputs, queueChangedFieldsOutputs, pipelineChangedFields [][]string) { // Push all queue fields queueChangedFieldsInputs, queueChangedFieldsOutputs = pullQueueChanged(queueStatus, &queue.Spec, osStatus, &os.Spec, afterDelete, s3AccessKey, s3SecretKey) - + // Always set all pipeline fields, not just changed ones pipelineChangedFields = pipelineConfig(true) diff --git a/pkg/splunk/enterprise/indexercluster_test.go b/pkg/splunk/enterprise/indexercluster_test.go index c891f1dd4..2b4026ac5 100644 --- a/pkg/splunk/enterprise/indexercluster_test.go +++ b/pkg/splunk/enterprise/indexercluster_test.go @@ -1569,7 +1569,7 @@ func TestIndexerClusterWithReadyState(t *testing.T) { return nil } - newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) indexerClusterPodManager { + newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { return indexerClusterPodManager{ log: log, cr: cr, @@ -1579,6 +1579,7 @@ func TestIndexerClusterWithReadyState(t *testing.T) { c.Client = mclient return c }, + c: c, } } diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 5aa41dd45..62693e1b5 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -238,7 +238,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr if cr.Spec.ObjectStorageRef.Namespace != "" { ns = cr.Spec.ObjectStorageRef.Namespace } - err = client.Get(context.Background(), types.NamespacedName{ + err = client.Get(ctx, types.NamespacedName{ Name: cr.Spec.ObjectStorageRef.Name, Namespace: ns, }, &os) @@ -420,7 +420,7 @@ func (mgr *ingestorClusterPodManager) handlePushQueueChange(ctx context.Context, } } - queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCrStatusQueue, newCrStatusObjectStorage,afterDelete, s3AccessKey, s3SecretKey) + queueChangedFields, pipelineChangedFields := getChangedQueueFieldsForIngestor(&queue, &os, newCrStatusQueue, newCrStatusObjectStorage, afterDelete, s3AccessKey, s3SecretKey) for _, pbVal := range queueChangedFields { if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", queue.Spec.SQS.Name), [][]string{pbVal}); err != nil { From f8afd5a7790c489e2997921ba08060e2dd87c075 Mon Sep 17 00:00:00 2001 From: Kasia Koziol Date: Mon, 22 Dec 2025 13:51:03 +0100 Subject: [PATCH 10/10] CSPL-4360 Fix failing integ and helm tests --- api/v4/objectstorage_types.go | 2 +- .../enterprise.splunk.com_objectstorages.yaml | 2 +- docs/CustomResources.md | 10 +-- docs/IndexIngestionSeparation.md | 24 +++--- .../enterprise_v4_indexercluster.yaml | 4 +- .../enterprise_v4_objectstorages.yaml | 2 +- .../templates/enterprise_v4_queues.yaml | 4 +- .../02-assert.yaml | 50 +++++------ .../03-assert.yaml | 20 ++--- .../splunk_index_ingest_sep.yaml | 8 +- pkg/splunk/enterprise/indexercluster.go | 18 ++-- pkg/splunk/enterprise/ingestorcluster.go | 13 +-- pkg/splunk/enterprise/types.go | 2 +- ...dex_and_ingestion_separation_suite_test.go | 28 +++---- .../index_and_ingestion_separation_test.go | 83 ++++++++++--------- test/testenv/remote_index_utils.go | 4 +- test/testenv/util.go | 8 +- 17 files changed, 147 insertions(+), 135 deletions(-) diff --git a/api/v4/objectstorage_types.go b/api/v4/objectstorage_types.go index 9e95392ce..08205743f 100644 --- a/api/v4/objectstorage_types.go +++ b/api/v4/objectstorage_types.go @@ -55,7 +55,7 @@ type S3Spec struct { // ObjectStorageStatus defines the observed state of ObjectStorage. type ObjectStorageStatus struct { - // Phase of the large message store + // Phase of the object storage Phase Phase `json:"phase"` // Resource revision tracker diff --git a/config/crd/bases/enterprise.splunk.com_objectstorages.yaml b/config/crd/bases/enterprise.splunk.com_objectstorages.yaml index 2fac45707..c84474921 100644 --- a/config/crd/bases/enterprise.splunk.com_objectstorages.yaml +++ b/config/crd/bases/enterprise.splunk.com_objectstorages.yaml @@ -87,7 +87,7 @@ spec: description: Auxillary message describing CR status type: string phase: - description: Phase of the large message store + description: Phase of the object storage enum: - Pending - Ready diff --git a/docs/CustomResources.md b/docs/CustomResources.md index 157a9b123..bd85c05ca 100644 --- a/docs/CustomResources.md +++ b/docs/CustomResources.md @@ -404,21 +404,21 @@ spec: endpoint: https://s3.us-west-2.amazonaws.com ``` -ObjectStorage inputs can be found in the table below. As of now, only S3 provider of large message store is supported. +ObjectStorage inputs can be found in the table below. As of now, only S3 provider of object storage is supported. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | -| provider | string | [Required] Provider of large message store (Allowed values: s3) | -| s3 | S3 | [Required if provider=s3] S3 large message store inputs | +| provider | string | [Required] Provider of object storage (Allowed values: s3) | +| s3 | S3 | [Required if provider=s3] S3 object storage inputs | -S3 large message store inputs can be found in the table below. +S3 object storage inputs can be found in the table below. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | | path | string | [Required] Remote storage location for messages that are larger than the underlying maximum message size | | endpoint | string | [Optional, if not provided formed based on region] S3-compatible service endpoint -Change of any of the large message queue inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. +Change of any of the object storage inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. ## MonitoringConsole Resource Spec Parameters diff --git a/docs/IndexIngestionSeparation.md b/docs/IndexIngestionSeparation.md index d532e189c..c7b05dcae 100644 --- a/docs/IndexIngestionSeparation.md +++ b/docs/IndexIngestionSeparation.md @@ -44,7 +44,7 @@ SQS message queue inputs can be found in the table below. | endpoint | string | [Optional, if not provided formed based on region] AWS SQS Service endpoint | dlq | string | [Required] Name of the dead letter queue | -**First provisioning or update of any of the bus inputs requires Ingestor Cluster and Indexer Cluster Splunkd restart, but this restart is implemented automatically and done by SOK.** +**First provisioning or update of any of the queue inputs requires Ingestor Cluster and Indexer Cluster Splunkd restart, but this restart is implemented automatically and done by SOK.** ## Example ``` @@ -67,21 +67,21 @@ ObjectStorage is introduced to store large message (messages that exceed the siz ## Spec -ObjectStorage inputs can be found in the table below. As of now, only S3 provider of large message store is supported. +ObjectStorage inputs can be found in the table below. As of now, only S3 provider of object storage is supported. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | -| provider | string | [Required] Provider of large message store (Allowed values: s3) | -| s3 | S3 | [Required if provider=s3] S3 large message store inputs | +| provider | string | [Required] Provider of object storage (Allowed values: s3) | +| s3 | S3 | [Required if provider=s3] S3 object storage inputs | -S3 large message store inputs can be found in the table below. +S3 object storage inputs can be found in the table below. | Key | Type | Description | | ---------- | ------- | ------------------------------------------------- | | path | string | [Required] Remote storage location for messages that are larger than the underlying maximum message size | | endpoint | string | [Optional, if not provided formed based on region] S3-compatible service endpoint -Change of any of the large message queue inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. +Change of any of the object storage inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed. ## Example ``` @@ -108,13 +108,13 @@ In addition to common spec inputs, the IngestorCluster resource provides the fol | ---------- | ------- | ------------------------------------------------- | | replicas | integer | The number of replicas (defaults to 3) | | queueRef | corev1.ObjectReference | Message queue reference | -| objectStorageRef | corev1.ObjectReference | Large message store reference | +| objectStorageRef | corev1.ObjectReference | Object storage reference | ## Example The example presented below configures IngestorCluster named ingestor with Splunk ${SPLUNK_IMAGE_VERSION} image that resides in a default namespace and is scaled to 3 replicas that serve the ingestion traffic. This IngestorCluster custom resource is set up with the service account named ingestor-sa allowing it to perform SQS and S3 operations. Queue and ObjectStorage references allow the user to specify queue and bucket settings for the ingestion process. -In this case, the setup uses the SQS and S3 based configuration where the messages are stored in sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The large message store is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf and outputs.conf files are configured accordingly. +In this case, the setup uses the SQS and S3 based configuration where the messages are stored in sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The object storage is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf and outputs.conf files are configured accordingly. ``` apiVersion: enterprise.splunk.com/v4 @@ -145,13 +145,13 @@ In addition to common spec inputs, the IndexerCluster resource provides the foll | ---------- | ------- | ------------------------------------------------- | | replicas | integer | The number of replicas (defaults to 3) | | queueRef | corev1.ObjectReference | Message queue reference | -| objectStorageRef | corev1.ObjectReference | Large message store reference | +| objectStorageRef | corev1.ObjectReference | Object storage reference | ## Example The example presented below configures IndexerCluster named indexer with Splunk ${SPLUNK_IMAGE_VERSION} image that resides in a default namespace and is scaled to 3 replicas that serve the indexing traffic. This IndexerCluster custom resource is set up with the service account named ingestor-sa allowing it to perform SQS and S3 operations. Queue and ObjectStorage references allow the user to specify queue and bucket settings for the indexing process. -In this case, the setup uses the SQS and S3 based configuration where the messages are stored in and retrieved from sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The large message store is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf, inputs.conf and outputs.conf files are configured accordingly. +In this case, the setup uses the SQS and S3 based configuration where the messages are stored in and retrieved from sqs-test queue in us-west-2 region with dead letter queue set to sqs-dlq-test queue. The object storage is set to ingestion bucket in smartbus-test directory. Based on these inputs, default-mode.conf, inputs.conf and outputs.conf files are configured accordingly. ``` apiVersion: enterprise.splunk.com/v4 @@ -717,7 +717,7 @@ Spec: Name: queue Namespace: default Image: splunk/splunk:${SPLUNK_IMAGE_VERSION} - Large Message Store Ref: + Object Storage Ref: Name: os Namespace: default Replicas: 3 @@ -741,7 +741,7 @@ Status: Endpoint: https://sqs.us-west-2.amazonaws.com Name: sqs-test Provider: sqs - Large Message Store: + Object Storage: S3: Endpoint: https://s3.us-west-2.amazonaws.com Path: s3://ingestion/smartbus-test diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml index 235505530..e5541e017 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_indexercluster.yaml @@ -170,8 +170,8 @@ items: namespace: {{ .namespace }} {{- end }} {{- end }} - {{- with $.Values.indexerCluster.objectStoreRef }} - objectStoreRef: + {{- with $.Values.indexerCluster.objectStorageRef }} + objectStorageRef: name: {{ .name }} {{- if .namespace }} namespace: {{ .namespace }} diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml index 7cd5bdca0..033aed904 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_objectstorages.yaml @@ -1,4 +1,4 @@ -{{- if .Values.objectStorage.enabled }} +{{- if .Values.objectStorage }} {{- if .Values.objectStorage.enabled }} apiVersion: enterprise.splunk.com/v4 kind: ObjectStorage diff --git a/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml b/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml index 09cd949dc..06a3c5dbd 100644 --- a/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml +++ b/helm-chart/splunk-enterprise/templates/enterprise_v4_queues.yaml @@ -26,8 +26,8 @@ spec: {{- if .name }} name: {{ .name | quote }} {{- end }} - {{- if .region }} - region: {{ .region | quote }} + {{- if .authRegion }} + authRegion: {{ .authRegion | quote }} {{- end }} {{- if .volumes }} volumes: diff --git a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml index 547f2a358..ca56ca5ef 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/02-assert.yaml @@ -1,30 +1,30 @@ --- -# assert for bus custom resource to be ready +# assert for queue custom resource to be ready apiVersion: enterprise.splunk.com/v4 -kind: Bus +kind: Queue metadata: - name: bus + name: queue spec: provider: sqs sqs: - name: sqs-test - region: us-west-2 + name: index-ingest-separation-test-q + authRegion: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test + dlq: index-ingest-separation-test-dlq status: phase: Ready --- -# assert for large message store custom resource to be ready +# assert for object storage custom resource to be ready apiVersion: enterprise.splunk.com/v4 -kind: LargeMessageStore +kind: ObjectStorage metadata: - name: lms + name: os spec: provider: s3 s3: endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test + path: s3://index-ingest-separation-test-bucket/smartbus-test status: phase: Ready @@ -61,24 +61,24 @@ metadata: name: indexer spec: replicas: 3 - busRef: - name: bus - largeMessageStoreRef: - name: lms + queueRef: + name: queue + objectStorageRef: + name: os status: phase: Ready - bus: + queue: provider: sqs sqs: - name: sqs-test - region: us-west-2 + name: index-ingest-separation-test-q + authRegion: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test - largeMessageStore: + dlq: index-ingest-separation-test-dlq + objectStorage: provider: s3 s3: endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test + path: s3://index-ingest-separation-test-bucket/smartbus-test --- # check for stateful set and replicas as configured @@ -103,7 +103,7 @@ kind: IngestorCluster metadata: name: ingestor spec: - replicas: 4 + replicas: 3 queueRef: name: queue objectStorageRef: @@ -113,15 +113,15 @@ status: queue: provider: sqs sqs: - name: sqs-test - region: us-west-2 + name: index-ingest-separation-test-q + authRegion: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test + dlq: index-ingest-separation-test-dlq objectStorage: provider: s3 s3: endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test + path: s3://index-ingest-separation-test-bucket/smartbus-test --- # check for stateful set and replicas as configured diff --git a/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml index 819620baa..765a22192 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/03-assert.yaml @@ -6,24 +6,24 @@ metadata: name: ingestor spec: replicas: 4 - busRef: - name: bus - largeMessageStoreRef: - name: lms + queueRef: + name: queue + objectStorageRef: + name: os status: phase: Ready - bus: + queue: provider: sqs sqs: - name: sqs-test - region: us-west-2 + name: index-ingest-separation-test-q + authRegion: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test - largeMessageStore: + dlq: index-ingest-separation-test-dlq + objectStorage: provider: s3 s3: endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test + path: s3://index-ingest-separation-test-bucket/smartbus-test --- # check for stateful sets and replicas updated diff --git a/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml b/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml index 7bec8ee7d..46ef7fce3 100644 --- a/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml +++ b/kuttl/tests/helm/index-and-ingest-separation/splunk_index_ingest_sep.yaml @@ -10,10 +10,10 @@ queue: name: queue provider: sqs sqs: - name: sqs-test - region: us-west-2 + name: index-ingest-separation-test-q + authRegion: us-west-2 endpoint: https://sqs.us-west-2.amazonaws.com - dlq: sqs-dlq-test + dlq: index-ingest-separation-test-dlq volumes: - name: helm-bus-secret-ref-test secretRef: s3-secret @@ -24,7 +24,7 @@ objectStorage: provider: s3 s3: endpoint: https://s3.us-west-2.amazonaws.com - path: s3://ingestion/smartbus-test + path: s3://index-ingest-separation-test-bucket/smartbus-test ingestorCluster: enabled: true diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index 558f862b1..3808539cc 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -76,6 +76,10 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // updates status after function completes cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError + if cr.Status.Replicas < cr.Spec.Replicas { + cr.Status.Queue = nil + cr.Status.ObjectStorage = nil + } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) if cr.Status.Peers == nil { @@ -265,7 +269,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } } - // Large Message Store + // Object Storage os := enterpriseApi.ObjectStorage{} if cr.Spec.ObjectStorageRef.Name != "" { ns := cr.GetNamespace() @@ -281,7 +285,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } } - // Can not override original large message store spec due to comparison in the later code + // Can not override original object storage spec due to comparison in the later code osCopy := os if osCopy.Spec.Provider == "s3" { if osCopy.Spec.S3.Endpoint == "" && queueCopy.Spec.SQS.AuthRegion != "" { @@ -289,7 +293,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } } - // If bus is updated + // If queue is updated if cr.Spec.QueueRef.Name != "" { if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) @@ -402,6 +406,10 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // updates status after function completes cr.Status.Phase = enterpriseApi.PhaseError cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError + if cr.Status.Replicas < cr.Spec.Replicas { + cr.Status.Queue = nil + cr.Status.ObjectStorage = nil + } cr.Status.Replicas = cr.Spec.Replicas cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName()) if cr.Status.Peers == nil { @@ -594,7 +602,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, } } - // Large Message Store + // Object Storage os := enterpriseApi.ObjectStorage{} if cr.Spec.ObjectStorageRef.Name != "" { ns := cr.GetNamespace() @@ -618,7 +626,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, } } - // If bus is updated + // If queue is updated if cr.Spec.QueueRef.Name != "" { if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index 62693e1b5..78a51ede2 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -71,7 +71,10 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // Update the CR Status defer updateCRStatus(ctx, client, cr, &err) - + if cr.Status.Replicas < cr.Spec.Replicas { + cr.Status.Queue = nil + cr.Status.ObjectStorage = nil + } cr.Status.Replicas = cr.Spec.Replicas // If needed, migrate the app framework status @@ -231,7 +234,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } } - // Large Message Store + // Object Storage os := enterpriseApi.ObjectStorage{} if cr.Spec.ObjectStorageRef.Name != "" { ns := cr.GetNamespace() @@ -255,7 +258,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr } } - // If bus is updated + // If queue is updated if cr.Status.Queue == nil || cr.Status.ObjectStorage == nil || !reflect.DeepEqual(*cr.Status.Queue, queue.Spec) || !reflect.DeepEqual(*cr.Status.ObjectStorage, os.Spec) { mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.handlePushQueueChange(ctx, cr, queueCopy, osCopy, client) @@ -439,9 +442,9 @@ func (mgr *ingestorClusterPodManager) handlePushQueueChange(ctx context.Context, return updateErr } -// getChangedBusFieldsForIngestor returns a list of changed bus and pipeline fields for ingestor pods +// getChangedQueueFieldsForIngestor returns a list of changed queue and pipeline fields for ingestor pods func getChangedQueueFieldsForIngestor(queue *enterpriseApi.Queue, os *enterpriseApi.ObjectStorage, queueStatus *enterpriseApi.QueueSpec, osStatus *enterpriseApi.ObjectStorageSpec, afterDelete bool, s3AccessKey, s3SecretKey string) (queueChangedFields, pipelineChangedFields [][]string) { - // Push changed bus fields + // Push changed queue fields queueChangedFields = pushQueueChanged(queueStatus, &queue.Spec, osStatus, &os.Spec, afterDelete, s3AccessKey, s3SecretKey) // Always changed pipeline fields diff --git a/pkg/splunk/enterprise/types.go b/pkg/splunk/enterprise/types.go index fe96430e4..4267662d8 100644 --- a/pkg/splunk/enterprise/types.go +++ b/pkg/splunk/enterprise/types.go @@ -66,7 +66,7 @@ const ( // SplunkQueue is the queue instance SplunkQueue InstanceType = "queue" - // SplunkObjectStorage is the large message store instance + // SplunkObjectStorage is the object storage instance SplunkObjectStorage InstanceType = "object-storage" // SplunkDeployer is an instance that distributes baseline configurations and apps to search head cluster members diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go index 86231df14..8aac52220 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_suite_test.go @@ -42,29 +42,29 @@ var ( queue = enterpriseApi.QueueSpec{ Provider: "sqs", SQS: enterpriseApi.SQSSpec{ - Name: "test-queue", + Name: "index-ingest-separation-test-q", AuthRegion: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", - DLQ: "test-dead-letter-queue", + DLQ: "index-ingest-separation-test-dlq", }, } objectStorage = enterpriseApi.ObjectStorageSpec{ Provider: "s3", S3: enterpriseApi.S3Spec{ Endpoint: "https://s3.us-west-2.amazonaws.com", - Path: "s3://test-bucket/smartbus-test", + Path: "s3://index-ingest-separation-test-bucket/smartbus-test", }, } serviceAccountName = "index-ingest-sa" inputs = []string{ - "[remote_queue:test-queue]", + "[remote_queue:index-ingest-separation-test-q]", "remote_queue.type = sqs_smartbus", "remote_queue.sqs_smartbus.auth_region = us-west-2", - "remote_queue.sqs_smartbus.dead_letter_queue.name = test-dead-letter-queue", + "remote_queue.sqs_smartbus.dead_letter_queue.name = index-ingest-separation-test-dlq", "remote_queue.sqs_smartbus.endpoint = https://sqs.us-west-2.amazonaws.com", "remote_queue.sqs_smartbus.large_message_store.endpoint = https://s3.us-west-2.amazonaws.com", - "remote_queue.sqs_smartbus.large_message_store.path = s3://test-bucket/smartbus-test", + "remote_queue.sqs_smartbus.large_message_store.path = s3://index-ingest-separation-test-bucket/smartbus-test", "remote_queue.sqs_smartbus.retry_policy = max_count", "remote_queue.sqs_smartbus.max_count.max_retries_per_part = 4"} outputs = append(inputs, "remote_queue.sqs_smartbus.encoding_format = s2s", "remote_queue.sqs_smartbus.send_interval = 5s") @@ -88,21 +88,21 @@ var ( updateQueue = enterpriseApi.QueueSpec{ Provider: "sqs", SQS: enterpriseApi.SQSSpec{ - Name: "test-queue-updated", + Name: "index-ingest-separation-test-q-updated", AuthRegion: "us-west-2", Endpoint: "https://sqs.us-west-2.amazonaws.com", - DLQ: "test-dead-letter-queue-updated", + DLQ: "index-ingest-separation-test-dlq-updated", }, } updatedInputs = []string{ - "[remote_queue:test-queue-updated]", + "[remote_queue:index-ingest-separation-test-q-updated]", "remote_queue.type = sqs_smartbus", "remote_queue.sqs_smartbus.auth_region = us-west-2", - "remote_queue.sqs_smartbus.dead_letter_queue.name = test-dead-letter-queue-updated", + "remote_queue.sqs_smartbus.dead_letter_queue.name = index-ingest-separation-test-dlq-updated", "remote_queue.sqs_smartbus.endpoint = https://sqs.us-west-2.amazonaws.com", "remote_queue.sqs_smartbus.large_message_store.endpoint = https://s3.us-west-2.amazonaws.com", - "remote_queue.sqs_smartbus.large_message_store.path = s3://test-bucket-updated/smartbus-test", + "remote_queue.sqs_smartbus.large_message_store.path = s3://index-ingest-separation-test-bucket/smartbus-test", "remote_queue.sqs_smartbus.retry_policy = max", "remote_queue.max.sqs_smartbus.max_retries_per_part = 5"} updatedOutputs = append(updatedInputs, "remote_queue.sqs_smartbus.encoding_format = s2s", "remote_queue.sqs_smartbus.send_interval = 4s") @@ -116,9 +116,9 @@ var ( updatedDefaultsIngest = append(updatedDefaultsAll, "[pipeline:indexerPipe]\ndisabled = true") inputsShouldNotContain = []string{ - "[remote_queue:test-queue]", - "remote_queue.sqs_smartbus.dead_letter_queue.name = test-dead-letter-queue", - "remote_queue.sqs_smartbus.large_message_store.path = s3://test-bucket/smartbus-test", + "[remote_queue:index-ingest-separation-test-q]", + "remote_queue.sqs_smartbus.dead_letter_queue.name = index-ingest-separation-test-dlq", + "remote_queue.sqs_smartbus.large_message_store.path = s3://index-ingest-separation-test-bucket/smartbus-test", "remote_queue.sqs_smartbus.retry_policy = max_count", "remote_queue.sqs_smartbus.max_count.max_retries_per_part = 4"} outputsShouldNotContain = append(inputs, "remote_queue.sqs_smartbus.send_interval = 5s") diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go index b5e0449f8..85069a071 100644 --- a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go +++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go @@ -75,13 +75,13 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, smoke, indingsep: Splunk Operator can deploy Ingestors and Indexers", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) - // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ // Secret reference - volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} queue.SQS.VolList = volumeSpec updateQueue.SQS.VolList = volumeSpec @@ -97,7 +97,7 @@ var _ = Describe("indingsep test", func() { // Deploy Ingestor Cluster testcaseEnvInst.Log.Info("Deploy Ingestor Cluster") - _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Ingestor Cluster") // Deploy Cluster Manager @@ -107,7 +107,7 @@ var _ = Describe("indingsep test", func() { // Deploy Indexer Cluster testcaseEnvInst.Log.Info("Deploy Indexer Cluster") - _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Indexer Cluster") // Ensure that Ingestor Cluster is in Ready phase @@ -137,11 +137,11 @@ var _ = Describe("indingsep test", func() { Expect(err).To(Succeed(), "Unable to delete Ingestor Cluster instance", "Ingestor Cluster Name", ingest) // Delete the Queue - queue := &enterpriseApi.Queue{} - err = deployment.GetInstance(ctx, "queue", queue) - Expect(err).To(Succeed(), "Unable to get Queue instance", "Queue Name", queue) - err = deployment.DeleteCR(ctx, queue) - Expect(err).To(Succeed(), "Unable to delete Queue", "Queue Name", queue) + q = &enterpriseApi.Queue{} + err = deployment.GetInstance(ctx, "queue", q) + Expect(err).To(Succeed(), "Unable to get Queue instance", "Queue Name", q) + err = deployment.DeleteCR(ctx, q) + Expect(err).To(Succeed(), "Unable to delete Queue", "Queue Name", q) // Delete the ObjectStorage objStorage = &enterpriseApi.ObjectStorage{} @@ -154,13 +154,13 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, smoke, indingsep: Splunk Operator can deploy Ingestors and Indexers with additional configurations", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) - // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ // Secret reference - volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} queue.SQS.VolList = volumeSpec updateQueue.SQS.VolList = volumeSpec @@ -174,24 +174,19 @@ var _ = Describe("indingsep test", func() { objStorage, err := deployment.DeployObjectStorage(ctx, "os", objectStorage) Expect(err).To(Succeed(), "Unable to deploy ObjectStorage") - // Upload apps to S3 - testcaseEnvInst.Log.Info("Upload apps to S3") - appFileList := testenv.GetAppFileList(appListV1) - _, err = testenv.UploadFilesToS3(testS3Bucket, s3TestDir, appFileList, downloadDirV1) - Expect(err).To(Succeed(), "Unable to upload V1 apps to S3 test directory for IngestorCluster") - // Deploy Ingestor Cluster with additional configurations (similar to standalone app framework test) appSourceName := "appframework-" + enterpriseApi.ScopeLocal + testenv.RandomDNSName(3) appFrameworkSpec := testenv.GenerateAppFrameworkSpec(ctx, testcaseEnvInst, appSourceVolumeName, enterpriseApi.ScopeLocal, appSourceName, s3TestDir, 60) appFrameworkSpec.MaxConcurrentAppDownloads = uint64(5) ic := &enterpriseApi.IngestorCluster{ ObjectMeta: metav1.ObjectMeta{ - Name: deployment.GetName() + "-ingest", - Namespace: testcaseEnvInst.GetName(), + Name: deployment.GetName() + "-ingest", + Namespace: testcaseEnvInst.GetName(), + Finalizers: []string{"enterprise.splunk.com/delete-pvc"}, }, Spec: enterpriseApi.IngestorClusterSpec{ CommonSplunkSpec: enterpriseApi.CommonSplunkSpec{ - ServiceAccount: serviceAccountName, + // ServiceAccount: serviceAccountName, LivenessInitialDelaySeconds: 600, ReadinessInitialDelaySeconds: 50, StartupProbe: &enterpriseApi.Probe{ @@ -217,10 +212,10 @@ var _ = Describe("indingsep test", func() { Image: testcaseEnvInst.GetSplunkImage(), }, }, - QueueRef: v1.ObjectReference{Name: q.Name}, - ObjectStorageRef: v1.ObjectReference{Name: objStorage.Name}, - Replicas: 3, - AppFrameworkConfig: appFrameworkSpec, + QueueRef: v1.ObjectReference{Name: q.Name}, + ObjectStorageRef: v1.ObjectReference{Name: objStorage.Name}, + Replicas: 3, + AppFrameworkConfig: appFrameworkSpec, }, } @@ -232,6 +227,12 @@ var _ = Describe("indingsep test", func() { testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster is in Ready phase") testenv.IngestorReady(ctx, deployment, testcaseEnvInst) + // Upload apps to S3 + testcaseEnvInst.Log.Info("Upload apps to S3") + appFileList := testenv.GetAppFileList(appListV1) + _, err = testenv.UploadFilesToS3(testS3Bucket, s3TestDir, appFileList, downloadDirV1) + Expect(err).To(Succeed(), "Unable to upload V1 apps to S3 test directory for IngestorCluster") + // Verify Ingestor Cluster Pods have apps installed testcaseEnvInst.Log.Info("Verify Ingestor Cluster Pods have apps installed") ingestorPod := []string{fmt.Sprintf(testenv.IngestorPod, deployment.GetName()+"-ingest", 0)} @@ -264,15 +265,15 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, integration, indingsep: Splunk Operator can deploy Ingestors and Indexers with correct setup", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) - // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ // Secret reference - volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} queue.SQS.VolList = volumeSpec - + // Deploy Queue testcaseEnvInst.Log.Info("Deploy Queue") q, err := deployment.DeployQueue(ctx, "queue", queue) @@ -285,7 +286,7 @@ var _ = Describe("indingsep test", func() { // Deploy Ingestor Cluster testcaseEnvInst.Log.Info("Deploy Ingestor Cluster") - _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Ingestor Cluster") // Deploy Cluster Manager @@ -295,7 +296,7 @@ var _ = Describe("indingsep test", func() { // Deploy Indexer Cluster testcaseEnvInst.Log.Info("Deploy Indexer Cluster") - _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Indexer Cluster") // Ensure that Ingestor Cluster is in Ready phase @@ -376,13 +377,13 @@ var _ = Describe("indingsep test", func() { Context("Ingestor and Indexer deployment", func() { It("indingsep, integration, indingsep: Splunk Operator can update Ingestors and Indexers with correct setup", func() { + // TODO: Remove secret reference and uncomment serviceAccountName part once IRSA fixed for Splunk and EKS 1.34+ // Create Service Account - testcaseEnvInst.Log.Info("Create Service Account") - testcaseEnvInst.CreateServiceAccount(serviceAccountName) + // testcaseEnvInst.Log.Info("Create Service Account") + // testcaseEnvInst.CreateServiceAccount(serviceAccountName) - // TODO: Remove secret reference once IRSA fixed for Splunk and EKS 1.34+ // Secret reference - volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateBusVolumeSpec("bus-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} + volumeSpec := []enterpriseApi.VolumeSpec{testenv.GenerateQueueVolumeSpec("queue-secret-ref-volume", testcaseEnvInst.GetIndexSecretName())} queue.SQS.VolList = volumeSpec updateQueue.SQS.VolList = volumeSpec @@ -398,7 +399,7 @@ var _ = Describe("indingsep test", func() { // Deploy Ingestor Cluster testcaseEnvInst.Log.Info("Deploy Ingestor Cluster") - _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIngestorCluster(ctx, deployment.GetName()+"-ingest", 3, v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Ingestor Cluster") // Deploy Cluster Manager @@ -408,7 +409,7 @@ var _ = Describe("indingsep test", func() { // Deploy Indexer Cluster testcaseEnvInst.Log.Info("Deploy Indexer Cluster") - _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, serviceAccountName) + _, err = deployment.DeployIndexerCluster(ctx, deployment.GetName()+"-idxc", "", 3, deployment.GetName(), "", v1.ObjectReference{Name: q.Name}, v1.ObjectReference{Name: objStorage.Name}, "") // , serviceAccountName) Expect(err).To(Succeed(), "Unable to deploy Indexer Cluster") // Ensure that Ingestor Cluster is in Ready phase diff --git a/test/testenv/remote_index_utils.go b/test/testenv/remote_index_utils.go index 84e5c0709..f696a4a17 100644 --- a/test/testenv/remote_index_utils.go +++ b/test/testenv/remote_index_utils.go @@ -86,8 +86,8 @@ func RollHotToWarm(ctx context.Context, deployment *Deployment, podName string, return true } -// GeneratBusVolumeSpec return VolumeSpec struct with given values -func GenerateBusVolumeSpec(name, secretRef string) enterpriseApi.VolumeSpec { +// GenerateQueueVolumeSpec return VolumeSpec struct with given values +func GenerateQueueVolumeSpec(name, secretRef string) enterpriseApi.VolumeSpec { return enterpriseApi.VolumeSpec{ Name: name, SecretRef: secretRef, diff --git a/test/testenv/util.go b/test/testenv/util.go index d9c6d5807..366ea3668 100644 --- a/test/testenv/util.go +++ b/test/testenv/util.go @@ -396,8 +396,8 @@ func newIndexerCluster(name, ns, licenseManagerName string, replicas int, cluste }, Defaults: ansibleConfig, }, - Replicas: int32(replicas), - QueueRef: queue, + Replicas: int32(replicas), + QueueRef: queue, ObjectStorageRef: os, }, } @@ -426,8 +426,8 @@ func newIngestorCluster(name, ns string, replicas int, splunkImage string, queue Image: splunkImage, }, }, - Replicas: int32(replicas), - QueueRef: queue, + Replicas: int32(replicas), + QueueRef: queue, ObjectStorageRef: os, }, }