diff --git a/StackExchange.Redis.sln.DotSettings b/StackExchange.Redis.sln.DotSettings index 216edbcca..8dd9095d9 100644 --- a/StackExchange.Redis.sln.DotSettings +++ b/StackExchange.Redis.sln.DotSettings @@ -12,9 +12,12 @@ True True True + True True True + True True + True True True True diff --git a/src/StackExchange.Redis/ChannelMessage.cs b/src/StackExchange.Redis/ChannelMessage.cs new file mode 100644 index 000000000..330aedee4 --- /dev/null +++ b/src/StackExchange.Redis/ChannelMessage.cs @@ -0,0 +1,64 @@ +namespace StackExchange.Redis; + +/// +/// Represents a message that is broadcast via publish/subscribe. +/// +public readonly struct ChannelMessage +{ + // this is *smaller* than storing a RedisChannel for the subscribed channel + private readonly ChannelMessageQueue _queue; + + /// + /// The Channel:Message string representation. + /// + public override string ToString() => ((string?)Channel) + ":" + ((string?)Message); + + /// + public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode(); + + /// + public override bool Equals(object? obj) => obj is ChannelMessage cm + && cm.Channel == Channel && cm.Message == Message; + + internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value) + { + _queue = queue; + _channel = channel; + _message = value; + } + + /// + /// The channel that the subscription was created from. + /// + public RedisChannel SubscriptionChannel => _queue.Channel; + + private readonly RedisChannel _channel; + + /// + /// The channel that the message was broadcast to. + /// + public RedisChannel Channel => _channel; + + private readonly RedisValue _message; + + /// + /// The value that was broadcast. + /// + public RedisValue Message => _message; + + /// + /// Checks if 2 messages are .Equal(). + /// + public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right); + + /// + /// Checks if 2 messages are not .Equal(). + /// + public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right); + + /// + /// If the channel is either a keyspace or keyevent notification, resolve the key and event type. + /// + public bool TryParseKeyNotification(out KeyNotification notification) + => KeyNotification.TryParse(in _channel, in _message, out notification); +} diff --git a/src/StackExchange.Redis/ChannelMessageQueue.cs b/src/StackExchange.Redis/ChannelMessageQueue.cs index e58fb393b..9f962e52a 100644 --- a/src/StackExchange.Redis/ChannelMessageQueue.cs +++ b/src/StackExchange.Redis/ChannelMessageQueue.cs @@ -1,385 +1,353 @@ using System; +using System.Buffers.Text; using System.Collections.Generic; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; #if NETCOREAPP3_1 +using System.Diagnostics; using System.Reflection; #endif -namespace StackExchange.Redis +namespace StackExchange.Redis; + +/// +/// Represents a message queue of ordered pub/sub notifications. +/// +/// +/// To create a ChannelMessageQueue, use +/// or . +/// +public sealed class ChannelMessageQueue : IAsyncEnumerable { + private readonly Channel _queue; + /// - /// Represents a message that is broadcast via publish/subscribe. + /// The Channel that was subscribed for this queue. /// - public readonly struct ChannelMessage - { - // this is *smaller* than storing a RedisChannel for the subscribed channel - private readonly ChannelMessageQueue _queue; + public RedisChannel Channel { get; } - /// - /// The Channel:Message string representation. - /// - public override string ToString() => ((string?)Channel) + ":" + ((string?)Message); + private RedisSubscriber? _parent; - /// - public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode(); - - /// - public override bool Equals(object? obj) => obj is ChannelMessage cm - && cm.Channel == Channel && cm.Message == Message; + /// + /// The string representation of this channel. + /// + public override string? ToString() => (string?)Channel; - internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value) - { - _queue = queue; - Channel = channel; - Message = value; - } + /// + /// An awaitable task the indicates completion of the queue (including drain of data). + /// + public Task Completion => _queue.Reader.Completion; - /// - /// The channel that the subscription was created from. - /// - public RedisChannel SubscriptionChannel => _queue.Channel; - - /// - /// The channel that the message was broadcast to. - /// - public RedisChannel Channel { get; } - - /// - /// The value that was broadcast. - /// - public RedisValue Message { get; } - - /// - /// Checks if 2 messages are .Equal(). - /// - public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right); - - /// - /// Checks if 2 messages are not .Equal(). - /// - public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right); + internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent) + { + Channel = redisChannel; + _parent = parent; + _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions); } - /// - /// Represents a message queue of ordered pub/sub notifications. - /// - /// - /// To create a ChannelMessageQueue, use - /// or . - /// - public sealed class ChannelMessageQueue : IAsyncEnumerable + private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions { - private readonly Channel _queue; + SingleWriter = true, SingleReader = false, AllowSynchronousContinuations = false, + }; - /// - /// The Channel that was subscribed for this queue. - /// - public RedisChannel Channel { get; } - private RedisSubscriber? _parent; + private void Write(in RedisChannel channel, in RedisValue value) + { + var writer = _queue.Writer; + writer.TryWrite(new ChannelMessage(this, channel, value)); + } - /// - /// The string representation of this channel. - /// - public override string? ToString() => (string?)Channel; + /// + /// Consume a message from the channel. + /// + /// The to use. + public ValueTask ReadAsync(CancellationToken cancellationToken = default) + => _queue.Reader.ReadAsync(cancellationToken); - /// - /// An awaitable task the indicates completion of the queue (including drain of data). - /// - public Task Completion => _queue.Reader.Completion; + /// + /// Attempt to synchronously consume a message from the channel. + /// + /// The read from the Channel. + public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item); - internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent) + /// + /// Attempt to query the backlog length of the queue. + /// + /// The (approximate) count of items in the Channel. + public bool TryGetCount(out int count) + { + // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present +#if NETCOREAPP3_1 + // get this using the reflection + try { - Channel = redisChannel; - _parent = parent; - _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions); + var prop = + _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic); + if (prop is not null) + { + count = (int)prop.GetValue(_queue)!; + return true; + } } - - private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions + catch (Exception ex) { - SingleWriter = true, - SingleReader = false, - AllowSynchronousContinuations = false, - }; - - private void Write(in RedisChannel channel, in RedisValue value) + Debug.WriteLine(ex.Message); // but ignore + } +#else + var reader = _queue.Reader; + if (reader.CanCount) { - var writer = _queue.Writer; - writer.TryWrite(new ChannelMessage(this, channel, value)); + count = reader.Count; + return true; } +#endif - /// - /// Consume a message from the channel. - /// - /// The to use. - public ValueTask ReadAsync(CancellationToken cancellationToken = default) - => _queue.Reader.ReadAsync(cancellationToken); - - /// - /// Attempt to synchronously consume a message from the channel. - /// - /// The read from the Channel. - public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item); - - /// - /// Attempt to query the backlog length of the queue. - /// - /// The (approximate) count of items in the Channel. - public bool TryGetCount(out int count) + count = 0; + return false; + } + + private Delegate? _onMessageHandler; + + private void AssertOnMessage(Delegate handler) + { + if (handler == null) throw new ArgumentNullException(nameof(handler)); + if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null) + throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed"); + } + + /// + /// Create a message loop that processes messages sequentially. + /// + /// The handler to run when receiving a message. + public void OnMessage(Action handler) + { + AssertOnMessage(handler); + + ThreadPool.QueueUserWorkItem( + state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this); + } + + private async Task OnMessageSyncImpl() + { + var handler = (Action?)_onMessageHandler; + while (!Completion.IsCompleted) { - // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present -#if NETCOREAPP3_1 - // get this using the reflection + ChannelMessage next; try { - var prop = _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic); - if (prop is not null) - { - count = (int)prop.GetValue(_queue)!; - return true; - } + if (!TryRead(out next)) next = await ReadAsync().ForAwait(); } - catch { } -#else - var reader = _queue.Reader; - if (reader.CanCount) + catch (ChannelClosedException) { break; } // expected + catch (Exception ex) { - count = reader.Count; - return true; + _parent?.multiplexer?.OnInternalError(ex); + break; } -#endif - count = default; - return false; + try { handler?.Invoke(next); } + catch { } // matches MessageCompletable } + } - private Delegate? _onMessageHandler; - private void AssertOnMessage(Delegate handler) + internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue) + { + if (queue != null) { - if (handler == null) throw new ArgumentNullException(nameof(handler)); - if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null) - throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed"); + // insert at the start of the linked-list + ChannelMessageQueue? old; + do + { + old = Volatile.Read(ref head); + queue._next = old; + } + // format and validator disagree on newline... + while (Interlocked.CompareExchange(ref head, queue, old) != old); } + } - /// - /// Create a message loop that processes messages sequentially. - /// - /// The handler to run when receiving a message. - public void OnMessage(Action handler) - { - AssertOnMessage(handler); + /// + /// Create a message loop that processes messages sequentially. + /// + /// The handler to execute when receiving a message. + public void OnMessage(Func handler) + { + AssertOnMessage(handler); - ThreadPool.QueueUserWorkItem( - state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this); - } + ThreadPool.QueueUserWorkItem( + state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this); + } - private async Task OnMessageSyncImpl() + internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue) + { + if (queue is null) { - var handler = (Action?)_onMessageHandler; - while (!Completion.IsCompleted) - { - ChannelMessage next; - try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); } - catch (ChannelClosedException) { break; } // expected - catch (Exception ex) - { - _parent?.multiplexer?.OnInternalError(ex); - break; - } - - try { handler?.Invoke(next); } - catch { } // matches MessageCompletable - } + return; } - internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue) + bool found; + // if we fail due to a conflict, re-do from start + do { - if (queue != null) + var current = Volatile.Read(ref head); + if (current == null) return; // no queue? nothing to do + if (current == queue) { - // insert at the start of the linked-list - ChannelMessageQueue? old; - do + found = true; + // found at the head - then we need to change the head + if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current) { - old = Volatile.Read(ref head); - queue._next = old; + return; // success } - while (Interlocked.CompareExchange(ref head, queue, old) != old); } - } - - /// - /// Create a message loop that processes messages sequentially. - /// - /// The handler to execute when receiving a message. - public void OnMessage(Func handler) - { - AssertOnMessage(handler); - - ThreadPool.QueueUserWorkItem( - state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this); - } - - internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue) - { - if (queue is null) + else { - return; - } - - bool found; - // if we fail due to a conflict, re-do from start - do - { - var current = Volatile.Read(ref head); - if (current == null) return; // no queue? nothing to do - if (current == queue) - { - found = true; - // found at the head - then we need to change the head - if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current) - { - return; // success - } - } - else + ChannelMessageQueue? previous = current; + current = Volatile.Read(ref previous._next); + found = false; + do { - ChannelMessageQueue? previous = current; - current = Volatile.Read(ref previous._next); - found = false; - do + if (current == queue) { - if (current == queue) + found = true; + // found it, not at the head; remove the node + if (Interlocked.CompareExchange( + ref previous._next, + Volatile.Read(ref current._next), + current) == current) { - found = true; - // found it, not at the head; remove the node - if (Interlocked.CompareExchange(ref previous._next, Volatile.Read(ref current._next), current) == current) - { - return; // success - } - else - { - break; // exit the inner loop, and repeat the outer loop - } + return; // success + } + else + { + break; // exit the inner loop, and repeat the outer loop } - previous = current; - current = Volatile.Read(ref previous!._next); } - while (current != null); + + previous = current; + current = Volatile.Read(ref previous!._next); } + // format and validator disagree on newline... + while (current != null); } - while (found); } + // format and validator disagree on newline... + while (found); + } - internal static int Count(ref ChannelMessageQueue? head) + internal static int Count(ref ChannelMessageQueue? head) + { + var current = Volatile.Read(ref head); + int count = 0; + while (current != null) { - var current = Volatile.Read(ref head); - int count = 0; - while (current != null) - { - count++; - current = Volatile.Read(ref current._next); - } - return count; + count++; + current = Volatile.Read(ref current._next); } - internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message) + return count; + } + + internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message) + { + var current = Volatile.Read(ref head); + while (current != null) { - var current = Volatile.Read(ref head); - while (current != null) - { - current.Write(channel, message); - current = Volatile.Read(ref current._next); - } + current.Write(channel, message); + current = Volatile.Read(ref current._next); } + } - private ChannelMessageQueue? _next; + private ChannelMessageQueue? _next; - private async Task OnMessageAsyncImpl() + private async Task OnMessageAsyncImpl() + { + var handler = (Func?)_onMessageHandler; + while (!Completion.IsCompleted) { - var handler = (Func?)_onMessageHandler; - while (!Completion.IsCompleted) + ChannelMessage next; + try { - ChannelMessage next; - try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); } - catch (ChannelClosedException) { break; } // expected - catch (Exception ex) - { - _parent?.multiplexer?.OnInternalError(ex); - break; - } - - try - { - var task = handler?.Invoke(next); - if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait(); - } - catch { } // matches MessageCompletable + if (!TryRead(out next)) next = await ReadAsync().ForAwait(); + } + catch (ChannelClosedException) { break; } // expected + catch (Exception ex) + { + _parent?.multiplexer?.OnInternalError(ex); + break; } - } - internal static void MarkAllCompleted(ref ChannelMessageQueue? head) - { - var current = Interlocked.Exchange(ref head, null); - while (current != null) + try { - current.MarkCompleted(); - current = Volatile.Read(ref current._next); + var task = handler?.Invoke(next); + if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait(); } + catch { } // matches MessageCompletable } + } - private void MarkCompleted(Exception? error = null) + internal static void MarkAllCompleted(ref ChannelMessageQueue? head) + { + var current = Interlocked.Exchange(ref head, null); + while (current != null) { - _parent = null; - _queue.Writer.TryComplete(error); + current.MarkCompleted(); + current = Volatile.Read(ref current._next); } + } - internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) - { - var parent = _parent; - _parent = null; - parent?.UnsubscribeAsync(Channel, null, this, flags); - _queue.Writer.TryComplete(error); - } + private void MarkCompleted(Exception? error = null) + { + _parent = null; + _queue.Writer.TryComplete(error); + } - internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) + internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) + { + var parent = _parent; + _parent = null; + parent?.UnsubscribeAsync(Channel, null, this, flags); + _queue.Writer.TryComplete(error); + } + + internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None) + { + var parent = _parent; + _parent = null; + if (parent != null) { - var parent = _parent; - _parent = null; - if (parent != null) - { - await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait(); - } - _queue.Writer.TryComplete(error); + await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait(); } - /// - /// Stop receiving messages on this channel. - /// - /// The flags to use when unsubscribing. - public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags); + _queue.Writer.TryComplete(error); + } - /// - /// Stop receiving messages on this channel. - /// - /// The flags to use when unsubscribing. - public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags); + /// + /// Stop receiving messages on this channel. + /// + /// The flags to use when unsubscribing. + public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags); - /// + /// + /// Stop receiving messages on this channel. + /// + /// The flags to use when unsubscribing. + public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags); + + /// #if NETCOREAPP3_0_OR_GREATER - public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) - => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken); + public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + // ReSharper disable once MethodSupportsCancellation - provided in GetAsyncEnumerator + => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken); #else - public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) + { + while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { - while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + while (_queue.Reader.TryRead(out var item)) { - while (_queue.Reader.TryRead(out var item)) - { - yield return item; - } + yield return item; } } -#endif } +#endif } diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index cc766338a..219ac7cb0 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -730,6 +730,7 @@ private static ConnectionMultiplexer ConnectImpl(ConfigurationOptions configurat ReadOnlySpan IInternalConnectionMultiplexer.GetServerSnapshot() => _serverSnapshot.AsSpan(); internal ReadOnlySpan GetServerSnapshot() => _serverSnapshot.AsSpan(); + internal ReadOnlyMemory GetServerSnaphotMemory() => _serverSnapshot.AsMemory(); internal sealed class ServerSnapshot : IEnumerable { public static ServerSnapshot Empty { get; } = new ServerSnapshot(Array.Empty(), 0); diff --git a/src/StackExchange.Redis/KeyNotification.cs b/src/StackExchange.Redis/KeyNotification.cs new file mode 100644 index 000000000..f563019f3 --- /dev/null +++ b/src/StackExchange.Redis/KeyNotification.cs @@ -0,0 +1,244 @@ +using System; +using System.Buffers.Text; +using System.Diagnostics; +using static StackExchange.Redis.KeyNotificationChannels; +namespace StackExchange.Redis; + +/// +/// Represents keyspace and keyevent notifications. +/// +public readonly struct KeyNotification +{ + /// + /// If the channel is either a keyspace or keyevent notification, parsed the data. + /// + public static bool TryParse(in RedisChannel channel, in RedisValue value, out KeyNotification notification) + { + // validate that it looks reasonable + var span = channel.Span; + + // KeySpaceStart and KeyEventStart are the same size, see KeyEventPrefix_KeySpacePrefix_Length_Matches + if (span.Length >= KeySpacePrefix.Length + MinSuffixBytes) + { + // check that the prefix is valid, i.e. "__keyspace@" or "__keyevent@" + var prefix = span.Slice(0, KeySpacePrefix.Length); + var hash = prefix.Hash64(); + switch (hash) + { + case KeySpacePrefix.Hash when KeySpacePrefix.Is(hash, prefix): + case KeyEventPrefix.Hash when KeyEventPrefix.Is(hash, prefix): + // check that there is *something* non-empty after the prefix, with __: as the suffix (we don't verify *what*) + if (span.Slice(KeySpacePrefix.Length).IndexOf("__:"u8) > 0) + { + notification = new KeyNotification(in channel, in value); + return true; + } + + break; + } + } + + notification = default; + return false; + } + + private const int MinSuffixBytes = 5; // need "0__:x" or similar after prefix + + /// + /// The channel associated with this notification. + /// + public RedisChannel Channel => _channel; + + /// + /// The payload associated with this notification. + /// + public RedisValue Value => _value; + + // effectively we just wrap a channel, but: we've pre-validated that things make sense + private readonly RedisChannel _channel; + private readonly RedisValue _value; + + internal KeyNotification(in RedisChannel channel, in RedisValue value) + { + _channel = channel; + _value = value; + } + + /// + /// The database the key is in. If the database cannot be parsed, -1 is returned. + /// + public int Database + { + get + { + // prevalidated format, so we can just skip past the prefix (except for the default value) + if (_channel.IsNull) return -1; + var span = _channel.Span.Slice(KeySpacePrefix.Length); // also works for KeyEventPrefix + var end = span.IndexOf((byte)'_'); // expecting "__:foo" - we'll just stop at the underscore + if (end <= 0) return -1; + + span = span.Slice(0, end); + return Utf8Parser.TryParse(span, out int database, out var bytes) + && bytes == end ? database : -1; + } + } + + /// + /// The key associated with this event. + /// + /// Note that this will allocate a copy of the key bytes; to avoid allocations, + /// the and APIs can be used. + public RedisKey GetKey() + { + if (IsKeySpace) + { + // then the channel contains the key, and the payload contains the event-type + return ChannelSuffix.ToArray(); // create an isolated copy + } + + if (IsKeyEvent) + { + // then the channel contains the event-type, and the payload contains the key + return (byte[]?)Value; // todo: this could probably side-step + } + + return RedisKey.Null; + } + + /// + /// Get the number of bytes in the key. + /// + public int KeyByteCount + { + get + { + if (IsKeySpace) + { + return ChannelSuffix.Length; + } + + if (IsKeyEvent) + { + return _value.GetByteCount(); + } + + return 0; + } + } + + /// + /// Attempt to copy the bytes from the key to a buffer, returning the number of bytes written. + /// + public bool TryCopyKey(Span destination, out int bytesWritten) + { + if (IsKeySpace) + { + var suffix = ChannelSuffix; + bytesWritten = suffix.Length; // assume success + if (bytesWritten <= destination.Length) + { + suffix.CopyTo(destination); + return true; + } + } + + if (IsKeyEvent) + { + bytesWritten = _value.GetByteCount(); + if (bytesWritten <= destination.Length) + { + var tmp = _value.CopyTo(destination); + Debug.Assert(tmp == bytesWritten); + return true; + } + } + + bytesWritten = 0; + return false; + } + + /// + /// Get the portion of the channel after the "__{keyspace|keyevent}@{db}__:". + /// + private ReadOnlySpan ChannelSuffix + { + get + { + var span = _channel.Span; + var index = span.IndexOf("__:"u8); + return index > 0 ? span.Slice(index + 3) : default; + } + } + + /// + /// The type of notification associated with this event, if it is well-known - otherwise . + /// + /// Unexpected values can be processed manually from the and . + public KeyNotificationType Type + { + get + { + if (IsKeySpace) + { + // then the channel contains the key, and the payload contains the event-type + var count = _value.GetByteCount(); + if (count >= KeyNotificationTypeFastHash.MinBytes & count <= KeyNotificationTypeFastHash.MaxBytes) + { + if (_value.TryGetSpan(out var direct)) + { + return KeyNotificationTypeFastHash.Parse(direct); + } + else + { + Span localCopy = stackalloc byte[KeyNotificationTypeFastHash.MaxBytes]; + return KeyNotificationTypeFastHash.Parse(localCopy.Slice(0, _value.CopyTo(localCopy))); + } + } + } + + if (IsKeyEvent) + { + // then the channel contains the event-type, and the payload contains the key + return KeyNotificationTypeFastHash.Parse(ChannelSuffix); + } + return KeyNotificationType.Unknown; + } + } + + /// + /// Indicates whether this notification originated from a keyspace notification, for example __keyspace@4__:mykey with payload set. + /// + public bool IsKeySpace + { + get + { + var span = _channel.Span; + return span.Length >= KeySpacePrefix.Length + MinSuffixBytes && KeySpacePrefix.Is(span.Hash64(), span.Slice(0, KeySpacePrefix.Length)); + } + } + + /// + /// Indicates whether this notification originated from a keyevent notification, for example __keyevent@4__:set with payload mykey. + /// + public bool IsKeyEvent + { + get + { + var span = _channel.Span; + return span.Length >= KeyEventPrefix.Length + MinSuffixBytes && KeyEventPrefix.Is(span.Hash64(), span.Slice(0, KeyEventPrefix.Length)); + } + } +} + +internal static partial class KeyNotificationChannels +{ + [FastHash("__keyspace@")] + internal static partial class KeySpacePrefix + { + } + + [FastHash("__keyevent@")] + internal static partial class KeyEventPrefix + { + } +} diff --git a/src/StackExchange.Redis/KeyNotificationType.cs b/src/StackExchange.Redis/KeyNotificationType.cs new file mode 100644 index 000000000..cc4c74ef1 --- /dev/null +++ b/src/StackExchange.Redis/KeyNotificationType.cs @@ -0,0 +1,69 @@ +namespace StackExchange.Redis; + +/// +/// The type of keyspace or keyevent notification. +/// +public enum KeyNotificationType +{ + // note: initially presented alphabetically, but: new values *must* be appended, not inserted + // (to preserve values of existing elements) +#pragma warning disable CS1591 // docs, redundant + Unknown = 0, + Append = 1, + Copy = 2, + Del = 3, + Expire = 4, + HDel = 5, + HExpired = 6, + HIncrByFloat = 7, + HIncrBy = 8, + HPersist = 9, + HSet = 10, + IncrByFloat = 11, + IncrBy = 12, + LInsert = 13, + LPop = 14, + LPush = 15, + LRem = 16, + LSet = 17, + LTrim = 18, + MoveFrom = 19, + MoveTo = 20, + Persist = 21, + RenameFrom = 22, + RenameTo = 23, + Restore = 24, + RPop = 25, + RPush = 26, + SAdd = 27, + Set = 28, + SetRange = 29, + SortStore = 30, + SRem = 31, + SPop = 32, + XAdd = 33, + XDel = 34, + XGroupCreateConsumer = 35, + XGroupCreate = 36, + XGroupDelConsumer = 37, + XGroupDestroy = 38, + XGroupSetId = 39, + XSetId = 40, + XTrim = 41, + ZAdd = 42, + ZDiffStore = 43, + ZInterStore = 44, + ZUnionStore = 45, + ZIncr = 46, + ZRemByRank = 47, + ZRemByScore = 48, + ZRem = 49, + + // side-effect notifications + Expired = 1000, + Evicted = 1001, + New = 1002, + Overwritten = 1003, + TypeChanged = 1004, // type_changed +#pragma warning restore CS1591 // docs, redundant +} diff --git a/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs new file mode 100644 index 000000000..bcf08bad2 --- /dev/null +++ b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs @@ -0,0 +1,413 @@ +using System; + +namespace StackExchange.Redis; + +/// +/// Internal helper type for fast parsing of key notification types, using [FastHash]. +/// +internal static partial class KeyNotificationTypeFastHash +{ + // these are checked by KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths + public const int MinBytes = 3, MaxBytes = 21; + + public static KeyNotificationType Parse(ReadOnlySpan value) + { + var hash = value.Hash64(); + return hash switch + { + append.Hash when append.Is(hash, value) => KeyNotificationType.Append, + copy.Hash when copy.Is(hash, value) => KeyNotificationType.Copy, + del.Hash when del.Is(hash, value) => KeyNotificationType.Del, + expire.Hash when expire.Is(hash, value) => KeyNotificationType.Expire, + hdel.Hash when hdel.Is(hash, value) => KeyNotificationType.HDel, + hexpired.Hash when hexpired.Is(hash, value) => KeyNotificationType.HExpired, + hincrbyfloat.Hash when hincrbyfloat.Is(hash, value) => KeyNotificationType.HIncrByFloat, + hincrby.Hash when hincrby.Is(hash, value) => KeyNotificationType.HIncrBy, + hpersist.Hash when hpersist.Is(hash, value) => KeyNotificationType.HPersist, + hset.Hash when hset.Is(hash, value) => KeyNotificationType.HSet, + incrbyfloat.Hash when incrbyfloat.Is(hash, value) => KeyNotificationType.IncrByFloat, + incrby.Hash when incrby.Is(hash, value) => KeyNotificationType.IncrBy, + linsert.Hash when linsert.Is(hash, value) => KeyNotificationType.LInsert, + lpop.Hash when lpop.Is(hash, value) => KeyNotificationType.LPop, + lpush.Hash when lpush.Is(hash, value) => KeyNotificationType.LPush, + lrem.Hash when lrem.Is(hash, value) => KeyNotificationType.LRem, + lset.Hash when lset.Is(hash, value) => KeyNotificationType.LSet, + ltrim.Hash when ltrim.Is(hash, value) => KeyNotificationType.LTrim, + move_from.Hash when move_from.Is(hash, value) => KeyNotificationType.MoveFrom, + move_to.Hash when move_to.Is(hash, value) => KeyNotificationType.MoveTo, + persist.Hash when persist.Is(hash, value) => KeyNotificationType.Persist, + rename_from.Hash when rename_from.Is(hash, value) => KeyNotificationType.RenameFrom, + rename_to.Hash when rename_to.Is(hash, value) => KeyNotificationType.RenameTo, + restore.Hash when restore.Is(hash, value) => KeyNotificationType.Restore, + rpop.Hash when rpop.Is(hash, value) => KeyNotificationType.RPop, + rpush.Hash when rpush.Is(hash, value) => KeyNotificationType.RPush, + sadd.Hash when sadd.Is(hash, value) => KeyNotificationType.SAdd, + set.Hash when set.Is(hash, value) => KeyNotificationType.Set, + setrange.Hash when setrange.Is(hash, value) => KeyNotificationType.SetRange, + sortstore.Hash when sortstore.Is(hash, value) => KeyNotificationType.SortStore, + srem.Hash when srem.Is(hash, value) => KeyNotificationType.SRem, + spop.Hash when spop.Is(hash, value) => KeyNotificationType.SPop, + xadd.Hash when xadd.Is(hash, value) => KeyNotificationType.XAdd, + xdel.Hash when xdel.Is(hash, value) => KeyNotificationType.XDel, + xgroup_createconsumer.Hash when xgroup_createconsumer.Is(hash, value) => KeyNotificationType.XGroupCreateConsumer, + xgroup_create.Hash when xgroup_create.Is(hash, value) => KeyNotificationType.XGroupCreate, + xgroup_delconsumer.Hash when xgroup_delconsumer.Is(hash, value) => KeyNotificationType.XGroupDelConsumer, + xgroup_destroy.Hash when xgroup_destroy.Is(hash, value) => KeyNotificationType.XGroupDestroy, + xgroup_setid.Hash when xgroup_setid.Is(hash, value) => KeyNotificationType.XGroupSetId, + xsetid.Hash when xsetid.Is(hash, value) => KeyNotificationType.XSetId, + xtrim.Hash when xtrim.Is(hash, value) => KeyNotificationType.XTrim, + zadd.Hash when zadd.Is(hash, value) => KeyNotificationType.ZAdd, + zdiffstore.Hash when zdiffstore.Is(hash, value) => KeyNotificationType.ZDiffStore, + zinterstore.Hash when zinterstore.Is(hash, value) => KeyNotificationType.ZInterStore, + zunionstore.Hash when zunionstore.Is(hash, value) => KeyNotificationType.ZUnionStore, + zincr.Hash when zincr.Is(hash, value) => KeyNotificationType.ZIncr, + zrembyrank.Hash when zrembyrank.Is(hash, value) => KeyNotificationType.ZRemByRank, + zrembyscore.Hash when zrembyscore.Is(hash, value) => KeyNotificationType.ZRemByScore, + zrem.Hash when zrem.Is(hash, value) => KeyNotificationType.ZRem, + expired.Hash when expired.Is(hash, value) => KeyNotificationType.Expired, + evicted.Hash when evicted.Is(hash, value) => KeyNotificationType.Evicted, + _new.Hash when _new.Is(hash, value) => KeyNotificationType.New, + overwritten.Hash when overwritten.Is(hash, value) => KeyNotificationType.Overwritten, + type_changed.Hash when type_changed.Is(hash, value) => KeyNotificationType.TypeChanged, + _ => KeyNotificationType.Unknown, + }; + } + + internal static ReadOnlySpan GetRawBytes(KeyNotificationType type) + { + return type switch + { + KeyNotificationType.Append => append.U8, + KeyNotificationType.Copy => copy.U8, + KeyNotificationType.Del => del.U8, + KeyNotificationType.Expire => expire.U8, + KeyNotificationType.HDel => hdel.U8, + KeyNotificationType.HExpired => hexpired.U8, + KeyNotificationType.HIncrByFloat => hincrbyfloat.U8, + KeyNotificationType.HIncrBy => hincrby.U8, + KeyNotificationType.HPersist => hpersist.U8, + KeyNotificationType.HSet => hset.U8, + KeyNotificationType.IncrByFloat => incrbyfloat.U8, + KeyNotificationType.IncrBy => incrby.U8, + KeyNotificationType.LInsert => linsert.U8, + KeyNotificationType.LPop => lpop.U8, + KeyNotificationType.LPush => lpush.U8, + KeyNotificationType.LRem => lrem.U8, + KeyNotificationType.LSet => lset.U8, + KeyNotificationType.LTrim => ltrim.U8, + KeyNotificationType.MoveFrom => move_from.U8, + KeyNotificationType.MoveTo => move_to.U8, + KeyNotificationType.Persist => persist.U8, + KeyNotificationType.RenameFrom => rename_from.U8, + KeyNotificationType.RenameTo => rename_to.U8, + KeyNotificationType.Restore => restore.U8, + KeyNotificationType.RPop => rpop.U8, + KeyNotificationType.RPush => rpush.U8, + KeyNotificationType.SAdd => sadd.U8, + KeyNotificationType.Set => set.U8, + KeyNotificationType.SetRange => setrange.U8, + KeyNotificationType.SortStore => sortstore.U8, + KeyNotificationType.SRem => srem.U8, + KeyNotificationType.SPop => spop.U8, + KeyNotificationType.XAdd => xadd.U8, + KeyNotificationType.XDel => xdel.U8, + KeyNotificationType.XGroupCreateConsumer => xgroup_createconsumer.U8, + KeyNotificationType.XGroupCreate => xgroup_create.U8, + KeyNotificationType.XGroupDelConsumer => xgroup_delconsumer.U8, + KeyNotificationType.XGroupDestroy => xgroup_destroy.U8, + KeyNotificationType.XGroupSetId => xgroup_setid.U8, + KeyNotificationType.XSetId => xsetid.U8, + KeyNotificationType.XTrim => xtrim.U8, + KeyNotificationType.ZAdd => zadd.U8, + KeyNotificationType.ZDiffStore => zdiffstore.U8, + KeyNotificationType.ZInterStore => zinterstore.U8, + KeyNotificationType.ZUnionStore => zunionstore.U8, + KeyNotificationType.ZIncr => zincr.U8, + KeyNotificationType.ZRemByRank => zrembyrank.U8, + KeyNotificationType.ZRemByScore => zrembyscore.U8, + KeyNotificationType.ZRem => zrem.U8, + KeyNotificationType.Expired => expired.U8, + KeyNotificationType.Evicted => evicted.U8, + KeyNotificationType.New => _new.U8, + KeyNotificationType.Overwritten => overwritten.U8, + KeyNotificationType.TypeChanged => type_changed.U8, + _ => Throw(), + }; + static ReadOnlySpan Throw() => throw new ArgumentOutOfRangeException(nameof(type)); + } + +#pragma warning disable SA1300, CS8981 + // ReSharper disable InconsistentNaming + [FastHash] + internal static partial class append + { + } + + [FastHash] + internal static partial class copy + { + } + + [FastHash] + internal static partial class del + { + } + + [FastHash] + internal static partial class expire + { + } + + [FastHash] + internal static partial class hdel + { + } + + [FastHash] + internal static partial class hexpired + { + } + + [FastHash] + internal static partial class hincrbyfloat + { + } + + [FastHash] + internal static partial class hincrby + { + } + + [FastHash] + internal static partial class hpersist + { + } + + [FastHash] + internal static partial class hset + { + } + + [FastHash] + internal static partial class incrbyfloat + { + } + + [FastHash] + internal static partial class incrby + { + } + + [FastHash] + internal static partial class linsert + { + } + + [FastHash] + internal static partial class lpop + { + } + + [FastHash] + internal static partial class lpush + { + } + + [FastHash] + internal static partial class lrem + { + } + + [FastHash] + internal static partial class lset + { + } + + [FastHash] + internal static partial class ltrim + { + } + + [FastHash("move_from")] // by default, the generator interprets underscore as hyphen + internal static partial class move_from + { + } + + [FastHash("move_to")] // by default, the generator interprets underscore as hyphen + internal static partial class move_to + { + } + + [FastHash] + internal static partial class persist + { + } + + [FastHash("rename_from")] // by default, the generator interprets underscore as hyphen + internal static partial class rename_from + { + } + + [FastHash("rename_to")] // by default, the generator interprets underscore as hyphen + internal static partial class rename_to + { + } + + [FastHash] + internal static partial class restore + { + } + + [FastHash] + internal static partial class rpop + { + } + + [FastHash] + internal static partial class rpush + { + } + + [FastHash] + internal static partial class sadd + { + } + + [FastHash] + internal static partial class set + { + } + + [FastHash] + internal static partial class setrange + { + } + + [FastHash] + internal static partial class sortstore + { + } + + [FastHash] + internal static partial class srem + { + } + + [FastHash] + internal static partial class spop + { + } + + [FastHash] + internal static partial class xadd + { + } + + [FastHash] + internal static partial class xdel + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_createconsumer + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_create + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_delconsumer + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_destroy + { + } + + [FastHash] // note: becomes hyphenated + internal static partial class xgroup_setid + { + } + + [FastHash] + internal static partial class xsetid + { + } + + [FastHash] + internal static partial class xtrim + { + } + + [FastHash] + internal static partial class zadd + { + } + + [FastHash] + internal static partial class zdiffstore + { + } + + [FastHash] + internal static partial class zinterstore + { + } + + [FastHash] + internal static partial class zunionstore + { + } + + [FastHash] + internal static partial class zincr + { + } + + [FastHash] + internal static partial class zrembyrank + { + } + + [FastHash] + internal static partial class zrembyscore + { + } + + [FastHash] + internal static partial class zrem + { + } + + [FastHash] + internal static partial class expired + { + } + + [FastHash] + internal static partial class evicted + { + } + + [FastHash("new")] + internal static partial class _new // it isn't worth making the code-gen keyword aware + { + } + + [FastHash] + internal static partial class overwritten + { + } + + [FastHash("type_changed")] // by default, the generator interprets underscore as hyphen + internal static partial class type_changed + { + } + + // ReSharper restore InconsistentNaming +#pragma warning restore SA1300, CS8981 +} diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index 91b0e1a43..871fe71f0 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1 +1,74 @@ -#nullable enable \ No newline at end of file +#nullable enable +StackExchange.Redis.KeyNotification +StackExchange.Redis.KeyNotification.Channel.get -> StackExchange.Redis.RedisChannel +StackExchange.Redis.KeyNotification.Value.get -> StackExchange.Redis.RedisValue +StackExchange.Redis.KeyNotification.Database.get -> int +StackExchange.Redis.KeyNotification.GetKey() -> StackExchange.Redis.RedisKey +StackExchange.Redis.KeyNotification.IsKeyEvent.get -> bool +StackExchange.Redis.KeyNotification.IsKeySpace.get -> bool +StackExchange.Redis.KeyNotification.KeyByteCount.get -> int +StackExchange.Redis.KeyNotification.KeyNotification() -> void +StackExchange.Redis.KeyNotification.TryCopyKey(System.Span destination, out int bytesWritten) -> bool +StackExchange.Redis.KeyNotification.Type.get -> StackExchange.Redis.KeyNotificationType +static StackExchange.Redis.KeyNotification.TryParse(in StackExchange.Redis.RedisChannel channel, in StackExchange.Redis.RedisValue value, out StackExchange.Redis.KeyNotification notification) -> bool +StackExchange.Redis.ChannelMessage.TryParseKeyNotification(out StackExchange.Redis.KeyNotification notification) -> bool +static StackExchange.Redis.RedisChannel.KeyEvent(StackExchange.Redis.KeyNotificationType type, int? database = null) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.KeyEvent(System.ReadOnlySpan type, int? database) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.KeySpace(in StackExchange.Redis.RedisKey key, int database) -> StackExchange.Redis.RedisChannel +static StackExchange.Redis.RedisChannel.KeySpacePattern(in StackExchange.Redis.RedisKey pattern, int? database = null) -> StackExchange.Redis.RedisChannel +StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Append = 1 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Copy = 2 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Del = 3 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Evicted = 1001 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Expire = 4 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Expired = 1000 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HDel = 5 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HExpired = 6 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HIncrBy = 8 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HIncrByFloat = 7 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HPersist = 9 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.HSet = 10 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.IncrBy = 12 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.IncrByFloat = 11 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LInsert = 13 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LPop = 14 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LPush = 15 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LRem = 16 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LSet = 17 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.LTrim = 18 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.MoveFrom = 19 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.MoveTo = 20 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.New = 1002 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Overwritten = 1003 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Persist = 21 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RenameFrom = 22 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RenameTo = 23 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Restore = 24 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RPop = 25 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.RPush = 26 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SAdd = 27 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Set = 28 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SetRange = 29 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SortStore = 30 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SPop = 32 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.SRem = 31 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.TypeChanged = 1004 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.Unknown = 0 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XAdd = 33 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XDel = 34 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupCreate = 36 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupCreateConsumer = 35 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupDelConsumer = 37 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupDestroy = 38 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XGroupSetId = 39 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XSetId = 40 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.XTrim = 41 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZAdd = 42 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZDiffStore = 43 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZIncr = 46 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZInterStore = 44 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZRem = 49 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZRemByRank = 47 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZRemByScore = 48 -> StackExchange.Redis.KeyNotificationType +StackExchange.Redis.KeyNotificationType.ZUnionStore = 45 -> StackExchange.Redis.KeyNotificationType diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs index d4289f3c6..7b208bb9d 100644 --- a/src/StackExchange.Redis/RedisChannel.cs +++ b/src/StackExchange.Redis/RedisChannel.cs @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Text; namespace StackExchange.Redis @@ -10,6 +11,8 @@ namespace StackExchange.Redis { internal readonly byte[]? Value; + internal ReadOnlySpan Span => Value is null ? default : Value.AsSpan(); + internal readonly RedisChannelOptions Options; [Flags] @@ -19,19 +22,36 @@ internal enum RedisChannelOptions Pattern = 1 << 0, Sharded = 1 << 1, KeyRouted = 1 << 2, + MultiNode = 1 << 3, } // we don't consider Routed for equality - it's an implementation detail, not a fundamental feature - private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.KeyRouted; + private const RedisChannelOptions EqualityMask = + ~(RedisChannelOptions.KeyRouted | RedisChannelOptions.MultiNode); - internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH; + internal RedisCommand GetPublishCommand() + { + return (Options & (RedisChannelOptions.Sharded | RedisChannelOptions.MultiNode)) switch + { + RedisChannelOptions.None => RedisCommand.PUBLISH, + RedisChannelOptions.Sharded => RedisCommand.SPUBLISH, + _ => ThrowKeyRouted(), + }; + + static RedisCommand ThrowKeyRouted() => throw new InvalidOperationException("Publishing is not supported for multi-node channels"); + } /// - /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios, + /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios, /// or to scenarios using . /// internal bool IsKeyRouted => (Options & RedisChannelOptions.KeyRouted) != 0; + /// + /// Should this channel be subscribed to on all nodes? This is only relevant for cluster scenarios and keyspace notifications. + /// + internal bool IsMultiNode => (Options & RedisChannelOptions.MultiNode) != 0; + /// /// Indicates whether the channel-name is either null or a zero-length value. /// @@ -58,6 +78,7 @@ public static bool UseImplicitAutoPattern get => s_DefaultPatternMode == PatternMode.Auto; set => s_DefaultPatternMode = value ? PatternMode.Auto : PatternMode.Literal; } + private static PatternMode s_DefaultPatternMode = PatternMode.Auto; /// @@ -82,7 +103,13 @@ public static bool UseImplicitAutoPattern /// a consideration. /// /// Note that channels from Sharded are always routed. - public RedisChannel WithKeyRouting() => new(Value, Options | RedisChannelOptions.KeyRouted); + public RedisChannel WithKeyRouting() + { + if (IsMultiNode) Throw(); + return new(Value, Options | RedisChannelOptions.KeyRouted); + + static void Throw() => throw new InvalidOperationException("Key routing is not supported for multi-node channels"); + } /// /// Creates a new that acts as a wildcard subscription. In cluster @@ -105,7 +132,8 @@ public static bool UseImplicitAutoPattern /// /// The name of the channel to create. /// The mode for name matching. - public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) + public RedisChannel(byte[]? value, PatternMode mode) : this( + value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None) { } @@ -115,7 +143,9 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt /// The string name of the channel to create. /// The mode for name matching. // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract - public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode) + public RedisChannel(string value, PatternMode mode) : this( + // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract + value is null ? null : Encoding.UTF8.GetBytes(value), mode) { } @@ -128,7 +158,8 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null /// The name of the channel to create. /// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions /// using sharded channels must also be published with sharded channels (and vice versa). - public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); + public static RedisChannel Sharded(byte[]? value) => + new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); /// /// Create a new redis channel from a string, representing a sharded channel. In cluster @@ -139,7 +170,112 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null /// The string name of the channel to create. /// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions /// using sharded channels must also be published with sharded channels (and vice versa). - public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); + public static RedisChannel Sharded(string value) => + new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted); + + /// + /// Create a key-notification channel for a single key in a single database. + /// + public static RedisChannel KeySpace(in RedisKey key, int database) + => BuildKeySpace(key, database, RedisChannelOptions.KeyRouted); + + /// + /// Create a key-notification channel for a pattern, optionally in a specified database. + /// + public static RedisChannel KeySpacePattern(in RedisKey pattern, int? database = null) + => BuildKeySpace(pattern, database, RedisChannelOptions.Pattern | RedisChannelOptions.MultiNode); + + private const int DatabaseScratchBufferSize = 16; // largest non-negative int32 is 10 digits + + private static ReadOnlySpan AppendDatabase(Span target, int? database, RedisChannelOptions options) + { + if (database is null) + { + if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(database)); + return "*"u8; // don't worry about the inbound scratch buffer, this is fine + } + else + { + var db32 = database.GetValueOrDefault(); + if (db32 == 0) return "0"u8; // so common, we might as well special case + if (db32 < 0) throw new ArgumentOutOfRangeException(nameof(database)); + return target.Slice(0, Format.FormatInt32(db32, target)); + } + } + + /// + /// Create an event-notification channel for a given event type, optionally in a specified database. + /// +#pragma warning disable RS0027 + public static RedisChannel KeyEvent(KeyNotificationType type, int? database = null) +#pragma warning restore RS0027 + => KeyEvent(KeyNotificationTypeFastHash.GetRawBytes(type), database); + + /// + /// Create an event-notification channel for a given event type, optionally in a specified database. + /// + /// This API is intended for use with custom/unknown event types; for well-known types, use . + public static RedisChannel KeyEvent(ReadOnlySpan type, int? database) + { + if (type.IsEmpty) throw new ArgumentNullException(nameof(type)); + + RedisChannelOptions options = RedisChannelOptions.MultiNode; + if (database is null) options |= RedisChannelOptions.Pattern; + var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options); + + // __keyevent@{db}__:{type} + var arr = new byte[14 + db.Length + type.Length]; + + var target = AppendAndAdvance(arr.AsSpan(), "__keyevent@"u8); + target = AppendAndAdvance(target, db); + target = AppendAndAdvance(target, "__:"u8); + target = AppendAndAdvance(target, type); + Debug.Assert(target.IsEmpty); // should have calculated length correctly + + return new RedisChannel(arr, options); + } + + private static Span AppendAndAdvance(Span target, scoped ReadOnlySpan value) + { + value.CopyTo(target); + return target.Slice(value.Length); + } + + private static RedisChannel BuildKeySpace(in RedisKey key, int? database, RedisChannelOptions options) + { + int keyLen; + if (key.IsNull) + { + if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(key)); + keyLen = 1; + } + else + { + keyLen = key.TotalLength(); + if (keyLen == 0) throw new ArgumentOutOfRangeException(nameof(key)); + } + + var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options); + + // __keyspace@{db}__:{key} + var arr = new byte[14 + db.Length + keyLen]; + + var target = AppendAndAdvance(arr.AsSpan(), "__keyspace@"u8); + target = AppendAndAdvance(target, db); + target = AppendAndAdvance(target, "__:"u8); + Debug.Assert(keyLen == target.Length); // should have exactly "len" bytes remaining + if (key.IsNull) + { + target[0] = (byte)'*'; + target = target.Slice(1); + } + else + { + target = target.Slice(key.CopyTo(target)); + } + Debug.Assert(target.IsEmpty); // should have calculated length correctly + return new RedisChannel(arr, options); + } internal RedisChannel(byte[]? value, RedisChannelOptions options) { @@ -351,7 +487,7 @@ public static implicit operator RedisChannel(byte[]? key) { return Encoding.UTF8.GetString(arr); } - catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString + catch (Exception e) when // Only catch exception thrown by Encoding.UTF8.GetString (e is DecoderFallbackException or ArgumentException or ArgumentNullException) { return BitConverter.ToString(arr); diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index c1c3c5728..056a5380a 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -1900,7 +1900,7 @@ public Task StringLongestCommonSubsequenceWithMatchesAsync(Redis public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } @@ -1908,7 +1908,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel)); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs index 9ade78c2d..824aef025 100644 --- a/src/StackExchange.Redis/RedisSubscriber.cs +++ b/src/StackExchange.Redis/RedisSubscriber.cs @@ -1,11 +1,8 @@ using System; using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; -using System.Diagnostics.SymbolStore; using System.Net; -using System.Threading; using System.Threading.Tasks; -using Pipelines.Sockets.Unofficial; using Pipelines.Sockets.Unofficial.Arenas; using static StackExchange.Redis.ConnectionMultiplexer; @@ -30,7 +27,7 @@ internal Subscription GetOrAddSubscription(in RedisChannel channel, CommandFlags { if (!subscriptions.TryGetValue(channel, out var sub)) { - sub = new Subscription(flags); + sub = channel.IsMultiNode ? new MultiNodeSubscription(flags) : new SingleNodeSubscription(flags); subscriptions.TryAdd(channel, sub); } return sub; @@ -71,7 +68,7 @@ internal bool GetSubscriberCounts(in RedisChannel channel, out int handlers, out { if (!channel.IsNullOrEmpty && subscriptions.TryGetValue(channel, out Subscription? sub)) { - return sub.GetCurrentServer(); + return sub.GetAnyCurrentServer(); } return null; } @@ -123,7 +120,7 @@ internal void UpdateSubscriptions() { foreach (var pair in subscriptions) { - pair.Value.UpdateServer(); + pair.Value.RemoveDisconnectedEndpoints(); } } @@ -135,13 +132,10 @@ internal long EnsureSubscriptions(CommandFlags flags = CommandFlags.None) { // TODO: Subscribe with variadic commands to reduce round trips long count = 0; + var subscriber = DefaultSubscriber; foreach (var pair in subscriptions) { - if (!pair.Value.IsConnected) - { - count++; - DefaultSubscriber.EnsureSubscribedToServer(pair.Value, pair.Key, flags, true); - } + count += pair.Value.EnsureSubscribedToServer(subscriber, pair.Key, flags, true); } return count; } @@ -151,161 +145,6 @@ internal enum SubscriptionAction Subscribe, Unsubscribe, } - - /// - /// This is the record of a single subscription to a redis server. - /// It's the singular channel (which may or may not be a pattern), to one or more handlers. - /// We subscriber to a redis server once (for all messages) and execute 1-many handlers when a message arrives. - /// - internal sealed class Subscription - { - private Action? _handlers; - private readonly object _handlersLock = new object(); - private ChannelMessageQueue? _queues; - private ServerEndPoint? CurrentServer; - public CommandFlags Flags { get; } - public ResultProcessor.TrackSubscriptionsProcessor Processor { get; } - - /// - /// Whether the we have is connected. - /// Since we clear on a disconnect, this should stay correct. - /// - internal bool IsConnected => CurrentServer?.IsSubscriberConnected == true; - - public Subscription(CommandFlags flags) - { - Flags = flags; - Processor = new ResultProcessor.TrackSubscriptionsProcessor(this); - } - - /// - /// Gets the configured (P)SUBSCRIBE or (P)UNSUBSCRIBE for an action. - /// - internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall) - { - var command = action switch // note that the Routed flag doesn't impact the message here - just the routing - { - SubscriptionAction.Subscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch - { - RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE, - RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE, - RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE, - _ => Unknown(action, channel.Options), - }, - SubscriptionAction.Unsubscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch - { - RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE, - RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE, - RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE, - _ => Unknown(action, channel.Options), - }, - _ => Unknown(action, channel.Options), - }; - - // TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica - var msg = Message.Create(-1, Flags | flags, command, channel); - msg.SetForSubscriptionBridge(); - if (internalCall) - { - msg.SetInternalCall(); - } - return msg; - } - - private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options) - => throw new ArgumentException($"Unable to determine pub/sub operation for '{action}' against '{options}'"); - - public void Add(Action? handler, ChannelMessageQueue? queue) - { - if (handler != null) - { - lock (_handlersLock) - { - _handlers += handler; - } - } - if (queue != null) - { - ChannelMessageQueue.Combine(ref _queues, queue); - } - } - - public bool Remove(Action? handler, ChannelMessageQueue? queue) - { - if (handler != null) - { - lock (_handlersLock) - { - _handlers -= handler; - } - } - if (queue != null) - { - ChannelMessageQueue.Remove(ref _queues, queue); - } - return _handlers == null & _queues == null; - } - - public ICompletable? ForInvoke(in RedisChannel channel, in RedisValue message, out ChannelMessageQueue? queues) - { - var handlers = _handlers; - queues = Volatile.Read(ref _queues); - return handlers == null ? null : new MessageCompletable(channel, message, handlers); - } - - internal void MarkCompleted() - { - lock (_handlersLock) - { - _handlers = null; - } - ChannelMessageQueue.MarkAllCompleted(ref _queues); - } - - internal void GetSubscriberCounts(out int handlers, out int queues) - { - queues = ChannelMessageQueue.Count(ref _queues); - var tmp = _handlers; - if (tmp == null) - { - handlers = 0; - } - else if (tmp.IsSingle()) - { - handlers = 1; - } - else - { - handlers = 0; - foreach (var sub in tmp.AsEnumerable()) { handlers++; } - } - } - - internal ServerEndPoint? GetCurrentServer() => Volatile.Read(ref CurrentServer); - internal void SetCurrentServer(ServerEndPoint? server) => CurrentServer = server; - // conditional clear - internal bool ClearCurrentServer(ServerEndPoint expected) - { - if (CurrentServer == expected) - { - CurrentServer = null; - return true; - } - - return false; - } - - /// - /// Evaluates state and if we're not currently connected, clears the server reference. - /// - internal void UpdateServer() - { - if (!IsConnected) - { - CurrentServer = null; - } - } - } } /// @@ -393,7 +232,7 @@ private static void ThrowIfNull(in RedisChannel channel) public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } @@ -401,7 +240,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None) { ThrowIfNull(channel); - var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message); + var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message); // if we're actively subscribed: send via that connection (otherwise, follow normal rules) return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel)); } @@ -416,37 +255,26 @@ public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = return queue; } - private bool Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) + private int Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags) { ThrowIfNull(channel); - if (handler == null && queue == null) { return true; } + if (handler == null && queue == null) { return 0; } var sub = multiplexer.GetOrAddSubscription(channel, flags); sub.Add(handler, queue); - return EnsureSubscribedToServer(sub, channel, flags, false); - } - - internal bool EnsureSubscribedToServer(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall) - { - if (sub.IsConnected) { return true; } - - // TODO: Cleanup old hangers here? - sub.SetCurrentServer(null); // we're not appropriately connected, so blank it out for eligible reconnection - var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); - var selected = multiplexer.SelectServer(message); - return ExecuteSync(message, sub.Processor, selected); + return sub.EnsureSubscribedToServer(this, channel, flags, false); } internal void ResubscribeToServer(Subscription sub, RedisChannel channel, ServerEndPoint serverEndPoint, string cause) { // conditional: only if that's the server we were connected to, or "none"; we don't want to end up duplicated - if (sub.ClearCurrentServer(serverEndPoint) || !sub.IsConnected) + if (sub.TryRemoveEndpoint(serverEndPoint) || !sub.IsConnectedAny()) { if (serverEndPoint.IsSubscriberConnected) { // we'll *try* for a simple resubscribe, following any -MOVED etc, but if that fails: fall back // to full reconfigure; importantly, note that we've already recorded the disconnect - var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, CommandFlags.None, false); + var message = sub.GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, CommandFlags.None, false); _ = ExecuteAsync(message, sub.Processor, serverEndPoint).ContinueWith( t => multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause), TaskContinuationOptions.OnlyOnFaulted); @@ -470,25 +298,14 @@ public async Task SubscribeAsync(RedisChannel channel, Comm return queue; } - private Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags, ServerEndPoint? server = null) + private Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags, ServerEndPoint? server = null) { ThrowIfNull(channel); - if (handler == null && queue == null) { return CompletedTask.Default(null); } + if (handler == null && queue == null) { return CompletedTask.Default(null); } var sub = multiplexer.GetOrAddSubscription(channel, flags); sub.Add(handler, queue); - return EnsureSubscribedToServerAsync(sub, channel, flags, false, server); - } - - public Task EnsureSubscribedToServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall, ServerEndPoint? server = null) - { - if (sub.IsConnected) { return CompletedTask.Default(null); } - - // TODO: Cleanup old hangers here? - sub.SetCurrentServer(null); // we're not appropriately connected, so blank it out for eligible reconnection - var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); - server ??= multiplexer.SelectServer(message); - return ExecuteAsync(message, sub.Processor, server); + return sub.EnsureSubscribedToServerAsync(this, channel, flags, false, server); } public EndPoint? SubscribedEndpoint(RedisChannel channel) => multiplexer.GetSubscribedServer(channel)?.EndPoint; @@ -500,21 +317,12 @@ public bool Unsubscribe(in RedisChannel channel, Action? handler, CommandFlags flags) => UnsubscribeAsync(channel, handler, null, flags); @@ -523,20 +331,10 @@ public Task UnsubscribeAsync(in RedisChannel channel, Action.Default(asyncState); } - private Task UnsubscribeFromServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, object? asyncState, bool internalCall) - { - if (sub.GetCurrentServer() is ServerEndPoint oldOwner) - { - var message = sub.GetMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall); - return multiplexer.ExecuteAsyncImpl(message, sub.Processor, asyncState, oldOwner); - } - return CompletedTask.FromResult(true, asyncState); - } - /// /// Unregisters a handler or queue and returns if we should remove it from the server. /// @@ -573,7 +371,7 @@ public void UnsubscribeAll(CommandFlags flags = CommandFlags.None) if (subs.TryRemove(pair.Key, out var sub)) { sub.MarkCompleted(); - UnsubscribeFromServer(sub, pair.Key, flags, false); + sub.UnsubscribeFromServer(this, pair.Key, flags, false); } } } @@ -588,7 +386,7 @@ public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None) if (subs.TryRemove(pair.Key, out var sub)) { sub.MarkCompleted(); - last = UnsubscribeFromServerAsync(sub, pair.Key, flags, asyncState, false); + last = sub.UnsubscribeFromServerAsync(this, pair.Key, flags, asyncState, false); } } return last ?? CompletedTask.Default(asyncState); diff --git a/src/StackExchange.Redis/RedisValue.cs b/src/StackExchange.Redis/RedisValue.cs index d306ca0d0..1f6947460 100644 --- a/src/StackExchange.Redis/RedisValue.cs +++ b/src/StackExchange.Redis/RedisValue.cs @@ -1245,5 +1245,16 @@ internal ValueCondition Digest() return digest; } } + + internal bool TryGetSpan(out ReadOnlySpan span) + { + if (_objectOrSentinel == Sentinel_Raw) + { + span = _memory.Span; + return true; + } + span = default; + return false; + } } } diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs index 196cabde5..f2c6deb8b 100644 --- a/src/StackExchange.Redis/ResultProcessor.cs +++ b/src/StackExchange.Redis/ResultProcessor.cs @@ -469,12 +469,21 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes connection.SubscriptionCount = count; SetResult(message, true); - var newServer = message.Command switch + var ep = connection.BridgeCouldBeNull?.ServerEndPoint; + if (ep is not null) { - RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint, - _ => null, - }; - Subscription?.SetCurrentServer(newServer); + switch (message.Command) + { + case RedisCommand.SUBSCRIBE: + case RedisCommand.SSUBSCRIBE: + case RedisCommand.PSUBSCRIBE: + Subscription?.AddEndpoint(ep); + break; + default: + Subscription?.TryRemoveEndpoint(ep); + break; + } + } return true; } } diff --git a/src/StackExchange.Redis/Subscription.cs b/src/StackExchange.Redis/Subscription.cs new file mode 100644 index 000000000..2d877c9eb --- /dev/null +++ b/src/StackExchange.Redis/Subscription.cs @@ -0,0 +1,496 @@ +using System; +using System.Buffers; +using System.Collections.Concurrent; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Pipelines.Sockets.Unofficial; + +namespace StackExchange.Redis; + +public partial class ConnectionMultiplexer +{ + /// + /// This is the record of a single subscription to a redis server. + /// It's the singular channel (which may or may not be a pattern), to one or more handlers. + /// We subscriber to a redis server once (for all messages) and execute 1-many handlers when a message arrives. + /// + internal abstract class Subscription + { + private Action? _handlers; + private readonly object _handlersLock = new(); + private ChannelMessageQueue? _queues; + public CommandFlags Flags { get; } + public ResultProcessor.TrackSubscriptionsProcessor Processor { get; } + + internal abstract bool IsConnectedAny(); + internal abstract bool IsConnectedTo(EndPoint endpoint); + + internal abstract void AddEndpoint(ServerEndPoint server); + + // conditional clear + internal abstract bool TryRemoveEndpoint(ServerEndPoint expected); + + internal abstract void RemoveDisconnectedEndpoints(); + + // returns the number of changes required + internal abstract int EnsureSubscribedToServer( + RedisSubscriber subscriber, + in RedisChannel channel, + CommandFlags flags, + bool internalCall); + + // returns the number of changes required + internal abstract Task EnsureSubscribedToServerAsync( + RedisSubscriber subscriber, + RedisChannel channel, + CommandFlags flags, + bool internalCall, + ServerEndPoint? server = null); + + internal abstract bool UnsubscribeFromServer( + RedisSubscriber subscriber, + in RedisChannel channel, + CommandFlags flags, + bool internalCall); + + internal abstract Task UnsubscribeFromServerAsync( + RedisSubscriber subscriber, + RedisChannel channel, + CommandFlags flags, + object? asyncState, + bool internalCall); + + internal abstract int GetConnectionCount(); + + internal abstract ServerEndPoint? GetAnyCurrentServer(); + + public Subscription(CommandFlags flags) + { + Flags = flags; + Processor = new ResultProcessor.TrackSubscriptionsProcessor(this); + } + + /// + /// Gets the configured (P)SUBSCRIBE or (P)UNSUBSCRIBE for an action. + /// + internal Message GetSubscriptionMessage( + RedisChannel channel, + SubscriptionAction action, + CommandFlags flags, + bool internalCall) + { + var command = + action switch // note that the Routed flag doesn't impact the message here - just the routing + { + SubscriptionAction.Subscribe => (channel.Options & + ~RedisChannel.RedisChannelOptions + .KeyRouted) switch + { + RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE, + RedisChannel.RedisChannelOptions.MultiNode => RedisCommand.SUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern | RedisChannel.RedisChannelOptions.MultiNode => + RedisCommand.PSUBSCRIBE, + RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE, + _ => Unknown(action, channel.Options), + }, + SubscriptionAction.Unsubscribe => (channel.Options & + ~RedisChannel.RedisChannelOptions.KeyRouted) switch + { + RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE, + RedisChannel.RedisChannelOptions.MultiNode => RedisCommand.UNSUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE, + RedisChannel.RedisChannelOptions.Pattern | RedisChannel.RedisChannelOptions.MultiNode => + RedisCommand.PUNSUBSCRIBE, + RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE, + _ => Unknown(action, channel.Options), + }, + _ => Unknown(action, channel.Options), + }; + + // TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica + var msg = Message.Create(-1, Flags | flags, command, channel); + msg.SetForSubscriptionBridge(); + if (internalCall) + { + msg.SetInternalCall(); + } + + return msg; + } + + private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options) + => throw new ArgumentException( + $"Unable to determine pub/sub operation for '{action}' against '{options}'"); + + public void Add(Action? handler, ChannelMessageQueue? queue) + { + if (handler != null) + { + lock (_handlersLock) + { + _handlers += handler; + } + } + + if (queue != null) + { + ChannelMessageQueue.Combine(ref _queues, queue); + } + } + + public bool Remove(Action? handler, ChannelMessageQueue? queue) + { + if (handler != null) + { + lock (_handlersLock) + { + _handlers -= handler; + } + } + + if (queue != null) + { + ChannelMessageQueue.Remove(ref _queues, queue); + } + + return _handlers == null & _queues == null; + } + + public ICompletable? ForInvoke(in RedisChannel channel, in RedisValue message, out ChannelMessageQueue? queues) + { + var handlers = _handlers; + queues = Volatile.Read(ref _queues); + return handlers == null ? null : new MessageCompletable(channel, message, handlers); + } + + internal void MarkCompleted() + { + lock (_handlersLock) + { + _handlers = null; + } + + ChannelMessageQueue.MarkAllCompleted(ref _queues); + } + + internal void GetSubscriberCounts(out int handlers, out int queues) + { + queues = ChannelMessageQueue.Count(ref _queues); + var tmp = _handlers; + if (tmp == null) + { + handlers = 0; + } + else if (tmp.IsSingle()) + { + handlers = 1; + } + else + { + handlers = 0; + foreach (var sub in tmp.AsEnumerable()) { handlers++; } + } + } + } + + // used for most subscriptions; routed to a single node + internal sealed class SingleNodeSubscription(CommandFlags flags) : Subscription(flags) + { + internal override bool IsConnectedAny() => _currentServer is { IsSubscriberConnected: true }; + + internal override int GetConnectionCount() => IsConnectedAny() ? 1 : 0; + + internal override bool IsConnectedTo(EndPoint endpoint) + { + var server = _currentServer; + return server is { IsSubscriberConnected: true } && server.EndPoint == endpoint; + } + + internal override void AddEndpoint(ServerEndPoint server) => _currentServer = server; + + internal override bool TryRemoveEndpoint(ServerEndPoint expected) + { + if (_currentServer == expected) + { + _currentServer = null; + return true; + } + + return false; + } + + internal override bool UnsubscribeFromServer( + RedisSubscriber subscriber, + in RedisChannel channel, + CommandFlags flags, + bool internalCall) + { + var server = _currentServer; + if (server is not null) + { + var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall); + return subscriber.multiplexer.ExecuteSyncImpl(message, Processor, server); + } + + return true; + } + + internal override Task UnsubscribeFromServerAsync( + RedisSubscriber subscriber, + RedisChannel channel, + CommandFlags flags, + object? asyncState, + bool internalCall) + { + var server = _currentServer; + if (server is not null) + { + var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall); + return subscriber.multiplexer.ExecuteAsyncImpl(message, Processor, asyncState, server); + } + + return CompletedTask.FromResult(true, asyncState); + } + + private ServerEndPoint? _currentServer; + internal ServerEndPoint? GetCurrentServer() => Volatile.Read(ref _currentServer); + + internal override ServerEndPoint? GetAnyCurrentServer() => Volatile.Read(ref _currentServer); + + /// + /// Evaluates state and if we're not currently connected, clears the server reference. + /// + internal override void RemoveDisconnectedEndpoints() + { + var server = _currentServer; + if (server is { IsSubscriberConnected: false }) + { + _currentServer = null; + } + } + + internal override int EnsureSubscribedToServer( + RedisSubscriber subscriber, + in RedisChannel channel, + CommandFlags flags, + bool internalCall) + { + if (IsConnectedAny()) return 0; + + // we're not appropriately connected, so blank it out for eligible reconnection + _currentServer = null; + var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); + var selected = subscriber.multiplexer.SelectServer(message); + _ = subscriber.ExecuteSync(message, Processor, selected); + return 1; + } + + internal override async Task EnsureSubscribedToServerAsync( + RedisSubscriber subscriber, + RedisChannel channel, + CommandFlags flags, + bool internalCall, + ServerEndPoint? server = null) + { + if (IsConnectedAny()) return 0; + + // we're not appropriately connected, so blank it out for eligible reconnection + _currentServer = null; + var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); + server ??= subscriber.multiplexer.SelectServer(message); + await subscriber.ExecuteAsync(message, Processor, server).ForAwait(); + return 1; + } + } + + // used for keyspace subscriptions, which are routed to multiple nodes + internal sealed class MultiNodeSubscription(CommandFlags flags) : Subscription(flags) + { + private readonly ConcurrentDictionary _servers = new(); + + internal override bool IsConnectedAny() + { + foreach (var server in _servers) + { + if (server.Value is { IsSubscriberConnected: true }) return true; + } + + return false; + } + + internal override int GetConnectionCount() + { + int count = 0; + foreach (var server in _servers) + { + if (server.Value is { IsSubscriberConnected: true }) count++; + } + + return count; + } + + internal override bool IsConnectedTo(EndPoint endpoint) + => _servers.TryGetValue(endpoint, out var server) + && server.IsSubscriberConnected; + + internal override void AddEndpoint(ServerEndPoint server) + { + var ep = server.EndPoint; + if (!_servers.TryAdd(ep, server)) + { + _servers[ep] = server; + } + } + + internal override bool TryRemoveEndpoint(ServerEndPoint expected) + { + return _servers.TryRemove(expected.EndPoint, out _); + } + + internal override ServerEndPoint? GetAnyCurrentServer() + { + ServerEndPoint? last = null; + // prefer actively connected servers, but settle for anything + foreach (var server in _servers) + { + last = server.Value; + if (last is { IsSubscriberConnected: true }) + { + break; + } + } + + return last; + } + + internal override void RemoveDisconnectedEndpoints() + { + // This looks more complicated than it is, because of avoiding mutating the collection + // while iterating; instead, buffer any removals in a scratch buffer, and remove them in a second pass. + EndPoint[] scratch = []; + int count = 0; + foreach (var server in _servers) + { + if (server.Value.IsSubscriberConnected) + { + // flag for removal + if (scratch.Length == count) // need to resize the scratch buffer, using the pool + { + // let the array pool worry about min-sizing etc + var newLease = ArrayPool.Shared.Rent(count + 1); + scratch.CopyTo(newLease, 0); + ArrayPool.Shared.Return(scratch); + scratch = newLease; + } + + scratch[count++] = server.Key; + } + } + + // did we find anything to remove? + if (count != 0) + { + foreach (var ep in scratch.AsSpan(0, count)) + { + _servers.TryRemove(ep, out _); + } + } + + ArrayPool.Shared.Return(scratch); + } + + internal override int EnsureSubscribedToServer( + RedisSubscriber subscriber, + in RedisChannel channel, + CommandFlags flags, + bool internalCall) + { + int delta = 0; + var muxer = subscriber.multiplexer; + foreach (var server in muxer.GetServerSnapshot()) + { + // exclude sentinel, and only use replicas if we're explicitly asking for them + bool useReplica = (Flags & CommandFlags.DemandReplica) != 0; + if (server.ServerType != ServerType.Sentinel & server.IsReplica == useReplica) + { + if (!IsConnectedTo(server.EndPoint)) + { + var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); + subscriber.ExecuteSync(message, Processor, server); + delta++; + } + } + } + + return delta; + } + + internal override async Task EnsureSubscribedToServerAsync( + RedisSubscriber subscriber, + RedisChannel channel, + CommandFlags flags, + bool internalCall, + ServerEndPoint? server = null) + { + int delta = 0; + var muxer = subscriber.multiplexer; + var snapshot = muxer.GetServerSnaphotMemory(); + var len = snapshot.Length; + for (int i = 0; i < len; i++) + { + var loopServer = snapshot.Span[i]; // spans and async do not mix well + if (server is null || server == loopServer) + { + // exclude sentinel, and only use replicas if we're explicitly asking for them + bool useReplica = (Flags & CommandFlags.DemandReplica) != 0; + if (loopServer.ServerType != ServerType.Sentinel & loopServer.IsReplica == useReplica) + { + if (!IsConnectedTo(loopServer.EndPoint)) + { + var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall); + await subscriber.ExecuteAsync(message, Processor, loopServer).ForAwait(); + delta++; + } + } + } + } + + return delta; + } + + internal override bool UnsubscribeFromServer( + RedisSubscriber subscriber, + in RedisChannel channel, + CommandFlags flags, + bool internalCall) + { + bool any = false; + foreach (var server in _servers) + { + var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall); + any |= subscriber.ExecuteSync(message, Processor, server.Value); + } + + return any; + } + + internal override async Task UnsubscribeFromServerAsync( + RedisSubscriber subscriber, + RedisChannel channel, + CommandFlags flags, + object? asyncState, + bool internalCall) + { + bool any = false; + foreach (var server in _servers) + { + var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall); + any |= await subscriber.ExecuteAsync(message, Processor, server.Value).ForAwait(); + } + + return any; + } + } +} diff --git a/tests/StackExchange.Redis.Tests/FailoverTests.cs b/tests/StackExchange.Redis.Tests/FailoverTests.cs index 1f33275b5..33b24f16f 100644 --- a/tests/StackExchange.Redis.Tests/FailoverTests.cs +++ b/tests/StackExchange.Redis.Tests/FailoverTests.cs @@ -263,7 +263,7 @@ public async Task SubscriptionsSurviveConnectionFailureAsync() foreach (var pair in muxerSubs) { var muxerSub = pair.Value; - Log($" Muxer Sub: {pair.Key}: (EndPoint: {muxerSub.GetCurrentServer()}, Connected: {muxerSub.IsConnected})"); + Log($" Muxer Sub: {pair.Key}: (EndPoint: {muxerSub.GetAnyCurrentServer()}, Connected: {muxerSub.IsConnectedAny()})"); } Log("Publishing"); diff --git a/tests/StackExchange.Redis.Tests/FastHashTests.cs b/tests/StackExchange.Redis.Tests/FastHashTests.cs index 418198cfd..a032cfc80 100644 --- a/tests/StackExchange.Redis.Tests/FastHashTests.cs +++ b/tests/StackExchange.Redis.Tests/FastHashTests.cs @@ -2,13 +2,14 @@ using System.Runtime.InteropServices; using System.Text; using Xunit; +using Xunit.Sdk; #pragma warning disable CS8981, SA1134, SA1300, SA1303, SA1502 // names are weird in this test! // ReSharper disable InconsistentNaming - to better represent expected literals // ReSharper disable IdentifierTypo namespace StackExchange.Redis.Tests; -public partial class FastHashTests +public partial class FastHashTests(ITestOutputHelper log) { // note: if the hashing algorithm changes, we can update the last parameter freely; it doesn't matter // what it *is* - what matters is that we can see that it has entropy between different values @@ -83,6 +84,46 @@ public void FastHashIs_Long() Assert.False(abcdefghijklmnopqrst.Is(hash, value)); } + [Fact] + public void KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths() + { + // Use reflection to find all nested types in KeyNotificationTypeFastHash + var fastHashType = typeof(KeyNotificationTypeFastHash); + var nestedTypes = fastHashType.GetNestedTypes(System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static); + + int? minLength = null; + int? maxLength = null; + + foreach (var nestedType in nestedTypes) + { + // Look for the Length field (generated by FastHash source generator) + var lengthField = nestedType.GetField("Length", System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static); + if (lengthField != null && lengthField.FieldType == typeof(int)) + { + var length = (int)lengthField.GetValue(null)!; + + if (minLength == null || length < minLength) + { + minLength = length; + } + + if (maxLength == null || length > maxLength) + { + maxLength = length; + } + } + } + + // Assert that we found at least some nested types with Length fields + Assert.NotNull(minLength); + Assert.NotNull(maxLength); + + // Assert that MinBytes and MaxBytes match the actual min/max lengths + log.WriteLine($"MinBytes: {KeyNotificationTypeFastHash.MinBytes}, MaxBytes: {KeyNotificationTypeFastHash.MaxBytes}"); + Assert.Equal(KeyNotificationTypeFastHash.MinBytes, minLength.Value); + Assert.Equal(KeyNotificationTypeFastHash.MaxBytes, maxLength.Value); + } + [FastHash] private static partial class a { } [FastHash] private static partial class ab { } [FastHash] private static partial class abc { } diff --git a/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs new file mode 100644 index 000000000..b9548eb44 --- /dev/null +++ b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs @@ -0,0 +1,500 @@ +using System; +using System.Buffers; +using System.Text; +using Xunit; + +namespace StackExchange.Redis.Tests; + +public class KeyNotificationTests(ITestOutputHelper log) +{ + [Fact] + public void Keyspace_Del_ParsesCorrectly() + { + // __keyspace@1__:mykey with payload "del" + var channel = RedisChannel.Literal("__keyspace@1__:mykey"); + RedisValue value = "del"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.False(notification.IsKeyEvent); + Assert.Equal(1, notification.Database); + Assert.Equal(KeyNotificationType.Del, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + Assert.Equal(5, notification.KeyByteCount); + } + + [Fact] + public void Keyevent_Del_ParsesCorrectly() + { + // __keyevent@42__:del with value "mykey" + var channel = RedisChannel.Literal("__keyevent@42__:del"); + RedisValue value = "mykey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.False(notification.IsKeySpace); + Assert.True(notification.IsKeyEvent); + Assert.Equal(42, notification.Database); + Assert.Equal(KeyNotificationType.Del, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + Assert.Equal(5, notification.KeyByteCount); + } + + [Fact] + public void Keyspace_Set_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:testkey"); + RedisValue value = "set"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Set, notification.Type); + Assert.Equal("testkey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_Expire_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@5__:expire"); + RedisValue value = "session:12345"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(5, notification.Database); + Assert.Equal(KeyNotificationType.Expire, notification.Type); + Assert.Equal("session:12345", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_Expired_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@3__:cache:item"); + RedisValue value = "expired"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(3, notification.Database); + Assert.Equal(KeyNotificationType.Expired, notification.Type); + Assert.Equal("cache:item", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_LPush_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@0__:lpush"); + RedisValue value = "queue:tasks"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.LPush, notification.Type); + Assert.Equal("queue:tasks", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_HSet_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@2__:user:1000"); + RedisValue value = "hset"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(2, notification.Database); + Assert.Equal(KeyNotificationType.HSet, notification.Type); + Assert.Equal("user:1000", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_ZAdd_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@7__:zadd"); + RedisValue value = "leaderboard"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(7, notification.Database); + Assert.Equal(KeyNotificationType.ZAdd, notification.Type); + Assert.Equal("leaderboard", (string?)notification.GetKey()); + } + + [Fact] + public void TryCopyKey_WorksCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:testkey"); + RedisValue value = "set"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + var lease = ArrayPool.Shared.Rent(20); + Span buffer = lease.AsSpan(0, 20); + Assert.True(notification.TryCopyKey(buffer, out var bytesWritten)); + Assert.Equal(7, bytesWritten); + Assert.Equal("testkey", Encoding.UTF8.GetString(lease, 0, bytesWritten)); + ArrayPool.Shared.Return(lease); + } + + [Fact] + public void TryCopyKey_FailsWithSmallBuffer() + { + var channel = RedisChannel.Literal("__keyspace@0__:testkey"); + RedisValue value = "set"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Span buffer = stackalloc byte[3]; // too small + Assert.False(notification.TryCopyKey(buffer, out var bytesWritten)); + Assert.Equal(0, bytesWritten); + } + + [Fact] + public void InvalidChannel_ReturnsFalse() + { + var channel = RedisChannel.Literal("regular:channel"); + RedisValue value = "data"; + + Assert.False(KeyNotification.TryParse(in channel, in value, out var notification)); + } + + [Fact] + public void InvalidKeyspaceChannel_MissingDelimiter_ReturnsFalse() + { + var channel = RedisChannel.Literal("__keyspace@0__"); // missing the key part + RedisValue value = "set"; + + Assert.False(KeyNotification.TryParse(in channel, in value, out var notification)); + } + + [Fact] + public void Keyspace_UnknownEventType_ReturnsUnknown() + { + var channel = RedisChannel.Literal("__keyspace@0__:mykey"); + RedisValue value = "unknownevent"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Unknown, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_UnknownEventType_ReturnsUnknown() + { + var channel = RedisChannel.Literal("__keyevent@0__:unknownevent"); + RedisValue value = "mykey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Unknown, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_WithColonInKey_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:user:session:12345"); + RedisValue value = "del"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.Del, notification.Type); + Assert.Equal("user:session:12345", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_Evicted_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@1__:evicted"); + RedisValue value = "cache:old"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(1, notification.Database); + Assert.Equal(KeyNotificationType.Evicted, notification.Type); + Assert.Equal("cache:old", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_New_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:newkey"); + RedisValue value = "new"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.New, notification.Type); + Assert.Equal("newkey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_XGroupCreate_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@0__:xgroup-create"); + RedisValue value = "mystream"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.XGroupCreate, notification.Type); + Assert.Equal("mystream", (string?)notification.GetKey()); + } + + [Fact] + public void Keyspace_TypeChanged_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyspace@0__:mykey"); + RedisValue value = "type_changed"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeySpace); + Assert.Equal(0, notification.Database); + Assert.Equal(KeyNotificationType.TypeChanged, notification.Type); + Assert.Equal("mykey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_HighDatabaseNumber_ParsesCorrectly() + { + var channel = RedisChannel.Literal("__keyevent@999__:set"); + RedisValue value = "testkey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(999, notification.Database); + Assert.Equal(KeyNotificationType.Set, notification.Type); + Assert.Equal("testkey", (string?)notification.GetKey()); + } + + [Fact] + public void Keyevent_NonIntegerDatabase_ParsesWellEnough() + { + var channel = RedisChannel.Literal("__keyevent@abc__:set"); + RedisValue value = "testkey"; + + Assert.True(KeyNotification.TryParse(in channel, in value, out var notification)); + + Assert.True(notification.IsKeyEvent); + Assert.Equal(-1, notification.Database); + Assert.Equal(KeyNotificationType.Set, notification.Type); + Assert.Equal("testkey", (string?)notification.GetKey()); + } + + [Fact] + public void DefaultKeyNotification_HasExpectedProperties() + { + var notification = default(KeyNotification); + + Assert.False(notification.IsKeySpace); + Assert.False(notification.IsKeyEvent); + Assert.Equal(-1, notification.Database); + Assert.Equal(KeyNotificationType.Unknown, notification.Type); + Assert.True(notification.GetKey().IsNull); + Assert.Equal(0, notification.KeyByteCount); + Assert.True(notification.Channel.IsNull); + Assert.True(notification.Value.IsNull); + + // TryCopyKey should return false and write 0 bytes + Span buffer = stackalloc byte[10]; + Assert.False(notification.TryCopyKey(buffer, out var bytesWritten)); + Assert.Equal(0, bytesWritten); + } + + [Theory] + [InlineData(KeyNotificationTypeFastHash.append.Text, KeyNotificationType.Append)] + [InlineData(KeyNotificationTypeFastHash.copy.Text, KeyNotificationType.Copy)] + [InlineData(KeyNotificationTypeFastHash.del.Text, KeyNotificationType.Del)] + [InlineData(KeyNotificationTypeFastHash.expire.Text, KeyNotificationType.Expire)] + [InlineData(KeyNotificationTypeFastHash.hdel.Text, KeyNotificationType.HDel)] + [InlineData(KeyNotificationTypeFastHash.hexpired.Text, KeyNotificationType.HExpired)] + [InlineData(KeyNotificationTypeFastHash.hincrbyfloat.Text, KeyNotificationType.HIncrByFloat)] + [InlineData(KeyNotificationTypeFastHash.hincrby.Text, KeyNotificationType.HIncrBy)] + [InlineData(KeyNotificationTypeFastHash.hpersist.Text, KeyNotificationType.HPersist)] + [InlineData(KeyNotificationTypeFastHash.hset.Text, KeyNotificationType.HSet)] + [InlineData(KeyNotificationTypeFastHash.incrbyfloat.Text, KeyNotificationType.IncrByFloat)] + [InlineData(KeyNotificationTypeFastHash.incrby.Text, KeyNotificationType.IncrBy)] + [InlineData(KeyNotificationTypeFastHash.linsert.Text, KeyNotificationType.LInsert)] + [InlineData(KeyNotificationTypeFastHash.lpop.Text, KeyNotificationType.LPop)] + [InlineData(KeyNotificationTypeFastHash.lpush.Text, KeyNotificationType.LPush)] + [InlineData(KeyNotificationTypeFastHash.lrem.Text, KeyNotificationType.LRem)] + [InlineData(KeyNotificationTypeFastHash.lset.Text, KeyNotificationType.LSet)] + [InlineData(KeyNotificationTypeFastHash.ltrim.Text, KeyNotificationType.LTrim)] + [InlineData(KeyNotificationTypeFastHash.move_from.Text, KeyNotificationType.MoveFrom)] + [InlineData(KeyNotificationTypeFastHash.move_to.Text, KeyNotificationType.MoveTo)] + [InlineData(KeyNotificationTypeFastHash.persist.Text, KeyNotificationType.Persist)] + [InlineData(KeyNotificationTypeFastHash.rename_from.Text, KeyNotificationType.RenameFrom)] + [InlineData(KeyNotificationTypeFastHash.rename_to.Text, KeyNotificationType.RenameTo)] + [InlineData(KeyNotificationTypeFastHash.restore.Text, KeyNotificationType.Restore)] + [InlineData(KeyNotificationTypeFastHash.rpop.Text, KeyNotificationType.RPop)] + [InlineData(KeyNotificationTypeFastHash.rpush.Text, KeyNotificationType.RPush)] + [InlineData(KeyNotificationTypeFastHash.sadd.Text, KeyNotificationType.SAdd)] + [InlineData(KeyNotificationTypeFastHash.set.Text, KeyNotificationType.Set)] + [InlineData(KeyNotificationTypeFastHash.setrange.Text, KeyNotificationType.SetRange)] + [InlineData(KeyNotificationTypeFastHash.sortstore.Text, KeyNotificationType.SortStore)] + [InlineData(KeyNotificationTypeFastHash.srem.Text, KeyNotificationType.SRem)] + [InlineData(KeyNotificationTypeFastHash.spop.Text, KeyNotificationType.SPop)] + [InlineData(KeyNotificationTypeFastHash.xadd.Text, KeyNotificationType.XAdd)] + [InlineData(KeyNotificationTypeFastHash.xdel.Text, KeyNotificationType.XDel)] + [InlineData(KeyNotificationTypeFastHash.xgroup_createconsumer.Text, KeyNotificationType.XGroupCreateConsumer)] + [InlineData(KeyNotificationTypeFastHash.xgroup_create.Text, KeyNotificationType.XGroupCreate)] + [InlineData(KeyNotificationTypeFastHash.xgroup_delconsumer.Text, KeyNotificationType.XGroupDelConsumer)] + [InlineData(KeyNotificationTypeFastHash.xgroup_destroy.Text, KeyNotificationType.XGroupDestroy)] + [InlineData(KeyNotificationTypeFastHash.xgroup_setid.Text, KeyNotificationType.XGroupSetId)] + [InlineData(KeyNotificationTypeFastHash.xsetid.Text, KeyNotificationType.XSetId)] + [InlineData(KeyNotificationTypeFastHash.xtrim.Text, KeyNotificationType.XTrim)] + [InlineData(KeyNotificationTypeFastHash.zadd.Text, KeyNotificationType.ZAdd)] + [InlineData(KeyNotificationTypeFastHash.zdiffstore.Text, KeyNotificationType.ZDiffStore)] + [InlineData(KeyNotificationTypeFastHash.zinterstore.Text, KeyNotificationType.ZInterStore)] + [InlineData(KeyNotificationTypeFastHash.zunionstore.Text, KeyNotificationType.ZUnionStore)] + [InlineData(KeyNotificationTypeFastHash.zincr.Text, KeyNotificationType.ZIncr)] + [InlineData(KeyNotificationTypeFastHash.zrembyrank.Text, KeyNotificationType.ZRemByRank)] + [InlineData(KeyNotificationTypeFastHash.zrembyscore.Text, KeyNotificationType.ZRemByScore)] + [InlineData(KeyNotificationTypeFastHash.zrem.Text, KeyNotificationType.ZRem)] + [InlineData(KeyNotificationTypeFastHash.expired.Text, KeyNotificationType.Expired)] + [InlineData(KeyNotificationTypeFastHash.evicted.Text, KeyNotificationType.Evicted)] + [InlineData(KeyNotificationTypeFastHash._new.Text, KeyNotificationType.New)] + [InlineData(KeyNotificationTypeFastHash.overwritten.Text, KeyNotificationType.Overwritten)] + [InlineData(KeyNotificationTypeFastHash.type_changed.Text, KeyNotificationType.TypeChanged)] + public unsafe void FastHashParse_AllKnownValues_ParseCorrectly(string raw, KeyNotificationType parsed) + { + var arr = ArrayPool.Shared.Rent(Encoding.UTF8.GetMaxByteCount(raw.Length)); + int bytes; + fixed (byte* bPtr = arr) // encode into the buffer + { + fixed (char* cPtr = raw) + { + bytes = Encoding.UTF8.GetBytes(cPtr, raw.Length, bPtr, arr.Length); + } + } + + var result = KeyNotificationTypeFastHash.Parse(arr.AsSpan(0, bytes)); + log.WriteLine($"Parsed '{raw}' as {result}"); + Assert.Equal(parsed, result); + + // and the other direction: + var fetchedBytes = KeyNotificationTypeFastHash.GetRawBytes(parsed); + string fetched; + fixed (byte* bPtr = fetchedBytes) + { + fetched = Encoding.UTF8.GetString(bPtr, fetchedBytes.Length); + } + + log.WriteLine($"Fetched '{raw}'"); + Assert.Equal(raw, fetched); + + ArrayPool.Shared.Return(arr); + } + + [Fact] + public void CreateKeySpaceNotification_Valid() + { + var channel = RedisChannel.KeySpace("abc", 42); + Assert.Equal("__keyspace@42__:abc", channel.ToString()); + Assert.False(channel.IsMultiNode); + Assert.True(channel.IsKeyRouted); + Assert.False(channel.IsSharded); + Assert.False(channel.IsPattern); + } + + [Theory] + [InlineData(null, null, "__keyspace@*__:*")] + [InlineData("abc*", null, "__keyspace@*__:abc*")] + [InlineData(null, 42, "__keyspace@42__:*")] + [InlineData("abc*", 42, "__keyspace@42__:abc*")] + public void CreateKeySpaceNotificationPattern(string? pattern, int? database, string expected) + { + var channel = RedisChannel.KeySpacePattern(pattern, database); + Assert.Equal(expected, channel.ToString()); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.False(channel.IsSharded); + Assert.True(channel.IsPattern); + } + + [Theory] + [InlineData(KeyNotificationType.Set, null, "__keyevent@*__:set", true)] + [InlineData(KeyNotificationType.XGroupCreate, null, "__keyevent@*__:xgroup-create", true)] + [InlineData(KeyNotificationType.Set, 42, "__keyevent@42__:set", false)] + [InlineData(KeyNotificationType.XGroupCreate, 42, "__keyevent@42__:xgroup-create", false)] + public void CreateKeyEventNotification(KeyNotificationType type, int? database, string expected, bool isPattern) + { + var channel = RedisChannel.KeyEvent(type, database); + Assert.Equal(expected, channel.ToString()); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.False(channel.IsSharded); + if (isPattern) + { + Assert.True(channel.IsPattern); + } + else + { + Assert.False(channel.IsPattern); + } + } + + [Fact] + public void Cannot_KeyRoute_KeySpace_SingleKeyIsKeyRouted() + { + var channel = RedisChannel.KeySpace("abc", 42); + Assert.False(channel.IsMultiNode); + Assert.True(channel.IsKeyRouted); + Assert.True(channel.WithKeyRouting().IsKeyRouted); // no change, still key-routed + Assert.Equal(RedisCommand.PUBLISH, channel.GetPublishCommand()); + } + + [Fact] + public void Cannot_KeyRoute_KeySpacePattern() + { + var channel = RedisChannel.KeySpacePattern("abc", 42); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message); + Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message); + } + + [Fact] + public void Cannot_KeyRoute_KeyEvent() + { + var channel = RedisChannel.KeyEvent(KeyNotificationType.Set, 42); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message); + Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message); + } + + [Fact] + public void Cannot_KeyRoute_KeyEvent_Custom() + { + var channel = RedisChannel.KeyEvent("foo"u8, 42); + Assert.True(channel.IsMultiNode); + Assert.False(channel.IsKeyRouted); + Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message); + Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message); + } + + [Fact] + public void KeyEventPrefix_KeySpacePrefix_Length_Matches() + { + // this is a sanity check for the parsing step in KeyNotification.TryParse + Assert.Equal(KeyNotificationChannels.KeySpacePrefix.Length, KeyNotificationChannels.KeyEventPrefix.Length); + } +} diff --git a/tests/StackExchange.Redis.Tests/PubSubKeyNotificationTests.cs b/tests/StackExchange.Redis.Tests/PubSubKeyNotificationTests.cs new file mode 100644 index 000000000..d99c687dc --- /dev/null +++ b/tests/StackExchange.Redis.Tests/PubSubKeyNotificationTests.cs @@ -0,0 +1,162 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace StackExchange.Redis.Tests; + +public sealed class PubSubKeyNotificationTestsCluster(ITestOutputHelper output, SharedConnectionFixture fixture) + : PubSubKeyNotificationTests(output, fixture) +{ + protected override string GetConfiguration() => TestConfig.Current.ClusterServersAndPorts; +} + +public sealed class PubSubKeyNotificationTestsStandalone(ITestOutputHelper output, SharedConnectionFixture fixture) + : PubSubKeyNotificationTests(output, fixture) +{ +} + +public abstract class PubSubKeyNotificationTests(ITestOutputHelper output, SharedConnectionFixture? fixture = null) + : TestBase(output, fixture) +{ + private const int DefaultKeyCount = 10; + private const int DefaultEventCount = 512; + + private RedisKey[] InventKeys(int count = DefaultKeyCount) + { + RedisKey[] keys = new RedisKey[count]; + for (int i = 0; i < count; i++) + { + keys[i] = Guid.NewGuid().ToString(); + } + return keys; + } + + private RedisKey SelectKey(RedisKey[] keys) => keys[SharedRandom.Next(0, keys.Length)]; + +#if NET6_0_OR_GREATER + private static Random SharedRandom => Random.Shared; +#else + private static Random SharedRandom { get; } = new(); +#endif + + [Fact] + public async Task KeySpace_Events_Enabled() + { + // see https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/#configuration + await using var conn = Create(allowAdmin: true); + int failures = 0; + foreach (var ep in conn.GetEndPoints()) + { + var server = conn.GetServer(ep); + var config = (await server.ConfigGetAsync("notify-keyspace-events")).Single(); + Log($"[{Format.ToString(ep)}] notify-keyspace-events: '{config.Value}'"); + + // this is a very broad config, but it's what we use in CI (and probably a common basic config) + if (config.Value != "AKE") + { + failures++; + } + } + // for details, check the log output + Assert.Equal(0, failures); + } + + [Fact] + public async Task KeySpace_CanSubscribe_ManualPublish() + { + await using var conn = Create(); + var db = conn.GetDatabase(); + + var channel = RedisChannel.KeyEvent("nonesuch"u8, database: null); + var sub = conn.GetSubscriber(); + await sub.UnsubscribeAsync(channel); + + int count = 0; + await sub.SubscribeAsync(channel, (_, _) => Interlocked.Increment(ref count)); + + // to publish, we need to remove the marker that this is a multi-node channel + var asLiteral = RedisChannel.Literal(channel.ToString()); + await sub.PublishAsync(asLiteral, Guid.NewGuid().ToString()); + + int expected = GetConnectedCount(conn, channel); + await Task.Delay(100).ForAwait(); + Assert.Equal(expected, count); + } + + // this looks past the horizon to see how many connections we actually have for a given channel, + // which could be more than 1 in a cluster scenario + private static int GetConnectedCount(IConnectionMultiplexer muxer, in RedisChannel channel) + => muxer is ConnectionMultiplexer typed && typed.TryGetSubscription(channel, out var sub) + ? sub.GetConnectionCount() : 1; + + [Fact] + public async Task KeyEvent_CanObserveSimple_ViaCallbackHandler() + { + await using var conn = Create(); + var db = conn.GetDatabase(); + + var keys = InventKeys(); + var channel = RedisChannel.KeyEvent(KeyNotificationType.SAdd); + var sub = conn.GetSubscriber(); + await sub.UnsubscribeAsync(channel); + HashSet observedKeys = []; + int count = 0, callbackCount = 0; + TaskCompletionSource allDone = new(); + await sub.SubscribeAsync(channel, (recvChannel, recvValue) => + { + Interlocked.Increment(ref callbackCount); + if (KeyNotification.TryParse(in recvChannel, in recvValue, out var notification) + && notification is { IsKeyEvent: true, Type: KeyNotificationType.SAdd }) + { + var recvKey = notification.GetKey(); + lock (observedKeys) + { + int currentCount = ++count; + var newKey = observedKeys.Add(recvKey); + if (newKey) + { + Log($"Observed key: '{recvKey}' after {currentCount} events"); + } + + if (currentCount == DefaultEventCount) + { + allDone.TrySetResult(true); + } + } + } + }); + + await Task.Delay(300).ForAwait(); // give it a moment to settle + + HashSet sentKeys = new(keys.Length); + for (int i = 0; i < DefaultEventCount; i++) + { + var key = SelectKey(keys); + await db.SetAddAsync(key, i); + sentKeys.Add(key); // just in case Random has a bad day (obvious Dilbert link is obvious) + } + + // Wait for all events to be observed + try + { + Assert.True(await allDone.Task.WithTimeout(5000)); + } + catch (TimeoutException ex) + { + // if this is zero, the real problem is probably ala KeySpace_Events_Enabled + throw new TimeoutException($"Timeout; {Volatile.Read(ref callbackCount)} events observed", ex); + } + + lock (observedKeys) + { + Assert.Equal(sentKeys.Count, observedKeys.Count); + foreach (var key in sentKeys) + { + Assert.Contains(key, observedKeys); + } + } + } +} diff --git a/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs b/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs index 43bb4b2b8..691232218 100644 --- a/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs +++ b/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs @@ -63,7 +63,7 @@ await sub.SubscribeAsync(channel, (_, val) => Assert.True(subscribedServerEndpoint.IsSubscriberConnected, "subscribedServerEndpoint.IsSubscriberConnected"); Assert.True(conn.GetSubscriptions().TryGetValue(channel, out var subscription)); - var initialServer = subscription.GetCurrentServer(); + var initialServer = subscription.GetAnyCurrentServer(); Assert.NotNull(initialServer); Assert.True(initialServer.IsConnected); Log("Connected to: " + initialServer); @@ -83,10 +83,10 @@ await sub.SubscribeAsync(channel, (_, val) => Assert.True(subscribedServerEndpoint.IsConnected, "subscribedServerEndpoint.IsConnected"); Assert.False(subscribedServerEndpoint.IsSubscriberConnected, "subscribedServerEndpoint.IsSubscriberConnected"); } - await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected); - Assert.True(subscription.IsConnected); + await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny()); + Assert.True(subscription.IsConnectedAny()); - var newServer = subscription.GetCurrentServer(); + var newServer = subscription.GetAnyCurrentServer(); Assert.NotNull(newServer); Assert.NotEqual(newServer, initialServer); Log("Now connected to: " + newServer); @@ -148,7 +148,7 @@ await sub.SubscribeAsync( Assert.True(subscribedServerEndpoint.IsSubscriberConnected, "subscribedServerEndpoint.IsSubscriberConnected"); Assert.True(conn.GetSubscriptions().TryGetValue(channel, out var subscription)); - var initialServer = subscription.GetCurrentServer(); + var initialServer = subscription.GetAnyCurrentServer(); Assert.NotNull(initialServer); Assert.True(initialServer.IsConnected); Log("Connected to: " + initialServer); @@ -169,10 +169,10 @@ await sub.SubscribeAsync( if (expectSuccess) { - await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected); - Assert.True(subscription.IsConnected); + await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny()); + Assert.True(subscription.IsConnectedAny()); - var newServer = subscription.GetCurrentServer(); + var newServer = subscription.GetAnyCurrentServer(); Assert.NotNull(newServer); Assert.NotEqual(newServer, initialServer); Log("Now connected to: " + newServer); @@ -180,16 +180,16 @@ await sub.SubscribeAsync( else { // This subscription shouldn't be able to reconnect by flags (demanding an unavailable server) - await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected); - Assert.False(subscription.IsConnected); + await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny()); + Assert.False(subscription.IsConnectedAny()); Log("Unable to reconnect (as expected)"); // Allow connecting back to the original conn.AllowConnect = true; - await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected); - Assert.True(subscription.IsConnected); + await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny()); + Assert.True(subscription.IsConnectedAny()); - var newServer = subscription.GetCurrentServer(); + var newServer = subscription.GetAnyCurrentServer(); Assert.NotNull(newServer); Assert.Equal(newServer, initialServer); Log("Now connected to: " + newServer);