[Persistence] AllEvents Query appears to be missing some events #5824

@SeanKilleen

Description

Version Information

  • Akka.NET: 1.4.34
  • Akka.Persistence.Query: 1.4.34
  • Akka.Persistence.Query.SQL: 1.4.34
  • Akka.Persistence.SqlServer: 1.4.32
  • SQL Server: Docker image server:2019-latest via wsl2

Describe the bug

An AllEvents query does not appear to replay all the events.

I have a persistence query to read everything:

readJournal.AllEvents(Offset.NoOffset()).RunForeach(env => writer.Tell(env), materialize);

My persistence-related HOCON:

akka.persistence{
	journal {
		plugin = "akka.persistence.journal.sql-server"
		sql-server {
			# qualified type name of the SQL Server persistence journal actor
			class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"

			# dispatcher used to drive journal actor
			plugin-dispatcher = "akka.actor.default-dispatcher"

			# connection string used for database access
			connection-string = "Server=localhost;Database=master;User Id=sa;Password=REDACTED"

			# default SQL commands timeout
			connection-timeout = 30s

			# SQL server schema name to table corresponding with persistent journal
			schema-name = dbo

			# SQL server table corresponding with persistent journal
			table-name = EventJournal

			# should corresponding journal table be initialized automatically
			auto-initialize = on

			# timestamp provider used for generation of journal entries timestamps
			timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"

			# metadata table
			metadata-table-name = Metadata
			
			# Recommended: change default circuit breaker settings
			# By uncommenting below and using Connection Timeout + Command Timeout
			# circuit-breaker.call-timeout=30s
		}
	}

	snapshot-store {
		plugin = "akka.persistence.snapshot-store.sql-server"
		sql-server {

			# qualified type name of the SQL Server persistence journal actor
			class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"

			# dispatcher used to drive journal actor
			plugin-dispatcher = "akka.actor.default-dispatcher"

			# connection string used for database access
			connection-string = "Server=localhost;Database=master;User Id=sa;Password=REDACTED"

			# default SQL commands timeout
			connection-timeout = 30s

			# SQL server schema name to table corresponding with persistent journal
			schema-name = dbo

			# SQL server table corresponding with persistent journal
			table-name = SnapshotStore

			# should corresponding journal table be initialized automatically
			auto-initialize = on
			
			# Recommended: change default circuit breaker settings
			# By uncommenting below and using Connection Timeout + Command Timeout
			# circuit-breaker.call-timeout=30s
		}
	}
	query {
		my-read-journal {
				# Implementation class of the SQL ReadJournalProvider
				class = "Akka.Persistence.Query.Sql.SqlReadJournalProvider, Akka.Persistence.Query.Sql"
  
				# Absolute path to the write journal plugin configuration entry that this 
				# query journal will connect to. 
				# If undefined (or "") it will connect to the default journal as specified by the
				# akka.persistence.journal.plugin property.
				write-plugin = ""
  
				# The SQL write journal is notifying the query side as soon as things
				# are persisted, but for efficiency reasons the query side retrieves the events 
				# in batches that sometimes can be delayed up to the configured `refresh-interval`.
				refresh-interval = 3s
  
				# How many events to fetch in one query (replay) and keep buffered until they
				# are delivered downstreams.
				max-buffer-size = 10000				
		}
	}
}

Within the actor I have:

            ReceiveAsync<EventEnvelope>(async message =>
            {
                // ....
                capturedEvents++;
                _logger.Warning("RECEIVED EVENT # {EventNumber} in my processing. PersistenceId {PersistenceId} Sequence number {SequenceNumber}", capturedEvents, message.PersistenceId, message.SequenceNr);
                // ....
            });

When running this, my logs (sent to Seq) showed that only some of the events were processed. Querying the EventJournal table directly showed more events than were logged: some persistence IDs, and some events for other IDs, appeared to be "missing" from the replay.
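To help pin down which events go missing, a per-persistence-ID tally on the receiving side may be more useful than a single running counter. The following is only a minimal sketch: `EventTally` and its methods are hypothetical names, not part of Akka.NET, and the gap check assumes sequence numbers start at 1 for each persistence ID and that no events have been deleted from the journal.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of Akka.NET): records how many events were
// received per persistence ID and the highest sequence number seen for each.
public sealed class EventTally
{
    private readonly Dictionary<string, (long Count, long MaxSequenceNr)> _seen =
        new Dictionary<string, (long Count, long MaxSequenceNr)>();

    public void Record(string persistenceId, long sequenceNr)
    {
        // For an unseen ID, TryGetValue leaves 'current' at its default (0, 0).
        _seen.TryGetValue(persistenceId, out var current);
        _seen[persistenceId] =
            (current.Count + 1, Math.Max(current.MaxSequenceNr, sequenceNr));
    }

    // Returns the persistence IDs whose received count differs from the
    // highest sequence number seen. Assumption: sequence numbers start at 1
    // and are contiguous, so a mismatch suggests skipped (or duplicated) events.
    public IEnumerable<string> IdsWithGaps()
    {
        foreach (var entry in _seen)
        {
            if (entry.Value.Count != entry.Value.MaxSequenceNr)
            {
                yield return entry.Key;
            }
        }
    }
}
```

Inside the `ReceiveAsync<EventEnvelope>` handler, this would be called as `tally.Record(message.PersistenceId, message.SequenceNr)`. The output of `IdsWithGaps()` could then be compared against the rows in the EventJournal table for the same persistence IDs.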

Reproduction

I have a private repo that I am willing to invite you to and help configure for your testing if it would be helpful. It will eventually be a public repo.
