Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 100 additions & 65 deletions src/Microsoft.Health.TaskManagement/JobHosting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -18,7 +21,7 @@ namespace Microsoft.Health.JobManagement
{
public class JobHosting
{
private static readonly ActivitySource JobHostingActivitySource = new ActivitySource(nameof(JobHosting));
private static readonly ActivitySource _jobHostingActivitySource = new ActivitySource(nameof(JobHosting));
private readonly IQueueClient _queueClient;
private readonly IJobFactory _jobFactory;
private readonly ILogger<JobHosting> _logger;
Expand All @@ -33,6 +36,7 @@ public JobHosting(IQueueClient queueClient, IJobFactory jobFactory, ILogger<JobH
_queueClient = queueClient;
_jobFactory = jobFactory;
_logger = logger;
RunningJobsTarget = new ConcurrentDictionary<byte, int>();
}

public int PollingFrequencyInSeconds { get; set; } = Constants.DefaultPollingFrequencyInSeconds;
Expand All @@ -41,104 +45,135 @@ public JobHosting(IQueueClient queueClient, IJobFactory jobFactory, ILogger<JobH

public double JobHeartbeatIntervalInSeconds { get; set; } = Constants.DefaultJobHeartbeatIntervalInSeconds;

public ConcurrentDictionary<byte, int> RunningJobsTarget { get; private set; }

public async Task ExecuteAsync(byte queueType, short runningJobCount, string workerName, CancellationTokenSource cancellationTokenSource)
{
var workers = new List<Task>();
_logger.LogInformation("Queue={QueueType}: job hosting is starting...", queueType);
SetRunningJobsTarget(queueType, runningJobCount); // this happens only once according to our current logic
_lastHeartbeatLog = DateTime.UtcNow;

_logger.LogInformation("Queue {QueueType} is starting.", queueType);

// parallel dequeue
for (var thread = 0; thread < runningJobCount; thread++)
var workers = new List<Task<JobInfo>>();
var dequeueDelay = true;
var dequeueTimeoutJobStopwatch = Stopwatch.StartNew();
var dequeueTimeoutJobsCounter = 0;
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
workers.Add(Task.Run(async () =>
RunningJobsTarget.TryGetValue(queueType, out var runningJobsTarget);
while (workers.Count < runningJobsTarget && !cancellationTokenSource.Token.IsCancellationRequested)
{
// random delay to avoid convoys
await Task.Delay(TimeSpan.FromSeconds(RandomNumberGenerator.GetInt32(100) / 100.0 * PollingFrequencyInSeconds));

var checkTimeoutJobStopwatch = Stopwatch.StartNew();

while (!cancellationTokenSource.Token.IsCancellationRequested)
workers.Add(Task.Run(async () =>
{
if (DateTime.UtcNow - _lastHeartbeatLog > TimeSpan.FromHours(1))
//// wait
if (dequeueDelay)
{
_lastHeartbeatLog = DateTime.UtcNow;
_logger.LogInformation("{QueueType} working is running.", queueType);
var delay = TimeSpan.FromSeconds(((RandomNumberGenerator.GetInt32(20) / 100.0) + 0.9) * PollingFrequencyInSeconds); // random delay to avoid convoys
_logger.LogDebug("Queue={QueueType}: delaying job execution for {DequeueDelay}.", queueType, delay);
await Task.Delay(delay);
}

JobInfo nextJob = null;
JobInfo job = null;
//// dequeue
if (_queueClient.IsInitialized())
{
try
{
_logger.LogDebug("Dequeuing next job for {QueueType}.", queueType);
_logger.LogDebug("Queue={QueueType}: dequeuing next job...", queueType);

if (checkTimeoutJobStopwatch.Elapsed.TotalSeconds > 600)
if (Interlocked.Decrement(ref dequeueTimeoutJobsCounter) >= 0)
{
checkTimeoutJobStopwatch.Restart();
nextJob = await _queueClient.DequeueAsync(queueType, workerName, JobHeartbeatTimeoutThresholdInSeconds, cancellationTokenSource.Token, null, true);
job = await _queueClient.DequeueAsync(queueType, workerName, JobHeartbeatTimeoutThresholdInSeconds, cancellationTokenSource.Token, null, true);
}

nextJob ??= await _queueClient.DequeueAsync(queueType, workerName, JobHeartbeatTimeoutThresholdInSeconds, cancellationTokenSource.Token);
job ??= await _queueClient.DequeueAsync(queueType, workerName, JobHeartbeatTimeoutThresholdInSeconds, cancellationTokenSource.Token);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to dequeue new job for {QueueType}.", queueType);
_logger.LogError(ex, "Queue={QueueType}: failed to dequeue new job.", queueType);
}
}

if (nextJob != null)
//// execute
if (job != null)
{
using (Activity activity = JobHostingActivitySource.StartActivity(
JobHostingActivitySource.Name,
ActivityKind.Server))
{
if (activity == null)
{
_logger.LogWarning("Failed to start an activity.");
}
await ExecuteJobWithActivityAsync(job);
}

activity?.SetTag("CreateDate", nextJob.CreateDate);
activity?.SetTag("HeartbeatDateTime", nextJob.HeartbeatDateTime);
activity?.SetTag("Id", nextJob.Id);
activity?.SetTag("QueueType", nextJob.QueueType);
activity?.SetTag("Version", nextJob.Version);
return job;
}));

_logger.LogJobInformation(nextJob, "Job dequeued.");
await ExecuteJobAsync(nextJob);
}
}
else
{
try
{
_logger.LogDebug("Empty queue {QueueType}. Delaying until next iteration.", queueType);
await Task.Delay(TimeSpan.FromSeconds(PollingFrequencyInSeconds), cancellationTokenSource.Token);
}
catch (TaskCanceledException)
{
_logger.LogInformation("Queue {QueueType} is stopping, worker is shutting down.", queueType);
}
}
}
}));
}
_logger.LogDebug("Queue={QueueType}: total workers = {Workers}.", queueType, workers.Count);
}

try
{
// If any worker crashes or complete after cancellation due to shutdown,
// cancel all workers and wait for completion so they don't crash unnecessarily.
await Task.WhenAny(workers.ToArray());
try
{
var completed = await Task.WhenAny(workers);
workers.Remove(completed);
dequeueDelay = await completed == null; // no job info == queue was empty
}
catch (Exception ex)
{
_logger.LogError(ex, "Queue={QueueType}: job hosting task failed.", queueType);
#if NET6_0
cancellationTokenSource.Cancel();
cancellationTokenSource.Cancel();
#else
await cancellationTokenSource.CancelAsync();
await cancellationTokenSource.CancelAsync();
#endif
}

if (dequeueTimeoutJobStopwatch.Elapsed.TotalSeconds > 600)
{
dequeueTimeoutJobsCounter = runningJobsTarget;
dequeueTimeoutJobStopwatch.Restart();
}

if (DateTime.UtcNow - _lastHeartbeatLog > TimeSpan.FromHours(1))
{
_lastHeartbeatLog = DateTime.UtcNow;
_logger.LogInformation("Queue={QueueType}: job hosting is running, total workers = {Workers}.", queueType, workers.Count);
}
}

try
{
await Task.WhenAll(workers.ToArray());
}
catch (Exception ex)
{
_logger.LogError(ex, "Job failed to execute. Queue type: {QueueType}", queueType);
_logger.LogError(ex, "Queue={QueueType}: job hosting task failed.", queueType);
}
}

public void SetRunningJobsTarget(byte queueType, int target)
{
if (!RunningJobsTarget.ContainsKey(queueType))
{
RunningJobsTarget.TryAdd(queueType, target);
}
else
{
RunningJobsTarget[queueType] = target;
}

_logger.LogInformation("Queue={QueueType}: running jobs target is set to {RunningJobsTarget}.", queueType, target);
}

private async Task ExecuteJobWithActivityAsync(JobInfo nextJob)
{
using (Activity activity = _jobHostingActivitySource.StartActivity(_jobHostingActivitySource.Name, ActivityKind.Server))
{
if (activity == null)
{
_logger.LogWarning("Failed to start an activity.");
}

activity?.SetTag("CreateDate", nextJob.CreateDate);
activity?.SetTag("HeartbeatDateTime", nextJob.HeartbeatDateTime);
activity?.SetTag("Id", nextJob.Id);
activity?.SetTag("QueueType", nextJob.QueueType);
activity?.SetTag("Version", nextJob.Version);

_logger.LogJobInformation(nextJob, "Job dequeued.");
await ExecuteJobAsync(nextJob);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Tracing;
using System.IO;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -57,6 +58,30 @@ public ImportTests(ImportTestFixture<StartupForImportTestProvider> fixture)
_fixture = fixture;
}

[Fact]
public async Task CheckNumberOfDequeues()
{
if (!_fixture.IsUsingInProcTestServer)
{
return;
}

ExecuteSql("INSERT INTO dbo.Parameters(Id, Char) SELECT 'DequeueJob', 'LogEvent'");

await Task.Delay(TimeSpan.FromSeconds(12));

var dequeues = (int)ExecuteSql(@$"
SELECT count(*)
FROM dbo.EventLog
WHERE EventDate > dateadd(second,-10,getUTCdate())
AND Process = 'DequeueJob'
AND Mode LIKE 'Q=2 %' -- import
");

// polling interval is set to 1 second. 2 jobs. expected 20.
Assert.True(dequeues > 16 && dequeues < 24, $"not expected dequeues={dequeues}");
}

[Theory]
[InlineData(false)] // eventualConsistency=false
[InlineData(true)] // eventualConsistency=true
Expand Down
Loading