Skip to content

Commit 3d25d4d

Browse files
committed
completed rest of stores
1 parent 9e6fc6f commit 3d25d4d

File tree

9 files changed

+336
-3
lines changed

9 files changed

+336
-3
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
4+
namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests;
5+
6+
[TestClass]
7+
public class ReplicaStoreTests : TestTemplates.ReplicaStoreTests
8+
{
9+
[TestMethod]
10+
public override Task SunshineScenarioTest()
11+
=> SunshineScenarioTest(FunctionStoreFactory.Create());
12+
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore
3131

3232
public IMigrator Migrator { get; } = new InMemoryMigrator();
3333
public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore();
34-
public IReplicaStore ReplicaStore => throw new NotImplementedException(); //todo
34+
public IReplicaStore ReplicaStore { get; } = new InMemoryReplicaStore();
3535

3636
public Task Initialize() => Task.CompletedTask;
3737

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Cleipnir.ResilientFunctions.Helpers;
7+
8+
namespace Cleipnir.ResilientFunctions.Storage;
9+
10+
public class InMemoryReplicaStore : IReplicaStore
11+
{
12+
private readonly Dictionary<Guid, int> _replicas = new();
13+
private readonly Lock _sync = new();
14+
15+
public Task Initialize() => Task.CompletedTask;
16+
17+
public Task Insert(Guid replicaId)
18+
{
19+
lock (_sync)
20+
_replicas.TryAdd(replicaId, 0);
21+
22+
return Task.CompletedTask;
23+
}
24+
25+
public Task Delete(Guid replicaId)
26+
{
27+
lock (_sync)
28+
_replicas.Remove(replicaId);
29+
30+
return Task.CompletedTask;
31+
}
32+
33+
public Task UpdateHeartbeat(Guid replicaId)
34+
{
35+
lock (_sync)
36+
if (_replicas.ContainsKey(replicaId))
37+
_replicas[replicaId]++;
38+
39+
return Task.CompletedTask;
40+
}
41+
42+
public Task<IReadOnlyList<StoredReplica>> GetAll()
43+
{
44+
lock (_sync)
45+
return _replicas
46+
.Select(kv => new StoredReplica(kv.Key, kv.Value))
47+
.ToList()
48+
.CastTo<IReadOnlyList<StoredReplica>>()
49+
.ToTask();
50+
}
51+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
4+
namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests;
5+
6+
[TestClass]
7+
public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests
8+
{
9+
[TestMethod]
10+
public override Task SunshineScenarioTest()
11+
=> SunshineScenarioTest(FunctionStoreFactory.Create());
12+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using Cleipnir.ResilientFunctions.Storage;
5+
using Npgsql;
6+
7+
namespace Cleipnir.ResilientFunctions.PostgreSQL;
8+
9+
public class PostgreSqlDbReplicaStore(string connectionString, string tablePrefix) : IReplicaStore
10+
{
11+
private string? _initializeSql;
12+
public async Task Initialize()
13+
{
14+
_initializeSql ??= $@"
15+
CREATE TABLE IF NOT EXISTS {tablePrefix}_replicas (
16+
id CHAR(32) PRIMARY KEY,
17+
heartbeat INT
18+
);";
19+
await using var conn = await CreateConnection();
20+
var command = new NpgsqlCommand(_initializeSql, conn);
21+
await command.ExecuteNonQueryAsync();
22+
}
23+
24+
private string? _insertSql;
25+
public async Task Insert(Guid replicaId)
26+
{
27+
_insertSql ??= $@"
28+
INSERT INTO {tablePrefix}_replicas
29+
(id, heartbeat)
30+
VALUES
31+
($1, 0)";
32+
33+
await using var conn = await CreateConnection();
34+
await using var command = new NpgsqlCommand(_insertSql, conn)
35+
{
36+
Parameters =
37+
{
38+
new() {Value = replicaId.ToString("N")}
39+
}
40+
};
41+
42+
await command.ExecuteNonQueryAsync();
43+
}
44+
45+
private string? _deleteSql;
46+
public async Task Delete(Guid replicaId)
47+
{
48+
_deleteSql ??= $"DELETE FROM {tablePrefix}_replicas WHERE id = $1";
49+
50+
await using var conn = await CreateConnection();
51+
await using var command = new NpgsqlCommand(_deleteSql, conn)
52+
{
53+
Parameters =
54+
{
55+
new() {Value = replicaId.ToString("N")}
56+
}
57+
};
58+
59+
await command.ExecuteNonQueryAsync();
60+
}
61+
62+
private string? _updateHeartbeatSql;
63+
public async Task UpdateHeartbeat(Guid replicaId)
64+
{
65+
_updateHeartbeatSql ??= $@"
66+
UPDATE {tablePrefix}_replicas
67+
SET heartbeat = heartbeat + 1
68+
WHERE id = $1";
69+
70+
await using var conn = await CreateConnection();
71+
await using var command = new NpgsqlCommand(_updateHeartbeatSql, conn)
72+
{
73+
Parameters =
74+
{
75+
new() {Value = replicaId.ToString("N")}
76+
}
77+
};
78+
79+
await command.ExecuteNonQueryAsync();
80+
}
81+
82+
private string? _getAllSql;
83+
public async Task<IReadOnlyList<StoredReplica>> GetAll()
84+
{
85+
_getAllSql ??= $"SELECT id, heartbeat FROM {tablePrefix}_replicas";
86+
87+
await using var conn = await CreateConnection();
88+
await using var command = new NpgsqlCommand(_getAllSql, conn);
89+
var storedReplicas = new List<StoredReplica>();
90+
91+
await using var reader = await command.ExecuteReaderAsync();
92+
while (await reader.ReadAsync())
93+
{
94+
var id = Guid.Parse(reader.GetString(0));
95+
var heartbeat = reader.GetInt32(1);
96+
storedReplicas.Add(new StoredReplica(id, heartbeat));
97+
}
98+
99+
return storedReplicas;
100+
}
101+
102+
private string? _truncateSql;
103+
public async Task Truncate()
104+
{
105+
_truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_replicas";
106+
107+
await using var conn = await CreateConnection();
108+
await using var command = new NpgsqlCommand(_truncateSql, conn);
109+
110+
await command.ExecuteNonQueryAsync();
111+
}
112+
113+
private async Task<NpgsqlConnection> CreateConnection()
114+
{
115+
var conn = new NpgsqlConnection(connectionString);
116+
await conn.OpenAsync();
117+
return conn;
118+
}
119+
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public class PostgreSqlFunctionStore : IFunctionStore
3434

3535
private readonly PostgreSqlSemaphoreStore _semaphoreStore;
3636
public ISemaphoreStore SemaphoreStore => _semaphoreStore;
37-
public IReplicaStore ReplicaStore => throw new NotImplementedException();
37+
private readonly PostgreSqlDbReplicaStore _replicaStore;
38+
public IReplicaStore ReplicaStore => _replicaStore;
3839

3940
public Utilities Utilities { get; }
4041
public IMigrator Migrator => _migrator;
@@ -57,6 +58,7 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "")
5758
_typeStore = new PostgreSqlTypeStore(connectionString, _tableName);
5859
_postgresSqlUnderlyingRegister = new PostgresSqlUnderlyingRegister(connectionString, _tableName);
5960
_migrator = new PostgreSqlMigrator(connectionString, _tableName);
61+
_replicaStore = new PostgreSqlDbReplicaStore(connectionString, _tableName);
6062
Utilities = new Utilities(_postgresSqlUnderlyingRegister);
6163
}
6264

@@ -81,6 +83,7 @@ public async Task Initialize()
8183
await _correlationStore.Initialize();
8284
await _semaphoreStore.Initialize();
8385
await _typeStore.Initialize();
86+
await _replicaStore.Initialize();
8487
await using var conn = await CreateConnection();
8588
_initializeSql ??= $@"
8689
CREATE TABLE IF NOT EXISTS {_tableName} (
@@ -122,6 +125,7 @@ public async Task TruncateTables()
122125
await _correlationStore.Truncate();
123126
await _typeStore.Truncate();
124127
await _semaphoreStore.Truncate();
128+
await _replicaStore.Truncate();
125129

126130
await using var conn = await CreateConnection();
127131
_truncateTableSql ??= $"TRUNCATE TABLE {_tableName}";
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System.Threading.Tasks;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
4+
namespace Cleipnir.ResilientFunctions.SqlServer.Tests;
5+
6+
[TestClass]
7+
public class ReplicaStoreTests : ResilientFunctions.Tests.TestTemplates.ReplicaStoreTests
8+
{
9+
[TestMethod]
10+
public override Task SunshineScenarioTest()
11+
=> SunshineScenarioTest(FunctionStoreFactory.Create());
12+
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public class SqlServerFunctionStore : IFunctionStore
3434
public IMigrator Migrator => _migrator;
3535
private readonly SqlServerSemaphoreStore _semaphoreStore;
3636
public ISemaphoreStore SemaphoreStore => _semaphoreStore;
37-
public IReplicaStore ReplicaStore => throw new NotImplementedException(); //todo
37+
private readonly SqlServerReplicaStore _replicaStore;
38+
public IReplicaStore ReplicaStore => _replicaStore;
3839

3940
private readonly SqlServerUnderlyingRegister _underlyingRegister;
4041

@@ -54,6 +55,7 @@ public SqlServerFunctionStore(string connectionString, string tablePrefix = "")
5455
_semaphoreStore = new SqlServerSemaphoreStore(connectionString, _tableName);
5556
_typeStore = new SqlServerTypeStore(connectionString, _tableName);
5657
_migrator = new SqlServerMigrator(connectionString, _tableName);
58+
_replicaStore = new SqlServerReplicaStore(connectionString, _tableName);
5759
Utilities = new Utilities(_underlyingRegister);
5860
}
5961

@@ -81,6 +83,7 @@ public async Task Initialize()
8183
await _correlationStore.Initialize();
8284
await _typeStore.Initialize();
8385
await _semaphoreStore.Initialize();
86+
await _replicaStore.Initialize();
8487
await using var conn = await _connFunc();
8588
_initializeSql ??= @$"
8689
CREATE TABLE {_tableName} (
@@ -127,6 +130,7 @@ public async Task TruncateTables()
127130
await _correlationStore.Truncate();
128131
await _typeStore.Truncate();
129132
await _semaphoreStore.Truncate();
133+
await _replicaStore.Truncate();
130134

131135
await using var conn = await _connFunc();
132136
_truncateSql ??= $"TRUNCATE TABLE {_tableName}";

0 commit comments

Comments
 (0)