Skip to content

Commit 2b2f683

Browse files
committed
ReplicaWatchdog reschedules crashed replica functions on detection
1 parent af85182 commit 2b2f683

File tree

6 files changed

+119
-51
lines changed

6 files changed

+119
-51
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public override Task ActiveReplicasDoNotDeleteEachOther()
3737
[TestMethod]
3838
public override Task NonExistingReplicaIdOffsetIsNull()
3939
=> NonExistingReplicaIdOffsetIsNull(FunctionStoreFactory.Create());
40+
41+
[TestMethod]
42+
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
43+
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
4044
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 80 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Cleipnir.ResilientFunctions.Domain;
66
using Cleipnir.ResilientFunctions.Helpers;
77
using Cleipnir.ResilientFunctions.Storage;
8+
using Cleipnir.ResilientFunctions.Tests.Utils;
89
using Shouldly;
910

1011
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
@@ -14,13 +15,13 @@ public abstract class ReplicaWatchdogTests
1415
public abstract Task SunshineScenario();
1516
public async Task SunshineScenario(Task<IFunctionStore> storeTask)
1617
{
17-
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
18+
var functionStore = await storeTask;
19+
var store = functionStore.ReplicaStore;
1820
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
1921
using var watchdog1 = new ReplicaWatchdog(
2022
replicaId1,
21-
store,
22-
checkFrequency: TimeSpan.FromHours(1),
23-
onStrikeOut: _ => {}
23+
functionStore,
24+
checkFrequency: TimeSpan.FromHours(1)
2425
);
2526
await watchdog1.Initialize();
2627
var allReplicas = await store.GetAll();
@@ -31,9 +32,8 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
3132
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
3233
using var watchdog2 = new ReplicaWatchdog(
3334
replicaId2,
34-
store,
35-
checkFrequency: TimeSpan.FromHours(1),
36-
onStrikeOut: _ => {}
35+
functionStore,
36+
checkFrequency: TimeSpan.FromHours(1)
3737
);
3838
await watchdog2.Initialize();
3939
allReplicas = await store.GetAll();
@@ -68,13 +68,13 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
6868
public abstract Task ReplicaWatchdogStartResultsInAddedReplicaInStore();
6969
public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctionStore> storeTask)
7070
{
71-
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
71+
var functionStore = await storeTask;
72+
var store = functionStore.ReplicaStore;
7273
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
7374
using var watchdog1 = new ReplicaWatchdog(
7475
replicaId1,
75-
store,
76-
checkFrequency: TimeSpan.FromHours(1),
77-
onStrikeOut: _ => {}
76+
functionStore,
77+
checkFrequency: TimeSpan.FromHours(1)
7878
);
7979
await watchdog1.Start();
8080
var allReplicas = await store.GetAll();
@@ -83,9 +83,8 @@ public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctio
8383
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
8484
using var watchdog2 = new ReplicaWatchdog(
8585
replicaId2,
86-
store,
87-
checkFrequency: TimeSpan.FromHours(1),
88-
onStrikeOut: _ => {}
86+
functionStore,
87+
checkFrequency: TimeSpan.FromHours(1)
8988
);
9089
await watchdog2.Start();
9190
allReplicas = await store.GetAll();
@@ -95,24 +94,23 @@ public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctio
9594
public abstract Task StrikedOutReplicaIsRemovedFromStore();
9695
public async Task StrikedOutReplicaIsRemovedFromStore(Task<IFunctionStore> storeTask)
9796
{
98-
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
97+
var functionStore = await storeTask;
98+
var store = functionStore.ReplicaStore;
9999
var toBeStrikedOut = ReplicaId.NewId();
100-
ReplicaId? strikedOut = null;
100+
101101
await store.Insert(toBeStrikedOut);
102102
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
103103
using var watchdog1 = new ReplicaWatchdog(
104104
replicaId1,
105-
store,
106-
checkFrequency: TimeSpan.FromHours(1),
107-
onStrikeOut: id => strikedOut = id
105+
functionStore,
106+
checkFrequency: TimeSpan.FromHours(1)
108107
);
109108
await watchdog1.Initialize();
110109
await watchdog1.PerformIteration();
111-
strikedOut.ShouldBeNull();
110+
await store.GetAll().SelectAsync(rs => rs.Count == 2).ShouldBeTrueAsync();
112111
await watchdog1.PerformIteration();
113-
strikedOut.ShouldBeNull();
112+
await store.GetAll().SelectAsync(rs => rs.Count == 2).ShouldBeTrueAsync();
114113
await watchdog1.PerformIteration();
115-
strikedOut.ShouldBe(toBeStrikedOut);
116114

117115
var all = await store.GetAll();
118116
all.Count.ShouldBe(1);
@@ -122,14 +120,14 @@ public async Task StrikedOutReplicaIsRemovedFromStore(Task<IFunctionStore> store
122120
public abstract Task RunningWatchdogUpdatesItsOwnHeartbeat();
123121
public async Task RunningWatchdogUpdatesItsOwnHeartbeat(Task<IFunctionStore> storeTask)
124122
{
125-
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
126-
var anyStrikesOut = false;
123+
var functionStore = await storeTask;
124+
var store = functionStore.ReplicaStore;
125+
127126
var replicaId1 = new ClusterInfo(ReplicaId.NewId());
128127
using var watchdog1 = new ReplicaWatchdog(
129128
replicaId1,
130-
store,
131-
checkFrequency: TimeSpan.FromMilliseconds(100),
132-
onStrikeOut: _ => anyStrikesOut = true
129+
functionStore,
130+
checkFrequency: TimeSpan.FromMilliseconds(100)
133131
);
134132

135133
await watchdog1.Start();
@@ -142,22 +140,21 @@ await BusyWait.Until(async () =>
142140
single.ReplicaId.ShouldBe(replicaId1.ReplicaId);
143141
return single.Heartbeat > 0;
144142
});
145-
146-
anyStrikesOut.ShouldBe(false);
147143
}
148144

149145
public abstract Task ReplicaIdOffsetIfCalculatedCorrectly();
150146
public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> storeTask)
151147
{
152-
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
148+
var store = await storeTask;
149+
var replicaStore = store.ReplicaStore;
153150

154151
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
155152
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
156153
var replicaId3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
157154

158-
var watchdog1 = new ReplicaWatchdog(replicaId1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
159-
var watchdog2 = new ReplicaWatchdog(replicaId2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
160-
var watchdog3 = new ReplicaWatchdog(replicaId3, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
155+
var watchdog1 = new ReplicaWatchdog(replicaId1, store, checkFrequency: TimeSpan.FromHours(1));
156+
var watchdog2 = new ReplicaWatchdog(replicaId2, store, checkFrequency: TimeSpan.FromHours(1));
157+
var watchdog3 = new ReplicaWatchdog(replicaId3, store, checkFrequency: TimeSpan.FromHours(1));
161158

162159
await watchdog1.Initialize();
163160
await watchdog2.Initialize();
@@ -174,15 +171,16 @@ public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> stor
174171
public abstract Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted();
175172
public async Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(Task<IFunctionStore> storeTask)
176173
{
177-
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
174+
var functionStore = await storeTask;
175+
var store = functionStore.ReplicaStore;
178176

179177
var cluster1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
180178
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
181179
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
182180

183-
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
184-
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
185-
var watchdog3 = new ReplicaWatchdog(cluster3, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
181+
var watchdog1 = new ReplicaWatchdog(cluster1, functionStore, checkFrequency: TimeSpan.FromHours(1));
182+
var watchdog2 = new ReplicaWatchdog(cluster2, functionStore, checkFrequency: TimeSpan.FromHours(1));
183+
var watchdog3 = new ReplicaWatchdog(cluster3, functionStore, checkFrequency: TimeSpan.FromHours(1));
186184

187185
await watchdog3.Initialize();
188186
cluster3.Offset.ShouldBe(0);
@@ -222,15 +220,16 @@ public async Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(Task<IFuncti
222220
public abstract Task ActiveReplicasDoNotDeleteEachOther();
223221
public async Task ActiveReplicasDoNotDeleteEachOther(Task<IFunctionStore> storeTask)
224222
{
225-
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
223+
var store = await storeTask;
224+
var replicaStore = store.ReplicaStore;
226225

227226
var cluster1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
228227
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
229228
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
230229

231-
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
232-
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
233-
var watchdog3 = new ReplicaWatchdog(cluster3, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
230+
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1));
231+
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1));
232+
var watchdog3 = new ReplicaWatchdog(cluster3, store, checkFrequency: TimeSpan.FromHours(1));
234233

235234
await watchdog1.Initialize();
236235
await watchdog2.Initialize();
@@ -248,7 +247,7 @@ public async Task ActiveReplicasDoNotDeleteEachOther(Task<IFunctionStore> storeT
248247
await watchdog1.PerformIteration();
249248
await watchdog2.PerformIteration();
250249

251-
var storedReplicas = await store.GetAll();
250+
var storedReplicas = await replicaStore.GetAll();
252251
storedReplicas.Count.ShouldBe(2);
253252
storedReplicas.Any(sr => sr.ReplicaId == cluster1.ReplicaId).ShouldBeTrue();
254253
storedReplicas.Any(sr => sr.ReplicaId == cluster2.ReplicaId).ShouldBeTrue();
@@ -262,4 +261,43 @@ public Task NonExistingReplicaIdOffsetIsNull(Task<IFunctionStore> storeTask)
262261

263262
return Task.CompletedTask;
264263
}
264+
265+
public abstract Task StrikedOutReplicasFunctionIsPostponedAfterCrash();
266+
public async Task StrikedOutReplicasFunctionIsPostponedAfterCrash(Task<IFunctionStore> storeTask)
267+
{
268+
var functionStore = await storeTask;
269+
270+
var toBeStrikedOut = ReplicaId.NewId();
271+
var storedId = TestStoredId.Create();
272+
await functionStore.CreateFunction(
273+
storedId,
274+
humanInstanceId: "SomeInstanceId",
275+
param: null,
276+
leaseExpiration: DateTime.UtcNow.Ticks,
277+
postponeUntil: null,
278+
timestamp: DateTime.UtcNow.Ticks,
279+
parent: null,
280+
owner: toBeStrikedOut
281+
).ShouldBeTrueAsync();
282+
283+
var store = functionStore.ReplicaStore;
284+
285+
await store.Insert(toBeStrikedOut);
286+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
287+
using var watchdog1 = new ReplicaWatchdog(
288+
replicaId1,
289+
functionStore,
290+
checkFrequency: TimeSpan.FromHours(1)
291+
);
292+
await watchdog1.Initialize();
293+
await watchdog1.PerformIteration();
294+
await watchdog1.PerformIteration();
295+
await watchdog1.PerformIteration();
296+
297+
var sf = await functionStore.GetFunction(storedId).ShouldNotBeNullAsync();
298+
sf.Epoch.ShouldBe(1);
299+
sf.Status.ShouldBe(Status.Postponed);
300+
sf.Expires.ShouldBe(0);
301+
sf.OwnerId.ShouldBeNull();
302+
}
265303
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/ReplicaWatchdog.cs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,18 @@
88

99
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
1010

11-
internal class ReplicaWatchdog(ClusterInfo clusterInfo, IReplicaStore replicaStore, TimeSpan checkFrequency, Action<ReplicaId> onStrikeOut) : IDisposable
11+
internal class ReplicaWatchdog(ClusterInfo clusterInfo, IFunctionStore functionStore, TimeSpan checkFrequency) : IDisposable
1212
{
1313
private volatile bool _disposed;
1414
private bool _started;
1515
private bool _initialized;
1616
private readonly Dictionary<StoredReplica, int> _strikes = new();
17+
private IReplicaStore ReplicaStore => functionStore.ReplicaStore;
1718

1819
public async Task Start()
1920
{
2021
var originalValue = Interlocked.CompareExchange(ref _started, value: true, comparand: false);
21-
if (originalValue is true)
22+
if (originalValue)
2223
return;
2324

2425
if (!_initialized)
@@ -29,11 +30,20 @@ public async Task Start()
2930

3031
public async Task Initialize()
3132
{
32-
await replicaStore.Insert(clusterInfo.ReplicaId);
33-
var replicas = await replicaStore.GetAll();
33+
await ReplicaStore.Insert(clusterInfo.ReplicaId);
34+
var replicas = await ReplicaStore.GetAll();
3435
var offset = CalculateOffset(replicas.Select(sr => sr.ReplicaId), clusterInfo.ReplicaId);
3536
if (offset is null)
3637
throw new InvalidOperationException("Replica offset was null after initialization");
38+
39+
var ownerReplicas = await functionStore.GetOwnerReplicas();
40+
41+
//handle crashed orphan functions
42+
var crashedReplicas = ownerReplicas
43+
.Where(ownerReplicaId => replicas.All(storedReplica => storedReplica.ReplicaId != ownerReplicaId))
44+
.ToList();
45+
foreach (var crashedReplicaId in crashedReplicas)
46+
await functionStore.RescheduleCrashedFunctions(crashedReplicaId);
3747

3848
clusterInfo.ReplicaCount = replicas.Count;
3949
clusterInfo.Offset = offset.Value;
@@ -51,9 +61,9 @@ private async Task Run()
5161

5262
public async Task PerformIteration()
5363
{
54-
await replicaStore.UpdateHeartbeat(clusterInfo.ReplicaId);
64+
await ReplicaStore.UpdateHeartbeat(clusterInfo.ReplicaId);
5565

56-
var storedReplicas = await replicaStore.GetAll();
66+
var storedReplicas = await ReplicaStore.GetAll();
5767
var offset = CalculateOffset(storedReplicas.Select(sr => sr.ReplicaId), clusterInfo.ReplicaId);
5868

5969
if (offset is not null)
@@ -63,7 +73,7 @@ public async Task PerformIteration()
6373
}
6474
else
6575
{
66-
await replicaStore.Insert(clusterInfo.ReplicaId);
76+
await ReplicaStore.Insert(clusterInfo.ReplicaId);
6777
_strikes.Clear();
6878
await PerformIteration();
6979
}
@@ -107,9 +117,13 @@ private async Task DeleteStrikedOutReplicas()
107117
foreach (var (storedReplica, _) in _strikes.Where(kv => kv.Value >= 2))
108118
{
109119
var strikedOutId = storedReplica.ReplicaId;
110-
await replicaStore.Delete(strikedOutId);
120+
await ReplicaStore.Delete(strikedOutId);
121+
await functionStore.RescheduleCrashedFunctions(strikedOutId);
122+
_ = Task
123+
.Delay(TimeSpan.FromSeconds(5))
124+
.ContinueWith(_ => functionStore.RescheduleCrashedFunctions(strikedOutId));
125+
111126
_strikes.Remove(storedReplica);
112-
onStrikeOut(strikedOutId);
113127
}
114128
}
115129

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public override Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted()
3737
[TestMethod]
3838
public override Task ActiveReplicasDoNotDeleteEachOther()
3939
=> ActiveReplicasDoNotDeleteEachOther(FunctionStoreFactory.Create());
40+
41+
[TestMethod]
42+
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
43+
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
4044
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public override Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted()
3737
[TestMethod]
3838
public override Task ActiveReplicasDoNotDeleteEachOther()
3939
=> ActiveReplicasDoNotDeleteEachOther(FunctionStoreFactory.Create());
40+
41+
[TestMethod]
42+
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
43+
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
4044
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,8 @@ public override Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted()
3737
[TestMethod]
3838
public override Task ActiveReplicasDoNotDeleteEachOther()
3939
=> ActiveReplicasDoNotDeleteEachOther(FunctionStoreFactory.Create());
40+
41+
[TestMethod]
42+
public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
43+
=> StrikedOutReplicasFunctionIsPostponedAfterCrash(FunctionStoreFactory.Create());
4044
}

0 commit comments

Comments
 (0)