Skip to content

Commit

Permalink
pass DeleteTo in recovery failure instead of Journaller
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Nov 21, 2024
1 parent 60dc363 commit c356ab1
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.evolutiongaming.akkaeffect.persistence

import akka.actor.Actor
import akka.persistence.SnapshotSelectionCriteria
import cats.MonadThrow
import cats.effect.implicits.effectResourceOps
import cats.effect.{Async, Concurrent, Ref, Resource}
import cats.syntax.all.*
Expand All @@ -11,7 +10,6 @@ import com.evolutiongaming.akkaeffect.persistence.SeqNr
import com.evolutiongaming.catshelper.{Log, LogOf, ToFuture}

import java.time.Instant
import javax.naming.OperationNotSupportedException

object EventSourcedActorOf {

Expand Down Expand Up @@ -129,10 +127,9 @@ object EventSourcedActorOf {

case Left(error) =>
for {
_ <- log.error(s"recovery failed", error)
journaller <- eventStore.asNonAppendingJournaller().toResource
_ <- recovering.failed(error, journaller, snapshotStore.asSnapshotter)
receive <- error.raiseError[F, Receive[F, Envelope[Any], ActorOf.Stop]].toResource
_ <- log.error(s"recovery failed", error)
_ <- recovering.failed(error, eventStore.asDeleteTo, snapshotStore.asSnapshotter)
receive <- error.raiseError[F, Receive[F, Envelope[Any], ActorOf.Stop]].toResource
} yield receive
}
} yield receive
Expand Down Expand Up @@ -175,63 +172,53 @@ object EventSourcedActorOf {

implicit final private[evolutiongaming] class EventStoreOps[F[_], E](val store: EventStore[F, E]) extends AnyVal {

def asNonAppendingJournaller()(implicit F: MonadThrow[F]): F[Journaller[F, E]] =
new Journaller[F, E] {
def append = new Append[F, E] {
def asDeleteTo: DeleteEventsTo[F] = new DeleteEventsTo[F] {

def apply(events: Events[E]): F[F[SeqNr]] = notSupported.raiseError[F, F[SeqNr]]
def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr)

def notSupported = new OperationNotSupportedException("The journaller does not support append")

}
}

def deleteTo = new DeleteEventsTo[F] {
def asAppend(
actorCtx: ActorCtx[F],
seqNrRef: Ref[F, SeqNr],
)(implicit F: Concurrent[F], log: Log[F]): Append[F, E] =
new Append[F, E] {

def apply(events: Events[E]): F[F[SeqNr]] =
seqNrRef
.modify { seqNr =>
events.mapAccumulate(seqNr) {
case (seqNr0, event) =>
val seqNr1 = seqNr0 + 1
seqNr1 -> EventStore.Event(event, seqNr1)
}
}
.flatMap { events =>
def handleError(err: Throwable) = {
val from = events.values.head.head.seqNr
val to = events.values.last.last.seqNr
stopActor(from, to, err)
}
store
.save(events)
.onError(handleError)
.flatTap(_.onError(handleError))
}

def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr)
private def stopActor(from: SeqNr, to: SeqNr, error: Throwable): F[Unit] =
for {
_ <- log.error(s"failed to append events with seqNr range [$from .. $to], stopping actor", error)
_ <- actorCtx.stop
} yield {}

}
}.pure[F]
}

def asJournaller(actorCtx: ActorCtx[F], seqNr: SeqNr)(implicit F: Concurrent[F], log: Log[F]): F[Journaller[F, E]] =
for {
seqNrRef <- Ref[F].of(seqNr)
} yield new Journaller[F, E] {
val append = new Append[F, E] {

def apply(events: Events[E]): F[F[SeqNr]] =
seqNrRef
.modify { seqNr =>
events.mapAccumulate(seqNr) {
case (seqNr0, event) =>
val seqNr1 = seqNr0 + 1
seqNr1 -> EventStore.Event(event, seqNr1)
}
}
.flatMap { events =>
def handleError(err: Throwable) = {
val from = events.values.head.head.seqNr
val to = events.values.last.last.seqNr
stopActor(from, to, err)
}
store
.save(events)
.onError(handleError)
.flatTap(_.onError(handleError))
}

private def stopActor(from: SeqNr, to: SeqNr, error: Throwable): F[Unit] =
for {
_ <- log.error(s"failed to append events with seqNr range [$from .. $to], stopping actor", error)
_ <- actorCtx.stop
} yield {}

}

val deleteTo = new DeleteEventsTo[F] {

def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr)

}
val append = asAppend(actorCtx, seqNrRef)
val deleteTo = asDeleteTo
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ trait Recovering[F[_], S, E, +A] {
): Resource[F, A]

/** Called when recovery failed
*
* @param journaller
* of type [[DeleteEventsTo]] because instance of [[Journaller]] is not available in this case. [[Journaller]] can
* be created based on known [[SeqNr]], while its now known in case of failure.
*/
def failed(
cause: Throwable,
journaller: Journaller[F, E],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S],
): Resource[F, Unit]

Expand Down Expand Up @@ -90,7 +94,7 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S],
) = Resource.raiseError[F, Unit, Throwable](cause)
}
Expand Down Expand Up @@ -124,7 +128,7 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S],
) = Resource.raiseError[F, Unit, Throwable](cause)
}
Expand All @@ -143,7 +147,7 @@ object Recovering {
)(
transferred: (SeqNr, Journaller[F, E], Snapshotter[F, S]) => Resource[F, A],
)(
failed: (Throwable, Journaller[F, E], Snapshotter[F, S]) => Resource[F, Unit],
failed: (Throwable, DeleteEventsTo[F], Snapshotter[F, S]) => Resource[F, Unit],
): Recovering[F, S, E, A] = {

val replay1 = replay
Expand All @@ -169,7 +173,7 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S],
) = failed1(cause, journaller, snapshotter)
}
Expand Down Expand Up @@ -215,7 +219,7 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S],
) = failed1
}
Expand Down Expand Up @@ -253,12 +257,11 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E1],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S1],
) = {
val journaller1 = journaller.convert(ef)
val snapshotter1 = snapshotter.convert(sf)
self.failed(cause, journaller1, snapshotter1)
self.failed(cause, journaller, snapshotter1)
}

}
Expand All @@ -281,7 +284,7 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S],
) = self.failed(cause, journaller, snapshotter)

Expand All @@ -307,7 +310,7 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S],
) = self.failed(cause, journaller, snapshotter)
}
Expand Down Expand Up @@ -338,7 +341,7 @@ object Recovering {

override def failed(
cause: Throwable,
journaller: Journaller[F, E1],
journaller: DeleteEventsTo[F],
snapshotter: Snapshotter[F, S1],
) = self.failed(cause, journaller, snapshotter)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import cats.effect.unsafe.implicits.global
import cats.syntax.all.*
import com.evolutiongaming.akkaeffect.IOSuite.*
import com.evolutiongaming.akkaeffect.persistence.InstrumentEventSourced.Action
import com.evolutiongaming.akkaeffect.persistence.SeqNr
import com.evolutiongaming.akkaeffect.testkit.Probe
import com.evolutiongaming.akkaeffect.{ActorSuite, *}
import com.evolutiongaming.catshelper.CatsHelper.*
Expand Down Expand Up @@ -66,6 +67,10 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
`recoveryCompleted stops`[IO](actorSystem).run()
}

test("recoveryFailed") {
`recoveryFailed`[IO](actorSystem).run()
}

test("append many") {
`append many`[IO](actorSystem).run()
}
Expand Down Expand Up @@ -986,6 +991,90 @@ class EventSourcedActorOfTest extends AsyncFunSuite with ActorSuite with Matcher
} yield {}
}

private def `recoveryFailed`[F[_]: Async: ToFuture: FromFuture: ToTry: LogOf](
actorSystem: ActorSystem,
): F[Unit] = {

val actorRefOf = ActorRefOf.fromActorRefFactory[F](actorSystem)

type S = Unit
type C = Any
type E = Int

def eventSourced[A](value: A) = EventSourced(EventSourcedId("5.1"), value = value)

val recoveryError = new IllegalArgumentException("test error: cannot apply event on state")

def eventSourcedOf(
lock: Deferred[F, Unit],
stopped: Deferred[F, Unit],
) =
EventSourcedOf[F] { actorCtx =>
val recoveryStarted =
RecoveryStarted
.const {
Recovering[S] {
Replay
.const[E](recoveryError.raiseError[F, Unit])
.pure[Resource[F, *]]
} {
case (_, _, _) =>
Resource
.eval(Async[F].delay(fail("Recovering.completed should not be called")))
.as(Receive.const[Envelope[C]](false.pure[F]))
} {
case (_, _, _) =>
Resource
.eval(Async[F].delay(fail("Recovering.transferred should not be called")))
.as(Receive.const[Envelope[C]](false.pure[F]))
} {
case (_, _, _) =>
Resource
.make(lock.get productR actorCtx.stop)(_ => stopped.complete(()).void)
.void
}.pure[Resource[F, *]]
}
.pure[Resource[F, *]]
eventSourced[
Resource[F, RecoveryStarted[F, S, E, Receive[F, Envelope[Any], ActorOf.Stop]]],
](recoveryStarted).pure[F]
}

for {
lock <- Deferred[F, Unit]
stopped <- Deferred[F, Unit]
actions <- Ref[F].of(List.empty[Action[S, C, E]])
eventSourcedOf <- InstrumentEventSourced(actions, eventSourcedOf(lock, stopped))
.typeless(_.castM[F, S], _.castM[F, E], _.pure[F])
.pure[F]
persistence <- persistence[F].pure[F]
eventStore <- persistence.eventStore(eventSourced {})
seqNr <- eventStore.save(Events.of(EventStore.Event(42, 1L))).flatten
actorEffect = EventSourcedActorEffect.of(actorRefOf, eventSourcedOf, persistence)
actorEffect <- actorEffect.allocated.map { case (actorEffect, _) => actorEffect }
_ <- Probe.of(actorRefOf).use { probe =>
for {
terminated <- probe.watch(actorEffect.toUnsafe)
_ <- lock.complete(())
_ <- terminated
} yield {}
}
_ <- stopped.get
_ <- Async[F].sleep(10.millis) // Make sure all actions are performed first
actions <- actions.get
_ = actions.reverse shouldEqual List(
Action.Created(EventSourcedId("5.1"), akka.persistence.Recovery(), PluginIds.Empty),
Action.Started,
Action.RecoveryAllocated(0L, none),
Action.ReplayAllocated,
Action.ReplayReleased,
Action.RecoveryFailed(recoveryError),
Action.RecoveryReleased,
Action.Released,
)
} yield {}
}

private def `append many`[F[_]: Async: ToFuture: FromFuture: ToTry: LogOf](
actorSystem: ActorSystem,
): F[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ object InstrumentEventSourced {
snapshotOffer.copy(metadata = metadata)
}

def instrumentedDeleteEventsTo(deleteTo: DeleteEventsTo[F]) =
new Instrument with DeleteEventsTo[F] {
def apply(seqNr: SeqNr) =
for {
_ <- record(Action.DeleteEventsTo(seqNr))
a <- deleteTo(seqNr)
_ <- record(Action.DeleteEventsToOuter)
} yield for {
a <- a
_ <- record(Action.DeleteEventsToInner)
} yield a
}

def instrumentedJournaller(journaller: Journaller[F, E]) =
new Instrument with Journaller[F, E] {

Expand Down Expand Up @@ -145,7 +158,7 @@ object InstrumentEventSourced {
} yield instrumentedReceive(receive)
} {
case (error, journaller, snapshotter) =>
val journaller1 = instrumentedJournaller(journaller)
val journaller1 = instrumentedDeleteEventsTo(journaller)
val snapshotter1 = instrumentedSnapshotter(snapshotter)

for {
Expand Down

0 comments on commit c356ab1

Please sign in to comment.