Skip to content

Commit 6afe7c1

Browse files
committed
Implemented LeaseUpdaters with batching capability
1 parent c68bf35 commit 6afe7c1

File tree

17 files changed

+666
-39
lines changed

17 files changed

+666
-39
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public async Task AfterLeaseUpdaterIsStartedStoreIsInvokedContinuouslyWithExpect
4545
expectedEpoch,
4646
store,
4747
SettingsWithDefaults.Default.Merge(settings),
48-
new LeaseUpdaters()
48+
new LeaseUpdaters(store, new UnhandledExceptionHandler(_ => {}))
4949
);
5050

5151
await Task.Delay(200);
@@ -83,7 +83,7 @@ public async Task LeaseUpdaterStopsInvokingStoreWhenFalseIsReturnedFromStore()
8383
epoch: 0,
8484
store,
8585
SettingsWithDefaults.Default.Merge(settings),
86-
new LeaseUpdaters()
86+
new LeaseUpdaters(store, new UnhandledExceptionHandler(_ => {}))
8787
);
8888

8989
await Task.Delay(100);
@@ -114,7 +114,7 @@ public async Task WhenFunctionStoreThrowsExceptionAnTheUnhandledExceptionActionI
114114
epoch: 0,
115115
store,
116116
SettingsWithDefaults.Default.Merge(settings),
117-
new LeaseUpdaters()
117+
new LeaseUpdaters(store, new UnhandledExceptionHandler(_ => {}))
118118
);
119119

120120
await BusyWait.Until(() => _unhandledExceptionCatcher.ThrownExceptions.Any());
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
4+
namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests;
5+
6+
[TestClass]
7+
public class LeaseUpdatersTests : TestTemplates.RFunctionTests.LeaseUpdatersForLeaseLengthTests
8+
{
9+
[TestMethod]
10+
public override Task LeaseUpdaterUpdatesExpiryForEligibleFlows()
11+
=> LeaseUpdaterUpdatesExpiryForEligibleFlows(FunctionStoreFactory.Create());
12+
13+
[TestMethod]
14+
public override Task LeaseUpdatersRefreshedCorrectlyOnUnexpectedNumberOfAffectedFlows()
15+
=> LeaseUpdatersRefreshedCorrectlyOnUnexpectedNumberOfAffectedFlows(FunctionStoreFactory.Create());
16+
17+
[TestMethod]
18+
public override Task LeaseUpdatersRepositoryThrowsResultsInUnhandledException()
19+
=> LeaseUpdatersRepositoryThrowsResultsInUnhandledException(FunctionStoreFactory.Create());
20+
21+
[TestMethod]
22+
public override Task RunningLeaseUpdatersCanBeDisposed()
23+
=> RunningLeaseUpdatersCanBeDisposed(FunctionStoreFactory.Create());
24+
25+
[TestMethod]
26+
public override Task FilterOutContainsFiltersOutActiveFlows()
27+
=> FilterOutContainsFiltersOutActiveFlows(FunctionStoreFactory.Create());
28+
29+
[TestMethod]
30+
public override Task FilterOutContainsReturnsSameCollectionUnmodifiedWhenNoFilterIsPerformed()
31+
=> FilterOutContainsReturnsSameCollectionUnmodifiedWhenNoFilterIsPerformed(FunctionStoreFactory.Create());
32+
33+
[TestMethod]
34+
public override Task LeaseUpdatersFiltersOutAlreadyContains()
35+
=> LeaseUpdatersFiltersOutAlreadyContains(FunctionStoreFactory.Create());
36+
37+
[TestMethod]
38+
public override Task LeaseUpdatersReturnsSameReferenceWhenFiltersWhenThereAreNoAlreadyContains()
39+
=> LeaseUpdatersReturnsSameReferenceWhenFiltersWhenThereAreNoAlreadyContains(FunctionStoreFactory.Create());
40+
}
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.CoreRuntime;
7+
using Cleipnir.ResilientFunctions.Storage;
8+
using Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
9+
using Cleipnir.ResilientFunctions.Tests.Utils;
10+
using Shouldly;
11+
12+
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.RFunctionTests;
13+
14+
public abstract class LeaseUpdatersForLeaseLengthTests
15+
{
16+
public abstract Task LeaseUpdaterUpdatesExpiryForEligibleFlows();
17+
public async Task LeaseUpdaterUpdatesExpiryForEligibleFlows(Task<IFunctionStore> storeTask)
18+
{
19+
var beforeTicks = DateTime.UtcNow.Ticks;
20+
var leaseLength = TimeSpan.FromSeconds(120);
21+
var store = await storeTask;
22+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
23+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
24+
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
25+
26+
var id1 = TestStoredId.Create();
27+
await store.CreateFunction(id1, id1.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
28+
29+
var id2 = TestStoredId.Create();
30+
var id2Expires = 1_000_000_000_000_000_000L;
31+
await store.CreateFunction(id2, id2.ToString(), param: null, leaseExpiration: id2Expires, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
32+
33+
var id3 = TestStoredId.Create();
34+
await store.CreateFunction(id3, id3.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
35+
36+
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
37+
leaseUpdaters.Set(id2, epoch: 0, expiresTicks: id2Expires);
38+
leaseUpdaters.Set(id3, epoch: 0, expiresTicks: 0);
39+
await leaseUpdaters.RenewLeases();
40+
41+
var sf1 = await store.GetFunction(id1).ShouldNotBeNullAsync();
42+
sf1.Expires.ShouldBeGreaterThan(beforeTicks);
43+
44+
var sf2 = await store.GetFunction(id2).ShouldNotBeNullAsync();
45+
sf2.Expires.ShouldBe(id2Expires);
46+
47+
var sf3 = await store.GetFunction(id3).ShouldNotBeNullAsync();
48+
sf3.Expires.ShouldBeGreaterThan(beforeTicks);
49+
50+
var executingFlows = leaseUpdaters.GetExecutingFlows();
51+
executingFlows.Count.ShouldBe(3);
52+
executingFlows[id1].Epoch.ShouldBe(0);
53+
executingFlows[id1].Expiry.ShouldBe(sf1.Expires);
54+
executingFlows[id2].Epoch.ShouldBe(0);
55+
executingFlows[id2].Expiry.ShouldBe(id2Expires);
56+
executingFlows[id3].Epoch.ShouldBe(0);
57+
executingFlows[id3].Expiry.ShouldBe(sf3.Expires);
58+
59+
unhandledExceptionHandler.ShouldNotHaveExceptions();
60+
}
61+
62+
public abstract Task LeaseUpdatersRefreshedCorrectlyOnUnexpectedNumberOfAffectedFlows();
63+
public async Task LeaseUpdatersRefreshedCorrectlyOnUnexpectedNumberOfAffectedFlows(Task<IFunctionStore> storeTask)
64+
{
65+
var beforeTicks = DateTime.UtcNow.Ticks;
66+
var leaseLength = TimeSpan.FromSeconds(120);
67+
var store = await storeTask;
68+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
69+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
70+
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
71+
72+
var id1 = TestStoredId.Create();
73+
await store.CreateFunction(id1, id1.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
74+
75+
var id2 = TestStoredId.Create();
76+
var id2Expires = 1_000_000_000_000_000_000L;
77+
await store.CreateFunction(id2, id2.ToString(), param: null, leaseExpiration: id2Expires, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
78+
79+
var id3 = TestStoredId.Create();
80+
await store.CreateFunction(id3, id3.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
81+
await store.RestartExecution(id3, expectedEpoch: 0, leaseExpiration: 0).ShouldNotBeNullAsync();
82+
83+
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
84+
leaseUpdaters.Set(id2, epoch: 0, expiresTicks: id2Expires);
85+
leaseUpdaters.Set(id3, epoch: 0, expiresTicks: 0);
86+
await leaseUpdaters.RenewLeases();
87+
88+
var sf1 = await store.GetFunction(id1).ShouldNotBeNullAsync();
89+
sf1.Expires.ShouldBeGreaterThan(beforeTicks);
90+
91+
var sf2 = await store.GetFunction(id2).ShouldNotBeNullAsync();
92+
sf2.Expires.ShouldBe(id2Expires);
93+
94+
var sf3 = await store.GetFunction(id3).ShouldNotBeNullAsync();
95+
sf3.Expires.ShouldBe(0);
96+
97+
var executingFlows = leaseUpdaters.GetExecutingFlows();
98+
executingFlows.Count.ShouldBe(2);
99+
executingFlows[id1].Epoch.ShouldBe(0);
100+
executingFlows[id1].Expiry.ShouldBe(sf1.Expires);
101+
executingFlows[id2].Epoch.ShouldBe(0);
102+
executingFlows[id2].Expiry.ShouldBe(id2Expires);
103+
104+
unhandledExceptionHandler.ShouldNotHaveExceptions();
105+
}
106+
107+
public abstract Task LeaseUpdatersRepositoryThrowsResultsInUnhandledException();
108+
public async Task LeaseUpdatersRepositoryThrowsResultsInUnhandledException(Task<IFunctionStore> storeTask)
109+
{
110+
var leaseLength = TimeSpan.FromSeconds(120);
111+
var store = new CrashableFunctionStore(await storeTask);
112+
113+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
114+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
115+
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
116+
117+
var id1 = TestStoredId.Create();
118+
await store.CreateFunction(id1, id1.ToString(), param: null, leaseExpiration: 0, postponeUntil: null, timestamp: DateTime.UtcNow.Ticks, parent: null).ShouldBeTrueAsync();
119+
120+
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
121+
store.Crash();
122+
123+
await leaseUpdaters.RenewLeases();
124+
125+
unhandledExceptionHandler.ThrownExceptions.ShouldNotBeEmpty();
126+
}
127+
128+
public abstract Task RunningLeaseUpdatersCanBeDisposed();
129+
public async Task RunningLeaseUpdatersCanBeDisposed(Task<IFunctionStore> storeTask)
130+
{
131+
var leaseLength = TimeSpan.FromSeconds(120);
132+
var store = new CrashableFunctionStore(await storeTask);
133+
134+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
135+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
136+
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
137+
138+
var leaseUpdatersTask = leaseUpdaters.Start();
139+
var stopWatch = Stopwatch.StartNew();
140+
await leaseUpdaters.DisposeAsync();
141+
stopWatch.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(5));
142+
143+
leaseUpdatersTask.Wait(TimeSpan.FromSeconds(1));
144+
}
145+
146+
public abstract Task FilterOutContainsFiltersOutActiveFlows();
147+
public async Task FilterOutContainsFiltersOutActiveFlows(Task<IFunctionStore> storeTask)
148+
{
149+
var leaseLength = TimeSpan.FromSeconds(120);
150+
var store = await storeTask;
151+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
152+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
153+
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
154+
155+
var id1 = TestStoredId.Create();
156+
var id2 = TestStoredId.Create();
157+
var id3 = TestStoredId.Create();
158+
159+
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
160+
leaseUpdaters.Set(id2, epoch: 0, expiresTicks: 0);
161+
162+
var filtered = leaseUpdaters.FindAlreadyContains([
163+
new IdAndEpoch(id1, Epoch: 0),
164+
new IdAndEpoch(id3, Epoch: 0),
165+
]);
166+
167+
filtered.Single().FlowId.ShouldBe(id1);
168+
169+
unhandledExceptionHandler.ShouldNotHaveExceptions();
170+
}
171+
172+
public abstract Task FilterOutContainsReturnsSameCollectionUnmodifiedWhenNoFilterIsPerformed();
173+
public async Task FilterOutContainsReturnsSameCollectionUnmodifiedWhenNoFilterIsPerformed(Task<IFunctionStore> storeTask)
174+
{
175+
var leaseLength = TimeSpan.FromSeconds(120);
176+
var store = await storeTask;
177+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
178+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
179+
var leaseUpdaters = new LeaseUpdatersForLeaseLength(leaseLength, store, handler);
180+
181+
var id1 = TestStoredId.Create();
182+
var id2 = TestStoredId.Create();
183+
var id3 = TestStoredId.Create();
184+
185+
leaseUpdaters.Set(id1, epoch: 0, expiresTicks: 0);
186+
187+
var idAndEpoches = new List<IdAndEpoch>
188+
{
189+
new(id2, Epoch: 0),
190+
new(id3, Epoch: 0),
191+
};
192+
var filtered = leaseUpdaters.FindAlreadyContains(idAndEpoches);
193+
filtered.Count.ShouldBe(0);
194+
195+
unhandledExceptionHandler.ShouldNotHaveExceptions();
196+
}
197+
198+
public abstract Task LeaseUpdatersFiltersOutAlreadyContains();
199+
public async Task LeaseUpdatersFiltersOutAlreadyContains(Task<IFunctionStore> storeTask)
200+
{
201+
var leaseLength120 = TimeSpan.FromSeconds(120);
202+
var leaseLength125 = TimeSpan.FromSeconds(125);
203+
var store = await storeTask;
204+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
205+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
206+
var leaseUpdaters = new LeaseUpdaters(store, handler);
207+
208+
var leaseUpdatersForLeaseLength120 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength120);
209+
var leaseUpdatersForLeaseLength125 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength125);
210+
211+
var id1 = TestStoredId.Create();
212+
var id2 = TestStoredId.Create();
213+
var id3 = TestStoredId.Create();
214+
var id4 = TestStoredId.Create();
215+
216+
leaseUpdatersForLeaseLength120.Set(id1, epoch: 0, expiresTicks: 0);
217+
leaseUpdatersForLeaseLength125.Set(id2, epoch: 0, expiresTicks: 0);
218+
219+
var idAndEpoches = new List<IdAndEpoch>
220+
{
221+
new(id1, Epoch: 0),
222+
new(id2, Epoch: 0),
223+
new(id3, Epoch: 0),
224+
new(id4, Epoch: 0),
225+
};
226+
var filtered = leaseUpdaters.FilterOutContains(idAndEpoches);
227+
filtered.Count.ShouldBe(2);
228+
filtered.Single(i => i.FlowId == id3).Epoch.ShouldBe(0);
229+
filtered.Single(i => i.FlowId == id4).Epoch.ShouldBe(0);
230+
231+
unhandledExceptionHandler.ShouldNotHaveExceptions();
232+
}
233+
234+
public abstract Task LeaseUpdatersReturnsSameReferenceWhenFiltersWhenThereAreNoAlreadyContains();
235+
public async Task LeaseUpdatersReturnsSameReferenceWhenFiltersWhenThereAreNoAlreadyContains(Task<IFunctionStore> storeTask)
236+
{
237+
var leaseLength120 = TimeSpan.FromSeconds(120);
238+
var leaseLength125 = TimeSpan.FromSeconds(125);
239+
var store = await storeTask;
240+
var unhandledExceptionHandler = new UnhandledExceptionCatcher();
241+
var handler = new UnhandledExceptionHandler(unhandledExceptionHandler.Catch);
242+
var leaseUpdaters = new LeaseUpdaters(store, handler);
243+
244+
var leaseUpdatersForLeaseLength120 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength120);
245+
var leaseUpdatersForLeaseLength125 = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(leaseLength125);
246+
247+
var id1 = TestStoredId.Create();
248+
var id2 = TestStoredId.Create();
249+
var id3 = TestStoredId.Create();
250+
var id4 = TestStoredId.Create();
251+
252+
leaseUpdatersForLeaseLength120.Set(id1, epoch: 0, expiresTicks: 0);
253+
leaseUpdatersForLeaseLength125.Set(id2, epoch: 0, expiresTicks: 0);
254+
255+
var idAndEpoches = new List<IdAndEpoch>
256+
{
257+
new(id3, Epoch: 0),
258+
new(id4, Epoch: 0),
259+
};
260+
var filtered = leaseUpdaters.FilterOutContains(idAndEpoches);
261+
ReferenceEquals(idAndEpoches, filtered).ShouldBeTrue();
262+
263+
unhandledExceptionHandler.ShouldNotHaveExceptions();
264+
}
265+
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/LeaseUpdater.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ internal class LeaseUpdater : IDisposable
1414
private readonly int _epoch;
1515

1616
private readonly TimeSpan _leaseLength;
17-
private readonly LeaseUpdaters _leaseUpdaters;
17+
private readonly LeaseUpdatersForLeaseLength _leaseUpdaters;
1818

1919
private readonly IFunctionStore _functionStore;
2020
private readonly UnhandledExceptionHandler _unhandledExceptionHandler;
@@ -28,7 +28,7 @@ private LeaseUpdater(
2828
IFunctionStore functionStore,
2929
UnhandledExceptionHandler unhandledExceptionHandler,
3030
TimeSpan leaseLength,
31-
LeaseUpdaters leaseUpdaters)
31+
LeaseUpdatersForLeaseLength leaseUpdaters)
3232
{
3333
_flowId = flowId;
3434
_storedId = storedId;
@@ -45,7 +45,8 @@ public static IDisposable CreateAndStart(
4545
IFunctionStore functionStore, SettingsWithDefaults settings,
4646
LeaseUpdaters leaseUpdaters)
4747
{
48-
leaseUpdaters.Add(storedId);
48+
var leaseUpdatersForLeaseLength = leaseUpdaters.GetOrCreateLeaseUpdatersForLeaseLength(settings.LeaseLength);
49+
leaseUpdatersForLeaseLength.Set(storedId, epoch);
4950

5051
var leaseUpdater = new LeaseUpdater(
5152
flowId,
@@ -54,7 +55,7 @@ public static IDisposable CreateAndStart(
5455
functionStore,
5556
settings.UnhandledExceptionHandler,
5657
leaseLength: settings.LeaseLength,
57-
leaseUpdaters
58+
leaseUpdatersForLeaseLength
5859
);
5960

6061
Task.Run(leaseUpdater.Start);
@@ -106,10 +107,7 @@ private async Task Start()
106107
RemoveFromLeaseUpdaters();
107108
}
108109

109-
private void RemoveFromLeaseUpdaters()
110-
{
111-
_leaseUpdaters?.Remove(_storedId);
112-
}
110+
private void RemoveFromLeaseUpdaters() => _leaseUpdaters.ConditionalRemove(_storedId, _epoch);
113111

114112
public void Dispose()
115113
{

0 commit comments

Comments
 (0)