Skip to content

Commit

Permalink
Provide API that allow user defined action in case of recovery failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Nov 19, 2024
1 parent 44db0ed commit 4e9ad3f
Show file tree
Hide file tree
Showing 7 changed files with 670 additions and 461 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import com.evolutiongaming.akkaeffect.persistence.SeqNr
import com.evolutiongaming.catshelper.{Log, LogOf, ToFuture}

import java.time.Instant
import javax.naming.OperationNotSupportedException
import cats.MonadThrow

object EventSourcedActorOf {

Expand Down Expand Up @@ -91,33 +93,48 @@ object EventSourcedActorOf {

replay = recovering.replay

seqNr <- replay.use { replay =>
// used to recover snapshot, i.e. the snapshot stored with [[snapSeqNr]] will be loaded, if any
val snapSeqNr = snapshot.map(_.metadata.seqNr).getOrElse(SeqNr.Min)
// used to recover events _following_ the snapshot OR if no snapshot available then [[SeqNr.Min]]
val fromSeqNr = snapshot.map(_.metadata.seqNr + 1).getOrElse(SeqNr.Min)
for {
_ <- log.debug(s"snapshot seqNr: $snapSeqNr, load events from seqNr: $fromSeqNr").allocated
events <- eventStore.events(fromSeqNr)
seqNrL <- events.foldWhileM(snapSeqNr) {
case (_, EventStore.Event(event, seqNr)) => replay(event, seqNr).as(seqNr.asLeft[Unit])
case (_, EventStore.HighestSeqNr(seqNr)) => seqNr.asLeft[Unit].pure[F]
}
seqNr <- seqNrL match {
case Left(seqNr) => seqNr.pure[F]
case Right(_) =>
// function used in events.foldWhileM always returns Left
// and sstream.Stream.foldWhileM returns Right only if passed function do so
// thus makes getting Right here impossible
new IllegalStateException("should never happened").raiseError[F, SeqNr]
}
} yield seqNr
}.toResource

_ <- log.debug(s"recovery completed with seqNr $seqNr")
journaller <- eventStore.asJournaller(actorCtx, seqNr).toResource
context = Recovering.RecoveryContext(seqNr, journaller, snapshotStore.asSnapshotter)
receive <- recovering.completed(context)
seqNr <- replay
.use { replay =>
// used to recover snapshot, i.e. the snapshot stored with [[snapSeqNr]] will be loaded, if any
val snapSeqNr = snapshot.map(_.metadata.seqNr).getOrElse(SeqNr.Min)
// used to recover events _following_ the snapshot OR if no snapshot available then [[SeqNr.Min]]
val fromSeqNr = snapshot.map(_.metadata.seqNr + 1).getOrElse(SeqNr.Min)
for {
_ <- log.debug(s"snapshot seqNr: $snapSeqNr, load events from seqNr: $fromSeqNr").allocated
events <- eventStore.events(fromSeqNr)
seqNrL <- events.foldWhileM(snapSeqNr) {
case (_, EventStore.Event(event, seqNr)) => replay(event, seqNr).as(seqNr.asLeft[Unit])
case (_, EventStore.HighestSeqNr(seqNr)) => seqNr.asLeft[Unit].pure[F]
}
seqNr <- seqNrL match {
case Left(seqNr) => seqNr.pure[F]
case Right(_) =>
// function used in events.foldWhileM always returns Left
// and sstream.Stream.foldWhileM returns Right only if passed function do so
// thus makes getting Right here impossible
new IllegalStateException("should never happened").raiseError[F, SeqNr]
}
} yield seqNr
}
.toResource
.attempt

receive <- seqNr match {
case Right(seqNr) =>
for {
_ <- log.debug(s"recovery completed with seqNr $seqNr")
journaller <- eventStore.asJournaller(actorCtx, seqNr).toResource
receive <- recovering.completed(seqNr, journaller, snapshotStore.asSnapshotter)
} yield receive

case Left(error) =>
for {
_ <- log.error(s"recovery failed", error)
journaller <- eventStore.asNonAppendingJournaller().toResource
_ <- recovering.failed(error, journaller, snapshotStore.asSnapshotter)
receive <- error.raiseError[F, Receive[F, Envelope[Any], ActorOf.Stop]].toResource
} yield receive
}
} yield receive

receive.onError {
Expand Down Expand Up @@ -158,6 +175,23 @@ object EventSourcedActorOf {

implicit final private[evolutiongaming] class EventStoreOps[F[_], E](val store: EventStore[F, E]) extends AnyVal {

def asNonAppendingJournaller()(implicit F: MonadThrow[F]): F[Journaller[F, E]] =
new Journaller[F, E] {
def append = new Append[F, E] {

def apply(events: Events[E]): F[F[SeqNr]] = notSupported.raiseError[F, F[SeqNr]]

def notSupported = new OperationNotSupportedException("The journaller does not support append")

}

def deleteTo = new DeleteEventsTo[F] {

def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr)

}
}.pure[F]

def asJournaller(actorCtx: ActorCtx[F], seqNr: SeqNr)(implicit F: Concurrent[F], log: Log[F]): F[Journaller[F, E]] =
for {
seqNrRef <- Ref[F].of(seqNr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ private[akkaeffect] object Persistence {
) = {
val receive = for {
recovering <- recoveryStarted(seqNr, none)
context = Recovering.RecoveryContext(seqNr, journaller, snapshotter)
receive <- recovering.completed(context)
receive <- recovering.completed(seqNr, journaller, snapshotter)
} yield Persistence.receive[F, S, E, C](receive)
receive.toReleasable
}
Expand Down Expand Up @@ -114,9 +113,8 @@ private[akkaeffect] object Persistence {
.foldMapM(_.release)
.toResource
.productR {
val context = Recovering.RecoveryContext(seqNr, journaller, snapshotter)
recovering
.completed(context)
.completed(seqNr, journaller, snapshotter)
.map(receive => Persistence.receive[F, S, E, C](receive))
}
.toReleasable
Expand Down
Loading

0 comments on commit 4e9ad3f

Please sign in to comment.