Skip to content

Commit 6aaa016

Browse files
kalbhorLakshay Kalbhor
andauthored
fix: mysql results table commands and add tests (#49)
Closes #48. * fix: mysql results table commands and add tests * fix: update CI workflow to include mysql tests --------- Co-authored-by: Lakshay Kalbhor <[email protected]>
1 parent e7df3a1 commit 6aaa016

File tree

6 files changed

+128
-31
lines changed

6 files changed

+128
-31
lines changed

.github/workflows/test.yml

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,17 @@ jobs:
3939
--health-retries 5
4040
ports:
4141
- 5432:5432
42-
42+
db:
43+
image: mysql:8.0
44+
env:
45+
MYSQL_ROOT_PASSWORD: rootpassword
46+
MYSQL_DATABASE: mydb
47+
MYSQL_USER: user
48+
MYSQL_PASSWORD: userpassword
49+
ports:
50+
- "3306:3306"
51+
# volumes:
52+
# - mysql_data:/var/lib/mysql
4353
steps:
4454
- name: Install Go
4555
if: success()
@@ -52,14 +62,23 @@ jobs:
5262
- name: Checkout code
5363
uses: actions/checkout@v3
5464

55-
- name: Add tables
65+
- name: Add postgres tables
5666
run: PGPASSWORD=testPass psql -U testUser -h localhost -p 5432 -d testDB -c 'CREATE TABLE entries (id BIGSERIAL PRIMARY KEY, amount REAL, user_id VARCHAR(6), entry_date DATE, timestamp TIMESTAMP)';
57-
67+
68+
- name: Add mysql tables
69+
run: mysql -h localhost -P 3306 -u root -p mydb --protocol=TCP --password=rootpassword -e 'CREATE TABLE entries (id BIGINT PRIMARY KEY, amount REAL, user_id VARCHAR(6), entry_date DATE, timestamp TIMESTAMP)'
70+
5871
- name: Build binary
5972
run: CGO_ENABLED=0 go build -o server.bin -ldflags="-s -w -X 'main.buildString=${BUILDSTR}'" ./cmd/*.go
6073

6174
- name: Run binary server
62-
run: ./server.bin --config config.test.toml &
75+
run: ./server.bin --config config.test_pg.toml --sql-directory=sql/pg &
76+
77+
- name: Run tests
78+
run: sleep 5 && go test ./client -v -covermode=count
79+
80+
- name: Run binary server
81+
run: ./server.bin --config config.test_mysql.toml --sql-directory=sql/mysql &
6382

6483
- name: Run tests
6584
run: sleep 5 && go test ./client -v -covermode=count

config.test_mysql.toml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
[app]
2+
log_level = "DEBUG"
3+
default_job_ttl = "60s"
4+
5+
[job_queue.broker]
6+
type = "redis"
7+
addresses = ["localhost:6379"]
8+
password = ""
9+
db = 1
10+
max_active = 50
11+
max_idle = 20
12+
dial_timeout = "1s"
13+
read_timeout = "1s"
14+
write_timeout = "1s"
15+
16+
[job_queue.state]
17+
type = "redis"
18+
addresses = ["localhost:6379"]
19+
password = ""
20+
db = 1
21+
max_active = 50
22+
max_idle = 20
23+
dial_timeout = "1s"
24+
read_timeout = "1s"
25+
write_timeout = "1s"
26+
expiry = "30s"
27+
meta_expiry = "3600s"
28+
29+
# Results database configuration (MySQL)
30+
[results.my_results]
31+
type = "mysql"
32+
dsn = "root:rootpassword@tcp(127.0.0.1:3306)/mydb"
33+
max_idle = 10
34+
max_active = 100
35+
connect_timeout = "10s"
36+
results_table = "results_%s"
37+
38+
[db.my_db]
39+
type = "mysql"
40+
dsn = "root:rootpassword@tcp(127.0.0.1:3306)/mydb"
41+
max_idle = 10
42+
max_active = 100
43+
connect_timeout = "10s"

config.test.toml renamed to config.test_pg.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ default_job_ttl = "60s"
66
type = "redis"
77
addresses = ["localhost:6379"]
88
password = ""
9-
db = 1
9+
db = 2
1010
max_active = 50
1111
max_idle = 20
1212
dial_timeout = "1s"
@@ -17,7 +17,7 @@ write_timeout = "1s"
1717
type = "redis"
1818
addresses = ["localhost:6379"]
1919
password = ""
20-
db = 1
20+
db = 2
2121
max_active = 50
2222
max_idle = 20
2323
dial_timeout = "1s"

internal/resultbackends/sqldb/sqldb.go

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ func (w *SQLDBResultSet) WriteCols(cols []string) error {
187187
return err
188188
}
189189

190-
return err
190+
return nil
191191
}
192192

193193
// WriteRow writes an individual row from a result set to the backend.
@@ -234,7 +234,7 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
234234
)
235235

236236
for i := range cols {
237-
colNameHolder[i] = fmt.Sprintf(`"%s"`, cols[i])
237+
colNameHolder[i] = s.quoteIdentifier(cols[i])
238238

239239
// This will be filled by the driver.
240240
if s.opt.DBType == dbTypePostgres {
@@ -247,37 +247,35 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
247247

248248
var (
249249
fields = make([]string, len(cols))
250-
typ = ""
251-
unlogged = ""
250+
typ string
251+
unlogged string
252252
)
253253

254254
for i := 0; i < len(cols); i++ {
255255
typ = colTypes[i].DatabaseTypeName()
256-
switch colTypes[i].DatabaseTypeName() {
257-
case "INT2", "INT4", "INT8", // Postgres
258-
"TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT": // MySQL
256+
switch typ {
257+
case "INT2", "INT4", "INT8", "TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT":
259258
typ = "BIGINT"
260-
case "FLOAT4", "FLOAT8", // Postgres
261-
"DECIMAL", "FLOAT", "DOUBLE", "NUMERIC": // MySQL
259+
case "FLOAT4", "FLOAT8", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC":
262260
typ = "DECIMAL"
263-
case "TIMESTAMP", // Postgres, MySQL
264-
"DATETIME": // MySQL
261+
case "TIMESTAMP", "DATETIME":
265262
typ = "TIMESTAMP"
266-
case "DATE": // Postgres, MySQL
263+
case "DATE":
267264
typ = "DATE"
268-
case "BOOLEAN": // Postgres, MySQL
265+
case "BOOLEAN":
269266
typ = "BOOLEAN"
270-
case "JSON", "JSONB": // Postgres
267+
case "JSON", "JSONB":
268+
if s.opt.DBType == dbTypePostgres {
269+
typ = "JSONB"
270+
} else {
271+
typ = "JSON"
272+
}
273+
case "_INT4", "_INT8", "_TEXT":
271274
if s.opt.DBType != dbTypePostgres {
272275
typ = "TEXT"
273276
}
274-
// _INT4, _INT8, _TEXT represent array types in Postgres
275-
case "_INT4": // Postgres
276-
typ = "_INT4"
277-
case "_INT8": // Postgres
278-
typ = "_INT8"
279-
case "_TEXT": // Postgres
280-
typ = "_TEXT"
277+
case "VARCHAR":
278+
typ = "VARCHAR(255)"
281279
default:
282280
typ = "TEXT"
283281
}
@@ -286,7 +284,7 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
286284
typ += " NOT NULL"
287285
}
288286

289-
fields[i] = fmt.Sprintf(`"%s" %s`, cols[i], typ)
287+
fields[i] = fmt.Sprintf("%s %s", s.quoteIdentifier(cols[i]), typ)
290288
}
291289

292290
// If the DB is Postgres, optionally create an "unlogged" table that disables
@@ -297,9 +295,20 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
297295
}
298296

299297
return insertSchema{
300-
dropTable: `DROP TABLE IF EXISTS "%s";`,
301-
createTable: fmt.Sprintf(`CREATE %s TABLE IF NOT EXISTS "%%s" (%s);`, unlogged, strings.Join(fields, ",")),
302-
insertRow: fmt.Sprintf(`INSERT INTO "%%s" (%s) VALUES (%s)`, strings.Join(colNameHolder, ","),
298+
dropTable: fmt.Sprintf("DROP TABLE IF EXISTS %s;", s.quoteIdentifier("%s")),
299+
createTable: fmt.Sprintf("CREATE %s TABLE IF NOT EXISTS %s (%s);", unlogged, s.quoteIdentifier("%s"), strings.Join(fields, ",")),
300+
insertRow: fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
301+
s.quoteIdentifier("%s"),
302+
strings.Join(colNameHolder, ","),
303303
strings.Join(colValHolder, ",")),
304304
}
305305
}
306+
307+
// quoteIdentifier quotes an identifier (table or column name) based on the database type
308+
func (s *SqlDB) quoteIdentifier(name string) string {
309+
if s.opt.DBType == dbTypePostgres {
310+
return fmt.Sprintf(`"%s"`, name)
311+
}
312+
// MySQL uses backticks
313+
return fmt.Sprintf("`%s`", name)
314+
}

sql/mysql/test.mysql.sql

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- test.sql
2+
-- concurrency parameter is associated with the queue.
3+
-- Once a queue is set to a particular concurrency, it cannot be changed.
4+
-- In the below example both `get_profit_summary` and `get_profit_entries` use
5+
-- a common queue with concurrency = 5. It is okay to pass concurrency
6+
-- again in `get_profit_entries` as long as it is the same as the one defined initially (5)
7+
8+
-- name: get_profit_summary
9+
-- db: my_db
10+
-- concurrency: 5
11+
-- queue: test
12+
SELECT SUM(amount) AS total, entry_date FROM entries WHERE user_id = ? GROUP BY entry_date;
13+
14+
-- name: get_profit_entries
15+
-- db: my_db
16+
-- queue: test
17+
SELECT * FROM entries WHERE user_id = ?;
18+
19+
-- name: get_profit_entries_by_date
20+
-- queue: test
21+
SELECT * FROM entries WHERE user_id = ? AND timestamp > ? and timestamp < ?;
22+
23+
-- name: slow_query
24+
-- db: my_db
25+
-- queue: test
26+
SELECT SLEEP(?);
File renamed without changes.

0 commit comments

Comments
 (0)