Skip to content

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented Jan 17, 2022

Fixes #5493

Changes

  • Adds custom persistence provider page to the documentation
  • Add sample project for code linking in documentation page

Copy link
Contributor

@eaba eaba left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Arkatufus src/example is the right folder for the custom persistence provider code!

@@ -0,0 +1,511 @@
# Writing A Custom Akka.Persistence Provider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing uid:

---
uid: custom-persistent-provider
title: Custom Persistent Provider
---


All of the code examples in this documentation will assume a Sqlite database and the `Journal` table schema we will be using are:

```sqlite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-sql[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs?range=29-38,41-44)]?


#### ReplayMessagesAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-csharp[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs#L169-175)]


`ByPersistenceIdSql` in this example refers to this SQL query statement:

```sqlite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-sql[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs#L47-57)]

```

Semi pseudo-code example:
```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-csharp[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs#L169-222)]


#### DeleteMessagesToAsync

```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-csharp[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs#L329)]


`DeleteBatchSql` in this example refers to this SQL query statement:

```sqlite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-sql[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs#L85-88)]


`HighestSequenceNrSql` in this example refers to this SQL query statement:

```sqlite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did not find this in the source. sql instead of sqlite


`UpdateSequenceNrSql` in this example refers to this SQL query statement:

```sqlite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-sql[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs#L91-92)]

VALUES (@PersistenceId, @SequenceNr);
```

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[!code-csharp[Main](../../../src/examples/Akka.Persistence.Custom/Journal/SqliteJournal.cs#L329-387)]

protected sealed override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
{
// For each of the atomic write request, create an async Task
var writeTasks = messages.Select(async message =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Questions:

  • What is the risk that this task spawning pattern writes messages out of order? (In theory this may not be a big deal, but could lead to interesting edge cases)

  • What is the risk of this pattern hammering the datasource with open connections and causing issues on scale?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Arkatufus when we were discussing this documentation, we had raised the issue of having a "caveats and pitfalls" section - I think these two questions definitely belong in there: thinking about operation order (how does Akka.Persistence manage this for you vs. what do you need to worry about yourself) and connection management.

The built-in Akka.Persistence fundamentals should take care of the former (AtomicWrite is ordered and all operations in the AtomicWrite should be written in the same batch) but that semantic needs to be explained so an implementer can do this correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an interesting question, in theory, there is no guarantee that each AtomicWrite Task would be written in order, especially if one of them in the middle failed to be written to the datasource, the ordering would be screwed. @Aaronontheweb, do you have any input on this?

For the second question, I really have no answer. I would need to analyze the call chain and see if there are any optimization such as batching upstream of this method, I have a feeling that you're correct, that this method is being called for each produced message, but I would have to confirm it first.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when a PersistentActor writes multiple events via PersistAll - all of those events end up inside the same AtomicWrite; they're considered to be an atomic transaction from the point of view of the actor who wrote them.

The journal is supposed to process all of the IPersistentRepresentation (I think) objects that are included in the AtomicWrite in a single transaction to preserve atomicity. If there are multiple AtomicWrites to process at once the order doesn't matter since they all came from different actors or the same actor using PersistAsync - if writes complete in different orders coming from the same actor I believe the Resequencer actor, built into the journal, will unroll the order in which the WriteMessageSuccess messages are delivered back to the original actor based on the sequence numbers that were generated prior to the write.

So the amount of things the implementor has to worry about is relatively small - the biggest one is making sure they don't group multiple AtomicWrites into the same transaction. There are some implementations which explicitly break these atomicity guarantees, such as the BatchingSqlJournal, but that should be done only while explicitly making a trade off in the name of performance or throughput - not simply because the implementor didn't know better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the second question, I really have no answer. I would need to analyze the call chain and see if there are any optimization such as batching upstream of this method, I have a feeling that you're correct, that this method is being called for each produced message, but I would have to confirm it first.

Don't worry about this specific implementation. Worry about the general advice you're to give all implementors about connection management.

Copy link
Member

@to11mtm to11mtm Jan 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is an interesting question, in theory, there is no guarantee that each AtomicWrite Task would be written in order, especially if one of them in the middle failed to be written to the datasource, the ordering would be screwed. @Aaronontheweb, do you have any input on this?

For the second question, I really have no answer. I would need to analyze the call chain and see if there are any optimization such as batching upstream of this method, I have a feeling that you're correct, that this method is being called for each produced message, but I would have to confirm it first.

From the Docs on asyncwritejournal

Ordering:

Journals should aim to persist events in-order for a given persistenceId as otherwise in case of a failure, the persistent state may be end up being inconsistent.

It's not a hard requirement, but it saves a lot of pain down the road, particularly if persistAsync is involved.

Atomicity:

All PersistentRepr of the AtomicWrite must be written to the data store atomically, i.e. all or none must be stored. If the journal (data store) cannot support atomic writes of multiple events it should reject such writes with a Try Failure
This limitation should also be documented by the journal plugin.

Blocking Sequence Reads on writes:

Please also note that requests for the highest sequence number may be made concurrently to this call executing for the same persistenceId, in particular it is possible that a restarting actor tries to recover before its outstanding writes have completed. In the latter case it is highly desirable to defer reading the highest sequence number until all outstanding writes have completed, otherwise the may reuse sequence numbers.

Note there's some edge cases around the above, even if you get it right in the provider itself (e.x. too short a recovery timeout on cluster shards/singletons, and they somehow read before the previous system finished it's somehow-delayed write)

I do know that as far as ordering, in persistince-jdbc's batching they used mapAsync rather than mapAsyncUnordered, even though the atomicWrites are atomically grouped (via batchWeighted). I'm not certain whether there's still some edge cases with aggressive PersistAsync usage that could cause issues with this pattern (I think EventSourced handles this with it's batching).

Also, they handle the case of readHighestSequenceNr blocking for writes via a Future Well, here's the ported version since it may be more useful.

If there are multiple AtomicWrites to process at once the order doesn't matter since they all came from different actors or the same actor using PersistAsync

@Aaronontheweb So if you look at the ported code above, what's interesting is that it appears there is an implicit guarantee that for a given call to WriteMessagesAsync, all of the AtomicWrites will be the same persistenceId. here's the scala version for ref.

The implication of all this, is for a given call to WriteMessagesAsync, you're best off doing one of the following to keep things as 'Akka' as possible (i.e. fail fast in a recoverable way):

  • Treating all AtomicWrites in that call as a single transacted write to the store
    • very easy to implement for the example IMO
    • As noted above, basically what persistence-jdbc and Persistence.Linq2Db do.
  • Ensuring that all AtomicWrites in a given call to WriteMessagesAsync are chunked into an AtomicWriteGroup such that:
    • A chunk contains 1 or more AtomicWrites from the group
    • An AtomicWrite is only inside one chunk
    • Chunks are processed in-order
    • As soon as one chunk fails, all subsequent chunks in that Group ignored.

Obviously the second option is a good bit more complicated, but I threw it out there for others who may want to roll their own.

The risk if you don't chunk/bailout or transact the whole atomicwrite group, is Journal corruption when using PersistAsync and/or PersistAllAsync (if one call fails, but then later calls succeed).

The risk if you don't block the ReadHighestSequenceNrAsync calls, is under some extreme conditions you can wind up with bad journal recoveries.

@Aaronontheweb
Copy link
Member

@eaba are those sections of the Sqlite journal code that @Arkatufus could reference directly? We could decorate that code using // <tagname> // </tagname> blocks to target it.

@eaba
Copy link
Contributor

eaba commented Jan 18, 2022

@eaba are those sections of the Sqlite journal code that @Arkatufus could reference directly? We could decorate that code using // <tagname> // </tagname> blocks to target it.

// <tagname> // </tagname> will be a good approach. I was thinking about the future when more lines are added!

---
uid: custom-persistent-provider
title: Writing A Custom Persistent Provider
---
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM


#### ReplayMessagesAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we leave this as it is or make use of the range selection though the line number may change in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the name of the function that needs to be implemented, the code block is used here just for formatting


#### ReadHighestSequenceNrAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we leave this as it is or make use of the range selection though the line number may change in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above


#### WriteMessagesAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we leave this as it is or make use of the range selection though the line number may change in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above


#### DeleteMessagesToAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we leave this as it is or make use of the range selection though the line number may change in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above


#### SaveAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we leave this as it is or make use of the range selection though the line number may change in the future?


#### DeleteAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we leave this as it is or make use of the range selection though the line number may change in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above


#### DeleteAsync

```c#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we leave this as it is or make use of the range selection though the line number may change in the future?


There are some HOCON settings that are by default loaded by the snapshot store base class and these can be overriden in your HOCON settings. The minimum HOCON settings that need to be defined for your custom plugin are:

```hocon
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the best option here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing, HOCON is a proprietary format, there are no CSS equivalent, no support in DocFX

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still reference a HOCON file use a doc include - we do this in all of the default config docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the support is limited to code line range? and since HOCON is a nested structure in nature, linking to a line of code inside a HOCON is a bit meaningless because you would not get all of the parent property names before and can actually lead to more confusion by the reader

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you'd have to either include an entire stand-alone HOCON file (which works) or wrap around a C# format string using line numbers. Separate file is probably best.

href: persistence/custom-persistence-provider.md
- name: Persistence Query
href: persistence/persistence-query.md
- name: Persistence Testing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are here, can we create toc.yml for the persistence section and use that here? For example: https://github.com/Arkatufus/akka.net/tree/master/docs/articles/intro/getting-started/toc.yml

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be out of scope of this PR, you're mixing 2 kinds of issues

@Arkatufus Arkatufus marked this pull request as ready for review January 24, 2022 16:14
@Aaronontheweb Aaronontheweb merged commit a5ef30f into akkadotnet:dev Jan 25, 2022

This call is protected with a circuit-breaker.

Calls to this method are serialized by the enclosing journal actor. If you spawn work in asynchronous tasks it is alright that they complete the futures in any order, but the actual writes for a specific persistenceId should be serialized to avoid issues such as events of a later write are visible to consumers (query side, or replay) before the events of an earlier write are visible. A `PersistentActor` will not send a new `WriteMessages` request before the previous one has been completed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% true - PersistAsync explicitly does this. The journal may save events out of order, but they're ACKed back to the caller in the original order in which they were sent. Since the caller sets the SeqNo, which determines the ordering, this operation is safe.


##### Batching

The batch is only for performance reasons, i.e. all messages don't have to be written atomically. Higher throughput can typically be achieved by using batch inserts of many records compared to inserting records one-by-one, but this aspect depends on the underlying data store and a journal implementation can implement it as efficiently as possible.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear what you're referring to here by "the Batch"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add documentation on making custom Akka.Persistence provider

4 participants