Skip to content

Commit fedb049

Browse files
committed
Minimize sql-script string generations in PostgreSqlMessageStore
1 parent c240601 commit fedb049

File tree

1 file changed

+35
-31
lines changed

1 file changed

+35
-31
lines changed

Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlMessageStore.cs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ private async Task<NpgsqlConnection> CreateConnection()
2525
await conn.OpenAsync();
2626
return conn;
2727
}
28-
28+
29+
private string? _initializeSql;
2930
public async Task Initialize()
3031
{
3132
await using var conn = await CreateConnection();
32-
var sql = @$"
33+
_initializeSql ??= @$"
3334
CREATE TABLE IF NOT EXISTS {_tablePrefix}_messages (
3435
function_type_id VARCHAR(255),
3536
function_instance_id VARCHAR(255),
@@ -40,42 +41,46 @@ idempotency_key VARCHAR(255),
4041
PRIMARY KEY (function_type_id, function_instance_id, position)
4142
);";
4243

43-
var command = new NpgsqlCommand(sql, conn);
44+
var command = new NpgsqlCommand(_initializeSql, conn);
4445
await command.ExecuteNonQueryAsync();
4546
}
4647

48+
private string? _dropUnderlyingTableSql;
4749
public async Task DropUnderlyingTable()
4850
{
4951
await using var conn = await CreateConnection();
50-
var sql = $"DROP TABLE IF EXISTS {_tablePrefix}_messages;";
51-
var command = new NpgsqlCommand(sql, conn);
52+
_dropUnderlyingTableSql ??= $"DROP TABLE IF EXISTS {_tablePrefix}_messages;";
53+
var command = new NpgsqlCommand(_dropUnderlyingTableSql, conn);
5254
await command.ExecuteNonQueryAsync();
5355
}
54-
56+
57+
private string? _truncateTableSql;
5558
public async Task TruncateTable()
5659
{
5760
await using var conn = await CreateConnection();
58-
var sql = @$"TRUNCATE TABLE {_tablePrefix}_messages;";
59-
var command = new NpgsqlCommand(sql, conn);
61+
_truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_messages;";
62+
var command = new NpgsqlCommand(_truncateTableSql, conn);
6063
await command.ExecuteNonQueryAsync();
6164
}
6265

66+
private string? _appendMessageSql;
67+
private string? _getFunctionStatusInAppendMessageSql;
6368
public async Task<FunctionStatus> AppendMessage(FunctionId functionId, StoredMessage storedMessage)
6469
{
6570
await using var conn = await CreateConnection();
6671
await using var batch = new NpgsqlBatch(conn);
6772
var (messageJson, messageType, idempotencyKey) = storedMessage;
6873

6974
{ //append Message to message stream sql
70-
var sql = @$"
75+
_appendMessageSql ??= @$"
7176
INSERT INTO {_tablePrefix}_messages
7277
(function_type_id, function_instance_id, position, message_json, message_type, idempotency_key)
7378
VALUES (
7479
$1, $2,
7580
(SELECT COALESCE(MAX(position), -1) + 1 FROM {_tablePrefix}_messages WHERE function_type_id = $1 AND function_instance_id = $2),
7681
$3, $4, $5
7782
) RETURNING position;";
78-
var command = new NpgsqlBatchCommand(sql)
83+
var command = new NpgsqlBatchCommand(_appendMessageSql)
7984
{
8085
Parameters =
8186
{
@@ -90,12 +95,12 @@ INSERT INTO {_tablePrefix}_messages
9095
}
9196

9297
{ //get function status
93-
var sql = @$"
98+
_getFunctionStatusInAppendMessageSql ??= @$"
9499
SELECT epoch, status
95100
FROM {_tablePrefix}
96101
WHERE function_type_id = $1 AND function_instance_id = $2;";
97102

98-
var command = new NpgsqlBatchCommand(sql)
103+
var command = new NpgsqlBatchCommand(_getFunctionStatusInAppendMessageSql)
99104
{
100105
Parameters = {
101106
new() {Value = functionId.TypeId.Value},
@@ -134,16 +139,17 @@ INSERT INTO {_tablePrefix}_messages
134139
throw new ConcurrentModificationException(functionId); //row must have been deleted concurrently
135140
}
136141

142+
private string? _replaceMessageSql;
137143
public async Task<bool> ReplaceMessage(FunctionId functionId, int position, StoredMessage storedMessage)
138144
{
139145
await using var conn = await CreateConnection();
140-
var sql = @$"
146+
_replaceMessageSql ??= @$"
141147
UPDATE {_tablePrefix}_messages
142148
SET message_json = $1, message_type = $2, idempotency_key = $3
143149
WHERE function_type_id = $4 AND function_instance_id = $5 AND position = $6";
144150

145151
var (messageJson, messageType, idempotencyKey) = storedMessage;
146-
var command = new NpgsqlCommand(sql, conn)
152+
var command = new NpgsqlCommand(_replaceMessageSql, conn)
147153
{
148154
Parameters =
149155
{
@@ -159,39 +165,35 @@ public async Task<bool> ReplaceMessage(FunctionId functionId, int position, Stor
159165
var affectedRows = await command.ExecuteNonQueryAsync();
160166
return affectedRows == 1;
161167
}
162-
168+
169+
private string? _truncateFunctionSql;
163170
public async Task Truncate(FunctionId functionId)
164171
{
165172
await using var conn = await CreateConnection();
166-
await Truncate(functionId, conn, transaction: null);
167-
}
168-
169-
internal async Task<int> Truncate(FunctionId functionId, NpgsqlConnection connection, NpgsqlTransaction? transaction)
170-
{
171-
var sql = @$"
173+
_truncateFunctionSql ??= @$"
172174
DELETE FROM {_tablePrefix}_messages
173175
WHERE function_type_id = $1 AND function_instance_id = $2;";
174-
await using var command = new NpgsqlCommand(sql, connection, transaction)
176+
await using var command = new NpgsqlCommand(_truncateFunctionSql, conn)
175177
{
176178
Parameters =
177179
{
178180
new() {Value = functionId.TypeId.Value},
179181
new() {Value = functionId.InstanceId.Value}
180182
}
181183
};
182-
var affectedRows = await command.ExecuteNonQueryAsync();
183-
return affectedRows;
184+
await command.ExecuteNonQueryAsync();
184185
}
185-
186+
187+
private string? _getMessagesSql;
186188
public async Task<IReadOnlyList<StoredMessage>> GetMessages(FunctionId functionId, int skip)
187189
{
188190
await using var conn = await CreateConnection();
189-
var sql = @$"
191+
_getMessagesSql ??= @$"
190192
SELECT message_json, message_type, idempotency_key
191193
FROM {_tablePrefix}_messages
192194
WHERE function_type_id = $1 AND function_instance_id = $2 AND position >= $3
193195
ORDER BY position ASC;";
194-
await using var command = new NpgsqlCommand(sql, conn)
196+
await using var command = new NpgsqlCommand(_getMessagesSql, conn)
195197
{
196198
Parameters =
197199
{
@@ -214,14 +216,15 @@ public async Task<IReadOnlyList<StoredMessage>> GetMessages(FunctionId functionI
214216
return storedMessages;
215217
}
216218

219+
private string? _hasMoreMessagesSql;
217220
public async Task<bool> HasMoreMessages(FunctionId functionId, int skip)
218221
{
219222
await using var conn = await CreateConnection();
220-
var sql = @$"
223+
_hasMoreMessagesSql ??= @$"
221224
SELECT COALESCE(MAX(position), -1)
222225
FROM {_tablePrefix}_messages
223226
WHERE function_type_id = $1 AND function_instance_id = $2";
224-
await using var command = new NpgsqlCommand(sql, conn)
227+
await using var command = new NpgsqlCommand(_hasMoreMessagesSql, conn)
225228
{
226229
Parameters =
227230
{
@@ -237,14 +240,15 @@ SELECT COALESCE(MAX(position), -1)
237240
return maxPosition + 1 > skip;
238241
}
239242

243+
private string? _getSuspensionStatusSql;
240244
private async Task<FunctionStatus> GetSuspensionStatus(FunctionId functionId)
241245
{
242246
await using var conn = await CreateConnection();
243-
var sql = @$"
247+
_getSuspensionStatusSql ??= @$"
244248
SELECT epoch, status
245249
FROM {_tablePrefix}
246250
WHERE function_type_id = $1 AND function_instance_id = $2;";
247-
await using var command = new NpgsqlCommand(sql, conn)
251+
await using var command = new NpgsqlCommand(_getSuspensionStatusSql, conn)
248252
{
249253
Parameters = {
250254
new() {Value = functionId.TypeId.Value},

0 commit comments

Comments
 (0)