Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

namespace Akka.Persistence.Sql.Snapshot
{
public class ByteArraySnapshotDao : ISnapshotDao
public class ByteArraySnapshotDao : ISnapshotDao, IDisposable
{
private readonly AkkaPersistenceDataConnectionFactory _connectionFactory;
private readonly ByteArrayDateTimeSnapshotSerializer _dateTimeSerializer;
Expand Down Expand Up @@ -54,7 +54,7 @@ public async Task DeleteAllSnapshotsAsync(
string persistenceId,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
await _connectionFactory.ExecuteWithTransactionAsync(
_writeIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -82,7 +82,7 @@ public async Task DeleteUpToMaxSequenceNrAsync(
long maxSequenceNr,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
await _connectionFactory.ExecuteWithTransactionAsync(
_writeIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -116,7 +116,7 @@ public async Task DeleteUpToMaxTimestampAsync(
DateTime maxTimestamp,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
await _connectionFactory.ExecuteWithTransactionAsync(
_writeIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -151,7 +151,7 @@ public async Task DeleteUpToMaxSequenceNrAndMaxTimestampAsync(
DateTime maxTimestamp,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
await _connectionFactory.ExecuteWithTransactionAsync(
_writeIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -186,7 +186,7 @@ public async Task<Option<SelectedSnapshot>> LatestSnapshotAsync(
string persistenceId,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
return await _connectionFactory.ExecuteWithTransactionAsync(
_readIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -224,7 +224,7 @@ public async Task<Option<SelectedSnapshot>> SnapshotForMaxTimestampAsync(
DateTime timestamp,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
return await _connectionFactory.ExecuteWithTransactionAsync(
_readIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -268,7 +268,7 @@ public async Task<Option<SelectedSnapshot>> SnapshotForMaxSequenceNrAsync(
long sequenceNr,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
return await _connectionFactory.ExecuteWithTransactionAsync(
_readIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -313,7 +313,7 @@ public async Task<Option<SelectedSnapshot>> SnapshotForMaxSequenceNrAndMaxTimest
DateTime timestamp,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
return await _connectionFactory.ExecuteWithTransactionAsync(
_readIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -360,7 +360,7 @@ public async Task DeleteAsync(
DateTime timestamp,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
await _connectionFactory.ExecuteWithTransactionAsync(
_writeIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -403,7 +403,7 @@ public async Task SaveAsync(
object snapshot,
CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
await _connectionFactory.ExecuteWithTransactionAsync(
_writeIsolationLevel,
cts.Token,
Expand Down Expand Up @@ -443,5 +443,11 @@ public async Task InitializeTables()
await connection.CreateTableAsync<LongSnapshotRow>(TableOptions.CreateIfNotExists, footer);
}
}

public void Dispose()
{
_shutdownCts.Cancel();
_shutdownCts.Dispose();
}
}
}
6 changes: 6 additions & 0 deletions src/Akka.Persistence.Sql/Snapshot/SqlSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ protected override void PreStart()
BecomeStacked(WaitingForInitialization);
}

protected override void PostStop()
{
_dao.Dispose();
base.PostStop();
}

private bool WaitingForInitialization(object message)
{
switch (message)
Expand Down
Loading