Skip to content

Commit 2e7c291

Browse files
committed
Support initial state on first invocation
1 parent aa98f54 commit 2e7c291

File tree

12 files changed

+241
-23
lines changed

12 files changed

+241
-23
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/SunshineTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,16 @@ public override Task FlowIdCanBeExtractedFromAmbientState()
7171
[TestMethod]
7272
public override Task FlowIdCanBeExtractedFromAmbientStateAfterSuspension()
7373
=> FlowIdCanBeExtractedFromAmbientStateAfterSuspension(FunctionStoreFactory.Create());
74+
75+
[TestMethod]
76+
public override Task FuncCanBeCreatedWithInitialState()
77+
=> FuncCanBeCreatedWithInitialState(FunctionStoreFactory.Create());
78+
79+
[TestMethod]
80+
public override Task ActionCanBeCreatedWithInitialState()
81+
=> ActionCanBeCreatedWithInitialState(FunctionStoreFactory.Create());
82+
83+
[TestMethod]
84+
public override Task ParamlessCanBeCreatedWithInitialState()
85+
=> ParamlessCanBeCreatedWithInitialState(FunctionStoreFactory.Create());
7486
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/FunctionTests/SunshineTests.cs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
66
using Cleipnir.ResilientFunctions.Domain;
77
using Cleipnir.ResilientFunctions.Helpers;
8+
using Cleipnir.ResilientFunctions.Messaging;
9+
using Cleipnir.ResilientFunctions.Reactive.Extensions;
810
using Cleipnir.ResilientFunctions.Storage;
911
using Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
1012
using Cleipnir.ResilientFunctions.Tests.Utils;
@@ -542,4 +544,118 @@ public async Task FlowIdCanBeExtractedFromAmbientStateAfterSuspension(Task<IFunc
542544
storedId.ShouldBe(reg.MapToStoredId(instance));
543545
unhandledExceptionHandler.ShouldNotHaveExceptions();
544546
}
547+
548+
public abstract Task FuncCanBeCreatedWithInitialState();
549+
public async Task FuncCanBeCreatedWithInitialState(Task<IFunctionStore> storeTask)
550+
{
551+
var store = await storeTask;
552+
var flowId = TestFlowId.Create();
553+
554+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
555+
556+
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionHandler.Catch));
557+
558+
string? initialEffectValue = null;
559+
string? initialMessageValue = null;
560+
561+
var registration = functionsRegistry
562+
.RegisterFunc(
563+
flowId.Type,
564+
async (string s, Workflow workflow) =>
565+
{
566+
initialEffectValue = await workflow.Effect.Get<string>("InitialEffectId");
567+
initialMessageValue = await workflow.Messages.OfType<string>().First();
568+
return s;
569+
});
570+
571+
572+
await registration.Invoke(
573+
flowInstance: "hello",
574+
param: "hello",
575+
initialState: new InitialState(
576+
Messages: [new MessageAndIdempotencyKey("InitialMessage")],
577+
Effects: [new InitialEffect(Id: "InitialEffectId", Value: "InitialEffectValue", Exception: null)]
578+
)
579+
);
580+
581+
initialEffectValue.ShouldBe("InitialEffectValue");
582+
initialMessageValue.ShouldBe("InitialMessage");
583+
584+
unhandledExceptionHandler.ShouldNotHaveExceptions();
585+
}
586+
587+
public abstract Task ActionCanBeCreatedWithInitialState();
588+
public async Task ActionCanBeCreatedWithInitialState(Task<IFunctionStore> storeTask)
589+
{
590+
var store = await storeTask;
591+
var flowId = TestFlowId.Create();
592+
593+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
594+
595+
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionHandler.Catch));
596+
597+
string? initialEffectValue = null;
598+
string? initialMessageValue = null;
599+
600+
var registration = functionsRegistry
601+
.RegisterAction(
602+
flowId.Type,
603+
async (string _, Workflow workflow) =>
604+
{
605+
initialEffectValue = await workflow.Effect.Get<string>("InitialEffectId");
606+
initialMessageValue = await workflow.Messages.OfType<string>().First();
607+
});
608+
609+
610+
await registration.Invoke(
611+
flowInstance: "hello",
612+
param: "hello",
613+
initialState: new InitialState(
614+
Messages: [new MessageAndIdempotencyKey("InitialMessage")],
615+
Effects: [new InitialEffect(Id: "InitialEffectId", Value: "InitialEffectValue", Exception: null)]
616+
)
617+
);
618+
619+
initialEffectValue.ShouldBe("InitialEffectValue");
620+
initialMessageValue.ShouldBe("InitialMessage");
621+
622+
unhandledExceptionHandler.ShouldNotHaveExceptions();
623+
}
624+
625+
public abstract Task ParamlessCanBeCreatedWithInitialState();
626+
public async Task ParamlessCanBeCreatedWithInitialState(Task<IFunctionStore> storeTask)
627+
{
628+
var store = await storeTask;
629+
var flowId = TestFlowId.Create();
630+
631+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
632+
633+
using var functionsRegistry = new FunctionsRegistry(store, new Settings(unhandledExceptionHandler.Catch));
634+
635+
string? initialEffectValue = null;
636+
string? initialMessageValue = null;
637+
638+
var registration = functionsRegistry
639+
.RegisterParamless(
640+
flowId.Type,
641+
async (Workflow workflow) =>
642+
{
643+
initialEffectValue = await workflow.Effect.Get<string>("InitialEffectId");
644+
initialMessageValue = await workflow.Messages.OfType<string>().First();
645+
});
646+
647+
648+
await registration.Invoke(
649+
flowInstance: "hello",
650+
initialState: new InitialState(
651+
Messages: [new MessageAndIdempotencyKey("InitialMessage")],
652+
Effects: [new InitialEffect(Id: "InitialEffectId", Value: "InitialEffectValue", Exception: null)]
653+
)
654+
);
655+
656+
initialEffectValue.ShouldBe("InitialEffectValue");
657+
initialMessageValue.ShouldBe("InitialMessage");
658+
659+
unhandledExceptionHandler.ShouldNotHaveExceptions();
660+
}
545661
}

Core/Cleipnir.ResilientFunctions/ActionRegistration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ namespace Cleipnir.ResilientFunctions;
99

1010
public static class ActionRegistration
1111
{
12-
public delegate Task Invoke<in TParam>(FlowInstance flowInstance, TParam param) where TParam : notnull;
13-
public delegate Task<Scheduled> Schedule<in TParam>(FlowInstance flowInstance, TParam param, bool? detach = null) where TParam : notnull;
12+
public delegate Task Invoke<in TParam>(FlowInstance flowInstance, TParam param, InitialState? initialState = null) where TParam : notnull;
13+
public delegate Task<Scheduled> Schedule<in TParam>(FlowInstance flowInstance, TParam param, bool? detach = null, InitialState? initialState = null) where TParam : notnull;
1414
public delegate Task<BulkScheduled> BulkSchedule<TParam>(IEnumerable<BulkWork<TParam>> instances, bool? detach = null) where TParam : notnull;
1515

1616
public delegate Task<Scheduled> ScheduleAt<in TParam>(

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ public InvocationHelper(FlowType flowType, StoredType storedType, bool isParamle
4040
}
4141

4242
public async Task<Tuple<bool, IDisposable>> PersistFunctionInStore(
43+
FlowId flowId,
4344
StoredId storedId,
4445
FlowInstance humanInstanceId,
4546
TParam param,
4647
DateTime? scheduleAt,
47-
StoredId? parent)
48+
StoredId? parent,
49+
InitialState? initialState)
4850
{
4951
if (!_isParamlessFunction)
5052
ArgumentNullException.ThrowIfNull(param);
@@ -53,16 +55,24 @@ public async Task<Tuple<bool, IDisposable>> PersistFunctionInStore(
5355
try
5456
{
5557
var storedParameter = SerializeParameter(param);
56-
5758
var utcNowTicks = DateTime.UtcNow.Ticks;
59+
var effects = initialState == null
60+
? null
61+
: MapInitialEffects(initialState.Effects, flowId);
62+
var messages = initialState == null
63+
? null
64+
: MapInitialMessages(initialState.Messages);
65+
5866
var created = await _functionStore.CreateFunction(
5967
storedId,
6068
humanInstanceId,
6169
storedParameter,
62-
postponeUntil: scheduleAt?.ToUniversalTime().Ticks,
6370
leaseExpiration: utcNowTicks + _settings.LeaseLength.Ticks,
71+
postponeUntil: scheduleAt?.ToUniversalTime().Ticks,
6472
timestamp: utcNowTicks,
65-
parent: parent
73+
parent: parent,
74+
effects,
75+
messages
6676
);
6777

6878
if (!created) runningFunction.Dispose();
@@ -474,4 +484,30 @@ public InnerScheduled<TReturn> CreateInnerScheduled(List<FlowId> scheduledIds, W
474484

475485
return parentWorkflow;
476486
}
487+
488+
private IReadOnlyList<StoredEffect> MapInitialEffects(IEnumerable<InitialEffect> initialEffects, FlowId flowId)
489+
=> initialEffects
490+
.Select(e =>
491+
e.Exception == null
492+
? new StoredEffect(
493+
e.Id.ToEffectId(EffectType.Effect),
494+
e.Id.ToEffectId(EffectType.Effect).ToStoredEffectId(),
495+
WorkStatus.Completed,
496+
Result: Serializer.Serialize(e.Value, e.Value?.GetType() ?? typeof(object)),
497+
StoredException: null)
498+
: new StoredEffect(
499+
e.Id.ToEffectId(EffectType.Effect),
500+
e.Id.ToEffectId(EffectType.Effect).ToStoredEffectId(),
501+
WorkStatus.Completed,
502+
Result: null,
503+
StoredException: Serializer.SerializeException(FatalWorkflowException.CreateNonGeneric(flowId, e.Exception))
504+
)
505+
).ToList();
506+
507+
private IReadOnlyList<StoredMessage> MapInitialMessages(IEnumerable<MessageAndIdempotencyKey> initialMessages)
508+
=> initialMessages.Select(m =>
509+
{
510+
var (content, type) = Serializer.SerializeMessage(m.Message, m.Message.GetType());
511+
return new StoredMessage(content, type, m.IdempotencyKey);
512+
}).ToList();
477513
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/Invoker.cs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ Utilities utilities
3434
_utilities = utilities;
3535
}
3636

37-
public async Task<TReturn> Invoke(FlowInstance instance, TParam param)
37+
public async Task<TReturn> Invoke(FlowInstance instance, TParam param, InitialState? initialState = null)
3838
{
3939
var (flowId, storedId) = CreateIds(instance);
40-
var (created, workflow, disposables) = await PrepareForInvocation(flowId, storedId, param, parent: null);
40+
var (created, workflow, disposables) = await PrepareForInvocation(flowId, storedId, param, parent: null, initialState);
4141
CurrentFlow._workflow.Value = workflow;
4242
if (!created) return await WaitForFunctionResult(flowId, storedId, maxWait: null);
4343

@@ -59,7 +59,7 @@ public async Task<TReturn> Invoke(FlowInstance instance, TParam param)
5959
return result.SucceedWithValue!;
6060
}
6161

62-
public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInstance, TParam param, bool? detach)
62+
public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInstance, TParam param, bool? detach, InitialState? initialState)
6363
{
6464
var parent = GetAndEnsureParent(detach);
6565
var (flowId, storedId) = CreateIds(flowInstance);
@@ -71,7 +71,7 @@ public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInsta
7171
return _invocationHelper.CreateInnerScheduled([flowId], parent, detach);
7272
}
7373

74-
var (created, workflow, disposables) = await PrepareForInvocation(flowId, storedId, param, parent?.StoredId);
74+
var (created, workflow, disposables) = await PrepareForInvocation(flowId, storedId, param, parent?.StoredId, initialState);
7575
CurrentFlow._workflow.Value = workflow;
7676
if (!created)
7777
return _invocationHelper.CreateInnerScheduled([flowId], parent, detach);
@@ -101,7 +101,7 @@ public async Task<InnerScheduled<TReturn>> ScheduleInvoke(FlowInstance flowInsta
101101
public async Task<InnerScheduled<TReturn>> ScheduleAt(FlowInstance instanceId, TParam param, DateTime scheduleAt, bool? detach)
102102
{
103103
if (scheduleAt.ToUniversalTime() <= DateTime.UtcNow)
104-
return await ScheduleInvoke(instanceId, param, detach);
104+
return await ScheduleInvoke(instanceId, param, detach, initialState: null);
105105

106106
var parent = GetAndEnsureParent(detach);
107107
var id = new FlowId(_flowType, instanceId);
@@ -113,11 +113,13 @@ public async Task<InnerScheduled<TReturn>> ScheduleAt(FlowInstance instanceId, T
113113
}
114114

115115
var (_, disposable) = await _invocationHelper.PersistFunctionInStore(
116+
id,
116117
id.ToStoredId(_storedType),
117118
instanceId,
118119
param,
119120
scheduleAt,
120-
parent?.StoredId
121+
parent?.StoredId,
122+
initialState: null
121123
);
122124

123125
disposable.Dispose();
@@ -206,27 +208,29 @@ internal async Task ScheduleRestart(StoredInstance instance, RestartedFunction r
206208
});
207209
}
208210

209-
private async Task<PreparedInvocation> PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent)
211+
private async Task<PreparedInvocation> PrepareForInvocation(FlowId flowId, StoredId storedId, TParam param, StoredId? parent, InitialState? initialState)
210212
{
211213
var disposables = new List<IDisposable>(capacity: 3);
212214
var success = false;
213215
try
214216
{
215217
var (persisted, runningFunction) =
216218
await _invocationHelper.PersistFunctionInStore(
219+
flowId,
217220
storedId,
218221
flowId.Instance.Value,
219222
param,
220223
scheduleAt: null,
221-
parent
224+
parent,
225+
initialState
222226
);
223227
disposables.Add(runningFunction);
224228
disposables.Add(_invocationHelper.StartLeaseUpdater(storedId, epoch: 0));
225229
var isWorkflowRunningDisposable = new PropertyDisposable();
226230
disposables.Add(isWorkflowRunningDisposable);
227231
success = persisted;
228232

229-
var (effect, states) = _invocationHelper.CreateEffectAndStates(storedId, flowId, anyEffects: false);
233+
var (effect, states) = _invocationHelper.CreateEffectAndStates(storedId, flowId, anyEffects: initialState != null);
230234
var messages = _invocationHelper.CreateMessages(
231235
storedId,
232236
ScheduleRestart,
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using Cleipnir.ResilientFunctions.Messaging;
4+
5+
namespace Cleipnir.ResilientFunctions.Domain;
6+
7+
public record InitialState(
8+
IEnumerable<MessageAndIdempotencyKey> Messages,
9+
IEnumerable<InitialEffect> Effects
10+
);
11+
12+
public record InitialEffect(string Id, object? Value, Exception? Exception);

Core/Cleipnir.ResilientFunctions/FuncRegistration.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ public static class FuncRegistration
1111
{
1212
public delegate Task<TReturn> Invoke<in TParam, TReturn>(
1313
FlowInstance flowInstance,
14-
TParam param
14+
TParam param,
15+
InitialState? initialState = null
1516
) where TParam : notnull;
1617

1718
public delegate Task<Scheduled<TReturn>> Schedule<in TParam, TReturn>(
1819
FlowInstance flowInstance,
1920
TParam param,
20-
bool? detach = null
21+
bool? detach = null,
22+
InitialState? initialState = null
2123
) where TParam : notnull;
2224

2325
public delegate Task<Scheduled<TReturn>> ScheduleAt<in TParam, TReturn>(

Core/Cleipnir.ResilientFunctions/FunctionsRegistry.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public FuncRegistration<TParam, TReturn> RegisterFunc<TParam, TReturn>(
250250
storedType,
251251
_functionStore,
252252
invoker.Invoke,
253-
schedule: async (instance, param, detach) => (await invoker.ScheduleInvoke(instance, param, detach)).ToScheduledWithResult(),
253+
schedule: async (instance, param, detach, initialState) => (await invoker.ScheduleInvoke(instance, param, detach, initialState)).ToScheduledWithResult(),
254254
scheduleAt: async (instance, param, until, detach) => (await invoker.ScheduleAt(instance, param, until, detach) ).ToScheduledWithResult(),
255255
bulkSchedule: async (instances, detach) => (await invocationHelper.BulkSchedule(instances.ToList(), detach)).ToScheduledWithResults(),
256256
controlPanels,
@@ -335,8 +335,8 @@ private ParamlessRegistration RegisterParamless(
335335
flowType,
336336
storedType,
337337
_functionStore,
338-
invoke: id => invoker.Invoke(id.Value, param: Unit.Instance),
339-
schedule: async (id, detach) => (await invoker.ScheduleInvoke(id.Value, param: Unit.Instance, detach)).ToScheduledWithoutResult(),
338+
invoke: (id, initialState) => invoker.Invoke(id.Value, param: Unit.Instance, initialState),
339+
schedule: async (id, detach, initialState) => (await invoker.ScheduleInvoke(id.Value, param: Unit.Instance, detach, initialState)).ToScheduledWithoutResult(),
340340
scheduleAt: async (id, at, detach) => (await invoker.ScheduleAt(id.Value, param: Unit.Instance, at, detach)).ToScheduledWithoutResult(),
341341
bulkSchedule: async (ids, detach) => (await invocationHelper.BulkSchedule(ids.Select(id => new BulkWork<Unit>(id.Value, Unit.Instance)).ToList(), detach)).ToScheduledWithoutResults(),
342342
controlPanels,
@@ -421,7 +421,7 @@ public ActionRegistration<TParam> RegisterAction<TParam>(
421421
storedType,
422422
_functionStore,
423423
rActionInvoker.Invoke,
424-
schedule: async (instance, param, detach) => (await rActionInvoker.ScheduleInvoke(instance, param, detach)).ToScheduledWithoutResult(),
424+
schedule: async (instance, param, detach, initialState) => (await rActionInvoker.ScheduleInvoke(instance, param, detach, initialState)).ToScheduledWithoutResult(),
425425
scheduleAt: async (instance, param, until, completion) => (await rActionInvoker.ScheduleAt(instance, param, until, completion)).ToScheduledWithoutResult(),
426426
bulkSchedule: async (instances, detach) => (await invocationHelper.BulkSchedule(instances.ToList(), detach)).ToScheduledWithoutResults(),
427427
controlPanels,

Core/Cleipnir.ResilientFunctions/ParamlessRegistration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ namespace Cleipnir.ResilientFunctions;
99

1010
public static class Paramless
1111
{
12-
public delegate Task Invoke(FlowInstance flowInstance);
13-
public delegate Task<Scheduled> Schedule(FlowInstance flowInstance, bool? detach = null);
12+
public delegate Task Invoke(FlowInstance flowInstance, InitialState? initialState = null);
13+
public delegate Task<Scheduled> Schedule(FlowInstance flowInstance, bool? detach = null, InitialState? initialState = null);
1414
public delegate Task<BulkScheduled> BulkSchedule(IEnumerable<FlowInstance> instances, bool? detach = null);
1515
public delegate Task<Scheduled> ScheduleAt(
1616
FlowInstance flowInstance,

0 commit comments

Comments
 (0)