Skip to content

Commit 3c32dfe

Browse files
committed
Added UpdateEffects to MariaDB's SqlGenerator
1 parent d4b0026 commit 3c32dfe

File tree

2 files changed

+77
-40
lines changed

2 files changed

+77
-40
lines changed

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -64,51 +64,30 @@ ON DUPLICATE KEY UPDATE
6464

6565
await command.ExecuteNonQueryAsync();
6666
}
67-
68-
private string? _setEffectResultsSql;
67+
6968
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> upsertEffects, IReadOnlyList<StoredEffectId> removeEffects)
7069
{
71-
if (upsertEffects.Count == 0 && removeEffects.Count == 0)
72-
return;
73-
if (upsertEffects.Count == 0)
74-
{
75-
await DeleteEffectResults(storedId, removeEffects);
76-
return;
77-
}
70+
var changes = upsertEffects
71+
.Select(u => new StoredEffectChange(
72+
storedId,
73+
u.StoredEffectId,
74+
CrudOperation.Upsert,
75+
u
76+
))
77+
.Concat(
78+
removeEffects.Select(id =>
79+
new StoredEffectChange(storedId, id, CrudOperation.Delete, StoredEffect: null)
80+
)
81+
)
82+
.ToList();
7883

7984
await using var conn = await CreateConnection();
80-
_setEffectResultsSql ??= $@"
81-
INSERT INTO {tablePrefix}_effects
82-
(type, instance, id_hash, status, result, exception, effect_id)
83-
VALUES
84-
@VALUES
85-
ON DUPLICATE KEY UPDATE
86-
status = VALUES(status), result = VALUES(result), exception = VALUES(exception);";
87-
88-
var sql = _setEffectResultsSql.Replace(
89-
"@VALUES",
90-
"(?, ?, ?, ?, ?, ?, ?)".Replicate(upsertEffects.Count).StringJoin(", ")
91-
);
92-
93-
if (removeEffects.Count > 0)
94-
sql += Environment.NewLine +
95-
@$"DELETE FROM {tablePrefix}_effects
96-
WHERE type = {storedId.Type.Value} AND
97-
instance = '{storedId.Instance.Value:N}' AND
98-
id_hash IN ({removeEffects.Select(id => $"'{id.Value:N}'").StringJoin(", ")});";
99-
100-
await using var command = new MySqlCommand(sql, conn);
101-
foreach (var storedEffect in upsertEffects)
102-
{
103-
command.Parameters.Add(new MySqlParameter(name: null, storedId.Type.Value));
104-
command.Parameters.Add(new MySqlParameter(name: null, storedId.Instance.Value.ToString("N")));
105-
command.Parameters.Add(new MySqlParameter(name: null, storedEffect.StoredEffectId.Value.ToString("N")));
106-
command.Parameters.Add(new MySqlParameter(name: null, (int) storedEffect.WorkStatus));
107-
command.Parameters.Add(new MySqlParameter(name: null, storedEffect.Result ?? (object) DBNull.Value));
108-
command.Parameters.Add(new MySqlParameter(name: null, JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value));
109-
command.Parameters.Add(new MySqlParameter(name: null, storedEffect.EffectId.Serialize()));
110-
}
85+
await using var command = new MySqlCommand();
86+
command.Connection = conn;
11187

88+
var sql = SqlGenerator.UpdateEffects(command, changes, tablePrefix);
89+
command.CommandText = sql;
90+
11291
await command.ExecuteNonQueryAsync();
11392
}
11493

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
using System.Text;
12
using Cleipnir.ResilientFunctions.Domain;
23
using Cleipnir.ResilientFunctions.Helpers;
34
using Cleipnir.ResilientFunctions.Storage;
5+
using Cleipnir.ResilientFunctions.Storage.Utils;
6+
using MySqlConnector;
47

58
namespace Cleipnir.ResilientFunctions.MariaDb;
69

@@ -34,4 +37,59 @@ ELSE expires
3437

3538
return sql;
3639
}
40+
41+
public static string UpdateEffects(MySqlCommand command, IReadOnlyList<StoredEffectChange> changes, string tablePrefix)
42+
{
43+
var stringBuilder = new StringBuilder(capacity: 2);
44+
var upserts = changes
45+
.Where(c => c.Operation == CrudOperation.Upsert)
46+
.Select(c => new
47+
{
48+
Type = c.StoredId.Type.Value,
49+
Instance = c.StoredId.Instance.Value,
50+
IdHash = c.EffectId.Value,
51+
WorkStatus = (int)c.StoredEffect!.WorkStatus,
52+
Result = c.StoredEffect!.Result,
53+
Exception = c.StoredEffect!.StoredException,
54+
EffectId = c.StoredEffect!.EffectId
55+
})
56+
.ToList();
57+
58+
var setSql = $@"
59+
INSERT INTO {tablePrefix}_effects
60+
(type, instance, id_hash, status, result, exception, effect_id)
61+
VALUES
62+
{"(?, ?, ?, ?, ?, ?, ?)".Replicate(upserts.Count).StringJoin(", ")}
63+
ON DUPLICATE KEY UPDATE
64+
status = VALUES(status), result = VALUES(result), exception = VALUES(exception);";
65+
if (upserts.Any())
66+
stringBuilder.AppendLine(setSql);
67+
foreach (var a in upserts)
68+
{
69+
command.Parameters.Add(new MySqlParameter(name: null, a.Type));
70+
command.Parameters.Add(new MySqlParameter(name: null, a.Instance.ToString("N")));
71+
command.Parameters.Add(new MySqlParameter(name: null, a.IdHash.ToString("N")));
72+
command.Parameters.Add(new MySqlParameter(name: null, a.WorkStatus));
73+
command.Parameters.Add(new MySqlParameter(name: null, a.Result ?? (object) DBNull.Value));
74+
command.Parameters.Add(new MySqlParameter(name: null, JsonHelper.ToJson(a.Exception) ?? (object) DBNull.Value));
75+
command.Parameters.Add(new MySqlParameter(name: null, a.EffectId.Serialize()));
76+
}
77+
78+
var removes = changes
79+
.Where(c => c.Operation == CrudOperation.Delete)
80+
.Select(c => new { Type = c.StoredId.Type.Value, Instance = c.StoredId.Instance.Value, IdHash = c.EffectId.Value })
81+
.GroupBy(a => new {a.Type, a.Instance }, a => a.IdHash)
82+
.ToList();
83+
var predicates = removes
84+
.Select(r =>
85+
$"(type = {r.Key.Type} AND instance = '{r.Key.Instance:N}' AND id_hash IN ({r.Select(id => $"'{id:N}'").StringJoin(", ")}))")
86+
.StringJoin($" OR {Environment.NewLine}");
87+
var removeSql = @$"
88+
DELETE FROM {tablePrefix}_effects
89+
WHERE {predicates}";
90+
if (removes.Any())
91+
stringBuilder.AppendLine(removeSql);
92+
93+
return stringBuilder.ToString();
94+
}
3795
}

0 commit comments

Comments
 (0)