Skip to content

Commit d1a11f0

Browse files
committed
RestartFunction returns existing effects and messages
1 parent c44d1a7 commit d1a11f0

File tree

16 files changed

+468
-216
lines changed

16 files changed

+468
-216
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,12 @@ public override Task FunctionCanBeCreatedWithMessagesOnly()
219219
[TestMethod]
220220
public override Task FunctionCanBeCreatedWithEffectsOnly()
221221
=> FunctionCanBeCreatedWithEffectsOnly(FunctionStoreFactory.Create());
222+
223+
[TestMethod]
224+
public override Task RestartExecutionReturnsEffectsAndMessages()
225+
=> RestartExecutionReturnsEffectsAndMessages(FunctionStoreFactory.Create());
226+
227+
[TestMethod]
228+
public override Task RestartExecutionWorksWithEmptyEffectsAndMessages()
229+
=> RestartExecutionWorksWithEmptyEffectsAndMessages(FunctionStoreFactory.Create());
222230
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,4 +1723,88 @@ await store.CreateFunction(
17231723
messages: null
17241724
).ShouldBeFalseAsync();
17251725
}
1726+
1727+
public abstract Task RestartExecutionReturnsEffectsAndMessages();
1728+
protected async Task RestartExecutionReturnsEffectsAndMessages(Task<IFunctionStore> storeTask)
1729+
{
1730+
var functionId = TestStoredId.Create();
1731+
1732+
var store = await storeTask;
1733+
var paramJson = PARAM.ToJson();
1734+
1735+
await store.CreateFunction(
1736+
functionId,
1737+
"humanInstanceId",
1738+
paramJson.ToUtf8Bytes(),
1739+
leaseExpiration: DateTime.UtcNow.Ticks,
1740+
postponeUntil: null,
1741+
timestamp: DateTime.UtcNow.Ticks,
1742+
parent: null
1743+
).ShouldBeTrueAsync();
1744+
1745+
await store.MessageStore.AppendMessage(
1746+
functionId,
1747+
new StoredMessage(
1748+
"hallo message".ToUtf8Bytes(),
1749+
typeof(string).SimpleQualifiedName().ToUtf8Bytes()
1750+
)
1751+
);
1752+
1753+
await store.EffectsStore.SetEffectResult(
1754+
functionId,
1755+
new StoredEffect(
1756+
"Test".ToEffectId(),
1757+
"Test".ToEffectId().ToStoredEffectId(),
1758+
WorkStatus.Completed,
1759+
"hallo effect".ToUtf8Bytes(),
1760+
StoredException: null
1761+
)
1762+
);
1763+
1764+
var leaseExpiration = DateTime.UtcNow.Ticks;
1765+
var (sf, effects, messages) = await store
1766+
.RestartExecution(
1767+
functionId,
1768+
expectedEpoch: 0,
1769+
leaseExpiration
1770+
).ShouldNotBeNullAsync();
1771+
1772+
sf.StoredId.ShouldBe(functionId);
1773+
effects.Count.ShouldBe(1);
1774+
effects.Single().EffectId.Id.ShouldBe("Test");
1775+
effects.Single().Result!.ToStringFromUtf8Bytes().ShouldBe("hallo effect");
1776+
messages.Count.ShouldBe(1);
1777+
messages.Single().MessageContent.ToStringFromUtf8Bytes().ShouldBe("hallo message");
1778+
}
1779+
1780+
public abstract Task RestartExecutionWorksWithEmptyEffectsAndMessages();
1781+
protected async Task RestartExecutionWorksWithEmptyEffectsAndMessages(Task<IFunctionStore> storeTask)
1782+
{
1783+
var functionId = TestStoredId.Create();
1784+
1785+
var store = await storeTask;
1786+
var paramJson = PARAM.ToJson();
1787+
1788+
await store.CreateFunction(
1789+
functionId,
1790+
"humanInstanceId",
1791+
paramJson.ToUtf8Bytes(),
1792+
leaseExpiration: DateTime.UtcNow.Ticks,
1793+
postponeUntil: null,
1794+
timestamp: DateTime.UtcNow.Ticks,
1795+
parent: null
1796+
).ShouldBeTrueAsync();
1797+
1798+
var leaseExpiration = DateTime.UtcNow.Ticks;
1799+
var (sf, effects, messages) = await store
1800+
.RestartExecution(
1801+
functionId,
1802+
expectedEpoch: 0,
1803+
leaseExpiration
1804+
).ShouldNotBeNullAsync();
1805+
1806+
sf.StoredId.ShouldBe(functionId);
1807+
effects.Count.ShouldBe(0);
1808+
messages.Count.ShouldBe(0);
1809+
}
17261810
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,13 @@ public override Task FunctionCanBeCreatedWithMessagesOnly()
210210
[TestMethod]
211211
public override Task FunctionCanBeCreatedWithEffectsOnly()
212212
=> FunctionCanBeCreatedWithEffectsOnly(FunctionStoreFactory.Create());
213+
214+
[TestMethod]
215+
public override Task RestartExecutionReturnsEffectsAndMessages()
216+
=> RestartExecutionReturnsEffectsAndMessages(FunctionStoreFactory.Create());
217+
218+
[TestMethod]
219+
public override Task RestartExecutionWorksWithEmptyEffectsAndMessages()
220+
=> RestartExecutionWorksWithEmptyEffectsAndMessages(FunctionStoreFactory.Create());
221+
213222
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using Cleipnir.ResilientFunctions.Domain;
2-
using Cleipnir.ResilientFunctions.Helpers;
1+
using Cleipnir.ResilientFunctions.Helpers;
32
using Cleipnir.ResilientFunctions.Storage;
43
using Cleipnir.ResilientFunctions.Storage.Utils;
54
using MySqlConnector;
@@ -78,45 +77,14 @@ public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect
7877
await command.ExecuteNonQueryAsync();
7978
}
8079

81-
private string? _getEffectResultsSql;
8280
public async Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)
8381
{
8482
await using var conn = await CreateConnection();
85-
_getEffectResultsSql ??= @$"
86-
SELECT id_hash, status, result, exception, effect_id
87-
FROM {tablePrefix}_effects
88-
WHERE type = ? AND instance = ?";
89-
await using var command = new MySqlCommand(_getEffectResultsSql, conn)
90-
{
91-
Parameters =
92-
{
93-
new() {Value = storedId.Type.Value},
94-
new() {Value = storedId.Instance.Value.ToString("N")},
95-
}
96-
};
97-
83+
await using var command = sqlGenerator.GetEffects(storedId).ToSqlCommand(conn);
9884
await using var reader = await command.ExecuteReaderAsync();
9985

100-
var functions = new List<StoredEffect>();
101-
while (await reader.ReadAsync())
102-
{
103-
var idHash = reader.GetString(0);
104-
var status = (WorkStatus) reader.GetInt32(1);
105-
var result = reader.IsDBNull(2) ? null : (byte[]) reader.GetValue(2);
106-
var exception = reader.IsDBNull(3) ? null : reader.GetString(3);
107-
var effectId = reader.GetString(4);
108-
functions.Add(
109-
new StoredEffect(
110-
EffectId.Deserialize(effectId),
111-
new StoredEffectId(Guid.Parse(idHash)),
112-
status,
113-
result,
114-
StoredException: JsonHelper.FromJson<StoredException>(exception)
115-
)
116-
);
117-
}
118-
119-
return functions;
86+
var effects = await sqlGenerator.ReadEffects(reader);
87+
return effects;
12088
}
12189

12290
private string? _deleteEffectResultSql;

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -200,17 +200,29 @@ INSERT IGNORE INTO {_tablePrefix}
200200

201201
public async Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
202202
{
203-
var storeCommand = _sqlGenerator.RestartExecution(storedId, expectedEpoch, leaseExpiration);
203+
var restartCommand = _sqlGenerator.RestartExecution(storedId, expectedEpoch, leaseExpiration);
204+
var effectsCommand = _sqlGenerator.GetEffects(storedId);
205+
var messagesCommand = _sqlGenerator.GetMessages(storedId, skip: 0);
206+
204207
await using var conn = await CreateOpenConnection(_connectionString);
205-
await using var command = storeCommand.ToSqlCommand(conn);
208+
await using var command = StoreCommand
209+
.Merge(restartCommand, effectsCommand, messagesCommand)
210+
.ToSqlCommand(conn);
211+
206212
var reader = await command.ExecuteReaderAsync();
207-
if (reader.RecordsAffected == 0)
213+
if (reader.RecordsAffected != 1)
208214
return null;
209-
215+
210216
var sf = await ReadToStoredFunction(storedId, reader);
211-
return sf?.Epoch == expectedEpoch + 1
212-
? new StoredFlowWithEffectsAndMessages(sf, Effects: [], Messages: [])
213-
: null;
217+
if (sf?.Epoch != expectedEpoch + 1)
218+
return null;
219+
await reader.NextResultAsync();
220+
221+
var effects = await _sqlGenerator.ReadEffects(reader);
222+
await reader.NextResultAsync();
223+
224+
var messages = await _sqlGenerator.ReadMessages(reader);
225+
return new StoredFlowWithEffectsAndMessages(sf, effects, messages);
214226
}
215227

216228
public async Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbMessageStore.cs

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -197,36 +197,17 @@ DELETE FROM {_tablePrefix}_messages
197197
await command.ExecuteNonQueryAsync();
198198
}
199199

200-
private string? _getMessagesSql;
201200
public async Task<IReadOnlyList<StoredMessage>> GetMessages(StoredId storedId, int skip)
202201
{
203202
await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);
204-
_getMessagesSql ??= @$"
205-
SELECT message_json, message_type, idempotency_key
206-
FROM {_tablePrefix}_messages
207-
WHERE type = ? AND instance = ? AND position >= ?
208-
ORDER BY position ASC;";
209-
await using var command = new MySqlCommand(_getMessagesSql, conn)
210-
{
211-
Parameters =
212-
{
213-
new() {Value = storedId.Type.Value},
214-
new() {Value = storedId.Instance.Value.ToString("N")},
215-
new () {Value = skip}
216-
}
217-
};
203+
await using var command = _sqlGenerator
204+
.GetMessages(storedId, skip)
205+
.ToSqlCommand(conn);
218206

219-
var storedMessages = new List<StoredMessage>();
220207
await using var reader = await command.ExecuteReaderAsync();
221-
while (await reader.ReadAsync())
222-
{
223-
var messageJson = (byte[]) reader.GetValue(0);
224-
var messageType = (byte[]) reader.GetValue(1);
225-
var idempotencyKey = reader.IsDBNull(2) ? null : reader.GetString(2);
226-
storedMessages.Add(new StoredMessage(messageJson, messageType, idempotencyKey));
227-
}
228208

229-
return storedMessages;
209+
var messages = await _sqlGenerator.ReadMessages(reader);
210+
return messages;
230211
}
231212

232213
public async Task<IDictionary<StoredId, int>> GetMaxPositions(IReadOnlyList<StoredId> storedIds)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 102 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,50 @@ ELSE expires
3737
return StoreCommand.Create(sql);
3838
}
3939

40+
private string? _getEffectResultsSql;
41+
public StoreCommand GetEffects(StoredId storedId)
42+
{
43+
_getEffectResultsSql ??= @$"
44+
SELECT id_hash, status, result, exception, effect_id
45+
FROM {tablePrefix}_effects
46+
WHERE type = ? AND instance = ?;";
47+
48+
var command = StoreCommand.Create(
49+
_getEffectResultsSql,
50+
values:
51+
[
52+
storedId.Type.Value,
53+
storedId.Instance.Value.ToString("N")
54+
]
55+
);
56+
return command;
57+
}
58+
59+
public async Task<IReadOnlyList<StoredEffect>> ReadEffects(MySqlDataReader reader)
60+
{
61+
var functions = new List<StoredEffect>();
62+
while (await reader.ReadAsync())
63+
{
64+
var idHash = reader.GetString(0);
65+
var status = (WorkStatus) reader.GetInt32(1);
66+
var result = reader.IsDBNull(2) ? null : (byte[]) reader.GetValue(2);
67+
var exception = reader.IsDBNull(3) ? null : reader.GetString(3);
68+
var effectId = reader.GetString(4);
69+
functions.Add(
70+
new StoredEffect(
71+
EffectId.Deserialize(effectId),
72+
new StoredEffectId(Guid.Parse(idHash)),
73+
status,
74+
result,
75+
StoredException: JsonHelper.FromJson<StoredException>(exception)
76+
)
77+
);
78+
}
79+
80+
return functions;
81+
}
82+
83+
4084
public StoreCommand UpdateEffects(IReadOnlyList<StoredEffectChange> changes)
4185
{
4286
var upsertCommand = default(StoreCommand);
@@ -318,29 +362,6 @@ public StoreCommand RestartExecution(StoredId storedId, int expectedEpoch, long
318362
]);
319363
return command;
320364
}
321-
322-
public StoreCommand AppendMessages(IReadOnlyList<StoredIdAndMessageWithPosition> messages)
323-
{
324-
var sql = @$"
325-
INSERT INTO {tablePrefix}_messages
326-
(type, instance, position, message_json, message_type, idempotency_key)
327-
VALUES
328-
{"(?, ?, ?, ?, ?, ?)".Replicate(messages.Count).StringJoin($",{Environment.NewLine}")};";
329-
330-
var command = StoreCommand.Create(sql);
331-
foreach (var (storedId, (messageContent, messageType, idempotencyKey), position) in messages)
332-
{
333-
var (storedType, storedInstance) = storedId;
334-
command.AddParameter(storedType.Value);
335-
command.AddParameter(storedInstance.Value.ToString("N"));
336-
command.AddParameter(position);
337-
command.AddParameter(messageContent);
338-
command.AddParameter(messageType);
339-
command.AddParameter(idempotencyKey ?? (object)DBNull.Value);
340-
}
341-
342-
return command;
343-
}
344365

345366
public async Task<StoredFlow?> ReadToStoredFunction(StoredId storedId, MySqlDataReader reader)
346367
{
@@ -380,4 +401,62 @@ INSERT INTO {tablePrefix}_messages
380401

381402
return null;
382403
}
404+
405+
public StoreCommand AppendMessages(IReadOnlyList<StoredIdAndMessageWithPosition> messages)
406+
{
407+
var sql = @$"
408+
INSERT INTO {tablePrefix}_messages
409+
(type, instance, position, message_json, message_type, idempotency_key)
410+
VALUES
411+
{"(?, ?, ?, ?, ?, ?)".Replicate(messages.Count).StringJoin($",{Environment.NewLine}")};";
412+
413+
var command = StoreCommand.Create(sql);
414+
foreach (var (storedId, (messageContent, messageType, idempotencyKey), position) in messages)
415+
{
416+
var (storedType, storedInstance) = storedId;
417+
command.AddParameter(storedType.Value);
418+
command.AddParameter(storedInstance.Value.ToString("N"));
419+
command.AddParameter(position);
420+
command.AddParameter(messageContent);
421+
command.AddParameter(messageType);
422+
command.AddParameter(idempotencyKey ?? (object)DBNull.Value);
423+
}
424+
425+
return command;
426+
}
427+
428+
private string? _getMessagesSql;
429+
public StoreCommand GetMessages(StoredId storedId, int skip)
430+
{
431+
_getMessagesSql ??= @$"
432+
SELECT message_json, message_type, idempotency_key
433+
FROM {tablePrefix}_messages
434+
WHERE type = ? AND instance = ? AND position >= ?
435+
ORDER BY position ASC;";
436+
437+
var command = StoreCommand.Create(
438+
_getMessagesSql,
439+
values:
440+
[
441+
storedId.Type.Value,
442+
storedId.Instance.Value.ToString("N"),
443+
skip
444+
]
445+
);
446+
return command;
447+
}
448+
449+
public async Task<IReadOnlyList<StoredMessage>> ReadMessages(MySqlDataReader reader)
450+
{
451+
var storedMessages = new List<StoredMessage>();
452+
while (await reader.ReadAsync())
453+
{
454+
var messageJson = (byte[]) reader.GetValue(0);
455+
var messageType = (byte[]) reader.GetValue(1);
456+
var idempotencyKey = reader.IsDBNull(2) ? null : reader.GetString(2);
457+
storedMessages.Add(new StoredMessage(messageJson, messageType, idempotencyKey));
458+
}
459+
460+
return storedMessages;
461+
}
383462
}

0 commit comments

Comments
 (0)