Skip to content

Commit 4c24f09

Browse files
committed
Simplified SetEffectResults-method on IEffectsStore
1 parent 1a76bd1 commit 4c24f09

File tree

9 files changed

+57
-72
lines changed

9 files changed

+57
-72
lines changed

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,11 @@ protected async Task BulkInsertTest(Task<IEffectsStore> storeTask)
268268
Result: "some result 2".ToUtf8Bytes(),
269269
StoredException: null
270270
);
271-
272-
await store.SetEffectResults(storedId, [storedEffect1, storedEffect2]);
271+
272+
await store.SetEffectResults(
273+
storedId,
274+
[storedEffect1.ToStoredChange(storedId), storedEffect2.ToStoredChange(storedId)]
275+
);
273276

274277
var effects = await store.GetEffectResults(storedId);
275278
effects.Count.ShouldBe(2);
@@ -306,11 +309,14 @@ protected async Task BulkInsertAndDeleteTest(Task<IEffectsStore> storeTask)
306309
StoredException: null
307310
);
308311

309-
await store.SetEffectResults(storedId, [storedEffect1, storedEffect2]);
312+
await store.SetEffectResults(storedId, [storedEffect1.ToStoredChange(storedId), storedEffect2.ToStoredChange(storedId)]);
310313
await store.SetEffectResults(
311314
storedId,
312-
upsertEffects: [storedEffect3],
313-
removeEffects: [storedEffect1.StoredEffectId, storedEffect2.StoredEffectId]
315+
changes: [
316+
storedEffect3.ToStoredChange(storedId),
317+
StoredEffectChange.CreateDelete(storedId, storedEffect1.StoredEffectId),
318+
StoredEffectChange.CreateDelete(storedId, storedEffect2.StoredEffectId)
319+
]
314320
);
315321

316322
var effects = await store.GetEffectResults(storedId);
@@ -338,12 +344,17 @@ protected async Task BulkDeleteTest(Task<IEffectsStore> storeTask)
338344
Result: "some result 2".ToUtf8Bytes(),
339345
StoredException: null
340346
);
341-
342-
await store.SetEffectResults(storedId, [storedEffect1, storedEffect2]);
347+
348+
await store.SetEffectResults(
349+
storedId,
350+
changes: [storedEffect1.ToStoredChange(storedId), storedEffect2.ToStoredChange(storedId)]
351+
);
343352
await store.SetEffectResults(
344353
storedId,
345-
upsertEffects: [],
346-
removeEffects: [storedEffect1.StoredEffectId, storedEffect2.StoredEffectId]
354+
changes: [
355+
StoredEffectChange.CreateDelete(storedId, storedEffect1.StoredEffectId),
356+
StoredEffectChange.CreateDelete(storedId, storedEffect2.StoredEffectId)
357+
]
347358
);
348359

349360
var effects = await store.GetEffectResults(storedId);

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableEffectStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
3030
? Task.FromException(new TimeoutException())
3131
: _inner.SetEffectResult(storedId, storedEffect);
3232

33-
public Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
33+
public Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes)
3434
=> _crashed
3535
? Task.FromException(new TimeoutException())
36-
: _inner.SetEffectResults(storedId, upsertEffects, removeEffects);
36+
: _inner.SetEffectResults(storedId, changes);
3737

3838
public Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)
3939
=> _crashed

Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -358,11 +358,14 @@ private async Task Flush(StoredEffectId changedStoredId, StoredEffect? change)
358358
return;
359359
}
360360

361-
await _effectsStore.SetEffectResults(
362-
_storedId,
363-
upsertEffects: pendingChanges.Where(pc => pc.StoredEffect != null).Select(pc => pc.StoredEffect!).ToList(),
364-
removeEffects: pendingChanges.Where(pc => pc.StoredEffect == null).Select(pc => pc.Id).ToList()
365-
);
361+
var changes = pendingChanges
362+
.Select(pc => new StoredEffectChange(
363+
_storedId,
364+
pc.Id,
365+
pc.StoredEffect == null ? CrudOperation.Delete : CrudOperation.Upsert,
366+
pc.StoredEffect)
367+
).ToList();
368+
await _effectsStore.SetEffectResults(_storedId, changes);
366369
}
367370
finally
368371
{

Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System;
2-
using System.Collections.Generic;
1+
using System.Collections.Generic;
32
using System.Threading.Tasks;
43

54
namespace Cleipnir.ResilientFunctions.Storage;
@@ -9,8 +8,7 @@ public interface IEffectsStore
98
Task Initialize();
109
Task Truncate();
1110
Task SetEffectResult(StoredId storedId, StoredEffect storedEffect);
12-
Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects) => SetEffectResults(storedId, storedEffects, removeEffects: Array.Empty<StoredEffectId>());
13-
Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects);
11+
Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes);
1412
Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId);
1513
Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId);
1614
Task DeleteEffectResults(StoredId storedId, IReadOnlyList<StoredEffectId> effectIds);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
3434
return Task.CompletedTask;
3535
}
3636

37-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
37+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes)
3838
{
39-
foreach (var storedEffect in upsertEffects)
39+
foreach (var storedEffect in changes.Where(c => c.Operation == CrudOperation.Upsert).Select(c => c.StoredEffect!))
4040
await SetEffectResult(storedId, storedEffect);
4141

42-
foreach (var effectId in removeEffects)
42+
foreach (var effectId in changes.Where(c => c.Operation == CrudOperation.Delete).Select(c => c.EffectId))
4343
await DeleteEffectResult(storedId, effectId);
4444
}
4545

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,15 @@ public static class StoredEffectIdExtensions
7474
public static StoredEffectId ToStoredEffectId(this EffectId effectId) => StoredEffectId.Create(effectId);
7575
}
7676

77-
public record StoredEffectChange(StoredId StoredId, StoredEffectId EffectId, CrudOperation Operation, StoredEffect? StoredEffect);
77+
public record StoredEffectChange(
78+
StoredId StoredId,
79+
StoredEffectId EffectId,
80+
CrudOperation Operation,
81+
StoredEffect? StoredEffect)
82+
{
83+
public static StoredEffectChange CreateDelete(StoredId storedId, StoredEffectId effectId)
84+
=> new(storedId, effectId, CrudOperation.Delete, StoredEffect: null);
85+
}
7886

7987
public enum CrudOperation
8088
{
@@ -115,4 +123,10 @@ public static StoredEffect CreateState(StoredState storedState)
115123
public record StoredState(StateId StateId, byte[] StateJson);
116124

117125
public record IdWithParam(StoredId StoredId, string HumanInstanceId, byte[]? Param);
118-
public record LeaseUpdate(StoredId StoredId, int ExpectedEpoch);
126+
public record LeaseUpdate(StoredId StoredId, int ExpectedEpoch);
127+
128+
public static class StoredEffectExtensions
129+
{
130+
public static StoredEffectChange ToStoredChange(this StoredEffect effect, StoredId storedId)
131+
=> new(storedId, effect.StoredEffectId, CrudOperation.Upsert, effect);
132+
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,9 @@ ON DUPLICATE KEY UPDATE
6464

6565
await command.ExecuteNonQueryAsync();
6666
}
67-
68-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
67+
68+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes)
6969
{
70-
var changes = upsertEffects
71-
.Select(u => new StoredEffectChange(
72-
storedId,
73-
u.StoredEffectId,
74-
CrudOperation.Upsert,
75-
u
76-
))
77-
.Concat(
78-
removeEffects.Select(id =>
79-
new StoredEffectChange(storedId, id, CrudOperation.Delete, StoredEffect: null)
80-
)
81-
)
82-
.ToList();
83-
8470
await using var conn = await CreateConnection();
8571
await using var command = new MySqlCommand();
8672
command.Connection = conn;

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,24 +70,10 @@ ON CONFLICT (type, instance, id_hash)
7070
await command.ExecuteNonQueryAsync();
7171
}
7272

73-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
73+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes)
7474
{
7575
await using var conn = await CreateConnection();
7676
await using var batch = new NpgsqlBatch(conn);
77-
var changes = upsertEffects
78-
.Select(u => new StoredEffectChange(
79-
storedId,
80-
u.StoredEffectId,
81-
CrudOperation.Upsert,
82-
u
83-
))
84-
.Concat(
85-
removeEffects.Select(id =>
86-
new StoredEffectChange(storedId, id, CrudOperation.Delete, StoredEffect: null)
87-
)
88-
)
89-
.ToList();
90-
9177
SqlGenerator.UpdateEffects(batch, changes, tablePrefix);
9278
await batch.ExecuteNonQueryAsync();
9379
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,30 +73,17 @@ WHEN NOT MATCHED THEN
7373

7474
await command.ExecuteNonQueryAsync();
7575
}
76-
77-
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
76+
77+
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes)
7878
{
79-
var changes = upsertEffects
80-
.Select(u => new StoredEffectChange(
81-
storedId,
82-
u.StoredEffectId,
83-
CrudOperation.Upsert,
84-
u
85-
))
86-
.Concat(
87-
removeEffects.Select(id =>
88-
new StoredEffectChange(storedId, id, CrudOperation.Delete, StoredEffect: null)
89-
)
90-
)
91-
.ToList();
9279
await using var conn = await CreateConnection();
9380
await using var command = new SqlCommand();
9481
command.Connection = conn;
9582
command.CommandText = SqlGenerator.UpdateEffects(command, changes, tablePrefix, paramPrefix: "");
9683

9784
await command.ExecuteNonQueryAsync();
9885
}
99-
86+
10087
private string? _getEffectResultsSql;
10188
public async Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)
10289
{

0 commit comments

Comments
 (0)