diff --git a/cns/kubecontroller/nodenetworkconfig/reconciler.go b/cns/kubecontroller/nodenetworkconfig/reconciler.go
index 81d389f73d..6a067cf567 100644
--- a/cns/kubecontroller/nodenetworkconfig/reconciler.go
+++ b/cns/kubecontroller/nodenetworkconfig/reconciler.go
@@ -27,8 +27,10 @@ type cnsClient interface {
 	MustEnsureNoStaleNCs(validNCIDs []string)
 }
 
+type nodenetworkconfigSink func(*v1alpha.NodeNetworkConfig) error
+
 type nodeNetworkConfigListener interface {
-	Update(*v1alpha.NodeNetworkConfig) error
+	Update(*v1alpha.NodeNetworkConfig) error // phasing this out in favor of the sink
 }
 
 type nncGetter interface {
@@ -38,31 +40,33 @@ type nncGetter interface {
 // Reconciler watches for CRD status changes
 type Reconciler struct {
 	cnscli             cnsClient
-	ipampoolmonitorcli nodeNetworkConfigListener
+	ipampoolmonitorcli nodenetworkconfigSink
 	nnccli             nncGetter
 	once               sync.Once
-	started            chan interface{}
+	started            chan any
 	nodeIP             string
 	isSwiftV2          bool
+	initializer        nodenetworkconfigSink
 }
 
 // NewReconciler creates a NodeNetworkConfig Reconciler which will get updates from the Kubernetes
 // apiserver for NNC events.
 // Provided nncListeners are passed the NNC after the Reconcile preprocesses it. Note: order matters! The
 // passed Listeners are notified in the order provided.
-func NewReconciler(cnscli cnsClient, ipampoolmonitorcli nodeNetworkConfigListener, nodeIP string, isSwiftV2 bool) *Reconciler {
+func NewReconciler(cnscli cnsClient, initializer nodenetworkconfigSink, ipampoolmonitorcli nodeNetworkConfigListener, nodeIP string, isSwiftV2 bool) *Reconciler {
 	return &Reconciler{
 		cnscli:             cnscli,
-		ipampoolmonitorcli: ipampoolmonitorcli,
-		started:            make(chan interface{}),
+		ipampoolmonitorcli: ipampoolmonitorcli.Update,
+		started:            make(chan any),
 		nodeIP:             nodeIP,
 		isSwiftV2:          isSwiftV2,
+		initializer:        initializer,
 	}
 }
 
 // Reconcile is called on CRD status changes
 func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
-	listenersToNotify := []nodeNetworkConfigListener{}
+	listenersToNotify := []nodenetworkconfigSink{}
 	nnc, err := r.nnccli.Get(ctx, req.NamespacedName)
 	if err != nil {
 		if apierrors.IsNotFound(err) {
@@ -89,6 +93,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	}
 	r.cnscli.MustEnsureNoStaleNCs(validNCIDs)
 
+	// run the initializer until it succeeds once, then never run it again
+	if r.initializer != nil {
+		if err := r.initializer(nnc); err != nil {
+			logger.Errorf("[cns-rc] initializer failed during reconcile: %v", err)
+			return reconcile.Result{}, errors.Wrap(err, "initializer failed during reconcile")
+		}
+		r.initializer = nil
+	}
+
 	// for each NC, parse it in to a CreateNCRequest and forward it to the appropriate Listener
 	for i := range nnc.Status.NetworkContainers {
 		// check if this NC matches the Node IP if we have one to check against
@@ -134,7 +147,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reco
 	// push the NNC to the registered NNC listeners.
 	for _, l := range listenersToNotify {
-		if err := l.Update(nnc); err != nil {
+		if err := l(nnc); err != nil {
 			return reconcile.Result{}, errors.Wrap(err, "nnc listener return error during update")
 		}
 	}
diff --git a/cns/kubecontroller/nodenetworkconfig/reconciler_test.go b/cns/kubecontroller/nodenetworkconfig/reconciler_test.go
index bcf882b2b8..5e93f4ff0d 100644
--- a/cns/kubecontroller/nodenetworkconfig/reconciler_test.go
+++ b/cns/kubecontroller/nodenetworkconfig/reconciler_test.go
@@ -192,7 +192,7 @@ func TestReconcile(t *testing.T) {
 	}
 
 		t.Run(tt.name, func(t *testing.T) {
-			r := NewReconciler(&tt.cnsClient, &tt.cnsClient, tt.nodeIP, false)
+			r := NewReconciler(&tt.cnsClient, func(*v1alpha.NodeNetworkConfig) error { return nil }, &tt.cnsClient, tt.nodeIP, false)
 			r.nnccli = &tt.ncGetter
 			got, err := r.Reconcile(context.Background(), tt.in)
 			if tt.wantErr {
@@ -249,7 +249,7 @@ func TestReconcileStaleNCs(t *testing.T) {
 		return &nncLog[len(nncLog)-1], nil
 	}
 
-	r := NewReconciler(&cnsClient, &cnsClient, nodeIP, false)
+	r := NewReconciler(&cnsClient, func(*v1alpha.NodeNetworkConfig) error { return nil }, &cnsClient, nodeIP, false)
 	r.nnccli = &mockNCGetter{get: nncIterator}
 
 	_, err := r.Reconcile(context.Background(), reconcile.Request{})
@@ -266,3 +266,76 @@ func TestReconcileStaleNCs(t *testing.T) {
 	assert.Contains(t, cnsClient.state.reqsByNCID, "nc3")
 	assert.Contains(t, cnsClient.state.reqsByNCID, "nc4")
 }
+
+func TestReconcileInitializerRunsOnceOnSuccess(t *testing.T) {
+	logger.InitLogger("", 0, 0, "")
+
+	initializerCalls := 0
+	initializer := func(*v1alpha.NodeNetworkConfig) error {
+		initializerCalls++
+		return nil
+	}
+
+	cnsClient := mockCNSClient{
+		state:            cnsClientState{reqsByNCID: make(map[string]*cns.CreateNetworkContainerRequest)},
+		createOrUpdateNC: func(*cns.CreateNetworkContainerRequest) cnstypes.ResponseCode { return cnstypes.Success },
+		update:           func(*v1alpha.NodeNetworkConfig) error { return nil },
+	}
+
+	ncGetter := mockNCGetter{get: func(context.Context, types.NamespacedName) (*v1alpha.NodeNetworkConfig, error) {
+		return &v1alpha.NodeNetworkConfig{Status: validSwiftStatus}, nil
+	}}
+
+	r := NewReconciler(&cnsClient, initializer, &cnsClient, "", false)
+	r.nnccli = &ncGetter
+
+	_, err := r.Reconcile(context.Background(), reconcile.Request{})
+	require.NoError(t, err)
+	_, err = r.Reconcile(context.Background(), reconcile.Request{})
+	require.NoError(t, err)
+
+	assert.Equal(t, 1, initializerCalls)
+	assert.Nil(t, r.initializer)
+}
+
+func TestReconcileInitializerRetriesAfterFailure(t *testing.T) {
+	logger.InitLogger("", 0, 0, "")
+
+	initializerCalls := 0
+	initializer := func(*v1alpha.NodeNetworkConfig) error {
+		initializerCalls++
+		if initializerCalls == 1 {
+			return errors.New("init failed")
+		}
+		return nil
+	}
+
+	createCalls := 0
+	cnsClient := mockCNSClient{
+		state: cnsClientState{reqsByNCID: make(map[string]*cns.CreateNetworkContainerRequest)},
+		createOrUpdateNC: func(*cns.CreateNetworkContainerRequest) cnstypes.ResponseCode {
+			createCalls++
+			return cnstypes.Success
+		},
+		update: func(*v1alpha.NodeNetworkConfig) error { return nil },
+	}
+
+	ncGetter := mockNCGetter{get: func(context.Context, types.NamespacedName) (*v1alpha.NodeNetworkConfig, error) {
+		return &v1alpha.NodeNetworkConfig{Status: validSwiftStatus}, nil
+	}}
+
+	r := NewReconciler(&cnsClient, initializer, &cnsClient, "", false)
+	r.nnccli = &ncGetter
+
+	_, err := r.Reconcile(context.Background(), reconcile.Request{})
+	require.Error(t, err)
+	assert.NotNil(t, r.initializer)
+	assert.Equal(t, 1, initializerCalls)
+	assert.Equal(t, 0, createCalls)
+
+	_, err = r.Reconcile(context.Background(), reconcile.Request{})
+	require.NoError(t, err)
+	assert.Equal(t, 2, initializerCalls)
+	assert.Equal(t, 1, createCalls)
+	assert.Nil(t, r.initializer)
+}
diff --git a/cns/service/main.go b/cns/service/main.go
index e980360cf6..092bbf1acc 100644
--- a/cns/service/main.go
+++ b/cns/service/main.go
@@ -52,7 +52,6 @@ import (
 	cnstypes "github.com/Azure/azure-container-networking/cns/types"
 	"github.com/Azure/azure-container-networking/cns/wireserver"
 	acn "github.com/Azure/azure-container-networking/common"
-	"github.com/Azure/azure-container-networking/crd"
 	"github.com/Azure/azure-container-networking/crd/clustersubnetstate"
 	cssv1alpha1 "github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1"
 	"github.com/Azure/azure-container-networking/crd/multitenancy"
@@ -74,7 +73,6 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 	corev1 "k8s.io/api/core/v1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	kuberuntime "k8s.io/apimachinery/pkg/runtime"
@@ -1307,37 +1305,17 @@ func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HT
 	return nil
 }
 
-type nodeNetworkConfigGetter interface {
-	Get(context.Context) (*v1alpha.NodeNetworkConfig, error)
-}
-
 type ipamStateReconciler interface {
 	ReconcileIPAMStateForSwift(ncRequests []*cns.CreateNetworkContainerRequest, podInfoByIP map[string]cns.PodInfo, nnc *v1alpha.NodeNetworkConfig) cnstypes.ResponseCode
 }
 
 // TODO(rbtr) where should this live??
 // reconcileInitialCNSState initializes cns by passing pods and a CreateNetworkContainerRequest
-func reconcileInitialCNSState(ctx context.Context, cli nodeNetworkConfigGetter, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider, isSwiftV2 bool) error {
-	// Get nnc using direct client
-	nnc, err := cli.Get(ctx)
-	if err != nil {
-		if crd.IsNotDefined(err) {
-			return errors.Wrap(err, "failed to init CNS state: NNC CRD is not defined")
-		}
-		if apierrors.IsNotFound(err) {
-			return errors.Wrap(err, "failed to init CNS state: NNC not found")
-		}
-		return errors.Wrap(err, "failed to init CNS state: failed to get NNC CRD")
-	}
-
-	logger.Printf("Retrieved NNC: %+v", nnc)
-	if !nnc.DeletionTimestamp.IsZero() {
-		return errors.New("failed to init CNS state: NNC is being deleted")
-	}
-
-	// If there are no NCs, we can't initialize our state and we should fail out.
-	if len(nnc.Status.NetworkContainers) == 0 {
-		return errors.New("failed to init CNS state: no NCs found in NNC CRD")
+func reconcileInitialCNSState(nnc *v1alpha.NodeNetworkConfig, ipamReconciler ipamStateReconciler, podInfoByIPProvider cns.PodInfoByIPProvider, isSwiftV2 bool) error {
+	// if there are no NCs, we can't initialize our state and we should fail out
+	ncCount := len(nnc.Status.NetworkContainers)
+	if ncCount == 0 {
+		return errors.New("no network containers found in NNC status")
 	}
 
 	// Get previous PodInfo state from podInfoByIPProvider
@@ -1433,7 +1411,7 @@ func InitializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns.
 	if err = PopulateCNSEndpointState(httpRestServiceImplementation.EndpointStateStore); err != nil {
 		return errors.Wrap(err, "failed to create CNS EndpointState From CNI")
 	}
-	// endpoint state needs tobe loaded in memory so the subsequent Delete calls remove the state and release the IPs.
+	// endpoint state needs to be loaded in memory so the subsequent Delete calls remove the state and release the IPs.
 	if err = httpRestServiceImplementation.EndpointStateStore.Read(restserver.EndpointStoreKey, &httpRestServiceImplementation.EndpointState); err != nil {
 		return errors.Wrap(err, "failed to restore endpoint state")
 	}
 }
@@ -1444,35 +1422,16 @@ func InitializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns.
 		return errors.Wrap(err, "failed to initialize ip state")
 	}
-	// create scoped kube clients.
-	directcli, err := client.New(kubeConfig, client.Options{Scheme: nodenetworkconfig.Scheme})
-	if err != nil {
-		return errors.Wrap(err, "failed to create ctrl client")
-	}
-	directnnccli := nodenetworkconfig.NewClient(directcli)
-	if err != nil {
-		return errors.Wrap(err, "failed to create NNC client")
-	}
-	// TODO(rbtr): nodename and namespace should be in the cns config
-	directscopedcli := nncctrl.NewScopedClient(directnnccli, types.NamespacedName{Namespace: "kube-system", Name: nodeName})
-
-	logger.Printf("Reconciling initial CNS state")
-	// apiserver nnc might not be registered or api server might be down and crashloop backof puts us outside of 5-10 minutes we have for
-	// aks addons to come up so retry a bit more aggresively here.
-	// will retry 10 times maxing out at a minute taking about 8 minutes before it gives up.
-	attempt := 0
-	_ = retry.Do(func() error {
-		attempt++
-		logger.Printf("reconciling initial CNS state attempt: %d", attempt)
-		err = reconcileInitialCNSState(ctx, directscopedcli, httpRestServiceImplementation, podInfoByIPProvider, cnsconfig.EnableSwiftV2)
-		if err != nil {
-			logger.Errorf("failed to reconcile initial CNS state, attempt: %d err: %v", attempt, err)
-			nncInitFailure.Inc()
+	initializerWrapper := func(nnc *v1alpha.NodeNetworkConfig) error {
+		logger.Printf("Reconciling initial CNS state")
+		if err := reconcileInitialCNSState(nnc, httpRestServiceImplementation, podInfoByIPProvider, cnsconfig.EnableSwiftV2); err != nil {
+			nncInitFailure.Inc()
+			return errors.Wrap(err, "failed to reconcile initial CNS state")
 		}
-		return errors.Wrap(err, "failed to initialize CNS state")
-	}, retry.Context(ctx), retry.Delay(initCNSInitalDelay), retry.MaxDelay(time.Minute), retry.UntilSucceeded())
-	logger.Printf("reconciled initial CNS state after %d attempts", attempt)
-	hasNNCInitialized.Set(1)
+		hasNNCInitialized.Set(1)
+		return nil
+	}
+
 	scheme := kuberuntime.NewScheme()
 	if err := corev1.AddToScheme(scheme); err != nil { //nolint:govet // intentional shadow
 		return errors.Wrap(err, "failed to add corev1 to scheme")
 	}
@@ -1561,7 +1520,7 @@ func InitializeCRDState(ctx context.Context, z *zap.Logger, httpRestService cns.
 
 	// get CNS Node IP to compare NC Node IP with this Node IP to ensure NCs were created for this node
 	nodeIP := configuration.NodeIP()
-	nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, poolMonitor, nodeIP, cnsconfig.EnableSwiftV2)
+	nncReconciler := nncctrl.NewReconciler(httpRestServiceImplementation, initializerWrapper, poolMonitor, nodeIP, cnsconfig.EnableSwiftV2)
 	// pass Node to the Reconciler for Controller xref
 	// IPAMv1 - reconcile only status changes (where generation doesn't change).
 	// IPAMv2 - reconcile all updates.
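
Note for reviewers: the behavioral core of this change is the initializer contract — the sink is retried on every reconcile until its first success, then dropped. A minimal, self-contained sketch of that pattern follows; the names (`sink`, `reconciler`) are illustrative stand-ins only, not the PR's real types, and the NNC is reduced to a string for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// sink mirrors the shape of nodenetworkconfigSink, with the NNC
// parameter reduced to a string to keep the sketch self-contained.
type sink func(nnc string) error

type reconciler struct {
	initializer sink // cleared only after its first successful call
}

func (r *reconciler) reconcile(nnc string) error {
	if r.initializer != nil {
		if err := r.initializer(nnc); err != nil {
			// returning the error requeues the event; the initializer
			// stays set, so it is retried on the next reconcile
			return err
		}
		r.initializer = nil // success: never run it again
	}
	fmt.Println("listeners notified for", nnc)
	return nil
}

func main() {
	calls := 0
	r := &reconciler{initializer: func(string) error {
		calls++
		if calls == 1 {
			return errors.New("init failed") // first attempt fails
		}
		return nil
	}}
	fmt.Println(r.reconcile("nnc-a")) // init fails; listeners not notified
	fmt.Println(r.reconcile("nnc-b")) // init succeeds; listeners notified
	fmt.Println(r.reconcile("nnc-c")) // init skipped from now on
	fmt.Println("initializer calls:", calls) // prints 2
}
```

Holding the initializer as a nil-able func field keeps the one-shot state inside the reconciler itself, so no extra sync.Once or bool flag is needed; clearing it only after success is what produces the retry-until-first-success semantics exercised by the two new tests.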