Skip to content

Commit c929fde

Browse files
committed
Extended CrudOperations with Insert and changed EffectResults accordingly
1 parent 9512970 commit c929fde

File tree

13 files changed

+299
-152
lines changed

13 files changed

+299
-152
lines changed

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Cleipnir.ResilientFunctions.Storage;
66
using Cleipnir.ResilientFunctions.Tests.Utils;
77
using Shouldly;
8+
using static Cleipnir.ResilientFunctions.Storage.CrudOperation;
89

910
namespace Cleipnir.ResilientFunctions.Tests.TestTemplates;
1011

@@ -271,7 +272,7 @@ protected async Task BulkInsertTest(Task<IEffectsStore> storeTask)
271272

272273
await store.SetEffectResults(
273274
storedId,
274-
[storedEffect1.ToStoredChange(storedId), storedEffect2.ToStoredChange(storedId)]
275+
[storedEffect1.ToStoredChange(storedId, Insert), storedEffect2.ToStoredChange(storedId, Insert)]
275276
);
276277

277278
var effects = await store.GetEffectResults(storedId);
@@ -309,11 +310,11 @@ protected async Task BulkInsertAndDeleteTest(Task<IEffectsStore> storeTask)
309310
StoredException: null
310311
);
311312

312-
await store.SetEffectResults(storedId, [storedEffect1.ToStoredChange(storedId), storedEffect2.ToStoredChange(storedId)]);
313+
await store.SetEffectResults(storedId, [storedEffect1.ToStoredChange(storedId, Insert), storedEffect2.ToStoredChange(storedId, Insert)]);
313314
await store.SetEffectResults(
314315
storedId,
315316
changes: [
316-
storedEffect3.ToStoredChange(storedId),
317+
storedEffect3.ToStoredChange(storedId, Insert),
317318
StoredEffectChange.CreateDelete(storedId, storedEffect1.StoredEffectId),
318319
StoredEffectChange.CreateDelete(storedId, storedEffect2.StoredEffectId)
319320
]
@@ -347,7 +348,7 @@ protected async Task BulkDeleteTest(Task<IEffectsStore> storeTask)
347348

348349
await store.SetEffectResults(
349350
storedId,
350-
changes: [storedEffect1.ToStoredChange(storedId), storedEffect2.ToStoredChange(storedId)]
351+
changes: [storedEffect1.ToStoredChange(storedId, Insert), storedEffect2.ToStoredChange(storedId, Insert)]
351352
);
352353
await store.SetEffectResults(
353354
storedId,

Core/Cleipnir.ResilientFunctions/Domain/EffectResults.cs

Lines changed: 102 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,13 @@ public class EffectResults
1414
{
1515
private readonly StoredId _storedId;
1616
private readonly FlowId _flowId;
17-
private readonly Dictionary<EffectId, StoredEffect> _effectResults = new();
1817
private readonly Lock _sync = new();
1918
private readonly Lazy<Task<IReadOnlyList<StoredEffect>>> _lazyExistingEffects;
2019
private readonly IEffectsStore _effectsStore;
2120
private readonly ISerializer _serializer;
2221
private volatile bool _initialized;
2322

24-
private readonly Dictionary<StoredEffectId, PendingChange> _pendingChanges = new();
23+
private readonly Dictionary<EffectId, PendingEffectChange> _effectResults = new();
2524

2625
public EffectResults(
2726
FlowId flowId,
@@ -49,7 +48,13 @@ private async Task InitializeIfRequired()
4948
return;
5049

5150
foreach (var existingEffect in existingEffects)
52-
_effectResults[existingEffect.EffectId] = existingEffect;
51+
_effectResults[existingEffect.EffectId] =
52+
new PendingEffectChange(
53+
existingEffect.StoredEffectId,
54+
existingEffect,
55+
Operation: null,
56+
Existing: true
57+
);
5358

5459
_initialized = true;
5560
}
@@ -66,7 +71,7 @@ public async Task<bool> Contains(EffectId effectId)
6671
{
6772
await InitializeIfRequired();
6873
lock (_sync)
69-
return _effectResults.GetValueOrDefault(effectId);
74+
return _effectResults.GetValueOrDefault(effectId)?.StoredEffect;
7075
}
7176

7277
public async Task Set(StoredEffect storedEffect, bool flush)
@@ -76,7 +81,8 @@ await FlushOrAddToPending(
7681
storedEffect.EffectId,
7782
storedEffect.StoredEffectId,
7883
storedEffect,
79-
flush
84+
flush,
85+
delete: false
8086
);
8187
}
8288

@@ -85,19 +91,20 @@ public async Task<T> CreateOrGet<T>(EffectId effectId, T value, bool flush)
8591
await InitializeIfRequired();
8692
lock (_sync)
8793
{
88-
if (_effectResults.TryGetValue(effectId, out var existing) && existing.WorkStatus == WorkStatus.Completed)
89-
return _serializer.Deserialize<T>(existing.Result!);
94+
if (_effectResults.TryGetValue(effectId, out var existing) && existing.StoredEffect?.WorkStatus == WorkStatus.Completed)
95+
return _serializer.Deserialize<T>(existing.StoredEffect.Result!);
9096

91-
if (existing?.StoredException != null)
92-
throw _serializer.DeserializeException(_flowId, existing.StoredException!);
97+
if (existing?.StoredEffect?.StoredException != null)
98+
throw _serializer.DeserializeException(_flowId, existing.StoredEffect.StoredException!);
9399
}
94100

95101
var storedEffect = StoredEffect.CreateCompleted(effectId, _serializer.Serialize(value));
96102
await FlushOrAddToPending(
97103
storedEffect.EffectId,
98104
storedEffect.StoredEffectId,
99105
storedEffect,
100-
flush
106+
flush,
107+
delete: false
101108
);
102109

103110
return value;
@@ -112,7 +119,8 @@ await FlushOrAddToPending(
112119
storedEffect.EffectId,
113120
storedEffect.StoredEffectId,
114121
storedEffect,
115-
flush
122+
flush,
123+
delete: false
116124
);
117125
}
118126

@@ -122,15 +130,16 @@ public async Task<Option<T>> TryGet<T>(EffectId effectId)
122130

123131
lock (_sync)
124132
{
125-
if (_effectResults.TryGetValue(effectId, out var storedEffect))
133+
if (_effectResults.TryGetValue(effectId, out var change))
126134
{
127-
if (storedEffect.WorkStatus == WorkStatus.Completed)
135+
var storedEffect = change.StoredEffect;
136+
if (storedEffect?.WorkStatus == WorkStatus.Completed)
128137
{
129138
var value = _serializer.Deserialize<T>(storedEffect.Result!)!;
130139
return Option.Create(value);
131140
}
132141

133-
if (storedEffect.StoredException != null)
142+
if (storedEffect?.StoredException != null)
134143
throw _serializer.DeserializeException(_flowId, storedEffect.StoredException!);
135144
}
136145
}
@@ -147,21 +156,20 @@ public async Task InnerCapture(string id, EffectType effectType, Func<Task> work
147156

148157
lock (_sync)
149158
{
150-
var success = _effectResults.TryGetValue(effectId, out var storedEffect);
151-
if (success && storedEffect!.WorkStatus == WorkStatus.Completed)
159+
var success = _effectResults.TryGetValue(effectId, out var pendingChange);
160+
var storedEffect = pendingChange?.StoredEffect;
161+
if (success && storedEffect?.WorkStatus == WorkStatus.Completed)
152162
return;
153-
if (success && storedEffect!.WorkStatus == WorkStatus.Failed)
163+
if (success && storedEffect?.WorkStatus == WorkStatus.Failed)
154164
throw _serializer.DeserializeException(_flowId, storedEffect.StoredException!);
155165
if (success && resiliency == ResiliencyLevel.AtMostOnce)
156166
throw new InvalidOperationException($"Effect '{id}' started but did not complete previously");
157167
}
158168

159169
if (resiliency == ResiliencyLevel.AtMostOnce)
160170
{
161-
var storedEffect = StoredEffect.CreateStarted(effectId);
162-
await _effectsStore.SetEffectResult(_storedId, storedEffect);
163-
lock (_sync)
164-
_effectResults[effectId] = storedEffect;
171+
var storedEffect = StoredEffect.CreateStarted(effectId);
172+
await FlushOrAddToPending(effectId, storedEffect.StoredEffectId, storedEffect, flush: true, delete: false);
165173
}
166174

167175
try
@@ -184,7 +192,8 @@ await FlushOrAddToPending(
184192
storedEffect.EffectId,
185193
storedEffect.StoredEffectId,
186194
storedEffect,
187-
flush: true
195+
flush: true,
196+
delete: false
188197
);
189198

190199
exception.FlowId = _flowId;
@@ -199,7 +208,8 @@ await FlushOrAddToPending(
199208
storedEffect.EffectId,
200209
storedEffect.StoredEffectId,
201210
storedEffect,
202-
flush: true
211+
flush: true,
212+
delete: false
203213
);
204214

205215
throw fatalWorkflowException;
@@ -211,7 +221,8 @@ await FlushOrAddToPending(
211221
storedEffect.EffectId,
212222
storedEffect.StoredEffectId,
213223
storedEffect,
214-
flush: resiliency != ResiliencyLevel.AtLeastOnceDelayFlush
224+
flush: resiliency != ResiliencyLevel.AtLeastOnceDelayFlush,
225+
delete: false
215226
);
216227
}
217228
}
@@ -226,20 +237,24 @@ public async Task<T> InnerCapture<T>(string id, EffectType effectType, Func<Task
226237
lock (_sync)
227238
{
228239
var success = _effectResults.TryGetValue(effectId, out var storedEffect);
229-
if (success && storedEffect!.WorkStatus == WorkStatus.Completed)
230-
return (storedEffect.Result == null ? default : _serializer.Deserialize<T>(storedEffect.Result))!;
231-
if (success && storedEffect!.WorkStatus == WorkStatus.Failed)
232-
throw FatalWorkflowException.Create(_flowId, storedEffect.StoredException!);
240+
if (success && storedEffect!.StoredEffect?.WorkStatus == WorkStatus.Completed)
241+
return (storedEffect.StoredEffect?.Result == null ? default : _serializer.Deserialize<T>(storedEffect.StoredEffect?.Result!))!;
242+
if (success && storedEffect!.StoredEffect?.WorkStatus == WorkStatus.Failed)
243+
throw FatalWorkflowException.Create(_flowId, storedEffect.StoredEffect?.StoredException!);
233244
if (success && resiliency == ResiliencyLevel.AtMostOnce)
234245
throw new InvalidOperationException($"Effect '{id}' started but did not complete previously");
235246
}
236247

237248
if (resiliency == ResiliencyLevel.AtMostOnce)
238249
{
239250
var storedEffect = StoredEffect.CreateStarted(effectId);
240-
await _effectsStore.SetEffectResult(_storedId, storedEffect);
241-
lock (_sync)
242-
_effectResults[effectId] = storedEffect;
251+
await FlushOrAddToPending(
252+
effectId,
253+
storedEffect.StoredEffectId,
254+
storedEffect,
255+
flush: true,
256+
delete: false
257+
);
243258
}
244259

245260
T result;
@@ -263,7 +278,8 @@ await FlushOrAddToPending(
263278
storedEffect.EffectId,
264279
storedEffect.StoredEffectId,
265280
storedEffect,
266-
flush: true
281+
flush: true,
282+
delete: false
267283
);
268284

269285
exception.FlowId = _flowId;
@@ -279,7 +295,8 @@ await FlushOrAddToPending(
279295
storedEffect.EffectId,
280296
storedEffect.StoredEffectId,
281297
storedEffect,
282-
flush: true
298+
flush: true,
299+
delete: false
283300
);
284301
throw fatalWorkflowException;
285302
}
@@ -290,7 +307,8 @@ await FlushOrAddToPending(
290307
storedEffect.EffectId,
291308
storedEffect.StoredEffectId,
292309
storedEffect,
293-
flush: resiliency != ResiliencyLevel.AtLeastOnceDelayFlush
310+
flush: resiliency != ResiliencyLevel.AtLeastOnceDelayFlush,
311+
delete: false
294312
);
295313

296314
return result;
@@ -309,64 +327,79 @@ await FlushOrAddToPending(
309327
effectId,
310328
effectId.ToStoredEffectId(),
311329
storedEffect: null,
312-
flush
330+
flush,
331+
delete: true
313332
);
314333
}
315334

316-
private async Task FlushOrAddToPending(EffectId effectId, StoredEffectId storedEffectId, StoredEffect? storedEffect, bool flush)
335+
private async Task FlushOrAddToPending(EffectId effectId, StoredEffectId storedEffectId, StoredEffect? storedEffect, bool flush, bool delete)
317336
{
318-
if (flush)
319-
await Flush(storedEffectId, storedEffect);
320-
else
321-
lock (_sync)
322-
_pendingChanges[storedEffectId] = new PendingChange(storedEffectId, storedEffect);
323-
324337
lock (_sync)
325-
if (storedEffect == null)
326-
_effectResults.Remove(effectId);
338+
if (_effectResults.ContainsKey(effectId))
339+
{
340+
var existing = _effectResults[effectId];
341+
_effectResults[effectId] = existing with
342+
{
343+
StoredEffect = storedEffect,
344+
Operation = delete
345+
? CrudOperation.Delete
346+
: (existing.Existing ? CrudOperation.Update : CrudOperation.Insert)
347+
};
348+
}
327349
else
328-
_effectResults[effectId] = storedEffect;
350+
{
351+
_effectResults[effectId] = new PendingEffectChange(
352+
storedEffectId,
353+
storedEffect,
354+
CrudOperation.Insert,
355+
Existing: false
356+
);
357+
}
358+
359+
if (flush)
360+
await Flush();
329361
}
330362

331363
private readonly SemaphoreSlim _flushSync = new(initialCount: 1, maxCount: 1);
332-
private async Task Flush(StoredEffectId changedStoredId, StoredEffect? change)
364+
private async Task Flush()
333365
{
334366
await _flushSync.WaitAsync();
335367

336368
try
337369
{
338-
IReadOnlyList<PendingChange> pendingChanges;
370+
IReadOnlyList<PendingEffectChange> pendingChanges;
339371
lock (_sync)
340-
{
341-
if (_pendingChanges.Count == 0)
342-
pendingChanges = [];
343-
else
344-
{
345-
_pendingChanges[changedStoredId] = new PendingChange(changedStoredId, change);
346-
pendingChanges = _pendingChanges.Values.ToList();
347-
_pendingChanges.Clear();
348-
}
349-
}
372+
pendingChanges = _effectResults.Values.Where(r => r.Operation != null).ToList();
350373

351374
if (pendingChanges.Count == 0)
352-
{
353-
if (change == null)
354-
await _effectsStore.DeleteEffectResult(_storedId, changedStoredId);
355-
else
356-
await _effectsStore.SetEffectResult(_storedId, change);
357-
358375
return;
359-
}
360376

361377
var changes = pendingChanges
362-
.Select(pc => new StoredEffectChange(
363-
_storedId,
364-
pc.Id,
365-
pc.StoredEffect == null ? CrudOperation.Delete : CrudOperation.Upsert,
366-
pc.StoredEffect)
378+
.Select(pc =>
379+
new StoredEffectChange(
380+
_storedId,
381+
pc.Id,
382+
pc.Operation!.Value,
383+
pc.StoredEffect
384+
)
367385
).ToList();
386+
368387
await _effectsStore.SetEffectResults(_storedId, changes);
388+
389+
lock (_sync)
390+
foreach (var (key, value) in _effectResults.ToList())
391+
{
392+
if (value.Operation == CrudOperation.Delete)
393+
_effectResults.Remove(key);
394+
else
395+
_effectResults[key] = value with
396+
{
397+
Existing = true,
398+
Operation = null
399+
};
400+
}
369401
}
402+
370403
finally
371404
{
372405
_flushSync.Release();

Core/Cleipnir.ResilientFunctions/Domain/PendingChange.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
namespace Cleipnir.ResilientFunctions.Domain;
44

5-
public record PendingChange(StoredEffectId Id, StoredEffect? StoredEffect);
5+
public record PendingEffectChange(StoredEffectId Id, StoredEffect? StoredEffect, CrudOperation? Operation, bool Existing);

Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
3636

3737
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffectChange> changes)
3838
{
39-
foreach (var storedEffect in changes.Where(c => c.Operation == CrudOperation.Upsert).Select(c => c.StoredEffect!))
39+
foreach (var storedEffect in changes.Where(c => c.Operation != CrudOperation.Delete).Select(c => c.StoredEffect!))
4040
await SetEffectResult(storedId, storedEffect);
4141

4242
foreach (var effectId in changes.Where(c => c.Operation == CrudOperation.Delete).Select(c => c.EffectId))

0 commit comments

Comments
 (0)