From 5eae1a2200b989768df87b30da1bbfe8c49fe1cf Mon Sep 17 00:00:00 2001 From: Dennis Gu Date: Sun, 30 Nov 2025 21:47:46 +0800 Subject: [PATCH 1/5] Add edge node/device status tracking service --- .../Caches/CacheHelperTests.cs | 56 ++++ .../Caches/StatusTrackingServiceTests.cs | 315 ++++++++++++++++++ .../HostApplication/Caches/CacheHelper.cs | 39 +++ .../Caches/IMessageOrderingService.cs | 2 +- .../Caches/IStatusTrackingService.cs | 33 ++ .../Caches/StatusTrackingService.cs | 155 +++++++++ SparklerNet/SparklerNet.csproj | 1 + 7 files changed, 600 insertions(+), 1 deletion(-) create mode 100644 SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs create mode 100644 SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs create mode 100644 SparklerNet/HostApplication/Caches/CacheHelper.cs create mode 100644 SparklerNet/HostApplication/Caches/IStatusTrackingService.cs create mode 100644 SparklerNet/HostApplication/Caches/StatusTrackingService.cs diff --git a/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs b/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs new file mode 100644 index 0000000..f1a07ee --- /dev/null +++ b/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs @@ -0,0 +1,56 @@ +using Xunit; +using SparklerNet.HostApplication.Caches; + +namespace SparklerNet.Tests.HostApplication.Caches; + +public class CacheHelperTests +{ + [Theory] + [InlineData("prefix:", "group1", "edge1", "device1", "prefix:group1:edge1:device1")] + [InlineData("status:", "group2", "edge2", "device2", "status:group2:edge2:device2")] + [InlineData(null, "group3", "edge3", "device3", "group3:edge3:device3")] + [InlineData("", "group4", "edge4", "device4", "group4:edge4:device4")] + public void BuildCacheKey_WithDeviceId_ReturnsCorrectFormat(string? prefix, string groupId, string edgeNodeId, string? deviceId, string expected) + { + var result = CacheHelper.BuildCacheKey(prefix, groupId, edgeNodeId, deviceId); + Assert.Equal(expected, result); + } + + [Theory] + [InlineData("prefix:", "group1", "edge1", null, "prefix:group1:edge1")] + [InlineData("status:", "group2", "edge2", null, "status:group2:edge2")] + [InlineData(null, "group3", "edge3", null, "group3:edge3")] + [InlineData("", "group4", "edge4", null, "group4:edge4")] + public void BuildCacheKey_WithoutDeviceId_ReturnsCorrectFormat(string? prefix, string groupId, string edgeNodeId, string? deviceId, string expected) + { + var result = CacheHelper.BuildCacheKey(prefix, groupId, edgeNodeId, deviceId); + Assert.Equal(expected, result); + } + + [Fact] + public void GetSemaphore_ReturnsSameInstanceForSameKey() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + + Assert.Same(semaphore1, semaphore2); + } + + [Fact] + public void GetSemaphore_ReturnsDifferentInstancesForDifferentKeys() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group2", "edge2", "device2"); + + Assert.NotSame(semaphore1, semaphore2); + } + + [Fact] + public void GetSemaphore_WithAndWithoutDeviceId_ReturnsDifferentInstances() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group1", "edge1", null); + + Assert.NotSame(semaphore1, semaphore2); + } +} \ No newline at end of file diff --git a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs new file mode 100644 index 0000000..4cd0872 --- /dev/null +++ b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs @@ -0,0 +1,315 @@ +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Caching.Hybrid; +using Microsoft.Extensions.DependencyInjection; +using SparklerNet.HostApplication.Caches; +using Xunit; + +namespace SparklerNet.Tests.HostApplication.Caches; + +[SuppressMessage("ReSharper", "ConvertToConstant.Local")] +public class StatusTrackingServiceTests +{ + private readonly StatusTrackingService _statusService; + + public StatusTrackingServiceTests() + { + var services = new ServiceCollection(); + services.AddHybridCache(); + + var serviceProvider = services.BuildServiceProvider(); + var cache = serviceProvider.GetRequiredService(); + _statusService = new StatusTrackingService(cache); + } + + [Fact] + public async Task IsOnline_WhenNoStatusInCache_ReturnsFalse() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + var result = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + + Assert.False(result); + } + + [Fact] + public async Task IsOnline_WithOnlineStatus_ReturnsTrue() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + var edgeNodeStatus = await _statusService.IsOnline(groupId, edgeNodeId, null); + + Assert.True(edgeNodeStatus); + } + + [Fact] + public async Task IsOnline_WithOfflineStatus_ReturnsFalse() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + var result = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + + Assert.False(result); + } + + [Fact] + public async Task IsOnline_WithNullDeviceId_ReturnsCorrectStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + var result = await _statusService.IsOnline(groupId, edgeNodeId, null); + + Assert.True(result); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOnlineStatus_CachesStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var isOnline = true; + var bdSeq = 1; + var timestamp = 1000L; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, isOnline, bdSeq, timestamp); + + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenNewerTimestampUpdatesOlderTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set initial status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Update with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); + + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set initial status with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); + + // Try to update with an older timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Should remain online + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithSameBdSeqUpdatesOnlineStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var bdSeq = 1; + + // Set online status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, bdSeq, 1000); + + // Set offline status with same bdSeq + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, bdSeq, 2000); + + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + + Assert.False(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithNewerTimestampUpdatesOfflineStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set offline status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Update offline status with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 2, 2000); + + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + + Assert.False(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenSettingOnlineStatus_CachesStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + var isOnline = true; + var timestamp = 1000L; + + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, isOnline, timestamp); + + var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenNewerTimestampUpdatesOlderTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set initial status + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); + + // Update with newer timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set initial status with newer timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Try to update with an older timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); + + // Should remain online + var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_DeviceStatusIsIndependentOfEdgeNodeStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set edge node as offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Set device as online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Check statuses separately + var edgeNodeStatus = await _statusService.IsOnline(groupId, edgeNodeId, null); + var deviceStatus = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + + Assert.False(edgeNodeStatus); + Assert.True(deviceStatus); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_AssociatedDevicesBecomeOffline() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set Device online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Device should be online when EdgeNode is online + var deviceStatusBefore = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + Assert.True(deviceStatusBefore, "Device should be online when EdgeNode is online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + + // Device should now be offline due to EdgeNode going offline + var deviceStatusAfter = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + Assert.False(deviceStatusAfter, "Device should be offline when EdgeNode goes offline"); + } + + [Fact] + public async Task WhenEdgeNodeComesOnlineAgain_DeviceStatusRequiresReset() + { + var groupId = "group20"; + var edgeNodeId = "edgeNode20"; + var deviceId = "device20"; + + // Set EdgeNode and Device online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Verify both are online + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should be online"); + + // Set EdgeNode offline and verify the Device becomes offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should be offline when EdgeNode is offline"); + + // Set EdgeNode online again + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 4000); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online after reconnection"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should remain offline"); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_MultipleDevicesBecomeOfflineSimultaneously() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId1 = "device1"; + var deviceId2 = "device2"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set multiple Devices online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId1, true, 2000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId2, true, 3000); + + // Verify all are online + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be online"); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 4000); + + // All devices should now be offline due to EdgeNode going offline + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be offline"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be offline"); + } +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/CacheHelper.cs b/SparklerNet/HostApplication/Caches/CacheHelper.cs new file mode 100644 index 0000000..bc058a7 --- /dev/null +++ b/SparklerNet/HostApplication/Caches/CacheHelper.cs @@ -0,0 +1,39 @@ +using System.Collections.Concurrent; + +namespace SparklerNet.HostApplication.Caches; + +public static class CacheHelper +{ + private static readonly ConcurrentDictionary Semaphores = new(); + + /// + /// Builds a standardized cache key based on the provided prefix and identifiers + /// + /// The prefix to use for the key (can be null) + /// The group ID part of the key + /// The edge node ID part of the key + /// The device ID part of the key (optional) + /// The constructed cache key in format "prefix:groupId:edgeNodeId:deviceId" or "prefix:groupId:edgeNodeId" + public static string BuildCacheKey(string? prefix, string groupId, string edgeNodeId, string? deviceId) + { + var baseKey = !string.IsNullOrEmpty(deviceId) + ? $"{groupId}:{edgeNodeId}:{deviceId}" + : $"{groupId}:{edgeNodeId}"; + + return string.IsNullOrEmpty(prefix) ? baseKey : $"{prefix}{baseKey}"; + } + + /// + /// Gets a SemaphoreSlim object for the specified context to support async locking + /// Ensures thread safety for asynchronous operations on a specific device/node combination + /// + /// The group ID part of the key + /// The edge node ID part of the key + /// The device ID part of the key (optional) + /// The SemaphoreSlim object for the specified EdgeNode/Device + public static SemaphoreSlim GetSemaphore(string groupId, string edgeNodeId, string? deviceId) + { + var key = BuildCacheKey(null, groupId, edgeNodeId, deviceId); + return Semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1)); + } +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs b/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs index d7ff86e..a89ceff 100644 --- a/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs +++ b/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs @@ -45,7 +45,7 @@ public interface IMessageOrderingService /// Clears the sequence cache and pending messages for a specific edge node or device /// Also cleans up any associated timer resources /// - /// The group ID of the edge node + /// The group ID /// The edge node ID /// The device ID (optional) void ClearMessageOrder(string groupId, string edgeNodeId, string? deviceId); diff --git a/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs new file mode 100644 index 0000000..79727e4 --- /dev/null +++ b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs @@ -0,0 +1,33 @@ +namespace SparklerNet.HostApplication.Caches; + +public interface IStatusTrackingService +{ + /// + /// Determines if a specific edge node is currently online. + /// + /// The group ID + /// The edge node ID + /// The device ID (optional) + /// True if the edge node is online, otherwise false. + Task IsOnline(string groupId, string edgeNodeId, string? deviceId); + + /// + /// Updates the online status of a specific edge node. + /// + /// The group ID + /// The edge node ID + /// True if the edge node is online, otherwise false. + /// The bdSeq metric value of Birth or Death certificates + /// The timestamp of Birth or Death certificates + Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, bool isOnline, int bdSeq, long timestamp); + + /// + /// Updates the online status of a specific device. + /// + /// The group ID + /// The edge node ID + /// The device ID + /// True if the device is online, otherwise false. + /// The timestamp of Birth or Death certificates + Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, string deviceId, bool isOnline, long timestamp); +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs new file mode 100644 index 0000000..2606cd2 --- /dev/null +++ b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs @@ -0,0 +1,155 @@ +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Caching.Hybrid; + +namespace SparklerNet.HostApplication.Caches; + +/// +/// Service responsible for tracking the online status of edge nodes and devices +/// Provides methods to check and update the online status of specific edge nodes and devices +/// Uses HybridCache for efficient caching with both in-memory and distributed capabilities +/// +public class StatusTrackingService : IStatusTrackingService +{ + private const string StatusKeyPrefix = "sparkplug:status:"; // Prefix for the status cache keys + private readonly HybridCache _cache; + + /// + /// Initializes a new instance of the + /// + /// The HybridCache instance for caching online status + public StatusTrackingService(HybridCache cache) + { + _cache = cache ?? throw new ArgumentNullException(nameof(cache)); + } + + /// + public async Task IsOnline(string groupId, string edgeNodeId, string? deviceId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); + + // Get the status from the cache or create a new entry if it doesn't exist' + var status = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(null)); + + // If the status is not in the cache, assume it offline + return status is { IsOnline: true }; + } + + /// + public async Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, bool isOnline, int bdSeq, + long timestamp) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, null); + var cacheTag = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Create a new status object + var newStatus = new EndpointStatus { IsOnline = isOnline, BdSeq = bdSeq, Timestamp = timestamp }; + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + + try + { + // Wait for the semaphore asynchronously + await semaphore.WaitAsync(); + + // Get the current status from the cache or create a new entry if it doesn't exist + var currentStatus = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(newStatus), tags: [cacheTag]); + + // Online status update logic + if (newStatus.IsOnline) + { + // Update the cache if the new status is newer than the current status + // Note the current status may already be updated because the value is null + if (newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + } + // Offline status update logic + else + { + // When the current status is offline, update if the new status is newer than the current status + // ReSharper disable once ConvertIfStatementToSwitchStatement + if (!currentStatus.IsOnline && newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + + // When the current status is online, update if: + // 1. The new status has the same bdSeq + // 2. The new status has the same timestamp + // 3. The new status is newer than the current status + if (currentStatus.IsOnline && + (newStatus.BdSeq == currentStatus.BdSeq || newStatus.Timestamp >= currentStatus.Timestamp)) + { + await _cache.RemoveByTagAsync(cacheTag); + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + } + } + } + finally + { + // Always release the semaphore to prevent deadlocks + semaphore.Release(); + } + } + + /// + public async Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, string deviceId, bool isOnline, + long timestamp) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + ArgumentException.ThrowIfNullOrWhiteSpace(deviceId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); + var cacheTag = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Create a new status object + var newStatus = new EndpointStatus { IsOnline = isOnline, BdSeq = 0, Timestamp = timestamp }; + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, deviceId); + + try + { + // Wait for the semaphore asynchronously + await semaphore.WaitAsync(); + + // Get the current status from the cache or create a new entry if it doesn't exist + var currentStatus = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(newStatus), tags: [cacheTag]); + + // Update the cache if the new status is newer than the current status + // Note the current status may already be updated because the value is null + if (newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + } + finally + { + // Always release the semaphore to prevent deadlocks + semaphore.Release(); + } + } + + /// + /// Simple data class to store online status information + /// Allows for future expansion of status data beyond just a boolean flag + /// + [SuppressMessage("ReSharper", "MemberCanBePrivate.Global")] + protected record EndpointStatus + { + public bool IsOnline { get; init; } + + public int BdSeq { get; init; } + + public long Timestamp { get; init; } + } +} \ No newline at end of file diff --git a/SparklerNet/SparklerNet.csproj b/SparklerNet/SparklerNet.csproj index fa31fd6..fab88a5 100644 --- a/SparklerNet/SparklerNet.csproj +++ b/SparklerNet/SparklerNet.csproj @@ -34,6 +34,7 @@ + From 31c6afd435549bee9ee4d73fa09f032620386e1f Mon Sep 17 00:00:00 2001 From: Dennis Gu Date: Sun, 30 Nov 2025 22:03:11 +0800 Subject: [PATCH 2/5] Refactor status tracking tests and cache helper --- .../Caches/StatusTrackingServiceTests.cs | 278 +++++++++--------- .../HostApplication/Caches/CacheHelper.cs | 14 +- .../Caches/StatusTrackingService.cs | 11 +- 3 files changed, 156 insertions(+), 147 deletions(-) diff --git a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs index 4cd0872..d56a2c7 100644 --- a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs +++ b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs @@ -15,7 +15,7 @@ public StatusTrackingServiceTests() { var services = new ServiceCollection(); services.AddHybridCache(); - + var serviceProvider = services.BuildServiceProvider(); var cache = serviceProvider.GetRequiredService(); _statusService = new StatusTrackingService(cache); @@ -27,9 +27,9 @@ public async Task IsOnline_WhenNoStatusInCache_ReturnsFalse() var groupId = "group1"; var edgeNodeId = "edgeNode1"; var deviceId = "device1"; - + var result = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - + Assert.False(result); } @@ -38,11 +38,11 @@ public async Task IsOnline_WithOnlineStatus_ReturnsTrue() { var groupId = "group1"; var edgeNodeId = "edgeNode1"; - + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - + var edgeNodeStatus = await _statusService.IsOnline(groupId, edgeNodeId, null); - + Assert.True(edgeNodeStatus); } @@ -52,11 +52,11 @@ public async Task IsOnline_WithOfflineStatus_ReturnsFalse() var groupId = "group1"; var edgeNodeId = "edgeNode1"; var deviceId = "device1"; - + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); - + var result = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - + Assert.False(result); } @@ -65,11 +65,11 @@ public async Task IsOnline_WithNullDeviceId_ReturnsCorrectStatus() { var groupId = "group1"; var edgeNodeId = "edgeNode1"; - + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - + var result = await _statusService.IsOnline(groupId, edgeNodeId, null); - + Assert.True(result); } @@ -81,11 +81,11 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOnlineStatus_CachesStatu var isOnline = true; var bdSeq = 1; var timestamp = 1000L; - + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, isOnline, bdSeq, timestamp); - + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); - + Assert.True(status); } @@ -94,15 +94,15 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenNewerTimestampUpdatesOlderTimes { var groupId = "group1"; var edgeNodeId = "edgeNode1"; - + // Set initial status await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - + // Update with newer timestamp await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); - + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); - + Assert.True(status); } @@ -111,16 +111,16 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenOlderTimestampDoesNotUpdateNewe { var groupId = "group1"; var edgeNodeId = "edgeNode1"; - + // Set initial status with newer timestamp await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); - + // Try to update with an older timestamp await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); // Should remain online var status = await _statusService.IsOnline(groupId, edgeNodeId, null); - + Assert.True(status); } @@ -130,15 +130,15 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithSameBdS var groupId = "group1"; var edgeNodeId = "edgeNode1"; var bdSeq = 1; - + // Set online status await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, bdSeq, 1000); - + // Set offline status with same bdSeq await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, bdSeq, 2000); - + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); - + Assert.False(status); } @@ -147,15 +147,15 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithNewerTi { var groupId = "group1"; var edgeNodeId = "edgeNode1"; - + // Set offline status await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); - + // Update offline status with newer timestamp await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 2, 2000); - + var status = await _statusService.IsOnline(groupId, edgeNodeId, null); - + Assert.False(status); } @@ -169,9 +169,9 @@ public async Task UpdateDeviceOnlineStatus_WhenSettingOnlineStatus_CachesStatus( var timestamp = 1000L; await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, isOnline, timestamp); - + var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - + Assert.True(status); } @@ -181,15 +181,15 @@ public async Task UpdateDeviceOnlineStatus_WhenNewerTimestampUpdatesOlderTimesta var groupId = "group1"; var edgeNodeId = "edgeNode1"; var deviceId = "device1"; - + // Set initial status await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); - + // Update with newer timestamp await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); - + var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - + Assert.True(status); } @@ -199,117 +199,119 @@ public async Task UpdateDeviceOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerT var groupId = "group1"; var edgeNodeId = "edgeNode1"; var deviceId = "device1"; - + // Set initial status with newer timestamp await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); - + // Try to update with an older timestamp await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); // Should remain online var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - + Assert.True(status); } [Fact] - public async Task UpdateDeviceOnlineStatus_DeviceStatusIsIndependentOfEdgeNodeStatus() - { - var groupId = "group1"; - var edgeNodeId = "edgeNode1"; - var deviceId = "device1"; - - // Set edge node as offline - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); - - // Set device as online - await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); - - // Check statuses separately - var edgeNodeStatus = await _statusService.IsOnline(groupId, edgeNodeId, null); - var deviceStatus = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - - Assert.False(edgeNodeStatus); - Assert.True(deviceStatus); - } - - [Fact] - public async Task WhenEdgeNodeGoesOffline_AssociatedDevicesBecomeOffline() - { - var groupId = "group1"; - var edgeNodeId = "edgeNode1"; - var deviceId = "device1"; - - // Set EdgeNode online - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - - // Set Device online - await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); - - // Device should be online when EdgeNode is online - var deviceStatusBefore = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - Assert.True(deviceStatusBefore, "Device should be online when EdgeNode is online"); - - // Set EdgeNode offline - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); - - // Device should now be offline due to EdgeNode going offline - var deviceStatusAfter = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); - Assert.False(deviceStatusAfter, "Device should be offline when EdgeNode goes offline"); - } - - [Fact] - public async Task WhenEdgeNodeComesOnlineAgain_DeviceStatusRequiresReset() - { - var groupId = "group20"; - var edgeNodeId = "edgeNode20"; - var deviceId = "device20"; - - // Set EdgeNode and Device online - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); - - // Verify both are online - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should be online"); - - // Set EdgeNode offline and verify the Device becomes offline - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should be offline when EdgeNode is offline"); - - // Set EdgeNode online again - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 4000); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online after reconnection"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should remain offline"); - } - - [Fact] - public async Task WhenEdgeNodeGoesOffline_MultipleDevicesBecomeOfflineSimultaneously() - { - var groupId = "group1"; - var edgeNodeId = "edgeNode1"; - var deviceId1 = "device1"; - var deviceId2 = "device2"; - - // Set EdgeNode online - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - - // Set multiple Devices online - await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId1, true, 2000); - await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId2, true, 3000); - - // Verify all are online - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be online"); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be online"); - - // Set EdgeNode offline - await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 4000); - - // All devices should now be offline due to EdgeNode going offline - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be offline"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be offline"); - } + public async Task UpdateDeviceOnlineStatus_DeviceStatusIsIndependentOfEdgeNodeStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set edge node as offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Set device as online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Check statuses separately + var edgeNodeStatus = await _statusService.IsOnline(groupId, edgeNodeId, null); + var deviceStatus = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + + Assert.False(edgeNodeStatus); + Assert.True(deviceStatus); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_AssociatedDevicesBecomeOffline() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set Device online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Device should be online when EdgeNode is online + var deviceStatusBefore = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + Assert.True(deviceStatusBefore, "Device should be online when EdgeNode is online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + + // Device should now be offline due to EdgeNode going offline + var deviceStatusAfter = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + Assert.False(deviceStatusAfter, "Device should be offline when EdgeNode goes offline"); + } + + [Fact] + public async Task WhenEdgeNodeComesOnlineAgain_DeviceStatusRequiresReset() + { + var groupId = "group20"; + var edgeNodeId = "edgeNode20"; + var deviceId = "device20"; + + // Set EdgeNode and Device online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Verify both are online + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should be online"); + + // Set EdgeNode offline and verify the Device becomes offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), + "Device should be offline when EdgeNode is offline"); + + // Set EdgeNode online again + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 4000); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), + "EdgeNode should be online after reconnection"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should remain offline"); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_MultipleDevicesBecomeOfflineSimultaneously() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId1 = "device1"; + var deviceId2 = "device2"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set multiple Devices online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId1, true, 2000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId2, true, 3000); + + // Verify all are online + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be online"); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 4000); + + // All devices should now be offline due to EdgeNode going offline + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be offline"); + Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be offline"); + } } \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/CacheHelper.cs b/SparklerNet/HostApplication/Caches/CacheHelper.cs index bc058a7..4ed9018 100644 --- a/SparklerNet/HostApplication/Caches/CacheHelper.cs +++ b/SparklerNet/HostApplication/Caches/CacheHelper.cs @@ -5,7 +5,7 @@ namespace SparklerNet.HostApplication.Caches; public static class CacheHelper { private static readonly ConcurrentDictionary Semaphores = new(); - + /// /// Builds a standardized cache key based on the provided prefix and identifiers /// @@ -15,14 +15,14 @@ public static class CacheHelper /// The device ID part of the key (optional) /// The constructed cache key in format "prefix:groupId:edgeNodeId:deviceId" or "prefix:groupId:edgeNodeId" public static string BuildCacheKey(string? prefix, string groupId, string edgeNodeId, string? deviceId) - { + { var baseKey = !string.IsNullOrEmpty(deviceId) ? $"{groupId}:{edgeNodeId}:{deviceId}" : $"{groupId}:{edgeNodeId}"; return string.IsNullOrEmpty(prefix) ? baseKey : $"{prefix}{baseKey}"; } - + /// /// Gets a SemaphoreSlim object for the specified context to support async locking /// Ensures thread safety for asynchronous operations on a specific device/node combination @@ -36,4 +36,12 @@ public static SemaphoreSlim GetSemaphore(string groupId, string edgeNodeId, stri var key = BuildCacheKey(null, groupId, edgeNodeId, deviceId); return Semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1)); } + + /// + /// Clears all SemaphoreSlim objects from the cache + /// + public static void ClearSemaphores() + { + Semaphores.Clear(); + } } \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs index 2606cd2..aa441ed 100644 --- a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs +++ b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs @@ -31,11 +31,11 @@ public async Task IsOnline(string groupId, string edgeNodeId, string? devi // Build the cache key for status tracking var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); - // Get the status from the cache or create a new entry if it doesn't exist' + // Get the status from the cache or create a new entry if it doesn't exist var status = await _cache.GetOrCreateAsync( cacheKey, _ => ValueTask.FromResult(null)); - // If the status is not in the cache, assume it offline + // If the status is not in the cache, assume it is offline return status is { IsOnline: true }; } @@ -69,7 +69,7 @@ public async Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, if (newStatus.IsOnline) { // Update the cache if the new status is newer than the current status - // Note the current status may already be updated because the value is null + // Note: if the cache is empty, currentStatus will be set to newStatus by GetOrCreateAsync if (newStatus.Timestamp > currentStatus.Timestamp) await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); } @@ -83,8 +83,7 @@ public async Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, // When the current status is online, update if: // 1. The new status has the same bdSeq - // 2. The new status has the same timestamp - // 3. The new status is newer than the current status + // 2. The new status has the same or newer timestamp if (currentStatus.IsOnline && (newStatus.BdSeq == currentStatus.BdSeq || newStatus.Timestamp >= currentStatus.Timestamp)) { @@ -128,7 +127,7 @@ public async Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, st cacheKey, _ => ValueTask.FromResult(newStatus), tags: [cacheTag]); // Update the cache if the new status is newer than the current status - // Note the current status may already be updated because the value is null + // Note: if the cache is empty, currentStatus will be set to newStatus by GetOrCreateAsync if (newStatus.Timestamp > currentStatus.Timestamp) await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); } From 68f3a92561535b2125c3bd33fa7b30deac6ac3f7 Mon Sep 17 00:00:00 2001 From: Dennis Gu Date: Mon, 1 Dec 2025 10:28:23 +0800 Subject: [PATCH 3/5] Add concurrent status update test and improve cache cleanup --- .../Caches/StatusTrackingServiceTests.cs | 62 +++++++++++++++++++ .../HostApplication/Caches/CacheHelper.cs | 4 ++ .../Caches/IStatusTrackingService.cs | 5 +- .../Caches/StatusTrackingService.cs | 3 +- 4 files changed, 72 insertions(+), 2 deletions(-) diff --git a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs index d56a2c7..75de0f1 100644 --- a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs +++ b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs @@ -314,4 +314,66 @@ public async Task WhenEdgeNodeGoesOffline_MultipleDevicesBecomeOfflineSimultaneo Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be offline"); Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be offline"); } + + [Fact] + public async Task TestMixedEdgeNodeAndDeviceStatusUpdates_Concurrent() + { + const string groupId = "test-group"; + const string edgeNodeId = "test-edge-node"; + const string deviceId = "test-device"; + + var completedEdgeNodeTasks = 0; + var completedDeviceTasks = 0; + var errors = new List(); + + // Create mixed edge node and device update tasks + var edgeNodeTasks = Enumerable.Range(0, 5).Select(async _ => + { + try + { + await _statusService.UpdateEdgeNodeOnlineStatus( + groupId, + edgeNodeId, + true, + 1, // KeepAliveInterval + DateTimeOffset.Now.ToUnixTimeMilliseconds()); + Interlocked.Increment(ref completedEdgeNodeTasks); + } + catch (Exception ex) + { + errors.Add(ex); + } + }); + + var deviceTasks = Enumerable.Range(0, 5).Select(async _ => + { + try + { + await _statusService.UpdateDeviceOnlineStatus( + groupId, + edgeNodeId, + deviceId, + true, + DateTimeOffset.Now.ToUnixTimeMilliseconds()); + Interlocked.Increment(ref completedDeviceTasks); + } + catch (Exception ex) + { + errors.Add(ex); + } + }); + + // Run all tasks concurrently + var allTasks = edgeNodeTasks.Concat(deviceTasks); + await Task.WhenAll(allTasks); + + // Verify no errors + Assert.Empty(errors); + Assert.Equal(5, completedEdgeNodeTasks); + Assert.Equal(5, completedDeviceTasks); + + // Verify statuses were updated correctly + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null)); + Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId)); + } } \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/CacheHelper.cs b/SparklerNet/HostApplication/Caches/CacheHelper.cs index 4ed9018..36791ac 100644 --- a/SparklerNet/HostApplication/Caches/CacheHelper.cs +++ b/SparklerNet/HostApplication/Caches/CacheHelper.cs @@ -2,6 +2,9 @@ namespace SparklerNet.HostApplication.Caches; +/// +/// Provides helper methods for cache operations. +/// public static class CacheHelper { private static readonly ConcurrentDictionary Semaphores = new(); @@ -42,6 +45,7 @@ public static SemaphoreSlim GetSemaphore(string groupId, string edgeNodeId, stri /// public static void ClearSemaphores() { + foreach (var semaphore in Semaphores.Values) semaphore.Dispose(); Semaphores.Clear(); } } \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs index 79727e4..4f189a7 100644 --- a/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs +++ b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs @@ -1,5 +1,8 @@ namespace SparklerNet.HostApplication.Caches; +/// +/// Provides methods to track and query the status of edge nodes and devices. +/// public interface IStatusTrackingService { /// @@ -20,7 +23,7 @@ public interface IStatusTrackingService /// The bdSeq metric value of Birth or Death certificates /// The timestamp of Birth or Death certificates Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, bool isOnline, int bdSeq, long timestamp); - + /// /// Updates the online status of a specific device. /// diff --git a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs index aa441ed..7fb22ff 100644 --- a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs +++ b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs @@ -115,7 +115,8 @@ public async Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, st var newStatus = new EndpointStatus { IsOnline = isOnline, BdSeq = 0, Timestamp = timestamp }; // Use SemaphoreSlim for async thread safety - var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, deviceId); + // Because when Edge Node is offline, all devices are also offline, so always use the Edge Node level semaphore + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); try { From 10cb15ec9722463465341e4650cdaef4435b2dac Mon Sep 17 00:00:00 2001 From: Dennis Gu Date: Mon, 1 Dec 2025 15:01:10 +0800 Subject: [PATCH 4/5] Rename IsOnline to IsEndpointOnline in status service --- .../Caches/StatusTrackingServiceTests.cs | 67 ++++++++++--------- .../HostApplication/Caches/CacheHelper.cs | 1 + .../Caches/IStatusTrackingService.cs | 6 +- .../Caches/StatusTrackingService.cs | 2 +- 4 files changed, 41 insertions(+), 35 deletions(-) diff --git a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs index 75de0f1..28abdf3 100644 --- a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs +++ b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs @@ -28,7 +28,7 @@ public async Task IsOnline_WhenNoStatusInCache_ReturnsFalse() var edgeNodeId = "edgeNode1"; var deviceId = "device1"; - var result = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.False(result); } @@ -41,7 +41,7 @@ public async Task IsOnline_WithOnlineStatus_ReturnsTrue() await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - var edgeNodeStatus = await _statusService.IsOnline(groupId, edgeNodeId, null); + var edgeNodeStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); Assert.True(edgeNodeStatus); } @@ -55,7 +55,7 @@ public async Task IsOnline_WithOfflineStatus_ReturnsFalse() await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); - var result = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.False(result); } @@ -68,7 +68,7 @@ public async Task IsOnline_WithNullDeviceId_ReturnsCorrectStatus() await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); - var result = await _statusService.IsOnline(groupId, edgeNodeId, null); + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); Assert.True(result); } @@ -84,7 +84,7 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOnlineStatus_CachesStatu await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, isOnline, bdSeq, timestamp); - var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); Assert.True(status); } @@ -101,7 +101,7 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenNewerTimestampUpdatesOlderTimes // Update with newer timestamp await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); - var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); Assert.True(status); } @@ -119,7 +119,7 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenOlderTimestampDoesNotUpdateNewe await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); // Should remain online - var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); Assert.True(status); } @@ -137,7 +137,7 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithSameBdS // Set offline status with same bdSeq await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, bdSeq, 2000); - var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); Assert.False(status); } @@ -154,7 +154,7 @@ public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithNewerTi // Update offline status with newer timestamp await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 2, 2000); - var status = await _statusService.IsOnline(groupId, edgeNodeId, null); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); Assert.False(status); } @@ -170,7 +170,7 @@ public async Task UpdateDeviceOnlineStatus_WhenSettingOnlineStatus_CachesStatus( await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, isOnline, timestamp); - var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.True(status); } @@ -188,7 +188,7 @@ public async Task UpdateDeviceOnlineStatus_WhenNewerTimestampUpdatesOlderTimesta // Update with newer timestamp await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); - var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.True(status); } @@ -207,7 +207,7 @@ public async Task UpdateDeviceOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerT await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); // Should remain online - var status = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.True(status); } @@ -226,8 +226,8 @@ public async Task UpdateDeviceOnlineStatus_DeviceStatusIsIndependentOfEdgeNodeSt await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); // Check statuses separately - var edgeNodeStatus = await _statusService.IsOnline(groupId, edgeNodeId, null); - var deviceStatus = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var edgeNodeStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + var deviceStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.False(edgeNodeStatus); Assert.True(deviceStatus); @@ -247,14 +247,14 @@ public async Task WhenEdgeNodeGoesOffline_AssociatedDevicesBecomeOffline() await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); // Device should be online when EdgeNode is online - var deviceStatusBefore = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var deviceStatusBefore = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.True(deviceStatusBefore, "Device should be online when EdgeNode is online"); // Set EdgeNode offline await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); // Device should now be offline due to EdgeNode going offline - var deviceStatusAfter = await _statusService.IsOnline(groupId, edgeNodeId, deviceId); + var deviceStatusAfter = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); Assert.False(deviceStatusAfter, "Device should be offline when EdgeNode goes offline"); } @@ -270,20 +270,21 @@ public async Task WhenEdgeNodeComesOnlineAgain_DeviceStatusRequiresReset() await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); // Verify both are online - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), "Device should be online"); // Set EdgeNode offline and verify the Device becomes offline await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), "Device should be offline when EdgeNode is offline"); // Set EdgeNode online again await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 4000); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online after reconnection"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId), "Device should remain offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), + "Device should remain offline"); } [Fact] @@ -302,17 +303,21 @@ public async Task WhenEdgeNodeGoesOffline_MultipleDevicesBecomeOfflineSimultaneo await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId2, true, 3000); // Verify all are online - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be online"); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId1), + "First device should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId2), + "Second device should be online"); // Set EdgeNode offline await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 4000); // All devices should now be offline due to EdgeNode going offline - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId1), "First device should be offline"); - Assert.False(await _statusService.IsOnline(groupId, edgeNodeId, deviceId2), "Second device should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId1), + "First device should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId2), + "Second device should be offline"); } [Fact] @@ -335,7 +340,7 @@ await _statusService.UpdateEdgeNodeOnlineStatus( groupId, edgeNodeId, true, - 1, // KeepAliveInterval + 1, DateTimeOffset.Now.ToUnixTimeMilliseconds()); Interlocked.Increment(ref completedEdgeNodeTasks); } @@ -373,7 +378,7 @@ await _statusService.UpdateDeviceOnlineStatus( Assert.Equal(5, completedDeviceTasks); // Verify statuses were updated correctly - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, null)); - Assert.True(await _statusService.IsOnline(groupId, edgeNodeId, deviceId)); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null)); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId)); } } \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/CacheHelper.cs b/SparklerNet/HostApplication/Caches/CacheHelper.cs index 36791ac..5efbdb5 100644 --- a/SparklerNet/HostApplication/Caches/CacheHelper.cs +++ b/SparklerNet/HostApplication/Caches/CacheHelper.cs @@ -42,6 +42,7 @@ public static SemaphoreSlim GetSemaphore(string groupId, string edgeNodeId, stri /// /// Clears all SemaphoreSlim objects from the cache + /// This method should only be called during application shutdown to prevent racing conditions /// public static void ClearSemaphores() { diff --git a/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs index 4f189a7..1700e8f 100644 --- a/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs +++ b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs @@ -6,13 +6,13 @@ public interface IStatusTrackingService { /// - /// Determines if a specific edge node is currently online. + /// Determines if a specific endpoint (EdgeNode or Device) is currently online. /// /// The group ID /// The edge node ID /// The device ID (optional) - /// True if the edge node is online, otherwise false. - Task IsOnline(string groupId, string edgeNodeId, string? deviceId); + /// True if the endpoint is online, otherwise false. + Task IsEndpointOnline(string groupId, string edgeNodeId, string? deviceId); /// /// Updates the online status of a specific edge node. diff --git a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs index 7fb22ff..e08417e 100644 --- a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs +++ b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs @@ -23,7 +23,7 @@ public StatusTrackingService(HybridCache cache) } /// - public async Task IsOnline(string groupId, string edgeNodeId, string? deviceId) + public async Task IsEndpointOnline(string groupId, string edgeNodeId, string? deviceId) { ArgumentException.ThrowIfNullOrWhiteSpace(groupId); ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); From ea312dbe4a340009b6297c0aae113199766289e5 Mon Sep 17 00:00:00 2001 From: Dennis Gu Date: Mon, 1 Dec 2025 15:41:10 +0800 Subject: [PATCH 5/5] Add async semaphore to status cache access --- .../Caches/StatusTrackingServiceTests.cs | 8 +++--- .../HostApplication/Caches/CacheHelper.cs | 2 +- .../Caches/StatusTrackingService.cs | 25 ++++++++++++++----- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs index 28abdf3..ba6e8b1 100644 --- a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs +++ b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs @@ -22,7 +22,7 @@ public StatusTrackingServiceTests() } [Fact] - public async Task IsOnline_WhenNoStatusInCache_ReturnsFalse() + public async Task IsEndpointOnline_WhenNoStatusInCache_ReturnsFalse() { var groupId = "group1"; var edgeNodeId = "edgeNode1"; @@ -34,7 +34,7 @@ public async Task IsOnline_WhenNoStatusInCache_ReturnsFalse() } [Fact] - public async Task IsOnline_WithOnlineStatus_ReturnsTrue() + public async Task IsEndpointOnline_WithOnlineStatus_ReturnsTrue() { var groupId = "group1"; var edgeNodeId = "edgeNode1"; @@ -47,7 +47,7 @@ public async Task IsOnline_WithOnlineStatus_ReturnsTrue() } [Fact] - public async Task IsOnline_WithOfflineStatus_ReturnsFalse() + public async Task IsEndpointOnline_WithOfflineStatus_ReturnsFalse() { var groupId = "group1"; var edgeNodeId = "edgeNode1"; @@ -61,7 +61,7 @@ public async Task IsOnline_WithOfflineStatus_ReturnsFalse() } [Fact] - public async Task IsOnline_WithNullDeviceId_ReturnsCorrectStatus() + public async Task IsEndpointOnline_WithNullDeviceId_ReturnsCorrectStatus() { var groupId = "group1"; var edgeNodeId = "edgeNode1"; diff --git a/SparklerNet/HostApplication/Caches/CacheHelper.cs b/SparklerNet/HostApplication/Caches/CacheHelper.cs index 5efbdb5..ac420e5 100644 --- a/SparklerNet/HostApplication/Caches/CacheHelper.cs +++ b/SparklerNet/HostApplication/Caches/CacheHelper.cs @@ -42,7 +42,7 @@ public static SemaphoreSlim GetSemaphore(string groupId, string edgeNodeId, stri /// /// Clears all SemaphoreSlim objects from the cache - /// This method should only be called during application shutdown to prevent racing conditions + /// This method should only be called during application shutdown to prevent race conditions /// public static void ClearSemaphores() { diff --git a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs index e08417e..7b98a7f 100644 --- a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs +++ b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs @@ -30,13 +30,26 @@ public async Task IsEndpointOnline(string groupId, string edgeNodeId, stri // Build the cache key for status tracking var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + + try + { + // Wait for the semaphore asynchronously + await semaphore.WaitAsync(); + + // Get the status from the cache or create a new entry if it doesn't exist + var status = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(null)); - // Get the status from the cache or create a new entry if it doesn't exist - var status = await _cache.GetOrCreateAsync( - cacheKey, _ => ValueTask.FromResult(null)); - - // If the status is not in the cache, assume it is offline - return status is { IsOnline: true }; + // If the status is not in the cache, assume it is offline + return status is { IsOnline: true }; + } + finally + { + semaphore.Release(); + } } ///