Skip to content

Commit bcd1006

Browse files
committed
Added ReplicaId value-type
1 parent 08c5c81 commit bcd1006

File tree

11 files changed

+82
-60
lines changed

11 files changed

+82
-60
lines changed

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/ReplicaStoreTests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Linq;
33
using System.Threading.Tasks;
4+
using Cleipnir.ResilientFunctions.Domain;
45
using Cleipnir.ResilientFunctions.Helpers;
56
using Cleipnir.ResilientFunctions.Storage;
67
using Cleipnir.ResilientFunctions.Tests.Utils;
@@ -15,8 +16,8 @@ protected async Task SunshineScenarioTest(Task<IFunctionStore> storeTask)
1516
{
1617
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
1718
await store.GetAll().ShouldBeEmptyAsync();
18-
var replicaId1 = Guid.NewGuid();
19-
var replicaId2 = Guid.NewGuid();
19+
var replicaId1 = Guid.NewGuid().ToReplicaId();
20+
var replicaId2 = Guid.NewGuid().ToReplicaId();
2021

2122
{
2223
await store.Insert(replicaId1);

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/ReplicaWatchdogTests.cs

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using Cleipnir.ResilientFunctions.Domain;
66
using Cleipnir.ResilientFunctions.Helpers;
77
using Cleipnir.ResilientFunctions.Storage;
8-
using Cleipnir.ResilientFunctions.Tests.Utils;
98
using Shouldly;
109

1110
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
@@ -16,7 +15,7 @@ public abstract class ReplicaWatchdogTests
1615
public async Task SunshineScenario(Task<IFunctionStore> storeTask)
1716
{
1817
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
19-
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
18+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
2019
using var watchdog1 = new ReplicaWatchdog(
2120
replicaId1,
2221
store,
@@ -29,7 +28,7 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
2928
var storedReplica1 = allReplicas.Single(sr => sr.ReplicaId == replicaId1.ReplicaId);
3029
storedReplica1.Heartbeat.ShouldBe(0);
3130

32-
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
31+
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
3332
using var watchdog2 = new ReplicaWatchdog(
3433
replicaId2,
3534
store,
@@ -70,7 +69,7 @@ public async Task SunshineScenario(Task<IFunctionStore> storeTask)
7069
public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctionStore> storeTask)
7170
{
7271
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
73-
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
72+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
7473
using var watchdog1 = new ReplicaWatchdog(
7574
replicaId1,
7675
store,
@@ -81,7 +80,7 @@ public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctio
8180
var allReplicas = await store.GetAll();
8281
allReplicas.Count.ShouldBe(1);
8382

84-
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
83+
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
8584
using var watchdog2 = new ReplicaWatchdog(
8685
replicaId2,
8786
store,
@@ -97,10 +96,10 @@ public async Task ReplicaWatchdogStartResultsInAddedReplicaInStore(Task<IFunctio
9796
public async Task StrikedOutReplicaIsRemovedFromStore(Task<IFunctionStore> storeTask)
9897
{
9998
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
100-
var toBeStrikedOut = Guid.NewGuid();
101-
Guid? strikedOut = null;
99+
var toBeStrikedOut = ReplicaId.NewId();
100+
ReplicaId? strikedOut = null;
102101
await store.Insert(toBeStrikedOut);
103-
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
102+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
104103
using var watchdog1 = new ReplicaWatchdog(
105104
replicaId1,
106105
store,
@@ -125,7 +124,7 @@ public async Task RunningWatchdogUpdatesItsOwnHeartbeat(Task<IFunctionStore> sto
125124
{
126125
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
127126
var anyStrikesOut = false;
128-
var replicaId1 = new ClusterInfo(Guid.NewGuid());
127+
var replicaId1 = new ClusterInfo(ReplicaId.NewId());
129128
using var watchdog1 = new ReplicaWatchdog(
130129
replicaId1,
131130
store,
@@ -152,9 +151,9 @@ public async Task ReplicaIdOffsetIfCalculatedCorrectly(Task<IFunctionStore> stor
152151
{
153152
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
154153

155-
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
156-
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
157-
var replicaId3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000"));
154+
var replicaId1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
155+
var replicaId2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
156+
var replicaId3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
158157

159158
var watchdog1 = new ReplicaWatchdog(replicaId1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
160159
var watchdog2 = new ReplicaWatchdog(replicaId2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
@@ -177,9 +176,9 @@ public async Task ReplicaIdOffsetIsUpdatedWhenNodeIsAddedAndDeleted(Task<IFuncti
177176
{
178177
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
179178

180-
var cluster1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
181-
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
182-
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000"));
179+
var cluster1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
180+
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
181+
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
183182

184183
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
185184
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
@@ -225,9 +224,9 @@ public async Task ActiveReplicasDoNotDeleteEachOther(Task<IFunctionStore> storeT
225224
{
226225
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
227226

228-
var cluster1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000"));
229-
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000"));
230-
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000"));
227+
var cluster1 = new ClusterInfo(Guid.Parse("10000000-0000-0000-0000-000000000000").ToReplicaId());
228+
var cluster2 = new ClusterInfo(Guid.Parse("20000000-0000-0000-0000-000000000000").ToReplicaId());
229+
var cluster3 = new ClusterInfo(Guid.Parse("30000000-0000-0000-0000-000000000000").ToReplicaId());
231230

232231
var watchdog1 = new ReplicaWatchdog(cluster1, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
233232
var watchdog2 = new ReplicaWatchdog(cluster2, store, checkFrequency: TimeSpan.FromHours(1), onStrikeOut: _ => { });
@@ -258,7 +257,7 @@ public async Task ActiveReplicasDoNotDeleteEachOther(Task<IFunctionStore> storeT
258257
public abstract Task NonExistingReplicaIdOffsetIsNull();
259258
public Task NonExistingReplicaIdOffsetIsNull(Task<IFunctionStore> storeTask)
260259
{
261-
var offset = ReplicaWatchdog.CalculateOffset(allReplicaIds: [], ownReplicaId: Guid.NewGuid());
260+
var offset = ReplicaWatchdog.CalculateOffset(allReplicaIds: [], ownReplicaId: ReplicaId.NewId());
262261
offset.ShouldBeNull();
263262

264263
return Task.CompletedTask;

Core/Cleipnir.ResilientFunctions/CoreRuntime/Watchdogs/ReplicaWatchdog.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
namespace Cleipnir.ResilientFunctions.CoreRuntime.Watchdogs;
1010

11-
internal class ReplicaWatchdog(ClusterInfo clusterInfo, IReplicaStore replicaStore, TimeSpan checkFrequency, Action<Guid> onStrikeOut) : IDisposable
11+
internal class ReplicaWatchdog(ClusterInfo clusterInfo, IReplicaStore replicaStore, TimeSpan checkFrequency, Action<ReplicaId> onStrikeOut) : IDisposable
1212
{
1313
private volatile bool _disposed;
1414
private bool _started;
@@ -75,7 +75,7 @@ public async Task PerformIteration()
7575
await DeleteStrikedOutReplicas();
7676
}
7777

78-
public static int? CalculateOffset(IEnumerable<Guid> allReplicaIds, Guid ownReplicaId)
78+
public static int? CalculateOffset(IEnumerable<ReplicaId> allReplicaIds, ReplicaId ownReplicaId)
7979
=> allReplicaIds
8080
.Select(s => s)
8181
.Order()
@@ -104,7 +104,7 @@ private void IncrementStrikesCount()
104104

105105
private async Task DeleteStrikedOutReplicas()
106106
{
107-
foreach (var (storedReplica, strikes) in _strikes.Where(kv => kv.Value >= 2))
107+
foreach (var (storedReplica, _) in _strikes.Where(kv => kv.Value >= 2))
108108
{
109109
var strikedOutId = storedReplica.ReplicaId;
110110
await replicaStore.Delete(strikedOutId);

Core/Cleipnir.ResilientFunctions/Domain/ClusterInfo.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
using System;
2-
using System.Threading;
1+
using System.Threading;
32

43
namespace Cleipnir.ResilientFunctions.Domain;
54

65
public class ClusterInfo
76
{
8-
public Guid ReplicaId { get; }
7+
public ReplicaId ReplicaId { get; }
98

109
private int _offset;
1110
public int Offset
@@ -39,5 +38,5 @@ public int ReplicaCount
3938

4039
private readonly Lock _sync = new();
4140

42-
public ClusterInfo(Guid replicaId) => ReplicaId = replicaId;
41+
public ClusterInfo(ReplicaId replicaId) => ReplicaId = replicaId;
4342
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
3+
namespace Cleipnir.ResilientFunctions.Domain;
4+
5+
public record ReplicaId(Guid AsGuid) : IComparable<ReplicaId>
6+
{
7+
public static ReplicaId NewId() => new ReplicaId(Guid.NewGuid());
8+
9+
public int CompareTo(ReplicaId? other)
10+
{
11+
if (ReferenceEquals(this, other)) return 0;
12+
if (other is null) return 1;
13+
return AsGuid.CompareTo(other.AsGuid);
14+
}
15+
}
16+
17+
public static class ReplicaIdExtensions
18+
{
19+
public static ReplicaId ToReplicaId(this Guid replicaId) => new(replicaId);
20+
}
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
using System;
21
using System.Collections.Generic;
32
using System.Threading.Tasks;
3+
using Cleipnir.ResilientFunctions.Domain;
44

55
namespace Cleipnir.ResilientFunctions.Storage;
66

77
public interface IReplicaStore
88
{
99
public Task Initialize();
10-
public Task Insert(Guid replicaId);
11-
public Task Delete(Guid replicaId);
12-
public Task UpdateHeartbeat(Guid replicaId);
10+
public Task Insert(ReplicaId replicaId);
11+
public Task Delete(ReplicaId replicaId);
12+
public Task UpdateHeartbeat(ReplicaId replicaId);
1313
public Task<IReadOnlyList<StoredReplica>> GetAll();
1414
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryReplicaStore.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,36 @@
1-
using System;
2-
using System.Collections.Generic;
1+
using System.Collections.Generic;
32
using System.Linq;
43
using System.Threading;
54
using System.Threading.Tasks;
5+
using Cleipnir.ResilientFunctions.Domain;
66
using Cleipnir.ResilientFunctions.Helpers;
77

88
namespace Cleipnir.ResilientFunctions.Storage;
99

1010
public class InMemoryReplicaStore : IReplicaStore
1111
{
12-
private readonly Dictionary<Guid, int> _replicas = new();
12+
private readonly Dictionary<ReplicaId, int> _replicas = new();
1313
private readonly Lock _sync = new();
1414

1515
public Task Initialize() => Task.CompletedTask;
1616

17-
public Task Insert(Guid replicaId)
17+
public Task Insert(ReplicaId replicaId)
1818
{
1919
lock (_sync)
2020
_replicas.TryAdd(replicaId, 0);
2121

2222
return Task.CompletedTask;
2323
}
2424

25-
public Task Delete(Guid replicaId)
25+
public Task Delete(ReplicaId replicaId)
2626
{
2727
lock (_sync)
2828
_replicas.Remove(replicaId);
2929

3030
return Task.CompletedTask;
3131
}
3232

33-
public Task UpdateHeartbeat(Guid replicaId)
33+
public Task UpdateHeartbeat(ReplicaId replicaId)
3434
{
3535
lock (_sync)
3636
if (_replicas.ContainsKey(replicaId))

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,4 @@ public static StoredEffectChange ToStoredChange(this StoredEffect effect, Stored
139139
=> new(storedId, effect.StoredEffectId, operation, effect);
140140
}
141141

142-
public record StoredReplica(Guid ReplicaId, int Heartbeat);
142+
public record StoredReplica(ReplicaId ReplicaId, int Heartbeat);

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbReplicaStore.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using Cleipnir.ResilientFunctions.Domain;
12
using Cleipnir.ResilientFunctions.Storage;
23
using MySqlConnector;
34

@@ -19,7 +20,7 @@ heartbeat INT
1920
}
2021

2122
private string? _insertSql;
22-
public async Task Insert(Guid replicaId)
23+
public async Task Insert(ReplicaId replicaId)
2324
{
2425
_insertSql ??= $@"
2526
INSERT INTO {tablePrefix}_replicas
@@ -32,15 +33,15 @@ INSERT INTO {tablePrefix}_replicas
3233
{
3334
Parameters =
3435
{
35-
new() {Value = replicaId.ToString("N")}
36+
new() {Value = replicaId.AsGuid.ToString("N")}
3637
}
3738
};
3839

3940
await command.ExecuteNonQueryAsync();
4041
}
4142

4243
private string? _deleteSql;
43-
public async Task Delete(Guid replicaId)
44+
public async Task Delete(ReplicaId replicaId)
4445
{
4546
_deleteSql ??= $"DELETE FROM {tablePrefix}_replicas WHERE id = ?";
4647

@@ -49,15 +50,15 @@ public async Task Delete(Guid replicaId)
4950
{
5051
Parameters =
5152
{
52-
new() {Value = replicaId.ToString("N")}
53+
new() {Value = replicaId.AsGuid.ToString("N")}
5354
}
5455
};
5556

5657
await command.ExecuteNonQueryAsync();
5758
}
5859

5960
private string? _updateHeartbeatSql;
60-
public async Task UpdateHeartbeat(Guid replicaId)
61+
public async Task UpdateHeartbeat(ReplicaId replicaId)
6162
{
6263
_updateHeartbeatSql ??= $@"
6364
UPDATE {tablePrefix}_replicas
@@ -69,7 +70,7 @@ public async Task UpdateHeartbeat(Guid replicaId)
6970
{
7071
Parameters =
7172
{
72-
new() {Value = replicaId.ToString("N")}
73+
new() {Value = replicaId.AsGuid.ToString("N")}
7374
}
7475
};
7576

@@ -90,7 +91,7 @@ public async Task<IReadOnlyList<StoredReplica>> GetAll()
9091
{
9192
var id = Guid.Parse(reader.GetString(0));
9293
var heartbeat = reader.GetInt32(1);
93-
storedReplicas.Add(new StoredReplica(id, heartbeat));
94+
storedReplicas.Add(new StoredReplica(id.ToReplicaId(), heartbeat));
9495
}
9596

9697
return storedReplicas;

0 commit comments

Comments
 (0)