Skip to content

Commit a2fab73

Browse files
committed
Implemented replica's function ownership method
1 parent 8767ef6 commit a2fab73

File tree

7 files changed

+91
-30
lines changed

7 files changed

+91
-30
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,8 @@ public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4545
[TestMethod]
4646
public override Task ReplicaWatchdogUpdatesHeartbeat()
4747
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
48+
49+
[TestMethod]
50+
public override Task WorkIsDividedBetweenReplicas()
51+
=> WorkIsDividedBetweenReplicas(FunctionStoreFactory.Create());
4852
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -168,11 +168,11 @@ public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> stor
168168
await watchdog3.Initialize();
169169

170170
await watchdog3.PerformIteration();
171-
replicaId3.Offset.ShouldBe(2);
171+
replicaId3.Offset.ShouldBe((ulong) 2);
172172
await watchdog2.PerformIteration();
173-
replicaId2.Offset.ShouldBe(1);
173+
replicaId2.Offset.ShouldBe((ulong) 1);
174174
await watchdog1.PerformIteration();
175-
replicaId1.Offset.ShouldBe(0);
175+
replicaId1.Offset.ShouldBe((ulong) 0);
176176
}
177177

178178
public abstract Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted();
@@ -190,38 +190,38 @@ public async Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(Task<IFuncti
190190
var watchdog3 = new ReplicaWatchdog(cluster3, functionStore, checkFrequency: TimeSpan.FromHours(1), default(UnhandledExceptionHandler)!);
191191

192192
await watchdog3.Initialize();
193-
cluster3.Offset.ShouldBe(0);
194-
cluster3.ReplicaCount.ShouldBe(1);
193+
cluster3.Offset.ShouldBe((ulong) 0);
194+
cluster3.ReplicaCount.ShouldBe((ulong) 1);
195195

196196
await watchdog2.Initialize();
197197
await watchdog3.PerformIteration();
198-
cluster3.Offset.ShouldBe(1);
199-
cluster3.ReplicaCount.ShouldBe(2);
200-
cluster2.Offset.ShouldBe(0);
201-
cluster2.ReplicaCount.ShouldBe(2);
198+
cluster3.Offset.ShouldBe((ulong) 1);
199+
cluster3.ReplicaCount.ShouldBe((ulong) 2);
200+
cluster2.Offset.ShouldBe((ulong) 0);
201+
cluster2.ReplicaCount.ShouldBe((ulong) 2);
202202

203203
await watchdog1.Initialize();
204204
await watchdog2.PerformIteration();
205205
await watchdog3.PerformIteration();
206-
cluster3.Offset.ShouldBe(2);
207-
cluster3.ReplicaCount.ShouldBe(3);
208-
cluster2.Offset.ShouldBe(1);
209-
cluster2.ReplicaCount.ShouldBe(3);
210-
cluster1.Offset.ShouldBe(0);
211-
cluster1.ReplicaCount.ShouldBe(3);
206+
cluster3.Offset.ShouldBe((ulong) 2);
207+
cluster3.ReplicaCount.ShouldBe((ulong) 3);
208+
cluster2.Offset.ShouldBe((ulong) 1);
209+
cluster2.ReplicaCount.ShouldBe((ulong) 3);
210+
cluster1.Offset.ShouldBe((ulong) 0);
211+
cluster1.ReplicaCount.ShouldBe((ulong) 3);
212212

213213
await store.Delete(cluster1.ReplicaId);
214214
await watchdog3.PerformIteration();
215215
await watchdog2.PerformIteration();
216-
cluster3.Offset.ShouldBe(1);
217-
cluster3.ReplicaCount.ShouldBe(2);
218-
cluster2.Offset.ShouldBe(0);
219-
cluster2.ReplicaCount.ShouldBe(2);
216+
cluster3.Offset.ShouldBe((ulong) 1);
217+
cluster3.ReplicaCount.ShouldBe((ulong) 2);
218+
cluster2.Offset.ShouldBe((ulong) 0);
219+
cluster2.ReplicaCount.ShouldBe((ulong) 2);
220220

221221
await store.Delete(cluster2.ReplicaId);
222222
await watchdog3.PerformIteration();
223-
cluster3.Offset.ShouldBe(0);
224-
cluster3.ReplicaCount.ShouldBe(1);
223+
cluster3.Offset.ShouldBe((ulong) 0);
224+
cluster3.ReplicaCount.ShouldBe((ulong) 1);
225225
}
226226

227227
public abstract Task ActiveReplicasDoNotDeleteEachOther();
@@ -328,4 +328,35 @@ public async Task ReplicaWatchdogUpdatesHeartbeat(Task<IFunctionStore> storeTask
328328
storedReplicas.Single().Heartbeat.ShouldBeGreaterThan(1);
329329

330330
}
331+
332+
public abstract Task WorkIsDividedBetweenReplicas();
333+
public Task WorkIsDividedBetweenReplicas(Task<IFunctionStore> storeTask)
334+
{
335+
var replicaId1 = ReplicaId.NewId();
336+
var clusterInfo = new ClusterInfo(replicaId1)
337+
{
338+
ReplicaCount = 3
339+
};
340+
341+
var storedIds = Enumerable
342+
.Range(0, 10)
343+
.Select(i => new StoredId(new StoredType(0), i.ToString().ToStoredInstance()))
344+
.ToList();
345+
346+
//offset 0
347+
var owned = storedIds.Select(clusterInfo.OwnedByThisReplica).ToList();
348+
owned.Any().ShouldBeTrue();
349+
350+
//offset 1
351+
clusterInfo.Offset = 1;
352+
owned = storedIds.Select(clusterInfo.OwnedByThisReplica).ToList();
353+
owned.Any().ShouldBeTrue();
354+
355+
//offset 2
356+
clusterInfo.Offset = 2;
357+
owned = storedIds.Select(clusterInfo.OwnedByThisReplica).ToList();
358+
owned.Any().ShouldBeTrue();
359+
360+
return Task.CompletedTask;
361+
}
331362
}

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/ReplicaWatchdog.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public async Task Initialize()
4646
foreach (var crashedReplicaId in crashedReplicas)
4747
await functionStore.RescheduleCrashedFunctions(crashedReplicaId);
4848

49-
clusterInfo.ReplicaCount = replicas.Count;
50-
clusterInfo.Offset = offset.Value;
49+
clusterInfo.ReplicaCount = (ulong) replicas.Count;
50+
clusterInfo.Offset = (ulong) offset.Value;
5151
_initialized = true;
5252
}
5353

@@ -77,8 +77,8 @@ public async Task PerformIteration()
7777

7878
if (offset is not null)
7979
{
80-
clusterInfo.Offset = offset.Value;
81-
clusterInfo.ReplicaCount = storedReplicas.Count;
80+
clusterInfo.Offset = (ulong) offset.Value;
81+
clusterInfo.ReplicaCount = (ulong) storedReplicas.Count;
8282
}
8383
else
8484
{

Core/Cleipnir.ResilientFunctions/Domain/ClusterInfo.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
using System.Threading;
1+
using System;
2+
using System.Security.Cryptography;
3+
using System.Text;
4+
using System.Threading;
5+
using Cleipnir.ResilientFunctions.Storage;
26

37
namespace Cleipnir.ResilientFunctions.Domain;
48

59
public class ClusterInfo
610
{
711
public ReplicaId ReplicaId { get; }
812

9-
private int _offset;
10-
public int Offset
13+
private ulong _offset;
14+
public ulong Offset
1115
{
1216
get
1317
{
@@ -21,8 +25,8 @@ internal set
2125
}
2226
}
2327

24-
private int _replicaCount;
25-
public int ReplicaCount
28+
private ulong _replicaCount;
29+
public ulong ReplicaCount
2630
{
2731
get
2832
{
@@ -39,4 +43,14 @@ internal set
3943
private readonly Lock _sync = new();
4044

4145
public ClusterInfo(ReplicaId replicaId) => ReplicaId = replicaId;
46+
47+
public bool OwnedByThisReplica(StoredId storedId)
48+
{
49+
var serializedStoredId = storedId.Serialize();
50+
using SHA256 sha256 = SHA256.Create();
51+
var hashBytes = sha256.ComputeHash(Encoding.UTF8.GetBytes(serializedStoredId));
52+
var number = BitConverter.ToUInt64(hashBytes);
53+
var owner = number % ReplicaCount;
54+
return Offset == owner;
55+
}
4256
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,8 @@ public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4545
[TestMethod]
4646
public override Task ReplicaWatchdogUpdatesHeartbeat()
4747
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
48+
49+
[TestMethod]
50+
public override Task WorkIsDividedBetweenReplicas()
51+
=> WorkIsDividedBetweenReplicas(FunctionStoreFactory.Create());
4852
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,8 @@ public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4545
[TestMethod]
4646
public override Task ReplicaWatchdogUpdatesHeartbeat()
4747
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
48+
49+
[TestMethod]
50+
public override Task WorkIsDividedBetweenReplicas()
51+
=> WorkIsDividedBetweenReplicas(FunctionStoreFactory.Create());
4852
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,8 @@ public override Task StrikedOutReplicasFunctionIsPostponedAfterCrash()
4545
[TestMethod]
4646
public override Task ReplicaWatchdogUpdatesHeartbeat()
4747
=> ReplicaWatchdogUpdatesHeartbeat(FunctionStoreFactory.Create());
48+
49+
[TestMethod]
50+
public override Task WorkIsDividedBetweenReplicas()
51+
=> WorkIsDividedBetweenReplicas(FunctionStoreFactory.Create());
4852
}

0 commit comments

Comments
 (0)