Skip to content

Commit 8ce59a2

Browse files
committed
Reschedule postponed invocation when interrupted
1 parent ef20f9c commit 8ce59a2

File tree

24 files changed

+146
-42
lines changed

24 files changed

+146
-42
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@ public Task<bool> PostponeFunction(
7272
StoredId storedId,
7373
long postponeUntil,
7474
long timestamp,
75-
bool onlyIfNotInterrupted,
75+
bool ignoreInterrupted,
7676
int expectedEpoch,
7777
ComplimentaryState complimentaryState
78-
) => _inner.PostponeFunction(storedId, postponeUntil, timestamp, onlyIfNotInterrupted, expectedEpoch, complimentaryState);
78+
) => _inner.PostponeFunction(storedId, postponeUntil, timestamp, ignoreInterrupted, expectedEpoch, complimentaryState);
7979

8080
public Task<bool> FailFunction(
8181
StoredId storedId,

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/DelayedStartUpTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ await store.PostponeFunction(
108108
registration.MapToStoredId(functionId.Instance),
109109
postponeUntil: 0,
110110
timestamp: DateTime.UtcNow.Ticks,
111-
onlyIfNotInterrupted: false,
111+
ignoreInterrupted: true,
112112
expectedEpoch: 0,
113113
complimentaryState: new ComplimentaryState(storedParameter.ToUtf8Bytes().ToFunc(), LeaseLength: 0)
114114
).ShouldBeTrueAsync();
@@ -146,7 +146,7 @@ await store.PostponeFunction(
146146
registration.MapToStoredId(functionId.Instance),
147147
postponeUntil: 0,
148148
timestamp: DateTime.UtcNow.Ticks,
149-
onlyIfNotInterrupted: false,
149+
ignoreInterrupted: true,
150150
expectedEpoch: 0,
151151
new ComplimentaryState(storedParameter.ToFunc(), LeaseLength: 0)
152152
);

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/RFunctionTests/PostponedTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,8 @@ public override Task WorkflowDelayInvocationDelaysFunction()
6161
[TestMethod]
6262
public override Task WorkflowDelayWithDateTimeInvocationDelaysFunction()
6363
=> WorkflowDelayWithDateTimeInvocationDelaysFunction(FunctionStoreFactory.Create());
64+
65+
[TestMethod]
66+
public override Task InterruptedFunctionIsRescheduledWhenPostponed()
67+
=> InterruptedFunctionIsRescheduledWhenPostponed(FunctionStoreFactory.Create());
6468
}

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/ShutdownCoordinationTests/RFunctionsShutdownTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ await store.PostponeFunction(
181181
registration.MapToStoredId(functionId.Instance),
182182
postponeUntil: DateTime.UtcNow.AddDays(-1).Ticks,
183183
timestamp: DateTime.UtcNow.Ticks,
184-
onlyIfNotInterrupted: false,
184+
ignoreInterrupted: true,
185185
expectedEpoch: 0,
186186
new ComplimentaryState(() => storedParameter.ToUtf8Bytes(), LeaseLength: 0)
187187
).ShouldBeTrueAsync();

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,8 @@ public override Task MultipleFunctionsStatusCanBeFetched()
199199
[TestMethod]
200200
public override Task InterruptedFunctionIsNotPostponedWhenFlagIsSet()
201201
=> InterruptedFunctionIsNotPostponedWhenFlagIsSet(FunctionStoreFactory.Create());
202+
203+
[TestMethod]
204+
public override Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction()
205+
=> InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(FunctionStoreFactory.Create());
202206
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/RFunctionTests/PostponedTests.cs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ await store.PostponeFunction(
689689
storedId,
690690
postponeUntil: DateTime.UtcNow.AddDays(-1).Ticks,
691691
timestamp: DateTime.UtcNow.Ticks,
692-
onlyIfNotInterrupted: false,
692+
ignoreInterrupted: false,
693693
expectedEpoch: 0,
694694
complimentaryState: new ComplimentaryState(storedParameter.ToUtf8Bytes().ToFunc(), LeaseLength: 0)
695695
).ShouldBeTrueAsync();
@@ -876,4 +876,46 @@ await Should.ThrowAsync<InvocationPostponedException>(
876876

877877
unhandledExceptionHandler.ShouldNotHaveExceptions();
878878
}
879+
880+
public abstract Task InterruptedFunctionIsRescheduledWhenPostponed();
881+
protected async Task InterruptedFunctionIsRescheduledWhenPostponed(Task<IFunctionStore> storeTask)
882+
{
883+
var store = await storeTask;
884+
var testId = TestFlowId.Create();
885+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
886+
887+
using var functionsRegistry = new FunctionsRegistry
888+
(
889+
store,
890+
new Settings(unhandledExceptionHandler.Catch)
891+
);
892+
893+
var insideFunctionFlag = new SyncedFlag();
894+
var mayContinueFlag = new SyncedFlag();
895+
var invocations = new SyncedCounter();
896+
897+
var registration = functionsRegistry
898+
.RegisterParamless(
899+
testId.Type,
900+
async workflow =>
901+
{
902+
invocations.Increment();
903+
insideFunctionFlag.Raise();
904+
await mayContinueFlag.WaitForRaised();
905+
await workflow.Delay(TimeSpan.FromDays(1));
906+
});
907+
908+
await registration.Schedule(testId.Instance);
909+
910+
await insideFunctionFlag.WaitForRaised();
911+
await registration.Interrupt([testId.Instance]);
912+
mayContinueFlag.Raise();
913+
914+
var controlPanel = await registration.ControlPanel(testId.Instance);
915+
controlPanel.ShouldNotBeNull();
916+
await controlPanel.BusyWaitUntil(c => c.Status == Status.Postponed);
917+
918+
invocations.Current.ShouldBe(2);
919+
unhandledExceptionHandler.ShouldNotHaveExceptions();
920+
}
879921
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ await store.PostponeFunction(
291291
functionId,
292292
postponeUntil: nowTicks,
293293
timestamp: DateTime.UtcNow.Ticks,
294-
onlyIfNotInterrupted: false,
294+
ignoreInterrupted: false,
295295
expectedEpoch: 0,
296296
complimentaryState: new ComplimentaryState(storedParameter.ToUtf8Bytes().ToFunc(), LeaseLength: 0)
297297
).ShouldBeTrueAsync();
@@ -326,7 +326,7 @@ await store.PostponeFunction(
326326
functionId,
327327
postponeUntil: nowTicks,
328328
timestamp: DateTime.UtcNow.Ticks,
329-
onlyIfNotInterrupted: false,
329+
ignoreInterrupted: false,
330330
expectedEpoch: 0,
331331
complimentaryState: new ComplimentaryState(storedParameter.ToUtf8Bytes().ToFunc(), LeaseLength: 0)
332332
).ShouldBeTrueAsync();
@@ -361,7 +361,7 @@ await store.PostponeFunction(
361361
functionId,
362362
postponeUntil: nowTicks,
363363
timestamp: DateTime.UtcNow.Ticks,
364-
onlyIfNotInterrupted: false,
364+
ignoreInterrupted: false,
365365
expectedEpoch: 1,
366366
complimentaryState: new ComplimentaryState(storedParameter.ToUtf8Bytes().ToFunc(), LeaseLength: 0)
367367
).ShouldBeFalseAsync();
@@ -875,7 +875,7 @@ await store.PostponeFunction(
875875
functionId,
876876
postponeUntil: DateTime.UtcNow.Ticks,
877877
timestamp: DateTime.UtcNow.Ticks,
878-
onlyIfNotInterrupted: false,
878+
ignoreInterrupted: false,
879879
expectedEpoch: 0,
880880
complimentaryState: new ComplimentaryState(Test.SimpleStoredParameter.ToFunc(), LeaseLength: 0)
881881
);
@@ -1476,7 +1476,7 @@ await store.CreateFunction(
14761476
storedId,
14771477
postponeUntil: 0,
14781478
timestamp: 0,
1479-
onlyIfNotInterrupted: true,
1479+
ignoreInterrupted: false,
14801480
expectedEpoch: 0,
14811481
new ComplimentaryState(() => Test.SimpleStoredParameter, LeaseLength: 0)
14821482
);
@@ -1486,4 +1486,36 @@ await store.CreateFunction(
14861486
sf.Status.ShouldBe(Status.Executing);
14871487
sf.Epoch.ShouldBe(0);
14881488
}
1489+
1490+
public abstract Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction();
1491+
protected async Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(Task<IFunctionStore> storeTask)
1492+
{
1493+
var storedId = TestStoredId.Create();
1494+
var store = await storeTask;
1495+
1496+
await store.CreateFunction(
1497+
storedId,
1498+
"humanInstanceId",
1499+
param: Test.SimpleStoredParameter,
1500+
leaseExpiration: DateTime.UtcNow.Ticks,
1501+
postponeUntil: null,
1502+
timestamp: DateTime.UtcNow.Ticks,
1503+
parent: null
1504+
).ShouldBeTrueAsync();
1505+
1506+
await store.Interrupt([storedId]);
1507+
1508+
var success = await store.PostponeFunction(
1509+
storedId,
1510+
postponeUntil: 0,
1511+
timestamp: 0,
1512+
ignoreInterrupted: true,
1513+
expectedEpoch: 0,
1514+
new ComplimentaryState(() => Test.SimpleStoredParameter, LeaseLength: 0)
1515+
);
1516+
success.ShouldBeTrue();
1517+
1518+
var sf = await store.GetFunction(storedId).ShouldNotBeNullAsync();
1519+
sf.Status.ShouldBe(Status.Postponed);
1520+
}
14891521
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@ public async Task<bool> PostponeFunction(
119119
StoredId storedId,
120120
long postponeUntil,
121121
long timestamp,
122-
bool onlyIfNotInterrupted,
122+
bool ignoreInterrupted,
123123
int expectedEpoch,
124124
ComplimentaryState complimentaryState
125125
)
126126
{
127127
if (_crashed)
128128
throw new TimeoutException();
129129

130-
var result = await _inner.PostponeFunction(storedId, postponeUntil, timestamp, onlyIfNotInterrupted, expectedEpoch, complimentaryState);
130+
var result = await _inner.PostponeFunction(storedId, postponeUntil, timestamp, ignoreInterrupted, expectedEpoch, complimentaryState);
131131
AfterPostponeFunctionFlag.Raise();
132132

133133
return result;

Core/Cleipnir.ResilientFunctions/BaseRegistration.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@ public async Task<IReadOnlyList<StoredInstance>> GetInstances(Status? status = n
3232
: _functionStore.GetInstances(StoredType, status.Value)
3333
);
3434
}
35-
35+
36+
public Task Interrupt(IEnumerable<FlowInstance> instances)
37+
=> Interrupt(instances.Select(i => i.ToStoredInstance()));
3638
public async Task Interrupt(IEnumerable<StoredInstance> storedInstances)
3739
=> await _functionStore.Interrupt(storedInstances.Select(si => new StoredId(StoredType, si)));
3840
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,10 @@ public async Task<PersistResultOutcome> PersistResult(
160160
storedId,
161161
postponeUntil: result.Postpone!.DateTime.Ticks,
162162
timestamp: DateTime.UtcNow.Ticks,
163-
onlyIfNotInterrupted: false, //todo handler this the same way as suspension
163+
ignoreInterrupted: false,
164164
expectedEpoch,
165165
complementaryState
166-
) ? PersistResultOutcome.Success : PersistResultOutcome.Failed;
166+
) ? PersistResultOutcome.Success : PersistResultOutcome.Reschedule;
167167
case Outcome.Fail:
168168
return await _functionStore.FailFunction(
169169
storedId,
@@ -173,15 +173,12 @@ public async Task<PersistResultOutcome> PersistResult(
173173
complementaryState
174174
) ? PersistResultOutcome.Success : PersistResultOutcome.Failed;
175175
case Outcome.Suspend:
176-
var success = await _functionStore.SuspendFunction(
176+
return await _functionStore.SuspendFunction(
177177
storedId,
178178
timestamp: DateTime.UtcNow.Ticks,
179179
expectedEpoch,
180180
complementaryState
181-
);
182-
return success ?
183-
PersistResultOutcome.Success :
184-
PersistResultOutcome.Reschedule;
181+
) ? PersistResultOutcome.Success : PersistResultOutcome.Reschedule;
185182
default:
186183
throw new ArgumentOutOfRangeException();
187184
}

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ Task<bool> PostponeFunction(
6969
StoredId storedId,
7070
long postponeUntil,
7171
long timestamp,
72-
bool onlyIfNotInterrupted,
72+
bool ignoreInterrupted,
7373
int expectedEpoch,
7474
ComplimentaryState complimentaryState
7575
);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ public Task<bool> PostponeFunction(
232232
StoredId storedId,
233233
long postponeUntil,
234234
long timestamp,
235-
bool onlyIfNotInterrupted,
235+
bool ignoreInterrupted,
236236
int expectedEpoch,
237237
ComplimentaryState complimentaryState)
238238
{
@@ -243,7 +243,7 @@ public Task<bool> PostponeFunction(
243243
var state = _states[storedId];
244244
if (state.Epoch != expectedEpoch) return false.ToTask();
245245

246-
if (onlyIfNotInterrupted && state.Interrupted)
246+
if (!ignoreInterrupted && state.Interrupted)
247247
return false.ToTask();
248248

249249
state.Status = Status.Postponed;

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,12 @@ public Task<bool> PostponeFunction(
101101
StoredId storedId,
102102
long postponeUntil,
103103
long timestamp,
104-
bool onlyIfNotInterrupted,
104+
bool ignoreInterrupted,
105105
int expectedEpoch,
106106
ComplimentaryState complimentaryState
107107
) => _crashed
108108
? Task.FromException<bool>(new TimeoutException())
109-
: _inner.PostponeFunction(storedId, postponeUntil, timestamp, onlyIfNotInterrupted, expectedEpoch, complimentaryState);
109+
: _inner.PostponeFunction(storedId, postponeUntil, timestamp, ignoreInterrupted, expectedEpoch, complimentaryState);
110110

111111
public Task<bool> FailFunction(
112112
StoredId storedId,

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/RFunctionTests/PostponedTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,8 @@ public override Task WorkflowDelayInvocationDelaysFunction()
6060
[TestMethod]
6161
public override Task WorkflowDelayWithDateTimeInvocationDelaysFunction()
6262
=> WorkflowDelayWithDateTimeInvocationDelaysFunction(FunctionStoreFactory.Create());
63+
64+
[TestMethod]
65+
public override Task InterruptedFunctionIsRescheduledWhenPostponed()
66+
=> InterruptedFunctionIsRescheduledWhenPostponed(FunctionStoreFactory.Create());
6367
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,8 @@ public override Task MultipleFunctionsStatusCanBeFetched()
190190
[TestMethod]
191191
public override Task InterruptedFunctionIsNotPostponedWhenFlagIsSet()
192192
=> InterruptedFunctionIsNotPostponedWhenFlagIsSet(FunctionStoreFactory.Create());
193+
194+
[TestMethod]
195+
public override Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction()
196+
=> InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(FunctionStoreFactory.Create());
193197
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ public async Task<bool> PostponeFunction(
381381
StoredId storedId,
382382
long postponeUntil,
383383
long timestamp,
384-
bool onlyIfNotInterrupted,
384+
bool ignoreInterrupted,
385385
int expectedEpoch,
386386
ComplimentaryState complimentaryState)
387387
{
@@ -393,11 +393,11 @@ public async Task<bool> PostponeFunction(
393393
type = ? AND
394394
instance = ? AND
395395
epoch = ? AND
396-
1 = 1";
396+
interrupted = 0";
397397

398398
var sql = _postponedFunctionSql;
399-
if (onlyIfNotInterrupted)
400-
sql = sql.Replace("1 = 1", "interrupted = 0");
399+
if (ignoreInterrupted)
400+
sql = sql.Replace("interrupted = 0", "1 = 1");
401401

402402
await using var command = new MySqlCommand(sql, conn)
403403
{

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/RFunctionTests/PostponedTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,8 @@ public override Task WorkflowDelayInvocationDelaysFunction()
6161
[TestMethod]
6262
public override Task WorkflowDelayWithDateTimeInvocationDelaysFunction()
6363
=> WorkflowDelayWithDateTimeInvocationDelaysFunction(FunctionStoreFactory.Create());
64+
65+
[TestMethod]
66+
public override Task InterruptedFunctionIsRescheduledWhenPostponed()
67+
=> InterruptedFunctionIsRescheduledWhenPostponed(FunctionStoreFactory.Create());
6468
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,4 +192,8 @@ public override Task MultipleFunctionsStatusCanBeFetched()
192192
[TestMethod]
193193
public override Task InterruptedFunctionIsNotPostponedWhenFlagIsSet()
194194
=> InterruptedFunctionIsNotPostponedWhenFlagIsSet(FunctionStoreFactory.Create());
195+
196+
[TestMethod]
197+
public override Task InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction()
198+
=> InterruptedFunctionIsPostponedWhenIgnoringInterruptedFunction(FunctionStoreFactory.Create());
195199
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ public async Task<bool> PostponeFunction(
397397
StoredId storedId,
398398
long postponeUntil,
399399
long timestamp,
400-
bool onlyIfNotInterrupted,
400+
bool ignoreInterrupted,
401401
int expectedEpoch,
402402
ComplimentaryState complimentaryState)
403403
{
@@ -409,11 +409,11 @@ public async Task<bool> PostponeFunction(
409409
type = $3 AND
410410
instance = $4 AND
411411
epoch = $5 AND
412-
1 = 1";
412+
interrupted = FALSE";
413413

414414
var sql = _postponeFunctionSql;
415-
if (onlyIfNotInterrupted)
416-
sql = sql.Replace("1 = 1", "interrupted = FALSE");
415+
if (ignoreInterrupted)
416+
sql = sql.Replace("interrupted = FALSE", "1 = 1");
417417

418418
await using var command = new NpgsqlCommand(sql, conn)
419419
{

0 commit comments

Comments
 (0)