Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Akka.Persistence.Azure
}
public class static CloudTableExtensions
{
[Akka.Annotations.InternalApiAttribute()]
public static System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<Azure.Response>> ExecuteBatchAsLimitedBatches(this Azure.Data.Tables.TableClient table, System.Collections.Generic.List<Azure.Data.Tables.TableTransactionAction> batch, System.Threading.CancellationToken token) { }
}
public class static ListExtensions
Expand Down Expand Up @@ -129,6 +130,11 @@ namespace Akka.Persistence.Azure.Journal
public Akka.Persistence.Azure.Journal.AzureTableStorageJournalSetup Get(string journalId) { }
public void Set(string journalId, Akka.Persistence.Azure.Journal.AzureTableStorageJournalSetup setup) { }
}
public class PersistenceOperationException : System.Exception
{
public PersistenceOperationException(string message) { }
public PersistenceOperationException(string message, System.Exception innerException) { }
}
}
namespace Akka.Persistence.Azure.Query
{
Expand Down
59 changes: 54 additions & 5 deletions src/Akka.Persistence.Azure/CloudTableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Akka.Annotations;
using Akka.Persistence.Azure.Journal;
using Akka.Persistence.Azure.TableEntities;
using Azure;
using Azure.Data.Tables;

#nullable enable
namespace Akka.Persistence.Azure
{
public static class CloudTableExtensions
{
private const int MaxBatchSize = 100;

/// <summary>
/// <para>
/// Execute a batch transaction to the service. This method automatically chunks the batch request into chunks
Expand All @@ -30,6 +37,7 @@ public static class CloudTableExtensions
/// <param name="token">Cancellation token</param>
/// <returns>List of <see cref="Response"/> for each items</returns>
// TODO Replace this with real transactional execution if Azure Table Storage supports it in the future.
[InternalApi]
public static async Task<IReadOnlyList<Response>> ExecuteBatchAsLimitedBatches(
this TableClient table,
List<TableTransactionAction> batch,
Expand All @@ -43,11 +51,52 @@ public static async Task<IReadOnlyList<Response>> ExecuteBatchAsLimitedBatches(

var result = new List<Response>();
var limitedBatchOperationLists = batch.ChunkBy(MaxBatchSize);
foreach (var limitedBatchOperationList in limitedBatchOperationLists)

for (var i = 0; i < limitedBatchOperationLists.Count; i++)
{
var limitedBatchResponse = await table.SubmitTransactionAsync(limitedBatchOperationList, token);
result.AddRange(limitedBatchResponse.Value);
try
{
var limitedBatchOperationList = limitedBatchOperationLists[i];
var limitedBatchResponse = await table.SubmitTransactionAsync(limitedBatchOperationList, token);
result.AddRange(limitedBatchResponse.Value);
}
catch (Exception ex)
{
var failedBatch = limitedBatchOperationLists[i].ToArray();
var sb = new StringBuilder("Failed to execute transaction batch operation");

TableTransactionAction? failedAction;
if (failedBatch.Length == 1)
failedAction = failedBatch[0];
else if (ex is TableTransactionFailedException { FailedTransactionActionIndex: not null } transactionEx)
{
var batchIndex = transactionEx.FailedTransactionActionIndex.Value;
failedAction = failedBatch[batchIndex];

sb.Append($" while processing batch index {batchIndex}");
}
else
failedAction = null;

if (failedAction is null)
throw new PersistenceOperationException(sb.ToString(), ex);

sb.Append($", action type: {failedAction.ActionType}");
var entity = (TableEntity)failedAction.Entity;
sb.Append($", persistence id: {entity.PartitionKey}");
sb.Append($", row key: {entity.RowKey}");

if (entity.RowKey == HighestSequenceNrEntry.RowKeyValue)
sb.Append($", {HighestSequenceNrEntry.HighestSequenceNrKey}: {entity[HighestSequenceNrEntry.HighestSequenceNrKey]}");

if(entity.ContainsKey(PersistentJournalEntry.SeqNoKeyName))
sb.Append($", sequence number: {entity.GetInt64(PersistentJournalEntry.SeqNoKeyName)}");

if(entity.ContainsKey(PersistentJournalEntry.ManifestKeyName))
sb.Append($", manifest: {entity.GetString(PersistentJournalEntry.ManifestKeyName)}");

throw new PersistenceOperationException(sb.ToString(), ex);
}
}

return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace Akka.Persistence.Azure.Journal;

public class PersistenceOperationException: Exception
{
public PersistenceOperationException(string message) : base(message)
{
}

public PersistenceOperationException(string message, Exception innerException) : base(message, innerException)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ internal sealed class PersistentJournalEntry
{
public const string TagsKeyName = "tags";
public const string UtcTicksKeyName = "utcTicks";
private const string ManifestKeyName = "manifest";
public const string ManifestKeyName = "manifest";
private const string PayloadKeyName = "payload";
private const string SeqNoKeyName = "seqno";
public const string SeqNoKeyName = "seqno";

public PersistentJournalEntry(TableEntity entity)
{
Expand Down