Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func Run(cmd *cobra.Command, args []string) error {
dataDumpController = controllerutils.NewClusterWatchingController(
"DataDump", dbClient, subscriptionLister, 1*time.Minute, controllers.NewDataDumpController(dbClient))
doNothingController = controllers.NewDoNothingExampleController(dbClient, subscriptionLister)
operationClusterCreateController = operationcontrollers.NewGenericOperationClusterCreateController(
operationClusterCreateController = operationcontrollers.NewGenericOperationController(
"OperationClusterCreate",
operationcontrollers.NewOperationClusterCreateSynchronizer(
argLocation,
Expand All @@ -334,7 +334,7 @@ func Run(cmd *cobra.Command, args []string) error {
subscriptionLister,
dbClient,
)
operationClusterUpdateController = operationcontrollers.NewGenericOperationClusterCreateController(
operationClusterUpdateController = operationcontrollers.NewGenericOperationController(
"OperationClusterUpdate",
operationcontrollers.NewOperationClusterUpdateSynchronizer(
dbClient,
Expand All @@ -345,7 +345,7 @@ func Run(cmd *cobra.Command, args []string) error {
subscriptionLister,
dbClient,
)
operationClusterDeleteController = operationcontrollers.NewGenericOperationClusterCreateController(
operationClusterDeleteController = operationcontrollers.NewGenericOperationController(
"OperationClusterDelete",
operationcontrollers.NewOperationClusterDeleteSynchronizer(
dbClient,
Expand All @@ -356,6 +356,28 @@ func Run(cmd *cobra.Command, args []string) error {
subscriptionLister,
dbClient,
)
operationRequestCredentialController = operationcontrollers.NewGenericOperationController(
"OperationRequestCredential",
operationcontrollers.NewOperationRequestCredentialSynchronizer(
dbClient,
clusterServiceClient,
http.DefaultClient,
),
10*time.Second,
subscriptionLister,
dbClient,
)
operationRevokeCredentialsController = operationcontrollers.NewGenericOperationController(
"OperationRevokeCredentials",
operationcontrollers.NewOperationRevokeCredentialsSynchronizer(
dbClient,
clusterServiceClient,
http.DefaultClient,
),
10*time.Second,
subscriptionLister,
dbClient,
)
clusterServiceMatchingClusterController = mismatchcontrollers.NewClusterServiceClusterMatchingController(dbClient, subscriptionLister, clusterServiceClient)
cosmosMatchingNodePoolController = controllerutils.NewClusterWatchingController(
"CosmosMatchingNodePools", dbClient, subscriptionLister, 60*time.Minute,
Expand Down Expand Up @@ -384,6 +406,8 @@ func Run(cmd *cobra.Command, args []string) error {
go operationClusterCreateController.Run(ctx, 20)
go operationClusterUpdateController.Run(ctx, 20)
go operationClusterDeleteController.Run(ctx, 20)
go operationRequestCredentialController.Run(ctx, 20)
go operationRevokeCredentialsController.Run(ctx, 20)
go clusterServiceMatchingClusterController.Run(ctx, 20)
go cosmosMatchingNodePoolController.Run(ctx, 20)
go cosmosMatchingExternalAuthController.Run(ctx, 20)
Expand Down
121 changes: 0 additions & 121 deletions backend/oldoperationscanner/operations_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ const (
pollClusterOperationLabel = "poll_cluster"
pollNodePoolOperationLabel = "poll_node_pool"
pollExternalAuthOperationLabel = "poll_external_auth"
pollBreakGlassCredential = "poll_break_glass_credential"
pollBreakGlassCredentialRevoke = "poll_break_glass_credential_revoke"

TracerName = "github.com/Azure/ARO-HCP/backend"
)
Expand Down Expand Up @@ -103,8 +101,6 @@ func listOperationLabelValues() iter.Seq[string] {
pollClusterOperationLabel,
pollNodePoolOperationLabel,
pollExternalAuthOperationLabel,
pollBreakGlassCredential,
pollBreakGlassCredentialRevoke,
})
}

Expand Down Expand Up @@ -499,17 +495,10 @@ func (s *OperationsScanner) processOperation(ctx context.Context, op operation)
defer logger.Info("Processed")

switch op.doc.InternalID.Kind() {
case arohcpv1alpha1.ClusterKind:
switch op.doc.Request {
case database.OperationRequestRevokeCredentials:
s.pollBreakGlassCredentialRevoke(ctx, op)
}
case arohcpv1alpha1.NodePoolKind:
s.pollNodePoolOperation(ctx, op)
case cmv1.ExternalAuthKind:
s.pollExternalAuthOperation(ctx, op)
case cmv1.BreakGlassCredentialKind:
s.pollBreakGlassCredential(ctx, op)
}
}

Expand Down Expand Up @@ -593,116 +582,6 @@ func (s *OperationsScanner) pollExternalAuthOperation(ctx context.Context, op op
}
}

// pollBreakGlassCredential updates the status of a credential creation operation.
func (s *OperationsScanner) pollBreakGlassCredential(ctx context.Context, op operation) {
logger := utils.LoggerFromContext(ctx)
ctx, span := startChildSpan(ctx, "pollBreakGlassCredential")
defer span.End()
defer s.updateOperationMetrics(pollBreakGlassCredential)()
op.setSpanAttributes(span)

breakGlassCredential, err := s.clusterService.GetBreakGlassCredential(ctx, op.doc.InternalID)
if err != nil {
s.recordOperationError(ctx, pollBreakGlassCredential, err)
logger.Error(fmt.Sprintf("Failed to get break-glass credential: %v", err))
return
}

var opStatus arm.ProvisioningState
var opError *arm.CloudErrorBody

switch status := breakGlassCredential.Status(); status {
case cmv1.BreakGlassCredentialStatusCreated:
opStatus = arm.ProvisioningStateProvisioning
case cmv1.BreakGlassCredentialStatusFailed:
// XXX Cluster Service does not provide a reason for the failure,
// so we have no choice but to use a generic error message.
opStatus = arm.ProvisioningStateFailed
opError = &arm.CloudErrorBody{
Code: arm.CloudErrorCodeInternalServerError,
Message: "Failed to provision cluster credential",
}
case cmv1.BreakGlassCredentialStatusIssued:
opStatus = arm.ProvisioningStateSucceeded
default:
s.recordOperationError(ctx, pollBreakGlassCredential, err)
logger.Error(fmt.Sprintf("Unhandled BreakGlassCredentialStatus '%s'", status))
return
}

err = database.PatchOperationDocument(ctx, s.dbClient, op.doc, opStatus, opError, s.postAsyncNotification)
if err != nil {
s.recordOperationError(ctx, pollBreakGlassCredential, err)
logger.Error(fmt.Sprintf("Failed to update operation status: %v", err))
}
}

// pollBreakGlassCredentialRevoke updates the status of a credential revocation operation.
func (s *OperationsScanner) pollBreakGlassCredentialRevoke(ctx context.Context, op operation) {
logger := utils.LoggerFromContext(ctx)
ctx, span := startChildSpan(ctx, "pollBreakGlassCredentialRevoke")
defer span.End()
defer s.updateOperationMetrics(pollBreakGlassCredentialRevoke)()
op.setSpanAttributes(span)

var opStatus = arm.ProvisioningStateSucceeded
var opError *arm.CloudErrorBody

// XXX Error handling here is tricky. Since the operation applies to multiple
// Cluster Service objects, we can find a mix of successes and failures.
// And with only a Failed status for each object, it's difficult to make
// intelligent decisions like whether to retry. This is just to say the
// error handling policy here may need revising once Cluster Service
// offers more detail to accompany BreakGlassCredentialStatusFailed.

iterator := s.clusterService.ListBreakGlassCredentials(op.doc.InternalID, "")

loop:
for breakGlassCredential := range iterator.Items(ctx) {
// An expired credential is as good as a revoked credential
// for this operation, regardless of the credential status.
if breakGlassCredential.ExpirationTimestamp().After(time.Now()) {
switch status := breakGlassCredential.Status(); status {
case cmv1.BreakGlassCredentialStatusAwaitingRevocation:
opStatus = arm.ProvisioningStateDeleting
// break alone just breaks out of select.
// Use a label to break out of the loop.
break loop
case cmv1.BreakGlassCredentialStatusRevoked:
// maintain ProvisioningStateSucceeded
case cmv1.BreakGlassCredentialStatusFailed:
// XXX Cluster Service does not provide a reason for the failure,
// so we have no choice but to use a generic error message.
opStatus = arm.ProvisioningStateFailed
opError = &arm.CloudErrorBody{
Code: arm.CloudErrorCodeInternalServerError,
Message: "Failed to revoke cluster credential",
}
// break alone just breaks out of select.
// Use a label to break out of the loop.
break loop
default:
err := fmt.Errorf("unhandled BreakGlassCredentialStatus '%s'", status)
s.recordOperationError(ctx, pollBreakGlassCredentialRevoke, err)
logger.Error(err.Error())
}
}
}

err := iterator.GetError()
if err != nil {
s.recordOperationError(ctx, pollBreakGlassCredentialRevoke, err)
logger.Error(fmt.Sprintf("Error while paging through Cluster Service query results: %v", err.Error()))
return
}

err = database.PatchOperationDocument(ctx, s.dbClient, op.doc, opStatus, opError, s.postAsyncNotification)
if err != nil {
s.recordOperationError(ctx, pollBreakGlassCredentialRevoke, err)
logger.Error(fmt.Sprintf("Failed to update operation status: %v", err))
}
}

// withSubscriptionLock holds a subscription lock while executing the given function.
// In the event the subscription lock is lost, the context passed to the function will
// be canceled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ type genericOperation struct {
queue workqueue.TypedRateLimitingInterface[controllerutils.OperationKey]
}

// NewOperationClusterCreateSynchronizer periodically lists all clusters and for each out when the cluster was created and its state.
func NewGenericOperationClusterCreateController(
// NewGenericOperationController returns a Controller that updates Cosmos DB documents
// tracking ongoing asynchronous operations. Each Controller instance has a unique
// OperationSynchronizer that reconciles a particular type of asynchronous operation,
// like cluster creation or node pool deletion.
func NewGenericOperationController(
name string,
synchronizer OperationSynchronizer,
activeOperationScanInterval time.Duration,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2025 Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operationcontrollers

import (
"context"
"fmt"
"net/http"

cmv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"

"github.com/Azure/ARO-HCP/backend/pkg/controllers/controllerutils"
"github.com/Azure/ARO-HCP/internal/api"
"github.com/Azure/ARO-HCP/internal/api/arm"
"github.com/Azure/ARO-HCP/internal/database"
"github.com/Azure/ARO-HCP/internal/ocm"
"github.com/Azure/ARO-HCP/internal/utils"
)

type operationRequestCredential struct {
cosmosClient database.DBClient
clusterServiceClient ocm.ClusterServiceClientSpec
notificationClient *http.Client
}

func NewOperationRequestCredentialSynchronizer(
cosmosClient database.DBClient,
clusterServiceClient ocm.ClusterServiceClientSpec,
notificationClient *http.Client,
) OperationSynchronizer {
return &operationRequestCredential{
cosmosClient: cosmosClient,
clusterServiceClient: clusterServiceClient,
notificationClient: notificationClient,
}
}

func (opsync *operationRequestCredential) ShouldProcess(ctx context.Context, operation *api.Operation) bool {
if operation.Status.IsTerminal() {
return false
}
if operation.Request != database.OperationRequestRequestCredential {
return false
}
return true
}

func (opsync *operationRequestCredential) SynchronizeOperation(ctx context.Context, key controllerutils.OperationKey) error {
logger := utils.LoggerFromContext(ctx)
logger.Info("checking operation")

operation, err := opsync.cosmosClient.Operations(key.SubscriptionID).Get(ctx, key.OperationName)
if database.IsResponseError(err, http.StatusNotFound) {
return nil // no work to do
}
if err != nil {
return fmt.Errorf("failed to get active operation: %w", err)
}
if !opsync.ShouldProcess(ctx, operation) {
return nil // no work to do
}

breakGlassCredential, err := opsync.clusterServiceClient.GetBreakGlassCredential(ctx, operation.InternalID)
if err != nil {
return utils.TrackError(err)
}

var opStatus arm.ProvisioningState
var opError *arm.CloudErrorBody

switch status := breakGlassCredential.Status(); status {
case cmv1.BreakGlassCredentialStatusCreated:
opStatus = arm.ProvisioningStateProvisioning
case cmv1.BreakGlassCredentialStatusFailed:
// XXX Cluster Service does not provide a reason for the failure,
// so we have no choice but to use a generic error message.
opStatus = arm.ProvisioningStateFailed
opError = &arm.CloudErrorBody{
Code: arm.CloudErrorCodeInternalServerError,
Message: "Failed to provision cluster credential",
}
case cmv1.BreakGlassCredentialStatusIssued:
opStatus = arm.ProvisioningStateSucceeded
default:
return fmt.Errorf("unhandled BreakGlassCredentialStatus '%s'", status)
}

logger.Info("updating status")
err = database.PatchOperationDocument(ctx, opsync.cosmosClient, operation, opStatus, opError, PostAsyncNotification(opsync.notificationClient))
if err != nil {
return utils.TrackError(err)
}

return nil
}
Loading