Skip to content

Commit

Permalink
feat: Fill gaps in EventWriter via message instead of config (#32237)
Browse files Browse the repository at this point in the history
* makes it possible to use same EventWriter for both types of producers
* and we don't need to document that the config has to be set
  • Loading branch information
patriknw committed Nov 28, 2023
1 parent 95d7210 commit 82586be
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ class EventWriterFillGapsSpec
with AnyWordSpecLike
with LogCapturing {

val settings = EventWriter.EventWriterSettings(
10,
5.seconds,
fillSequenceNumberGaps = true,
latestSequenceNumberCacheCapacity = 100)
val settings = EventWriter.EventWriterSettings(10, 5.seconds, latestSequenceNumberCacheCapacity = 100)
implicit val ec: ExecutionContext = testKit.system.executionContext

"The event writer" should {
Expand Down Expand Up @@ -344,7 +340,15 @@ class EventWriterFillGapsSpec
(1 to 1000).map { pidN =>
Future {
for (n <- 1 to 20) {
writer ! EventWriter.Write(s"A|pid$pidN", n.toLong, n.toString, false, None, Set.empty, probe.ref)
writer ! EventWriter.Write(
s"A|pid$pidN",
n.toLong,
n.toString,
isSnapshotEvent = false,
fillSequenceNumberGaps = true,
None,
Set.empty,
probe.ref)
}
}
}
Expand All @@ -365,7 +369,15 @@ class EventWriterFillGapsSpec
else false

if (!gap)
writer ! EventWriter.Write(s"B|pid$pidN", n.toLong, n.toString, false, None, Set.empty, probe.ref)
writer ! EventWriter.Write(
s"B|pid$pidN",
n.toLong,
n.toString,
isSnapshotEvent = false,
fillSequenceNumberGaps = true,
None,
Set.empty,
probe.ref)
}
}
}
Expand All @@ -381,11 +393,19 @@ class EventWriterFillGapsSpec
val writer = spawn(EventWriter(fakeJournal.ref, settings))
val clientProbe = createTestProbe[StatusReply[EventWriter.WriteAck]]()
def sendWrite(seqNr: Long, pid: String = pid1): Unit = {
writer ! EventWriter.Write(pid, seqNr, seqNr.toString, false, None, Set.empty, clientProbe.ref)
writer ! EventWriter.Write(
pid,
seqNr,
seqNr.toString,
isSnapshotEvent = false,
fillSequenceNumberGaps = true,
None,
Set.empty,
clientProbe.ref)
}
def journalAckWrite(pid: String = pid1, expectedSequenceNumbers: Vector[Long] = Vector.empty): Int = {
val write = fakeJournal.expectMessageType[JournalProtocol.WriteMessages]
write.messages should have size (1)
write.messages should have size 1
val atomicWrite = write.messages.head.asInstanceOf[AtomicWrite]

val seqNrs =
Expand All @@ -402,7 +422,7 @@ class EventWriterFillGapsSpec

def journalFailWrite(reason: String, pid: String = pid1): Int = {
val write = fakeJournal.expectMessageType[JournalProtocol.WriteMessages]
write.messages should have size (1)
write.messages should have size 1
val atomicWrite = write.messages.head.asInstanceOf[AtomicWrite]
atomicWrite.payload.foreach { repr =>
repr.persistenceId should ===(pid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ object EventWriterSpec {

class EventWriterSpec extends ScalaTestWithActorTestKit(EventWriterSpec.config) with AnyWordSpecLike with LogCapturing {

private val settings = EventWriter.EventWriterSettings(
10,
5.seconds,
fillSequenceNumberGaps = false,
latestSequenceNumberCacheCapacity = 1000)
private val settings = EventWriter.EventWriterSettings(10, 5.seconds, latestSequenceNumberCacheCapacity = 1000)
implicit val ec: ExecutionContext = testKit.system.executionContext

"The event writer" should {
Expand Down Expand Up @@ -151,7 +147,7 @@ class EventWriterSpec extends ScalaTestWithActorTestKit(EventWriterSpec.config)
(1 to 1000).map { pidN =>
Future {
for (n <- 1 to 20) {
writer ! EventWriter.Write(s"A|pid$pidN", n.toLong, n.toString, false, None, Set.empty, probe.ref)
writer ! EventWriter.Write(s"A|pid$pidN", n.toLong, n.toString, false, false, None, Set.empty, probe.ref)
}
}
}
Expand Down Expand Up @@ -326,7 +322,15 @@ class EventWriterSpec extends ScalaTestWithActorTestKit(EventWriterSpec.config)
else false

if (!gap)
writer ! EventWriter.Write(s"B|pid$pidN", n.toLong, n.toString, snapshot, None, Set.empty, probe.ref)
writer ! EventWriter.Write(
s"B|pid$pidN",
n.toLong,
n.toString,
snapshot,
false,
None,
Set.empty,
probe.ref)
}
}
}
Expand All @@ -343,15 +347,23 @@ class EventWriterSpec extends ScalaTestWithActorTestKit(EventWriterSpec.config)
val writer = spawn(EventWriter(fakeJournal.ref, settings))
val clientProbe = createTestProbe[StatusReply[EventWriter.WriteAck]]()
def sendWrite(seqNr: Long, pid: String = pid1): Unit = {
writer ! EventWriter.Write(pid, seqNr, seqNr.toString, false, None, Set.empty, clientProbe.ref)
writer ! EventWriter.Write(pid, seqNr, seqNr.toString, false, false, None, Set.empty, clientProbe.ref)
}
def sendSnapshotWrite(seqNr: Long, pid: String = pid1): Unit = {
writer ! EventWriter.Write(pid, seqNr, seqNr.toString, isSnapshotEvent = true, None, Set.empty, clientProbe.ref)
writer ! EventWriter.Write(
pid,
seqNr,
seqNr.toString,
isSnapshotEvent = true,
false,
None,
Set.empty,
clientProbe.ref)
}

def journalAckWrite(pid: String = pid1, expectedSequenceNumbers: Vector[Long] = Vector.empty): Int = {
val write = fakeJournal.expectMessageType[JournalProtocol.WriteMessages]
write.messages should have size (1)
write.messages should have size 1
val atomicWrite = write.messages.head.asInstanceOf[AtomicWrite]

val seqNrs =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.EventWriter*")
4 changes: 1 addition & 3 deletions akka-persistence-typed/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ akka.persistence.typed {
# The event-writer occasionally needs to ask the journal about highest sequence number to handle duplicate
# writes, this timeout is for that interaction
ask-timeout = 20s
# Detect gaps in sequence numbers and fill them with FilteredPayload
fill-sequence-number-gaps = off
# When fill-sequence-number-gaps is enabled it will keep latest sequence
# When fillSequenceNumberGaps is enabled it will keep latest sequence
# number in memory for this many persistence ids.
latest-sequence-number-cache-capacity = 1000
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,13 @@ private[akka] object EventWriter {
EventWriterSettings(
maxBatchSize = config.getInt("max-batch-size"),
askTimeout = config.getDuration("ask-timeout").asScala,
fillSequenceNumberGaps = config.getBoolean("fill-sequence-number-gaps"),
latestSequenceNumberCacheCapacity = config.getInt("latest-sequence-number-cache-capacity"))
}

}
final case class EventWriterSettings(
maxBatchSize: Int,
askTimeout: FiniteDuration,
fillSequenceNumberGaps: Boolean,
latestSequenceNumberCacheCapacity: Int)

sealed trait Command
Expand All @@ -94,6 +92,7 @@ private[akka] object EventWriter {
sequenceNumber: SeqNr,
event: Any,
isSnapshotEvent: Boolean,
fillSequenceNumberGaps: Boolean,
metadata: Option[Any],
tags: Set[String],
replyTo: ActorRef[StatusReply[WriteAck]])
Expand All @@ -117,6 +116,7 @@ private[akka] object EventWriter {
writeErrorHandlingInProgress: Boolean = false,
currentTransactionId: Int = 0,
latestSeqNr: SeqNr = -1L,
fillSequenceNumberGaps: Boolean,
usedTimestamp: Long = 0L,
waitingForSeqNrLookup: Vector[(PersistentRepr, ActorRef[StatusReply[WriteAck]])] = Vector.empty) {
def idle: Boolean =
Expand Down Expand Up @@ -158,7 +158,6 @@ private[akka] object EventWriter {
var bypassCircuitBreaker = true // otherwise the duplicate key violations will flip the circuit breaker

implicit val askTimeout: Timeout = settings.askTimeout
import settings.fillSequenceNumberGaps

def sendToJournal(transactionId: Int, reprs: Vector[PersistentRepr]): Unit = {
if (context.log.isTraceEnabled)
Expand All @@ -180,7 +179,8 @@ private[akka] object EventWriter {
// more waiting replyTo before we could batch it or scrap the entry
perPidWriteState = perPidWriteState.updated(pid, newStateForPid)
} else {
if (newStateForPid.waitingForWrite.isEmpty && fillSequenceNumberGaps) {
// note that cache eviction is different for fillSequenceNumberGaps and isSnapshotEvent
if (newStateForPid.waitingForWrite.isEmpty && newStateForPid.fillSequenceNumberGaps) {
perPidWriteState = perPidWriteState.updated(pid, newStateForPid)
evictLeastRecentlyUsedPids()
} else if (newStateForPid.waitingForWrite.isEmpty) {
Expand Down Expand Up @@ -355,6 +355,7 @@ private[akka] object EventWriter {
repr: PersistentRepr,
replyTo: ActorRef[StatusReply[WriteAck]],
isSnapshotEvent: Boolean,
fillSequenceNumberGaps: Boolean,
calledAfterMaxSeqNr: Boolean): Unit = {
val persistenceId = repr.persistenceId
val sequenceNumber = repr.sequenceNr
Expand All @@ -364,29 +365,43 @@ private[akka] object EventWriter {
if ((fillSequenceNumberGaps || isSnapshotEvent) && sequenceNumber != 1L) {
val reason = if (isSnapshotEvent) AskMaxSeqNrReason.SnapshotEvent else AskMaxSeqNrReason.FillGaps
askMaxSeqNr(persistenceId, reason)
StateForPid(waitingForReply = Map.empty, waitingForSeqNrLookup = Vector((repr, replyTo)))
StateForPid(
waitingForReply = Map.empty,
waitingForSeqNrLookup = Vector((repr, replyTo)),
fillSequenceNumberGaps = fillSequenceNumberGaps)
} else {
sendToJournal(1, Vector(repr))
StateForPid(Map((repr.sequenceNr, (repr, replyTo))), currentTransactionId = 1)
StateForPid(
Map((repr.sequenceNr, (repr, replyTo))),
currentTransactionId = 1,
fillSequenceNumberGaps = fillSequenceNumberGaps)
}

case Some(state) =>
// For a given persistenceId the fillSequenceNumberGaps is not supposed to change,
// but we keep `true` if it has been set to `true`.
// isSnapshotEvent and fillSequenceNumberGaps are handled in very similar way, but there is a
// difference in the way latest sequence number is cached (see handleUpdatedStateForPid).
val newFillSequenceNumberGaps = fillSequenceNumberGaps || state.fillSequenceNumberGaps
val expectedSeqNr =
if (fillSequenceNumberGaps || isSnapshotEvent) state.nextExpectedSeqNr
if (newFillSequenceNumberGaps || isSnapshotEvent) state.nextExpectedSeqNr
else repr.sequenceNr

if (state.seqNrlookupInProgress) {
context.log.trace2(
"Seq nr lookup in progress for persistence id [{}], adding sequence nr [{}] to pending",
persistenceId,
sequenceNumber)
state.copy(waitingForSeqNrLookup = state.waitingForSeqNrLookup :+ ((repr, replyTo)))
state.copy(
waitingForSeqNrLookup = state.waitingForSeqNrLookup :+ ((repr, replyTo)),
fillSequenceNumberGaps = newFillSequenceNumberGaps)
} else if (sequenceNumber == expectedSeqNr) {
if (state.idle) {
sendToJournal(state.currentTransactionId + 1, Vector(repr))
state.copy(
waitingForReply = Map((repr.sequenceNr, (repr, replyTo))),
currentTransactionId = state.currentTransactionId + 1)
currentTransactionId = state.currentTransactionId + 1,
fillSequenceNumberGaps = newFillSequenceNumberGaps)
} else {
// write in progress for pid, add write to batch and perform once current write completes
if (state.waitingForWriteExceedingMaxBatchSize(settings.maxBatchSize)) {
Expand All @@ -399,7 +414,9 @@ private[akka] object EventWriter {
"Writing event in progress for persistence id [{}], adding sequence nr [{}] to batch",
persistenceId,
sequenceNumber)
state.copy(waitingForWrite = state.waitingForWrite :+ ((repr, replyTo)))
state.copy(
waitingForWrite = state.waitingForWrite :+ ((repr, replyTo)),
fillSequenceNumberGaps = newFillSequenceNumberGaps)
}
}
} else if (sequenceNumber < expectedSeqNr) {
Expand All @@ -408,7 +425,7 @@ private[akka] object EventWriter {
state
} else { // sequenceNumber > expectedSeqNr
require(
fillSequenceNumberGaps || isSnapshotEvent,
newFillSequenceNumberGaps || isSnapshotEvent,
s"Unexpected sequence number gap, expected [$expectedSeqNr], received [$sequenceNumber]. " +
"Enable akka.persistence.typed.event-writer.fill-sequence-number-gaps config if gaps are " +
"expected and should be filled with FilteredPayload.")
Expand Down Expand Up @@ -437,7 +454,8 @@ private[akka] object EventWriter {
(fillRepr.map(r => r.sequenceNr -> (r -> ignoreRef)) :+ (repr.sequenceNr -> (repr -> replyTo))).toMap
state.copy(
waitingForReply = newWaitingForReply,
currentTransactionId = state.currentTransactionId + 1)
currentTransactionId = state.currentTransactionId + 1,
fillSequenceNumberGaps = newFillSequenceNumberGaps)
} else {
if (context.log.isTraceEnabled)
context.log.traceN(
Expand All @@ -446,7 +464,9 @@ private[akka] object EventWriter {
fillRepr.head.sequenceNr,
sequenceNumber)
val newWaitingForWrite = fillRepr.map(_ -> ignoreRef) :+ (repr -> replyTo)
state.copy(waitingForWrite = state.waitingForWrite ++ newWaitingForWrite)
state.copy(
waitingForWrite = state.waitingForWrite ++ newWaitingForWrite,
fillSequenceNumberGaps = newFillSequenceNumberGaps)
}
} else {
// No pending writes or we haven't just looked up latest sequence nr.
Expand All @@ -458,7 +478,9 @@ private[akka] object EventWriter {
"Seq nr lookup needed for persistence id [{}], adding sequence nr [{}] to pending",
persistenceId,
sequenceNumber)
state.copy(waitingForSeqNrLookup = state.waitingForSeqNrLookup :+ ((repr, replyTo)))
state.copy(
waitingForSeqNrLookup = state.waitingForSeqNrLookup :+ ((repr, replyTo)),
fillSequenceNumberGaps = newFillSequenceNumberGaps)
}

}
Expand All @@ -467,7 +489,15 @@ private[akka] object EventWriter {
}

Behaviors.receiveMessage {
case Write(persistenceId, sequenceNumber, event, isSnapshotEvent, metadata, tags, replyTo) =>
case Write(
persistenceId,
sequenceNumber,
event,
isSnapshotEvent,
fillSequenceNumberGaps,
metadata,
tags,
replyTo) =>
val payload = if (tags.isEmpty) event else Tagged(event, tags)
val repr = PersistentRepr(
payload,
Expand All @@ -482,7 +512,7 @@ private[akka] object EventWriter {
case _ => repr
}

handleWrite(reprWithMeta, replyTo, isSnapshotEvent, calledAfterMaxSeqNr = false)
handleWrite(reprWithMeta, replyTo, isSnapshotEvent, fillSequenceNumberGaps, calledAfterMaxSeqNr = false)
Behaviors.same

case MaxSeqNrForPid(pid, maxSeqNr, AskMaxSeqNrReason.WriteFailure(errorDesc)) =>
Expand Down Expand Up @@ -562,6 +592,7 @@ private[akka] object EventWriter {
repr,
replyTo,
isSnapshotEvent = reason == AskMaxSeqNrReason.SnapshotEvent,
fillSequenceNumberGaps = reason == AskMaxSeqNrReason.FillGaps,
calledAfterMaxSeqNr = true)
}
}
Expand Down

0 comments on commit 82586be

Please sign in to comment.