Skip to content

Commit 7f0abd1

Browse files
committed
Support batch write for Succeed-method
1 parent ec56e56 commit 7f0abd1

File tree

9 files changed

+218
-63
lines changed

9 files changed

+218
-63
lines changed

Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/StoreTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,16 @@ public override Task MessagesOnlyArePersistedOnSuspendFunction()
239239
[TestMethod]
240240
public override Task EffectsOnlyArePersistedOnSuspendFunction()
241241
=> EffectsOnlyArePersistedOnSuspendFunction(FunctionStoreFactory.Create());
242+
243+
[TestMethod]
244+
public override Task EffectsAndMessagesArePersistedOnSucceededFunction()
245+
=> EffectsAndMessagesArePersistedOnSucceededFunction(FunctionStoreFactory.Create());
246+
247+
[TestMethod]
248+
public override Task MessagesOnlyArePersistedOnSucceedFunction()
249+
=> MessagesOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
250+
251+
[TestMethod]
252+
public override Task EffectsOnlyArePersistedOnSucceedFunction()
253+
=> EffectsOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
242254
}

Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/StoreTests.cs

Lines changed: 99 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1864,6 +1864,105 @@ await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
18641864
);
18651865
}
18661866

1867+
public abstract Task MessagesOnlyArePersistedOnSuspendFunction();
1868+
protected async Task MessagesOnlyArePersistedOnSuspendFunction(Task<IFunctionStore> storeTask)
1869+
{
1870+
await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
1871+
storeTask,
1872+
storeFunc: (store, id, effects, messages, complimentaryState) =>
1873+
store.SuspendFunction(
1874+
id,
1875+
DateTime.UtcNow.Ticks,
1876+
expectedEpoch: 0,
1877+
effects,
1878+
messages,
1879+
complimentaryState
1880+
),
1881+
effects: false,
1882+
messages: true
1883+
);
1884+
}
1885+
1886+
public abstract Task EffectsOnlyArePersistedOnSuspendFunction();
1887+
protected async Task EffectsOnlyArePersistedOnSuspendFunction(Task<IFunctionStore> storeTask)
1888+
{
1889+
await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
1890+
storeTask,
1891+
storeFunc: (store, id, effects, messages, complimentaryState) =>
1892+
store.SuspendFunction(
1893+
id,
1894+
DateTime.UtcNow.Ticks,
1895+
expectedEpoch: 0,
1896+
effects,
1897+
messages,
1898+
complimentaryState
1899+
),
1900+
effects: true,
1901+
messages: false
1902+
);
1903+
}
1904+
1905+
//
1906+
public abstract Task EffectsAndMessagesArePersistedOnSucceededFunction();
1907+
protected async Task EffectsAndMessagesArePersistedOnSucceededFunction(Task<IFunctionStore> storeTask)
1908+
{
1909+
await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
1910+
storeTask,
1911+
storeFunc: (store, id, effects, messages, complimentaryState) =>
1912+
store.SucceedFunction(
1913+
id,
1914+
result: null,
1915+
DateTime.UtcNow.Ticks,
1916+
expectedEpoch: 0,
1917+
effects,
1918+
messages,
1919+
complimentaryState
1920+
),
1921+
effects: true,
1922+
messages: true
1923+
);
1924+
}
1925+
1926+
public abstract Task MessagesOnlyArePersistedOnSucceedFunction();
1927+
protected async Task MessagesOnlyArePersistedOnSucceedFunction(Task<IFunctionStore> storeTask)
1928+
{
1929+
await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
1930+
storeTask,
1931+
storeFunc: (store, id, effects, messages, complimentaryState) =>
1932+
store.SucceedFunction(
1933+
id,
1934+
result: null,
1935+
DateTime.UtcNow.Ticks,
1936+
expectedEpoch: 0,
1937+
effects,
1938+
messages,
1939+
complimentaryState
1940+
),
1941+
effects: false,
1942+
messages: true
1943+
);
1944+
}
1945+
1946+
public abstract Task EffectsOnlyArePersistedOnSucceedFunction();
1947+
protected async Task EffectsOnlyArePersistedOnSucceedFunction(Task<IFunctionStore> storeTask)
1948+
{
1949+
await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
1950+
storeTask,
1951+
storeFunc: (store, id, effects, messages, complimentaryState) =>
1952+
store.SucceedFunction(
1953+
id,
1954+
result: null,
1955+
DateTime.UtcNow.Ticks,
1956+
expectedEpoch: 0,
1957+
effects,
1958+
messages,
1959+
complimentaryState
1960+
),
1961+
effects: true,
1962+
messages: false
1963+
);
1964+
}
1965+
18671966
private async Task SharedEffectsAndMessagesArePersistedOnFunctionPersist(
18681967
Task<IFunctionStore> storeTask,
18691968
Func<IFunctionStore, StoredId, List<StoredEffectChange>?, List<StoredMessage>?, ComplimentaryState, Task<bool>> storeFunc,
@@ -1934,42 +2033,4 @@ await storeFunc(
19342033
fetchedMessages.Count.ShouldBe(0);
19352034
}
19362035
}
1937-
1938-
public abstract Task MessagesOnlyArePersistedOnSuspendFunction();
1939-
protected async Task MessagesOnlyArePersistedOnSuspendFunction(Task<IFunctionStore> storeTask)
1940-
{
1941-
await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
1942-
storeTask,
1943-
storeFunc: (store, id, effects, messages, complimentaryState) =>
1944-
store.SuspendFunction(
1945-
id,
1946-
DateTime.UtcNow.Ticks,
1947-
expectedEpoch: 0,
1948-
effects,
1949-
messages,
1950-
complimentaryState
1951-
),
1952-
effects: false,
1953-
messages: true
1954-
);
1955-
}
1956-
1957-
public abstract Task EffectsOnlyArePersistedOnSuspendFunction();
1958-
protected async Task EffectsOnlyArePersistedOnSuspendFunction(Task<IFunctionStore> storeTask)
1959-
{
1960-
await SharedEffectsAndMessagesArePersistedOnFunctionPersist(
1961-
storeTask,
1962-
storeFunc: (store, id, effects, messages, complimentaryState) =>
1963-
store.SuspendFunction(
1964-
id,
1965-
DateTime.UtcNow.Ticks,
1966-
expectedEpoch: 0,
1967-
effects,
1968-
messages,
1969-
complimentaryState
1970-
),
1971-
effects: true,
1972-
messages: false
1973-
);
1974-
}
19752036
}

Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,11 @@ public Task<bool> SucceedFunction(
242242
var state = _states[storedId];
243243
if (state.Epoch != expectedEpoch) return false.ToTask();
244244

245+
if (effects != null)
246+
_effectsStore.SetEffectResults(storedId, effects).Wait();
247+
if (messages != null)
248+
MessageStore.AppendMessages(messages.Select(msg => new StoredIdAndMessage(storedId, msg)).ToList(), interrupt: false).Wait();
249+
245250
state.Status = Status.Succeeded;
246251
state.Result = result;
247252
state.Timestamp = timestamp;

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/StoreTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,4 +230,16 @@ public override Task MessagesOnlyArePersistedOnSuspendFunction()
230230
[TestMethod]
231231
public override Task EffectsOnlyArePersistedOnSuspendFunction()
232232
=> EffectsOnlyArePersistedOnSuspendFunction(FunctionStoreFactory.Create());
233+
234+
[TestMethod]
235+
public override Task EffectsAndMessagesArePersistedOnSucceededFunction()
236+
=> EffectsAndMessagesArePersistedOnSucceededFunction(FunctionStoreFactory.Create());
237+
238+
[TestMethod]
239+
public override Task MessagesOnlyArePersistedOnSucceedFunction()
240+
=> MessagesOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
241+
242+
[TestMethod]
243+
public override Task EffectsOnlyArePersistedOnSucceedFunction()
244+
=> EffectsOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
233245
}

Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -349,13 +349,29 @@ public async Task<bool> SucceedFunction(
349349
IReadOnlyList<StoredMessage>? messages,
350350
ComplimentaryState complimentaryState)
351351
{
352-
await using var conn = await CreateOpenConnection(_connectionString);
353-
await using var command = _sqlGenerator
354-
.SucceedFunction(storedId, result, timestamp, expectedEpoch)
355-
.ToSqlCommand(conn);
352+
var suspendCommand = _sqlGenerator.SucceedFunction(storedId, result, timestamp, expectedEpoch);
353+
var effectsCommand = effects == null
354+
? null
355+
: _sqlGenerator.UpsertEffects(effects);
356+
var messagesCommand = messages == null
357+
? []
358+
: messages.Select(msg => _sqlGenerator.AppendMessage(storedId, msg));
356359

357-
var affectedRows = await command.ExecuteNonQueryAsync();
358-
return affectedRows == 1;
360+
await using var conn = await CreateOpenConnection(_connectionString);
361+
if (effects == null && messages == null)
362+
{
363+
await using var command = suspendCommand.ToSqlCommand(conn);
364+
var affectedRows = await command.ExecuteNonQueryAsync();
365+
return affectedRows == 1;
366+
}
367+
else
368+
{
369+
await using var command = StoreCommand.Merge(
370+
messagesCommand.Append(effectsCommand).Append(suspendCommand)
371+
)!.ToSqlCommand(conn);
372+
var affectedRows = await command.ExecuteNonQueryAsync();
373+
return affectedRows == 1 + (effects?.Count ?? 0) + (messages?.Count ?? 0);
374+
}
359375
}
360376

361377
public async Task<bool> PostponeFunction(

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/StoreTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,4 +232,16 @@ public override Task MessagesOnlyArePersistedOnSuspendFunction()
232232
[TestMethod]
233233
public override Task EffectsOnlyArePersistedOnSuspendFunction()
234234
=> EffectsOnlyArePersistedOnSuspendFunction(FunctionStoreFactory.Create());
235+
236+
[TestMethod]
237+
public override Task EffectsAndMessagesArePersistedOnSucceededFunction()
238+
=> EffectsAndMessagesArePersistedOnSucceededFunction(FunctionStoreFactory.Create());
239+
240+
[TestMethod]
241+
public override Task MessagesOnlyArePersistedOnSucceedFunction()
242+
=> MessagesOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
243+
244+
[TestMethod]
245+
public override Task EffectsOnlyArePersistedOnSucceedFunction()
246+
=> EffectsOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
235247
}

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -382,16 +382,32 @@ public async Task<bool> SucceedFunction(
382382
IReadOnlyList<StoredMessage>? messages,
383383
ComplimentaryState complimentaryState)
384384
{
385-
await using var conn = await CreateConnection();
386-
await using var command = _sqlGenerator.SucceedFunction(
387-
storedId,
388-
result,
389-
timestamp,
390-
expectedEpoch
391-
).ToNpgsqlCommand(conn);
392-
393-
var affectedRows = await command.ExecuteNonQueryAsync();
394-
return affectedRows == 1;
385+
var suspendCommand = _sqlGenerator.SucceedFunction(storedId, result, timestamp, expectedEpoch);
386+
var effectsCommand = effects == null
387+
? []
388+
: _sqlGenerator.UpdateEffects(effects);
389+
var messagesCommand = messages == null
390+
? []
391+
: messages.Select(msg => _sqlGenerator.AppendMessage(storedId, msg)).ToList();
392+
393+
if (effects == null && messages == null)
394+
{
395+
await using var conn = await CreateConnection();
396+
await using var command = suspendCommand.ToNpgsqlCommand(conn);
397+
var affectedRows = await command.ExecuteNonQueryAsync();
398+
return affectedRows == 1;
399+
}
400+
else
401+
{
402+
await using var conn = await CreateConnection();
403+
await using var command = effectsCommand
404+
.Concat(messagesCommand)
405+
.Append(suspendCommand)
406+
.CreateBatch(conn);
407+
408+
var affectedRows = await command.ExecuteNonQueryAsync();
409+
return affectedRows == 1 + (effects?.Count ?? 0) + (messages?.Count ?? 0);
410+
}
395411
}
396412

397413
public async Task<bool> PostponeFunction(

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/StoreTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,16 @@ public override Task MessagesOnlyArePersistedOnSuspendFunction()
231231
[TestMethod]
232232
public override Task EffectsOnlyArePersistedOnSuspendFunction()
233233
=> EffectsOnlyArePersistedOnSuspendFunction(FunctionStoreFactory.Create());
234+
235+
[TestMethod]
236+
public override Task EffectsAndMessagesArePersistedOnSucceededFunction()
237+
=> EffectsAndMessagesArePersistedOnSucceededFunction(FunctionStoreFactory.Create());
238+
239+
[TestMethod]
240+
public override Task MessagesOnlyArePersistedOnSucceedFunction()
241+
=> MessagesOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
242+
243+
[TestMethod]
244+
public override Task EffectsOnlyArePersistedOnSucceedFunction()
245+
=> EffectsOnlyArePersistedOnSucceedFunction(FunctionStoreFactory.Create());
234246
}

Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -399,18 +399,27 @@ public async Task<bool> SucceedFunction(
399399
IReadOnlyList<StoredMessage>? messages,
400400
ComplimentaryState complimentaryState)
401401
{
402+
var suspendCommand = _sqlGenerator.SucceedFunction(storedId, result, timestamp, expectedEpoch, paramPrefix: "Succeed");
403+
var effectCommand = effects == null
404+
? null
405+
: _sqlGenerator.UpdateEffects(effects, paramPrefix: "Effect");
406+
407+
var messageCommands = Array.Empty<StoreCommand>();
408+
409+
if (messages != null)
410+
messageCommands = messages
411+
.Select((msg, i) => _sqlGenerator.AppendMessage(storedId, msg, $"Message{i}"))
412+
.ToArray();
413+
402414
await using var conn = await _connFunc();
403-
await using var command = _sqlGenerator
404-
.SucceedFunction(
405-
storedId,
406-
result,
407-
timestamp,
408-
expectedEpoch,
409-
paramPrefix: ""
415+
await using var command = (effectCommand == null && messageCommands.Length == 0
416+
? suspendCommand
417+
: StoreCommand.Merge(messageCommands.Append(effectCommand).Append(suspendCommand))!
410418
).ToSqlCommand(conn);
411-
419+
420+
var expectedAffectedRows = 1 + (effects?.Count ?? 0) + (messages?.Count ?? 0);
412421
var affectedRows = await command.ExecuteNonQueryAsync();
413-
return affectedRows > 0;
422+
return affectedRows == expectedAffectedRows;
414423
}
415424

416425
public async Task<bool> PostponeFunction(

0 commit comments

Comments
 (0)