Skip to content

Commit a2b25e1

Browse files
committed
Implemented SetEffectResults with remove effects overload
1 parent 276af8f commit a2b25e1

File tree

11 files changed

+100
-12
lines changed

11 files changed

+100
-12
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,8 @@ public override Task TruncateDeletesAllEffects()
3434
[TestMethod]
3535
public override Task BulkInsertTest()
3636
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
37+
38+
[TestMethod]
39+
public override Task BulkInsertAndDeleteTest()
40+
=> BulkInsertAndDeleteTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
3741
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,4 +278,44 @@ protected async Task BulkInsertTest(Task<IEffectsStore> storeTask)
278278
var effect2 = effects.Single(e => e.EffectId == storedEffect2.EffectId);
279279
effect2.Result.ShouldBe("some result 2".ToUtf8Bytes());
280280
}
281+
282+
public abstract Task BulkInsertAndDeleteTest();
283+
protected async Task BulkInsertAndDeleteTest(Task<IEffectsStore> storeTask)
284+
{
285+
var store = await storeTask;
286+
var storedId = TestStoredId.Create();
287+
var storedEffect1 = new StoredEffect(
288+
"EffectId1".ToEffectId(),
289+
"EffectId1".ToStoredEffectId(EffectType.Effect),
290+
WorkStatus.Started,
291+
Result: "some result 1".ToUtf8Bytes(),
292+
StoredException: null
293+
);
294+
var storedEffect2 = new StoredEffect(
295+
"EffectId2".ToEffectId(),
296+
"EffectId2".ToStoredEffectId(EffectType.Effect),
297+
WorkStatus.Completed,
298+
Result: "some result 2".ToUtf8Bytes(),
299+
StoredException: null
300+
);
301+
var storedEffect3 = new StoredEffect(
302+
"EffectId3".ToEffectId(),
303+
"EffectId3".ToStoredEffectId(EffectType.Effect),
304+
WorkStatus.Completed,
305+
Result: "some result 3".ToUtf8Bytes(),
306+
StoredException: null
307+
);
308+
309+
await store.SetEffectResults(storedId, [storedEffect1, storedEffect2]);
310+
await store.SetEffectResults(
311+
storedId,
312+
upsertEffects: [storedEffect3],
313+
removeEffects: [storedEffect1.StoredEffectId, storedEffect2.StoredEffectId]
314+
);
315+
316+
var effects = await store.GetEffectResults(storedId);
317+
effects.Count.ShouldBe(1);
318+
var effect3 = effects.Single(e => e.EffectId == storedEffect3.EffectId);
319+
effect3.Result.ShouldBe("some result 3".ToUtf8Bytes());
320+
}
281321
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableEffectStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
3030
? Task.FromException(new TimeoutException())
3131
: _inner.SetEffectResult(storedId, storedEffect);
3232

33-
public Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
33+
public Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
3434
=> _crashed
3535
? Task.FromException(new TimeoutException())
36-
: _inner.SetEffectResults(storedId, storedEffects);
36+
: _inner.SetEffectResults(storedId, upsertEffects, removeEffects);
3737

3838
public Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)
3939
=> _crashed

Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Collections.Generic;
1+
using System;
2+
using System.Collections.Generic;
23
using System.Threading.Tasks;
34

45
namespace Cleipnir.ResilientFunctions.Storage;
@@ -8,7 +9,8 @@ public interface IEffectsStore
89
Task Initialize();
910
Task Truncate();
1011
Task SetEffectResult(StoredId storedId, StoredEffect storedEffect);
11-
Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects);
12+
Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects) => SetEffectResults(storedId, storedEffects, removeEffects: Array.Empty<StoredEffectId>());
13+
Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects);
1214
Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId);
1315
Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId);
1416
Task Remove(StoredId storedId);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,13 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
3434
return Task.CompletedTask;
3535
}
3636

37-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
37+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
3838
{
39-
foreach (var storedEffect in storedEffects)
39+
foreach (var storedEffect in upsertEffects)
4040
await SetEffectResult(storedId, storedEffect);
41+
42+
foreach (var effectId in removeEffects)
43+
await DeleteEffectResult(storedId, effectId);
4144
}
4245

4346
public Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/EffectsStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,8 @@ public override Task TruncateDeletesAllEffects()
3333
[TestMethod]
3434
public override Task BulkInsertTest()
3535
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
36+
37+
[TestMethod]
38+
public override Task BulkInsertAndDeleteTest()
39+
=> BulkInsertAndDeleteTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
3640
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ ON DUPLICATE KEY UPDATE
6666
}
6767

6868
private string? _setEffectResultsSql;
69-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
69+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects, IReadOnlyList<StoredEffectId> removeEffects)
7070
{
7171
await using var conn = await CreateConnection();
7272
_setEffectResultsSql ??= $@"
@@ -75,12 +75,19 @@ INSERT INTO {tablePrefix}_effects
7575
VALUES
7676
@VALUES
7777
ON DUPLICATE KEY UPDATE
78-
status = VALUES(status), result = VALUES(result), exception = VALUES(exception)";
78+
status = VALUES(status), result = VALUES(result), exception = VALUES(exception);";
7979

8080
var sql = _setEffectResultsSql.Replace(
8181
"@VALUES",
8282
"(?, ?, ?, ?, ?, ?, ?)".Replicate(storedEffects.Count).StringJoin(", ")
8383
);
84+
85+
if (removeEffects.Count > 0)
86+
sql += Environment.NewLine +
87+
@$"DELETE FROM {tablePrefix}_effects
88+
WHERE type = {storedId.Type.Value} AND
89+
instance = '{storedId.Instance.Value:N}' AND
90+
id_hash IN ({removeEffects.Select(id => $"'{id.Value:N}'").StringJoin(", ")});";
8491

8592
await using var command = new MySqlCommand(sql, conn);
8693
foreach (var storedEffect in storedEffects)

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/EffectStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,8 @@ public override Task TruncateDeletesAllEffects()
3434
[TestMethod]
3535
public override Task BulkInsertTest()
3636
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
37+
38+
[TestMethod]
39+
public override Task BulkInsertAndDeleteTest()
40+
=> BulkInsertAndDeleteTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
3741
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Threading.Tasks;
45
using Cleipnir.ResilientFunctions.Domain;
6+
using Cleipnir.ResilientFunctions.Helpers;
57
using Cleipnir.ResilientFunctions.Storage;
68
using Cleipnir.ResilientFunctions.Storage.Utils;
79
using Npgsql;
@@ -68,7 +70,7 @@ ON CONFLICT (type, instance, id_hash)
6870
await command.ExecuteNonQueryAsync();
6971
}
7072

71-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
73+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
7274
{
7375
await using var conn = await CreateConnection();
7476
_setEffectResultSql ??= $@"
@@ -81,7 +83,7 @@ ON CONFLICT (type, instance, id_hash)
8183
UPDATE SET status = EXCLUDED.status, result = EXCLUDED.result, exception = EXCLUDED.exception";
8284

8385
await using var batch = new NpgsqlBatch(conn);
84-
foreach (var storedEffect in storedEffects)
86+
foreach (var storedEffect in upsertEffects)
8587
{
8688
var command = new NpgsqlBatchCommand(_setEffectResultSql)
8789
{
@@ -98,6 +100,16 @@ ON CONFLICT (type, instance, id_hash)
98100
};
99101
batch.BatchCommands.Add(command);
100102
}
103+
104+
if (removeEffects.Count > 0)
105+
batch.BatchCommands.Add(
106+
new NpgsqlBatchCommand(
107+
@$"DELETE FROM {tablePrefix}_effects
108+
WHERE type = {storedId.Type.Value} AND
109+
instance = '{storedId.Instance.Value}' AND
110+
id_hash IN ({removeEffects.Select(id => $"'{id.Value}'").StringJoin(", ")});"
111+
)
112+
);
101113

102114
await batch.ExecuteNonQueryAsync();
103115
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectStoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,8 @@ public override Task TruncateDeletesAllEffects()
3434
[TestMethod]
3535
public override Task BulkInsertTest()
3636
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
37+
38+
[TestMethod]
39+
public override Task BulkInsertAndDeleteTest()
40+
=> BulkInsertAndDeleteTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
3741
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ WHEN NOT MATCHED THEN
7474
await command.ExecuteNonQueryAsync();
7575
}
7676

77-
private static string? _setEffectResultsSql;
78-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
77+
private string? _setEffectResultsSql;
78+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects, IReadOnlyList<StoredEffectId> removeEffects)
7979
{
8080
await using var conn = await CreateConnection();
8181
_setEffectResultsSql ??= $@"
@@ -96,6 +96,14 @@ WHEN NOT MATCHED THEN
9696
.Select((s, i) => s.Replace("#", i.ToString()))
9797
.StringJoin(", ")
9898
);
99+
100+
if (removeEffects.Count > 0)
101+
sql += Environment.NewLine +
102+
@$"DELETE FROM {tablePrefix}_Effects
103+
WHERE FlowType = {storedId.Type.Value} AND
104+
FlowInstance = '{storedId.Instance.Value}' AND
105+
StoredId IN ({removeEffects.Select(id => $"'{id.Value}'").StringJoin(", ")}) ";
106+
99107
await using var command = new SqlCommand(sql, conn);
100108
for (var i = 0; i < storedEffects.Count; i++)
101109
{

0 commit comments

Comments
 (0)