Skip to content

Commit 9e6fc6f

Browse files
StidsborgStidsborg
authored andcommitted
WIP
1 parent 4aa334e commit 9e6fc6f

File tree

13 files changed

+232
-2
lines changed

13 files changed

+232
-2
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore
2525
public Utilities Utilities => _inner.Utilities;
2626
public IMigrator Migrator => _inner.Migrator;
2727
public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore;
28+
public IReplicaStore ReplicaStore => _inner.ReplicaStore;
2829
public Task Initialize() => _inner.Initialize();
2930

3031
public Task<bool> CreateFunction(
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using Cleipnir.ResilientFunctions.Helpers;
5+
using Cleipnir.ResilientFunctions.Storage;
6+
using Cleipnir.ResilientFunctions.Tests.Utils;
7+
using Shouldly;
8+
9+
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates;
10+
11+
public abstract class ReplicaStoreTests
12+
{
13+
public abstract Task SunshineScenarioTest();
14+
protected async Task SunshineScenarioTest(Task<IFunctionStore> storeTask)
15+
{
16+
var store = await storeTask.SelectAsync(s => s.ReplicaStore);
17+
await store.GetAll().ShouldBeEmptyAsync();
18+
var replicaId1 = Guid.NewGuid();
19+
var replicaId2 = Guid.NewGuid();
20+
21+
{
22+
await store.Insert(replicaId1);
23+
var all = await store.GetAll();
24+
all.Count.ShouldBe(1);
25+
var stored = all.Single();
26+
stored.ReplicaId.ShouldBe(replicaId1);
27+
stored.Heartbeat.ShouldBe(0);
28+
}
29+
30+
{
31+
await store.Insert(replicaId2);
32+
var all = await store.GetAll();
33+
all.Count.ShouldBe(2);
34+
var stored = all.Single(id => id.ReplicaId == replicaId2);
35+
stored.ReplicaId.ShouldBe(replicaId2);
36+
stored.Heartbeat.ShouldBe(0);
37+
}
38+
39+
await store.UpdateHeartbeat(replicaId1);
40+
{
41+
var all = await store.GetAll();
42+
all.Count.ShouldBe(2);
43+
var stored1 = all.Single(r => r.ReplicaId == replicaId1);
44+
stored1.Heartbeat.ShouldBe(1);
45+
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
46+
stored2.Heartbeat.ShouldBe(0);
47+
}
48+
49+
await store.UpdateHeartbeat(replicaId2);
50+
{
51+
var all = await store.GetAll();
52+
all.Count.ShouldBe(2);
53+
var stored1 = all.Single(r => r.ReplicaId == replicaId1);
54+
stored1.Heartbeat.ShouldBe(1);
55+
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
56+
stored2.Heartbeat.ShouldBe(1);
57+
}
58+
59+
await store.Delete(replicaId1);
60+
{
61+
var all = await store.GetAll();
62+
all.Count.ShouldBe(1);
63+
var stored2 = all.Single(r => r.ReplicaId == replicaId2);
64+
stored2.Heartbeat.ShouldBe(1);
65+
}
66+
67+
await store.Delete(replicaId2);
68+
{
69+
var all = await store.GetAll();
70+
all.ShouldBeEmpty();
71+
}
72+
}
73+
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class CrashableFunctionStore : IFunctionStore
2525
public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities;
2626
public IMigrator Migrator => _crashed ? throw new TimeoutException() : _inner.Migrator;
2727
public ISemaphoreStore SemaphoreStore => _crashed ? throw new TimeoutException() : _inner.SemaphoreStore;
28+
public IReplicaStore ReplicaStore => _crashed ? throw new TimeoutException() : _inner.ReplicaStore;
2829

2930
public CrashableFunctionStore(IFunctionStore inner)
3031
{

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public interface IFunctionStore
1616
public Utilities Utilities { get; }
1717
public IMigrator Migrator { get; }
1818
public ISemaphoreStore SemaphoreStore { get; }
19+
public IReplicaStore ReplicaStore { get; }
1920
public Task Initialize();
2021

2122
Task<bool> CreateFunction(
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
5+
namespace Cleipnir.ResilientFunctions.Storage;
6+
7+
public interface IReplicaStore
8+
{
9+
public Task Initialize();
10+
public Task Insert(Guid replicaId);
11+
public Task Delete(Guid replicaId);
12+
public Task UpdateHeartbeat(Guid replicaId);
13+
public Task<IReadOnlyList<StoredReplica>> GetAll();
14+
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -30,6 +31,7 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore
3031

3132
public IMigrator Migrator { get; } = new InMemoryMigrator();
3233
public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore();
34+
public IReplicaStore ReplicaStore => throw new NotImplementedException(); //todo
3335

3436
public Task Initialize() => Task.CompletedTask;
3537

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,6 @@ public static class StoredEffectExtensions
137137
{
138138
public static StoredEffectChange ToStoredChange(this StoredEffect effect, StoredId storedId, CrudOperation operation)
139139
=> new(storedId, effect.StoredEffectId, operation, effect);
140-
}
140+
}
141+
142+
public record StoredReplica(Guid ReplicaId, int Heartbeat);

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class CrashableFunctionStore : IFunctionStore
2121
public Utilities Utilities => _inner.Utilities;
2222
public IMigrator Migrator => _inner.Migrator;
2323
public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore;
24+
public IReplicaStore ReplicaStore => _inner.ReplicaStore;
2425

2526
public CrashableFunctionStore(IFunctionStore inner) => _inner = inner;
2627

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using Microsoft.VisualStudio.TestTools.UnitTesting;
2+
3+
namespace Cleipnir.ResilientFunctions.MariaDb.Tests;
4+
5+
[TestClass]
6+
public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests
7+
{
8+
[TestMethod]
9+
public override Task SunshineScenarioTest()
10+
=> SunshineScenarioTest(FunctionStoreFactory.Create());
11+
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ public class MariaDbFunctionStore : IFunctionStore
3636
private readonly MariaDbSemaphoreStore _semaphoreStore;
3737
public ISemaphoreStore SemaphoreStore => _semaphoreStore;
3838

39+
private readonly MariaDbReplicaStore _replicaStore;
40+
public IReplicaStore ReplicaStore => _replicaStore;
41+
3942
public Utilities Utilities { get; }
4043
private readonly MariaDbUnderlyingRegister _mariaDbUnderlyingRegister;
4144

@@ -57,6 +60,7 @@ public MariaDbFunctionStore(string connectionString, string tablePrefix = "")
5760
_mariaDbUnderlyingRegister = new MariaDbUnderlyingRegister(connectionString, tablePrefix);
5861
_typeStore = new MariaDbTypeStore(connectionString, tablePrefix);
5962
_migrator = new MariaDbMigrator(connectionString, tablePrefix);
63+
_replicaStore = new MariaDbReplicaStore(connectionString, tablePrefix);
6064

6165
Utilities = new Utilities(_mariaDbUnderlyingRegister);
6266
}
@@ -75,6 +79,7 @@ public async Task Initialize()
7579
await _semaphoreStore.Initialize();
7680
await TimeoutStore.Initialize();
7781
await _typeStore.Initialize();
82+
await _replicaStore.Initialize();
7883
await using var conn = await CreateOpenConnection(_connectionString);
7984
_initializeSql ??= $@"
8085
CREATE TABLE IF NOT EXISTS {_tablePrefix} (
@@ -108,6 +113,7 @@ public async Task TruncateTables()
108113
await _correlationStore.Truncate();
109114
await _semaphoreStore.Truncate();
110115
await _typeStore.Truncate();
116+
await _replicaStore.Truncate();
111117

112118
await using var conn = await CreateOpenConnection(_connectionString);
113119
_truncateTablesSql ??= $"TRUNCATE TABLE {_tablePrefix}";

0 commit comments

Comments
 (0)