Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func Run(cmd *cobra.Command, args []string) error {
clusterServiceMatchingExternalAuthController = controllerutils.NewClusterWatchingController(
"ClusterServiceMatchingExternalAuths", dbClient, subscriptionLister, 60*time.Minute,
mismatchcontrollers.NewClusterServiceExternalAuthMatchingController(dbClient, clusterServiceClient))
deleteOrphanedCosmosResourcesController = mismatchcontrollers.NewDeleteOrphanedCosmosResourcesController(dbClient, subscriptionLister)
)

le, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Expand All @@ -368,6 +369,7 @@ func Run(cmd *cobra.Command, args []string) error {
go clusterServiceMatchingClusterController.Run(ctx, 20)
go clusterServiceMatchingNodePoolController.Run(ctx, 20)
go clusterServiceMatchingExternalAuthController.Run(ctx, 20)
go deleteOrphanedCosmosResourcesController.Run(ctx, 20)
},
OnStoppedLeading: func() {
operationsScanner.LeaderGauge.Set(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (c *clusterWatchingController) processNextWorkItem(ctx context.Context) boo
defer c.queue.Done(ref)

logger := utils.LoggerFromContext(ctx)
logger = ref.AddLoggerValues(logger)
ctx = utils.ContextWithLogger(ctx, logger)

err := c.SyncOnce(ctx, ref)
Expand Down
240 changes: 240 additions & 0 deletions backend/pkg/controllers/mismatchcontrollers/delete_orphaned_cosmos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright 2025 Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mismatchcontrollers

import (
"context"
"errors"
"fmt"
"slices"
"strings"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"

azcorearm "github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"

"github.com/Azure/ARO-HCP/backend/pkg/controllers/controllerutils"
"github.com/Azure/ARO-HCP/backend/pkg/listers"
"github.com/Azure/ARO-HCP/internal/api"
"github.com/Azure/ARO-HCP/internal/api/arm"
"github.com/Azure/ARO-HCP/internal/database"
"github.com/Azure/ARO-HCP/internal/utils"
)

type deleteOrphanedCosmosResources struct {
name string

subscriptionLister listers.SubscriptionLister
cosmosClient database.DBClient

// queue is where incoming work is placed to de-dup and to allow "easy"
// rate limited requeues on errors
queue workqueue.TypedRateLimitingInterface[string]
}

// NewDeleteOrphanedCosmosResourcesController periodically looks for cosmos objs that don't have an owning cluster and deletes them.
func NewDeleteOrphanedCosmosResourcesController(cosmosClient database.DBClient, subscriptionLister listers.SubscriptionLister) controllerutils.Controller {
c := &deleteOrphanedCosmosResources{
name: "DeleteOrphanedCosmosResources",
subscriptionLister: subscriptionLister,
cosmosClient: cosmosClient,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "DeleteOrphanedCosmosResources",
},
),
}

return c
}

func (c *deleteOrphanedCosmosResources) synchronizeSubscription(ctx context.Context, subscription string) error {
logger := utils.LoggerFromContext(ctx)

subscriptionResourceID, err := arm.ToSubscriptionResourceID(subscription)
if err != nil {
return utils.TrackError(err)
}
untypedSubscriptionCRUD, err := c.cosmosClient.UntypedCRUD(*subscriptionResourceID)
if err != nil {
return utils.TrackError(err)
}
paginatedListOptions := &database.DBClientListResourceDocsOptions{
PageSizeHint: ptr.To(int32(500)),
}
subscriptionResourceIterator, err := untypedSubscriptionCRUD.ListRecursive(ctx, paginatedListOptions)
if err != nil {
return utils.TrackError(err)
}

errs := []error{}
// while the number of items is large, but we can paginate through
allSubscriptionResourceIDs := map[string]*azcorearm.ResourceID{}
for _, subscriptionResource := range subscriptionResourceIterator.Items(ctx) {
resourceID, err := api.CosmosIDToResourceID(subscriptionResource.ID)
if err != nil {
// we cannot convert the stored cosmos ID into a resourceID, then it was never migrated. This means it was inaccessible and needs to be removed
logger.Error("cosmos resource not accessible, removing", "cosmosResourceID", subscriptionResource.ID)
if err := untypedSubscriptionCRUD.DeleteByCosmosID(ctx, subscriptionResource.PartitionKey, subscriptionResource.ID); err != nil {
errs = append(errs, utils.TrackError(fmt.Errorf("unable to delete %q in %q: %w", subscriptionResource.ID, subscriptionResource.PartitionKey, err)))
}
continue
}
allSubscriptionResourceIDs[resourceID.String()] = resourceID
}
if err := subscriptionResourceIterator.GetError(); err != nil {
return utils.TrackError(err)
}

// longer strings are first, so we're guaranteed to see children before parents when we iterate
resourceIDStrings := sets.KeySet(allSubscriptionResourceIDs).UnsortedList()
slices.Sort(resourceIDStrings)
slices.Reverse(resourceIDStrings)

// at this point we have every resourceID under the subscription that is under a resourcegroup
resourceGroupPrefix := subscriptionResourceID.String() + "/resourcegroups/"
for _, currResourceIDString := range resourceIDStrings {
currResourceID := allSubscriptionResourceIDs[currResourceIDString]
switch {
case strings.EqualFold(currResourceID.ResourceType.String(), api.ClusterResourceType.String()):
// clusters clearly have an owning cluster
continue
case !strings.HasPrefix(currResourceIDString, resourceGroupPrefix):
// skip anything outside a resourcegroup (operations for instance)
continue
case !strings.EqualFold(currResourceID.ResourceType.Namespace, api.ProviderNamespace):
// any resources outside our namespace we shouldn't delete
continue
}

_, parentExists := allSubscriptionResourceIDs[currResourceID.Parent.String()]
if !parentExists {
localLogger := logger.With(
"cosmosResourceID", currResourceIDString,
"resource_group", currResourceID.ResourceGroupName,
"resource_id", currResourceID,
"resource_name", currResourceID.Name,
"hcp_cluster_name", clusterNameOfResourceID(currResourceID),
)
ctxWithLocalLogger := utils.ContextWithLogger(ctx, localLogger) // setting so that other calls down the chain will show correctly in kusto for the delete
localLogger.Info("deleting orphaned cosmos resource")
if err := untypedSubscriptionCRUD.Delete(ctxWithLocalLogger, currResourceID); err != nil {
errs = append(errs, utils.TrackError(fmt.Errorf("unable to delete %q in %q: %w", currResourceIDString, currResourceID.Parent.String(), err)))
}
}
}

return errors.Join(errs...)
}

func clusterNameOfResourceID(resourceID *azcorearm.ResourceID) string {
if resourceID == nil {
return ""
}
if !strings.EqualFold(resourceID.ResourceType.Namespace, api.ProviderNamespace) {
return ""
}
if strings.EqualFold(resourceID.ResourceType.String(), api.ClusterResourceType.String()) {
return resourceID.String()
}
return clusterNameOfResourceID(resourceID.Parent)
}

func (c *deleteOrphanedCosmosResources) SyncOnce(ctx context.Context, subscription any) error {
logger := utils.LoggerFromContext(ctx)

syncErr := c.synchronizeSubscription(ctx, subscription.(string)) // we'll handle this is a moment.
if syncErr != nil {
logger.Error("unable to synchronize all clusters", "error", syncErr)
}

return utils.TrackError(syncErr)
}

func (c *deleteOrphanedCosmosResources) queueAllSubscriptions(ctx context.Context) {
logger := utils.LoggerFromContext(ctx)

allSubscriptions, err := c.subscriptionLister.List(ctx)
if err != nil {
logger.Error("unable to list subscriptions", "error", err)
}
for _, subscription := range allSubscriptions {
c.queue.Add(subscription.ResourceID.SubscriptionID)
}
}

func (c *deleteOrphanedCosmosResources) Run(ctx context.Context, threadiness int) {
// don't let panics crash the process
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()

logger := utils.LoggerFromContext(ctx)
logger.With("controller_name", c.name)
ctx = utils.ContextWithLogger(ctx, logger)
logger.Info("Starting")

// start up your worker threads based on threadiness. Some controllers
// have multiple kinds of workers
for i := 0; i < threadiness; i++ {
// runWorker will loop until "something bad" happens. The .Until will
// then rekick the worker after one second
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}

go wait.JitterUntilWithContext(ctx, c.queueAllSubscriptions, 60*time.Minute, 0.1, true)

logger.Info("Started workers")

// wait until we're told to stop
<-ctx.Done()
logger.Info("Shutting down")
}

func (c *deleteOrphanedCosmosResources) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}

// processNextWorkItem deals with one item off the queue. It returns false
// when it's time to quit.
func (c *deleteOrphanedCosmosResources) processNextWorkItem(ctx context.Context) bool {
ref, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(ref)

logger := utils.LoggerFromContext(ctx)
logger = logger.With("subscription_id", ref)
ctx = utils.ContextWithLogger(ctx, logger)

err := c.SyncOnce(ctx, ref)
if err == nil {
c.queue.Forget(ref)
return true
}

utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", ref)
c.queue.AddRateLimited(ref)

return true
}
3 changes: 1 addition & 2 deletions frontend/pkg/frontend/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (f *Frontend) updateHCPClusterInCosmos(ctx context.Context, writer http.Res
}

// Read back the resource document so the response body is accurate.
resultingUncastObj, err := transactionResult.GetItem(oldInternalCluster.ServiceProviderProperties.CosmosUID)
resultingUncastObj, err := transactionResult.GetItem(oldInternalCluster.GetCosmosData().CosmosUID)
if err != nil {
return utils.TrackError(err)
}
Expand Down Expand Up @@ -800,7 +800,6 @@ func mergeToInternalCluster(csCluster *arohcpv1alpha1.Cluster, internalCluster *
clusterServiceBasedInternalCluster.ServiceProviderProperties.ProvisioningState = internalCluster.ServiceProviderProperties.ProvisioningState
clusterServiceBasedInternalCluster.ServiceProviderProperties.ActiveOperationID = internalCluster.ServiceProviderProperties.ActiveOperationID
clusterServiceBasedInternalCluster.ServiceProviderProperties.ClusterServiceID = internalCluster.ServiceProviderProperties.ClusterServiceID
clusterServiceBasedInternalCluster.ServiceProviderProperties.CosmosUID = internalCluster.ServiceProviderProperties.CosmosUID
if clusterServiceBasedInternalCluster.Identity == nil {
clusterServiceBasedInternalCluster.Identity = &arm.ManagedServiceIdentity{}
}
Expand Down
3 changes: 1 addition & 2 deletions frontend/pkg/frontend/external_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func (f *Frontend) updateExternalAuthInCosmos(ctx context.Context, writer http.R
}

// Read back the resource document so the response body is accurate.
resultingUncastInternalExternalAuth, err := transactionResult.GetItem(oldInternalExternalAuth.ServiceProviderProperties.CosmosUID)
resultingUncastInternalExternalAuth, err := transactionResult.GetItem(oldInternalExternalAuth.GetCosmosData().CosmosUID)
if err != nil {
return utils.TrackError(err)
}
Expand Down Expand Up @@ -681,7 +681,6 @@ func mergeToInternalExternalAuth(csEternalAuth *arohcpv1alpha1.ExternalAuth, int
// this does not use conversion.CopyReadOnly* because some ServiceProvider properties come from cluster-service-only or live reads
mergedExternalAuth.SystemData = internalObj.SystemData
mergedExternalAuth.Properties.ProvisioningState = internalObj.Properties.ProvisioningState
mergedExternalAuth.ServiceProviderProperties.CosmosUID = internalObj.ServiceProviderProperties.CosmosUID
mergedExternalAuth.ServiceProviderProperties.ClusterServiceID = internalObj.ServiceProviderProperties.ClusterServiceID
mergedExternalAuth.ServiceProviderProperties.ActiveOperationID = internalObj.ServiceProviderProperties.ActiveOperationID

Expand Down
3 changes: 1 addition & 2 deletions frontend/pkg/frontend/node_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func (f *Frontend) updateNodePoolInCosmos(ctx context.Context, writer http.Respo
}

// Read back the resource document so the response body is accurate.
resultingUncastInternalNodePool, err := transactionResult.GetItem(oldInternalNodePool.ServiceProviderProperties.CosmosUID)
resultingUncastInternalNodePool, err := transactionResult.GetItem(oldInternalNodePool.GetCosmosData().CosmosUID)
if err != nil {
return utils.TrackError(err)
}
Expand Down Expand Up @@ -729,7 +729,6 @@ func mergeToInternalNodePool(clusterServiceNode *arohcpv1alpha1.NodePool, intern
mergedOldClusterServiceNodePool.SystemData = internalNodePool.SystemData
mergedOldClusterServiceNodePool.Tags = maps.Clone(internalNodePool.Tags)
mergedOldClusterServiceNodePool.Properties.ProvisioningState = internalNodePool.Properties.ProvisioningState
mergedOldClusterServiceNodePool.ServiceProviderProperties.CosmosUID = internalNodePool.ServiceProviderProperties.CosmosUID
mergedOldClusterServiceNodePool.ServiceProviderProperties.ClusterServiceID = internalNodePool.ServiceProviderProperties.ClusterServiceID
mergedOldClusterServiceNodePool.ServiceProviderProperties.ActiveOperationID = internalNodePool.ServiceProviderProperties.ActiveOperationID

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
"_rid": "LKgdAIiT-BgDAAAAAAAAAA==",
"_self": "dbs/LKgdAA==/colls/LKgdAIiT-Bg=/docs/LKgdAIiT-BgDAAAAAAAAAA==/",
"_ts": 1765995430,
"id": "b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"id": "|subscriptions|b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"partitionKey": "b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"properties": {
"resourceId": "/subscriptions/b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"properties": null,
"registrationDate": "2025-12-17T18:16:37+00:00",
"state": "Registered"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"_rid": "zDEpAKTEnZ4DAAAAAAAAAA==",
"_self": "dbs/zDEpAA==/colls/zDEpAKTEnZ4=/docs/zDEpAKTEnZ4DAAAAAAAAAA==/",
"_ts": 1765995984,
"id": "ddfbdeeb-89a1-4a9a-9469-2895f63e2d82",
"id": "|subscriptions|ddfbdeeb-89a1-4a9a-9469-2895f63e2d82",
"partitionKey": "ddfbdeeb-89a1-4a9a-9469-2895f63e2d82",
"properties": {
"properties": null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
"_rid": "LKgdAIiT-BgDAAAAAAAAAA==",
"_self": "dbs/LKgdAA==/colls/LKgdAIiT-Bg=/docs/LKgdAIiT-BgDAAAAAAAAAA==/",
"_ts": 1765995430,
"id": "b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"id": "|subscriptions|b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"partitionKey": "b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"properties": {
"resourceId": "/subscriptions/b3fa2ee1-d1b3-4eaa-beae-7dd4003b4987",
"properties": null,
"registrationDate": "2025-12-17T18:16:37+00:00",
"state": "Registered"
Expand Down
11 changes: 1 addition & 10 deletions internal/api/arm/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,8 @@ type Subscription struct {
}

func (o *Subscription) GetCosmosData() CosmosData {
cosmosUID := strings.ReplaceAll(strings.ToLower(o.ResourceID.String()), "/", "|")
if len(o.CosmosUID) != 0 {
// if this is an item that is being serialized for the first time, then we can force it to use the new scheme.
// if it already thinks it knows its CosmosID, then we must accept what it thinks because this could be a case
// where we have a new backend and an old frontend. In that case, the content still has random UIDs, but the backend
// must be able to read AND write the records. This means we cannot assume that all UIDs have already changed.
cosmosUID = o.CosmosUID
}

return CosmosData{
CosmosUID: cosmosUID,
CosmosUID: strings.ReplaceAll(strings.ToLower(o.ResourceID.String()), "/", "|"),
PartitionKey: strings.ToLower(o.ResourceID.Name),
ItemID: o.ResourceID,
}
Expand Down
12 changes: 1 addition & 11 deletions internal/api/types_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,8 @@ type HCPOpenShiftCluster struct {
var _ CosmosPersistable = &HCPOpenShiftCluster{}

func (o *HCPOpenShiftCluster) GetCosmosData() CosmosData {
cosmosUID := Must(ResourceIDToCosmosID(o.ID))
if len(o.ServiceProviderProperties.CosmosUID) != 0 {
// if this is an item that is being serialized for the first time, then we can force it to use the new scheme.
// if it already thinks it knows its CosmosID, then we must accept what it thinks because this could be a case
// where we have a new backend and an old frontend. In that case, the content still has random UIDs, but the backend
// must be able to read AND write the records. This means we cannot assume that all UIDs have already changed.
cosmosUID = o.ServiceProviderProperties.CosmosUID
}

return CosmosData{
CosmosUID: cosmosUID,
CosmosUID: Must(ResourceIDToCosmosID(o.ID)),
PartitionKey: strings.ToLower(o.ID.SubscriptionID),
ItemID: o.ID,
}
Expand All @@ -66,7 +57,6 @@ type HCPOpenShiftClusterCustomerProperties struct {
// HCPOpenShiftClusterCustomerProperties represents the property bag of a HCPOpenShiftCluster resource.
type HCPOpenShiftClusterServiceProviderProperties struct {
ProvisioningState arm.ProvisioningState `json:"provisioningState,omitempty"`
CosmosUID string `json:"cosmosUID,omitempty"`
ClusterServiceID InternalID `json:"clusterServiceID,omitempty"`
ActiveOperationID string `json:"activeOperationId,omitempty"`
DNS ServiceProviderDNSProfile `json:"dns,omitempty"`
Expand Down
Loading