-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathtests.py
805 lines (650 loc) · 28.6 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
from collections import namedtuple
from threading import Thread
from unittest import TestCase
from postgres import (
AlreadyRegistered, NotAModel, NotRegistered, NoSuchType, NoTypeSpecified,
Postgres,
)
from postgres.cache import Cache
from postgres.cursors import (
BadBackAs, TooFew, TooMany,
Row, SimpleDictCursor, SimpleNamedTupleCursor, SimpleRowCursor, SimpleTupleCursor,
)
from postgres.orm import Model, ReadOnlyAttribute, UnknownAttributes
from psycopg2.errors import InterfaceError, ProgrammingError, ReadOnlySqlTransaction
from pytest import mark, raises
class Heck(Exception):
pass
# harnesses
# =========
class WithSchema(TestCase):
def setUp(self):
self.db = Postgres()
self.db.run("DROP SCHEMA IF EXISTS public CASCADE")
self.db.run("CREATE SCHEMA public")
def tearDown(self):
self.db.run("DROP SCHEMA IF EXISTS public CASCADE")
del self.db
class WithData(WithSchema):
def setUp(self):
WithSchema.setUp(self)
self.db.run("CREATE TABLE foo (bar text)")
self.db.run("INSERT INTO foo VALUES ('baz')")
self.db.run("INSERT INTO foo VALUES ('buz')")
# db.run
# ======
class TestRun(WithSchema):
def test_run_runs(self):
self.db.run("CREATE TABLE foo (bar text)")
actual = self.db.all("SELECT tablename FROM pg_tables "
"WHERE schemaname='public'")
assert actual == ["foo"]
def test_run_inserts(self):
self.db.run("CREATE TABLE foo (bar text)")
self.db.run("INSERT INTO foo VALUES ('baz')")
actual = self.db.one("SELECT * FROM foo ORDER BY bar")
assert actual == "baz"
def test_run_accepts_bind_parameters_as_keyword_arguments(self):
self.db.run("CREATE TABLE foo (bar text)")
self.db.run("INSERT INTO foo VALUES (%(bar)s)", bar='baz')
actual = self.db.one("SELECT * FROM foo ORDER BY bar")
assert actual == "baz"
# db.all
# ======
class TestRows(WithData):
def test_all_fetches_all_rows(self):
actual = self.db.all("SELECT * FROM foo ORDER BY bar")
assert actual == ["baz", "buz"]
def test_all_fetches_one_row(self):
actual = self.db.all("SELECT * FROM foo WHERE bar='baz'")
assert actual == ["baz"]
def test_all_fetches_no_rows(self):
actual = self.db.all("SELECT * FROM foo WHERE bar='blam'")
assert actual == []
def test_all_doesnt_choke_on_values_column(self):
actual = self.db.all("SELECT bar AS values FROM foo")
assert actual == ["baz", "buz"]
def test_bind_parameters_as_dict_work(self):
params = {"bar": "baz"}
actual = self.db.all("SELECT * FROM foo WHERE bar=%(bar)s", params)
assert actual == ["baz"]
def test_bind_parameters_as_tuple_work(self):
actual = self.db.all("SELECT * FROM foo WHERE bar=%s", ("baz",))
assert actual == ["baz"]
def test_bind_parameters_as_kwargs_work(self):
actual = self.db.all("SELECT * FROM foo WHERE bar=%(bar)s", bar='baz')
assert actual == ["baz"]
def test_all_raises_BadBackAs(self):
with self.assertRaises(BadBackAs) as context:
self.db.all("SELECT * FROM foo", back_as='foo')
assert str(context.exception) == (
"%r is not a valid value for the back_as argument.\n"
"The available values are: Row, dict, namedtuple, tuple."
) % 'foo'
# db.one
# ======
class TestWrongNumberException(WithData):
def test_TooFew_message_is_helpful(self):
try:
actual = self.db.one("CREATE TABLE foux (baar text)")
except TooFew as exc:
actual = str(exc)
assert actual == "Got -1 rows; expecting 0 or 1."
def test_TooMany_message_is_helpful_for_two_options(self):
actual = str(TooMany(2, 1, 1))
assert actual == "Got 2 rows; expecting exactly 1."
def test_TooMany_message_is_helpful_for_a_range(self):
actual = str(TooMany(4, 1, 3))
assert actual == "Got 4 rows; expecting between 1 and 3 (inclusive)."
class TestOne(WithData):
def test_one_raises_TooFew(self):
with self.assertRaises(TooFew):
self.db.one("CREATE TABLE foux (baar text)")
def test_one_rollsback_on_error(self):
try:
self.db.one("CREATE TABLE foux (baar text)")
except TooFew:
pass
with self.assertRaises(ProgrammingError):
self.db.all("SELECT * FROM foux")
def test_one_returns_None(self):
actual = self.db.one("SELECT * FROM foo WHERE bar='blam'")
assert actual is None
def test_one_returns_default(self):
class WHEEEE: pass # noqa: E701
actual = self.db.one("SELECT * FROM foo WHERE bar='blam'", default=WHEEEE)
assert actual is WHEEEE
def test_one_raises_default(self):
exception = RuntimeError('oops')
try:
self.db.one("SELECT * FROM foo WHERE bar='blam'", default=exception)
except Exception as e:
if e is not exception:
raise
else:
raise AssertionError('exception not raised')
def test_one_returns_default_after_derefencing(self):
default = 0
actual = self.db.one("SELECT NULL AS foo", default=default)
assert actual is default
def test_one_raises_default_after_derefencing(self):
exception = RuntimeError('oops')
try:
self.db.one("SELECT NULL AS foo", default=exception)
except Exception as e:
if e is not exception:
raise
else:
raise AssertionError('exception not raised')
def test_one_returns_one(self):
actual = self.db.one("SELECT * FROM foo WHERE bar='baz'")
assert actual == "baz"
def test_one_accepts_a_dict_for_bind_parameters(self):
actual = self.db.one("SELECT %(bar)s as bar", {"bar": "baz"})
assert actual == "baz"
def test_one_accepts_a_tuple_for_bind_parameters(self):
actual = self.db.one("SELECT %s as bar", ("baz",))
assert actual == "baz"
def test_one_accepts_bind_parameters_as_keyword_arguments(self):
actual = self.db.one("SELECT %(bar)s as bar", bar='baz')
assert actual == "baz"
def test_one_doesnt_choke_on_values_column(self):
actual = self.db.one("SELECT 1 AS values")
assert actual == 1
def test_one_raises_TooMany(self):
self.assertRaises(TooMany, self.db.one, "SELECT * FROM foo")
def test_one_raises_BadBackAs(self):
with self.assertRaises(BadBackAs) as context:
self.db.one("SELECT * FROM foo LIMIT 1", back_as='foo')
assert str(context.exception) == (
"%r is not a valid value for the back_as argument.\n"
"The available values are: Row, dict, namedtuple, tuple."
) % 'foo'
# db.cache
# ========
class TestCache(TestCase):
def setUp(self):
self.db = Postgres(cache=Cache(max_size=1), cursor_factory=SimpleTupleCursor)
self.db.run("DROP SCHEMA IF EXISTS public CASCADE")
self.db.run("CREATE SCHEMA public")
self.db.run("CREATE TABLE foo (key text, value int)")
self.db.run("INSERT INTO foo VALUES ('a', 1)")
self.db.run("INSERT INTO foo VALUES ('b', 2)")
def test_one_returns_cached_row(self):
query = "SELECT * FROM foo WHERE key = 'a'"
r1 = self.db.one(query, max_age=10)
r2 = self.db.one(query, max_age=10)
assert r2 is r1
def test_all_returns_cached_rows(self):
query = "SELECT * FROM foo ORDER BY key"
r1 = self.db.all(query, max_age=10)
r2 = self.db.all(query, max_age=10)
assert r2 == r1
assert r2 is not r1
assert r2[0] is r1[0]
def test_back_as_is_compatible_with_caching(self):
query = "SELECT * FROM foo WHERE key = 'a'"
r1 = self.db.one(query, back_as=dict, max_age=10)
r2 = self.db.one(query, back_as=namedtuple, max_age=10)
assert r1 == r2._asdict()
rows = self.db.all(query, back_as='Row', max_age=10)
assert rows == [r1]
def test_all_returns_row_cached_by_one(self):
query = "SELECT * FROM foo WHERE key = 'a'"
row = self.db.one(query, max_age=10)
rows = self.db.all(query, max_age=10)
assert rows == [row]
assert rows[0] is row
def test_one_raises_TooMany_when_the_cache_contains_multiple_rows(self):
query = "SELECT * FROM foo"
rows = self.db.all(query, max_age=10)
assert len(rows) == 2
with self.assertRaises(TooMany):
self.db.one(query, max_age=10)
def test_cache_max_size(self):
query1 = b"SELECT * FROM foo WHERE key = 'a'"
query2 = b"SELECT * FROM foo WHERE key = 'b'"
self.db.all(query1, max_age=10)
assert set(self.db.cache.entries.keys()) == {query1}
self.db.all(query2, max_age=10)
assert set(self.db.cache.entries.keys()) == {query2}
def test_cache_max_age(self):
query = b"SELECT * FROM foo WHERE key = 'a'"
r1 = self.db.one(query, max_age=0)
r2 = self.db.one(query, max_age=10)
assert r2 is not r1
def test_cache_prune(self):
self.db.cache.max_size = 2
query1 = b"SELECT * FROM foo WHERE key = 'a'"
query2 = b"SELECT * FROM foo WHERE key = 'b'"
self.db.one(query1, max_age=-1)
self.db.one(query2, max_age=10)
assert set(self.db.cache.entries.keys()) == {query1, query2}
self.db.cache.prune()
assert set(self.db.cache.entries.keys()) == {query2}
def test_cache_prevents_concurrent_queries(self):
with self.db.get_cursor() as cursor:
cursor.run("LOCK TABLE foo IN EXCLUSIVE MODE")
def insert():
self.db.one("INSERT INTO foo VALUES ('c', 3) RETURNING *", max_age=1)
t1 = Thread(target=insert)
t2 = Thread(target=insert)
t1.start()
t2.start()
cursor.run("COMMIT") # this releases the table lock
t1.join()
t2.join()
n = self.db.one("SELECT count(*) FROM foo WHERE key = 'c'")
assert n == 1
# db.get_cursor
# =============
class TestCursor(WithData):
def test_get_cursor_gets_a_cursor(self):
with self.db.get_cursor(cursor_factory=SimpleDictCursor) as cursor:
cursor.execute("INSERT INTO foo VALUES ('blam')")
cursor.execute("SELECT * FROM foo ORDER BY bar")
actual = cursor.fetchall()
assert actual == [{"bar": "baz"}, {"bar": "blam"}, {"bar": "buz"}]
def test_transaction_is_isolated(self):
with self.db.get_cursor() as cursor:
cursor.execute("INSERT INTO foo VALUES ('blam')")
cursor.execute("SELECT * FROM foo ORDER BY bar")
actual = self.db.all("SELECT * FROM foo ORDER BY bar")
assert actual == ["baz", "buz"]
def test_transaction_commits_on_success(self):
with self.db.get_cursor() as cursor:
cursor.execute("INSERT INTO foo VALUES ('blam')")
cursor.execute("SELECT * FROM foo ORDER BY bar")
actual = self.db.all("SELECT * FROM foo ORDER BY bar")
assert actual == ["baz", "blam", "buz"]
def test_transaction_rolls_back_on_failure(self):
try:
with self.db.get_cursor() as cursor:
cursor.execute("INSERT INTO foo VALUES ('blam')")
cursor.execute("SELECT * FROM foo ORDER BY bar")
raise Heck
except Heck:
pass
actual = self.db.all("SELECT * FROM foo ORDER BY bar")
assert actual == ["baz", "buz"]
def test_cursor_rollback_exception_is_ignored(self):
try:
with self.db.get_cursor() as cursor:
cursor.connection.close()
raise Heck
except Heck:
pass
def test_we_close_the_cursor(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT * FROM foo ORDER BY bar")
with self.assertRaises(InterfaceError):
cursor.fetchall()
def test_monkey_patch_execute(self):
expected = "SELECT 1"
def execute(this, sql, params=[]):
return sql
from postgres.cursors import SimpleCursorBase
SimpleCursorBase.execute = execute
with self.db.get_cursor() as cursor:
actual = cursor.execute(expected)
del SimpleCursorBase.execute
assert actual == expected
def test_autocommit_cursor(self):
try:
with self.db.get_cursor(autocommit=True) as cursor:
try:
cursor.execute("INVALID QUERY")
except ProgrammingError:
pass
cursor.execute("INSERT INTO foo VALUES ('blam')")
with self.db.get_cursor() as cursor:
n = cursor.one("SELECT count(*) FROM foo")
assert n == 3
raise KeyboardInterrupt()
except KeyboardInterrupt:
pass
with self.db.get_cursor() as cursor:
n = cursor.one("SELECT count(*) FROM foo")
assert n == 3
def test_readonly_cursor(self):
try:
with self.db.get_cursor(readonly=True) as cursor:
cursor.execute("INSERT INTO foo VALUES ('blam')")
except ReadOnlySqlTransaction:
pass
def test_get_cursor_supports_subtransactions(self):
before_count = self.db.one("SELECT count(*) FROM foo")
with self.db.get_cursor(back_as='dict') as outer_cursor:
outer_cursor.execute("INSERT INTO foo VALUES ('lorem')")
with self.db.get_cursor(cursor=outer_cursor) as inner_cursor:
assert inner_cursor is outer_cursor
assert inner_cursor.back_as == 'dict'
inner_cursor.execute("INSERT INTO foo VALUES ('ipsum')")
after_count = self.db.one("SELECT count(*) FROM foo")
assert after_count == (before_count + 2)
def test_subtransactions_do_not_swallow_exceptions(self):
before_count = self.db.one("SELECT count(*) FROM foo")
try:
with self.db.get_cursor() as cursor:
cursor.execute("INSERT INTO foo VALUES ('lorem')")
with self.db.get_cursor(cursor=cursor) as c:
c.execute("INSERT INTO foo VALUES ('ipsum')")
raise Heck
except Heck:
pass
after_count = self.db.one("SELECT count(*) FROM foo")
assert after_count == before_count
# db.get_connection
# =================
class TestConnection(WithData):
def test_get_connection_gets_a_connection(self):
with self.db.get_connection() as conn:
cursor = conn.cursor(cursor_factory=SimpleDictCursor)
cursor.execute("SELECT * FROM foo ORDER BY bar")
actual = cursor.fetchall()
assert actual == [{"bar": "baz"}, {"bar": "buz"}]
def test_connection_rollback_exception_is_ignored(self):
try:
with self.db.get_connection() as conn:
conn.close()
raise Heck
except Heck:
pass
def test_connection_has_get_cursor_method(self):
with self.db.get_connection() as conn:
with conn.get_cursor() as cursor:
cursor.execute("DELETE FROM foo WHERE bar = 'baz'")
with self.db.get_cursor(cursor_factory=SimpleDictCursor) as cursor:
cursor.execute("SELECT * FROM foo ORDER BY bar")
actual = cursor.fetchall()
assert actual == [{"bar": "buz"}]
def test_get_cursor_method_checks_cursor_argument(self):
with self.db.get_connection() as conn, self.db.get_cursor() as cursor:
with self.assertRaises(ValueError):
conn.get_cursor(cursor=cursor)
# orm
# ===
class TestORM(WithData):
class MyModel(Model):
__slots__ = ('bar', '__dict__')
typname = "foo"
def __init__(self, values):
Model.__init__(self, values)
self.bar_from_init = self.bar
def update_bar(self, bar):
self.db.run("UPDATE foo SET bar=%s WHERE bar=%s", (bar, self.bar))
self.set_attributes(bar=bar)
def setUp(self):
WithData.setUp(self)
self.db.register_model(self.MyModel)
def tearDown(self):
self.db.model_registry = {}
def installFlah(self):
self.db.run("CREATE TABLE flah (bar text)")
self.db.register_model(self.MyModel, 'flah')
def test_register_model_handles_schema(self):
self.db.run("DROP SCHEMA IF EXISTS foo CASCADE")
self.db.run("CREATE SCHEMA foo")
self.db.run("CREATE TABLE foo.flah (bar text)")
self.db.register_model(self.MyModel, 'foo.flah')
def test_register_model_raises_AlreadyRegistered(self):
with self.assertRaises(AlreadyRegistered) as context:
self.db.register_model(self.MyModel)
assert context.exception.args == (self.MyModel, self.MyModel.typname)
assert str(context.exception) == (
"The model MyModel is already registered for the typname foo."
)
def test_register_model_raises_NoSuchType(self):
with self.assertRaises(NoSuchType):
self.db.register_model(self.MyModel, 'nonexistent')
def test_register_model_raises_NoTypeSpecified(self):
with self.assertRaises(NoTypeSpecified):
self.db.register_model(Model)
def test_orm_basically_works(self):
one = self.db.one("SELECT foo FROM foo WHERE bar='baz'")
assert one.__class__ == self.MyModel
def test_orm_models_get_kwargs_to_init(self):
one = self.db.one("SELECT foo FROM foo WHERE bar='baz'")
assert one.bar_from_init == 'baz'
def test_updating_attributes_works(self):
one = self.db.one("SELECT foo FROM foo WHERE bar='baz'")
one.update_bar("blah")
bar = self.db.one("SELECT bar FROM foo WHERE bar='blah'")
assert bar == one.bar
def test_setting_unknown_attributes(self):
one = self.db.one("SELECT foo FROM foo WHERE bar='baz'")
with self.assertRaises(UnknownAttributes) as context:
one.set_attributes(bar='blah', x=0, y=1)
assert sorted(context.exception.args[0]) == ['x', 'y']
assert str(context.exception) == (
"The following attribute(s) are unknown to us: %s."
) % ', '.join(context.exception.args[0])
def test_attributes_are_read_only(self):
one = self.db.one("SELECT foo FROM foo WHERE bar='baz'")
with self.assertRaises(ReadOnlyAttribute) as context:
one.bar = "blah"
assert context.exception.args == ("bar",)
assert str(context.exception).startswith("bar is a read-only attribute.")
def test_check_register_raises_if_passed_a_model_instance(self):
obj = self.MyModel(['baz'])
raises(NotAModel, self.db.check_registration, obj)
def test_check_register_doesnt_include_subsubclasses(self):
class Other(self.MyModel): pass # noqa: E701
raises(NotRegistered, self.db.check_registration, Other)
def test_dot_dot_dot_unless_you_ask_it_to(self):
class Other(self.MyModel): pass # noqa: E701
assert self.db.check_registration(Other, True) == ['foo']
def test_check_register_handles_complex_cases(self):
self.installFlah()
class Second(Model): pass # noqa: E701
self.db.run("CREATE TABLE blum (bar text)")
self.db.register_model(Second, 'blum')
assert self.db.check_registration(Second) == ['blum']
class Third(self.MyModel, Second): pass # noqa: E701
actual = list(sorted(self.db.check_registration(Third, True)))
assert actual == ['blum', 'flah', 'foo']
def test_a_model_can_be_used_for_a_second_type(self):
self.installFlah()
self.db.run("INSERT INTO flah VALUES ('double')")
self.db.run("INSERT INTO flah VALUES ('trouble')")
flah = self.db.one("SELECT flah FROM flah WHERE bar='double'")
assert flah.bar == "double"
def test_check_register_returns_string_for_single(self):
assert self.db.check_registration(self.MyModel) == ['foo']
def test_check_register_returns_list_for_multiple(self):
self.installFlah()
actual = list(sorted(self.db.check_registration(self.MyModel)))
assert actual == ['flah', 'foo']
def test_unregister_unregisters_one(self):
self.db.unregister_model(self.MyModel)
assert self.db.model_registry == {}
def test_unregister_leaves_other(self):
self.db.run("CREATE TABLE flum (bar text)")
class OtherModel(Model): pass # noqa: E701
self.db.register_model(OtherModel, 'flum')
self.db.unregister_model(self.MyModel)
assert self.db.model_registry == {'flum': OtherModel}
def test_unregister_unregisters_multiple(self):
self.installFlah()
self.db.unregister_model(self.MyModel)
assert self.db.model_registry == {}
def test_add_column_doesnt_break_anything(self):
self.db.run("ALTER TABLE foo ADD COLUMN boo text")
one = self.db.one("SELECT foo FROM foo WHERE bar='baz'")
assert one.boo is None
def test_replace_column_different_type(self):
self.db.run("CREATE TABLE grok (bar int)")
self.db.run("INSERT INTO grok VALUES (0)")
class EmptyModel(Model): pass # noqa: E701
self.db.register_model(EmptyModel, 'grok')
# Add a new column then drop the original one
self.db.run("ALTER TABLE grok ADD COLUMN biz text NOT NULL DEFAULT 'x'")
self.db.run("ALTER TABLE grok DROP COLUMN bar")
# The number of columns hasn't changed but the names and types have
one = self.db.one("SELECT grok FROM grok LIMIT 1")
assert one.biz == 'x'
assert not hasattr(one, 'bar')
@mark.xfail(raises=AttributeError)
def test_replace_column_same_type_different_name(self):
self.db.run("ALTER TABLE foo ADD COLUMN biz text NOT NULL DEFAULT 0")
self.db.run("ALTER TABLE foo DROP COLUMN bar")
one = self.db.one("SELECT foo FROM foo LIMIT 1")
assert one.biz == 0
assert not hasattr(one, 'bar')
# SimpleCursorBase
# ================
class TestSimpleCursorBase(WithData):
def test_fetchone(self):
with self.db.get_cursor(cursor_factory=SimpleTupleCursor) as cursor:
cursor.execute("SELECT 1 as foo")
r = cursor.fetchone()
assert r == (1,)
def test_fetchone_supports_back_as(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT 1 as foo")
r = cursor.fetchone(back_as=dict)
assert r == {'foo': 1}
cursor.execute("SELECT 2 as foo")
r = cursor.fetchone(back_as=tuple)
assert r == (2,)
def test_fetchone_raises_BadBackAs(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT 1 as foo")
with self.assertRaises(BadBackAs) as context:
cursor.fetchone(back_as='bar')
assert str(context.exception) == (
"%r is not a valid value for the back_as argument.\n"
"The available values are: Row, dict, namedtuple, tuple."
) % 'bar'
def test_fetchmany(self):
with self.db.get_cursor(cursor_factory=SimpleTupleCursor) as cursor:
cursor.execute("SELECT 1 as foo")
r = cursor.fetchmany()
assert r == [(1,)]
def test_fetchmany_supports_back_as(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT 1 as foo")
r = cursor.fetchmany(back_as=dict)
assert r == [{'foo': 1}]
cursor.execute("SELECT 2 as foo")
r = cursor.fetchmany(back_as=tuple)
assert r == [(2,)]
def test_fetchmany_raises_BadBackAs(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT 1 as foo")
with self.assertRaises(BadBackAs) as context:
cursor.fetchmany(back_as='bar')
assert str(context.exception) == (
"%r is not a valid value for the back_as argument.\n"
"The available values are: Row, dict, namedtuple, tuple."
) % 'bar'
def test_fetchall(self):
with self.db.get_cursor(cursor_factory=SimpleTupleCursor) as cursor:
cursor.execute("SELECT 1 as foo")
r = cursor.fetchall()
assert r == [(1,)]
def test_fetchall_supports_back_as(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT 1 as foo")
r = cursor.fetchall(back_as=dict)
assert r == [{'foo': 1}]
cursor.execute("SELECT 2 as foo")
r = cursor.fetchall(back_as=tuple)
assert r == [(2,)]
def test_fetchall_raises_BadBackAs(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT 1 as foo")
with self.assertRaises(BadBackAs) as context:
cursor.fetchall(back_as='bar')
assert str(context.exception) == (
"%r is not a valid value for the back_as argument.\n"
"The available values are: Row, dict, namedtuple, tuple."
) % 'bar'
# cursor_factory
# ==============
class WithCursorFactory(WithSchema):
def setUp(self): # override
self.db = Postgres(cursor_factory=self.cursor_factory)
self.db.run("DROP SCHEMA IF EXISTS public CASCADE")
self.db.run("CREATE SCHEMA public")
self.db.run("CREATE TABLE foo (key text, value int)")
self.db.run("INSERT INTO foo VALUES ('buz', 42)")
self.db.run("INSERT INTO foo VALUES ('biz', 43)")
class TestNamedTupleCursorFactory(WithCursorFactory):
cursor_factory = SimpleNamedTupleCursor
def test_NamedDictCursor_results_in_namedtuples(self):
Record = namedtuple("Record", ["key", "value"])
expected = [Record(key="biz", value=43), Record(key="buz", value=42)]
actual = self.db.all("SELECT * FROM foo ORDER BY key")
assert actual == expected
assert actual[0].__class__.__name__ == 'Record'
def test_namedtuples_can_be_unrolled(self):
actual = self.db.all("SELECT value FROM foo ORDER BY key")
assert actual == [43, 42]
class TestRowCursorFactory(WithCursorFactory):
cursor_factory = SimpleRowCursor
def test_RowCursor_returns_Row_objects(self):
row = self.db.one("SELECT * FROM foo ORDER BY key LIMIT 1")
assert isinstance(row, Row)
rows = self.db.all("SELECT * FROM foo ORDER BY key")
assert all(isinstance(r, Row) for r in rows)
def test_Row_objects_can_be_unrolled(self):
actual = self.db.all("SELECT value FROM foo ORDER BY key")
assert actual == [43, 42]
def test_one(self):
r = self.db.one("SELECT * FROM foo ORDER BY key LIMIT 1")
assert isinstance(r, Row)
assert r[0] == 'biz'
assert r.key == 'biz'
assert r['key'] == 'biz'
assert r[1] == 43
assert r.value == 43
assert r['value'] == 43
assert repr(r) == "Row(key='biz', value=43)"
def test_all(self):
rows = self.db.all("SELECT * FROM foo ORDER BY key")
assert isinstance(rows[0], Row)
assert rows[0].key == 'biz'
assert rows[0].value == 43
assert rows[1].key == 'buz'
assert rows[1].value == 42
def test_iter(self):
with self.db.get_cursor() as cursor:
cursor.execute("SELECT * FROM foo ORDER BY key")
i = iter(cursor)
assert cursor.rownumber == 0
t = next(i)
assert isinstance(t, Row)
assert t.key == 'biz'
assert t.value == 43
assert cursor.rownumber == 1
assert cursor.rowcount == 2
t = next(i)
assert isinstance(t, Row)
assert t.key == 'buz'
assert t.value == 42
assert cursor.rownumber == 2
assert cursor.rowcount == 2
with self.assertRaises(StopIteration):
next(i)
assert cursor.rownumber == 2
assert cursor.rowcount == 2
def test_row_unpack(self):
foo, bar = self.db.one("SELECT 1 as foo, 2 as bar")
assert foo == 1
assert bar == 2
def test_row_comparison(self):
r = self.db.one("SELECT 1 as foo, 2 as bar")
assert r == r
assert r == (1, 2)
assert r == {'foo': 1, 'bar': 2}
assert r != None # noqa: E711
def test_special_col_names(self):
r = self.db.one('SELECT 1 as "foo.bar_baz", 2 as "?column?", 3 as "3"')
assert r['foo.bar_baz'] == 1
assert r['?column?'] == 2
assert r['3'] == 3
def test_nonascii_names(self):
r = self.db.one('SELECT 1 as \xe5h\xe9, 2 as \u2323')
assert getattr(r, '\xe5h\xe9') == 1
assert getattr(r, '\u2323') == 2