Skip to content

Commit 1a76bd1

Browse files
committed
Added UpdateEffects to SqlServer's SqlGenerator
1 parent 3c32dfe commit 1a76bd1

File tree

2 files changed

+86
-48
lines changed

2 files changed

+86
-48
lines changed

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlGenerator.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
using System;
12
using System.Collections.Generic;
3+
using System.Data.SqlTypes;
24
using System.Linq;
5+
using System.Text;
36
using Cleipnir.ResilientFunctions.Domain;
47
using Cleipnir.ResilientFunctions.Helpers;
58
using Cleipnir.ResilientFunctions.Storage;
9+
using Cleipnir.ResilientFunctions.Storage.Utils;
10+
using Microsoft.Data.SqlClient;
611

712
namespace Cleipnir.ResilientFunctions.SqlServer;
813

@@ -34,4 +39,68 @@ ELSE Expires
3439

3540
return sql;
3641
}
42+
43+
public static string UpdateEffects(SqlCommand command, IReadOnlyList<StoredEffectChange> changes, string tablePrefix, string paramPrefix)
44+
{
45+
var stringBuilder = new StringBuilder(capacity: 2);
46+
var upserts = changes
47+
.Where(c => c.Operation == CrudOperation.Upsert)
48+
.Select(c => new
49+
{
50+
Type = c.StoredId.Type.Value,
51+
Instance = c.StoredId.Instance.Value,
52+
StoredEffectId = c.EffectId.Value,
53+
WorkStatus = (int)c.StoredEffect!.WorkStatus,
54+
Result = c.StoredEffect!.Result,
55+
Exception = c.StoredEffect!.StoredException,
56+
EffectId = c.StoredEffect!.EffectId
57+
})
58+
.ToList();
59+
60+
var parameterValues = upserts
61+
.Select((_, i) => $"(@{paramPrefix}FlowType{i}, @{paramPrefix}FlowInstance{i}, @{paramPrefix}StoredId{i}, @{paramPrefix}EffectId{i}, @{paramPrefix}Status{i}, @{paramPrefix}Result{i}, @{paramPrefix}Exception{i})")
62+
.StringJoin(", ");
63+
64+
var setSql = $@"
65+
MERGE INTO {tablePrefix}_Effects
66+
USING (VALUES {parameterValues})
67+
AS source (FlowType, FlowInstance, StoredId, EffectId, Status, Result, Exception)
68+
ON {tablePrefix}_Effects.FlowType = source.FlowType AND {tablePrefix}_Effects.FlowInstance = source.FlowInstance AND {tablePrefix}_Effects.StoredId = source.StoredId
69+
WHEN MATCHED THEN
70+
UPDATE SET Status = source.Status, Result = source.Result, Exception = source.Exception
71+
WHEN NOT MATCHED THEN
72+
INSERT (FlowType, FlowInstance, StoredId, EffectId, Status, Result, Exception)
73+
VALUES (source.FlowType, source.FlowInstance, source.StoredId, source.EffectId, source.Status, source.Result, source.Exception);";
74+
75+
if (upserts.Any())
76+
stringBuilder.AppendLine(setSql);
77+
for (var i = 0; i < upserts.Count; i++)
78+
{
79+
var upsert = upserts[i];
80+
command.Parameters.AddWithValue($"@{paramPrefix}FlowType{i}", upsert.Type);
81+
command.Parameters.AddWithValue($"@{paramPrefix}FlowInstance{i}", upsert.Instance);
82+
command.Parameters.AddWithValue($"@{paramPrefix}StoredId{i}", upsert.StoredEffectId);
83+
command.Parameters.AddWithValue($"@{paramPrefix}EffectId{i}", upsert.EffectId.Serialize());
84+
command.Parameters.AddWithValue($"@{paramPrefix}Status{i}", upsert.WorkStatus);
85+
command.Parameters.AddWithValue($"@{paramPrefix}Result{i}", upsert.Result ?? (object) SqlBinary.Null);
86+
command.Parameters.AddWithValue($"@{paramPrefix}Exception{i}", JsonHelper.ToJson(upsert.Exception) ?? (object) DBNull.Value);
87+
}
88+
89+
var removes = changes
90+
.Where(c => c.Operation == CrudOperation.Delete)
91+
.Select(c => new { Type = c.StoredId.Type.Value, Instance = c.StoredId.Instance.Value, IdHash = c.EffectId.Value })
92+
.GroupBy(a => new {a.Type, a.Instance }, a => a.IdHash)
93+
.ToList();
94+
var predicates = removes
95+
.Select(r =>
96+
$"(FlowType = {r.Key.Type} AND FlowInstance = '{r.Key.Instance}' AND StoredId IN ({r.Select(id => $"'{id}'").StringJoin(", ")}))")
97+
.StringJoin($" OR {Environment.NewLine}");
98+
var removeSql = @$"
99+
DELETE FROM {tablePrefix}_effects
100+
WHERE {predicates}";
101+
if (removes.Any())
102+
stringBuilder.AppendLine(removeSql);
103+
104+
return stringBuilder.ToString();
105+
}
37106
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs

Lines changed: 17 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -73,58 +73,27 @@ WHEN NOT MATCHED THEN
7373

7474
await command.ExecuteNonQueryAsync();
7575
}
76-
77-
private string? _setEffectResultsSql;
76+
7877
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
7978
{
80-
if (upsertEffects.Count == 0 && removeEffects.Count == 0)
81-
return;
82-
if (upsertEffects.Count == 0)
83-
{
84-
await DeleteEffectResults(storedId, removeEffects);
85-
return;
86-
}
87-
79+
var changes = upsertEffects
80+
.Select(u => new StoredEffectChange(
81+
storedId,
82+
u.StoredEffectId,
83+
CrudOperation.Upsert,
84+
u
85+
))
86+
.Concat(
87+
removeEffects.Select(id =>
88+
new StoredEffectChange(storedId, id, CrudOperation.Delete, StoredEffect: null)
89+
)
90+
)
91+
.ToList();
8892
await using var conn = await CreateConnection();
89-
_setEffectResultsSql ??= $@"
90-
MERGE INTO {tablePrefix}_Effects
91-
USING (VALUES @VALUES)
92-
AS source (FlowType, FlowInstance, StoredId, EffectId, Status, Result, Exception)
93-
ON {tablePrefix}_Effects.FlowType = source.FlowType AND {tablePrefix}_Effects.FlowInstance = source.FlowInstance AND {tablePrefix}_Effects.StoredId = source.StoredId
94-
WHEN MATCHED THEN
95-
UPDATE SET Status = source.Status, Result = source.Result, Exception = source.Exception
96-
WHEN NOT MATCHED THEN
97-
INSERT (FlowType, FlowInstance, StoredId, EffectId, Status, Result, Exception)
98-
VALUES (source.FlowType, source.FlowInstance, source.StoredId, source.EffectId, source.Status, source.Result, source.Exception);";
99-
100-
var sql = _setEffectResultsSql.Replace(
101-
"@VALUES",
102-
"(@FlowType#, @FlowInstance#, @StoredId#, @EffectId#, @Status#, @Result#, @Exception#)"
103-
.Replicate(upsertEffects.Count)
104-
.Select((s, i) => s.Replace("#", i.ToString()))
105-
.StringJoin(", ")
106-
);
107-
108-
if (removeEffects.Count > 0)
109-
sql += Environment.NewLine +
110-
@$"DELETE FROM {tablePrefix}_Effects
111-
WHERE FlowType = {storedId.Type.Value} AND
112-
FlowInstance = '{storedId.Instance.Value}' AND
113-
StoredId IN ({removeEffects.Select(id => $"'{id.Value}'").StringJoin(", ")}) ";
93+
await using var command = new SqlCommand();
94+
command.Connection = conn;
95+
command.CommandText = SqlGenerator.UpdateEffects(command, changes, tablePrefix, paramPrefix: "");
11496

115-
await using var command = new SqlCommand(sql, conn);
116-
for (var i = 0; i < upsertEffects.Count; i++)
117-
{
118-
var storedEffect = upsertEffects[i];
119-
command.Parameters.AddWithValue($"@FlowType{i}", storedId.Type.Value);
120-
command.Parameters.AddWithValue($"@FlowInstance{i}", storedId.Instance.Value);
121-
command.Parameters.AddWithValue($"@StoredId{i}", storedEffect.StoredEffectId.Value);
122-
command.Parameters.AddWithValue($"@EffectId{i}", storedEffect.EffectId.Serialize());
123-
command.Parameters.AddWithValue($"@Status{i}", storedEffect.WorkStatus);
124-
command.Parameters.AddWithValue($"@Result{i}", storedEffect.Result ?? (object) SqlBinary.Null);
125-
command.Parameters.AddWithValue($"@Exception{i}", JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value);
126-
}
127-
12897
await command.ExecuteNonQueryAsync();
12998
}
13099

0 commit comments

Comments
 (0)