Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 101 additions & 28 deletions internal/integration/unified/client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,50 @@ var securitySensitiveCommands = []string{
"createUser", "updateUser", "copydbgetnonce", "copydbsaslstart", "copydb",
}

// eventSequencer allows for sequence-based event filtering for
// awaitMinPoolSizeMS support.
//
// Per the unified test format spec, when awaitMinPoolSizeMS is specified, any
// CMAP and SDAM events that occur during connection pool initialization
// (before minPoolSize is reached) must be ignored. We track this by
// assigning a monotonically increasing sequence number to each event as it's
// recorded. After pool initialization completes, we set eventCutoffSeq to the
// current sequence number. Event accessors for CMAP and SDAM types then
// filter out any events with sequence <= eventCutoffSeq.
type eventSequencer struct {
counter atomic.Int64
cutoff int64

// pool events are heterogeneous, so we track their sequence separately
poolSeq []int64
seqByEventType map[monitoringEventType][]int64
}

// setCutoff marks the current sequence as the filtering cutoff point.
func (es *eventSequencer) setCutoff() {
es.cutoff = es.counter.Load()
}

// recordEvent stores the sequence number for a given event type.
func (es *eventSequencer) recordEvent(eventType monitoringEventType) {
next := es.counter.Add(1)
es.seqByEventType[eventType] = append(es.seqByEventType[eventType], next)
}

func (es *eventSequencer) recordPooledEvent() {
next := es.counter.Add(1)
es.poolSeq = append(es.poolSeq, next)
}

// shouldFilter returns true if the event at the given index should be filtered.
func (es *eventSequencer) shouldFilter(eventType monitoringEventType, index int) bool {
if es.cutoff == 0 {
return false
}

return es.seqByEventType[eventType][index] <= es.cutoff
Copy link

Copilot AI Nov 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential index out of bounds panic if the index is beyond the slice length. Add bounds checking before accessing es.seqByEventType[eventType][index].

Suggested change
return es.seqByEventType[eventType][index] <= es.cutoff
seqs, ok := es.seqByEventType[eventType]
if !ok || index < 0 || index >= len(seqs) {
return false
}
return seqs[index] <= es.cutoff

Copilot uses AI. Check for mistakes.
}

// clientEntity is a wrapper for a mongo.Client object that also holds additional information required during test
// execution.
type clientEntity struct {
Expand Down Expand Up @@ -72,30 +116,8 @@ type clientEntity struct {

entityMap *EntityMap

logQueue chan orderedLogMessage
}

// awaitMinimumPoolSize waits for the client's connection pool to reach the
// specified minimum size. This is a best effort operation that times out after
// some predefined amount of time to avoid blocking tests indefinitely.
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64) error {
// Don't spend longer than 500ms awaiting minPoolSize.
awaitCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
defer cancel()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-awaitCtx.Done():
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
case <-ticker.C:
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
return nil
}
}
}
logQueue chan orderedLogMessage
eventSequencer eventSequencer
}

func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
Expand All @@ -118,6 +140,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
entityMap: em,
observeSensitiveCommands: entityOptions.ObserveSensitiveCommands,
eventSequencer: eventSequencer{
seqByEventType: make(map[monitoringEventType][]int64),
},
}
entity.setRecordEvents(true)

Expand Down Expand Up @@ -226,8 +251,9 @@ func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOp
return nil, fmt.Errorf("error creating mongo.Client: %w", err)
}

if entityOptions.AwaitMinPoolSize && clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize); err != nil {
if entityOptions.AwaitMinPoolSizeMS != nil && *entityOptions.AwaitMinPoolSizeMS > 0 &&
clientOpts.MinPoolSize != nil && *clientOpts.MinPoolSize > 0 {
if err := awaitMinimumPoolSize(ctx, entity, *clientOpts.MinPoolSize, *entityOptions.AwaitMinPoolSizeMS); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -326,8 +352,21 @@ func (c *clientEntity) failedEvents() []*event.CommandFailedEvent {
return events
}

func (c *clientEntity) poolEvents() []*event.PoolEvent {
return c.pooled
// filterEventsBySeq filters events by sequence number using the provided
// sequence slice. See comments on eventSequencer for more details.
func filterEventsBySeq[T any](c *clientEntity, events []T, seqSlice []int64) []T {
if c.eventSequencer.cutoff == 0 {
return events
}

var filtered []T
for i, evt := range events {
if seqSlice[i] > c.eventSequencer.cutoff {
filtered = append(filtered, evt)
}
}
Comment on lines +358 to +367
Copy link

Copilot AI Nov 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential index out of bounds panic if seqSlice length doesn't match events length. Add a length check before the loop or validate that i < len(seqSlice) in the loop condition.

Copilot uses AI. Check for mistakes.

return filtered
}

func (c *clientEntity) numberConnectionsCheckedOut() int32 {
Expand Down Expand Up @@ -517,6 +556,7 @@ func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) {
eventType := monitoringEventTypeFromPoolEvent(evt)
if _, ok := c.observedEvents[eventType]; ok {
c.pooled = append(c.pooled, evt)
c.eventSequencer.recordPooledEvent()
}

c.addEventsCount(eventType)
Expand All @@ -539,6 +579,7 @@ func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDes

if _, ok := c.observedEvents[serverDescriptionChangedEvent]; ok {
c.serverDescriptionChanged = append(c.serverDescriptionChanged, evt)
c.eventSequencer.recordEvent(serverDescriptionChangedEvent)
}

// Record object-specific unified spec test data on an event.
Expand All @@ -558,6 +599,7 @@ func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartb

if _, ok := c.observedEvents[serverHeartbeatFailedEvent]; ok {
c.serverHeartbeatFailedEvent = append(c.serverHeartbeatFailedEvent, evt)
c.eventSequencer.recordEvent(serverHeartbeatFailedEvent)
}

c.addEventsCount(serverHeartbeatFailedEvent)
Expand All @@ -573,6 +615,7 @@ func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeart

if _, ok := c.observedEvents[serverHeartbeatStartedEvent]; ok {
c.serverHeartbeatStartedEvent = append(c.serverHeartbeatStartedEvent, evt)
c.eventSequencer.recordEvent(serverHeartbeatStartedEvent)
}

c.addEventsCount(serverHeartbeatStartedEvent)
Expand All @@ -588,6 +631,7 @@ func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHea

if _, ok := c.observedEvents[serverHeartbeatSucceededEvent]; ok {
c.serverHeartbeatSucceeded = append(c.serverHeartbeatSucceeded, evt)
c.eventSequencer.recordEvent(serverHeartbeatSucceededEvent)
}

c.addEventsCount(serverHeartbeatSucceededEvent)
Expand All @@ -603,6 +647,7 @@ func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.Topolog

if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
c.eventSequencer.recordEvent(topologyDescriptionChangedEvent)
}

c.addEventsCount(topologyDescriptionChangedEvent)
Expand All @@ -618,6 +663,7 @@ func (c *clientEntity) processTopologyOpeningEvent(evt *event.TopologyOpeningEve

if _, ok := c.observedEvents[topologyOpeningEvent]; ok {
c.topologyOpening = append(c.topologyOpening, evt)
c.eventSequencer.recordEvent(topologyOpeningEvent)
}

c.addEventsCount(topologyOpeningEvent)
Expand All @@ -633,6 +679,7 @@ func (c *clientEntity) processTopologyClosedEvent(evt *event.TopologyClosedEvent

if _, ok := c.observedEvents[topologyClosedEvent]; ok {
c.topologyClosed = append(c.topologyClosed, evt)
c.eventSequencer.recordEvent(topologyClosedEvent)
}

c.addEventsCount(topologyClosedEvent)
Expand Down Expand Up @@ -724,3 +771,29 @@ func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleM
}
return nil
}

// awaitMinimumPoolSize waits for the client's connection pool to reach the
// specified minimum size, then clears all CMAP and SDAM events that occurred
// during pool initialization.
func awaitMinimumPoolSize(ctx context.Context, entity *clientEntity, minPoolSize uint64, timeoutMS int) error {
awaitCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutMS)*time.Millisecond)
defer cancel()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-awaitCtx.Done():
return fmt.Errorf("timed out waiting for client to reach minPoolSize")
case <-ticker.C:
if uint64(entity.eventsCount[connectionReadyEvent]) >= minPoolSize {
// Clear all CMAP and SDAM events that occurred during pool
// initialization.
entity.eventSequencer.setCutoff()

return nil
}
}
}
}
Loading
Loading