Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions channels/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package channels
import (
"fmt"
"sort"
"strconv"
"strings"

"github.com/couchbase/sync_gateway/base"
Expand All @@ -32,21 +31,21 @@ const AllChannelWildcard = "*" // wildcard for 'all channels'

// ID represents a single channel inside a collection
type ID struct {
Name string // name of channel
CollectionID uint32 // collection it belongs to
serialization string // private method for logging and matching inside changeWaiter notification
Name string // name of channel
CollectionID uint32 // collection it belongs to
}

// String returns the channel name as a string, omitting the collection ID, so beware when using for logging purposes
// that the collection name is logged from the context on the log line
Comment on lines +38 to +39
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment warns about logging implications but doesn't specify what "context on the log line" means. Consider clarifying what context is expected to provide the collection ID information, or provide an example of proper usage.

Suggested change
// String returns the channel name as a string, omitting the collection ID, so beware when using for logging purposes
// that the collection name is logged from the context on the log line
// String returns only the channel name, and intentionally omits the collection ID.
// When using this in log messages, always include the collection identifier separately in the
// log context or format string (for example: "collection %d channel %s").

Copilot uses AI. Check for mistakes.
func (c ID) String() string {
return c.serialization
return c.Name
}

// NewID returns a new ChannelID
func NewID(channelName string, collectionID uint32) ID {
return ID{
Name: channelName,
CollectionID: collectionID,
serialization: strconv.FormatUint(uint64(collectionID), 10) + "." + base.UserDataPrefix + channelName + base.UserDataSuffix,
Name: channelName,
CollectionID: collectionID,
}
}

Expand Down Expand Up @@ -196,7 +195,7 @@ func (redactorSet RedactorSet) GetRedactionString(shouldRedact bool) string {
tmp := []byte("{")
iterationCount := 0
for setItem, _ := range redactorSet.set {
tmp = append(tmp, redactorSet.redactorFunc(setItem).String()...)
tmp = append(tmp, redactorSet.redactorFunc(setItem).Redact()...)
iterationCount++
if iterationCount != len(redactorSet.set) {
tmp = append(tmp, ", "...)
Expand Down
32 changes: 16 additions & 16 deletions channels/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,14 @@ func TestSetString(t *testing.T) {
NewID("B", 2): present{},
NewID("C", 1): present{},
},
output: "{1.<ud>A</ud>, 1.<ud>C</ud>, 2.<ud>B</ud>}",
output: "{A, B, C}",
redactedOutput: []string{
"{1.<ud>A</ud>, 1.<ud>C</ud>, 2.<ud>B</ud>}",
"{1.<ud>A</ud>, 2.<ud>B</ud>, 1.<ud>C</ud>}",
"{1.<ud>C</ud>, 1.<ud>A</ud>, 2.<ud>B</ud>}",
"{1.<ud>C</ud>, 2.<ud>B</ud>, 1.<ud>A</ud>}",
"{2.<ud>B</ud>, 1.<ud>A</ud>, 1.<ud>C</ud>}",
"{2.<ud>B</ud>, 1.<ud>C</ud>, 1.<ud>A</ud>}",
"{<ud>A</ud>, <ud>C</ud>, <ud>B</ud>}",
"{<ud>A</ud>, <ud>B</ud>, <ud>C</ud>}",
"{<ud>C</ud>, <ud>A</ud>, <ud>B</ud>}",
"{<ud>C</ud>, <ud>B</ud>, <ud>A</ud>}",
"{<ud>B</ud>, <ud>A</ud>, <ud>C</ud>}",
"{<ud>B</ud>, <ud>C</ud>, <ud>A</ud>}",
},
},
{
Expand All @@ -390,23 +390,23 @@ func TestSetString(t *testing.T) {
NewID("B", 2): present{},
NewID("C", 1): present{},
},
output: "{1.<ud>C</ud>, 2.<ud>A</ud>, 2.<ud>B</ud>}",
output: "{A, B, C}",
redactedOutput: []string{
"{2.<ud>A</ud>, 1.<ud>C</ud>, 2.<ud>B</ud>}",
"{2.<ud>A</ud>, 2.<ud>B</ud>, 1.<ud>C</ud>}",
"{1.<ud>C</ud>, 2.<ud>A</ud>, 2.<ud>B</ud>}",
"{1.<ud>C</ud>, 2.<ud>B</ud>, 2.<ud>A</ud>}",
"{2.<ud>B</ud>, 2.<ud>A</ud>, 1.<ud>C</ud>}",
"{2.<ud>B</ud>, 1.<ud>C</ud>, 2.<ud>A</ud>}",
"{<ud>A</ud>, <ud>C</ud>, <ud>B</ud>}",
"{<ud>A</ud>, <ud>B</ud>, <ud>C</ud>}",
"{<ud>C</ud>, <ud>A</ud>, <ud>B</ud>}",
"{<ud>C</ud>, <ud>B</ud>, <ud>A</ud>}",
"{<ud>B</ud>, <ud>A</ud>, <ud>C</ud>}",
"{<ud>B</ud>, <ud>C</ud>, <ud>A</ud>}",
},
},
{
name: "one collection",
input: Set{
NewID("A", 1): present{},
},
output: "{1.<ud>A</ud>}",
redactedOutput: []string{"{1.<ud>A</ud>}"},
output: "{A}",
redactedOutput: []string{"{<ud>A</ud>}"},
},
}
for _, test := range testCases {
Expand Down
49 changes: 27 additions & 22 deletions db/change_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type changeListener struct {
FeedArgs sgbucket.FeedArguments // The Tap Args (backfill, etc)
counter uint64 // Event counter; increments on every doc update
_terminateCheckCounter uint64 // Termination Event counter; increments on every notifyCheckForTermination
keyCounts map[string]uint64 // Latest count at which each doc key was updated
keyCounts map[channels.ID]uint64 // Latest count at which each doc key was updated
OnChangeCallback DocChangedFunc
terminator chan bool // Signal to cause DCP feed to exit
broadcastChangesDoneChan chan struct{} // Channel to signal that broadcast changes goroutine has terminated
Expand All @@ -53,13 +53,16 @@ type changeListener struct {
// unusedSeqChannelID marks the unused sequence key for the channel cache. This is a marker that is global to all collections.
var unusedSeqChannelID = channels.NewID(unusedSeqKey, unusedSeqCollectionID)

// principalDocCollectionIDForChannelID is the collection ID for construction of channel ID for principal documents (users, roles).
Copy link

Copilot AI Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment should explain why collection ID 0 is used for principal documents and whether this is a special/reserved value. This magic number would benefit from additional context about its significance.

Suggested change
// principalDocCollectionIDForChannelID is the collection ID for construction of channel ID for principal documents (users, roles).
// principalDocCollectionIDForChannelID is the (synthetic) collection ID used when constructing
// channel IDs for principal documents (users, roles). Principals are global and are not stored
// in a normal data collection, so they are associated with this reserved collection ID (0) only
// for the purposes of channel identification. This value must not overlap any real collection ID.

Copilot uses AI. Check for mistakes.
const principalDocCollectionIDForChannelID = 0

type DocChangedFunc func(event sgbucket.FeedEvent, docType DocumentType)

func (listener *changeListener) Init(name string, groupID string, db *DatabaseContext) {
listener.bucketName = name
listener.counter = 1
listener._terminateCheckCounter = 0
listener.keyCounts = map[string]uint64{}
listener.keyCounts = map[channels.ID]uint64{}
listener.tapNotifier = sync.NewCond(&sync.Mutex{})
listener.sgCfgPrefix = db.MetadataKeys.SGCfgPrefix(groupID)
listener.metaKeys = db.MetadataKeys
Expand Down Expand Up @@ -181,7 +184,8 @@ func (listener *changeListener) ProcessFeedEvent(event sgbucket.FeedEvent) bool
docType := listener.DocumentType(event.Key)
if docType == DocTypeUser || docType == DocTypeRole {
// defer to notify after callback completion
defer listener.notifyKey(listener.ctx, string(event.Key))
key := channels.NewID(string(event.Key), principalDocCollectionIDForChannelID)
defer listener.notifyKey(listener.ctx, key)
}

listener.OnDocChanged(event, docType)
Expand Down Expand Up @@ -251,7 +255,7 @@ func (listener *changeListener) Notify(ctx context.Context, keys channels.Set) {
listener.tapNotifier.L.Lock()
listener.counter++
for key := range keys {
listener.keyCounts[key.String()] = listener.counter
listener.keyCounts[key] = listener.counter
}
base.DebugfCtx(ctx, base.KeyChanges, "Listener keys %q for %s have changed, count=%d",
base.UD(keys), base.MD(listener.bucketName), listener.counter)
Expand Down Expand Up @@ -309,7 +313,7 @@ func tickerValForBroadcastSpeed(skippedSequencePresent bool) time.Duration {
}

// Changes the counter, notifying waiting clients. Only use for a key update.
func (listener *changeListener) notifyKey(ctx context.Context, key string) {
func (listener *changeListener) notifyKey(ctx context.Context, key channels.ID) {
listener.tapNotifier.L.Lock()
listener.counter++
listener.keyCounts[key] = listener.counter
Expand Down Expand Up @@ -340,7 +344,7 @@ func (listener *changeListener) NotifyCheckForTermination(ctx context.Context, k
}

// Waits until either the counter, or terminateCheckCounter exceeds the given value. Returns the new counters.
func (listener *changeListener) Wait(ctx context.Context, keys []string, counter uint64, terminateCheckCounter uint64) (uint64, uint64) {
func (listener *changeListener) Wait(ctx context.Context, keys []channels.ID, counter uint64, terminateCheckCounter uint64) (uint64, uint64) {
listener.tapNotifier.L.Lock()
defer listener.tapNotifier.L.Unlock()
base.DebugfCtx(ctx, base.KeyChanges, "No new changes to send to change listener. Waiting for %q's count to pass %d",
Expand All @@ -367,13 +371,13 @@ func (listener *changeListener) Wait(ctx context.Context, keys []string, counter
}

// Returns the max value of the counter for all the given keys
func (listener *changeListener) CurrentCount(keys []string) uint64 {
func (listener *changeListener) CurrentCount(keys []channels.ID) uint64 {
listener.tapNotifier.L.Lock()
defer listener.tapNotifier.L.Unlock()
return listener._currentCount(keys)
}

func (listener *changeListener) _currentCount(keys []string) uint64 {
func (listener *changeListener) _currentCount(keys []channels.ID) uint64 {
var max uint64 = 0
for _, key := range keys {
if count := listener.keyCounts[key]; count > max {
Expand All @@ -389,23 +393,23 @@ func (listener *changeListener) _currentCount(keys []string) uint64 {
// listener's counter to increment from the value at the last call.
type ChangeWaiter struct {
listener *changeListener
keys []string
userKeys []string
keys []channels.ID
userKeys []channels.ID
lastCounter uint64
lastTerminateCheckCounter uint64
lastUserCount uint64
trackUnusedSequences bool // track unused sequences in Wait functions
}

// NewWaiter a new ChangeWaiter that will wait for changes for the given document keys, and will optionally track unused sequences.
func (listener *changeListener) NewWaiter(keys []string, trackUnusedSequences bool) *ChangeWaiter {
func (listener *changeListener) NewWaiter(keys []channels.ID, trackUnusedSequences bool) *ChangeWaiter {
listener.tapNotifier.L.Lock()
defer listener.tapNotifier.L.Unlock()
return listener._newWaiter(keys, trackUnusedSequences)
}

// _newWaiter a new ChangeWaiter that will wait for changes for the given document keys, and will optionally track unused sequences.
func (listener *changeListener) _newWaiter(keys []string, trackUnusedSequences bool) *ChangeWaiter {
func (listener *changeListener) _newWaiter(keys []channels.ID, trackUnusedSequences bool) *ChangeWaiter {
return &ChangeWaiter{
listener: listener,
keys: keys,
Expand All @@ -417,15 +421,16 @@ func (listener *changeListener) _newWaiter(keys []string, trackUnusedSequences b

// NewWaiterWithChannels creates ChangeWaiter for a given channel and user, and will optionally track unused sequences.
func (listener *changeListener) NewWaiterWithChannels(chans channels.Set, user auth.User, trackUnusedSequences bool) *ChangeWaiter {
waitKeys := make([]string, 0, 5)
waitKeys := make([]channels.ID, 0, 5)
for channel := range chans {
waitKeys = append(waitKeys, channel.String())
waitKeys = append(waitKeys, channel)
}
var userKeys []string
var userKeys []channels.ID
if user != nil {
userKeys = []string{listener.metaKeys.UserKey(user.Name())}
usrID := channels.NewID(listener.metaKeys.UserKey(user.Name()), principalDocCollectionIDForChannelID)
userKeys = []channels.ID{usrID}
for role := range user.RoleNames() {
userKeys = append(userKeys, listener.metaKeys.RoleKey(role))
userKeys = append(userKeys, channels.NewID(listener.metaKeys.RoleKey(role), principalDocCollectionIDForChannelID))
}
waitKeys = append(waitKeys, userKeys...)
}
Expand Down Expand Up @@ -480,12 +485,12 @@ func (waiter *ChangeWaiter) RefreshUserCount() bool {
func (waiter *ChangeWaiter) UpdateChannels(collectionID uint32, timedSet channels.TimedSet) {
// This capacity is not right can not accommodate channels without iteration.
initialCapacity := len(waiter.userKeys)
updatedKeys := make([]string, 0, initialCapacity)
updatedKeys := make([]channels.ID, 0, initialCapacity)
for channelName, _ := range timedSet {
updatedKeys = append(updatedKeys, channels.NewID(channelName, collectionID).String())
updatedKeys = append(updatedKeys, channels.NewID(channelName, collectionID))
}
if waiter.trackUnusedSequences {
updatedKeys = append(updatedKeys, unusedSeqChannelID.String())
updatedKeys = append(updatedKeys, unusedSeqChannelID)
}
if len(waiter.userKeys) > 0 {
updatedKeys = append(updatedKeys, waiter.userKeys...)
Expand All @@ -505,9 +510,9 @@ func (waiter *ChangeWaiter) RefreshUserKeys(user auth.User, metaKeys *base.Metad
if len(waiter.userKeys) == 1 && len(user.RoleNames()) == 0 {
return
}
waiter.userKeys = []string{metaKeys.UserKey(user.Name())}
waiter.userKeys = []channels.ID{channels.NewID(metaKeys.UserKey(user.Name()), principalDocCollectionIDForChannelID)}
for role := range user.RoleNames() {
waiter.userKeys = append(waiter.userKeys, metaKeys.RoleKey(role))
waiter.userKeys = append(waiter.userKeys, channels.NewID(metaKeys.RoleKey(role), principalDocCollectionIDForChannelID))
}
waiter.lastUserCount = waiter.listener.CurrentCount(waiter.userKeys)

Expand Down