NOTE(review): this patch was recovered from a whitespace-collapsed copy. Blank
lines and indentation are a best-effort reconstruction chosen to match the hunk
line counts and gofmt style; confirm against the upstream file before applying.
The "index" blob hashes are kept from the original patch and do not cover the
review changes made to the added lines.

diff --git a/contrib/cmd/runkperf/commands/bench/read_update.go b/contrib/cmd/runkperf/commands/bench/read_update.go
index 4074112..a0b2ffd 100644
--- a/contrib/cmd/runkperf/commands/bench/read_update.go
+++ b/contrib/cmd/runkperf/commands/bench/read_update.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	internaltypes "github.com/Azure/kperf/contrib/internal/types"
 	"github.com/Azure/kperf/contrib/utils"
@@ -44,8 +45,8 @@ It creates ConfigMaps, establishes watch connections, and then issues concurrent
 		},
 		cli.IntFlag{
 			Name:  "read-update-configmap-size",
-			Usage: "Size of each ConfigMap. ConfigMap must not exceed 3 MiB.",
-			Value: 1024, // 1 MiB
+			Usage: "Size of each ConfigMap, unit: Byte. ConfigMap must not exceed 1 MiB.",
+			Value: 1024, // 1 KiB
 		},
 		cli.StringFlag{
 			Name:  "read-update-name-pattern",
@@ -86,7 +87,7 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
 	size := cliCtx.Int("read-update-configmap-size")
 	namespace := cliCtx.String("read-update-namespace")
 	namePattern := cliCtx.String("read-update-name-pattern")
-	if total <= 0 || size <= 0 || total*size > 2*1024*1024 || size > 1024 {
+	if total <= 0 || size <= 0 || int64(total)*int64(size) > 2*1024*1024*1024 || size > 1024*1024 {
 		return nil, fmt.Errorf("invalid total (%d) or size (%d) for configmaps: total must be > 0, size must be > 0, and total*size must not exceed 2 GiB, size must not exceed 1 MiB", total, size)
 	}
 
@@ -96,7 +97,7 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
 		return nil, fmt.Errorf("failed to build clientset: %w", err)
 	}
 
-	err = utils.CreateConfigmaps(ctx, kubeCfgPath, namespace, namePattern, total, size, 2, 0)
+	err = utils.CreateConfigmaps(ctx, kubeCfgPath, namespace, namePattern, total, size, 30, 0)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create configmaps: %w", err)
 
@@ -111,11 +112,6 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
 	}()
 
-	// Stop all the watches when the function returns
-	watches := make([]watch.Interface, 0)
-	defer func() {
-		stopWatches(watches)
-	}()
-
+	// Wait for all watch goroutines to exit before the function returns
 	var wg sync.WaitGroup
 	defer wg.Wait()
 
@@ -125,37 +121,65 @@ func benchReadUpdateRun(cliCtx *cli.Context) (*internaltypes.BenchmarkReport, er
 	// Start to watch the configmaps
 	for i := 0; i < total; i++ {
 		wg.Add(1)
-		go func() {
+		go func(ii int) {
 			defer wg.Done()
 
-			watchReq, err := client.CoreV1().ConfigMaps(namespace).
-				Watch(context.TODO(), metav1.ListOptions{
-					Watch:         true,
-					FieldSelector: fmt.Sprintf("metadata.name=%s-cm-%s-%d", appLabel, namePattern, i),
-				})
-
-			watches = append(watches, watchReq)
-			if err != nil {
-				fmt.Printf("Error starting watch for configmap %s: %v\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), err)
-				return
-			}
-			klog.V(5).Infof("Starting watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i))
+			// Timeout passed with each watch request, in seconds; the loop below re-creates the watch once it ends.
+			timeoutSeconds := int64(100000)
+
+			var watchReq watch.Interface
+			var err error
+			defer func() {
+				if watchReq != nil {
+					watchReq.Stop()
+				}
+			}()
 
-			// Process watch events proactively to prevent cache oversizing.
 			for {
+				if watchReq == nil {
+					watchReq, err = client.CoreV1().ConfigMaps(namespace).
+						Watch(context.TODO(), metav1.ListOptions{
+							Watch:          true,
+							TimeoutSeconds: &timeoutSeconds,
+							FieldSelector:  fmt.Sprintf("metadata.name=%s-cm-%s-%d", appLabel, namePattern, ii),
+						})
+
+					if err != nil {
+						fmt.Printf("Error starting watch for configmap %s: %v\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii), err)
+						// Back off before retrying, but give up as soon as dpCtx is
+						// done so the deferred wg.Wait() cannot block forever.
+						select {
+						case <-dpCtx.Done():
+							return
+						case <-time.After(5 * time.Second):
+						}
+						continue
+					}
+					klog.V(5).Infof("Starting watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii))
+				}
+
+				// Process watch events proactively to prevent cache oversizing.
 				select {
 				case <-dpCtx.Done():
-					klog.V(5).Infof("Stopping watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i))
+					klog.V(5).Infof("Stopping watch for configmap: %s\n", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii))
 					return
-				case event := <-watchReq.ResultChan():
+				case event, ok := <-watchReq.ResultChan():
+					if !ok {
+						klog.V(2).Infof("Watch channel closed for configmap: %s", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii))
+						watchReq.Stop()
+						watchReq = nil
+						// No event was received; re-create the watch on the next iteration.
+						continue
+					}
 					if event.Type == watch.Error {
-						klog.Errorf("Error event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), event.Object)
-						return
+						klog.Errorf("Error event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii), event.Object)
+						watchReq.Stop()
+						watchReq = nil
 					}
-					klog.V(5).Infof("Event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, i), event.Type)
+					klog.V(5).Infof("Event received for configmap %s: %v", fmt.Sprintf("%s-cm-%s-%d", appLabel, namePattern, ii), event.Type)
 				}
 			}
-		}()
+		}(i)
 	}
 
 	// Deploy the runner group
@@ -180,13 +204,3 @@ Environment: Combine %d%% read requests and %d%% update requests during benchmar
 		Info: map[string]interface{}{},
 	}, nil
 }
-
-// StopWatches stops all the watches
-func stopWatches(watches []watch.Interface) {
-	for _, w := range watches {
-		if w != nil {
-			klog.V(5).Infof("Stopping watch: %v", w)
-			w.Stop()
-		}
-	}
-}