Skip to content

Commit c00d691

Browse files
author
Timothy Dodd
committed
Enhance SyncCoordinator for better resource management
- Implement IDisposable in SyncCoordinator for semaphore cleanup. - Replace single semaphore with ConcurrentDictionary for per-device locking. - Reduce webhook lock acquisition timeout from 5s to 2s. - Add CleanupUnusedDeviceLocks method to remove inactive device semaphores. - Update ReleaseWebhookLock to use specific device semaphores. - Modify WebhookLockReleaser to support new locking mechanism. - Call cleanup method in Worker after clearing pending webhook updates.
1 parent da013d2 commit c00d691

File tree

2 files changed

+76
-23
lines changed

2 files changed

+76
-23
lines changed

src/HubitatMqtt/Services/SyncCoordinator.cs

Lines changed: 73 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,52 +5,54 @@ namespace HubitatMqtt.Services
55
/// <summary>
66
/// Coordinates between webhook updates and periodic sync to prevent race conditions
77
/// </summary>
8-
public class SyncCoordinator
8+
public class SyncCoordinator : IDisposable
99
{
1010
private readonly ILogger<SyncCoordinator> _logger;
11-
private readonly SemaphoreSlim _syncSemaphore;
11+
private readonly ConcurrentDictionary<string, SemaphoreSlim> _deviceLocks;
12+
private readonly SemaphoreSlim _fullSyncSemaphore;
1213
private readonly ConcurrentDictionary<string, DateTime> _pendingWebhookUpdates;
13-
private bool _fullSyncInProgress;
14+
private volatile bool _fullSyncInProgress;
1415
private DateTime _lastFullSync;
1516

1617
public SyncCoordinator(ILogger<SyncCoordinator> logger)
1718
{
1819
_logger = logger;
19-
_syncSemaphore = new SemaphoreSlim(1, 1);
20+
_deviceLocks = new ConcurrentDictionary<string, SemaphoreSlim>();
21+
_fullSyncSemaphore = new SemaphoreSlim(1, 1);
2022
_pendingWebhookUpdates = new ConcurrentDictionary<string, DateTime>();
2123
_fullSyncInProgress = false;
2224
_lastFullSync = DateTime.MinValue;
2325
}
2426

2527
/// <summary>
26-
/// Acquires a lock for webhook processing, preventing conflicts with full sync
28+
/// Acquires a lock for webhook processing. Uses per-device locking for better concurrency.
2729
/// </summary>
2830
public async Task<IDisposable> AcquireWebhookLockAsync(string deviceId, TimeSpan timeout = default)
2931
{
3032
if (timeout == default)
31-
timeout = TimeSpan.FromSeconds(5);
32-
33-
if (!await _syncSemaphore.WaitAsync(timeout))
34-
{
35-
_logger.LogWarning("Timeout waiting for webhook lock for device {DeviceId}", deviceId);
36-
throw new TimeoutException($"Failed to acquire webhook lock for device {deviceId} within {timeout}");
37-
}
33+
timeout = TimeSpan.FromSeconds(2); // Reduced timeout since we're using per-device locks
3834

35+
// If full sync is in progress, defer webhook updates
3936
if (_fullSyncInProgress)
4037
{
4138
_logger.LogDebug("Full sync in progress, deferring webhook update for device {DeviceId}", deviceId);
42-
_syncSemaphore.Release();
43-
44-
// Record this webhook update as pending
4539
_pendingWebhookUpdates.TryAdd(deviceId, DateTime.UtcNow);
46-
4740
throw new InvalidOperationException("Full sync in progress, webhook update deferred");
4841
}
4942

43+
// Get or create device-specific semaphore
44+
var deviceSemaphore = _deviceLocks.GetOrAdd(deviceId, _ => new SemaphoreSlim(1, 1));
45+
46+
if (!await deviceSemaphore.WaitAsync(timeout))
47+
{
48+
_logger.LogWarning("Timeout waiting for webhook lock for device {DeviceId}", deviceId);
49+
throw new TimeoutException($"Failed to acquire webhook lock for device {deviceId} within {timeout}");
50+
}
51+
5052
// Mark this device as having a pending webhook update
5153
_pendingWebhookUpdates.TryAdd(deviceId, DateTime.UtcNow);
5254

53-
return new WebhookLockReleaser(this, deviceId);
55+
return new WebhookLockReleaser(this, deviceId, deviceSemaphore);
5456
}
5557

5658
/// <summary>
@@ -61,7 +63,7 @@ public async Task<IDisposable> AcquireFullSyncLockAsync(TimeSpan timeout = defau
6163
if (timeout == default)
6264
timeout = TimeSpan.FromSeconds(30);
6365

64-
if (!await _syncSemaphore.WaitAsync(timeout))
66+
if (!await _fullSyncSemaphore.WaitAsync(timeout))
6567
{
6668
_logger.LogWarning("Timeout waiting for full sync lock");
6769
throw new TimeoutException($"Failed to acquire full sync lock within {timeout}");
@@ -105,40 +107,88 @@ public void ClearPendingWebhookUpdates(HashSet<string> syncedDeviceIds)
105107
_logger.LogDebug("Cleared pending webhook updates for {Count} devices", syncedDeviceIds.Count);
106108
}
107109

108-
private void ReleaseWebhookLock(string deviceId)
110+
private void ReleaseWebhookLock(string deviceId, SemaphoreSlim deviceSemaphore)
109111
{
110112
_logger.LogDebug("Released webhook lock for device {DeviceId}", deviceId);
111-
_syncSemaphore.Release();
113+
deviceSemaphore.Release();
112114
}
113115

114116
private void ReleaseFullSyncLock()
115117
{
116118
_fullSyncInProgress = false;
117119
_lastFullSync = DateTime.UtcNow;
118120
_logger.LogDebug("Released full sync lock");
119-
_syncSemaphore.Release();
121+
_fullSyncSemaphore.Release();
120122
}
121123

122124
public bool IsFullSyncInProgress => _fullSyncInProgress;
123125
public DateTime LastFullSync => _lastFullSync;
124126

127+
/// <summary>
128+
/// Cleanup unused device locks to prevent memory leaks
129+
/// </summary>
130+
public void CleanupUnusedDeviceLocks(HashSet<string> activeDeviceIds)
131+
{
132+
var locksToRemove = new List<string>();
133+
134+
foreach (var kvp in _deviceLocks)
135+
{
136+
var deviceId = kvp.Key;
137+
var semaphore = kvp.Value;
138+
139+
// If device is not active and semaphore is not in use, remove it
140+
if (!activeDeviceIds.Contains(deviceId) && semaphore.CurrentCount == 1)
141+
{
142+
locksToRemove.Add(deviceId);
143+
}
144+
}
145+
146+
foreach (var deviceId in locksToRemove)
147+
{
148+
if (_deviceLocks.TryRemove(deviceId, out var removedSemaphore))
149+
{
150+
removedSemaphore.Dispose();
151+
_logger.LogDebug("Cleaned up unused device lock for {DeviceId}", deviceId);
152+
}
153+
}
154+
155+
if (locksToRemove.Count > 0)
156+
{
157+
_logger.LogDebug("Cleaned up {Count} unused device locks", locksToRemove.Count);
158+
}
159+
}
160+
161+
public void Dispose()
162+
{
163+
_fullSyncSemaphore?.Dispose();
164+
165+
foreach (var kvp in _deviceLocks)
166+
{
167+
kvp.Value.Dispose();
168+
}
169+
170+
_deviceLocks.Clear();
171+
}
172+
125173
private class WebhookLockReleaser : IDisposable
126174
{
127175
private readonly SyncCoordinator _coordinator;
128176
private readonly string _deviceId;
177+
private readonly SemaphoreSlim _deviceSemaphore;
129178
private bool _disposed;
130179

131-
public WebhookLockReleaser(SyncCoordinator coordinator, string deviceId)
180+
public WebhookLockReleaser(SyncCoordinator coordinator, string deviceId, SemaphoreSlim deviceSemaphore)
132181
{
133182
_coordinator = coordinator;
134183
_deviceId = deviceId;
184+
_deviceSemaphore = deviceSemaphore;
135185
}
136186

137187
public void Dispose()
138188
{
139189
if (!_disposed)
140190
{
141-
_coordinator.ReleaseWebhookLock(_deviceId);
191+
_coordinator.ReleaseWebhookLock(_deviceId, _deviceSemaphore);
142192
_disposed = true;
143193
}
144194
}

src/HubitatMqtt/Services/Worker.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,9 @@ private async Task PerformFullPollAsync()
146146
// Clear pending webhook updates for devices that were successfully synced
147147
var syncedDeviceIds = devices.Where(d => d.Id != null).Select(d => d.Id!).ToHashSet();
148148
_syncCoordinator.ClearPendingWebhookUpdates(syncedDeviceIds);
149+
150+
// Cleanup unused device locks to prevent memory leaks
151+
_syncCoordinator.CleanupUnusedDeviceLocks(syncedDeviceIds);
149152

150153
if (failedCount > 0)
151154
{

0 commit comments

Comments
 (0)