Skip to content

Commit 6b84d1c

Browse files
authored
[Storage] [DataMovement] Setting delegate process to null and objects to null (#52994)
* Set ChannelProcessor process to null in cleanup; exception handler in chunkhandlers will log unexpected exceptions; remove Transfermanager TransferInternalState reference after transfer completes; Dispose TransferManager in ClientExtensions * Update changelog; change transfermanager use in copy tests to dispose * WIP - updating tests to dipose Transfermanager * Update tests to dispose TransferManager after usage * Undo change to changelog * Update Snippets * Undo transfermanager dispose test changes * Prevent state change after transfer has been completed
1 parent 5d25ff2 commit 6b84d1c

File tree

6 files changed

+73
-21
lines changed

6 files changed

+73
-21
lines changed

sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,12 @@ public DirectoryTransferTest(TOptions options) : base(options)
3434
_transferManager = new TransferManager(managerOptions);
3535
}
3636

37+
public override async Task CleanupAsync()
38+
{
39+
await ((IAsyncDisposable)_transferManager).DisposeAsync();
40+
await base.CleanupAsync();
41+
}
42+
3743
protected string CreateLocalDirectory(bool populate = false)
3844
{
3945
string directory = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());

sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ public async Task CleanUpAsync()
9191
{
9292
_channel.Writer.TryComplete();
9393
await _processorTaskCompletionSource.Task.ConfigureAwait(false);
94+
95+
// Null out the Process delegate to release references
96+
Interlocked.Exchange(ref _process, null);
9497
}
9598

9699
protected abstract ValueTask NotifyOfPendingItemProcessing();

sdk/storage/Azure.Storage.DataMovement/src/CommitChunkHandler.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ public CommitChunkHandler(
8383
_isChunkHandlerRunning = true;
8484
}
8585

86-
public Task CleanUpAsync()
86+
public async Task CleanUpAsync()
8787
{
8888
_isChunkHandlerRunning = false;
89-
return _stageChunkProcessor.CleanUpAsync();
89+
await _stageChunkProcessor.CleanUpAsync().ConfigureAwait(false);
9090
}
9191

9292
public async ValueTask QueueChunkAsync(QueueStageChunkArgs args, CancellationToken cancellationToken = default)
@@ -137,7 +137,21 @@ await _queuePutBlockTask(
137137
if (_isChunkHandlerRunning)
138138
{
139139
// This will trigger the job part to call Dispose on this object
140-
_ = Task.Run(() => _invokeFailedEventHandler(ex));
140+
_ = Task.Run(async () =>
141+
{
142+
try
143+
{
144+
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
145+
}
146+
catch
147+
{
148+
// Log and swallow any exceptions to prevent crashing the process
149+
DataMovementEventSource.Singleton
150+
.UnexpectedTransferFailed(
151+
nameof(CommitChunkHandler),
152+
ex.ToString());
153+
}
154+
});
141155
}
142156
}
143157
}

sdk/storage/Azure.Storage.DataMovement/src/DataMovementEventSource.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal class DataMovementEventSource : AzureEventSource
1616
private const int EnumerationCompleteEvent = 4;
1717
private const int ResumeTransferEvent = 5;
1818
private const int ResumeEnumerationCompleteEvent = 6;
19+
private const int UnexpectedTransferFailedEvent = 7;
1920

2021
private DataMovementEventSource() : base(EventSourceName) { }
2122

@@ -92,5 +93,11 @@ public void ResumeEnumerationComplete(string transferId, int jobPartCount)
9293
{
9394
WriteEvent(ResumeEnumerationCompleteEvent, transferId, jobPartCount);
9495
}
96+
97+
[Event(UnexpectedTransferFailedEvent, Level = EventLevel.Error, Message = "Transfer [{0}] Transfer failed: {1}")]
98+
public void UnexpectedTransferFailed(string transferId, string errorMessage)
99+
{
100+
WriteEvent(UnexpectedTransferFailedEvent, transferId, errorMessage);
101+
}
95102
}
96103
}

sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,21 @@ await _copyToDestinationFile(
136136
if (_isChunkHandlerRunning)
137137
{
138138
// This will trigger the job part to call Dispose on this object
139-
_ = Task.Run(() => _invokeFailedEventHandler(ex));
139+
_ = Task.Run(async () =>
140+
{
141+
try
142+
{
143+
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
144+
}
145+
catch
146+
{
147+
// Log and swallow any exceptions to prevent crashing the process
148+
DataMovementEventSource.Singleton
149+
.UnexpectedTransferFailed(
150+
nameof(CommitChunkHandler),
151+
ex.ToString());
152+
}
153+
});
140154
}
141155
}
142156
}

sdk/storage/Azure.Storage.DataMovement/src/TransferInternalState.cs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -91,26 +91,34 @@ public TransferStatus GetTransferStatus()
9191
/// <returns>Returns whether or not the status has been changed from its original state.</returns>
9292
public bool SetTransferState(TransferState state)
9393
{
94-
Argument.AssertNotNull(TransferManager, nameof(TransferManager));
95-
if (_status.SetTransferStateChange(state))
94+
// Only allow state changes if not already in a Completed/Paused state.
95+
if (TransferState.Completed != _status.State &&
96+
TransferState.Paused != _status.State)
9697
{
97-
if (TransferState.Completed == _status.State ||
98-
TransferState.Paused == _status.State)
98+
Argument.AssertNotNull(TransferManager, nameof(TransferManager));
99+
if (_status.SetTransferStateChange(state))
99100
{
100-
DataMovementEventSource.Singleton.TransferCompleted(Id, _status);
101-
// If the _completionSource has been cancelled or the exception
102-
// has been set, we don't need to check if TrySetResult returns false
103-
// because it's acceptable to cancel or have an error occur before then.
104-
105-
CompletionSource.TrySetResult(_status);
106-
107-
// Tell the transfer manager to clean up the completed/paused job.
108-
TransferManager.TryRemoveTransfer(_id);
109-
110-
// Once we reach a Completed/Paused, Dispose the CancellationTokenSource to release resources (since it is no longer needed).
111-
DisposeCancellationTokenSource();
101+
if (TransferState.Completed == _status.State ||
102+
TransferState.Paused == _status.State)
103+
{
104+
DataMovementEventSource.Singleton.TransferCompleted(Id, _status);
105+
// If the _completionSource has been cancelled or the exception
106+
// has been set, we don't need to check if TrySetResult returns false
107+
// because it's acceptable to cancel or have an error occur before then.
108+
109+
CompletionSource.TrySetResult(_status);
110+
111+
// Tell the transfer manager to clean up the completed/paused job.
112+
TransferManager.TryRemoveTransfer(_id);
113+
114+
// Remove Transfer Manager reference after no longer needed.
115+
TransferManager = null;
116+
117+
// Once we reach a Completed/Paused, Dispose the CancellationTokenSource to release resources (since it is no longer needed).
118+
DisposeCancellationTokenSource();
119+
}
120+
return true;
112121
}
113-
return true;
114122
}
115123
return false;
116124
}

0 commit comments

Comments
 (0)