Skip to content

Commit 45c707a

Browse files
committed
Added BulkScheduleFunctions method to IFunctionStore
1 parent 4a66ae4 commit 45c707a

File tree

15 files changed

+215
-6
lines changed

15 files changed

+215
-6
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ public Task<bool> CreateFunction(
3232
long? postponeUntil,
3333
long timestamp
3434
) => _inner.CreateFunction(functionId, param, leaseExpiration, postponeUntil, timestamp);
35-
35+
36+
public Task BulkScheduleFunctions(IEnumerable<FunctionIdWithParam> functionsWithParam)
37+
=> _inner.BulkScheduleFunctions(functionsWithParam);
38+
3639
public Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration)
3740
=> _inner.RestartExecution(functionId, expectedEpoch, leaseExpiration);
3841

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,8 @@ public override Task DefaultStateCanSetOnSucceedAndFetchedAfterwards()
171171
[TestMethod]
172172
public override Task SucceededFunctionsCanBeFetchedSuccessfully()
173173
=> SucceededFunctionsCanBeFetchedSuccessfully(FunctionStoreFactory.Create());
174+
175+
[TestMethod]
176+
public override Task BulkScheduleInsertsAllFunctionsSuccessfully()
177+
=> BulkScheduleInsertsAllFunctionsSuccessfully(FunctionStoreFactory.Create());
174178
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,4 +1199,30 @@ await store.SucceedFunction(
11991199
succeededFunctions.Count.ShouldBe(1);
12001200
succeededFunctions.Single().ShouldBe(functionId1.InstanceId);
12011201
}
1202+
1203+
public abstract Task BulkScheduleInsertsAllFunctionsSuccessfully();
1204+
protected async Task BulkScheduleInsertsAllFunctionsSuccessfully(Task<IFunctionStore> storeTask)
1205+
{
1206+
var store = await storeTask;
1207+
1208+
var typeId = TestFunctionId.Create().TypeId;
1209+
var functionIds = Enumerable
1210+
.Range(0, 101)
1211+
.Select(_ => TestFunctionId.Create().WithTypeId(typeId))
1212+
.ToList();
1213+
1214+
await store.BulkScheduleFunctions(
1215+
functionIds.Select(functionId => new FunctionIdWithParam(functionId, Param: ""))
1216+
);
1217+
1218+
var eligibleFunctions = await store
1219+
.GetPostponedFunctions(typeId, DateTime.UtcNow.Ticks)
1220+
.ToListAsync();
1221+
1222+
eligibleFunctions.Count.ShouldBe(functionIds.Count);
1223+
foreach (var (_, instanceId) in functionIds)
1224+
{
1225+
eligibleFunctions.Any(f => f.InstanceId == instanceId).ShouldBeTrue();
1226+
}
1227+
}
12021228
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ long timestamp
5353
postponeUntil,
5454
timestamp
5555
);
56-
56+
57+
public Task BulkScheduleFunctions(IEnumerable<FunctionIdWithParam> functionsWithParam)
58+
=> _crashed
59+
? Task.FromException(new TimeoutException())
60+
: _inner.BulkScheduleFunctions(functionsWithParam);
61+
5762
public Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration)
5863
=> _crashed
5964
? Task.FromException<StoredFunction?>(new TimeoutException())

Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using System.Collections.Generic;
23
using System.Threading.Tasks;
34
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
@@ -23,6 +24,10 @@ Task<bool> CreateFunction(
2324
long? postponeUntil,
2425
long timestamp
2526
);
27+
28+
Task BulkScheduleFunctions(
29+
IEnumerable<FunctionIdWithParam> functionsWithParam
30+
);
2631

2732
Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration);
2833

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,33 @@ public virtual Task<bool> CreateFunction(
7373
return true.ToTask();
7474
}
7575
}
76-
76+
77+
public Task BulkScheduleFunctions(IEnumerable<FunctionIdWithParam> functionsWithParam)
78+
{
79+
lock (_sync)
80+
{
81+
foreach (var (functionId, param) in functionsWithParam)
82+
{
83+
if (!_states.ContainsKey(functionId))
84+
_states[functionId] = new InnerState
85+
{
86+
FunctionId = functionId,
87+
DefaultState = null,
88+
Epoch = 0,
89+
Exception = null,
90+
InterruptCount = 0,
91+
LeaseExpiration = 0,
92+
Param = param,
93+
PostponeUntil = 0,
94+
Result = null,
95+
Status = Status.Postponed
96+
};
97+
}
98+
}
99+
100+
return Task.CompletedTask;
101+
}
102+
77103
public virtual Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration)
78104
{
79105
lock (_sync)

Core/Cleipnir.ResilientFunctions/Storage/Types.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ public record StoredException(string ExceptionMessage, string? ExceptionStackTra
2323
public record StatusAndEpoch(Status Status, int Epoch);
2424

2525
public record StoredEffect(EffectId EffectId, WorkStatus WorkStatus, string? Result, StoredException? StoredException);
26-
public record StoredState(StateId StateId, string StateJson);
26+
public record StoredState(StateId StateId, string StateJson);
27+
28+
public record FunctionIdWithParam(FunctionId FunctionId, string? Param);

Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,12 @@ long timestamp
4242
postponeUntil,
4343
timestamp
4444
);
45-
45+
46+
public Task BulkScheduleFunctions(IEnumerable<FunctionIdWithParam> functionsWithParam)
47+
=> _crashed
48+
? Task.FromException(new TimeoutException())
49+
: _inner.BulkScheduleFunctions(functionsWithParam);
50+
4651
public Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration)
4752
=> _crashed
4853
? Task.FromException<StoredFunction?>(new TimeoutException())

Stores/MySQL/Cleipnir.ResilientFunctions.MySQL.Tests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,8 @@ public override Task DefaultStateCanSetOnSucceedAndFetchedAfterwards()
162162
[TestMethod]
163163
public override Task SucceededFunctionsCanBeFetchedSuccessfully()
164164
=> SucceededFunctionsCanBeFetchedSuccessfully(FunctionStoreFactory.Create());
165+
166+
[TestMethod]
167+
public override Task BulkScheduleInsertsAllFunctionsSuccessfully()
168+
=> BulkScheduleInsertsAllFunctionsSuccessfully(FunctionStoreFactory.Create());
165169
}

Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/DatabaseHelper.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@ public static async Task<MySqlConnection> CreateOpenConnection(string connection
3232
await conn.OpenAsync();
3333
return conn;
3434
}
35+
36+
public static string EscapeString(this string value) => MySqlHelper.EscapeString(value);
3537
}

Stores/MySQL/Cleipnir.ResilientFunctions.MySQL/MySqlFunctionStore.cs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
using System.Text.Json;
1+
using System.Data;
2+
using System.Text;
3+
using System.Text.Json;
24
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
35
using Cleipnir.ResilientFunctions.Domain;
6+
using Cleipnir.ResilientFunctions.Helpers;
47
using Cleipnir.ResilientFunctions.Messaging;
58
using Cleipnir.ResilientFunctions.Storage;
69
using MySqlConnector;
@@ -140,6 +143,34 @@ INSERT IGNORE INTO {_tablePrefix}
140143
return affectedRows == 1;
141144
}
142145

146+
public async Task BulkScheduleFunctions(IEnumerable<FunctionIdWithParam> functionsWithParam)
147+
{
148+
var insertSql = @$"
149+
INSERT IGNORE INTO {_tablePrefix}
150+
(function_type_id, function_instance_id, param_json, status, epoch, lease_expiration, postponed_until, timestamp)
151+
VALUES
152+
";
153+
154+
var now = DateTime.UtcNow.Ticks;
155+
156+
var rows = new List<string>();
157+
foreach (var ((type, instance), param) in functionsWithParam)
158+
{
159+
var row = $"('{type.Value.EscapeString()}', '{instance.Value.EscapeString()}', {(param == null ? "NULL" : $"'{param.EscapeString()}'")}, {(int) Status.Postponed}, 0, 0, 0, {now})";
160+
rows.Add(row);
161+
}
162+
var rowsSql = string.Join(", " + Environment.NewLine, rows);
163+
var strBuilder = new StringBuilder(rowsSql.Length + 2);
164+
strBuilder.Append(insertSql);
165+
strBuilder.Append(rowsSql);
166+
strBuilder.Append(";");
167+
var sql = strBuilder.ToString();
168+
169+
await using var conn = await CreateOpenConnection(_connectionString);
170+
await using var cmd = new MySqlCommand(sql, conn);
171+
cmd.ExecuteNonQuery();
172+
}
173+
143174
private string? _restartExecutionSql;
144175
public async Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration)
145176
{

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,8 @@ public override Task DefaultStateCanSetOnSucceedAndFetchedAfterwards()
164164
[TestMethod]
165165
public override Task SucceededFunctionsCanBeFetchedSuccessfully()
166166
=> SucceededFunctionsCanBeFetchedSuccessfully(FunctionStoreFactory.Create());
167+
168+
[TestMethod]
169+
public override Task BulkScheduleInsertsAllFunctionsSuccessfully()
170+
=> BulkScheduleInsertsAllFunctionsSuccessfully(FunctionStoreFactory.Create());
167171
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Text.Json;
45
using System.Threading.Tasks;
56
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
@@ -161,6 +162,39 @@ INSERT INTO {_tablePrefix}
161162
return affectedRows == 1;
162163
}
163164

165+
private string? _bulkScheduleFunctionsSql;
166+
public async Task BulkScheduleFunctions(IEnumerable<FunctionIdWithParam> functionsWithParam)
167+
{
168+
_bulkScheduleFunctionsSql ??= @$"
169+
INSERT INTO {_tablePrefix}
170+
(function_type_id, function_instance_id, status, param_json, lease_expiration, postponed_until, timestamp)
171+
VALUES
172+
($1, $2, {(int) Status.Postponed}, $3, 0, 0, 0)
173+
ON CONFLICT DO NOTHING;";
174+
175+
await using var conn = await CreateConnection();
176+
var chunks = functionsWithParam.Chunk(100);
177+
foreach (var chunk in chunks)
178+
{
179+
await using var batch = new NpgsqlBatch(conn);
180+
foreach (var idWithParam in chunk)
181+
{
182+
var batchCommand = new NpgsqlBatchCommand(_bulkScheduleFunctionsSql)
183+
{
184+
Parameters =
185+
{
186+
new() { Value = idWithParam.FunctionId.TypeId.Value },
187+
new() { Value = idWithParam.FunctionId.InstanceId.Value },
188+
new() { Value = idWithParam.Param == null ? DBNull.Value : idWithParam.Param }
189+
}
190+
};
191+
batch.BatchCommands.Add(batchCommand);
192+
}
193+
194+
await batch.ExecuteNonQueryAsync();
195+
}
196+
}
197+
164198
private string? _restartExecutionSql;
165199
public async Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration)
166200
{

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/StoreTests.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,8 @@ public override Task DefaultStateCanSetOnSucceedAndFetchedAfterwards()
163163
[TestMethod]
164164
public override Task SucceededFunctionsCanBeFetchedSuccessfully()
165165
=> SucceededFunctionsCanBeFetchedSuccessfully(FunctionStoreFactory.Create());
166+
167+
[TestMethod]
168+
public override Task BulkScheduleInsertsAllFunctionsSuccessfully()
169+
=> BulkScheduleInsertsAllFunctionsSuccessfully(FunctionStoreFactory.Create());
166170
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Text.Json;
45
using System.Threading.Tasks;
56
using Cleipnir.ResilientFunctions.CoreRuntime.Invocation;
@@ -178,6 +179,59 @@ public async Task<bool> CreateFunction(
178179
return true;
179180
}
180181

182+
private string? _bulkScheduleFunctionsSql;
183+
public async Task BulkScheduleFunctions(IEnumerable<FunctionIdWithParam> functionsWithParam)
184+
{
185+
_bulkScheduleFunctionsSql ??= @$"
186+
MERGE INTO {_tablePrefix}
187+
USING (VALUES @VALUES)
188+
AS source (
189+
FunctionTypeId,
190+
FunctionInstanceId,
191+
ParamJson,
192+
Status,
193+
Epoch,
194+
LeaseExpiration,
195+
PostponedUntil,
196+
Timestamp
197+
)
198+
ON {_tablePrefix}.FunctionTypeId = source.FunctionTypeId AND {_tablePrefix}.FunctionInstanceId = source.FunctionInstanceId
199+
WHEN NOT MATCHED THEN
200+
INSERT (FunctionTypeId, FunctionInstanceId, ParamJson, Status, Epoch, LeaseExpiration, PostponedUntil, Timestamp)
201+
VALUES (source.FunctionTypeId, source.FunctionInstanceId, source.ParamJson, source.Status, source.Epoch, source.LeaseExpiration, source.PostponedUntil, source.Timestamp);";
202+
203+
var valueSql = $"(@FunctionTypeId, @FunctionInstanceId, @ParamJson, {(int)Status.Postponed}, 0, 0, 0, 0)";
204+
var chunk = functionsWithParam
205+
.Select(
206+
fp =>
207+
{
208+
var id = Guid.NewGuid().ToString("N");
209+
var sql = valueSql
210+
.Replace("@FunctionTypeId", $"@FunctionTypeId{id}")
211+
.Replace("@FunctionInstanceId", $"@FunctionInstanceId{id}")
212+
.Replace("@ParamJson", $"@ParamJson{id}");
213+
214+
return new { Id = id, Sql = sql, FunctionId = fp.FunctionId, Param = fp.Param };
215+
}).Chunk(100);
216+
217+
await using var conn = await _connFunc();
218+
foreach (var idAndSqls in chunk)
219+
{
220+
var valuesSql = string.Join($",{Environment.NewLine}", idAndSqls.Select(a => a.Sql));
221+
var sql = _bulkScheduleFunctionsSql.Replace("@VALUES", valuesSql);
222+
223+
await using var command = new SqlCommand(sql, conn);
224+
foreach (var idAndSql in idAndSqls)
225+
{
226+
command.Parameters.AddWithValue($"@FunctionTypeId{idAndSql.Id}", idAndSql.FunctionId.TypeId.Value);
227+
command.Parameters.AddWithValue($"@FunctionInstanceId{idAndSql.Id}", idAndSql.FunctionId.InstanceId.Value);
228+
command.Parameters.AddWithValue($"@ParamJson{idAndSql.Id}", idAndSql.Param == null ? DBNull.Value : idAndSql.Param);
229+
}
230+
231+
await command.ExecuteNonQueryAsync();
232+
}
233+
}
234+
181235
private string? _restartExecutionSql;
182236
public async Task<StoredFunction?> RestartExecution(FunctionId functionId, int expectedEpoch, long leaseExpiration)
183237
{

0 commit comments

Comments
 (0)