diff --git a/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs b/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs new file mode 100644 index 0000000..f1a07ee --- /dev/null +++ b/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs @@ -0,0 +1,56 @@ +using Xunit; +using SparklerNet.HostApplication.Caches; + +namespace SparklerNet.Tests.HostApplication.Caches; + +public class CacheHelperTests +{ + [Theory] + [InlineData("prefix:", "group1", "edge1", "device1", "prefix:group1:edge1:device1")] + [InlineData("status:", "group2", "edge2", "device2", "status:group2:edge2:device2")] + [InlineData(null, "group3", "edge3", "device3", "group3:edge3:device3")] + [InlineData("", "group4", "edge4", "device4", "group4:edge4:device4")] + public void BuildCacheKey_WithDeviceId_ReturnsCorrectFormat(string? prefix, string groupId, string edgeNodeId, string? deviceId, string expected) + { + var result = CacheHelper.BuildCacheKey(prefix, groupId, edgeNodeId, deviceId); + Assert.Equal(expected, result); + } + + [Theory] + [InlineData("prefix:", "group1", "edge1", null, "prefix:group1:edge1")] + [InlineData("status:", "group2", "edge2", null, "status:group2:edge2")] + [InlineData(null, "group3", "edge3", null, "group3:edge3")] + [InlineData("", "group4", "edge4", null, "group4:edge4")] + public void BuildCacheKey_WithoutDeviceId_ReturnsCorrectFormat(string? prefix, string groupId, string edgeNodeId, string? deviceId, string expected) + { + var result = CacheHelper.BuildCacheKey(prefix, groupId, edgeNodeId, deviceId); + Assert.Equal(expected, result); + } + + [Fact] + public void GetSemaphore_ReturnsSameInstanceForSameKey() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + + Assert.Same(semaphore1, semaphore2); + } + + [Fact] + public void GetSemaphore_ReturnsDifferentInstancesForDifferentKeys() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group2", "edge2", "device2"); + + Assert.NotSame(semaphore1, semaphore2); + } + + [Fact] + public void GetSemaphore_WithAndWithoutDeviceId_ReturnsDifferentInstances() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group1", "edge1", null); + + Assert.NotSame(semaphore1, semaphore2); + } +} \ No newline at end of file diff --git a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs new file mode 100644 index 0000000..ba6e8b1 --- /dev/null +++ b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs @@ -0,0 +1,384 @@ +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Caching.Hybrid; +using Microsoft.Extensions.DependencyInjection; +using SparklerNet.HostApplication.Caches; +using Xunit; + +namespace SparklerNet.Tests.HostApplication.Caches; + +[SuppressMessage("ReSharper", "ConvertToConstant.Local")] +public class StatusTrackingServiceTests +{ + private readonly StatusTrackingService _statusService; + + public StatusTrackingServiceTests() + { + var services = new ServiceCollection(); + services.AddHybridCache(); + + var serviceProvider = services.BuildServiceProvider(); + var cache = serviceProvider.GetRequiredService(); + _statusService = new StatusTrackingService(cache); + } + + [Fact] + public async Task IsEndpointOnline_WhenNoStatusInCache_ReturnsFalse() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.False(result); + } + + [Fact] + public async Task IsEndpointOnline_WithOnlineStatus_ReturnsTrue() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + var edgeNodeStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(edgeNodeStatus); + } + + [Fact] + public async Task IsEndpointOnline_WithOfflineStatus_ReturnsFalse() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.False(result); + } + + [Fact] + public async Task IsEndpointOnline_WithNullDeviceId_ReturnsCorrectStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(result); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOnlineStatus_CachesStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var isOnline = true; + var bdSeq = 1; + var timestamp = 1000L; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, isOnline, bdSeq, timestamp); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenNewerTimestampUpdatesOlderTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set initial status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Update with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set initial status with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); + + // Try to update with an older timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Should remain online + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithSameBdSeqUpdatesOnlineStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var bdSeq = 1; + + // Set online status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, bdSeq, 1000); + + // Set offline status with same bdSeq + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, bdSeq, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.False(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithNewerTimestampUpdatesOfflineStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set offline status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Update offline status with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 2, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.False(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenSettingOnlineStatus_CachesStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + var isOnline = true; + var timestamp = 1000L; + + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, isOnline, timestamp); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenNewerTimestampUpdatesOlderTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set initial status + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); + + // Update with newer timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set initial status with newer timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Try to update with an older timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); + + // Should remain online + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_DeviceStatusIsIndependentOfEdgeNodeStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set edge node as offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Set device as online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Check statuses separately + var edgeNodeStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + var deviceStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.False(edgeNodeStatus); + Assert.True(deviceStatus); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_AssociatedDevicesBecomeOffline() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set Device online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Device should be online when EdgeNode is online + var deviceStatusBefore = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + Assert.True(deviceStatusBefore, "Device should be online when EdgeNode is online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + + // Device should now be offline due to EdgeNode going offline + var deviceStatusAfter = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + Assert.False(deviceStatusAfter, "Device should be offline when EdgeNode goes offline"); + } + + [Fact] + public async Task WhenEdgeNodeComesOnlineAgain_DeviceStatusRequiresReset() + { + var groupId = "group20"; + var edgeNodeId = "edgeNode20"; + var deviceId = "device20"; + + // Set EdgeNode and Device online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Verify both are online + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), "Device should be online"); + + // Set EdgeNode offline and verify the Device becomes offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), + "Device should be offline when EdgeNode is offline"); + + // Set EdgeNode online again + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 4000); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), + "EdgeNode should be online after reconnection"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), + "Device should remain offline"); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_MultipleDevicesBecomeOfflineSimultaneously() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId1 = "device1"; + var deviceId2 = "device2"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set multiple Devices online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId1, true, 2000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId2, true, 3000); + + // Verify all are online + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId1), + "First device should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId2), + "Second device should be online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 4000); + + // All devices should now be offline due to EdgeNode going offline + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId1), + "First device should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId2), + "Second device should be offline"); + } + + [Fact] + public async Task TestMixedEdgeNodeAndDeviceStatusUpdates_Concurrent() + { + const string groupId = "test-group"; + const string edgeNodeId = "test-edge-node"; + const string deviceId = "test-device"; + + var completedEdgeNodeTasks = 0; + var completedDeviceTasks = 0; + var errors = new List(); + + // Create mixed edge node and device update tasks + var edgeNodeTasks = Enumerable.Range(0, 5).Select(async _ => + { + try + { + await _statusService.UpdateEdgeNodeOnlineStatus( + groupId, + edgeNodeId, + true, + 1, + DateTimeOffset.Now.ToUnixTimeMilliseconds()); + Interlocked.Increment(ref completedEdgeNodeTasks); + } + catch (Exception ex) + { + errors.Add(ex); + } + }); + + var deviceTasks = Enumerable.Range(0, 5).Select(async _ => + { + try + { + await _statusService.UpdateDeviceOnlineStatus( + groupId, + edgeNodeId, + deviceId, + true, + DateTimeOffset.Now.ToUnixTimeMilliseconds()); + Interlocked.Increment(ref completedDeviceTasks); + } + catch (Exception ex) + { + errors.Add(ex); + } + }); + + // Run all tasks concurrently + var allTasks = edgeNodeTasks.Concat(deviceTasks); + await Task.WhenAll(allTasks); + + // Verify no errors + Assert.Empty(errors); + Assert.Equal(5, completedEdgeNodeTasks); + Assert.Equal(5, completedDeviceTasks); + + // Verify statuses were updated correctly + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null)); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId)); + } +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/CacheHelper.cs b/SparklerNet/HostApplication/Caches/CacheHelper.cs new file mode 100644 index 0000000..ac420e5 --- /dev/null +++ b/SparklerNet/HostApplication/Caches/CacheHelper.cs @@ -0,0 +1,52 @@ +using System.Collections.Concurrent; + +namespace SparklerNet.HostApplication.Caches; + +/// +/// Provides helper methods for cache operations. +/// +public static class CacheHelper +{ + private static readonly ConcurrentDictionary Semaphores = new(); + + /// + /// Builds a standardized cache key based on the provided prefix and identifiers + /// + /// The prefix to use for the key (can be null) + /// The group ID part of the key + /// The edge node ID part of the key + /// The device ID part of the key (optional) + /// The constructed cache key in format "prefix:groupId:edgeNodeId:deviceId" or "prefix:groupId:edgeNodeId" + public static string BuildCacheKey(string? prefix, string groupId, string edgeNodeId, string? deviceId) + { + var baseKey = !string.IsNullOrEmpty(deviceId) + ? $"{groupId}:{edgeNodeId}:{deviceId}" + : $"{groupId}:{edgeNodeId}"; + + return string.IsNullOrEmpty(prefix) ? baseKey : $"{prefix}{baseKey}"; + } + + /// + /// Gets a SemaphoreSlim object for the specified context to support async locking + /// Ensures thread safety for asynchronous operations on a specific device/node combination + /// + /// The group ID part of the key + /// The edge node ID part of the key + /// The device ID part of the key (optional) + /// The SemaphoreSlim object for the specified EdgeNode/Device + public static SemaphoreSlim GetSemaphore(string groupId, string edgeNodeId, string? deviceId) + { + var key = BuildCacheKey(null, groupId, edgeNodeId, deviceId); + return Semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1)); + } + + /// + /// Clears all SemaphoreSlim objects from the cache + /// This method should only be called during application shutdown to prevent race conditions + /// + public static void ClearSemaphores() + { + foreach (var semaphore in Semaphores.Values) semaphore.Dispose(); + Semaphores.Clear(); + } +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs b/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs index d7ff86e..a89ceff 100644 --- a/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs +++ b/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs @@ -45,7 +45,7 @@ public interface IMessageOrderingService /// Clears the sequence cache and pending messages for a specific edge node or device /// Also cleans up any associated timer resources /// - /// The group ID of the edge node + /// The group ID /// The edge node ID /// The device ID (optional) void ClearMessageOrder(string groupId, string edgeNodeId, string? deviceId); diff --git a/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs new file mode 100644 index 0000000..1700e8f --- /dev/null +++ b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs @@ -0,0 +1,36 @@ +namespace SparklerNet.HostApplication.Caches; + +/// +/// Provides methods to track and query the status of edge nodes and devices. +/// +public interface IStatusTrackingService +{ + /// + /// Determines if a specific endpoint (EdgeNode or Device) is currently online. + /// + /// The group ID + /// The edge node ID + /// The device ID (optional) + /// True if the endpoint is online, otherwise false. + Task IsEndpointOnline(string groupId, string edgeNodeId, string? deviceId); + + /// + /// Updates the online status of a specific edge node. + /// + /// The group ID + /// The edge node ID + /// True if the edge node is online, otherwise false. + /// The bdSeq metric value of Birth or Death certificates + /// The timestamp of Birth or Death certificates + Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, bool isOnline, int bdSeq, long timestamp); + + /// + /// Updates the online status of a specific device. + /// + /// The group ID + /// The edge node ID + /// The device ID + /// True if the device is online, otherwise false. + /// The timestamp of Birth or Death certificates + Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, string deviceId, bool isOnline, long timestamp); +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs new file mode 100644 index 0000000..7b98a7f --- /dev/null +++ b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs @@ -0,0 +1,168 @@ +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Caching.Hybrid; + +namespace SparklerNet.HostApplication.Caches; + +/// +/// Service responsible for tracking the online status of edge nodes and devices +/// Provides methods to check and update the online status of specific edge nodes and devices +/// Uses HybridCache for efficient caching with both in-memory and distributed capabilities +/// +public class StatusTrackingService : IStatusTrackingService +{ + private const string StatusKeyPrefix = "sparkplug:status:"; // Prefix for the status cache keys + private readonly HybridCache _cache; + + /// + /// Initializes a new instance of the + /// + /// The HybridCache instance for caching online status + public StatusTrackingService(HybridCache cache) + { + _cache = cache ?? throw new ArgumentNullException(nameof(cache)); + } + + /// + public async Task IsEndpointOnline(string groupId, string edgeNodeId, string? deviceId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + + try + { + // Wait for the semaphore asynchronously + await semaphore.WaitAsync(); + + // Get the status from the cache or create a new entry if it doesn't exist + var status = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(null)); + + // If the status is not in the cache, assume it is offline + return status is { IsOnline: true }; + } + finally + { + semaphore.Release(); + } + } + + /// + public async Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, bool isOnline, int bdSeq, + long timestamp) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, null); + var cacheTag = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Create a new status object + var newStatus = new EndpointStatus { IsOnline = isOnline, BdSeq = bdSeq, Timestamp = timestamp }; + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + + try + { + // Wait for the semaphore asynchronously + await semaphore.WaitAsync(); + + // Get the current status from the cache or create a new entry if it doesn't exist + var currentStatus = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(newStatus), tags: [cacheTag]); + + // Online status update logic + if (newStatus.IsOnline) + { + // Update the cache if the new status is newer than the current status + // Note: if the cache is empty, currentStatus will be set to newStatus by GetOrCreateAsync + if (newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + } + // Offline status update logic + else + { + // When the current status is offline, update if the new status is newer than the current status + // ReSharper disable once ConvertIfStatementToSwitchStatement + if (!currentStatus.IsOnline && newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + + // When the current status is online, update if: + // 1. The new status has the same bdSeq + // 2. The new status has the same or newer timestamp + if (currentStatus.IsOnline && + (newStatus.BdSeq == currentStatus.BdSeq || newStatus.Timestamp >= currentStatus.Timestamp)) + { + await _cache.RemoveByTagAsync(cacheTag); + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + } + } + } + finally + { + // Always release the semaphore to prevent deadlocks + semaphore.Release(); + } + } + + /// + public async Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, string deviceId, bool isOnline, + long timestamp) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + ArgumentException.ThrowIfNullOrWhiteSpace(deviceId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); + var cacheTag = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Create a new status object + var newStatus = new EndpointStatus { IsOnline = isOnline, BdSeq = 0, Timestamp = timestamp }; + + // Use SemaphoreSlim for async thread safety + // Because when Edge Node is offline, all devices are also offline, so always use the Edge Node level semaphore + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + + try + { + // Wait for the semaphore asynchronously + await semaphore.WaitAsync(); + + // Get the current status from the cache or create a new entry if it doesn't exist + var currentStatus = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(newStatus), tags: [cacheTag]); + + // Update the cache if the new status is newer than the current status + // Note: if the cache is empty, currentStatus will be set to newStatus by GetOrCreateAsync + if (newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag]); + } + finally + { + // Always release the semaphore to prevent deadlocks + semaphore.Release(); + } + } + + /// + /// Simple data class to store online status information + /// Allows for future expansion of status data beyond just a boolean flag + /// + [SuppressMessage("ReSharper", "MemberCanBePrivate.Global")] + protected record EndpointStatus + { + public bool IsOnline { get; init; } + + public int BdSeq { get; init; } + + public long Timestamp { get; init; } + } +} \ No newline at end of file diff --git a/SparklerNet/SparklerNet.csproj b/SparklerNet/SparklerNet.csproj index fa31fd6..fab88a5 100644 --- a/SparklerNet/SparklerNet.csproj +++ b/SparklerNet/SparklerNet.csproj @@ -34,6 +34,7 @@ +