Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 45 additions & 32 deletions Hangfire.Redis.StackExchange/ExpiredJobsWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ internal class ExpiredJobsWatcher : IServerComponent
#pragma warning restore 618
{
private static readonly ILog Logger = LogProvider.For<ExpiredJobsWatcher>();
private static readonly string OwnerId = Guid.NewGuid().ToString();
private static readonly TimeSpan DefaultHoldDuration = TimeSpan.FromSeconds(30);

private readonly RedisStorage _storage;
private readonly TimeSpan _checkInterval;
Expand Down Expand Up @@ -46,52 +48,63 @@ void IServerComponent.Execute(CancellationToken cancellationToken)
using (var connection = (RedisConnection)_storage.GetConnection())
{
var redis = connection.Redis;

foreach (var key in ProcessedKeys)
{
var redisKey = _storage.GetRedisKey(key);
var redisKeyLock = $"{redisKey}:execute:lock";
if (!redis.LockTake(redisKeyLock, OwnerId, DefaultHoldDuration))
{
continue;
}

var count = redis.ListLength(redisKey);
if (count == 0) continue;

Logger.InfoFormat("Removing expired records from the '{0}' list...", key);

const int batchSize = 100;
var keysToRemove = new List<string>();

for (var last = count - 1; last >= 0; last -= batchSize)
try
{
var first = Math.Max(0, last - batchSize + 1);
var count = redis.ListLength(redisKey);
if (count == 0) continue;

Logger.InfoFormat("Removing expired records from the '{0}' list...", key);

var jobIds = redis.ListRange(redisKey, first, last).ToStringArray();
if (jobIds.Length == 0) continue;
const int batchSize = 100;
var keysToRemove = new List<string>();

var pipeline = redis.CreateBatch();
var tasks = new Task[jobIds.Length];

for (var i = 0; i < jobIds.Length; i++)
for (var last = count - 1; last >= 0; last -= batchSize)
{
tasks[i] = pipeline.KeyExistsAsync(_storage.GetRedisKey($"job:{jobIds[i]}"));
var first = Math.Max(0, last - batchSize + 1);

var jobIds = redis.ListRange(redisKey, first, last).ToStringArray();
if (jobIds.Length == 0) continue;

var pipeline = redis.CreateBatch();
var tasks = new Task[jobIds.Length];

for (var i = 0; i < jobIds.Length; i++)
{
tasks[i] = pipeline.KeyExistsAsync(_storage.GetRedisKey($"job:{jobIds[i]}"));
}

pipeline.Execute();
Task.WaitAll(tasks);

keysToRemove.AddRange(jobIds.Where((t, i) => !((Task<bool>)tasks[i]).Result));
}

pipeline.Execute();
Task.WaitAll(tasks);
if (keysToRemove.Count == 0) continue;

keysToRemove.AddRange(jobIds.Where((t, i) => !((Task<bool>)tasks[i]).Result));
}

if (keysToRemove.Count == 0) continue;

Logger.InfoFormat("Removing {0} expired jobs from '{1}' list...", keysToRemove.Count, key);
Logger.InfoFormat("Removing {0} expired jobs from '{1}' list...", keysToRemove.Count, key);

using (var transaction = connection.CreateWriteTransaction())
{
foreach (var jobId in keysToRemove)
using (var transaction = connection.CreateWriteTransaction())
{
transaction.RemoveFromList(key, jobId);
foreach (var jobId in keysToRemove)
{
transaction.RemoveFromList(key, jobId);
}

transaction.Commit();
}

transaction.Commit();
}
finally
{
redis.LockRelease(redisKeyLock, OwnerId);
}
}
}
Expand Down
79 changes: 79 additions & 0 deletions Hangfire.Redis.StackExchange/LocalCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Collections.Concurrent;
using System.Linq;

namespace Hangfire.Redis
{
public sealed class LocalCache
{
private static readonly Lazy<LocalCache> _instance = new Lazy<LocalCache>(() => new LocalCache());
public static LocalCache Instance => _instance.Value;

private const int DefaultMax = 1000;
private const int DefaultExpiredMinutes = 10;
private readonly ConcurrentDictionary<string, long> _cache;
private readonly ConcurrentQueue<string> _queue;

private long _checkTicks;
private static readonly object syncRoot = new object();

public LocalCache()
{
_cache = new ConcurrentDictionary<string, long>();
_queue = new ConcurrentQueue<string>();
_checkTicks = 0;
}

/// <summary>
/// 如果已存在则不添加并返回false,如果不存在则添加,并清理超出默认队列长度的旧Key
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public bool TryAdd(string key)
{
if (_cache.ContainsKey(key)) return false;
lock (syncRoot)
{
if (_cache.ContainsKey(key)) return false;

var expiredTicks = DateTime.Now.AddMinutes(DefaultExpiredMinutes).Ticks;
if (_cache.TryAdd(key, expiredTicks))
{
_queue.Enqueue(key);
}
else
{
return false;
}

if (_queue.Count > DefaultMax && _queue.TryDequeue(out var lastKey))
{
_cache.TryRemove(lastKey, out _);
}

CleanIfExpired();
return true;
}
}

private void CleanIfExpired()
{
if (_queue.IsEmpty && _cache.IsEmpty) return;
if (_queue.Count < 2 && _cache.Count < 2) return; // 数量为1则表示新增加的数据,不做过期处理

var lastExpiredTicks = DateTime.Now.AddMinutes(-DefaultExpiredMinutes).Ticks;
if (_checkTicks > lastExpiredTicks) return;

var expiredKeys = _cache.Where(x => x.Value < _checkTicks).Select(x => x.Key);
_checkTicks = lastExpiredTicks; // 放在check和筛选后
if (!expiredKeys.Any()) return;

foreach (var expiredKey in expiredKeys)
{
_cache.TryRemove(expiredKey, out _);
_queue.TryDequeue(out _); // 出队相应数量的数据
}
}

}
}
10 changes: 8 additions & 2 deletions Hangfire.Redis.StackExchange/RedisSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Hangfire.Redis
internal class RedisSubscription : IServerComponent
#pragma warning restore 618
{
private readonly ManualResetEvent _mre = new ManualResetEvent(false);
private readonly AutoResetEvent _mre = new AutoResetEvent(false);
private readonly RedisStorage _storage;
private readonly ISubscriber _subscriber;

Expand All @@ -20,7 +20,13 @@ public RedisSubscription([NotNull] RedisStorage storage, [NotNull] ISubscriber s
Channel = _storage.GetRedisKey("JobFetchChannel");

_subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
_subscriber.Subscribe(Channel, (channel, value) => _mre.Set());
_subscriber.Subscribe(Channel, (channel, value) =>
{
if (value.HasValue && LocalCache.Instance.TryAdd(value.ToString()))
{
_mre.Set();
}
});
}

public string Channel { get; }
Expand Down