diff --git a/StackExchange.Redis.sln.DotSettings b/StackExchange.Redis.sln.DotSettings
index 216edbcca..8dd9095d9 100644
--- a/StackExchange.Redis.sln.DotSettings
+++ b/StackExchange.Redis.sln.DotSettings
@@ -12,9 +12,12 @@
True
True
True
+ True
True
True
+ True
True
+ True
True
True
True
diff --git a/src/StackExchange.Redis/ChannelMessage.cs b/src/StackExchange.Redis/ChannelMessage.cs
new file mode 100644
index 000000000..330aedee4
--- /dev/null
+++ b/src/StackExchange.Redis/ChannelMessage.cs
@@ -0,0 +1,64 @@
+namespace StackExchange.Redis;
+
+///
+/// Represents a message that is broadcast via publish/subscribe.
+///
+public readonly struct ChannelMessage
+{
+ // this is *smaller* than storing a RedisChannel for the subscribed channel
+ private readonly ChannelMessageQueue _queue;
+
+ ///
+ /// The Channel:Message string representation.
+ ///
+ public override string ToString() => ((string?)Channel) + ":" + ((string?)Message);
+
+ ///
+ public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode();
+
+ ///
+ public override bool Equals(object? obj) => obj is ChannelMessage cm
+ && cm.Channel == Channel && cm.Message == Message;
+
+ internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value)
+ {
+ _queue = queue;
+ _channel = channel;
+ _message = value;
+ }
+
+ ///
+ /// The channel that the subscription was created from.
+ ///
+ public RedisChannel SubscriptionChannel => _queue.Channel;
+
+ private readonly RedisChannel _channel;
+
+ ///
+ /// The channel that the message was broadcast to.
+ ///
+ public RedisChannel Channel => _channel;
+
+ private readonly RedisValue _message;
+
+ ///
+ /// The value that was broadcast.
+ ///
+ public RedisValue Message => _message;
+
+ ///
+ /// Checks if 2 messages are .Equal().
+ ///
+ public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right);
+
+ ///
+ /// Checks if 2 messages are not .Equal().
+ ///
+ public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right);
+
+ ///
+ /// If the channel is either a keyspace or keyevent notification, resolve the key and event type.
+ ///
+ public bool TryParseKeyNotification(out KeyNotification notification)
+ => KeyNotification.TryParse(in _channel, in _message, out notification);
+}
diff --git a/src/StackExchange.Redis/ChannelMessageQueue.cs b/src/StackExchange.Redis/ChannelMessageQueue.cs
index e58fb393b..9f962e52a 100644
--- a/src/StackExchange.Redis/ChannelMessageQueue.cs
+++ b/src/StackExchange.Redis/ChannelMessageQueue.cs
@@ -1,385 +1,353 @@
using System;
+using System.Buffers.Text;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
#if NETCOREAPP3_1
+using System.Diagnostics;
using System.Reflection;
#endif
-namespace StackExchange.Redis
+namespace StackExchange.Redis;
+
+///
+/// Represents a message queue of ordered pub/sub notifications.
+///
+///
+/// To create a ChannelMessageQueue, use
+/// or .
+///
+public sealed class ChannelMessageQueue : IAsyncEnumerable
{
+ private readonly Channel _queue;
+
///
- /// Represents a message that is broadcast via publish/subscribe.
+ /// The Channel that was subscribed for this queue.
///
- public readonly struct ChannelMessage
- {
- // this is *smaller* than storing a RedisChannel for the subscribed channel
- private readonly ChannelMessageQueue _queue;
+ public RedisChannel Channel { get; }
- ///
- /// The Channel:Message string representation.
- ///
- public override string ToString() => ((string?)Channel) + ":" + ((string?)Message);
+ private RedisSubscriber? _parent;
- ///
- public override int GetHashCode() => Channel.GetHashCode() ^ Message.GetHashCode();
-
- ///
- public override bool Equals(object? obj) => obj is ChannelMessage cm
- && cm.Channel == Channel && cm.Message == Message;
+ ///
+ /// The string representation of this channel.
+ ///
+ public override string? ToString() => (string?)Channel;
- internal ChannelMessage(ChannelMessageQueue queue, in RedisChannel channel, in RedisValue value)
- {
- _queue = queue;
- Channel = channel;
- Message = value;
- }
+ ///
+ /// An awaitable task the indicates completion of the queue (including drain of data).
+ ///
+ public Task Completion => _queue.Reader.Completion;
- ///
- /// The channel that the subscription was created from.
- ///
- public RedisChannel SubscriptionChannel => _queue.Channel;
-
- ///
- /// The channel that the message was broadcast to.
- ///
- public RedisChannel Channel { get; }
-
- ///
- /// The value that was broadcast.
- ///
- public RedisValue Message { get; }
-
- ///
- /// Checks if 2 messages are .Equal().
- ///
- public static bool operator ==(ChannelMessage left, ChannelMessage right) => left.Equals(right);
-
- ///
- /// Checks if 2 messages are not .Equal().
- ///
- public static bool operator !=(ChannelMessage left, ChannelMessage right) => !left.Equals(right);
+ internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent)
+ {
+ Channel = redisChannel;
+ _parent = parent;
+ _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions);
}
- ///
- /// Represents a message queue of ordered pub/sub notifications.
- ///
- ///
- /// To create a ChannelMessageQueue, use
- /// or .
- ///
- public sealed class ChannelMessageQueue : IAsyncEnumerable
+ private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions
{
- private readonly Channel _queue;
+ SingleWriter = true, SingleReader = false, AllowSynchronousContinuations = false,
+ };
- ///
- /// The Channel that was subscribed for this queue.
- ///
- public RedisChannel Channel { get; }
- private RedisSubscriber? _parent;
+ private void Write(in RedisChannel channel, in RedisValue value)
+ {
+ var writer = _queue.Writer;
+ writer.TryWrite(new ChannelMessage(this, channel, value));
+ }
- ///
- /// The string representation of this channel.
- ///
- public override string? ToString() => (string?)Channel;
+ ///
+ /// Consume a message from the channel.
+ ///
+ /// The to use.
+ public ValueTask ReadAsync(CancellationToken cancellationToken = default)
+ => _queue.Reader.ReadAsync(cancellationToken);
- ///
- /// An awaitable task the indicates completion of the queue (including drain of data).
- ///
- public Task Completion => _queue.Reader.Completion;
+ ///
+ /// Attempt to synchronously consume a message from the channel.
+ ///
+ /// The read from the Channel.
+ public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item);
- internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent)
+ ///
+ /// Attempt to query the backlog length of the queue.
+ ///
+ /// The (approximate) count of items in the Channel.
+ public bool TryGetCount(out int count)
+ {
+ // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present
+#if NETCOREAPP3_1
+ // get this using the reflection
+ try
{
- Channel = redisChannel;
- _parent = parent;
- _queue = System.Threading.Channels.Channel.CreateUnbounded(s_ChannelOptions);
+ var prop =
+ _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic);
+ if (prop is not null)
+ {
+ count = (int)prop.GetValue(_queue)!;
+ return true;
+ }
}
-
- private static readonly UnboundedChannelOptions s_ChannelOptions = new UnboundedChannelOptions
+ catch (Exception ex)
{
- SingleWriter = true,
- SingleReader = false,
- AllowSynchronousContinuations = false,
- };
-
- private void Write(in RedisChannel channel, in RedisValue value)
+ Debug.WriteLine(ex.Message); // but ignore
+ }
+#else
+ var reader = _queue.Reader;
+ if (reader.CanCount)
{
- var writer = _queue.Writer;
- writer.TryWrite(new ChannelMessage(this, channel, value));
+ count = reader.Count;
+ return true;
}
+#endif
- ///
- /// Consume a message from the channel.
- ///
- /// The to use.
- public ValueTask ReadAsync(CancellationToken cancellationToken = default)
- => _queue.Reader.ReadAsync(cancellationToken);
-
- ///
- /// Attempt to synchronously consume a message from the channel.
- ///
- /// The read from the Channel.
- public bool TryRead(out ChannelMessage item) => _queue.Reader.TryRead(out item);
-
- ///
- /// Attempt to query the backlog length of the queue.
- ///
- /// The (approximate) count of items in the Channel.
- public bool TryGetCount(out int count)
+ count = 0;
+ return false;
+ }
+
+ private Delegate? _onMessageHandler;
+
+ private void AssertOnMessage(Delegate handler)
+ {
+ if (handler == null) throw new ArgumentNullException(nameof(handler));
+ if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null)
+ throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed");
+ }
+
+ ///
+ /// Create a message loop that processes messages sequentially.
+ ///
+ /// The handler to run when receiving a message.
+ public void OnMessage(Action handler)
+ {
+ AssertOnMessage(handler);
+
+ ThreadPool.QueueUserWorkItem(
+ state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this);
+ }
+
+ private async Task OnMessageSyncImpl()
+ {
+ var handler = (Action?)_onMessageHandler;
+ while (!Completion.IsCompleted)
{
- // This is specific to netcoreapp3.1, because full framework was out of band and the new prop is present
-#if NETCOREAPP3_1
- // get this using the reflection
+ ChannelMessage next;
try
{
- var prop = _queue.GetType().GetProperty("ItemsCountForDebugger", BindingFlags.Instance | BindingFlags.NonPublic);
- if (prop is not null)
- {
- count = (int)prop.GetValue(_queue)!;
- return true;
- }
+ if (!TryRead(out next)) next = await ReadAsync().ForAwait();
}
- catch { }
-#else
- var reader = _queue.Reader;
- if (reader.CanCount)
+ catch (ChannelClosedException) { break; } // expected
+ catch (Exception ex)
{
- count = reader.Count;
- return true;
+ _parent?.multiplexer?.OnInternalError(ex);
+ break;
}
-#endif
- count = default;
- return false;
+ try { handler?.Invoke(next); }
+ catch { } // matches MessageCompletable
}
+ }
- private Delegate? _onMessageHandler;
- private void AssertOnMessage(Delegate handler)
+ internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
+ {
+ if (queue != null)
{
- if (handler == null) throw new ArgumentNullException(nameof(handler));
- if (Interlocked.CompareExchange(ref _onMessageHandler, handler, null) != null)
- throw new InvalidOperationException("Only a single " + nameof(OnMessage) + " is allowed");
+ // insert at the start of the linked-list
+ ChannelMessageQueue? old;
+ do
+ {
+ old = Volatile.Read(ref head);
+ queue._next = old;
+ }
+ // format and validator disagree on newline...
+ while (Interlocked.CompareExchange(ref head, queue, old) != old);
}
+ }
- ///
- /// Create a message loop that processes messages sequentially.
- ///
- /// The handler to run when receiving a message.
- public void OnMessage(Action handler)
- {
- AssertOnMessage(handler);
+ ///
+ /// Create a message loop that processes messages sequentially.
+ ///
+ /// The handler to execute when receiving a message.
+ public void OnMessage(Func handler)
+ {
+ AssertOnMessage(handler);
- ThreadPool.QueueUserWorkItem(
- state => ((ChannelMessageQueue)state!).OnMessageSyncImpl().RedisFireAndForget(), this);
- }
+ ThreadPool.QueueUserWorkItem(
+ state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this);
+ }
- private async Task OnMessageSyncImpl()
+ internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
+ {
+ if (queue is null)
{
- var handler = (Action?)_onMessageHandler;
- while (!Completion.IsCompleted)
- {
- ChannelMessage next;
- try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); }
- catch (ChannelClosedException) { break; } // expected
- catch (Exception ex)
- {
- _parent?.multiplexer?.OnInternalError(ex);
- break;
- }
-
- try { handler?.Invoke(next); }
- catch { } // matches MessageCompletable
- }
+ return;
}
- internal static void Combine(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
+ bool found;
+ // if we fail due to a conflict, re-do from start
+ do
{
- if (queue != null)
+ var current = Volatile.Read(ref head);
+ if (current == null) return; // no queue? nothing to do
+ if (current == queue)
{
- // insert at the start of the linked-list
- ChannelMessageQueue? old;
- do
+ found = true;
+ // found at the head - then we need to change the head
+ if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current)
{
- old = Volatile.Read(ref head);
- queue._next = old;
+ return; // success
}
- while (Interlocked.CompareExchange(ref head, queue, old) != old);
}
- }
-
- ///
- /// Create a message loop that processes messages sequentially.
- ///
- /// The handler to execute when receiving a message.
- public void OnMessage(Func handler)
- {
- AssertOnMessage(handler);
-
- ThreadPool.QueueUserWorkItem(
- state => ((ChannelMessageQueue)state!).OnMessageAsyncImpl().RedisFireAndForget(), this);
- }
-
- internal static void Remove(ref ChannelMessageQueue? head, ChannelMessageQueue queue)
- {
- if (queue is null)
+ else
{
- return;
- }
-
- bool found;
- // if we fail due to a conflict, re-do from start
- do
- {
- var current = Volatile.Read(ref head);
- if (current == null) return; // no queue? nothing to do
- if (current == queue)
- {
- found = true;
- // found at the head - then we need to change the head
- if (Interlocked.CompareExchange(ref head, Volatile.Read(ref current._next), current) == current)
- {
- return; // success
- }
- }
- else
+ ChannelMessageQueue? previous = current;
+ current = Volatile.Read(ref previous._next);
+ found = false;
+ do
{
- ChannelMessageQueue? previous = current;
- current = Volatile.Read(ref previous._next);
- found = false;
- do
+ if (current == queue)
{
- if (current == queue)
+ found = true;
+ // found it, not at the head; remove the node
+ if (Interlocked.CompareExchange(
+ ref previous._next,
+ Volatile.Read(ref current._next),
+ current) == current)
{
- found = true;
- // found it, not at the head; remove the node
- if (Interlocked.CompareExchange(ref previous._next, Volatile.Read(ref current._next), current) == current)
- {
- return; // success
- }
- else
- {
- break; // exit the inner loop, and repeat the outer loop
- }
+ return; // success
+ }
+ else
+ {
+ break; // exit the inner loop, and repeat the outer loop
}
- previous = current;
- current = Volatile.Read(ref previous!._next);
}
- while (current != null);
+
+ previous = current;
+ current = Volatile.Read(ref previous!._next);
}
+ // format and validator disagree on newline...
+ while (current != null);
}
- while (found);
}
+ // format and validator disagree on newline...
+ while (found);
+ }
- internal static int Count(ref ChannelMessageQueue? head)
+ internal static int Count(ref ChannelMessageQueue? head)
+ {
+ var current = Volatile.Read(ref head);
+ int count = 0;
+ while (current != null)
{
- var current = Volatile.Read(ref head);
- int count = 0;
- while (current != null)
- {
- count++;
- current = Volatile.Read(ref current._next);
- }
- return count;
+ count++;
+ current = Volatile.Read(ref current._next);
}
- internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message)
+ return count;
+ }
+
+ internal static void WriteAll(ref ChannelMessageQueue head, in RedisChannel channel, in RedisValue message)
+ {
+ var current = Volatile.Read(ref head);
+ while (current != null)
{
- var current = Volatile.Read(ref head);
- while (current != null)
- {
- current.Write(channel, message);
- current = Volatile.Read(ref current._next);
- }
+ current.Write(channel, message);
+ current = Volatile.Read(ref current._next);
}
+ }
- private ChannelMessageQueue? _next;
+ private ChannelMessageQueue? _next;
- private async Task OnMessageAsyncImpl()
+ private async Task OnMessageAsyncImpl()
+ {
+ var handler = (Func?)_onMessageHandler;
+ while (!Completion.IsCompleted)
{
- var handler = (Func?)_onMessageHandler;
- while (!Completion.IsCompleted)
+ ChannelMessage next;
+ try
{
- ChannelMessage next;
- try { if (!TryRead(out next)) next = await ReadAsync().ForAwait(); }
- catch (ChannelClosedException) { break; } // expected
- catch (Exception ex)
- {
- _parent?.multiplexer?.OnInternalError(ex);
- break;
- }
-
- try
- {
- var task = handler?.Invoke(next);
- if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait();
- }
- catch { } // matches MessageCompletable
+ if (!TryRead(out next)) next = await ReadAsync().ForAwait();
+ }
+ catch (ChannelClosedException) { break; } // expected
+ catch (Exception ex)
+ {
+ _parent?.multiplexer?.OnInternalError(ex);
+ break;
}
- }
- internal static void MarkAllCompleted(ref ChannelMessageQueue? head)
- {
- var current = Interlocked.Exchange(ref head, null);
- while (current != null)
+ try
{
- current.MarkCompleted();
- current = Volatile.Read(ref current._next);
+ var task = handler?.Invoke(next);
+ if (task != null && task.Status != TaskStatus.RanToCompletion) await task.ForAwait();
}
+ catch { } // matches MessageCompletable
}
+ }
- private void MarkCompleted(Exception? error = null)
+ internal static void MarkAllCompleted(ref ChannelMessageQueue? head)
+ {
+ var current = Interlocked.Exchange(ref head, null);
+ while (current != null)
{
- _parent = null;
- _queue.Writer.TryComplete(error);
+ current.MarkCompleted();
+ current = Volatile.Read(ref current._next);
}
+ }
- internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
- {
- var parent = _parent;
- _parent = null;
- parent?.UnsubscribeAsync(Channel, null, this, flags);
- _queue.Writer.TryComplete(error);
- }
+ private void MarkCompleted(Exception? error = null)
+ {
+ _parent = null;
+ _queue.Writer.TryComplete(error);
+ }
- internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
+ internal void UnsubscribeImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
+ {
+ var parent = _parent;
+ _parent = null;
+ parent?.UnsubscribeAsync(Channel, null, this, flags);
+ _queue.Writer.TryComplete(error);
+ }
+
+ internal async Task UnsubscribeAsyncImpl(Exception? error = null, CommandFlags flags = CommandFlags.None)
+ {
+ var parent = _parent;
+ _parent = null;
+ if (parent != null)
{
- var parent = _parent;
- _parent = null;
- if (parent != null)
- {
- await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait();
- }
- _queue.Writer.TryComplete(error);
+ await parent.UnsubscribeAsync(Channel, null, this, flags).ForAwait();
}
- ///
- /// Stop receiving messages on this channel.
- ///
- /// The flags to use when unsubscribing.
- public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags);
+ _queue.Writer.TryComplete(error);
+ }
- ///
- /// Stop receiving messages on this channel.
- ///
- /// The flags to use when unsubscribing.
- public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);
+ ///
+ /// Stop receiving messages on this channel.
+ ///
+ /// The flags to use when unsubscribing.
+ public void Unsubscribe(CommandFlags flags = CommandFlags.None) => UnsubscribeImpl(null, flags);
- ///
+ ///
+ /// Stop receiving messages on this channel.
+ ///
+ /// The flags to use when unsubscribing.
+ public Task UnsubscribeAsync(CommandFlags flags = CommandFlags.None) => UnsubscribeAsyncImpl(null, flags);
+
+ ///
#if NETCOREAPP3_0_OR_GREATER
- public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
- => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
+ public IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
+ // ReSharper disable once MethodSupportsCancellation - provided in GetAsyncEnumerator
+ => _queue.Reader.ReadAllAsync().GetAsyncEnumerator(cancellationToken);
#else
- public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
+ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
+ {
+ while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
- while (await _queue.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
+ while (_queue.Reader.TryRead(out var item))
{
- while (_queue.Reader.TryRead(out var item))
- {
- yield return item;
- }
+ yield return item;
}
}
-#endif
}
+#endif
}
diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs
index cc766338a..219ac7cb0 100644
--- a/src/StackExchange.Redis/ConnectionMultiplexer.cs
+++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs
@@ -730,6 +730,7 @@ private static ConnectionMultiplexer ConnectImpl(ConfigurationOptions configurat
ReadOnlySpan IInternalConnectionMultiplexer.GetServerSnapshot() => _serverSnapshot.AsSpan();
internal ReadOnlySpan GetServerSnapshot() => _serverSnapshot.AsSpan();
+ internal ReadOnlyMemory GetServerSnaphotMemory() => _serverSnapshot.AsMemory();
internal sealed class ServerSnapshot : IEnumerable
{
public static ServerSnapshot Empty { get; } = new ServerSnapshot(Array.Empty(), 0);
diff --git a/src/StackExchange.Redis/KeyNotification.cs b/src/StackExchange.Redis/KeyNotification.cs
new file mode 100644
index 000000000..f563019f3
--- /dev/null
+++ b/src/StackExchange.Redis/KeyNotification.cs
@@ -0,0 +1,244 @@
+using System;
+using System.Buffers.Text;
+using System.Diagnostics;
+using static StackExchange.Redis.KeyNotificationChannels;
+namespace StackExchange.Redis;
+
+///
+/// Represents keyspace and keyevent notifications.
+///
+public readonly struct KeyNotification
+{
+ ///
+ /// If the channel is either a keyspace or keyevent notification, parsed the data.
+ ///
+ public static bool TryParse(in RedisChannel channel, in RedisValue value, out KeyNotification notification)
+ {
+ // validate that it looks reasonable
+ var span = channel.Span;
+
+ // KeySpaceStart and KeyEventStart are the same size, see KeyEventPrefix_KeySpacePrefix_Length_Matches
+ if (span.Length >= KeySpacePrefix.Length + MinSuffixBytes)
+ {
+ // check that the prefix is valid, i.e. "__keyspace@" or "__keyevent@"
+ var prefix = span.Slice(0, KeySpacePrefix.Length);
+ var hash = prefix.Hash64();
+ switch (hash)
+ {
+ case KeySpacePrefix.Hash when KeySpacePrefix.Is(hash, prefix):
+ case KeyEventPrefix.Hash when KeyEventPrefix.Is(hash, prefix):
+ // check that there is *something* non-empty after the prefix, with __: as the suffix (we don't verify *what*)
+ if (span.Slice(KeySpacePrefix.Length).IndexOf("__:"u8) > 0)
+ {
+ notification = new KeyNotification(in channel, in value);
+ return true;
+ }
+
+ break;
+ }
+ }
+
+ notification = default;
+ return false;
+ }
+
+ private const int MinSuffixBytes = 5; // need "0__:x" or similar after prefix
+
+ ///
+ /// The channel associated with this notification.
+ ///
+ public RedisChannel Channel => _channel;
+
+ ///
+ /// The payload associated with this notification.
+ ///
+ public RedisValue Value => _value;
+
+ // effectively we just wrap a channel, but: we've pre-validated that things make sense
+ private readonly RedisChannel _channel;
+ private readonly RedisValue _value;
+
+ internal KeyNotification(in RedisChannel channel, in RedisValue value)
+ {
+ _channel = channel;
+ _value = value;
+ }
+
+ ///
+ /// The database the key is in. If the database cannot be parsed, -1 is returned.
+ ///
+ public int Database
+ {
+ get
+ {
+ // prevalidated format, so we can just skip past the prefix (except for the default value)
+ if (_channel.IsNull) return -1;
+ var span = _channel.Span.Slice(KeySpacePrefix.Length); // also works for KeyEventPrefix
+ var end = span.IndexOf((byte)'_'); // expecting "__:foo" - we'll just stop at the underscore
+ if (end <= 0) return -1;
+
+ span = span.Slice(0, end);
+ return Utf8Parser.TryParse(span, out int database, out var bytes)
+ && bytes == end ? database : -1;
+ }
+ }
+
+ ///
+ /// The key associated with this event.
+ ///
+ /// Note that this will allocate a copy of the key bytes; to avoid allocations,
+ /// the and APIs can be used.
+ public RedisKey GetKey()
+ {
+ if (IsKeySpace)
+ {
+ // then the channel contains the key, and the payload contains the event-type
+ return ChannelSuffix.ToArray(); // create an isolated copy
+ }
+
+ if (IsKeyEvent)
+ {
+ // then the channel contains the event-type, and the payload contains the key
+ return (byte[]?)Value; // todo: this could probably side-step
+ }
+
+ return RedisKey.Null;
+ }
+
+ ///
+ /// Get the number of bytes in the key.
+ ///
+ public int KeyByteCount
+ {
+ get
+ {
+ if (IsKeySpace)
+ {
+ return ChannelSuffix.Length;
+ }
+
+ if (IsKeyEvent)
+ {
+ return _value.GetByteCount();
+ }
+
+ return 0;
+ }
+ }
+
+ ///
+ /// Attempt to copy the bytes from the key to a buffer, returning the number of bytes written.
+ ///
+ public bool TryCopyKey(Span destination, out int bytesWritten)
+ {
+ if (IsKeySpace)
+ {
+ var suffix = ChannelSuffix;
+ bytesWritten = suffix.Length; // assume success
+ if (bytesWritten <= destination.Length)
+ {
+ suffix.CopyTo(destination);
+ return true;
+ }
+ }
+
+ if (IsKeyEvent)
+ {
+ bytesWritten = _value.GetByteCount();
+ if (bytesWritten <= destination.Length)
+ {
+ var tmp = _value.CopyTo(destination);
+ Debug.Assert(tmp == bytesWritten);
+ return true;
+ }
+ }
+
+ bytesWritten = 0;
+ return false;
+ }
+
+ ///
+ /// Get the portion of the channel after the "__{keyspace|keyevent}@{db}__:".
+ ///
+ private ReadOnlySpan ChannelSuffix
+ {
+ get
+ {
+ var span = _channel.Span;
+ var index = span.IndexOf("__:"u8);
+ return index > 0 ? span.Slice(index + 3) : default;
+ }
+ }
+
+ ///
+ /// The type of notification associated with this event, if it is well-known - otherwise .
+ ///
+ /// Unexpected values can be processed manually from the and .
+ public KeyNotificationType Type
+ {
+ get
+ {
+ if (IsKeySpace)
+ {
+ // then the channel contains the key, and the payload contains the event-type
+ var count = _value.GetByteCount();
+ if (count >= KeyNotificationTypeFastHash.MinBytes & count <= KeyNotificationTypeFastHash.MaxBytes)
+ {
+ if (_value.TryGetSpan(out var direct))
+ {
+ return KeyNotificationTypeFastHash.Parse(direct);
+ }
+ else
+ {
+ Span localCopy = stackalloc byte[KeyNotificationTypeFastHash.MaxBytes];
+ return KeyNotificationTypeFastHash.Parse(localCopy.Slice(0, _value.CopyTo(localCopy)));
+ }
+ }
+ }
+
+ if (IsKeyEvent)
+ {
+ // then the channel contains the event-type, and the payload contains the key
+ return KeyNotificationTypeFastHash.Parse(ChannelSuffix);
+ }
+ return KeyNotificationType.Unknown;
+ }
+ }
+
+ ///
+ /// Indicates whether this notification originated from a keyspace notification, for example __keyspace@4__:mykey with payload set.
+ ///
+ public bool IsKeySpace
+ {
+ get
+ {
+ var span = _channel.Span;
+ return span.Length >= KeySpacePrefix.Length + MinSuffixBytes && KeySpacePrefix.Is(span.Hash64(), span.Slice(0, KeySpacePrefix.Length));
+ }
+ }
+
+ ///
+ /// Indicates whether this notification originated from a keyevent notification, for example __keyevent@4__:set with payload mykey.
+ ///
+ public bool IsKeyEvent
+ {
+ get
+ {
+ var span = _channel.Span;
+ return span.Length >= KeyEventPrefix.Length + MinSuffixBytes && KeyEventPrefix.Is(span.Hash64(), span.Slice(0, KeyEventPrefix.Length));
+ }
+ }
+}
+
+internal static partial class KeyNotificationChannels
+{
+ [FastHash("__keyspace@")]
+ internal static partial class KeySpacePrefix
+ {
+ }
+
+ [FastHash("__keyevent@")]
+ internal static partial class KeyEventPrefix
+ {
+ }
+}
diff --git a/src/StackExchange.Redis/KeyNotificationType.cs b/src/StackExchange.Redis/KeyNotificationType.cs
new file mode 100644
index 000000000..cc4c74ef1
--- /dev/null
+++ b/src/StackExchange.Redis/KeyNotificationType.cs
@@ -0,0 +1,69 @@
+namespace StackExchange.Redis;
+
+///
+/// The type of keyspace or keyevent notification.
+///
+public enum KeyNotificationType
+{
+ // note: initially presented alphabetically, but: new values *must* be appended, not inserted
+ // (to preserve values of existing elements)
+#pragma warning disable CS1591 // docs, redundant
+ Unknown = 0,
+ Append = 1,
+ Copy = 2,
+ Del = 3,
+ Expire = 4,
+ HDel = 5,
+ HExpired = 6,
+ HIncrByFloat = 7,
+ HIncrBy = 8,
+ HPersist = 9,
+ HSet = 10,
+ IncrByFloat = 11,
+ IncrBy = 12,
+ LInsert = 13,
+ LPop = 14,
+ LPush = 15,
+ LRem = 16,
+ LSet = 17,
+ LTrim = 18,
+ MoveFrom = 19,
+ MoveTo = 20,
+ Persist = 21,
+ RenameFrom = 22,
+ RenameTo = 23,
+ Restore = 24,
+ RPop = 25,
+ RPush = 26,
+ SAdd = 27,
+ Set = 28,
+ SetRange = 29,
+ SortStore = 30,
+ SRem = 31,
+ SPop = 32,
+ XAdd = 33,
+ XDel = 34,
+ XGroupCreateConsumer = 35,
+ XGroupCreate = 36,
+ XGroupDelConsumer = 37,
+ XGroupDestroy = 38,
+ XGroupSetId = 39,
+ XSetId = 40,
+ XTrim = 41,
+ ZAdd = 42,
+ ZDiffStore = 43,
+ ZInterStore = 44,
+ ZUnionStore = 45,
+ ZIncr = 46,
+ ZRemByRank = 47,
+ ZRemByScore = 48,
+ ZRem = 49,
+
+ // side-effect notifications
+ Expired = 1000,
+ Evicted = 1001,
+ New = 1002,
+ Overwritten = 1003,
+ TypeChanged = 1004, // type_changed
+#pragma warning restore CS1591 // docs, redundant
+}
diff --git a/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs
new file mode 100644
index 000000000..bcf08bad2
--- /dev/null
+++ b/src/StackExchange.Redis/KeyNotificationTypeFastHash.cs
@@ -0,0 +1,413 @@
+using System;
+
+namespace StackExchange.Redis;
+
+///
+/// Internal helper type for fast parsing of key notification types, using [FastHash].
+///
+internal static partial class KeyNotificationTypeFastHash
+{
+ // these are checked by KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths
+ public const int MinBytes = 3, MaxBytes = 21;
+
+ public static KeyNotificationType Parse(ReadOnlySpan value)
+ {
+ var hash = value.Hash64();
+ return hash switch
+ {
+ append.Hash when append.Is(hash, value) => KeyNotificationType.Append,
+ copy.Hash when copy.Is(hash, value) => KeyNotificationType.Copy,
+ del.Hash when del.Is(hash, value) => KeyNotificationType.Del,
+ expire.Hash when expire.Is(hash, value) => KeyNotificationType.Expire,
+ hdel.Hash when hdel.Is(hash, value) => KeyNotificationType.HDel,
+ hexpired.Hash when hexpired.Is(hash, value) => KeyNotificationType.HExpired,
+ hincrbyfloat.Hash when hincrbyfloat.Is(hash, value) => KeyNotificationType.HIncrByFloat,
+ hincrby.Hash when hincrby.Is(hash, value) => KeyNotificationType.HIncrBy,
+ hpersist.Hash when hpersist.Is(hash, value) => KeyNotificationType.HPersist,
+ hset.Hash when hset.Is(hash, value) => KeyNotificationType.HSet,
+ incrbyfloat.Hash when incrbyfloat.Is(hash, value) => KeyNotificationType.IncrByFloat,
+ incrby.Hash when incrby.Is(hash, value) => KeyNotificationType.IncrBy,
+ linsert.Hash when linsert.Is(hash, value) => KeyNotificationType.LInsert,
+ lpop.Hash when lpop.Is(hash, value) => KeyNotificationType.LPop,
+ lpush.Hash when lpush.Is(hash, value) => KeyNotificationType.LPush,
+ lrem.Hash when lrem.Is(hash, value) => KeyNotificationType.LRem,
+ lset.Hash when lset.Is(hash, value) => KeyNotificationType.LSet,
+ ltrim.Hash when ltrim.Is(hash, value) => KeyNotificationType.LTrim,
+ move_from.Hash when move_from.Is(hash, value) => KeyNotificationType.MoveFrom,
+ move_to.Hash when move_to.Is(hash, value) => KeyNotificationType.MoveTo,
+ persist.Hash when persist.Is(hash, value) => KeyNotificationType.Persist,
+ rename_from.Hash when rename_from.Is(hash, value) => KeyNotificationType.RenameFrom,
+ rename_to.Hash when rename_to.Is(hash, value) => KeyNotificationType.RenameTo,
+ restore.Hash when restore.Is(hash, value) => KeyNotificationType.Restore,
+ rpop.Hash when rpop.Is(hash, value) => KeyNotificationType.RPop,
+ rpush.Hash when rpush.Is(hash, value) => KeyNotificationType.RPush,
+ sadd.Hash when sadd.Is(hash, value) => KeyNotificationType.SAdd,
+ set.Hash when set.Is(hash, value) => KeyNotificationType.Set,
+ setrange.Hash when setrange.Is(hash, value) => KeyNotificationType.SetRange,
+ sortstore.Hash when sortstore.Is(hash, value) => KeyNotificationType.SortStore,
+ srem.Hash when srem.Is(hash, value) => KeyNotificationType.SRem,
+ spop.Hash when spop.Is(hash, value) => KeyNotificationType.SPop,
+ xadd.Hash when xadd.Is(hash, value) => KeyNotificationType.XAdd,
+ xdel.Hash when xdel.Is(hash, value) => KeyNotificationType.XDel,
+ xgroup_createconsumer.Hash when xgroup_createconsumer.Is(hash, value) => KeyNotificationType.XGroupCreateConsumer,
+ xgroup_create.Hash when xgroup_create.Is(hash, value) => KeyNotificationType.XGroupCreate,
+ xgroup_delconsumer.Hash when xgroup_delconsumer.Is(hash, value) => KeyNotificationType.XGroupDelConsumer,
+ xgroup_destroy.Hash when xgroup_destroy.Is(hash, value) => KeyNotificationType.XGroupDestroy,
+ xgroup_setid.Hash when xgroup_setid.Is(hash, value) => KeyNotificationType.XGroupSetId,
+ xsetid.Hash when xsetid.Is(hash, value) => KeyNotificationType.XSetId,
+ xtrim.Hash when xtrim.Is(hash, value) => KeyNotificationType.XTrim,
+ zadd.Hash when zadd.Is(hash, value) => KeyNotificationType.ZAdd,
+ zdiffstore.Hash when zdiffstore.Is(hash, value) => KeyNotificationType.ZDiffStore,
+ zinterstore.Hash when zinterstore.Is(hash, value) => KeyNotificationType.ZInterStore,
+ zunionstore.Hash when zunionstore.Is(hash, value) => KeyNotificationType.ZUnionStore,
+ zincr.Hash when zincr.Is(hash, value) => KeyNotificationType.ZIncr,
+ zrembyrank.Hash when zrembyrank.Is(hash, value) => KeyNotificationType.ZRemByRank,
+ zrembyscore.Hash when zrembyscore.Is(hash, value) => KeyNotificationType.ZRemByScore,
+ zrem.Hash when zrem.Is(hash, value) => KeyNotificationType.ZRem,
+ expired.Hash when expired.Is(hash, value) => KeyNotificationType.Expired,
+ evicted.Hash when evicted.Is(hash, value) => KeyNotificationType.Evicted,
+ _new.Hash when _new.Is(hash, value) => KeyNotificationType.New,
+ overwritten.Hash when overwritten.Is(hash, value) => KeyNotificationType.Overwritten,
+ type_changed.Hash when type_changed.Is(hash, value) => KeyNotificationType.TypeChanged,
+ _ => KeyNotificationType.Unknown,
+ };
+ }
+
+ internal static ReadOnlySpan GetRawBytes(KeyNotificationType type)
+ {
+ return type switch
+ {
+ KeyNotificationType.Append => append.U8,
+ KeyNotificationType.Copy => copy.U8,
+ KeyNotificationType.Del => del.U8,
+ KeyNotificationType.Expire => expire.U8,
+ KeyNotificationType.HDel => hdel.U8,
+ KeyNotificationType.HExpired => hexpired.U8,
+ KeyNotificationType.HIncrByFloat => hincrbyfloat.U8,
+ KeyNotificationType.HIncrBy => hincrby.U8,
+ KeyNotificationType.HPersist => hpersist.U8,
+ KeyNotificationType.HSet => hset.U8,
+ KeyNotificationType.IncrByFloat => incrbyfloat.U8,
+ KeyNotificationType.IncrBy => incrby.U8,
+ KeyNotificationType.LInsert => linsert.U8,
+ KeyNotificationType.LPop => lpop.U8,
+ KeyNotificationType.LPush => lpush.U8,
+ KeyNotificationType.LRem => lrem.U8,
+ KeyNotificationType.LSet => lset.U8,
+ KeyNotificationType.LTrim => ltrim.U8,
+ KeyNotificationType.MoveFrom => move_from.U8,
+ KeyNotificationType.MoveTo => move_to.U8,
+ KeyNotificationType.Persist => persist.U8,
+ KeyNotificationType.RenameFrom => rename_from.U8,
+ KeyNotificationType.RenameTo => rename_to.U8,
+ KeyNotificationType.Restore => restore.U8,
+ KeyNotificationType.RPop => rpop.U8,
+ KeyNotificationType.RPush => rpush.U8,
+ KeyNotificationType.SAdd => sadd.U8,
+ KeyNotificationType.Set => set.U8,
+ KeyNotificationType.SetRange => setrange.U8,
+ KeyNotificationType.SortStore => sortstore.U8,
+ KeyNotificationType.SRem => srem.U8,
+ KeyNotificationType.SPop => spop.U8,
+ KeyNotificationType.XAdd => xadd.U8,
+ KeyNotificationType.XDel => xdel.U8,
+ KeyNotificationType.XGroupCreateConsumer => xgroup_createconsumer.U8,
+ KeyNotificationType.XGroupCreate => xgroup_create.U8,
+ KeyNotificationType.XGroupDelConsumer => xgroup_delconsumer.U8,
+ KeyNotificationType.XGroupDestroy => xgroup_destroy.U8,
+ KeyNotificationType.XGroupSetId => xgroup_setid.U8,
+ KeyNotificationType.XSetId => xsetid.U8,
+ KeyNotificationType.XTrim => xtrim.U8,
+ KeyNotificationType.ZAdd => zadd.U8,
+ KeyNotificationType.ZDiffStore => zdiffstore.U8,
+ KeyNotificationType.ZInterStore => zinterstore.U8,
+ KeyNotificationType.ZUnionStore => zunionstore.U8,
+ KeyNotificationType.ZIncr => zincr.U8,
+ KeyNotificationType.ZRemByRank => zrembyrank.U8,
+ KeyNotificationType.ZRemByScore => zrembyscore.U8,
+ KeyNotificationType.ZRem => zrem.U8,
+ KeyNotificationType.Expired => expired.U8,
+ KeyNotificationType.Evicted => evicted.U8,
+ KeyNotificationType.New => _new.U8,
+ KeyNotificationType.Overwritten => overwritten.U8,
+ KeyNotificationType.TypeChanged => type_changed.U8,
+ _ => Throw(),
+ };
+ static ReadOnlySpan Throw() => throw new ArgumentOutOfRangeException(nameof(type));
+ }
+
+#pragma warning disable SA1300, CS8981
+ // ReSharper disable InconsistentNaming
+ [FastHash]
+ internal static partial class append
+ {
+ }
+
+ [FastHash]
+ internal static partial class copy
+ {
+ }
+
+ [FastHash]
+ internal static partial class del
+ {
+ }
+
+ [FastHash]
+ internal static partial class expire
+ {
+ }
+
+ [FastHash]
+ internal static partial class hdel
+ {
+ }
+
+ [FastHash]
+ internal static partial class hexpired
+ {
+ }
+
+ [FastHash]
+ internal static partial class hincrbyfloat
+ {
+ }
+
+ [FastHash]
+ internal static partial class hincrby
+ {
+ }
+
+ [FastHash]
+ internal static partial class hpersist
+ {
+ }
+
+ [FastHash]
+ internal static partial class hset
+ {
+ }
+
+ [FastHash]
+ internal static partial class incrbyfloat
+ {
+ }
+
+ [FastHash]
+ internal static partial class incrby
+ {
+ }
+
+ [FastHash]
+ internal static partial class linsert
+ {
+ }
+
+ [FastHash]
+ internal static partial class lpop
+ {
+ }
+
+ [FastHash]
+ internal static partial class lpush
+ {
+ }
+
+ [FastHash]
+ internal static partial class lrem
+ {
+ }
+
+ [FastHash]
+ internal static partial class lset
+ {
+ }
+
+ [FastHash]
+ internal static partial class ltrim
+ {
+ }
+
+ [FastHash("move_from")] // by default, the generator interprets underscore as hyphen
+ internal static partial class move_from
+ {
+ }
+
+ [FastHash("move_to")] // by default, the generator interprets underscore as hyphen
+ internal static partial class move_to
+ {
+ }
+
+ [FastHash]
+ internal static partial class persist
+ {
+ }
+
+ [FastHash("rename_from")] // by default, the generator interprets underscore as hyphen
+ internal static partial class rename_from
+ {
+ }
+
+ [FastHash("rename_to")] // by default, the generator interprets underscore as hyphen
+ internal static partial class rename_to
+ {
+ }
+
+ [FastHash]
+ internal static partial class restore
+ {
+ }
+
+ [FastHash]
+ internal static partial class rpop
+ {
+ }
+
+ [FastHash]
+ internal static partial class rpush
+ {
+ }
+
+ [FastHash]
+ internal static partial class sadd
+ {
+ }
+
+ [FastHash]
+ internal static partial class set
+ {
+ }
+
+ [FastHash]
+ internal static partial class setrange
+ {
+ }
+
+ [FastHash]
+ internal static partial class sortstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class srem
+ {
+ }
+
+ [FastHash]
+ internal static partial class spop
+ {
+ }
+
+ [FastHash]
+ internal static partial class xadd
+ {
+ }
+
+ [FastHash]
+ internal static partial class xdel
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_createconsumer
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_create
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_delconsumer
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_destroy
+ {
+ }
+
+ [FastHash] // note: becomes hyphenated
+ internal static partial class xgroup_setid
+ {
+ }
+
+ [FastHash]
+ internal static partial class xsetid
+ {
+ }
+
+ [FastHash]
+ internal static partial class xtrim
+ {
+ }
+
+ [FastHash]
+ internal static partial class zadd
+ {
+ }
+
+ [FastHash]
+ internal static partial class zdiffstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zinterstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zunionstore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zincr
+ {
+ }
+
+ [FastHash]
+ internal static partial class zrembyrank
+ {
+ }
+
+ [FastHash]
+ internal static partial class zrembyscore
+ {
+ }
+
+ [FastHash]
+ internal static partial class zrem
+ {
+ }
+
+ [FastHash]
+ internal static partial class expired
+ {
+ }
+
+ [FastHash]
+ internal static partial class evicted
+ {
+ }
+
+ [FastHash("new")]
+ internal static partial class _new // it isn't worth making the code-gen keyword aware
+ {
+ }
+
+ [FastHash]
+ internal static partial class overwritten
+ {
+ }
+
+ [FastHash("type_changed")] // by default, the generator interprets underscore as hyphen
+ internal static partial class type_changed
+ {
+ }
+
+ // ReSharper restore InconsistentNaming
+#pragma warning restore SA1300, CS8981
+}
diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
index 91b0e1a43..871fe71f0 100644
--- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
+++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
@@ -1 +1,74 @@
-#nullable enable
\ No newline at end of file
+#nullable enable
+StackExchange.Redis.KeyNotification
+StackExchange.Redis.KeyNotification.Channel.get -> StackExchange.Redis.RedisChannel
+StackExchange.Redis.KeyNotification.Value.get -> StackExchange.Redis.RedisValue
+StackExchange.Redis.KeyNotification.Database.get -> int
+StackExchange.Redis.KeyNotification.GetKey() -> StackExchange.Redis.RedisKey
+StackExchange.Redis.KeyNotification.IsKeyEvent.get -> bool
+StackExchange.Redis.KeyNotification.IsKeySpace.get -> bool
+StackExchange.Redis.KeyNotification.KeyByteCount.get -> int
+StackExchange.Redis.KeyNotification.KeyNotification() -> void
+StackExchange.Redis.KeyNotification.TryCopyKey(System.Span destination, out int bytesWritten) -> bool
+StackExchange.Redis.KeyNotification.Type.get -> StackExchange.Redis.KeyNotificationType
+static StackExchange.Redis.KeyNotification.TryParse(in StackExchange.Redis.RedisChannel channel, in StackExchange.Redis.RedisValue value, out StackExchange.Redis.KeyNotification notification) -> bool
+StackExchange.Redis.ChannelMessage.TryParseKeyNotification(out StackExchange.Redis.KeyNotification notification) -> bool
+static StackExchange.Redis.RedisChannel.KeyEvent(StackExchange.Redis.KeyNotificationType type, int? database = null) -> StackExchange.Redis.RedisChannel
+static StackExchange.Redis.RedisChannel.KeyEvent(System.ReadOnlySpan type, int? database) -> StackExchange.Redis.RedisChannel
+static StackExchange.Redis.RedisChannel.KeySpace(in StackExchange.Redis.RedisKey key, int database) -> StackExchange.Redis.RedisChannel
+static StackExchange.Redis.RedisChannel.KeySpacePattern(in StackExchange.Redis.RedisKey pattern, int? database = null) -> StackExchange.Redis.RedisChannel
+StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Append = 1 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Copy = 2 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Del = 3 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Evicted = 1001 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Expire = 4 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Expired = 1000 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HDel = 5 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HExpired = 6 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HIncrBy = 8 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HIncrByFloat = 7 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HPersist = 9 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.HSet = 10 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.IncrBy = 12 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.IncrByFloat = 11 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LInsert = 13 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LPop = 14 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LPush = 15 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LRem = 16 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LSet = 17 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.LTrim = 18 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.MoveFrom = 19 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.MoveTo = 20 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.New = 1002 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Overwritten = 1003 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Persist = 21 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RenameFrom = 22 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RenameTo = 23 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Restore = 24 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RPop = 25 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.RPush = 26 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SAdd = 27 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Set = 28 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SetRange = 29 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SortStore = 30 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SPop = 32 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.SRem = 31 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.TypeChanged = 1004 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.Unknown = 0 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XAdd = 33 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XDel = 34 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupCreate = 36 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupCreateConsumer = 35 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupDelConsumer = 37 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupDestroy = 38 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XGroupSetId = 39 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XSetId = 40 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.XTrim = 41 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZAdd = 42 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZDiffStore = 43 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZIncr = 46 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZInterStore = 44 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZRem = 49 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZRemByRank = 47 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZRemByScore = 48 -> StackExchange.Redis.KeyNotificationType
+StackExchange.Redis.KeyNotificationType.ZUnionStore = 45 -> StackExchange.Redis.KeyNotificationType
diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs
index d4289f3c6..7b208bb9d 100644
--- a/src/StackExchange.Redis/RedisChannel.cs
+++ b/src/StackExchange.Redis/RedisChannel.cs
@@ -1,4 +1,5 @@
using System;
+using System.Diagnostics;
using System.Text;
namespace StackExchange.Redis
@@ -10,6 +11,8 @@ namespace StackExchange.Redis
{
internal readonly byte[]? Value;
+ internal ReadOnlySpan Span => Value is null ? default : Value.AsSpan();
+
internal readonly RedisChannelOptions Options;
[Flags]
@@ -19,19 +22,36 @@ internal enum RedisChannelOptions
Pattern = 1 << 0,
Sharded = 1 << 1,
KeyRouted = 1 << 2,
+ MultiNode = 1 << 3,
}
// we don't consider Routed for equality - it's an implementation detail, not a fundamental feature
- private const RedisChannelOptions EqualityMask = ~RedisChannelOptions.KeyRouted;
+ private const RedisChannelOptions EqualityMask =
+ ~(RedisChannelOptions.KeyRouted | RedisChannelOptions.MultiNode);
- internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;
+ internal RedisCommand GetPublishCommand()
+ {
+ return (Options & (RedisChannelOptions.Sharded | RedisChannelOptions.MultiNode)) switch
+ {
+ RedisChannelOptions.None => RedisCommand.PUBLISH,
+ RedisChannelOptions.Sharded => RedisCommand.SPUBLISH,
+ _ => ThrowKeyRouted(),
+ };
+
+ static RedisCommand ThrowKeyRouted() => throw new InvalidOperationException("Publishing is not supported for multi-node channels");
+ }
///
- /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
+ /// Should we use cluster routing for this channel? This applies *either* to sharded (SPUBLISH) scenarios,
/// or to scenarios using .
///
internal bool IsKeyRouted => (Options & RedisChannelOptions.KeyRouted) != 0;
+ ///
+ /// Should this channel be subscribed to on all nodes? This is only relevant for cluster scenarios and keyspace notifications.
+ ///
+ internal bool IsMultiNode => (Options & RedisChannelOptions.MultiNode) != 0;
+
///
/// Indicates whether the channel-name is either null or a zero-length value.
///
@@ -58,6 +78,7 @@ public static bool UseImplicitAutoPattern
get => s_DefaultPatternMode == PatternMode.Auto;
set => s_DefaultPatternMode = value ? PatternMode.Auto : PatternMode.Literal;
}
+
private static PatternMode s_DefaultPatternMode = PatternMode.Auto;
///
@@ -82,7 +103,13 @@ public static bool UseImplicitAutoPattern
/// a consideration.
///
/// Note that channels from Sharded are always routed.
- public RedisChannel WithKeyRouting() => new(Value, Options | RedisChannelOptions.KeyRouted);
+ public RedisChannel WithKeyRouting()
+ {
+ if (IsMultiNode) Throw();
+ return new(Value, Options | RedisChannelOptions.KeyRouted);
+
+ static void Throw() => throw new InvalidOperationException("Key routing is not supported for multi-node channels");
+ }
///
/// Creates a new that acts as a wildcard subscription. In cluster
@@ -105,7 +132,8 @@ public static bool UseImplicitAutoPattern
///
/// The name of the channel to create.
/// The mode for name matching.
- public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
+ public RedisChannel(byte[]? value, PatternMode mode) : this(
+ value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
{
}
@@ -115,7 +143,9 @@ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatt
/// The string name of the channel to create.
/// The mode for name matching.
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
- public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
+ public RedisChannel(string value, PatternMode mode) : this(
+ // ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
+ value is null ? null : Encoding.UTF8.GetBytes(value), mode)
{
}
@@ -128,7 +158,8 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null
/// The name of the channel to create.
/// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).
- public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
+ public static RedisChannel Sharded(byte[]? value) =>
+ new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
///
/// Create a new redis channel from a string, representing a sharded channel. In cluster
@@ -139,7 +170,112 @@ public RedisChannel(string value, PatternMode mode) : this(value is null ? null
/// The string name of the channel to create.
/// Note that sharded subscriptions are completely separate to regular subscriptions; subscriptions
/// using sharded channels must also be published with sharded channels (and vice versa).
- public static RedisChannel Sharded(string value) => new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
+ public static RedisChannel Sharded(string value) =>
+ new(value, RedisChannelOptions.Sharded | RedisChannelOptions.KeyRouted);
+
+ ///
+ /// Create a key-notification channel for a single key in a single database.
+ ///
+ public static RedisChannel KeySpace(in RedisKey key, int database)
+ => BuildKeySpace(key, database, RedisChannelOptions.KeyRouted);
+
+ ///
+ /// Create a key-notification channel for a pattern, optionally in a specified database.
+ ///
+ public static RedisChannel KeySpacePattern(in RedisKey pattern, int? database = null)
+ => BuildKeySpace(pattern, database, RedisChannelOptions.Pattern | RedisChannelOptions.MultiNode);
+
+ private const int DatabaseScratchBufferSize = 16; // largest non-negative int32 is 10 digits
+
+ private static ReadOnlySpan AppendDatabase(Span target, int? database, RedisChannelOptions options)
+ {
+ if (database is null)
+ {
+ if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(database));
+ return "*"u8; // don't worry about the inbound scratch buffer, this is fine
+ }
+ else
+ {
+ var db32 = database.GetValueOrDefault();
+ if (db32 == 0) return "0"u8; // so common, we might as well special case
+ if (db32 < 0) throw new ArgumentOutOfRangeException(nameof(database));
+ return target.Slice(0, Format.FormatInt32(db32, target));
+ }
+ }
+
+ ///
+ /// Create an event-notification channel for a given event type, optionally in a specified database.
+ ///
+#pragma warning disable RS0027
+ public static RedisChannel KeyEvent(KeyNotificationType type, int? database = null)
+#pragma warning restore RS0027
+ => KeyEvent(KeyNotificationTypeFastHash.GetRawBytes(type), database);
+
+ ///
+ /// Create an event-notification channel for a given event type, optionally in a specified database.
+ ///
+ /// This API is intended for use with custom/unknown event types; for well-known types, use .
+ public static RedisChannel KeyEvent(ReadOnlySpan type, int? database)
+ {
+ if (type.IsEmpty) throw new ArgumentNullException(nameof(type));
+
+ RedisChannelOptions options = RedisChannelOptions.MultiNode;
+ if (database is null) options |= RedisChannelOptions.Pattern;
+ var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options);
+
+ // __keyevent@{db}__:{type}
+ var arr = new byte[14 + db.Length + type.Length];
+
+ var target = AppendAndAdvance(arr.AsSpan(), "__keyevent@"u8);
+ target = AppendAndAdvance(target, db);
+ target = AppendAndAdvance(target, "__:"u8);
+ target = AppendAndAdvance(target, type);
+ Debug.Assert(target.IsEmpty); // should have calculated length correctly
+
+ return new RedisChannel(arr, options);
+ }
+
+ private static Span AppendAndAdvance(Span target, scoped ReadOnlySpan value)
+ {
+ value.CopyTo(target);
+ return target.Slice(value.Length);
+ }
+
+ private static RedisChannel BuildKeySpace(in RedisKey key, int? database, RedisChannelOptions options)
+ {
+ int keyLen;
+ if (key.IsNull)
+ {
+ if ((options & RedisChannelOptions.Pattern) == 0) throw new ArgumentNullException(nameof(key));
+ keyLen = 1;
+ }
+ else
+ {
+ keyLen = key.TotalLength();
+ if (keyLen == 0) throw new ArgumentOutOfRangeException(nameof(key));
+ }
+
+ var db = AppendDatabase(stackalloc byte[DatabaseScratchBufferSize], database, options);
+
+ // __keyspace@{db}__:{key}
+ var arr = new byte[14 + db.Length + keyLen];
+
+ var target = AppendAndAdvance(arr.AsSpan(), "__keyspace@"u8);
+ target = AppendAndAdvance(target, db);
+ target = AppendAndAdvance(target, "__:"u8);
+ Debug.Assert(keyLen == target.Length); // should have exactly "len" bytes remaining
+ if (key.IsNull)
+ {
+ target[0] = (byte)'*';
+ target = target.Slice(1);
+ }
+ else
+ {
+ target = target.Slice(key.CopyTo(target));
+ }
+ Debug.Assert(target.IsEmpty); // should have calculated length correctly
+ return new RedisChannel(arr, options);
+ }
internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
@@ -351,7 +487,7 @@ public static implicit operator RedisChannel(byte[]? key)
{
return Encoding.UTF8.GetString(arr);
}
- catch (Exception e) when // Only catch exception throwed by Encoding.UTF8.GetString
+ catch (Exception e) when // Only catch exception thrown by Encoding.UTF8.GetString
(e is DecoderFallbackException or ArgumentException or ArgumentNullException)
{
return BitConverter.ToString(arr);
diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs
index c1c3c5728..056a5380a 100644
--- a/src/StackExchange.Redis/RedisDatabase.cs
+++ b/src/StackExchange.Redis/RedisDatabase.cs
@@ -1900,7 +1900,7 @@ public Task StringLongestCommonSubsequenceWithMatchesAsync(Redis
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
@@ -1908,7 +1908,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs
index 9ade78c2d..824aef025 100644
--- a/src/StackExchange.Redis/RedisSubscriber.cs
+++ b/src/StackExchange.Redis/RedisSubscriber.cs
@@ -1,11 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
-using System.Diagnostics.SymbolStore;
using System.Net;
-using System.Threading;
using System.Threading.Tasks;
-using Pipelines.Sockets.Unofficial;
using Pipelines.Sockets.Unofficial.Arenas;
using static StackExchange.Redis.ConnectionMultiplexer;
@@ -30,7 +27,7 @@ internal Subscription GetOrAddSubscription(in RedisChannel channel, CommandFlags
{
if (!subscriptions.TryGetValue(channel, out var sub))
{
- sub = new Subscription(flags);
+ sub = channel.IsMultiNode ? new MultiNodeSubscription(flags) : new SingleNodeSubscription(flags);
subscriptions.TryAdd(channel, sub);
}
return sub;
@@ -71,7 +68,7 @@ internal bool GetSubscriberCounts(in RedisChannel channel, out int handlers, out
{
if (!channel.IsNullOrEmpty && subscriptions.TryGetValue(channel, out Subscription? sub))
{
- return sub.GetCurrentServer();
+ return sub.GetAnyCurrentServer();
}
return null;
}
@@ -123,7 +120,7 @@ internal void UpdateSubscriptions()
{
foreach (var pair in subscriptions)
{
- pair.Value.UpdateServer();
+ pair.Value.RemoveDisconnectedEndpoints();
}
}
@@ -135,13 +132,10 @@ internal long EnsureSubscriptions(CommandFlags flags = CommandFlags.None)
{
// TODO: Subscribe with variadic commands to reduce round trips
long count = 0;
+ var subscriber = DefaultSubscriber;
foreach (var pair in subscriptions)
{
- if (!pair.Value.IsConnected)
- {
- count++;
- DefaultSubscriber.EnsureSubscribedToServer(pair.Value, pair.Key, flags, true);
- }
+ count += pair.Value.EnsureSubscribedToServer(subscriber, pair.Key, flags, true);
}
return count;
}
@@ -151,161 +145,6 @@ internal enum SubscriptionAction
Subscribe,
Unsubscribe,
}
-
- ///
- /// This is the record of a single subscription to a redis server.
- /// It's the singular channel (which may or may not be a pattern), to one or more handlers.
- /// We subscriber to a redis server once (for all messages) and execute 1-many handlers when a message arrives.
- ///
- internal sealed class Subscription
- {
- private Action? _handlers;
- private readonly object _handlersLock = new object();
- private ChannelMessageQueue? _queues;
- private ServerEndPoint? CurrentServer;
- public CommandFlags Flags { get; }
- public ResultProcessor.TrackSubscriptionsProcessor Processor { get; }
-
- ///
- /// Whether the we have is connected.
- /// Since we clear on a disconnect, this should stay correct.
- ///
- internal bool IsConnected => CurrentServer?.IsSubscriberConnected == true;
-
- public Subscription(CommandFlags flags)
- {
- Flags = flags;
- Processor = new ResultProcessor.TrackSubscriptionsProcessor(this);
- }
-
- ///
- /// Gets the configured (P)SUBSCRIBE or (P)UNSUBSCRIBE for an action.
- ///
- internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
- {
- var command = action switch // note that the Routed flag doesn't impact the message here - just the routing
- {
- SubscriptionAction.Subscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch
- {
- RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE,
- RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE,
- RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE,
- _ => Unknown(action, channel.Options),
- },
- SubscriptionAction.Unsubscribe => (channel.Options & ~RedisChannel.RedisChannelOptions.KeyRouted) switch
- {
- RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE,
- RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE,
- RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE,
- _ => Unknown(action, channel.Options),
- },
- _ => Unknown(action, channel.Options),
- };
-
- // TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica
- var msg = Message.Create(-1, Flags | flags, command, channel);
- msg.SetForSubscriptionBridge();
- if (internalCall)
- {
- msg.SetInternalCall();
- }
- return msg;
- }
-
- private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options)
- => throw new ArgumentException($"Unable to determine pub/sub operation for '{action}' against '{options}'");
-
- public void Add(Action? handler, ChannelMessageQueue? queue)
- {
- if (handler != null)
- {
- lock (_handlersLock)
- {
- _handlers += handler;
- }
- }
- if (queue != null)
- {
- ChannelMessageQueue.Combine(ref _queues, queue);
- }
- }
-
- public bool Remove(Action? handler, ChannelMessageQueue? queue)
- {
- if (handler != null)
- {
- lock (_handlersLock)
- {
- _handlers -= handler;
- }
- }
- if (queue != null)
- {
- ChannelMessageQueue.Remove(ref _queues, queue);
- }
- return _handlers == null & _queues == null;
- }
-
- public ICompletable? ForInvoke(in RedisChannel channel, in RedisValue message, out ChannelMessageQueue? queues)
- {
- var handlers = _handlers;
- queues = Volatile.Read(ref _queues);
- return handlers == null ? null : new MessageCompletable(channel, message, handlers);
- }
-
- internal void MarkCompleted()
- {
- lock (_handlersLock)
- {
- _handlers = null;
- }
- ChannelMessageQueue.MarkAllCompleted(ref _queues);
- }
-
- internal void GetSubscriberCounts(out int handlers, out int queues)
- {
- queues = ChannelMessageQueue.Count(ref _queues);
- var tmp = _handlers;
- if (tmp == null)
- {
- handlers = 0;
- }
- else if (tmp.IsSingle())
- {
- handlers = 1;
- }
- else
- {
- handlers = 0;
- foreach (var sub in tmp.AsEnumerable()) { handlers++; }
- }
- }
-
- internal ServerEndPoint? GetCurrentServer() => Volatile.Read(ref CurrentServer);
- internal void SetCurrentServer(ServerEndPoint? server) => CurrentServer = server;
- // conditional clear
- internal bool ClearCurrentServer(ServerEndPoint expected)
- {
- if (CurrentServer == expected)
- {
- CurrentServer = null;
- return true;
- }
-
- return false;
- }
-
- ///
- /// Evaluates state and if we're not currently connected, clears the server reference.
- ///
- internal void UpdateServer()
- {
- if (!IsConnected)
- {
- CurrentServer = null;
- }
- }
- }
}
///
@@ -393,7 +232,7 @@ private static void ThrowIfNull(in RedisChannel channel)
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteSync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
@@ -401,7 +240,7 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
- var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
+ var msg = Message.Create(-1, flags, channel.GetPublishCommand(), channel, message);
// if we're actively subscribed: send via that connection (otherwise, follow normal rules)
return ExecuteAsync(msg, ResultProcessor.Int64, server: multiplexer.GetSubscribedServer(channel));
}
@@ -416,37 +255,26 @@ public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags =
return queue;
}
- private bool Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags)
+ private int Subscribe(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags)
{
ThrowIfNull(channel);
- if (handler == null && queue == null) { return true; }
+ if (handler == null && queue == null) { return 0; }
var sub = multiplexer.GetOrAddSubscription(channel, flags);
sub.Add(handler, queue);
- return EnsureSubscribedToServer(sub, channel, flags, false);
- }
-
- internal bool EnsureSubscribedToServer(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall)
- {
- if (sub.IsConnected) { return true; }
-
- // TODO: Cleanup old hangers here?
- sub.SetCurrentServer(null); // we're not appropriately connected, so blank it out for eligible reconnection
- var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, flags, internalCall);
- var selected = multiplexer.SelectServer(message);
- return ExecuteSync(message, sub.Processor, selected);
+ return sub.EnsureSubscribedToServer(this, channel, flags, false);
}
internal void ResubscribeToServer(Subscription sub, RedisChannel channel, ServerEndPoint serverEndPoint, string cause)
{
// conditional: only if that's the server we were connected to, or "none"; we don't want to end up duplicated
- if (sub.ClearCurrentServer(serverEndPoint) || !sub.IsConnected)
+ if (sub.TryRemoveEndpoint(serverEndPoint) || !sub.IsConnectedAny())
{
if (serverEndPoint.IsSubscriberConnected)
{
// we'll *try* for a simple resubscribe, following any -MOVED etc, but if that fails: fall back
// to full reconfigure; importantly, note that we've already recorded the disconnect
- var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, CommandFlags.None, false);
+ var message = sub.GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, CommandFlags.None, false);
_ = ExecuteAsync(message, sub.Processor, serverEndPoint).ContinueWith(
t => multiplexer.ReconfigureIfNeeded(serverEndPoint.EndPoint, false, cause: cause),
TaskContinuationOptions.OnlyOnFaulted);
@@ -470,25 +298,14 @@ public async Task SubscribeAsync(RedisChannel channel, Comm
return queue;
}
- private Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags, ServerEndPoint? server = null)
+ private Task SubscribeAsync(RedisChannel channel, Action? handler, ChannelMessageQueue? queue, CommandFlags flags, ServerEndPoint? server = null)
{
ThrowIfNull(channel);
- if (handler == null && queue == null) { return CompletedTask.Default(null); }
+ if (handler == null && queue == null) { return CompletedTask.Default(null); }
var sub = multiplexer.GetOrAddSubscription(channel, flags);
sub.Add(handler, queue);
- return EnsureSubscribedToServerAsync(sub, channel, flags, false, server);
- }
-
- public Task EnsureSubscribedToServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, bool internalCall, ServerEndPoint? server = null)
- {
- if (sub.IsConnected) { return CompletedTask.Default(null); }
-
- // TODO: Cleanup old hangers here?
- sub.SetCurrentServer(null); // we're not appropriately connected, so blank it out for eligible reconnection
- var message = sub.GetMessage(channel, SubscriptionAction.Subscribe, flags, internalCall);
- server ??= multiplexer.SelectServer(message);
- return ExecuteAsync(message, sub.Processor, server);
+ return sub.EnsureSubscribedToServerAsync(this, channel, flags, false, server);
}
public EndPoint? SubscribedEndpoint(RedisChannel channel) => multiplexer.GetSubscribedServer(channel)?.EndPoint;
@@ -500,21 +317,12 @@ public bool Unsubscribe(in RedisChannel channel, Action? handler, CommandFlags flags)
=> UnsubscribeAsync(channel, handler, null, flags);
@@ -523,20 +331,10 @@ public Task UnsubscribeAsync(in RedisChannel channel, Action.Default(asyncState);
}
- private Task UnsubscribeFromServerAsync(Subscription sub, RedisChannel channel, CommandFlags flags, object? asyncState, bool internalCall)
- {
- if (sub.GetCurrentServer() is ServerEndPoint oldOwner)
- {
- var message = sub.GetMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall);
- return multiplexer.ExecuteAsyncImpl(message, sub.Processor, asyncState, oldOwner);
- }
- return CompletedTask.FromResult(true, asyncState);
- }
-
///
/// Unregisters a handler or queue and returns if we should remove it from the server.
///
@@ -573,7 +371,7 @@ public void UnsubscribeAll(CommandFlags flags = CommandFlags.None)
if (subs.TryRemove(pair.Key, out var sub))
{
sub.MarkCompleted();
- UnsubscribeFromServer(sub, pair.Key, flags, false);
+ sub.UnsubscribeFromServer(this, pair.Key, flags, false);
}
}
}
@@ -588,7 +386,7 @@ public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
if (subs.TryRemove(pair.Key, out var sub))
{
sub.MarkCompleted();
- last = UnsubscribeFromServerAsync(sub, pair.Key, flags, asyncState, false);
+ last = sub.UnsubscribeFromServerAsync(this, pair.Key, flags, asyncState, false);
}
}
return last ?? CompletedTask.Default(asyncState);
diff --git a/src/StackExchange.Redis/RedisValue.cs b/src/StackExchange.Redis/RedisValue.cs
index d306ca0d0..1f6947460 100644
--- a/src/StackExchange.Redis/RedisValue.cs
+++ b/src/StackExchange.Redis/RedisValue.cs
@@ -1245,5 +1245,16 @@ internal ValueCondition Digest()
return digest;
}
}
+
+ internal bool TryGetSpan(out ReadOnlySpan span)
+ {
+ if (_objectOrSentinel == Sentinel_Raw)
+ {
+ span = _memory.Span;
+ return true;
+ }
+ span = default;
+ return false;
+ }
}
}
diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs
index 196cabde5..f2c6deb8b 100644
--- a/src/StackExchange.Redis/ResultProcessor.cs
+++ b/src/StackExchange.Redis/ResultProcessor.cs
@@ -469,12 +469,21 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
connection.SubscriptionCount = count;
SetResult(message, true);
- var newServer = message.Command switch
+ var ep = connection.BridgeCouldBeNull?.ServerEndPoint;
+ if (ep is not null)
{
- RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint,
- _ => null,
- };
- Subscription?.SetCurrentServer(newServer);
+ switch (message.Command)
+ {
+ case RedisCommand.SUBSCRIBE:
+ case RedisCommand.SSUBSCRIBE:
+ case RedisCommand.PSUBSCRIBE:
+ Subscription?.AddEndpoint(ep);
+ break;
+ default:
+ Subscription?.TryRemoveEndpoint(ep);
+ break;
+ }
+ }
return true;
}
}
diff --git a/src/StackExchange.Redis/Subscription.cs b/src/StackExchange.Redis/Subscription.cs
new file mode 100644
index 000000000..2d877c9eb
--- /dev/null
+++ b/src/StackExchange.Redis/Subscription.cs
@@ -0,0 +1,496 @@
+using System;
+using System.Buffers;
+using System.Collections.Concurrent;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Pipelines.Sockets.Unofficial;
+
+namespace StackExchange.Redis;
+
+public partial class ConnectionMultiplexer
+{
+ ///
+ /// This is the record of a single subscription to a redis server.
+ /// It's the singular channel (which may or may not be a pattern), to one or more handlers.
+ /// We subscriber to a redis server once (for all messages) and execute 1-many handlers when a message arrives.
+ ///
+ internal abstract class Subscription
+ {
+ private Action? _handlers;
+ private readonly object _handlersLock = new();
+ private ChannelMessageQueue? _queues;
+ public CommandFlags Flags { get; }
+ public ResultProcessor.TrackSubscriptionsProcessor Processor { get; }
+
+ internal abstract bool IsConnectedAny();
+ internal abstract bool IsConnectedTo(EndPoint endpoint);
+
+ internal abstract void AddEndpoint(ServerEndPoint server);
+
+ // conditional clear
+ internal abstract bool TryRemoveEndpoint(ServerEndPoint expected);
+
+ internal abstract void RemoveDisconnectedEndpoints();
+
+ // returns the number of changes required
+ internal abstract int EnsureSubscribedToServer(
+ RedisSubscriber subscriber,
+ in RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall);
+
+ // returns the number of changes required
+ internal abstract Task EnsureSubscribedToServerAsync(
+ RedisSubscriber subscriber,
+ RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall,
+ ServerEndPoint? server = null);
+
+ internal abstract bool UnsubscribeFromServer(
+ RedisSubscriber subscriber,
+ in RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall);
+
+ internal abstract Task UnsubscribeFromServerAsync(
+ RedisSubscriber subscriber,
+ RedisChannel channel,
+ CommandFlags flags,
+ object? asyncState,
+ bool internalCall);
+
+ internal abstract int GetConnectionCount();
+
+ internal abstract ServerEndPoint? GetAnyCurrentServer();
+
+ public Subscription(CommandFlags flags)
+ {
+ Flags = flags;
+ Processor = new ResultProcessor.TrackSubscriptionsProcessor(this);
+ }
+
+ ///
+ /// Gets the configured (P)SUBSCRIBE or (P)UNSUBSCRIBE for an action.
+ ///
+ internal Message GetSubscriptionMessage(
+ RedisChannel channel,
+ SubscriptionAction action,
+ CommandFlags flags,
+ bool internalCall)
+ {
+ var command =
+ action switch // note that the Routed flag doesn't impact the message here - just the routing
+ {
+ SubscriptionAction.Subscribe => (channel.Options &
+ ~RedisChannel.RedisChannelOptions
+ .KeyRouted) switch
+ {
+ RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE,
+ RedisChannel.RedisChannelOptions.MultiNode => RedisCommand.SUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Pattern | RedisChannel.RedisChannelOptions.MultiNode =>
+ RedisCommand.PSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE,
+ _ => Unknown(action, channel.Options),
+ },
+ SubscriptionAction.Unsubscribe => (channel.Options &
+ ~RedisChannel.RedisChannelOptions.KeyRouted) switch
+ {
+ RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.MultiNode => RedisCommand.UNSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Pattern | RedisChannel.RedisChannelOptions.MultiNode =>
+ RedisCommand.PUNSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE,
+ _ => Unknown(action, channel.Options),
+ },
+ _ => Unknown(action, channel.Options),
+ };
+
+ // TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica
+ var msg = Message.Create(-1, Flags | flags, command, channel);
+ msg.SetForSubscriptionBridge();
+ if (internalCall)
+ {
+ msg.SetInternalCall();
+ }
+
+ return msg;
+ }
+
+ private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options)
+ => throw new ArgumentException(
+ $"Unable to determine pub/sub operation for '{action}' against '{options}'");
+
+ public void Add(Action? handler, ChannelMessageQueue? queue)
+ {
+ if (handler != null)
+ {
+ lock (_handlersLock)
+ {
+ _handlers += handler;
+ }
+ }
+
+ if (queue != null)
+ {
+ ChannelMessageQueue.Combine(ref _queues, queue);
+ }
+ }
+
+ public bool Remove(Action? handler, ChannelMessageQueue? queue)
+ {
+ if (handler != null)
+ {
+ lock (_handlersLock)
+ {
+ _handlers -= handler;
+ }
+ }
+
+ if (queue != null)
+ {
+ ChannelMessageQueue.Remove(ref _queues, queue);
+ }
+
+ return _handlers == null & _queues == null;
+ }
+
+ public ICompletable? ForInvoke(in RedisChannel channel, in RedisValue message, out ChannelMessageQueue? queues)
+ {
+ var handlers = _handlers;
+ queues = Volatile.Read(ref _queues);
+ return handlers == null ? null : new MessageCompletable(channel, message, handlers);
+ }
+
+ internal void MarkCompleted()
+ {
+ lock (_handlersLock)
+ {
+ _handlers = null;
+ }
+
+ ChannelMessageQueue.MarkAllCompleted(ref _queues);
+ }
+
+ internal void GetSubscriberCounts(out int handlers, out int queues)
+ {
+ queues = ChannelMessageQueue.Count(ref _queues);
+ var tmp = _handlers;
+ if (tmp == null)
+ {
+ handlers = 0;
+ }
+ else if (tmp.IsSingle())
+ {
+ handlers = 1;
+ }
+ else
+ {
+ handlers = 0;
+ foreach (var sub in tmp.AsEnumerable()) { handlers++; }
+ }
+ }
+ }
+
+ // used for most subscriptions; routed to a single node
+ internal sealed class SingleNodeSubscription(CommandFlags flags) : Subscription(flags)
+ {
+ internal override bool IsConnectedAny() => _currentServer is { IsSubscriberConnected: true };
+
+ internal override int GetConnectionCount() => IsConnectedAny() ? 1 : 0;
+
+ internal override bool IsConnectedTo(EndPoint endpoint)
+ {
+ var server = _currentServer;
+ return server is { IsSubscriberConnected: true } && server.EndPoint == endpoint;
+ }
+
+ internal override void AddEndpoint(ServerEndPoint server) => _currentServer = server;
+
+ internal override bool TryRemoveEndpoint(ServerEndPoint expected)
+ {
+ if (_currentServer == expected)
+ {
+ _currentServer = null;
+ return true;
+ }
+
+ return false;
+ }
+
+ internal override bool UnsubscribeFromServer(
+ RedisSubscriber subscriber,
+ in RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall)
+ {
+ var server = _currentServer;
+ if (server is not null)
+ {
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall);
+ return subscriber.multiplexer.ExecuteSyncImpl(message, Processor, server);
+ }
+
+ return true;
+ }
+
+ internal override Task UnsubscribeFromServerAsync(
+ RedisSubscriber subscriber,
+ RedisChannel channel,
+ CommandFlags flags,
+ object? asyncState,
+ bool internalCall)
+ {
+ var server = _currentServer;
+ if (server is not null)
+ {
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall);
+ return subscriber.multiplexer.ExecuteAsyncImpl(message, Processor, asyncState, server);
+ }
+
+ return CompletedTask.FromResult(true, asyncState);
+ }
+
+ private ServerEndPoint? _currentServer;
+ internal ServerEndPoint? GetCurrentServer() => Volatile.Read(ref _currentServer);
+
+ internal override ServerEndPoint? GetAnyCurrentServer() => Volatile.Read(ref _currentServer);
+
+ ///
+ /// Evaluates state and if we're not currently connected, clears the server reference.
+ ///
+ internal override void RemoveDisconnectedEndpoints()
+ {
+ var server = _currentServer;
+ if (server is { IsSubscriberConnected: false })
+ {
+ _currentServer = null;
+ }
+ }
+
+ internal override int EnsureSubscribedToServer(
+ RedisSubscriber subscriber,
+ in RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall)
+ {
+ if (IsConnectedAny()) return 0;
+
+ // we're not appropriately connected, so blank it out for eligible reconnection
+ _currentServer = null;
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall);
+ var selected = subscriber.multiplexer.SelectServer(message);
+ _ = subscriber.ExecuteSync(message, Processor, selected);
+ return 1;
+ }
+
+ internal override async Task EnsureSubscribedToServerAsync(
+ RedisSubscriber subscriber,
+ RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall,
+ ServerEndPoint? server = null)
+ {
+ if (IsConnectedAny()) return 0;
+
+ // we're not appropriately connected, so blank it out for eligible reconnection
+ _currentServer = null;
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall);
+ server ??= subscriber.multiplexer.SelectServer(message);
+ await subscriber.ExecuteAsync(message, Processor, server).ForAwait();
+ return 1;
+ }
+ }
+
+ // used for keyspace subscriptions, which are routed to multiple nodes
+ internal sealed class MultiNodeSubscription(CommandFlags flags) : Subscription(flags)
+ {
+ private readonly ConcurrentDictionary _servers = new();
+
+ internal override bool IsConnectedAny()
+ {
+ foreach (var server in _servers)
+ {
+ if (server.Value is { IsSubscriberConnected: true }) return true;
+ }
+
+ return false;
+ }
+
+ internal override int GetConnectionCount()
+ {
+ int count = 0;
+ foreach (var server in _servers)
+ {
+ if (server.Value is { IsSubscriberConnected: true }) count++;
+ }
+
+ return count;
+ }
+
+ internal override bool IsConnectedTo(EndPoint endpoint)
+ => _servers.TryGetValue(endpoint, out var server)
+ && server.IsSubscriberConnected;
+
+ internal override void AddEndpoint(ServerEndPoint server)
+ {
+ var ep = server.EndPoint;
+ if (!_servers.TryAdd(ep, server))
+ {
+ _servers[ep] = server;
+ }
+ }
+
+ internal override bool TryRemoveEndpoint(ServerEndPoint expected)
+ {
+ return _servers.TryRemove(expected.EndPoint, out _);
+ }
+
+ internal override ServerEndPoint? GetAnyCurrentServer()
+ {
+ ServerEndPoint? last = null;
+ // prefer actively connected servers, but settle for anything
+ foreach (var server in _servers)
+ {
+ last = server.Value;
+ if (last is { IsSubscriberConnected: true })
+ {
+ break;
+ }
+ }
+
+ return last;
+ }
+
+ internal override void RemoveDisconnectedEndpoints()
+ {
+ // This looks more complicated than it is, because of avoiding mutating the collection
+ // while iterating; instead, buffer any removals in a scratch buffer, and remove them in a second pass.
+ EndPoint[] scratch = [];
+ int count = 0;
+ foreach (var server in _servers)
+ {
+ if (server.Value.IsSubscriberConnected)
+ {
+ // flag for removal
+ if (scratch.Length == count) // need to resize the scratch buffer, using the pool
+ {
+ // let the array pool worry about min-sizing etc
+ var newLease = ArrayPool.Shared.Rent(count + 1);
+ scratch.CopyTo(newLease, 0);
+ ArrayPool.Shared.Return(scratch);
+ scratch = newLease;
+ }
+
+ scratch[count++] = server.Key;
+ }
+ }
+
+ // did we find anything to remove?
+ if (count != 0)
+ {
+ foreach (var ep in scratch.AsSpan(0, count))
+ {
+ _servers.TryRemove(ep, out _);
+ }
+ }
+
+ ArrayPool.Shared.Return(scratch);
+ }
+
+ internal override int EnsureSubscribedToServer(
+ RedisSubscriber subscriber,
+ in RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall)
+ {
+ int delta = 0;
+ var muxer = subscriber.multiplexer;
+ foreach (var server in muxer.GetServerSnapshot())
+ {
+ // exclude sentinel, and only use replicas if we're explicitly asking for them
+ bool useReplica = (Flags & CommandFlags.DemandReplica) != 0;
+ if (server.ServerType != ServerType.Sentinel & server.IsReplica == useReplica)
+ {
+ if (!IsConnectedTo(server.EndPoint))
+ {
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall);
+ subscriber.ExecuteSync(message, Processor, server);
+ delta++;
+ }
+ }
+ }
+
+ return delta;
+ }
+
+ internal override async Task EnsureSubscribedToServerAsync(
+ RedisSubscriber subscriber,
+ RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall,
+ ServerEndPoint? server = null)
+ {
+ int delta = 0;
+ var muxer = subscriber.multiplexer;
+ var snapshot = muxer.GetServerSnaphotMemory();
+ var len = snapshot.Length;
+ for (int i = 0; i < len; i++)
+ {
+ var loopServer = snapshot.Span[i]; // spans and async do not mix well
+ if (server is null || server == loopServer)
+ {
+ // exclude sentinel, and only use replicas if we're explicitly asking for them
+ bool useReplica = (Flags & CommandFlags.DemandReplica) != 0;
+ if (loopServer.ServerType != ServerType.Sentinel & loopServer.IsReplica == useReplica)
+ {
+ if (!IsConnectedTo(loopServer.EndPoint))
+ {
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Subscribe, flags, internalCall);
+ await subscriber.ExecuteAsync(message, Processor, loopServer).ForAwait();
+ delta++;
+ }
+ }
+ }
+ }
+
+ return delta;
+ }
+
+ internal override bool UnsubscribeFromServer(
+ RedisSubscriber subscriber,
+ in RedisChannel channel,
+ CommandFlags flags,
+ bool internalCall)
+ {
+ bool any = false;
+ foreach (var server in _servers)
+ {
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall);
+ any |= subscriber.ExecuteSync(message, Processor, server.Value);
+ }
+
+ return any;
+ }
+
+ internal override async Task UnsubscribeFromServerAsync(
+ RedisSubscriber subscriber,
+ RedisChannel channel,
+ CommandFlags flags,
+ object? asyncState,
+ bool internalCall)
+ {
+ bool any = false;
+ foreach (var server in _servers)
+ {
+ var message = GetSubscriptionMessage(channel, SubscriptionAction.Unsubscribe, flags, internalCall);
+ any |= await subscriber.ExecuteAsync(message, Processor, server.Value).ForAwait();
+ }
+
+ return any;
+ }
+ }
+}
diff --git a/tests/StackExchange.Redis.Tests/FailoverTests.cs b/tests/StackExchange.Redis.Tests/FailoverTests.cs
index 1f33275b5..33b24f16f 100644
--- a/tests/StackExchange.Redis.Tests/FailoverTests.cs
+++ b/tests/StackExchange.Redis.Tests/FailoverTests.cs
@@ -263,7 +263,7 @@ public async Task SubscriptionsSurviveConnectionFailureAsync()
foreach (var pair in muxerSubs)
{
var muxerSub = pair.Value;
- Log($" Muxer Sub: {pair.Key}: (EndPoint: {muxerSub.GetCurrentServer()}, Connected: {muxerSub.IsConnected})");
+ Log($" Muxer Sub: {pair.Key}: (EndPoint: {muxerSub.GetAnyCurrentServer()}, Connected: {muxerSub.IsConnectedAny()})");
}
Log("Publishing");
diff --git a/tests/StackExchange.Redis.Tests/FastHashTests.cs b/tests/StackExchange.Redis.Tests/FastHashTests.cs
index 418198cfd..a032cfc80 100644
--- a/tests/StackExchange.Redis.Tests/FastHashTests.cs
+++ b/tests/StackExchange.Redis.Tests/FastHashTests.cs
@@ -2,13 +2,14 @@
using System.Runtime.InteropServices;
using System.Text;
using Xunit;
+using Xunit.Sdk;
#pragma warning disable CS8981, SA1134, SA1300, SA1303, SA1502 // names are weird in this test!
// ReSharper disable InconsistentNaming - to better represent expected literals
// ReSharper disable IdentifierTypo
namespace StackExchange.Redis.Tests;
-public partial class FastHashTests
+public partial class FastHashTests(ITestOutputHelper log)
{
// note: if the hashing algorithm changes, we can update the last parameter freely; it doesn't matter
// what it *is* - what matters is that we can see that it has entropy between different values
@@ -83,6 +84,46 @@ public void FastHashIs_Long()
Assert.False(abcdefghijklmnopqrst.Is(hash, value));
}
+ [Fact]
+ public void KeyNotificationTypeFastHash_MinMaxBytes_ReflectsActualLengths()
+ {
+ // Use reflection to find all nested types in KeyNotificationTypeFastHash
+ var fastHashType = typeof(KeyNotificationTypeFastHash);
+ var nestedTypes = fastHashType.GetNestedTypes(System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static);
+
+ int? minLength = null;
+ int? maxLength = null;
+
+ foreach (var nestedType in nestedTypes)
+ {
+ // Look for the Length field (generated by FastHash source generator)
+ var lengthField = nestedType.GetField("Length", System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static);
+ if (lengthField != null && lengthField.FieldType == typeof(int))
+ {
+ var length = (int)lengthField.GetValue(null)!;
+
+ if (minLength == null || length < minLength)
+ {
+ minLength = length;
+ }
+
+ if (maxLength == null || length > maxLength)
+ {
+ maxLength = length;
+ }
+ }
+ }
+
+ // Assert that we found at least some nested types with Length fields
+ Assert.NotNull(minLength);
+ Assert.NotNull(maxLength);
+
+ // Assert that MinBytes and MaxBytes match the actual min/max lengths
+ log.WriteLine($"MinBytes: {KeyNotificationTypeFastHash.MinBytes}, MaxBytes: {KeyNotificationTypeFastHash.MaxBytes}");
+ Assert.Equal(KeyNotificationTypeFastHash.MinBytes, minLength.Value);
+ Assert.Equal(KeyNotificationTypeFastHash.MaxBytes, maxLength.Value);
+ }
+
[FastHash] private static partial class a { }
[FastHash] private static partial class ab { }
[FastHash] private static partial class abc { }
diff --git a/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs
new file mode 100644
index 000000000..b9548eb44
--- /dev/null
+++ b/tests/StackExchange.Redis.Tests/KeyNotificationTests.cs
@@ -0,0 +1,500 @@
+using System;
+using System.Buffers;
+using System.Text;
+using Xunit;
+
+namespace StackExchange.Redis.Tests;
+
+public class KeyNotificationTests(ITestOutputHelper log)
+{
+ [Fact]
+ public void Keyspace_Del_ParsesCorrectly()
+ {
+ // __keyspace@1__:mykey with payload "del"
+ var channel = RedisChannel.Literal("__keyspace@1__:mykey");
+ RedisValue value = "del";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.False(notification.IsKeyEvent);
+ Assert.Equal(1, notification.Database);
+ Assert.Equal(KeyNotificationType.Del, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ Assert.Equal(5, notification.KeyByteCount);
+ }
+
+ [Fact]
+ public void Keyevent_Del_ParsesCorrectly()
+ {
+ // __keyevent@42__:del with value "mykey"
+ var channel = RedisChannel.Literal("__keyevent@42__:del");
+ RedisValue value = "mykey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.False(notification.IsKeySpace);
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(42, notification.Database);
+ Assert.Equal(KeyNotificationType.Del, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ Assert.Equal(5, notification.KeyByteCount);
+ }
+
+ [Fact]
+ public void Keyspace_Set_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:testkey");
+ RedisValue value = "set";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Set, notification.Type);
+ Assert.Equal("testkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_Expire_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@5__:expire");
+ RedisValue value = "session:12345";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(5, notification.Database);
+ Assert.Equal(KeyNotificationType.Expire, notification.Type);
+ Assert.Equal("session:12345", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_Expired_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@3__:cache:item");
+ RedisValue value = "expired";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(3, notification.Database);
+ Assert.Equal(KeyNotificationType.Expired, notification.Type);
+ Assert.Equal("cache:item", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_LPush_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@0__:lpush");
+ RedisValue value = "queue:tasks";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.LPush, notification.Type);
+ Assert.Equal("queue:tasks", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_HSet_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@2__:user:1000");
+ RedisValue value = "hset";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(2, notification.Database);
+ Assert.Equal(KeyNotificationType.HSet, notification.Type);
+ Assert.Equal("user:1000", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_ZAdd_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@7__:zadd");
+ RedisValue value = "leaderboard";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(7, notification.Database);
+ Assert.Equal(KeyNotificationType.ZAdd, notification.Type);
+ Assert.Equal("leaderboard", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void TryCopyKey_WorksCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:testkey");
+ RedisValue value = "set";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ var lease = ArrayPool.Shared.Rent(20);
+ Span buffer = lease.AsSpan(0, 20);
+ Assert.True(notification.TryCopyKey(buffer, out var bytesWritten));
+ Assert.Equal(7, bytesWritten);
+ Assert.Equal("testkey", Encoding.UTF8.GetString(lease, 0, bytesWritten));
+ ArrayPool.Shared.Return(lease);
+ }
+
+ [Fact]
+ public void TryCopyKey_FailsWithSmallBuffer()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:testkey");
+ RedisValue value = "set";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Span buffer = stackalloc byte[3]; // too small
+ Assert.False(notification.TryCopyKey(buffer, out var bytesWritten));
+ Assert.Equal(0, bytesWritten);
+ }
+
+ [Fact]
+ public void InvalidChannel_ReturnsFalse()
+ {
+ var channel = RedisChannel.Literal("regular:channel");
+ RedisValue value = "data";
+
+ Assert.False(KeyNotification.TryParse(in channel, in value, out var notification));
+ }
+
+ [Fact]
+ public void InvalidKeyspaceChannel_MissingDelimiter_ReturnsFalse()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__"); // missing the key part
+ RedisValue value = "set";
+
+ Assert.False(KeyNotification.TryParse(in channel, in value, out var notification));
+ }
+
+ [Fact]
+ public void Keyspace_UnknownEventType_ReturnsUnknown()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:mykey");
+ RedisValue value = "unknownevent";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Unknown, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_UnknownEventType_ReturnsUnknown()
+ {
+ var channel = RedisChannel.Literal("__keyevent@0__:unknownevent");
+ RedisValue value = "mykey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Unknown, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_WithColonInKey_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:user:session:12345");
+ RedisValue value = "del";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.Del, notification.Type);
+ Assert.Equal("user:session:12345", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_Evicted_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@1__:evicted");
+ RedisValue value = "cache:old";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(1, notification.Database);
+ Assert.Equal(KeyNotificationType.Evicted, notification.Type);
+ Assert.Equal("cache:old", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_New_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:newkey");
+ RedisValue value = "new";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.New, notification.Type);
+ Assert.Equal("newkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_XGroupCreate_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@0__:xgroup-create");
+ RedisValue value = "mystream";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.XGroupCreate, notification.Type);
+ Assert.Equal("mystream", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyspace_TypeChanged_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyspace@0__:mykey");
+ RedisValue value = "type_changed";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeySpace);
+ Assert.Equal(0, notification.Database);
+ Assert.Equal(KeyNotificationType.TypeChanged, notification.Type);
+ Assert.Equal("mykey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_HighDatabaseNumber_ParsesCorrectly()
+ {
+ var channel = RedisChannel.Literal("__keyevent@999__:set");
+ RedisValue value = "testkey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(999, notification.Database);
+ Assert.Equal(KeyNotificationType.Set, notification.Type);
+ Assert.Equal("testkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void Keyevent_NonIntegerDatabase_ParsesWellEnough()
+ {
+ var channel = RedisChannel.Literal("__keyevent@abc__:set");
+ RedisValue value = "testkey";
+
+ Assert.True(KeyNotification.TryParse(in channel, in value, out var notification));
+
+ Assert.True(notification.IsKeyEvent);
+ Assert.Equal(-1, notification.Database);
+ Assert.Equal(KeyNotificationType.Set, notification.Type);
+ Assert.Equal("testkey", (string?)notification.GetKey());
+ }
+
+ [Fact]
+ public void DefaultKeyNotification_HasExpectedProperties()
+ {
+ var notification = default(KeyNotification);
+
+ Assert.False(notification.IsKeySpace);
+ Assert.False(notification.IsKeyEvent);
+ Assert.Equal(-1, notification.Database);
+ Assert.Equal(KeyNotificationType.Unknown, notification.Type);
+ Assert.True(notification.GetKey().IsNull);
+ Assert.Equal(0, notification.KeyByteCount);
+ Assert.True(notification.Channel.IsNull);
+ Assert.True(notification.Value.IsNull);
+
+ // TryCopyKey should return false and write 0 bytes
+ Span buffer = stackalloc byte[10];
+ Assert.False(notification.TryCopyKey(buffer, out var bytesWritten));
+ Assert.Equal(0, bytesWritten);
+ }
+
+ [Theory]
+ [InlineData(KeyNotificationTypeFastHash.append.Text, KeyNotificationType.Append)]
+ [InlineData(KeyNotificationTypeFastHash.copy.Text, KeyNotificationType.Copy)]
+ [InlineData(KeyNotificationTypeFastHash.del.Text, KeyNotificationType.Del)]
+ [InlineData(KeyNotificationTypeFastHash.expire.Text, KeyNotificationType.Expire)]
+ [InlineData(KeyNotificationTypeFastHash.hdel.Text, KeyNotificationType.HDel)]
+ [InlineData(KeyNotificationTypeFastHash.hexpired.Text, KeyNotificationType.HExpired)]
+ [InlineData(KeyNotificationTypeFastHash.hincrbyfloat.Text, KeyNotificationType.HIncrByFloat)]
+ [InlineData(KeyNotificationTypeFastHash.hincrby.Text, KeyNotificationType.HIncrBy)]
+ [InlineData(KeyNotificationTypeFastHash.hpersist.Text, KeyNotificationType.HPersist)]
+ [InlineData(KeyNotificationTypeFastHash.hset.Text, KeyNotificationType.HSet)]
+ [InlineData(KeyNotificationTypeFastHash.incrbyfloat.Text, KeyNotificationType.IncrByFloat)]
+ [InlineData(KeyNotificationTypeFastHash.incrby.Text, KeyNotificationType.IncrBy)]
+ [InlineData(KeyNotificationTypeFastHash.linsert.Text, KeyNotificationType.LInsert)]
+ [InlineData(KeyNotificationTypeFastHash.lpop.Text, KeyNotificationType.LPop)]
+ [InlineData(KeyNotificationTypeFastHash.lpush.Text, KeyNotificationType.LPush)]
+ [InlineData(KeyNotificationTypeFastHash.lrem.Text, KeyNotificationType.LRem)]
+ [InlineData(KeyNotificationTypeFastHash.lset.Text, KeyNotificationType.LSet)]
+ [InlineData(KeyNotificationTypeFastHash.ltrim.Text, KeyNotificationType.LTrim)]
+ [InlineData(KeyNotificationTypeFastHash.move_from.Text, KeyNotificationType.MoveFrom)]
+ [InlineData(KeyNotificationTypeFastHash.move_to.Text, KeyNotificationType.MoveTo)]
+ [InlineData(KeyNotificationTypeFastHash.persist.Text, KeyNotificationType.Persist)]
+ [InlineData(KeyNotificationTypeFastHash.rename_from.Text, KeyNotificationType.RenameFrom)]
+ [InlineData(KeyNotificationTypeFastHash.rename_to.Text, KeyNotificationType.RenameTo)]
+ [InlineData(KeyNotificationTypeFastHash.restore.Text, KeyNotificationType.Restore)]
+ [InlineData(KeyNotificationTypeFastHash.rpop.Text, KeyNotificationType.RPop)]
+ [InlineData(KeyNotificationTypeFastHash.rpush.Text, KeyNotificationType.RPush)]
+ [InlineData(KeyNotificationTypeFastHash.sadd.Text, KeyNotificationType.SAdd)]
+ [InlineData(KeyNotificationTypeFastHash.set.Text, KeyNotificationType.Set)]
+ [InlineData(KeyNotificationTypeFastHash.setrange.Text, KeyNotificationType.SetRange)]
+ [InlineData(KeyNotificationTypeFastHash.sortstore.Text, KeyNotificationType.SortStore)]
+ [InlineData(KeyNotificationTypeFastHash.srem.Text, KeyNotificationType.SRem)]
+ [InlineData(KeyNotificationTypeFastHash.spop.Text, KeyNotificationType.SPop)]
+ [InlineData(KeyNotificationTypeFastHash.xadd.Text, KeyNotificationType.XAdd)]
+ [InlineData(KeyNotificationTypeFastHash.xdel.Text, KeyNotificationType.XDel)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_createconsumer.Text, KeyNotificationType.XGroupCreateConsumer)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_create.Text, KeyNotificationType.XGroupCreate)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_delconsumer.Text, KeyNotificationType.XGroupDelConsumer)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_destroy.Text, KeyNotificationType.XGroupDestroy)]
+ [InlineData(KeyNotificationTypeFastHash.xgroup_setid.Text, KeyNotificationType.XGroupSetId)]
+ [InlineData(KeyNotificationTypeFastHash.xsetid.Text, KeyNotificationType.XSetId)]
+ [InlineData(KeyNotificationTypeFastHash.xtrim.Text, KeyNotificationType.XTrim)]
+ [InlineData(KeyNotificationTypeFastHash.zadd.Text, KeyNotificationType.ZAdd)]
+ [InlineData(KeyNotificationTypeFastHash.zdiffstore.Text, KeyNotificationType.ZDiffStore)]
+ [InlineData(KeyNotificationTypeFastHash.zinterstore.Text, KeyNotificationType.ZInterStore)]
+ [InlineData(KeyNotificationTypeFastHash.zunionstore.Text, KeyNotificationType.ZUnionStore)]
+ [InlineData(KeyNotificationTypeFastHash.zincr.Text, KeyNotificationType.ZIncr)]
+ [InlineData(KeyNotificationTypeFastHash.zrembyrank.Text, KeyNotificationType.ZRemByRank)]
+ [InlineData(KeyNotificationTypeFastHash.zrembyscore.Text, KeyNotificationType.ZRemByScore)]
+ [InlineData(KeyNotificationTypeFastHash.zrem.Text, KeyNotificationType.ZRem)]
+ [InlineData(KeyNotificationTypeFastHash.expired.Text, KeyNotificationType.Expired)]
+ [InlineData(KeyNotificationTypeFastHash.evicted.Text, KeyNotificationType.Evicted)]
+ [InlineData(KeyNotificationTypeFastHash._new.Text, KeyNotificationType.New)]
+ [InlineData(KeyNotificationTypeFastHash.overwritten.Text, KeyNotificationType.Overwritten)]
+ [InlineData(KeyNotificationTypeFastHash.type_changed.Text, KeyNotificationType.TypeChanged)]
+ public unsafe void FastHashParse_AllKnownValues_ParseCorrectly(string raw, KeyNotificationType parsed)
+ {
+ var arr = ArrayPool.Shared.Rent(Encoding.UTF8.GetMaxByteCount(raw.Length));
+ int bytes;
+ fixed (byte* bPtr = arr) // encode into the buffer
+ {
+ fixed (char* cPtr = raw)
+ {
+ bytes = Encoding.UTF8.GetBytes(cPtr, raw.Length, bPtr, arr.Length);
+ }
+ }
+
+ var result = KeyNotificationTypeFastHash.Parse(arr.AsSpan(0, bytes));
+ log.WriteLine($"Parsed '{raw}' as {result}");
+ Assert.Equal(parsed, result);
+
+ // and the other direction:
+ var fetchedBytes = KeyNotificationTypeFastHash.GetRawBytes(parsed);
+ string fetched;
+ fixed (byte* bPtr = fetchedBytes)
+ {
+ fetched = Encoding.UTF8.GetString(bPtr, fetchedBytes.Length);
+ }
+
+ log.WriteLine($"Fetched '{raw}'");
+ Assert.Equal(raw, fetched);
+
+ ArrayPool.Shared.Return(arr);
+ }
+
+ [Fact]
+ public void CreateKeySpaceNotification_Valid()
+ {
+ var channel = RedisChannel.KeySpace("abc", 42);
+ Assert.Equal("__keyspace@42__:abc", channel.ToString());
+ Assert.False(channel.IsMultiNode);
+ Assert.True(channel.IsKeyRouted);
+ Assert.False(channel.IsSharded);
+ Assert.False(channel.IsPattern);
+ }
+
+ [Theory]
+ [InlineData(null, null, "__keyspace@*__:*")]
+ [InlineData("abc*", null, "__keyspace@*__:abc*")]
+ [InlineData(null, 42, "__keyspace@42__:*")]
+ [InlineData("abc*", 42, "__keyspace@42__:abc*")]
+ public void CreateKeySpaceNotificationPattern(string? pattern, int? database, string expected)
+ {
+ var channel = RedisChannel.KeySpacePattern(pattern, database);
+ Assert.Equal(expected, channel.ToString());
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.False(channel.IsSharded);
+ Assert.True(channel.IsPattern);
+ }
+
+ [Theory]
+ [InlineData(KeyNotificationType.Set, null, "__keyevent@*__:set", true)]
+ [InlineData(KeyNotificationType.XGroupCreate, null, "__keyevent@*__:xgroup-create", true)]
+ [InlineData(KeyNotificationType.Set, 42, "__keyevent@42__:set", false)]
+ [InlineData(KeyNotificationType.XGroupCreate, 42, "__keyevent@42__:xgroup-create", false)]
+ public void CreateKeyEventNotification(KeyNotificationType type, int? database, string expected, bool isPattern)
+ {
+ var channel = RedisChannel.KeyEvent(type, database);
+ Assert.Equal(expected, channel.ToString());
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.False(channel.IsSharded);
+ if (isPattern)
+ {
+ Assert.True(channel.IsPattern);
+ }
+ else
+ {
+ Assert.False(channel.IsPattern);
+ }
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeySpace_SingleKeyIsKeyRouted()
+ {
+ var channel = RedisChannel.KeySpace("abc", 42);
+ Assert.False(channel.IsMultiNode);
+ Assert.True(channel.IsKeyRouted);
+ Assert.True(channel.WithKeyRouting().IsKeyRouted); // no change, still key-routed
+ Assert.Equal(RedisCommand.PUBLISH, channel.GetPublishCommand());
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeySpacePattern()
+ {
+ var channel = RedisChannel.KeySpacePattern("abc", 42);
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message);
+ Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message);
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeyEvent()
+ {
+ var channel = RedisChannel.KeyEvent(KeyNotificationType.Set, 42);
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message);
+ Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message);
+ }
+
+ [Fact]
+ public void Cannot_KeyRoute_KeyEvent_Custom()
+ {
+ var channel = RedisChannel.KeyEvent("foo"u8, 42);
+ Assert.True(channel.IsMultiNode);
+ Assert.False(channel.IsKeyRouted);
+ Assert.StartsWith("Key routing is not supported for multi-node channels", Assert.Throws(() => channel.WithKeyRouting()).Message);
+ Assert.StartsWith("Publishing is not supported for multi-node channels", Assert.Throws(() => channel.GetPublishCommand()).Message);
+ }
+
+ [Fact]
+ public void KeyEventPrefix_KeySpacePrefix_Length_Matches()
+ {
+ // this is a sanity check for the parsing step in KeyNotification.TryParse
+ Assert.Equal(KeyNotificationChannels.KeySpacePrefix.Length, KeyNotificationChannels.KeyEventPrefix.Length);
+ }
+}
diff --git a/tests/StackExchange.Redis.Tests/PubSubKeyNotificationTests.cs b/tests/StackExchange.Redis.Tests/PubSubKeyNotificationTests.cs
new file mode 100644
index 000000000..d99c687dc
--- /dev/null
+++ b/tests/StackExchange.Redis.Tests/PubSubKeyNotificationTests.cs
@@ -0,0 +1,162 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace StackExchange.Redis.Tests;
+
+public sealed class PubSubKeyNotificationTestsCluster(ITestOutputHelper output, SharedConnectionFixture fixture)
+ : PubSubKeyNotificationTests(output, fixture)
+{
+ protected override string GetConfiguration() => TestConfig.Current.ClusterServersAndPorts;
+}
+
+public sealed class PubSubKeyNotificationTestsStandalone(ITestOutputHelper output, SharedConnectionFixture fixture)
+ : PubSubKeyNotificationTests(output, fixture)
+{
+}
+
+public abstract class PubSubKeyNotificationTests(ITestOutputHelper output, SharedConnectionFixture? fixture = null)
+ : TestBase(output, fixture)
+{
+ private const int DefaultKeyCount = 10;
+ private const int DefaultEventCount = 512;
+
+ private RedisKey[] InventKeys(int count = DefaultKeyCount)
+ {
+ RedisKey[] keys = new RedisKey[count];
+ for (int i = 0; i < count; i++)
+ {
+ keys[i] = Guid.NewGuid().ToString();
+ }
+ return keys;
+ }
+
+ private RedisKey SelectKey(RedisKey[] keys) => keys[SharedRandom.Next(0, keys.Length)];
+
+#if NET6_0_OR_GREATER
+ private static Random SharedRandom => Random.Shared;
+#else
+ private static Random SharedRandom { get; } = new();
+#endif
+
+ [Fact]
+ public async Task KeySpace_Events_Enabled()
+ {
+ // see https://redis.io/docs/latest/develop/pubsub/keyspace-notifications/#configuration
+ await using var conn = Create(allowAdmin: true);
+ int failures = 0;
+ foreach (var ep in conn.GetEndPoints())
+ {
+ var server = conn.GetServer(ep);
+ var config = (await server.ConfigGetAsync("notify-keyspace-events")).Single();
+ Log($"[{Format.ToString(ep)}] notify-keyspace-events: '{config.Value}'");
+
+ // this is a very broad config, but it's what we use in CI (and probably a common basic config)
+ if (config.Value != "AKE")
+ {
+ failures++;
+ }
+ }
+ // for details, check the log output
+ Assert.Equal(0, failures);
+ }
+
+ [Fact]
+ public async Task KeySpace_CanSubscribe_ManualPublish()
+ {
+ await using var conn = Create();
+ var db = conn.GetDatabase();
+
+ var channel = RedisChannel.KeyEvent("nonesuch"u8, database: null);
+ var sub = conn.GetSubscriber();
+ await sub.UnsubscribeAsync(channel);
+
+ int count = 0;
+ await sub.SubscribeAsync(channel, (_, _) => Interlocked.Increment(ref count));
+
+ // to publish, we need to remove the marker that this is a multi-node channel
+ var asLiteral = RedisChannel.Literal(channel.ToString());
+ await sub.PublishAsync(asLiteral, Guid.NewGuid().ToString());
+
+ int expected = GetConnectedCount(conn, channel);
+ await Task.Delay(100).ForAwait();
+ Assert.Equal(expected, count);
+ }
+
+ // this looks past the horizon to see how many connections we actually have for a given channel,
+ // which could be more than 1 in a cluster scenario
+ private static int GetConnectedCount(IConnectionMultiplexer muxer, in RedisChannel channel)
+ => muxer is ConnectionMultiplexer typed && typed.TryGetSubscription(channel, out var sub)
+ ? sub.GetConnectionCount() : 1;
+
+ [Fact]
+ public async Task KeyEvent_CanObserveSimple_ViaCallbackHandler()
+ {
+ await using var conn = Create();
+ var db = conn.GetDatabase();
+
+ var keys = InventKeys();
+ var channel = RedisChannel.KeyEvent(KeyNotificationType.SAdd);
+ var sub = conn.GetSubscriber();
+ await sub.UnsubscribeAsync(channel);
+ HashSet observedKeys = [];
+ int count = 0, callbackCount = 0;
+ TaskCompletionSource allDone = new();
+ await sub.SubscribeAsync(channel, (recvChannel, recvValue) =>
+ {
+ Interlocked.Increment(ref callbackCount);
+ if (KeyNotification.TryParse(in recvChannel, in recvValue, out var notification)
+ && notification is { IsKeyEvent: true, Type: KeyNotificationType.SAdd })
+ {
+ var recvKey = notification.GetKey();
+ lock (observedKeys)
+ {
+ int currentCount = ++count;
+ var newKey = observedKeys.Add(recvKey);
+ if (newKey)
+ {
+ Log($"Observed key: '{recvKey}' after {currentCount} events");
+ }
+
+ if (currentCount == DefaultEventCount)
+ {
+ allDone.TrySetResult(true);
+ }
+ }
+ }
+ });
+
+ await Task.Delay(300).ForAwait(); // give it a moment to settle
+
+ HashSet sentKeys = new(keys.Length);
+ for (int i = 0; i < DefaultEventCount; i++)
+ {
+ var key = SelectKey(keys);
+ await db.SetAddAsync(key, i);
+ sentKeys.Add(key); // just in case Random has a bad day (obvious Dilbert link is obvious)
+ }
+
+ // Wait for all events to be observed
+ try
+ {
+ Assert.True(await allDone.Task.WithTimeout(5000));
+ }
+ catch (TimeoutException ex)
+ {
+ // if this is zero, the real problem is probably ala KeySpace_Events_Enabled
+ throw new TimeoutException($"Timeout; {Volatile.Read(ref callbackCount)} events observed", ex);
+ }
+
+ lock (observedKeys)
+ {
+ Assert.Equal(sentKeys.Count, observedKeys.Count);
+ foreach (var key in sentKeys)
+ {
+ Assert.Contains(key, observedKeys);
+ }
+ }
+ }
+}
diff --git a/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs b/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs
index 43bb4b2b8..691232218 100644
--- a/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs
+++ b/tests/StackExchange.Redis.Tests/PubSubMultiserverTests.cs
@@ -63,7 +63,7 @@ await sub.SubscribeAsync(channel, (_, val) =>
Assert.True(subscribedServerEndpoint.IsSubscriberConnected, "subscribedServerEndpoint.IsSubscriberConnected");
Assert.True(conn.GetSubscriptions().TryGetValue(channel, out var subscription));
- var initialServer = subscription.GetCurrentServer();
+ var initialServer = subscription.GetAnyCurrentServer();
Assert.NotNull(initialServer);
Assert.True(initialServer.IsConnected);
Log("Connected to: " + initialServer);
@@ -83,10 +83,10 @@ await sub.SubscribeAsync(channel, (_, val) =>
Assert.True(subscribedServerEndpoint.IsConnected, "subscribedServerEndpoint.IsConnected");
Assert.False(subscribedServerEndpoint.IsSubscriberConnected, "subscribedServerEndpoint.IsSubscriberConnected");
}
- await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected);
- Assert.True(subscription.IsConnected);
+ await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny());
+ Assert.True(subscription.IsConnectedAny());
- var newServer = subscription.GetCurrentServer();
+ var newServer = subscription.GetAnyCurrentServer();
Assert.NotNull(newServer);
Assert.NotEqual(newServer, initialServer);
Log("Now connected to: " + newServer);
@@ -148,7 +148,7 @@ await sub.SubscribeAsync(
Assert.True(subscribedServerEndpoint.IsSubscriberConnected, "subscribedServerEndpoint.IsSubscriberConnected");
Assert.True(conn.GetSubscriptions().TryGetValue(channel, out var subscription));
- var initialServer = subscription.GetCurrentServer();
+ var initialServer = subscription.GetAnyCurrentServer();
Assert.NotNull(initialServer);
Assert.True(initialServer.IsConnected);
Log("Connected to: " + initialServer);
@@ -169,10 +169,10 @@ await sub.SubscribeAsync(
if (expectSuccess)
{
- await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected);
- Assert.True(subscription.IsConnected);
+ await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny());
+ Assert.True(subscription.IsConnectedAny());
- var newServer = subscription.GetCurrentServer();
+ var newServer = subscription.GetAnyCurrentServer();
Assert.NotNull(newServer);
Assert.NotEqual(newServer, initialServer);
Log("Now connected to: " + newServer);
@@ -180,16 +180,16 @@ await sub.SubscribeAsync(
else
{
// This subscription shouldn't be able to reconnect by flags (demanding an unavailable server)
- await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected);
- Assert.False(subscription.IsConnected);
+ await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny());
+ Assert.False(subscription.IsConnectedAny());
Log("Unable to reconnect (as expected)");
// Allow connecting back to the original
conn.AllowConnect = true;
- await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnected);
- Assert.True(subscription.IsConnected);
+ await UntilConditionAsync(TimeSpan.FromSeconds(5), () => subscription.IsConnectedAny());
+ Assert.True(subscription.IsConnectedAny());
- var newServer = subscription.GetCurrentServer();
+ var newServer = subscription.GetAnyCurrentServer();
Assert.NotNull(newServer);
Assert.Equal(newServer, initialServer);
Log("Now connected to: " + newServer);