Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions pkg/autoscaler/statforwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"go.uber.org/zap"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/hash"
"knative.dev/pkg/logging"
"knative.dev/pkg/logging/logkey"
Expand All @@ -38,8 +39,18 @@ const (
retryInterval = 100 * time.Millisecond

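// Drop a stat after maxProcessingRetry retries, i.e. after at most roughly 13 seconds of
// backoff (5 fast retries at 100ms plus 25 slow retries at 500ms, see the delays below).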
maxProcessingRetry = 30
maxProcessingRetry = 30
// retryProcessingInterval is kept for backward compatibility but no longer used.
// Use rateLimiter instead.
retryProcessingInterval = 500 * time.Millisecond

// Rate limiting configuration for retries.
// fastRetryDelay is used for the first maxFastRetryAttempts retries.
fastRetryDelay = 100 * time.Millisecond
// slowRetryDelay is used after maxFastRetryAttempts retries.
slowRetryDelay = 500 * time.Millisecond
// maxFastRetryAttempts is the number of fast retries before switching to slow retries.
maxFastRetryAttempts = 5
)

var svcURLSuffix = fmt.Sprintf("svc.%s:%d", network.GetClusterDomainName(), autoscalerPort)
Expand Down Expand Up @@ -75,6 +86,9 @@ type Forwarder struct {

statCh chan stat
stopCh chan struct{}

// rateLimiter controls the retry backoff strategy using fast/slow retry delays.
rateLimiter workqueue.TypedRateLimiter[string]
}

// New creates a new Forwarder.
Expand All @@ -84,11 +98,12 @@ type Forwarder struct {
func New(ctx context.Context, bs *hash.BucketSet) *Forwarder {
bkts := bs.Buckets()
f := &Forwarder{
logger: logging.FromContext(ctx),
bs: bs,
processors: make(map[string]bucketProcessor, len(bkts)),
statCh: make(chan stat, 1000),
stopCh: make(chan struct{}),
logger: logging.FromContext(ctx),
bs: bs,
processors: make(map[string]bucketProcessor, len(bkts)),
statCh: make(chan stat, 1000),
stopCh: make(chan struct{}),
rateLimiter: workqueue.NewTypedItemFastSlowRateLimiter[string](fastRetryDelay, slowRetryDelay, maxFastRetryAttempts),
}

f.processingWg.Add(1)
Expand Down Expand Up @@ -142,23 +157,37 @@ func (f *Forwarder) process() {
if err := p.process(s.sm); err != nil {
l.Errorw("Error while processing stat", zap.Error(err))
f.maybeRetry(l, s)
} else {
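// Successfully processed. The rate limiter is keyed by rev rather than by the individual
// stat, so this resets the retry count shared by every stat queued under the same key.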
f.rateLimiter.Forget(rev)
}
}
}
}

func (f *Forwarder) maybeRetry(logger *zap.SugaredLogger, s stat) {
if s.retry > maxProcessingRetry {
logger.Warn("Exceeding max retries. Dropping the stat.")
rev := s.sm.Key.String()

// Check current retry count before calling When (which increments the count).
numRequeues := f.rateLimiter.NumRequeues(rev)
if numRequeues >= maxProcessingRetry {
logger.Warnf("Exceeding max retries (%d). Dropping the stat.", maxProcessingRetry)
// Clean up the rate limiter state for this key. The state is shared by all stats with
// the same rev, so their retry count starts over as well.
f.rateLimiter.Forget(rev)
return
}

s.retry++
// Get the retry delay from the rate limiter (fast for initial retries, slow after).
// This will increment the internal retry count.
retryDelay := f.rateLimiter.When(rev)

f.retryWg.Add(1)
go func() {
defer f.retryWg.Done()
// TODO(yanweiguo): Use RateLimitingInterface and NewItemFastSlowRateLimiter.
time.Sleep(retryProcessingInterval)
logger.Debug("Enqueuing stat for retry.")
time.Sleep(retryDelay)
logger.Debugf("Enqueuing stat for retry (attempt %d, delay %v).", numRequeues+1, retryDelay)
// Increment retry count for tracking (though rate limiter also tracks this).
s.retry++
f.statCh <- s
}()
}
Expand Down
Loading