diff --git a/cmd/main.go b/cmd/main.go index 7fe545b29..bbb2d3534 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -26,14 +26,11 @@ import ( "path/filepath" "time" - "sigs.k8s.io/controller-runtime/pkg/metrics/filters" - intController "github.com/splunk/splunk-operator/internal/controller" "github.com/splunk/splunk-operator/internal/controller/debug" "github.com/splunk/splunk-operator/pkg/config" "github.com/splunk/splunk-operator/pkg/logging" "github.com/splunk/splunk-operator/pkg/splunk/enterprise/validation" - "sigs.k8s.io/controller-runtime/pkg/certwatcher" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -47,14 +44,15 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" enterpriseApiV3 "github.com/splunk/splunk-operator/api/v3" enterpriseApi "github.com/splunk/splunk-operator/api/v4" - "github.com/splunk/splunk-operator/internal/controller" //+kubebuilder:scaffold:imports //extapi "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" ) @@ -299,7 +297,7 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Standalone") os.Exit(1) } - if err := (&controller.IngestorClusterReconciler{ + if err := (&intController.IngestorClusterReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor("ingestorcluster-controller"), diff --git a/pkg/splunk/client/enterprise.go b/pkg/splunk/client/enterprise.go index 9291484cb..9025102b1 100644 --- a/pkg/splunk/client/enterprise.go +++ b/pkg/splunk/client/enterprise.go @@ -17,17 
+17,18 @@ package client import ( "bytes" + "context" "crypto/tls" "encoding/json" "fmt" "io" + "log/slog" "net/http" "regexp" "strconv" "strings" "time" - "github.com/go-logr/logr" splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" ) @@ -1035,23 +1036,22 @@ func (c *SplunkClient) RestartSplunk() error { // Updates conf files and their properties // See https://help.splunk.com/en/splunk-enterprise/leverage-rest-apis/rest-api-reference/10.0/configuration-endpoints/configuration-endpoint-descriptions -func (c *SplunkClient) UpdateConfFile(scopedLog logr.Logger, fileName, property string, propertyKVList [][]string) error { +func (c *SplunkClient) UpdateConfFile(ctx context.Context, logger *slog.Logger, fileName, property string, propertyKVList [][]string) error { // Creates an object in a conf file if it doesn't exist endpoint := fmt.Sprintf("%s/servicesNS/nobody/system/configs/conf-%s", c.ManagementURI, fileName) body := fmt.Sprintf("name=%s", property) - scopedLog.Info("Creating conf file object if it does not exist", "fileName", fileName, "property", property) + logger.InfoContext(ctx, "Creating conf file object if it does not exist", "fileName", fileName, "property", property) request, err := http.NewRequest("POST", endpoint, strings.NewReader(body)) if err != nil { - scopedLog.Error(err, "Failed to create conf file object if it does not exist", "fileName", fileName, "property", property) + logger.ErrorContext(ctx, "Failed to create conf file object if it does not exist", "fileName", fileName, "property", property, "error", err.Error()) return err } - scopedLog.Info("Validating conf file object creation", "fileName", fileName, "property", property) expectedStatus := []int{200, 201, 409} err = c.Do(request, expectedStatus, nil) if err != nil { - scopedLog.Error(err, fmt.Sprintf("Status not in %v for conf file object creation", expectedStatus), "fileName", fileName, "property", property) + logger.ErrorContext(ctx, fmt.Sprintf("Status not in %v for conf 
file object creation", expectedStatus), "fileName", fileName, "property", property, "error", err.Error()) return err } @@ -1065,18 +1065,17 @@ func (c *SplunkClient) UpdateConfFile(scopedLog logr.Logger, fileName, property body = body[:len(body)-1] } - scopedLog.Info("Updating conf file object", "fileName", fileName, "property", property, "body", body) + logger.DebugContext(ctx, "Updating conf file object", "fileName", fileName, "property", property) request, err = http.NewRequest("POST", endpoint, strings.NewReader(body)) if err != nil { - scopedLog.Error(err, "Failed to update conf file object", "fileName", fileName, "property", property, "body", body) + logger.ErrorContext(ctx, "Failed to update conf file object", "fileName", fileName, "property", property, "error", err.Error()) return err } - scopedLog.Info("Validating conf file object update", "fileName", fileName, "property", property) expectedStatus = []int{200, 201} err = c.Do(request, expectedStatus, nil) if err != nil { - scopedLog.Error(err, fmt.Sprintf("Status not in %v for conf file object update", expectedStatus), "fileName", fileName, "property", property, "body", body) + logger.ErrorContext(ctx, fmt.Sprintf("Status not in %v for conf file object update", expectedStatus), "fileName", fileName, "property", property, "error", err.Error()) } return err } diff --git a/pkg/splunk/client/enterprise_test.go b/pkg/splunk/client/enterprise_test.go index 2c902d537..9a7322790 100644 --- a/pkg/splunk/client/enterprise_test.go +++ b/pkg/splunk/client/enterprise_test.go @@ -24,8 +24,8 @@ import ( "strings" "testing" + "github.com/splunk/splunk-operator/pkg/logging" splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" - "sigs.k8s.io/controller-runtime/pkg/log" spltest "github.com/splunk/splunk-operator/pkg/splunk/test" ) @@ -704,8 +704,8 @@ func TestUpdateConfFile(t *testing.T) { value := "myvalue" fileName := "outputs" - reqLogger := log.FromContext(context.TODO()) - scopedLog := 
reqLogger.WithName("TestUpdateConfFile") + ctx := context.TODO() + logger := logging.FromContext(ctx).With("func", "TestUpdateConfFile", "name", "test", "namespace", "test") // First request: create the property (object) if it doesn't exist createBody := strings.NewReader(fmt.Sprintf("name=%s", property)) @@ -722,7 +722,7 @@ func TestUpdateConfFile(t *testing.T) { c := NewSplunkClient("https://localhost:8089", "admin", "p@ssw0rd") c.Client = mockSplunkClient - err := c.UpdateConfFile(scopedLog, fileName, property, [][]string{{key, value}}) + err := c.UpdateConfFile(ctx, logger, fileName, property, [][]string{{key, value}}) if err != nil { t.Errorf("UpdateConfFile err = %v", err) } @@ -732,7 +732,7 @@ func TestUpdateConfFile(t *testing.T) { mockSplunkClient = &spltest.MockHTTPClient{} mockSplunkClient.AddHandler(wantCreateRequest, 500, "", nil) c.Client = mockSplunkClient - err = c.UpdateConfFile(scopedLog, fileName, property, [][]string{{key, value}}) + err = c.UpdateConfFile(ctx, logger, fileName, property, [][]string{{key, value}}) if err == nil { t.Errorf("UpdateConfFile expected error on create, got nil") } @@ -742,7 +742,7 @@ func TestUpdateConfFile(t *testing.T) { mockSplunkClient.AddHandler(wantCreateRequest, 201, "", nil) mockSplunkClient.AddHandler(wantUpdateRequest, 500, "", nil) c.Client = mockSplunkClient - err = c.UpdateConfFile(scopedLog, fileName, property, [][]string{{key, value}}) + err = c.UpdateConfFile(ctx, logger, fileName, property, [][]string{{key, value}}) if err == nil { t.Errorf("UpdateConfFile expected error on update, got nil") } diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go index f84756067..6700ccbfc 100644 --- a/pkg/splunk/enterprise/indexercluster.go +++ b/pkg/splunk/enterprise/indexercluster.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "log/slog" "regexp" "sort" "strconv" @@ -27,8 +28,8 @@ import ( enterpriseApi "github.com/splunk/splunk-operator/api/v4" - 
"github.com/go-logr/logr" enterpriseApiV3 "github.com/splunk/splunk-operator/api/v3" + "github.com/splunk/splunk-operator/pkg/logging" splclient "github.com/splunk/splunk-operator/pkg/splunk/client" splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" splctrl "github.com/splunk/splunk-operator/pkg/splunk/splkcontroller" @@ -52,8 +53,8 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller Requeue: true, RequeueAfter: time.Second * 5, } - reqLogger := log.FromContext(ctx) - scopedLog := reqLogger.WithName("ApplyIndexerClusterManager").WithValues("name", cr.GetName(), "namespace", cr.GetNamespace()) + + logger := logging.FromContext(ctx).With("func", "ApplyIndexerClusterManager", "name", cr.GetName(), "namespace", cr.GetNamespace()) eventPublisher := GetEventPublisher(ctx, cr) ctx = context.WithValue(ctx, splcommon.EventPublisherKey, eventPublisher) @@ -70,13 +71,14 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller err = validateIndexerClusterSpec(ctx, client, cr) if err != nil { eventPublisher.Warning(ctx, "validateIndexerClusterSpec", fmt.Sprintf("validate indexercluster spec failed %s", err.Error())) - scopedLog.Error(err, "Failed to validate indexercluster spec") + logger.ErrorContext(ctx, "Failed to validate indexercluster spec", "error", err.Error()) return result, err } // updates status after function completes cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError if cr.Status.Replicas < cr.Spec.Replicas { + logger.InfoContext(ctx, "Scaling up indexer cluster", "previousReplicas", cr.Status.Replicas, "newReplicas", cr.Spec.Replicas) cr.Status.CredentialSecretVersion = "0" cr.Status.ServiceAccount = "" } @@ -95,7 +97,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller // create or update general config resources namespaceScopedSecret, err := ApplySplunkConfig(ctx, client, cr, cr.Spec.CommonSplunkSpec, SplunkIndexer) if err != nil { - 
scopedLog.Error(err, "create or update general config failed", "error", err.Error()) + logger.ErrorContext(ctx, "create or update general config failed", "error", err.Error()) eventPublisher.Warning(ctx, "ApplySplunkConfig", fmt.Sprintf("create or update general config failed with error %s", err.Error())) return result, err } @@ -115,11 +117,11 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller cr.Status.ClusterManagerPhase = managerIdxCluster.Status.Phase } } else { - scopedLog.Error(nil, "The configured clusterMasterRef doesn't exist", "clusterManagerRef", cr.Spec.ClusterManagerRef.Name) + logger.WarnContext(ctx, "The configured clusterMasterRef doesn't exist", "clusterManagerRef", cr.Spec.ClusterManagerRef.Name) cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError } - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) + mgr := newIndexerClusterPodManager(logger, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) // Check if we have configured enough number(<= RF) of replicas if mgr.cr.Status.ClusterManagerPhase == enterpriseApi.PhaseReady { err = VerifyRFPeers(ctx, mgr, client) @@ -247,35 +249,47 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller if cr.Status.Phase == enterpriseApi.PhaseReady { qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount) if err != nil { - scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config") + logger.ErrorContext(ctx, "Failed to resolve Queue/ObjectStorage config", "error", err.Error()) return result, err } + logger.DebugContext(ctx, "Resolved Queue/ObjectStorage config", "queue", qosCfg.Queue, "objectStorage", qosCfg.OS, "version", qosCfg.Version, "serviceAccount", cr.Spec.ServiceAccount) secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version serviceAccountChanged := cr.Status.ServiceAccount != 
cr.Spec.ServiceAccount + logger.InfoContext(ctx, "Checking for changes", "previousCredentialSecretVersion", cr.Status.CredentialSecretVersion, "previousServiceAccount", cr.Status.ServiceAccount, "secretChanged", secretChanged, "serviceAccountChanged", serviceAccountChanged) + // If queue is updated if cr.Spec.QueueRef.Name != "" { if secretChanged || serviceAccountChanged { - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) + mgr := newIndexerClusterPodManager(logger, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.updateIndexerConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client) if err != nil { - eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) - scopedLog.Error(err, "Failed to update conf file for Queue/Pipeline config change after pod creation") + eventPublisher.Warning(ctx, "UpdateConfFilesFailure", fmt.Sprintf("failed to update conf file for Queue/Pipeline config due to %s", err.Error())) + logger.ErrorContext(ctx, "Failed to update conf file for Queue/Pipeline config", "error", err.Error()) return result, err } + eventPublisher.Normal(ctx, "QueueConfigUpdated", + fmt.Sprintf("Queue/Pipeline configuration updated for %d indexers", cr.Spec.Replicas)) + logger.InfoContext(ctx, "Queue/Pipeline configuration updated", "readyReplicas", cr.Status.ReadyReplicas) + for i := int32(0); i < cr.Spec.Replicas; i++ { idxcClient := mgr.getClient(ctx, i) err = idxcClient.RestartSplunk() if err != nil { return result, err } - scopedLog.Info("Restarted splunk", "indexer", i) + logger.DebugContext(ctx, "Restarted splunk", "indexer", i) } + eventPublisher.Normal(ctx, "IndexersRestarted", + fmt.Sprintf("Restarted Splunk on %d indexer pods", cr.Spec.Replicas)) + cr.Status.CredentialSecretVersion = qosCfg.Version cr.Status.ServiceAccount = 
cr.Spec.ServiceAccount + + logger.InfoContext(ctx, "Updated status", "credentialSecretVersion", cr.Status.CredentialSecretVersion, "serviceAccount", cr.Status.ServiceAccount) } } @@ -299,7 +313,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller } } if len(cr.Spec.MonitoringConsoleRef.Name) > 0 && (cr.Spec.MonitoringConsoleRef.Name != cmMonitoringConsoleConfigRef) { - scopedLog.Info("Indexer Cluster CR should not specify monitoringConsoleRef and if specified, should be similar to cluster manager spec") + logger.WarnContext(ctx, "Indexer Cluster CR should not specify monitoringConsoleRef and if specified, should be similar to cluster manager spec") } } if len(cr.Status.IndexerSecretChanged) > 0 { @@ -326,7 +340,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller result.Requeue = false // Set indexer cluster CR as owner reference for clustermanager - scopedLog.Info("Setting indexer cluster as owner for cluster manager") + logger.InfoContext(ctx, "Setting indexer cluster as owner for cluster manager") if len(cr.Spec.ClusterManagerRef.Name) > 0 { namespacedName = types.NamespacedName{Namespace: cr.GetNamespace(), Name: GetSplunkStatefulsetName(SplunkClusterManager, cr.Spec.ClusterManagerRef.Name)} } @@ -353,8 +367,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, Requeue: true, RequeueAfter: time.Second * 5, } - reqLogger := log.FromContext(ctx) - scopedLog := reqLogger.WithName("ApplyIndexerCluster") + logger := logging.FromContext(ctx).With("func", "ApplyIndexerCluster", "name", cr.GetName(), "namespace", cr.GetNamespace()) eventPublisher := GetEventPublisher(ctx, cr) cr.Kind = "IndexerCluster" @@ -362,6 +375,8 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // validate and updates defaults for CR err := validateIndexerClusterSpec(ctx, client, cr) if err != nil { + eventPublisher.Warning(ctx, "validateIndexerClusterSpec", 
fmt.Sprintf("validate indexercluster spec failed %s", err.Error())) + logger.ErrorContext(ctx, "Failed to validate indexercluster spec", "error", err.Error()) return result, err } @@ -369,6 +384,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, cr.Status.Phase = enterpriseApi.PhaseError cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError if cr.Status.Replicas < cr.Spec.Replicas { + logger.InfoContext(ctx, "Scaling up indexer cluster", "previousReplicas", cr.Status.Replicas, "newReplicas", cr.Spec.Replicas) cr.Status.CredentialSecretVersion = "0" cr.Status.ServiceAccount = "" } @@ -390,7 +406,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, // create or update general config resources namespaceScopedSecret, err := ApplySplunkConfig(ctx, client, cr, cr.Spec.CommonSplunkSpec, SplunkIndexer) if err != nil { - scopedLog.Error(err, "create or update general config failed", "error", err.Error()) + logger.ErrorContext(ctx, "create or update general config failed", "error", err.Error()) eventPublisher.Warning(ctx, "ApplySplunkConfig", fmt.Sprintf("create or update general config failed with error %s", err.Error())) return result, err } @@ -413,7 +429,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError } - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) + mgr := newIndexerClusterPodManager(logger, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) // Check if we have configured enough number(<= RF) of replicas if mgr.cr.Status.ClusterMasterPhase == enterpriseApi.PhaseReady { err = VerifyRFPeers(ctx, mgr, client) @@ -542,34 +558,46 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, if cr.Status.Phase == enterpriseApi.PhaseReady { qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, 
cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount) if err != nil { - scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config") + logger.ErrorContext(ctx, "Failed to resolve Queue/ObjectStorage config", "error", err.Error()) return result, err } + logger.DebugContext(ctx, "Resolved Queue/ObjectStorage config", "queue", qosCfg.Queue, "objectStorage", qosCfg.OS, "version", qosCfg.Version, "serviceAccount", cr.Spec.ServiceAccount) secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version serviceAccountChanged := cr.Status.ServiceAccount != cr.Spec.ServiceAccount + logger.InfoContext(ctx, "Checking for changes", "previousCredentialSecretVersion", cr.Status.CredentialSecretVersion, "previousServiceAccount", cr.Status.ServiceAccount, "secretChanged", secretChanged, "serviceAccountChanged", serviceAccountChanged) + if cr.Spec.QueueRef.Name != "" { if secretChanged || serviceAccountChanged { - mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) + mgr := newIndexerClusterPodManager(logger, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) err = mgr.updateIndexerConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client) if err != nil { eventPublisher.Warning(ctx, "ApplyIndexerClusterManager", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) - scopedLog.Error(err, "Failed to update conf file for Queue/Pipeline config change after pod creation") + logger.ErrorContext(ctx, "Failed to update conf file for Queue/Pipeline config change after pod creation", "error", err.Error()) return result, err } + eventPublisher.Normal(ctx, "QueueConfigUpdated", + fmt.Sprintf("Queue/Pipeline configuration updated for %d indexers", cr.Spec.Replicas)) + logger.InfoContext(ctx, "Queue/Pipeline configuration updated", "readyReplicas", cr.Status.ReadyReplicas) + for i := int32(0); i < cr.Spec.Replicas; i++ { idxcClient 
:= mgr.getClient(ctx, i) err = idxcClient.RestartSplunk() if err != nil { return result, err } - scopedLog.Info("Restarted splunk", "indexer", i) + logger.DebugContext(ctx, "Restarted splunk", "indexer", i) } + eventPublisher.Normal(ctx, "IndexersRestarted", + fmt.Sprintf("Restarted Splunk on %d indexer pods", cr.Spec.Replicas)) + cr.Status.CredentialSecretVersion = qosCfg.Version cr.Status.ServiceAccount = cr.Spec.ServiceAccount + + logger.InfoContext(ctx, "Updated status", "credentialSecretVersion", cr.Status.CredentialSecretVersion, "serviceAccount", cr.Status.ServiceAccount) } } @@ -593,7 +621,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, } } if len(cr.Spec.MonitoringConsoleRef.Name) > 0 && (cr.Spec.MonitoringConsoleRef.Name != cmMonitoringConsoleConfigRef) { - scopedLog.Info("Indexer Cluster CR should not specify monitoringConsoleRef and if specified, should be similar to cluster master spec") + logger.WarnContext(ctx, "Indexer Cluster CR should not specify monitoringConsoleRef and if specified, should be similar to cluster master spec") } } if len(cr.Status.IndexerSecretChanged) > 0 { @@ -620,7 +648,7 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient, result.Requeue = false // Set indexer cluster CR as owner reference for clustermaster - scopedLog.Info("Setting indexer cluster as owner for cluster master") + logger.InfoContext(ctx, "Setting indexer cluster as owner for cluster master") namespacedName = types.NamespacedName{Namespace: cr.GetNamespace(), Name: GetSplunkStatefulsetName(SplunkClusterMaster, cr.Spec.ClusterMasterRef.Name)} err = splctrl.SetStatefulSetOwnerRef(ctx, client, cr, namespacedName) if err != nil { @@ -645,14 +673,14 @@ var VerifyRFPeers = func(ctx context.Context, mgr indexerClusterPodManager, clie // indexerClusterPodManager is used to manage the pods within an indexer cluster type indexerClusterPodManager struct { c splcommon.ControllerClient - log logr.Logger + 
log *slog.Logger cr *enterpriseApi.IndexerCluster secrets *corev1.Secret newSplunkClient func(managementURI, username, password string) *splclient.SplunkClient } // newIndexerClusterPodManager function to create pod manager this is added to write unit test case -var newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { +var newIndexerClusterPodManager = func(log *slog.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { return indexerClusterPodManager{ log: log, cr: cr, @@ -887,8 +915,8 @@ func (mgr *indexerClusterPodManager) Update(ctx context.Context, c splcommon.Con // Get event publisher from context eventPublisher := GetEventPublisher(ctx, mgr.cr) - // Track last successful replica count to emit scale events after completion - previousReplicas := mgr.cr.Status.Replicas + // Track previous ready replicas for scaling events + previousReadyReplicas := mgr.cr.Status.ReadyReplicas // Assign client if mgr.c == nil { @@ -901,7 +929,7 @@ func (mgr *indexerClusterPodManager) Update(ctx context.Context, c splcommon.Con return enterpriseApi.PhaseError, err } } else { - mgr.log.Info("Cluster Manager is not ready yet", "reason ", err) + mgr.log.InfoContext(ctx, "Cluster Manager is not ready yet", "error", err) return enterpriseApi.PhaseError, err } @@ -927,17 +955,19 @@ func (mgr *indexerClusterPodManager) Update(ctx context.Context, c splcommon.Con return phase, err } - // Emit ScaledUp event only after a successful scale-up has completed + // Emit scale events when phase is ready and ready replicas changed to match desired if phase == enterpriseApi.PhaseReady { - if desiredReplicas > previousReplicas && mgr.cr.Status.Replicas == desiredReplicas { - if eventPublisher != nil { - eventPublisher.Normal(ctx, "ScaledUp", - 
fmt.Sprintf("Successfully scaled %s up from %d to %d replicas", mgr.cr.GetName(), previousReplicas, desiredReplicas)) - } - } else if desiredReplicas < previousReplicas && mgr.cr.Status.Replicas == desiredReplicas { - if eventPublisher != nil { - eventPublisher.Normal(ctx, "ScaledDown", - fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", mgr.cr.GetName(), previousReplicas, desiredReplicas)) + if mgr.cr.Status.ReadyReplicas == desiredReplicas && previousReadyReplicas != desiredReplicas { + if desiredReplicas > previousReadyReplicas { + if eventPublisher != nil { + eventPublisher.Normal(ctx, "ScaledUp", + fmt.Sprintf("Successfully scaled %s up from %d to %d replicas", mgr.cr.GetName(), previousReadyReplicas, desiredReplicas)) + } + } else if desiredReplicas < previousReadyReplicas { + if eventPublisher != nil { + eventPublisher.Normal(ctx, "ScaledDown", + fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", mgr.cr.GetName(), previousReadyReplicas, desiredReplicas)) + } } } } @@ -1331,8 +1361,7 @@ var newSplunkClientForQueuePipeline = splclient.NewSplunkClient // updateIndexerConfFiles checks if Queue or Pipeline inputs are created for the first time and updates the conf file if so func (mgr *indexerClusterPodManager) updateIndexerConfFiles(ctx context.Context, newCR *enterpriseApi.IndexerCluster, queue *enterpriseApi.QueueSpec, os *enterpriseApi.ObjectStorageSpec, accessKey, secretKey string, k8s rclient.Client) error { - reqLogger := log.FromContext(ctx) - scopedLog := reqLogger.WithName("updateIndexerConfFiles").WithValues("name", newCR.GetName(), "namespace", newCR.GetNamespace()) + logger := logging.FromContext(ctx).With("func", "updateIndexerConfFiles", "name", newCR.GetName(), "namespace", newCR.GetNamespace()) // Only update config for pods that exist readyReplicas := newCR.Status.ReadyReplicas @@ -1351,19 +1380,26 @@ func (mgr *indexerClusterPodManager) updateIndexerConfFiles(ctx context.Context, queueInputs, queueOutputs, 
pipelineInputs := getQueueAndPipelineInputsForIndexerConfFiles(queue, os, accessKey, secretKey) for _, pbVal := range queueOutputs { - if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", queue.SQS.Name), [][]string{pbVal}); err != nil { + if !strings.Contains(pbVal[0], "access_key") && !strings.Contains(pbVal[0], "secret_key") { + logger.InfoContext(ctx, "Updating queue input in outputs.conf", "input", pbVal) + } + if err := splunkClient.UpdateConfFile(ctx, logger, "outputs", fmt.Sprintf("remote_queue:%s", queue.SQS.Name), [][]string{pbVal}); err != nil { updateErr = err } } for _, pbVal := range queueInputs { - if err := splunkClient.UpdateConfFile(scopedLog, "inputs", fmt.Sprintf("remote_queue:%s", queue.SQS.Name), [][]string{pbVal}); err != nil { + if !strings.Contains(pbVal[0], "access_key") && !strings.Contains(pbVal[0], "secret_key") { + logger.InfoContext(ctx, "Updating queue input in inputs.conf", "input", pbVal) + } + if err := splunkClient.UpdateConfFile(ctx, logger, "inputs", fmt.Sprintf("remote_queue:%s", queue.SQS.Name), [][]string{pbVal}); err != nil { updateErr = err } } for _, field := range pipelineInputs { - if err := splunkClient.UpdateConfFile(scopedLog, "default-mode", field[0], [][]string{{field[1], field[2]}}); err != nil { + logger.InfoContext(ctx, "Updating pipeline input in default-mode.conf", "input", field) + if err := splunkClient.UpdateConfFile(ctx, logger, "default-mode", field[0], [][]string{{field[1], field[2]}}); err != nil { updateErr = err } } diff --git a/pkg/splunk/enterprise/indexercluster_test.go b/pkg/splunk/enterprise/indexercluster_test.go index 0629e27ec..e0fa5fdfa 100644 --- a/pkg/splunk/enterprise/indexercluster_test.go +++ b/pkg/splunk/enterprise/indexercluster_test.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "net/http" "os" "path/filepath" @@ -44,7 +45,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" 
"sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/go-logr/logr" + "github.com/splunk/splunk-operator/pkg/logging" splclient "github.com/splunk/splunk-operator/pkg/splunk/client" splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" spltest "github.com/splunk/splunk-operator/pkg/splunk/test" @@ -307,7 +308,7 @@ func TestGetMonitoringConsoleClient(t *testing.T) { }, }, } - scopedLog := logt.WithName("TestGetMonitoringConsoleClient") + scopedLog := logging.FromContext(context.Background()).With("func", "TestGetMonitoringConsoleClient", "name", "stack1", "namespace", "test") secrets := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -336,7 +337,7 @@ func TestGetClusterManagerClient(t *testing.T) { os.Setenv("SPLUNK_GENERAL_TERMS", "--accept-sgt-current-at-splunk-com") ctx := context.TODO() - scopedLog := logt.WithName("TestGetClusterManagerClient") + scopedLog := logging.FromContext(ctx).With("func", "TestGetClusterManagerClient", "name", "stack1", "namespace", "test") cr := enterpriseApi.IndexerCluster{ TypeMeta: metav1.TypeMeta{ Kind: "IndexerCluster", @@ -387,7 +388,7 @@ func TestGetClusterManagerClient(t *testing.T) { func getIndexerClusterPodManager(method string, mockHandlers []spltest.MockHTTPHandler, mockSplunkClient *spltest.MockHTTPClient, replicas int32) *indexerClusterPodManager { os.Setenv("SPLUNK_GENERAL_TERMS", "--accept-sgt-current-at-splunk-com") - scopedLog := logt.WithName(method) + scopedLog := logging.FromContext(context.Background()).With("func", method, "name", "stack1", "namespace", "test") cr := enterpriseApi.IndexerCluster{ TypeMeta: metav1.TypeMeta{ Kind: "IndexerCluster", @@ -1029,7 +1030,7 @@ func TestSetClusterMaintenanceMode(t *testing.T) { func TestApplyIdxcSecret(t *testing.T) { os.Setenv("SPLUNK_GENERAL_TERMS", "--accept-sgt-current-at-splunk-com") method := "ApplyIdxcSecret" - scopedLog := logt.WithName(method) + scopedLog := logging.FromContext(context.Background()).With("func", method, "name", "stack1", 
"namespace", "test") var initObjectList []client.Object ctx := context.TODO() @@ -1596,7 +1597,7 @@ func TestIndexerClusterWithReadyState(t *testing.T) { savedNewIndexerClusterPodManager := newIndexerClusterPodManager defer func() { newIndexerClusterPodManager = savedNewIndexerClusterPodManager }() - newIndexerClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { + newIndexerClusterPodManager = func(log *slog.Logger, cr *enterpriseApi.IndexerCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) indexerClusterPodManager { return indexerClusterPodManager{ log: log, cr: cr, @@ -2828,7 +2829,7 @@ func TestPasswordSyncCompleted(t *testing.T) { // Initialize a minimal pod manager for ApplyIdxcSecret mgr := &indexerClusterPodManager{ c: client, - log: logt.WithName("TestPasswordSyncCompleted"), + log: logging.FromContext(ctx).With("func", "TestPasswordSyncCompleted", "name", idxc.GetName(), "namespace", idxc.GetNamespace()), cr: &idxc, } @@ -3360,7 +3361,7 @@ func TestIdxcPasswordSyncFailedEvent(t *testing.T) { mgr := &indexerClusterPodManager{ c: c, - log: logt.WithName("TestIdxcPasswordSyncFailedEvent"), + log: logging.FromContext(ctx).With("func", "TestIdxcPasswordSyncFailedEvent", "name", idxc.GetName(), "namespace", idxc.GetNamespace()), cr: &idxc, newSplunkClient: func(managementURI, username, password string) *splclient.SplunkClient { sc := splclient.NewSplunkClient(managementURI, username, password) @@ -3418,3 +3419,79 @@ func (m *mockEventRecorder) Eventf(object pkgruntime.Object, eventType, reason, func (m *mockEventRecorder) AnnotatedEventf(object pkgruntime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{}) { m.events = append(m.events, mockEvent{eventType: eventType, reason: reason, message: fmt.Sprintf(messageFmt, args...)}) } + 
+func TestIdxcQueueConfigUpdatedEvent(t *testing.T) { + ctx := context.TODO() + recorder := &mockEventRecorder{events: []mockEvent{}} + eventPublisher := &K8EventPublisher{recorder: recorder} + ctx = context.WithValue(ctx, splcommon.EventPublisherKey, eventPublisher) + + crName := "test-idxc" + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: crName, Namespace: "test"}, + } + cr.Spec.Replicas = 3 + + // Publish the QueueConfigUpdated event with the same reason and message format that ApplyIndexerClusterManager() emits after a successful queue config update + ep := GetEventPublisher(ctx, cr) + ep.Normal(ctx, "QueueConfigUpdated", + fmt.Sprintf("Queue/Pipeline configuration updated for %d indexers", cr.Spec.Replicas)) + + found := false + for _, event := range recorder.events { + if event.reason == "QueueConfigUpdated" { + found = true + if event.eventType != corev1.EventTypeNormal { + t.Errorf("Expected Normal event type for QueueConfigUpdated, got %s", event.eventType) + } + if !strings.Contains(event.message, "3") { + t.Errorf("Expected event message to contain replica count '3', got: %s", event.message) + } + if !strings.Contains(event.message, "Queue/Pipeline") { + t.Errorf("Expected event message to contain 'Queue/Pipeline', got: %s", event.message) + } + break + } + } + if !found { + t.Errorf("Expected QueueConfigUpdated event to be published") + } +} + +func TestIdxcIndexersRestartedEvent(t *testing.T) { + ctx := context.TODO() + recorder := &mockEventRecorder{events: []mockEvent{}} + eventPublisher := &K8EventPublisher{recorder: recorder} + ctx = context.WithValue(ctx, splcommon.EventPublisherKey, eventPublisher) + + crName := "test-idxc" + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: crName, Namespace: "test"}, + } + cr.Spec.Replicas = 5 + + // Publish the IndexersRestarted event with the same reason and message format that ApplyIndexerClusterManager() emits after restarting the indexer pods + ep := GetEventPublisher(ctx, cr) + ep.Normal(ctx, "IndexersRestarted", + fmt.Sprintf("Restarted Splunk on %d indexer pods", cr.Spec.Replicas)) + + found := false + for _, 
event := range recorder.events { + if event.reason == "IndexersRestarted" { + found = true + if event.eventType != corev1.EventTypeNormal { + t.Errorf("Expected Normal event type for IndexersRestarted, got %s", event.eventType) + } + if !strings.Contains(event.message, "5") { + t.Errorf("Expected event message to contain replica count '5', got: %s", event.message) + } + if !strings.Contains(event.message, "Restarted Splunk") { + t.Errorf("Expected event message to contain 'Restarted Splunk', got: %s", event.message) + } + break + } + } + if !found { + t.Errorf("Expected IndexersRestarted event to be published") + } +} diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go index b0f866413..47e4c4653 100644 --- a/pkg/splunk/enterprise/ingestorcluster.go +++ b/pkg/splunk/enterprise/ingestorcluster.go @@ -17,12 +17,13 @@ package enterprise import ( "context" "fmt" + "log/slog" "reflect" "strings" "time" - "github.com/go-logr/logr" enterpriseApi "github.com/splunk/splunk-operator/api/v4" + "github.com/splunk/splunk-operator/pkg/logging" splclient "github.com/splunk/splunk-operator/pkg/splunk/client" splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" splctrl "github.com/splunk/splunk-operator/pkg/splunk/splkcontroller" @@ -31,7 +32,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -45,8 +45,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr RequeueAfter: time.Second * 5, } - reqLogger := log.FromContext(ctx) - scopedLog := reqLogger.WithName("ApplyIngestorCluster") + logger := logging.FromContext(ctx).With("func", "ApplyIngestorCluster", "name", cr.GetName(), "namespace", cr.GetNamespace()) if cr.Status.ResourceRevMap == nil { cr.Status.ResourceRevMap = make(map[string]string) @@ -60,17 +59,21 @@ func 
ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // Validate and updates defaults for CR err = validateIngestorClusterSpec(ctx, client, cr) if err != nil { - eventPublisher.Warning(ctx, "validateIngestorClusterSpec", fmt.Sprintf("validate ingestor cluster spec failed %s", err.Error())) - scopedLog.Error(err, "Failed to validate ingestor cluster spec") + eventPublisher.Warning(ctx, "SpecValidationFailure", fmt.Sprintf("validation of ingestor cluster spec failed due to %s", err.Error())) + logger.ErrorContext(ctx, "Failed to validate ingestor cluster spec", "error", err.Error()) return result, err } // Initialize phase cr.Status.Phase = enterpriseApi.PhaseError + // Track previous ready replicas for scaling events + previousReadyReplicas := cr.Status.ReadyReplicas + // Update the CR Status defer updateCRStatus(ctx, client, cr, &err) if cr.Status.Replicas < cr.Spec.Replicas { + logger.InfoContext(ctx, "Scaling up ingestor cluster", "previousReplicas", cr.Status.Replicas, "newReplicas", cr.Spec.Replicas) cr.Status.CredentialSecretVersion = "0" cr.Status.ServiceAccount = "" } @@ -88,7 +91,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr if len(cr.Spec.AppFrameworkConfig.AppSources) != 0 { err = initAndCheckAppInfoStatus(ctx, client, cr, &cr.Spec.AppFrameworkConfig, &cr.Status.AppContext) if err != nil { - eventPublisher.Warning(ctx, "initAndCheckAppInfoStatus", fmt.Sprintf("init and check app info status failed %s", err.Error())) + eventPublisher.Warning(ctx, "AppInfoStatusInitializationFailure", fmt.Sprintf("init and check app info status failed due to %s", err.Error())) cr.Status.AppContext.IsDeploymentInProgress = false return result, err } @@ -99,8 +102,8 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // Create or update general config resources namespaceScopedSecret, err := ApplySplunkConfig(ctx, client, cr, cr.Spec.CommonSplunkSpec, SplunkIngestor) if err != nil { 
- scopedLog.Error(err, "create or update general config failed", "error", err.Error()) - eventPublisher.Warning(ctx, "ApplySplunkConfig", fmt.Sprintf("create or update general config failed with error %s", err.Error())) + eventPublisher.Warning(ctx, "ApplySplunkConfigFailure", fmt.Sprintf("apply of general config failed due to %s", err.Error())) + logger.ErrorContext(ctx, "create or update general config failed", "error", err.Error()) return result, err } @@ -109,7 +112,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr if cr.Spec.MonitoringConsoleRef.Name != "" { _, err = ApplyMonitoringConsoleEnvConfigMap(ctx, client, cr.GetNamespace(), cr.GetName(), cr.Spec.MonitoringConsoleRef.Name, make([]corev1.EnvVar, 0), false) if err != nil { - eventPublisher.Warning(ctx, "ApplyMonitoringConsoleEnvConfigMap", fmt.Sprintf("create/update monitoring console config map failed %s", err.Error())) + eventPublisher.Warning(ctx, "ApplyMonitoringConsoleEnvConfigMapFailure", fmt.Sprintf("apply of monitoring console config map failed due to %s", err.Error())) return result, err } } @@ -138,14 +141,14 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // Create or update a headless service for ingestor cluster err = splctrl.ApplyService(ctx, client, getSplunkService(ctx, cr, &cr.Spec.CommonSplunkSpec, SplunkIngestor, true)) if err != nil { - eventPublisher.Warning(ctx, "ApplyService", fmt.Sprintf("create/update headless service for ingestor cluster failed %s", err.Error())) + eventPublisher.Warning(ctx, "ApplyServiceFailure", fmt.Sprintf("apply of headless service failed due to %s", err.Error())) return result, err } // Create or update a regular service for ingestor cluster err = splctrl.ApplyService(ctx, client, getSplunkService(ctx, cr, &cr.Spec.CommonSplunkSpec, SplunkIngestor, false)) if err != nil { - eventPublisher.Warning(ctx, "ApplyService", fmt.Sprintf("create/update service for ingestor cluster failed %s", 
err.Error())) + eventPublisher.Warning(ctx, "ApplyServiceFailure", fmt.Sprintf("apply of service failed due to %s", err.Error())) return result, err } @@ -187,14 +190,14 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // Create or update statefulset for the ingestors statefulSet, err := getIngestorStatefulSet(ctx, client, cr) if err != nil { - eventPublisher.Warning(ctx, "getIngestorStatefulSet", fmt.Sprintf("get ingestor stateful set failed %s", err.Error())) + eventPublisher.Warning(ctx, "GetIngestorStatefulSetFailure", fmt.Sprintf("get stateful set failed due to %s", err.Error())) return result, err } // Make changes to respective mc configmap when changing/removing mcRef from spec err = validateMonitoringConsoleRef(ctx, client, statefulSet, make([]corev1.EnvVar, 0)) if err != nil { - eventPublisher.Warning(ctx, "validateMonitoringConsoleRef", fmt.Sprintf("validate monitoring console reference failed %s", err.Error())) + eventPublisher.Warning(ctx, "MonitoringConsoleRefValidationFailure", fmt.Sprintf("monitoring console reference validation failed due to %s", err.Error())) return result, err } @@ -202,56 +205,82 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr phase, err := mgr.Update(ctx, client, statefulSet, cr.Spec.Replicas) cr.Status.ReadyReplicas = statefulSet.Status.ReadyReplicas if err != nil { - eventPublisher.Warning(ctx, "update", fmt.Sprintf("update stateful set failed %s", err.Error())) + eventPublisher.Warning(ctx, "UpdateStatefulSetFailure", fmt.Sprintf("stateful set update failed due to %s", err.Error())) return result, err } cr.Status.Phase = phase + // Emit scaling events when phase is ready and ready replicas changed to match desired + if phase == enterpriseApi.PhaseReady { + desiredReplicas := cr.Spec.Replicas + if cr.Status.ReadyReplicas == desiredReplicas && previousReadyReplicas != desiredReplicas { + if desiredReplicas > previousReadyReplicas { + 
eventPublisher.Normal(ctx, "ScaledUp", + fmt.Sprintf("Successfully scaled %s up from %d to %d replicas", cr.GetName(), previousReadyReplicas, desiredReplicas)) + } else if desiredReplicas < previousReadyReplicas { + eventPublisher.Normal(ctx, "ScaledDown", + fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", cr.GetName(), previousReadyReplicas, desiredReplicas)) + } + } + } + // No need to requeue if everything is ready if cr.Status.Phase == enterpriseApi.PhaseReady { qosCfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, cr.Spec.QueueRef, cr.Spec.ObjectStorageRef, cr.Spec.ServiceAccount) if err != nil { - scopedLog.Error(err, "Failed to resolve Queue/ObjectStorage config") + logger.ErrorContext(ctx, "Failed to resolve Queue/ObjectStorage config", "error", err.Error()) return result, err } + logger.DebugContext(ctx, "Resolved Queue/ObjectStorage config", "queue", qosCfg.Queue, "objectStorage", qosCfg.OS, "version", qosCfg.Version, "serviceAccount", cr.Spec.ServiceAccount) secretChanged := cr.Status.CredentialSecretVersion != qosCfg.Version serviceAccountChanged := cr.Status.ServiceAccount != cr.Spec.ServiceAccount + logger.InfoContext(ctx, "Checking for changes", "previousCredentialSecretVersion", cr.Status.CredentialSecretVersion, "previousServiceAccount", cr.Status.ServiceAccount, "secretChanged", secretChanged, "serviceAccountChanged", serviceAccountChanged) + // If queue is updated if secretChanged || serviceAccountChanged { - mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) - err = mgr.updateIngestorConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client) + ingMgr := newIngestorClusterPodManager(logger, cr, namespaceScopedSecret, splclient.NewSplunkClient, client) + err = ingMgr.updateIngestorConfFiles(ctx, cr, &qosCfg.Queue, &qosCfg.OS, qosCfg.AccessKey, qosCfg.SecretKey, client) if err != nil { - eventPublisher.Warning(ctx, 
"ApplyIngestorCluster", fmt.Sprintf("Failed to update conf file for Queue/Pipeline config change after pod creation: %s", err.Error())) - scopedLog.Error(err, "Failed to update conf file for Queue/Pipeline config change after pod creation") + eventPublisher.Warning(ctx, "UpdateConfFilesFailure", fmt.Sprintf("failed to update conf file for Queue/Pipeline config due to %s", err.Error())) + logger.ErrorContext(ctx, "Failed to update conf file for Queue/Pipeline config", "error", err.Error()) return result, err } + eventPublisher.Normal(ctx, "QueueConfigUpdated", + fmt.Sprintf("Queue/Pipeline configuration updated for %d ingestors", cr.Spec.Replicas)) + logger.InfoContext(ctx, "Queue/Pipeline configuration updated", "readyReplicas", cr.Status.ReadyReplicas) + for i := int32(0); i < cr.Spec.Replicas; i++ { - ingClient := mgr.getClient(ctx, i) + ingClient := ingMgr.getClient(ctx, i) err = ingClient.RestartSplunk() if err != nil { return result, err } - scopedLog.Info("Restarted splunk", "ingestor", i) + logger.DebugContext(ctx, "Restarted splunk", "ingestor", i) } + eventPublisher.Normal(ctx, "IngestorsRestarted", + fmt.Sprintf("Restarted Splunk on %d ingestor pods", cr.Spec.Replicas)) + cr.Status.CredentialSecretVersion = qosCfg.Version cr.Status.ServiceAccount = cr.Spec.ServiceAccount + + logger.InfoContext(ctx, "Updated status", "credentialSecretVersion", cr.Status.CredentialSecretVersion, "serviceAccount", cr.Status.ServiceAccount) } - // Upgrade fron automated MC to MC CRD + // Upgrade from automated MC to MC CRD namespacedName := types.NamespacedName{Namespace: cr.GetNamespace(), Name: GetSplunkStatefulsetName(SplunkMonitoringConsole, cr.GetNamespace())} err = splctrl.DeleteReferencesToAutomatedMCIfExists(ctx, client, cr, namespacedName) if err != nil { - eventPublisher.Warning(ctx, "DeleteReferencesToAutomatedMCIfExists", fmt.Sprintf("delete reference to automated MC if exists failed %s", err.Error())) - scopedLog.Error(err, "Error in deleting automated monitoring 
console resource") + eventPublisher.Warning(ctx, "MCReferencesDeletionFailure", fmt.Sprintf("reference to automated MC if exists failed due to %s", err.Error())) + logger.ErrorContext(ctx, "Error in deleting automated monitoring console resource", "error", err.Error()) } if cr.Spec.MonitoringConsoleRef.Name != "" { _, err = ApplyMonitoringConsoleEnvConfigMap(ctx, client, cr.GetNamespace(), cr.GetName(), cr.Spec.MonitoringConsoleRef.Name, make([]corev1.EnvVar, 0), true) if err != nil { - eventPublisher.Warning(ctx, "ApplyMonitoringConsoleEnvConfigMap", fmt.Sprintf("apply monitoring console environment config map failed %s", err.Error())) + eventPublisher.Warning(ctx, "ApplyMonitoringConsoleEnvConfigMapFailure", fmt.Sprintf("apply of monitoring console environment config map failed due to %s", err.Error())) return result, err } } @@ -283,8 +312,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr // getClient for ingestorClusterPodManager returns a SplunkClient for the member n func (mgr *ingestorClusterPodManager) getClient(ctx context.Context, n int32) *splclient.SplunkClient { - reqLogger := log.FromContext(ctx) - scopedLog := reqLogger.WithName("ingestorClusterPodManager.getClient").WithValues("name", mgr.cr.GetName(), "namespace", mgr.cr.GetNamespace()) + logger := logging.FromContext(ctx).With("func", "getClient", "name", mgr.cr.GetName(), "namespace", mgr.cr.GetNamespace()) // Get Pod Name memberName := GetSplunkStatefulsetPodName(SplunkIngestor, mgr.cr.GetName(), n) @@ -296,7 +324,7 @@ func (mgr *ingestorClusterPodManager) getClient(ctx context.Context, n int32) *s // Retrieve admin password from Pod adminPwd, err := splutil.GetSpecificSecretTokenFromPod(ctx, mgr.c, memberName, mgr.cr.GetNamespace(), "password") if err != nil { - scopedLog.Error(err, "Couldn't retrieve the admin password from pod") + logger.ErrorContext(ctx, "Couldn't retrieve the admin password from pod", "error", err.Error()) } return 
mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", adminPwd) @@ -334,11 +362,10 @@ func getIngestorStatefulSet(ctx context.Context, client splcommon.ControllerClie // updateIngestorConfFiles checks if Queue or Pipeline inputs are created for the first time and updates the conf file if so func (mgr *ingestorClusterPodManager) updateIngestorConfFiles(ctx context.Context, newCR *enterpriseApi.IngestorCluster, queue *enterpriseApi.QueueSpec, os *enterpriseApi.ObjectStorageSpec, accessKey, secretKey string, k8s client.Client) error { - reqLogger := log.FromContext(ctx) - scopedLog := reqLogger.WithName("updateIngestorConfFiles").WithValues("name", newCR.GetName(), "namespace", newCR.GetNamespace()) + logger := logging.FromContext(ctx).With("func", "updateIngestorConfFiles", "name", newCR.GetName(), "namespace", newCR.GetNamespace()) // Only update config for pods that exist - readyReplicas := newCR.Status.Replicas + readyReplicas := newCR.Status.ReadyReplicas // List all pods for this IngestorCluster StatefulSet var updateErr error @@ -354,16 +381,22 @@ func (mgr *ingestorClusterPodManager) updateIngestorConfFiles(ctx context.Contex queueInputs, pipelineInputs := getQueueAndPipelineInputsForIngestorConfFiles(queue, os, accessKey, secretKey) for _, input := range queueInputs { - if err := splunkClient.UpdateConfFile(scopedLog, "outputs", fmt.Sprintf("remote_queue:%s", queue.SQS.Name), [][]string{input}); err != nil { + if !strings.Contains(input[0], "access_key") && !strings.Contains(input[0], "secret_key") { + logger.InfoContext(ctx, "Updating queue input in outputs.conf", "input", input) + } + if err := splunkClient.UpdateConfFile(ctx, logger, "outputs", fmt.Sprintf("remote_queue:%s", queue.SQS.Name), [][]string{input}); err != nil { updateErr = err } } for _, input := range pipelineInputs { - if err := splunkClient.UpdateConfFile(scopedLog, "default-mode", input[0], [][]string{{input[1], input[2]}}); err != nil { + logger.InfoContext(ctx, "Updating 
pipeline input in default-mode.conf", "input", input) + if err := splunkClient.UpdateConfFile(ctx, logger, "default-mode", input[0], [][]string{{input[1], input[2]}}); err != nil { updateErr = err } } + + logger.InfoContext(ctx, "Updated conf files for pod", "pod", memberName) } return updateErr @@ -382,14 +415,14 @@ func getQueueAndPipelineInputsForIngestorConfFiles(queue *enterpriseApi.QueueSpe type ingestorClusterPodManager struct { c splcommon.ControllerClient - log logr.Logger + log *slog.Logger cr *enterpriseApi.IngestorCluster secrets *corev1.Secret newSplunkClient func(managementURI, username, password string) *splclient.SplunkClient } // newIngestorClusterPodManager creates pod manager to handle unit test cases -var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { +var newIngestorClusterPodManager = func(log *slog.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { return ingestorClusterPodManager{ log: log, cr: cr, diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go index e96002372..a805928f2 100644 --- a/pkg/splunk/enterprise/ingestorcluster_test.go +++ b/pkg/splunk/enterprise/ingestorcluster_test.go @@ -17,13 +17,13 @@ package enterprise import ( "context" "fmt" + "log/slog" "net/http" "os" "path/filepath" "strings" "testing" - "github.com/go-logr/logr" enterpriseApi "github.com/splunk/splunk-operator/api/v4" splclient "github.com/splunk/splunk-operator/pkg/splunk/client" splcommon "github.com/splunk/splunk-operator/pkg/splunk/common" @@ -252,7 +252,7 @@ func TestApplyIngestorCluster(t *testing.T) { // outputs.conf origNew := newIngestorClusterPodManager mockHTTPClient := &spltest.MockHTTPClient{} - newIngestorClusterPodManager = func(l 
logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { + newIngestorClusterPodManager = func(l *slog.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager { return ingestorClusterPodManager{ c: c, log: l, cr: cr, secrets: secret, @@ -775,3 +775,187 @@ func newTestIngestorQueuePipelineManager(mockHTTPClient *spltest.MockHTTPClient) newSplunkClient: newSplunkClientForQueuePipeline, } } + +func TestIngScaledUpScaledDownEvent(t *testing.T) { + ctx := context.TODO() + recorder := &mockEventRecorder{events: []mockEvent{}} + eventPublisher := &K8EventPublisher{recorder: recorder} + ctx = context.WithValue(ctx, splcommon.EventPublisherKey, eventPublisher) + + crName := "test-ingestor" + cr := &enterpriseApi.IngestorCluster{ + ObjectMeta: metav1.ObjectMeta{Name: crName, Namespace: "test"}, + } + + // Simulate ScaledUp: previousReplicas=1, desiredReplicas=3, phase=PhaseReady, Status.ReadyReplicas=3 + previousReplicas := int32(1) + desiredReplicas := int32(3) + cr.Status.ReadyReplicas = desiredReplicas + phase := enterpriseApi.PhaseReady + + // Replicate the production conditional from ApplyIngestorCluster() + ep := GetEventPublisher(ctx, cr) + if phase == enterpriseApi.PhaseReady { + if desiredReplicas > previousReplicas && cr.Status.ReadyReplicas == desiredReplicas { + ep.Normal(ctx, "ScaledUp", + fmt.Sprintf("Successfully scaled %s up from %d to %d replicas", cr.GetName(), previousReplicas, desiredReplicas)) + } + } + + found := false + for _, event := range recorder.events { + if event.reason == "ScaledUp" { + found = true + if event.eventType != corev1.EventTypeNormal { + t.Errorf("Expected Normal event type for ScaledUp, got %s", event.eventType) + } + if !strings.Contains(event.message, crName) { + t.Errorf("Expected event message to contain CR name '%s', got: %s", crName, 
event.message) + } + if !strings.Contains(event.message, "1") || !strings.Contains(event.message, "3") { + t.Errorf("Expected event message to contain replica counts, got: %s", event.message) + } + break + } + } + if !found { + t.Errorf("Expected ScaledUp event to be published") + } + + // Simulate ScaledDown: previousReplicas=3, desiredReplicas=1, phase=PhaseReady, Status.ReadyReplicas=1 + recorder.events = []mockEvent{} + previousReplicas = int32(3) + desiredReplicas = int32(1) + cr.Status.ReadyReplicas = desiredReplicas + + if phase == enterpriseApi.PhaseReady { + if desiredReplicas < previousReplicas && cr.Status.ReadyReplicas == desiredReplicas { + ep.Normal(ctx, "ScaledDown", + fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", cr.GetName(), previousReplicas, desiredReplicas)) + } + } + + found = false + for _, event := range recorder.events { + if event.reason == "ScaledDown" { + found = true + if event.eventType != corev1.EventTypeNormal { + t.Errorf("Expected Normal event type for ScaledDown, got %s", event.eventType) + } + if !strings.Contains(event.message, crName) { + t.Errorf("Expected event message to contain CR name '%s', got: %s", crName, event.message) + } + break + } + } + if !found { + t.Errorf("Expected ScaledDown event to be published") + } + + // Negative: no event when phase is not PhaseReady + recorder.events = []mockEvent{} + phase = enterpriseApi.PhasePending + if phase == enterpriseApi.PhaseReady { + if desiredReplicas < previousReplicas && cr.Status.ReadyReplicas == desiredReplicas { + ep.Normal(ctx, "ScaledDown", + fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", cr.GetName(), previousReplicas, desiredReplicas)) + } + } + if len(recorder.events) != 0 { + t.Errorf("Expected no events when phase is not PhaseReady, got %d events", len(recorder.events)) + } + + // Negative: no event when replicas haven't converged + recorder.events = []mockEvent{} + phase = enterpriseApi.PhaseReady + cr.Status.ReadyReplicas = 
int32(2) // not yet at desiredReplicas + if phase == enterpriseApi.PhaseReady { + if desiredReplicas < previousReplicas && cr.Status.ReadyReplicas == desiredReplicas { + ep.Normal(ctx, "ScaledDown", + fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", cr.GetName(), previousReplicas, desiredReplicas)) + } + } + if len(recorder.events) != 0 { + t.Errorf("Expected no events when replicas haven't converged, got %d events", len(recorder.events)) + } +} + +func TestIngQueueConfigUpdatedEvent(t *testing.T) { + ctx := context.TODO() + recorder := &mockEventRecorder{events: []mockEvent{}} + eventPublisher := &K8EventPublisher{recorder: recorder} + ctx = context.WithValue(ctx, splcommon.EventPublisherKey, eventPublisher) + + crName := "test-ingestor" + cr := &enterpriseApi.IngestorCluster{ + ObjectMeta: metav1.ObjectMeta{Name: crName, Namespace: "test"}, + Spec: enterpriseApi.IngestorClusterSpec{ + Replicas: 3, + }, + } + + // Replicate the production conditional from ApplyIngestorCluster() + ep := GetEventPublisher(ctx, cr) + ep.Normal(ctx, "QueueConfigUpdated", + fmt.Sprintf("Queue/Pipeline configuration updated for %d ingestors", cr.Spec.Replicas)) + + found := false + for _, event := range recorder.events { + if event.reason == "QueueConfigUpdated" { + found = true + if event.eventType != corev1.EventTypeNormal { + t.Errorf("Expected Normal event type for QueueConfigUpdated, got %s", event.eventType) + } + if !strings.Contains(event.message, "3") { + t.Errorf("Expected event message to contain replica count '3', got: %s", event.message) + } + if !strings.Contains(event.message, "Queue/Pipeline") { + t.Errorf("Expected event message to contain 'Queue/Pipeline', got: %s", event.message) + } + break + } + } + if !found { + t.Errorf("Expected QueueConfigUpdated event to be published") + } +} + +func TestIngIngestorsRestartedEvent(t *testing.T) { + ctx := context.TODO() + recorder := &mockEventRecorder{events: []mockEvent{}} + eventPublisher := 
&K8EventPublisher{recorder: recorder} + ctx = context.WithValue(ctx, splcommon.EventPublisherKey, eventPublisher) + + crName := "test-ingestor" + cr := &enterpriseApi.IngestorCluster{ + ObjectMeta: metav1.ObjectMeta{Name: crName, Namespace: "test"}, + Spec: enterpriseApi.IngestorClusterSpec{ + Replicas: 5, + }, + } + + // Replicate the production conditional from ApplyIngestorCluster() + ep := GetEventPublisher(ctx, cr) + ep.Normal(ctx, "IngestorsRestarted", + fmt.Sprintf("Restarted Splunk on %d ingestor pods", cr.Spec.Replicas)) + + found := false + for _, event := range recorder.events { + if event.reason == "IngestorsRestarted" { + found = true + if event.eventType != corev1.EventTypeNormal { + t.Errorf("Expected Normal event type for IngestorsRestarted, got %s", event.eventType) + } + if !strings.Contains(event.message, "5") { + t.Errorf("Expected event message to contain replica count '5', got: %s", event.message) + } + if !strings.Contains(event.message, "Restarted Splunk") { + t.Errorf("Expected event message to contain 'Restarted Splunk', got: %s", event.message) + } + break + } + } + if !found { + t.Errorf("Expected IngestorsRestarted event to be published") + } +} diff --git a/pkg/splunk/enterprise/standalone.go b/pkg/splunk/enterprise/standalone.go index dd498ce33..7e9f3f05a 100644 --- a/pkg/splunk/enterprise/standalone.go +++ b/pkg/splunk/enterprise/standalone.go @@ -215,8 +215,8 @@ func ApplyStandalone(ctx context.Context, client splcommon.ControllerClient, cr return result, err } - // Track last successful replica count to emit scale events after completion - previousReplicas := cr.Status.Replicas + // Track previous ready replicas for scaling events + previousReadyReplicas := cr.Status.ReadyReplicas mgr := splctrl.DefaultStatefulSetPodManager{} phase, err := mgr.Update(ctx, client, statefulSet, cr.Spec.Replicas) @@ -228,18 +228,20 @@ func ApplyStandalone(ctx context.Context, client splcommon.ControllerClient, cr } cr.Status.Phase = phase - // Emit 
scale events only after a successful scale operation has completed + // Emit scale events when phase is ready and ready replicas changed to match desired if phase == enterpriseApi.PhaseReady { desiredReplicas := cr.Spec.Replicas - if desiredReplicas > previousReplicas && cr.Status.Replicas == desiredReplicas { - if eventPublisher != nil { - eventPublisher.Normal(ctx, "ScaledUp", - fmt.Sprintf("Successfully scaled %s up from %d to %d replicas", cr.GetName(), previousReplicas, desiredReplicas)) - } - } else if desiredReplicas < previousReplicas && cr.Status.Replicas == desiredReplicas { - if eventPublisher != nil { - eventPublisher.Normal(ctx, "ScaledDown", - fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", cr.GetName(), previousReplicas, desiredReplicas)) + if cr.Status.ReadyReplicas == desiredReplicas && previousReadyReplicas != desiredReplicas { + if desiredReplicas > previousReadyReplicas { + if eventPublisher != nil { + eventPublisher.Normal(ctx, "ScaledUp", + fmt.Sprintf("Successfully scaled %s up from %d to %d replicas", cr.GetName(), previousReadyReplicas, desiredReplicas)) + } + } else if desiredReplicas < previousReadyReplicas { + if eventPublisher != nil { + eventPublisher.Normal(ctx, "ScaledDown", + fmt.Sprintf("Successfully scaled %s down from %d to %d replicas", cr.GetName(), previousReadyReplicas, desiredReplicas)) + } } } } diff --git a/pkg/splunk/enterprise/util.go b/pkg/splunk/enterprise/util.go index 4b5d77e8f..3b525f94a 100644 --- a/pkg/splunk/enterprise/util.go +++ b/pkg/splunk/enterprise/util.go @@ -2700,7 +2700,6 @@ func resolveS3Endpoint(ctx context.Context, region string) (string, error) { if err != nil { return "", err } - // Full endpoint URL as string: return ep.URI.String(), nil } diff --git a/pkg/splunk/enterprise/util_test.go b/pkg/splunk/enterprise/util_test.go index 7168e366a..b2ac6ba4d 100644 --- a/pkg/splunk/enterprise/util_test.go +++ b/pkg/splunk/enterprise/util_test.go @@ -3522,8 +3522,8 @@ func 
TestAppRepositoryConnectionFailedEvent(t *testing.T) { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{Name: "test-s3-secret", Namespace: "test"}, Data: map[string][]byte{ - "s3_access_key": []byte("AKIAIOSFODNN7EXAMPLE"), - "s3_secret_key": []byte("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"), + "s3_access_key": []byte("abc"), + "s3_secret_key": []byte("123"), }, } if err := client.Create(ctx, secret); err != nil { @@ -3573,3 +3573,416 @@ func TestAppRepositoryConnectionFailedEvent(t *testing.T) { t.Errorf("Expected AppRepositoryConnectionFailed event to be published") } } + +func TestResolveSQSEndpoint(t *testing.T) { + ctx := context.TODO() + + tests := []struct { + name string + region string + wantEndpoint string + wantErrContain string + }{ + { + name: "valid us-east-1 region", + region: "us-east-1", + wantEndpoint: "https://sqs.us-east-1.amazonaws.com", + }, + { + name: "valid eu-west-1 region", + region: "eu-west-1", + wantEndpoint: "https://sqs.eu-west-1.amazonaws.com", + }, + { + name: "valid ap-southeast-1 region", + region: "ap-southeast-1", + wantEndpoint: "https://sqs.ap-southeast-1.amazonaws.com", + }, + { + name: "valid cn-north-1 region (China)", + region: "cn-north-1", + wantEndpoint: "https://sqs.cn-north-1.amazonaws.com.cn", + }, + { + name: "valid cn-northwest-1 region (China Ningxia)", + region: "cn-northwest-1", + wantEndpoint: "https://sqs.cn-northwest-1.amazonaws.com.cn", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + endpoint, err := resolveSQSEndpoint(ctx, tt.region) + if tt.wantErrContain != "" { + if err == nil { + t.Errorf("resolveSQSEndpoint() expected error containing %q, got nil", tt.wantErrContain) + } else if !strings.Contains(err.Error(), tt.wantErrContain) { + t.Errorf("resolveSQSEndpoint() error = %v, want error containing %q", err, tt.wantErrContain) + } + return + } + if err != nil { + t.Errorf("resolveSQSEndpoint() unexpected error = %v", err) + return + } + if endpoint != 
tt.wantEndpoint { + t.Errorf("resolveSQSEndpoint() = %v, want %v", endpoint, tt.wantEndpoint) + } + }) + } +} + +func TestResolveS3Endpoint(t *testing.T) { + ctx := context.TODO() + + tests := []struct { + name string + region string + wantEndpoint string + wantErrContain string + }{ + { + name: "valid us-east-1 region", + region: "us-east-1", + wantEndpoint: "https://s3.us-east-1.amazonaws.com", + }, + { + name: "valid eu-west-1 region", + region: "eu-west-1", + wantEndpoint: "https://s3.eu-west-1.amazonaws.com", + }, + { + name: "valid ap-southeast-1 region", + region: "ap-southeast-1", + wantEndpoint: "https://s3.ap-southeast-1.amazonaws.com", + }, + { + name: "valid cn-north-1 region (China)", + region: "cn-north-1", + wantEndpoint: "https://s3.cn-north-1.amazonaws.com.cn", + }, + { + name: "valid cn-northwest-1 region (China Ningxia)", + region: "cn-northwest-1", + wantEndpoint: "https://s3.cn-northwest-1.amazonaws.com.cn", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + endpoint, err := resolveS3Endpoint(ctx, tt.region) + if tt.wantErrContain != "" { + if err == nil { + t.Errorf("resolveS3Endpoint() expected error containing %q, got nil", tt.wantErrContain) + } else if !strings.Contains(err.Error(), tt.wantErrContain) { + t.Errorf("resolveS3Endpoint() error = %v, want error containing %q", err, tt.wantErrContain) + } + return + } + if err != nil { + t.Errorf("resolveS3Endpoint() unexpected error = %v", err) + return + } + if endpoint != tt.wantEndpoint { + t.Errorf("resolveS3Endpoint() = %v, want %v", endpoint, tt.wantEndpoint) + } + }) + } +} + +func TestResolveQueueAndObjectStorage(t *testing.T) { + ctx := context.TODO() + + sch := pkgruntime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(sch)) + utilruntime.Must(corev1.AddToScheme(sch)) + utilruntime.Must(enterpriseApi.AddToScheme(sch)) + + t.Run("empty refs returns empty config", func(t *testing.T) { + client := fake.NewClientBuilder().WithScheme(sch).Build() + 
cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + cfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, corev1.ObjectReference{}, corev1.ObjectReference{}, "") + if err != nil { + t.Errorf("ResolveQueueAndObjectStorage() unexpected error = %v", err) + } + if cfg == nil { + t.Fatal("ResolveQueueAndObjectStorage() returned nil config") + } + if cfg.Queue.Provider != "" { + t.Errorf("Expected empty Queue.Provider, got %q", cfg.Queue.Provider) + } + if cfg.OS.Provider != "" { + t.Errorf("Expected empty OS.Provider, got %q", cfg.OS.Provider) + } + }) + + t.Run("queue ref not found returns error", func(t *testing.T) { + client := fake.NewClientBuilder().WithScheme(sch).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + queueRef := corev1.ObjectReference{Name: "nonexistent-queue"} + _, err := ResolveQueueAndObjectStorage(ctx, client, cr, queueRef, corev1.ObjectReference{}, "") + if err == nil { + t.Error("ResolveQueueAndObjectStorage() expected error for nonexistent queue, got nil") + } + }) + + t.Run("objectstorage ref not found returns error", func(t *testing.T) { + client := fake.NewClientBuilder().WithScheme(sch).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + osRef := corev1.ObjectReference{Name: "nonexistent-os"} + _, err := ResolveQueueAndObjectStorage(ctx, client, cr, corev1.ObjectReference{}, osRef, "") + if err == nil { + t.Error("ResolveQueueAndObjectStorage() expected error for nonexistent objectstorage, got nil") + } + }) + + t.Run("valid queue ref returns queue spec", func(t *testing.T) { + queue := &enterpriseApi.Queue{ + ObjectMeta: metav1.ObjectMeta{Name: "test-queue", Namespace: "test"}, + Spec: enterpriseApi.QueueSpec{ + Provider: "sqs", + SQS: enterpriseApi.SQSSpec{ + Name: "my-queue", + AuthRegion: "", + DLQ: "my-dlq", + Endpoint: 
"https://sqs.us-east-1.amazonaws.com", + }, + }, + } + client := fake.NewClientBuilder().WithScheme(sch).WithObjects(queue).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + queueRef := corev1.ObjectReference{Name: "test-queue"} + cfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, queueRef, corev1.ObjectReference{}, "") + if err != nil { + t.Errorf("ResolveQueueAndObjectStorage() unexpected error = %v", err) + } + if cfg.Queue.Provider != "sqs" { + t.Errorf("Expected Queue.Provider = 'sqs', got %q", cfg.Queue.Provider) + } + if cfg.Queue.SQS.Name != "my-queue" { + t.Errorf("Expected Queue.SQS.Name = 'my-queue', got %q", cfg.Queue.SQS.Name) + } + if cfg.Queue.SQS.Endpoint != "https://sqs.us-east-1.amazonaws.com" { + t.Errorf("Expected Queue.SQS.Endpoint = 'https://sqs.us-east-1.amazonaws.com', got %q", cfg.Queue.SQS.Endpoint) + } + }) + + t.Run("valid objectstorage ref returns os spec", func(t *testing.T) { + os := &enterpriseApi.ObjectStorage{ + ObjectMeta: metav1.ObjectMeta{Name: "test-os", Namespace: "test"}, + Spec: enterpriseApi.ObjectStorageSpec{ + Provider: "s3", + S3: enterpriseApi.S3Spec{ + Endpoint: "https://s3.us-east-1.amazonaws.com", + Path: "my-bucket/prefix", + }, + }, + } + client := fake.NewClientBuilder().WithScheme(sch).WithObjects(os).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + osRef := corev1.ObjectReference{Name: "test-os"} + cfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, corev1.ObjectReference{}, osRef, "") + if err != nil { + t.Errorf("ResolveQueueAndObjectStorage() unexpected error = %v", err) + } + if cfg.OS.Provider != "s3" { + t.Errorf("Expected OS.Provider = 's3', got %q", cfg.OS.Provider) + } + if cfg.OS.S3.Path != "my-bucket/prefix" { + t.Errorf("Expected OS.S3.Path = 'my-bucket/prefix', got %q", cfg.OS.S3.Path) + } + }) + + t.Run("queue ref with different 
namespace", func(t *testing.T) { + queue := &enterpriseApi.Queue{ + ObjectMeta: metav1.ObjectMeta{Name: "test-queue", Namespace: "other-ns"}, + Spec: enterpriseApi.QueueSpec{ + Provider: "sqs", + SQS: enterpriseApi.SQSSpec{ + Name: "cross-ns-queue", + DLQ: "my-dlq", + Endpoint: "https://sqs.eu-west-1.amazonaws.com", + }, + }, + } + client := fake.NewClientBuilder().WithScheme(sch).WithObjects(queue).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + queueRef := corev1.ObjectReference{Name: "test-queue", Namespace: "other-ns"} + cfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, queueRef, corev1.ObjectReference{}, "") + if err != nil { + t.Errorf("ResolveQueueAndObjectStorage() unexpected error = %v", err) + } + if cfg.Queue.SQS.Name != "cross-ns-queue" { + t.Errorf("Expected Queue.SQS.Name = 'cross-ns-queue', got %q", cfg.Queue.SQS.Name) + } + }) + + t.Run("queue with secret ref extracts credentials", func(t *testing.T) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "aws-creds", Namespace: "test"}, + Data: map[string][]byte{ + "s3_access_key": []byte("abc"), + "s3_secret_key": []byte("123"), + }, + } + queue := &enterpriseApi.Queue{ + ObjectMeta: metav1.ObjectMeta{Name: "test-queue", Namespace: "test"}, + Spec: enterpriseApi.QueueSpec{ + Provider: "sqs", + SQS: enterpriseApi.SQSSpec{ + Name: "my-queue", + DLQ: "my-dlq", + Endpoint: "https://sqs.us-east-1.amazonaws.com", + VolList: []enterpriseApi.VolumeSpec{ + { + Name: "vol1", + SecretRef: "aws-creds", + }, + }, + }, + }, + } + client := fake.NewClientBuilder().WithScheme(sch).WithObjects(queue, secret).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + queueRef := corev1.ObjectReference{Name: "test-queue"} + cfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, queueRef, corev1.ObjectReference{}, "") + if err != nil { + 
t.Errorf("ResolveQueueAndObjectStorage() unexpected error = %v", err) + } + if cfg.AccessKey != "abc" { + t.Errorf("Expected AccessKey = 'abc', got %q", cfg.AccessKey) + } + if cfg.SecretKey != "123" { + t.Errorf("Expected SecretKey = '123', got %q", cfg.SecretKey) + } + }) + + t.Run("queue with serviceAccount skips secret extraction", func(t *testing.T) { + queue := &enterpriseApi.Queue{ + ObjectMeta: metav1.ObjectMeta{Name: "test-queue", Namespace: "test"}, + Spec: enterpriseApi.QueueSpec{ + Provider: "sqs", + SQS: enterpriseApi.SQSSpec{ + Name: "my-queue", + DLQ: "my-dlq", + Endpoint: "https://sqs.us-east-1.amazonaws.com", + VolList: []enterpriseApi.VolumeSpec{ + { + Name: "vol1", + SecretRef: "aws-creds", + }, + }, + }, + }, + } + client := fake.NewClientBuilder().WithScheme(sch).WithObjects(queue).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + queueRef := corev1.ObjectReference{Name: "test-queue"} + cfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, queueRef, corev1.ObjectReference{}, "my-service-account") + if err != nil { + t.Errorf("ResolveQueueAndObjectStorage() unexpected error = %v", err) + } + // When serviceAccount is provided, credentials should not be extracted + if cfg.AccessKey != "" { + t.Errorf("Expected empty AccessKey when serviceAccount is provided, got %q", cfg.AccessKey) + } + if cfg.SecretKey != "" { + t.Errorf("Expected empty SecretKey when serviceAccount is provided, got %q", cfg.SecretKey) + } + }) + + t.Run("queue with missing secret returns error", func(t *testing.T) { + queue := &enterpriseApi.Queue{ + ObjectMeta: metav1.ObjectMeta{Name: "test-queue", Namespace: "test"}, + Spec: enterpriseApi.QueueSpec{ + Provider: "sqs", + SQS: enterpriseApi.SQSSpec{ + Name: "my-queue", + DLQ: "my-dlq", + Endpoint: "https://sqs.us-east-1.amazonaws.com", + VolList: []enterpriseApi.VolumeSpec{ + { + Name: "vol1", + SecretRef: "nonexistent-secret", + }, + }, + }, + }, 
+ } + client := fake.NewClientBuilder().WithScheme(sch).WithObjects(queue).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + queueRef := corev1.ObjectReference{Name: "test-queue"} + _, err := ResolveQueueAndObjectStorage(ctx, client, cr, queueRef, corev1.ObjectReference{}, "") + if err == nil { + t.Error("ResolveQueueAndObjectStorage() expected error for missing secret, got nil") + } + }) + + t.Run("both queue and objectstorage refs", func(t *testing.T) { + queue := &enterpriseApi.Queue{ + ObjectMeta: metav1.ObjectMeta{Name: "test-queue", Namespace: "test"}, + Spec: enterpriseApi.QueueSpec{ + Provider: "sqs", + SQS: enterpriseApi.SQSSpec{ + Name: "my-queue", + DLQ: "my-dlq", + Endpoint: "https://sqs.us-east-1.amazonaws.com", + }, + }, + } + os := &enterpriseApi.ObjectStorage{ + ObjectMeta: metav1.ObjectMeta{Name: "test-os", Namespace: "test"}, + Spec: enterpriseApi.ObjectStorageSpec{ + Provider: "s3", + S3: enterpriseApi.S3Spec{ + Endpoint: "https://s3.us-east-1.amazonaws.com", + Path: "my-bucket", + }, + }, + } + client := fake.NewClientBuilder().WithScheme(sch).WithObjects(queue, os).Build() + cr := &enterpriseApi.IndexerCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test-idxc", Namespace: "test"}, + } + + queueRef := corev1.ObjectReference{Name: "test-queue"} + osRef := corev1.ObjectReference{Name: "test-os"} + cfg, err := ResolveQueueAndObjectStorage(ctx, client, cr, queueRef, osRef, "") + if err != nil { + t.Errorf("ResolveQueueAndObjectStorage() unexpected error = %v", err) + } + if cfg.Queue.Provider != "sqs" { + t.Errorf("Expected Queue.Provider = 'sqs', got %q", cfg.Queue.Provider) + } + if cfg.OS.Provider != "s3" { + t.Errorf("Expected OS.Provider = 's3', got %q", cfg.OS.Provider) + } + }) +} diff --git a/pkg/splunk/util/secrets.go b/pkg/splunk/util/secrets.go index 7240c3916..004266b1b 100644 --- a/pkg/splunk/util/secrets.go +++ b/pkg/splunk/util/secrets.go @@ -197,7 +197,7 @@ 
func RemoveSecretOwnerRef(ctx context.Context, client splcommon.ControllerClient return refCount, nil } -// RemoveUnwantedSecrets deletes all secrets whose version precedes (latestVersion - MinimumVersionedSecrets) +// RemoveUnwantedSecrets deletes all secrets whose version precedes (latestVersion - MinimumVersionedSecrets) func RemoveUnwantedSecrets(ctx context.Context, c splcommon.ControllerClient, versionedSecretIdentifier, namespace string) error { logger := slog.With("func", "RemoveUnwantedSecrets") // retrieve the list of versioned namespace scoped secrets