Skip to content

Commit 14410ab

Browse files
committed
Refactored LeasesUpdater functionality
1 parent 8687c57 commit 14410ab

File tree

23 files changed

+127
-376
lines changed

23 files changed

+127
-376
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests.LeaseUpdaterTests;
1010

1111
public class LeaseUpdaterTestFunctionStore : IFunctionStore
1212
{
13-
public delegate bool LeaseUpdaterCallback(StoredId flowId, int expectedEpoch, long newLeaseExpiry);
13+
public delegate int LeaseUpdaterCallback(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration);
1414

1515
private readonly LeaseUpdaterCallback _leaseUpdaterCallback;
1616
private readonly IFunctionStore _inner = new InMemoryFunctionStore();
@@ -43,14 +43,8 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
4343
public Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
4444
=> _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
4545

46-
public Task<bool> RenewLease(StoredId storedId, int expectedEpoch, long leaseExpiration)
47-
{
48-
var success = _leaseUpdaterCallback(storedId, expectedEpoch, leaseExpiration);
49-
return success.ToTask();
50-
}
51-
5246
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)
53-
=> _inner.RenewLeases(leaseUpdates, leaseExpiration);
47+
=> _leaseUpdaterCallback(leaseUpdates, leaseExpiration).ToTask();
5448

5549
public Task<IReadOnlyList<IdAndEpoch>> GetExpiredFunctions(long expiresBefore)
5650
=> _inner.GetExpiredFunctions(expiresBefore);

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTests.cs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,27 @@ public async Task AfterLeaseUpdaterIsStartedStoreIsInvokedContinuouslyWithExpect
2929
const int expectedEpoch = 100;
3030
var invocations = new SyncedList<Parameters>();
3131
var store = new LeaseUpdaterTestFunctionStore(
32-
(id, epoch, leaseExpiry) =>
32+
(leaseUpdates, leaseExpiry) =>
3333
{
34+
var (id, epoch) = leaseUpdates.Single();
3435
invocations.Add(new Parameters(id, ExpectedEpoch: epoch, LeaseExpiry: leaseExpiry));
35-
return true;
36+
return 1;
3637
});
37-
38-
var settings = new Settings(
39-
_unhandledExceptionCatcher.Catch,
40-
leaseLength: TimeSpan.FromMilliseconds(10)
38+
39+
using var leasesUpdater = new LeasesUpdater(
40+
leaseLength: TimeSpan.FromMilliseconds(10),
41+
store,
42+
new UnhandledExceptionHandler(e => _unhandledExceptionCatcher.Catch(e))
4143
);
44+
_ = Task.Run(leasesUpdater.Start);
45+
4246
var updater = LeaseUpdater.CreateAndStart(
4347
_storedId,
44-
_flowId,
4548
expectedEpoch,
46-
store,
47-
SettingsWithDefaults.Default.Merge(settings),
48-
new LeaseUpdaters(store, new UnhandledExceptionHandler(_ => {}))
49+
leasesUpdater
4950
);
50-
51-
await Task.Delay(200);
51+
52+
await Task.Delay(200);
5253
updater.Dispose();
5354

5455
invocations.Count.ShouldBeGreaterThan(2);
@@ -67,23 +68,23 @@ public async Task AfterLeaseUpdaterIsStartedStoreIsInvokedContinuouslyWithExpect
6768
public async Task LeaseUpdaterStopsInvokingStoreWhenFalseIsReturnedFromStore()
6869
{
6970
var syncedCounter = new SyncedCounter();
70-
var store = new LeaseUpdaterTestFunctionStore((id, epoch, life) =>
71+
var store = new LeaseUpdaterTestFunctionStore((leaseUpdates, leaseExpiration) =>
7172
{
7273
syncedCounter.Increment();
73-
return false;
74+
return 0;
7475
});
7576

76-
var settings = new Settings(
77-
_unhandledExceptionCatcher.Catch,
78-
leaseLength: TimeSpan.FromMilliseconds(10)
77+
using var leasesUpdater = new LeasesUpdater(
78+
leaseLength: TimeSpan.FromMilliseconds(10),
79+
store,
80+
new UnhandledExceptionHandler(e => _unhandledExceptionCatcher.Catch(e))
7981
);
82+
_ = leasesUpdater.Start();
83+
8084
var updater = LeaseUpdater.CreateAndStart(
8185
_storedId,
82-
_flowId,
8386
epoch: 0,
84-
store,
85-
SettingsWithDefaults.Default.Merge(settings),
86-
new LeaseUpdaters(store, new UnhandledExceptionHandler(_ => {}))
87+
leasesUpdater
8788
);
8889

8990
await Task.Delay(100);
@@ -98,23 +99,23 @@ public async Task WhenFunctionStoreThrowsExceptionAnTheUnhandledExceptionActionI
9899
{
99100
var syncedCounter = new SyncedCounter();
100101
var store = new LeaseUpdaterTestFunctionStore(
101-
(id, epoch, life) =>
102+
(leaseUpdates, leaseExpiration) =>
102103
{
103104
syncedCounter.Increment();
104105
throw new Exception();
105106
});
106-
107-
var settings = new Settings(
108-
_unhandledExceptionCatcher.Catch,
109-
leaseLength: TimeSpan.FromMilliseconds(10)
107+
108+
using var leasesUpdater = new LeasesUpdater(
109+
leaseLength: TimeSpan.FromMilliseconds(10),
110+
store,
111+
new UnhandledExceptionHandler(e => _unhandledExceptionCatcher.Catch(e))
110112
);
113+
_ = leasesUpdater.Start();
114+
111115
using var updater = LeaseUpdater.CreateAndStart(
112116
_storedId,
113-
_flowId,
114117
epoch: 0,
115-
store,
116-
SettingsWithDefaults.Default.Merge(settings),
117-
new LeaseUpdaters(store, new UnhandledExceptionHandler(_ => {}))
118+
leasesUpdater
118119
);
119120

120121
await BusyWait.Until(() => _unhandledExceptionCatcher.ThrownExceptions.Any());

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdatersTests.cs renamed to Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeasesUpdaterTests.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests;
55

66
[TestClass]
7-
public class LeaseUpdatersTests : TestTemplates.RFunctionTests.LeaseUpdatersForLeaseLengthTests
7+
public class LeasesUpdaterTests : TestTemplates.RFunctionTests.LeasesUpdaterTests
88
{
99
[TestMethod]
1010
public override Task LeaseUpdaterUpdatesExpiryForEligibleFlows()
@@ -18,10 +18,6 @@ public override Task LeaseUpdatersRefreshedCorrectlyOnUnexpectedNumberOfAffected
1818
public override Task LeaseUpdatersRepositoryThrowsResultsInUnhandledException()
1919
=> LeaseUpdatersRepositoryThrowsResultsInUnhandledException(FunctionStoreFactory.Create());
2020

21-
[TestMethod]
22-
public override Task RunningLeaseUpdatersCanBeDisposed()
23-
=> RunningLeaseUpdatersCanBeDisposed(FunctionStoreFactory.Create());
24-
2521
[TestMethod]
2622
public override Task FilterOutContainsFiltersOutActiveFlows()
2723
=> FilterOutContainsFiltersOutActiveFlows(FunctionStoreFactory.Create());
Lines changed: 14 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.RFunctionTests;
1313

14-
public abstract class LeaseUpdatersForLeaseLengthTests
14+
public abstract class LeasesUpdaterTests
1515
{
1616
public abstract Task LeaseUpdaterUpdatesExpiryForEligibleFlows();
1717
public async Task LeaseUpdaterUpdatesExpiryForEligibleFlows(Task<IFunctionStore> storeTask)
@@ -21,7 +21,7 @@ public async Task LeaseUpdaterUpdatesExpiryForEligibleFlows(Task<IFunctionStore>
2121
var store = await storeTask;
2222
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
2323
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
24-
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
24+
var leaseUpdaters = new LeasesUpdater(leaseLength, store, handler);
2525

2626
var id1 = TestStoredId.Create();
2727
await store.CreateFunction(id1, id1.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
@@ -67,7 +67,7 @@ public async Task LeaseUpdatersRefreshedCorrectlyOnUnexpectedNumberOfAffectedFlo
6767
var store = await storeTask;
6868
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
6969
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
70-
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
70+
var leaseUpdaters = new LeasesUpdater(leaseLength, store, handler);
7171

7272
var id1 = TestStoredId.Create();
7373
await store.CreateFunction(id1, id1.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
@@ -112,7 +112,7 @@ public async Task LeaseUpdatersRepositoryThrowsResultsInUnhandledException(Task<
112112

113113
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
114114
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
115-
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
115+
var leaseUpdaters = new LeasesUpdater(leaseLength, store, handler);
116116

117117
var id1 = TestStoredId.Create();
118118
await store.CreateFunction(id1, id1.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
@@ -125,32 +125,14 @@ public async Task LeaseUpdatersRepositoryThrowsResultsInUnhandledException(Task<
125125
unhandledExceptionHandler.ThrownExceptions.ShouldNotBeEmpty();
126126
}
127127

128-
public abstract Task RunningLeaseUpdatersCanBeDisposed();
129-
public async Task RunningLeaseUpdatersCanBeDisposed(Task<IFunctionStore> storeTask)
130-
{
131-
var leaseLength = TimeSpan.FromSeconds(120);
132-
var store = new CrashableFunctionStore(await storeTask);
133-
134-
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
135-
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
136-
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
137-
138-
var leaseUpdatersTask = leaseUpdaters.Start();
139-
var stopWatch = Stopwatch.StartNew();
140-
await leaseUpdaters.DisposeAsync();
141-
stopWatch.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5));
142-
143-
leaseUpdatersTask.Wait(TimeSpan.FromSeconds(1));
144-
}
145-
146128
public abstract Task FilterOutContainsFiltersOutActiveFlows();
147129
public async Task FilterOutContainsFiltersOutActiveFlows(Task<IFunctionStore> storeTask)
148130
{
149131
var leaseLength = TimeSpan.FromSeconds(120);
150132
var store = await storeTask;
151133
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
152134
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
153-
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
135+
var leaseUpdaters = new LeasesUpdater(leaseLength, store, handler);
154136

155137
var id1 = TestStoredId.Create();
156138
var id2 = TestStoredId.Create();
@@ -176,7 +158,7 @@ public async Task FilterOutContainsReturnsSameCollectionUnmodifiedWhenNoFilterIs
176158
var store = await storeTask;
177159
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
178160
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
179-
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
161+
var leaseUpdaters = new LeasesUpdater(leaseLength, store, handler);
180162

181163
var id1 = TestStoredId.Create();
182164
var id2 = TestStoredId.Create();
@@ -198,23 +180,19 @@ public async Task FilterOutContainsReturnsSameCollectionUnmodifiedWhenNoFilterIs
198180
public abstract Task LeaseUpdatersFiltersOutAlreadyContains();
199181
public async Task LeaseUpdatersFiltersOutAlreadyContains(Task<IFunctionStore> storeTask)
200182
{
201-
var leaseLength120 = TimeSpan.FromSeconds(120);
202-
var leaseLength125 = TimeSpan.FromSeconds(125);
183+
var leaseLength = TimeSpan.FromSeconds(120);
203184
var store = await storeTask;
204185
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
205186
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
206-
var leaseUpdaters = new LeaseUpdaters(store, handler);
207-
208-
var leaseUpdatersForLeaseLength120 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength120);
209-
var leaseUpdatersForLeaseLength125 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength125);
187+
var leaseUpdaters = new LeasesUpdater(leaseLength, store, handler);
210188

211189
var id1 = TestStoredId.Create();
212190
var id2 = TestStoredId.Create();
213191
var id3 = TestStoredId.Create();
214192
var id4 = TestStoredId.Create();
215193

216-
leaseUpdatersForLeaseLength120.Set(id1, epoch: 0, expiresTicks: 0);
217-
leaseUpdatersForLeaseLength125.Set(id2, epoch: 0, expiresTicks: 0);
194+
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
195+
leaseUpdaters.Set(id2, epoch: 0, expiresTicks: 0);
218196

219197
var idAndEpoches = new List<IdAndEpoch>
220198
{
@@ -234,23 +212,19 @@ public async Task LeaseUpdatersFiltersOutAlreadyContains(Task<IFunctionStore> st
234212
public abstract Task LeaseUpdatersReturnsSameReferenceWhenFiltersWhenThereAreNoAlreadyContains();
235213
public async Task LeaseUpdatersReturnsSameReferenceWhenFiltersWhenThereAreNoAlreadyContains(Task<IFunctionStore> storeTask)
236214
{
237-
var leaseLength120 = TimeSpan.FromSeconds(120);
238-
var leaseLength125 = TimeSpan.FromSeconds(125);
215+
var leaseLength = TimeSpan.FromSeconds(120);
239216
var store = await storeTask;
240217
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
241218
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
242-
var leaseUpdaters = new LeaseUpdaters(store, handler);
243-
244-
var leaseUpdatersForLeaseLength120 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength120);
245-
var leaseUpdatersForLeaseLength125 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength125);
219+
var leaseUpdaters = new LeasesUpdater(leaseLength, store, handler);
246220

247221
var id1 = TestStoredId.Create();
248222
var id2 = TestStoredId.Create();
249223
var id3 = TestStoredId.Create();
250224
var id4 = TestStoredId.Create();
251225

252-
leaseUpdatersForLeaseLength120.Set(id1, epoch: 0, expiresTicks: 0);
253-
leaseUpdatersForLeaseLength125.Set(id2, epoch: 0, expiresTicks: 0);
226+
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
227+
leaseUpdaters.Set(id2, epoch: 0, expiresTicks: 0);
254228

255229
var idAndEpoches = new List<IdAndEpoch>
256230
{

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreCrudTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ await store.CreateFunction(
116116
parent: null
117117
).ShouldBeTrueAsync();
118118

119-
await store.RenewLease(StoredId, expectedEpoch: 0, leaseExpiration: 1).ShouldBeTrueAsync();
119+
await store.RenewLeases([new LeaseUpdate(StoredId, ExpectedEpoch: 0)], leaseExpiration: 1).ShouldBeAsync(1);
120120

121121
var storedFunction = await store.GetFunction(StoredId);
122122
storedFunction!.Epoch.ShouldBe(0);
@@ -138,7 +138,7 @@ await store.CreateFunction(
138138
parent: null
139139
).ShouldBeTrueAsync();
140140

141-
await store.RenewLease(StoredId, expectedEpoch: 1, leaseExpiration: 1).ShouldBeFalseAsync();
141+
await store.RenewLeases([new LeaseUpdate(StoredId, ExpectedEpoch: 1)], leaseExpiration: 1).ShouldBeAsync(0);
142142

143143
var storedFunction = await store.GetFunction(StoredId);
144144
storedFunction!.Epoch.ShouldBe(0);

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,13 @@ await store.CreateFunction(
9494
public abstract Task LeaseIsUpdatedWhenAsExpected();
9595
protected async Task LeaseIsUpdatedWhenAsExpected(Task<IFunctionStore> storeTask)
9696
{
97-
var functionId = TestStoredId.Create();
97+
var storedId = TestStoredId.Create();
9898

9999
var store = await storeTask;
100100
var paramJson = PARAM.ToJson();
101101

102102
await store.CreateFunction(
103-
functionId,
103+
storedId,
104104
"humanInstanceId",
105105
paramJson.ToUtf8Bytes(),
106106
leaseExpiration: DateTime.UtcNow.Ticks,
@@ -109,11 +109,10 @@ await store.CreateFunction(
109109
parent: null
110110
).ShouldBeTrueAsync();
111111

112-
await store
113-
.RenewLease(functionId, expectedEpoch: 0, leaseExpiration: 1)
114-
.ShouldBeTrueAsync();
112+
var affectedRows = await store.RenewLeases([new LeaseUpdate(storedId, ExpectedEpoch: 0)], leaseExpiration: 1);
113+
affectedRows.ShouldBe(1);
115114

116-
var sf = await store.GetFunction(functionId);
115+
var sf = await store.GetFunction(storedId);
117116
sf.ShouldNotBeNull();
118117
sf.Epoch.ShouldBe(0);
119118
sf.Expires.ShouldBe(1);
@@ -138,11 +137,7 @@ await store.CreateFunction(
138137
parent: null
139138
).ShouldBeTrueAsync();
140139

141-
await store.RenewLease(
142-
functionId,
143-
expectedEpoch: 1,
144-
leaseExpiration: 1
145-
).ShouldBeFalseAsync();
140+
await store.RenewLeases([new LeaseUpdate(functionId, ExpectedEpoch: 1)], leaseExpiration: 1).ShouldBeAsync(0);
146141

147142
await store
148143
.GetExpiredFunctions(expiresBefore: leaseExpiration + 1)

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,6 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
7676
? Task.FromException<StoredFlow?>(new TimeoutException())
7777
: _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
7878

79-
public Task<bool> RenewLease(StoredId storedId, int expectedEpoch, long leaseExpiration)
80-
=> _crashed
81-
? Task.FromException<bool>(new TimeoutException())
82-
: _inner.RenewLease(storedId, expectedEpoch, leaseExpiration);
83-
8479
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)
8580
=> _crashed
8681
? Task.FromException<int>(new TimeoutException())

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ internal class InvocationHelper<TParam, TReturn>
2222
private readonly bool _isParamlessFunction;
2323
private readonly FlowType _flowType;
2424
private readonly StoredType _storedType;
25-
private readonly LeaseUpdaters _leaseUpdaters;
25+
private readonly LeasesUpdater _leasesUpdater;
2626

2727
private ISerializer Serializer { get; }
2828

29-
public InvocationHelper(FlowType flowType, StoredType storedType, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, LeaseUpdaters leaseUpdaters)
29+
public InvocationHelper(FlowType flowType, StoredType storedType, bool isParamlessFunction, SettingsWithDefaults settings, IFunctionStore functionStore, ShutdownCoordinator shutdownCoordinator, LeasesUpdater leasesUpdater)
3030
{
3131
_flowType = flowType;
3232
_isParamlessFunction = isParamlessFunction;
3333
_settings = settings;
3434

3535
Serializer = new ErrorHandlingDecorator(new CustomSerializableDecorator(settings.Serializer));
3636
_shutdownCoordinator = shutdownCoordinator;
37-
_leaseUpdaters = leaseUpdaters;
37+
_leasesUpdater = leasesUpdater;
3838
_storedType = storedType;
3939
_functionStore = functionStore;
4040
}
@@ -306,8 +306,8 @@ await _functionStore.FailFunction(
306306

307307
internal record PreparedReInvocation(FlowId FlowId, TParam? Param, int Epoch, IDisposable RunningFunction, StoredId? Parent);
308308

309-
public IDisposable StartLeaseUpdater(StoredId storedId, FlowId flowId, int epoch = 0)
310-
=> LeaseUpdater.CreateAndStart(storedId, flowId, epoch, _functionStore, _settings, _leaseUpdaters);
309+
public IDisposable StartLeaseUpdater(StoredId storedId, int epoch = 0)
310+
=> LeaseUpdater.CreateAndStart(storedId, epoch, _leasesUpdater);
311311

312312
public async Task<bool> SetFunctionState(
313313
StoredId storedId,

0 commit comments

Comments
 (0)