Skip to content

Commit c44d1a7

Browse files
committed
Changed IFunctionStore's RestartExecution-method returning StoredFlowWithEffectsAndMessages
1 parent c929fde commit c44d1a7

File tree

13 files changed

+269
-116
lines changed

13 files changed

+269
-116
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public Task<bool> CreateFunction(
4242
public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, StoredId? parent)
4343
=> _inner.BulkScheduleFunctions(functionsWithParam, parent);
4444

45-
public Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
45+
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
4646
=> _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
4747

4848
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,9 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
7373
? Task.FromException(new TimeoutException())
7474
: _inner.BulkScheduleFunctions(functionsWithParam, parent);
7575

76-
public Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
76+
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
7777
=> _crashed
78-
? Task.FromException<StoredFlow?>(new TimeoutException())
78+
? Task.FromException<StoredFlowWithEffectsAndMessages?>(new TimeoutException())
7979
: _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
8080

8181
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)

Core/Cleipnir.ResilientFunctions/CoreRuntime/Invocation/InvocationHelper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ public async Task PublishCompletionMessageToParent(StoredId? parent, FlowId chil
215215
);
216216

217217
return sf != null
218-
? new RestartedFunction(sf, runningFunction)
218+
? new RestartedFunction(sf.StoredFlow, runningFunction) //todo extend this class as well
219219
: null;
220220
}
221221
catch

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Task BulkScheduleFunctions(
3535
StoredId? parent
3636
);
3737

38-
Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration);
38+
Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration);
3939

4040
Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration);
4141

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,33 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
113113
return Task.CompletedTask;
114114
}
115115

116-
public virtual Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
116+
public virtual async Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
117117
{
118118
lock (_sync)
119119
{
120120
if (!_states.ContainsKey(storedId))
121-
return default(StoredFlow).ToTask();
121+
return null;
122122

123123
var state = _states[storedId];
124124
if (state.Epoch != expectedEpoch)
125-
return default(StoredFlow).ToTask();
125+
return null;
126126

127127
state.Epoch += 1;
128128
state.Status = Status.Executing;
129129
state.Expires = leaseExpiration;
130130
state.Interrupted = false;
131-
return GetFunction(storedId);
132131
}
132+
var sf = await GetFunction(storedId);
133+
var effects = await EffectsStore.GetEffectResults(storedId);
134+
var messages = await MessageStore.GetMessages(storedId, skip: 0);
135+
return
136+
sf == null
137+
? null
138+
: new StoredFlowWithEffectsAndMessages(
139+
sf,
140+
effects,
141+
messages
142+
);
133143
}
134144

135145
public virtual Task<bool> RenewLease(StoredId storedId, int expectedEpoch, long leaseExpiration)

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using Cleipnir.ResilientFunctions.Domain;
4+
using Cleipnir.ResilientFunctions.Messaging;
45

56
namespace Cleipnir.ResilientFunctions.Storage;
67

@@ -126,6 +127,12 @@ public record StoredState(StateId StateId, byte[] StateJson);
126127
public record IdWithParam(StoredId StoredId, string HumanInstanceId, byte[]? Param);
127128
public record LeaseUpdate(StoredId StoredId, int ExpectedEpoch);
128129

130+
public record StoredFlowWithEffectsAndMessages(
131+
StoredFlow StoredFlow,
132+
IReadOnlyList<StoredEffect> Effects,
133+
IReadOnlyList<StoredMessage> Messages
134+
);
135+
129136
public static class StoredEffectExtensions
130137
{
131138
public static StoredEffectChange ToStoredChange(this StoredEffect effect, StoredId storedId, CrudOperation operation)

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ public Task BulkScheduleFunctions(IEnumerable<IdWithParam> functionsWithParam, S
5555
? Task.FromException(new TimeoutException())
5656
: _inner.BulkScheduleFunctions(functionsWithParam, parent);
5757

58-
public Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
58+
public Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
5959
=> _crashed
60-
? Task.FromException<StoredFlow?>(new TimeoutException())
60+
? Task.FromException<StoredFlowWithEffectsAndMessages?>(new TimeoutException())
6161
: _inner.RestartExecution(storedId, expectedEpoch, leaseExpiration);
6262

6363
public Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -197,50 +197,20 @@ INSERT IGNORE INTO {_tablePrefix}
197197
await using var cmd = new MySqlCommand(sql, conn);
198198
cmd.ExecuteNonQuery();
199199
}
200-
201-
private string? _restartExecutionSql;
202-
public async Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
200+
201+
public async Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
203202
{
203+
var storeCommand = _sqlGenerator.RestartExecution(storedId, expectedEpoch, leaseExpiration);
204204
await using var conn = await CreateOpenConnection(_connectionString);
205-
_restartExecutionSql ??= @$"
206-
UPDATE {_tablePrefix}
207-
SET epoch = epoch + 1, status = {(int)Status.Executing}, expires = ?, interrupted = FALSE
208-
WHERE type = ? AND instance = ? AND epoch = ?;
209-
SELECT
210-
param_json,
211-
status,
212-
result_json,
213-
exception_json,
214-
epoch,
215-
expires,
216-
interrupted,
217-
timestamp,
218-
human_instance_id,
219-
parent
220-
FROM {_tablePrefix}
221-
WHERE type = ? AND instance = ?;";
222-
223-
await using var command = new MySqlCommand(_restartExecutionSql, conn)
224-
{
225-
Parameters =
226-
{
227-
new() { Value = leaseExpiration },
228-
new() { Value = storedId.Type.Value },
229-
new() { Value = storedId.Instance.Value.ToString("N") },
230-
new() { Value = expectedEpoch },
231-
new() { Value = storedId.Type.Value },
232-
new() { Value = storedId.Instance.Value.ToString("N") },
233-
}
234-
};
235-
205+
await using var command = storeCommand.ToSqlCommand(conn);
236206
var reader = await command.ExecuteReaderAsync();
237207
if (reader.RecordsAffected == 0)
238-
return default;
208+
return null;
239209

240210
var sf = await ReadToStoredFunction(storedId, reader);
241211
return sf?.Epoch == expectedEpoch + 1
242-
? sf
243-
: default;
212+
? new StoredFlowWithEffectsAndMessages(sf, Effects: [], Messages: [])
213+
: null;
244214
}
245215

246216
public async Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/SqlGenerator.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Cleipnir.ResilientFunctions.Messaging;
55
using Cleipnir.ResilientFunctions.Storage;
66
using Cleipnir.ResilientFunctions.Storage.Utils;
7+
using MySqlConnector;
78

89
namespace Cleipnir.ResilientFunctions.MariaDb;
910

@@ -282,6 +283,41 @@ public StoreCommand SuspendFunction(StoredId storedId, long timestamp, int expec
282283
]
283284
);
284285
}
286+
287+
private string? _restartExecutionSql;
288+
public StoreCommand RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
289+
{
290+
_restartExecutionSql ??= @$"
291+
UPDATE {tablePrefix}
292+
SET epoch = epoch + 1, status = {(int)Status.Executing}, expires = ?, interrupted = FALSE
293+
WHERE type = ? AND instance = ? AND epoch = ?;
294+
295+
SELECT
296+
param_json,
297+
status,
298+
result_json,
299+
exception_json,
300+
epoch,
301+
expires,
302+
interrupted,
303+
timestamp,
304+
human_instance_id,
305+
parent
306+
FROM {tablePrefix}
307+
WHERE type = ? AND instance = ?;";
308+
309+
var command = StoreCommand.Create(
310+
_restartExecutionSql,
311+
values: [
312+
leaseExpiration,
313+
storedId.Type.Value,
314+
storedId.Instance.Value.ToString("N"),
315+
expectedEpoch,
316+
storedId.Type.Value,
317+
storedId.Instance.Value.ToString("N"),
318+
]);
319+
return command;
320+
}
285321

286322
public StoreCommand AppendMessages(IReadOnlyList<StoredIdAndMessageWithPosition> messages)
287323
{
@@ -305,4 +341,43 @@ INSERT INTO {tablePrefix}_messages
305341

306342
return command;
307343
}
344+
345+
public async Task<StoredFlow?> ReadToStoredFunction(StoredId storedId, MySqlDataReader reader)
346+
{
347+
const int paramIndex = 0;
348+
const int statusIndex = 1;
349+
const int resultIndex = 2;
350+
const int exceptionIndex = 3;
351+
const int epochIndex = 4;
352+
const int expiresIndex = 5;
353+
const int interruptedIndex = 6;
354+
const int timestampIndex = 7;
355+
const int humanInstanceIdIndex = 8;
356+
const int parentIndex = 9;
357+
358+
while (await reader.ReadAsync())
359+
{
360+
var hasParam = !await reader.IsDBNullAsync(paramIndex);
361+
var hasResult = !await reader.IsDBNullAsync(resultIndex);
362+
var hasError = !await reader.IsDBNullAsync(exceptionIndex);
363+
var hasParent = !await reader.IsDBNullAsync(parentIndex);
364+
var storedException = hasError
365+
? JsonSerializer.Deserialize<StoredException>(reader.GetString(exceptionIndex))
366+
: null;
367+
return new StoredFlow(
368+
storedId,
369+
HumanInstanceId: reader.GetString(humanInstanceIdIndex),
370+
hasParam ? (byte[]) reader.GetValue(paramIndex) : null,
371+
Status: (Status) reader.GetInt32(statusIndex),
372+
Result: hasResult ? (byte[]) reader.GetValue(resultIndex) : null,
373+
storedException, Epoch: reader.GetInt32(epochIndex),
374+
Expires: reader.GetInt64(expiresIndex),
375+
Interrupted: reader.GetBoolean(interruptedIndex),
376+
Timestamp: reader.GetInt64(timestampIndex),
377+
ParentId: hasParent ? StoredId.Deserialize(reader.GetString(parentIndex)) : null
378+
);
379+
}
380+
381+
return null;
382+
}
308383
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -233,46 +233,19 @@ INSERT INTO {_tableName}
233233
}
234234
}
235235

236-
private string? _restartExecutionSql;
237-
public async Task<StoredFlow?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
236+
public async Task<StoredFlowWithEffectsAndMessages?> RestartExecution(StoredId storedId, int expectedEpoch, long leaseExpiration)
238237
{
238+
var storeCommand = _sqlGenerator.RestartExecution(storedId, expectedEpoch, leaseExpiration);
239239
await using var conn = await CreateConnection();
240-
241-
_restartExecutionSql ??= @$"
242-
UPDATE {_tableName}
243-
SET epoch = epoch + 1, status = {(int)Status.Executing}, expires = $1, interrupted = FALSE
244-
WHERE type = $2 AND instance = $3 AND epoch = $4
245-
RETURNING
246-
param_json,
247-
status,
248-
result_json,
249-
exception_json,
250-
expires,
251-
epoch,
252-
interrupted,
253-
timestamp,
254-
human_instance_id,
255-
parent";
256-
257-
await using var command = new NpgsqlCommand(_restartExecutionSql, conn)
258-
{
259-
Parameters =
260-
{
261-
new() { Value = leaseExpiration },
262-
new() { Value = storedId.Type.Value },
263-
new() { Value = storedId.Instance.Value },
264-
new() { Value = expectedEpoch },
265-
}
266-
};
267-
240+
await using var command = storeCommand.ToNpgsqlCommand(conn);
268241
await using var reader = await command.ExecuteReaderAsync();
269242
if (reader.RecordsAffected == 0)
270-
return default;
243+
return null;
271244

272-
var sf = await ReadToStoredFunction(storedId, reader);
245+
var sf = await _sqlGenerator.ReadToStoredFunction(storedId, reader);
273246
return sf?.Epoch == expectedEpoch + 1
274-
? sf
275-
: default;
247+
? new StoredFlowWithEffectsAndMessages(sf, Effects: [], Messages: [])
248+
: null;
276249
}
277250

278251
public async Task<int> RenewLeases(IReadOnlyList<LeaseUpdate> leaseUpdates, long leaseExpiration)

0 commit comments

Comments
 (0)