@@ -17,10 +17,11 @@ public MySqlMessageStore(string connectionString, string tablePrefix = "")
17
17
_tablePrefix = tablePrefix . ToLower ( ) ;
18
18
}
19
19
20
+ private string ? _initializeSql ;
20
21
public async Task Initialize ( )
21
22
{
22
23
await using var conn = await DatabaseHelper . CreateOpenConnection ( _connectionString ) ;
23
- var sql = @$ "
24
+ _initializeSql ?? = @$ "
24
25
CREATE TABLE IF NOT EXISTS { _tablePrefix } rfunctions_messages (
25
26
function_type_id VARCHAR(255),
26
27
function_instance_id VARCHAR(255),
@@ -30,26 +31,29 @@ message_type VARCHAR(255) NOT NULL,
30
31
idempotency_key VARCHAR(255),
31
32
PRIMARY KEY (function_type_id, function_instance_id, position)
32
33
);" ;
33
- var command = new MySqlCommand ( sql , conn ) ;
34
+ var command = new MySqlCommand ( _initializeSql , conn ) ;
34
35
await command . ExecuteNonQueryAsync ( ) ;
35
36
}
36
37
38
+ private string ? _dropUnderlyingTableSql ;
37
39
public async Task DropUnderlyingTable ( )
38
40
{
39
41
await using var conn = await DatabaseHelper . CreateOpenConnection ( _connectionString ) ;
40
- var sql = $ "DROP TABLE IF EXISTS { _tablePrefix } rfunctions_messages";
41
- await using var command = new MySqlCommand ( sql , conn ) ;
42
+ _dropUnderlyingTableSql ?? = $ "DROP TABLE IF EXISTS { _tablePrefix } rfunctions_messages";
43
+ await using var command = new MySqlCommand ( _dropUnderlyingTableSql , conn ) ;
42
44
await command . ExecuteNonQueryAsync ( ) ;
43
45
}
44
-
46
+
47
+ private string ? _truncateTableSql ;
45
48
public async Task TruncateTable ( )
46
49
{
47
50
await using var conn = await DatabaseHelper . CreateOpenConnection ( _connectionString ) ; ;
48
- var sql = @ $ "TRUNCATE TABLE { _tablePrefix } rfunctions_messages;";
49
- var command = new MySqlCommand ( sql , conn ) ;
51
+ _truncateTableSql ??= $ "TRUNCATE TABLE { _tablePrefix } rfunctions_messages;";
52
+ var command = new MySqlCommand ( _truncateTableSql , conn ) ;
50
53
await command . ExecuteNonQueryAsync ( ) ;
51
54
}
52
55
56
+ private string ? _appendMessageSql ;
53
57
public async Task < FunctionStatus > AppendMessage ( FunctionId functionId , StoredMessage storedMessage )
54
58
{
55
59
for ( var i = 0 ; i < 10 ; i ++ ) //retry if deadlock is occurs
@@ -59,7 +63,7 @@ public async Task<FunctionStatus> AppendMessage(FunctionId functionId, StoredMes
59
63
var ( messageJson , messageType , idempotencyKey ) = storedMessage ;
60
64
//https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html#function_get-lock
61
65
var lockName = functionId . ToString ( ) . GenerateSHA256Hash ( ) ;
62
- var sql = @$ "
66
+ _appendMessageSql ?? = @$ "
63
67
SELECT GET_LOCK(?, 10);
64
68
INSERT INTO { _tablePrefix } rfunctions_messages
65
69
(function_type_id, function_instance_id, position, message_json, message_type, idempotency_key)
@@ -72,7 +76,7 @@ INSERT INTO {_tablePrefix}rfunctions_messages
72
76
FROM { _tablePrefix } rfunctions
73
77
WHERE function_type_id = ? AND function_instance_id = ?;" ;
74
78
75
- await using var command = new MySqlCommand ( sql , conn )
79
+ await using var command = new MySqlCommand ( _appendMessageSql , conn )
76
80
{
77
81
Parameters =
78
82
{
@@ -110,17 +114,18 @@ INSERT INTO {_tablePrefix}rfunctions_messages
110
114
111
115
throw new ConcurrentModificationException ( functionId ) ;
112
116
}
113
-
117
+
118
+ private string ? _replaceMessageSql ;
114
119
public async Task < bool > ReplaceMessage ( FunctionId functionId , int position , StoredMessage storedMessage )
115
120
{
116
121
await using var conn = await DatabaseHelper . CreateOpenConnection ( _connectionString ) ;
117
122
var ( messageJson , messageType , idempotencyKey ) = storedMessage ;
118
123
119
- var sql = @$ "
124
+ _replaceMessageSql ?? = @$ "
120
125
UPDATE { _tablePrefix } rfunctions_messages
121
126
SET message_json = ?, message_type = ?, idempotency_key = ?
122
127
WHERE function_type_id = ? AND function_instance_id = ? AND position = ?" ;
123
- await using var command = new MySqlCommand ( sql , conn )
128
+ await using var command = new MySqlCommand ( _replaceMessageSql , conn )
124
129
{
125
130
Parameters =
126
131
{
@@ -135,40 +140,32 @@ public async Task<bool> ReplaceMessage(FunctionId functionId, int position, Stor
135
140
var affectedRows = await command . ExecuteNonQueryAsync ( ) ;
136
141
return affectedRows == 1 ;
137
142
}
138
-
143
+
144
+ private string ? _truncateSql ;
139
145
public async Task Truncate ( FunctionId functionId )
140
146
{
141
147
await using var conn = await DatabaseHelper . CreateOpenConnection ( _connectionString ) ;
142
- await Truncate ( functionId , conn , transaction : null ) ;
143
- }
144
-
145
- internal async Task < int > Truncate ( FunctionId functionId , MySqlConnection connection , MySqlTransaction ? transaction )
146
- {
147
- var sql = @$ "
148
+ _truncateSql ??= @$ "
148
149
DELETE FROM { _tablePrefix } rfunctions_messages
149
150
WHERE function_type_id = ? AND function_instance_id = ?" ;
150
151
151
- await using var command =
152
- transaction == null
153
- ? new MySqlCommand ( sql , connection )
154
- : new MySqlCommand ( sql , connection , transaction ) ;
155
-
152
+ await using var command = new MySqlCommand ( _truncateSql , conn ) ;
156
153
command . Parameters . Add ( new ( ) { Value = functionId . TypeId . Value } ) ;
157
154
command . Parameters . Add ( new ( ) { Value = functionId . InstanceId . Value } ) ;
158
155
159
- var affectedRows = await command . ExecuteNonQueryAsync ( ) ;
160
- return affectedRows ;
156
+ await command . ExecuteNonQueryAsync ( ) ;
161
157
}
162
-
158
+
159
+ private string ? _getMessagesSql ;
163
160
public async Task < IReadOnlyList < StoredMessage > > GetMessages ( FunctionId functionId , int skip )
164
161
{
165
162
await using var conn = await DatabaseHelper . CreateOpenConnection ( _connectionString ) ;
166
- var sql = @$ "
163
+ _getMessagesSql ?? = @$ "
167
164
SELECT message_json, message_type, idempotency_key
168
165
FROM { _tablePrefix } rfunctions_messages
169
166
WHERE function_type_id = ? AND function_instance_id = ? AND position >= ?
170
167
ORDER BY position ASC;" ;
171
- await using var command = new MySqlCommand ( sql , conn )
168
+ await using var command = new MySqlCommand ( _getMessagesSql , conn )
172
169
{
173
170
Parameters =
174
171
{
@@ -191,14 +188,15 @@ public async Task<IReadOnlyList<StoredMessage>> GetMessages(FunctionId functionI
191
188
return storedMessages ;
192
189
}
193
190
191
+ private string ? _hasMoreMessagesSql ;
194
192
public async Task < bool > HasMoreMessages ( FunctionId functionId , int skip )
195
193
{
196
194
await using var conn = await DatabaseHelper . CreateOpenConnection ( _connectionString ) ;
197
- var sql = @$ "
195
+ _hasMoreMessagesSql ?? = @$ "
198
196
SELECT COALESCE(MAX(position), -1)
199
197
FROM { _tablePrefix } rfunctions_messages
200
198
WHERE function_type_id = ? AND function_instance_id = ?;" ;
201
- await using var command = new MySqlCommand ( sql , conn )
199
+ await using var command = new MySqlCommand ( _hasMoreMessagesSql , conn )
202
200
{
203
201
Parameters =
204
202
{
0 commit comments