-
Notifications
You must be signed in to change notification settings - Fork 348
/
Copy pathdb_helpers.py
221 lines (163 loc) · 6.53 KB
/
db_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
import os
import subprocess
import sqlite3
import pytest
from django.conf import settings
from django.db import connection
from django.db import transaction
from .compat import force_text
from .app.models import Item
# Database names used by the helpers in this module, derived from the
# default Django database.  A sqlite in-memory database keeps its name
# for both constants; any other database gets a '_db_test' suffix, and
# the test database name additionally gets a 'test_' prefix.
DB_NAME = settings.DATABASES['default']['NAME']
if DB_NAME != ':memory:':
    DB_NAME += '_db_test'
    TEST_DB_NAME = 'test_' + DB_NAME
else:
    TEST_DB_NAME = DB_NAME
def get_db_engine():
    """Return the short backend name of the default database.

    This is the last dotted component of the ``ENGINE`` setting, e.g.
    ``'sqlite3'`` for ``'django.db.backends.sqlite3'``.
    """
    from django.conf import settings

    engine_path = settings.DATABASES['default']['ENGINE']
    # Everything after the final dot (or the whole string if there is none).
    return engine_path.rpartition('.')[2]
class CmdResult(object):
    """Result of an external command: status code, stdout and stderr."""

    def __init__(self, status_code, std_out, std_err):
        # Plain value holder; the arguments are stored unmodified.
        self.status_code, self.std_out, self.std_err = (
            status_code, std_out, std_err)
def run_cmd(*args):
    """Run ``args`` as a subprocess and return the outcome as a CmdResult.

    Both stdout and stderr are captured through pipes, and the call blocks
    until the process has exited.
    """
    process = subprocess.Popen(
        args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    captured_out, captured_err = process.communicate()
    # communicate() has already waited for the process, so the exit
    # status is available on the Popen object.
    exit_status = process.returncode
    return CmdResult(exit_status, captured_out, captured_err)
def run_mysql(*args):
    """Run the ``mysql`` command line client with ``args``.

    When the default database has a non-empty ``USER`` setting, it is
    passed to the client via ``-u`` ahead of ``args``.  Returns whatever
    run_cmd() returns.
    """
    from django.conf import settings

    command = ['mysql']
    db_user = settings.DATABASES['default'].get('USER', None)
    if db_user:
        command += ['-u', db_user]
    command.extend(args)
    return run_cmd(*command)
def skip_if_sqlite():
    """Call pytest.skip() when the default database uses the sqlite3 backend."""
    from django.conf import settings

    engine = settings.DATABASES['default']['ENGINE']
    if engine == 'django.db.backends.sqlite3':
        pytest.skip('Skip if sqlite3 backend')
def skip_if_sqlite_in_memory():
    """Call pytest.skip() when the default database is in-memory sqlite3."""
    from django.conf import settings

    default_db = settings.DATABASES['default']
    # Only look at NAME once the backend is known to be sqlite3.
    if default_db['ENGINE'] != 'django.db.backends.sqlite3':
        return
    if default_db['NAME'] == ':memory:':
        pytest.skip('Do not test db reuse since database does not support it')
def create_empty_production_database():
    """Drop the DB_NAME database and create it again.

    PostgreSQL and MySQL databases are created through their command line
    clients; for sqlite3 the database file is created by opening it in
    append mode.

    Raises:
        AssertionError: if the create command reports an unexpected
            result, if the sqlite database is in-memory, or if the
            backend is not one of the three handled here.
    """
    drop_database(name=DB_NAME)

    engine = get_db_engine()

    if engine == 'postgresql_psycopg2':
        result = run_cmd(
            'psql', 'postgres', '-c', 'CREATE DATABASE %s' % DB_NAME)
        created = 'CREATE DATABASE' in force_text(result.std_out)
        assert created or 'already exists' in force_text(result.std_err)
    elif engine == 'mysql':
        result = run_mysql('-e', 'CREATE DATABASE %s' % DB_NAME)
        assert (result.status_code == 0 or
                'database exists' in force_text(result.std_out) or
                'database exists' in force_text(result.std_err))
    elif engine == 'sqlite3':
        if DB_NAME == ':memory:':
            raise AssertionError(
                'sqlite in-memory database must not be created!')
        # Opening in append mode creates the file when it is missing.
        with open(DB_NAME, 'a'):
            pass
    else:
        raise AssertionError('%s cannot be tested properly' % engine)
def drop_database(name=None, suffix=None):
    """Drop a database, tolerating that it may not exist.

    Args:
        name: Explicit name of the database to drop.  When omitted,
            TEST_DB_NAME is used.
        suffix: Optional suffix.  When given, the database dropped is
            ``'<TEST_DB_NAME>_<suffix>'``, the same naming scheme that
            db_exists() uses for its ``db_suffix`` argument.

    Raises:
        AssertionError: if both ``name`` and ``suffix`` are passed, if the
            resulting name is empty, if the sqlite database is in-memory,
            if the backend is not supported, or if the drop command
            reports an unexpected result.
    """
    # ``name`` defaults to None instead of TEST_DB_NAME so that an
    # explicitly passed name can be told apart from the default.  With a
    # truthy default, a "name XOR suffix" check rejects every call that
    # passes only ``suffix``.
    if name is not None and suffix:
        raise AssertionError('name and suffix cannot be used together')
    if name is None:
        name = TEST_DB_NAME
    if not name:
        raise AssertionError('database name must not be empty')
    if suffix:
        name = '%s_%s' % (name, suffix)

    engine = get_db_engine()

    if engine == 'postgresql_psycopg2':
        r = run_cmd('psql', 'postgres', '-c', 'DROP DATABASE %s' % name)
        # A missing database is fine: the goal is only that it is gone.
        assert ('DROP DATABASE' in force_text(r.std_out) or
                'does not exist' in force_text(r.std_err))
        return

    if engine == 'mysql':
        r = run_mysql('-e', 'DROP DATABASE %s' % name)
        assert ('database doesn\'t exist' in force_text(r.std_err) or
                r.status_code == 0)
        return

    if engine == 'sqlite3':
        if name == ':memory:':
            raise AssertionError(
                'sqlite in-memory database cannot be dropped!')
        if os.path.exists(name):
            os.unlink(name)
        return

    raise AssertionError('%s cannot be tested properly!' % engine)
def db_exists(db_suffix=None):
    """Return True when the test database exists, False otherwise.

    Args:
        db_suffix: Optional suffix; when given, the database checked is
            ``'<TEST_DB_NAME>_<db_suffix>'`` instead of TEST_DB_NAME.

    Raises:
        AssertionError: for a sqlite in-memory database or a backend
            that is not handled here.
    """
    if db_suffix:
        target = '%s_%s' % (TEST_DB_NAME, db_suffix)
    else:
        target = TEST_DB_NAME

    engine = get_db_engine()

    # For the server backends, "exists" means a trivial query succeeds.
    if engine == 'postgresql_psycopg2':
        return run_cmd('psql', target, '-c', 'SELECT 1').status_code == 0

    if engine == 'mysql':
        return run_mysql(target, '-e', 'SELECT 1').status_code == 0

    if engine != 'sqlite3':
        raise AssertionError('%s cannot be tested properly!' % engine)

    if TEST_DB_NAME == ':memory:':
        raise AssertionError(
            'sqlite in-memory database cannot be checked for existence!')
    return os.path.exists(target)
def mark_database():
    """Create a ``mark_table`` table in the test database.

    mark_exists() looks for this table afterwards.

    Raises:
        AssertionError: if the table could not be created, for a sqlite
            in-memory database, or for a backend that is not handled here.
    """
    engine = get_db_engine()

    if engine == 'postgresql_psycopg2':
        outcome = run_cmd(
            'psql', TEST_DB_NAME, '-c', 'CREATE TABLE mark_table();')
        assert outcome.status_code == 0
    elif engine == 'mysql':
        outcome = run_mysql(
            TEST_DB_NAME, '-e', 'CREATE TABLE mark_table(kaka int);')
        assert outcome.status_code == 0
    elif engine == 'sqlite3':
        if TEST_DB_NAME == ':memory:':
            raise AssertionError('sqlite in-memory database cannot be marked!')
        db = sqlite3.connect(TEST_DB_NAME)
        try:
            # The connection context manager commits on success.
            with db:
                db.execute('CREATE TABLE mark_table(kaka int);')
        finally:
            # Always release the connection, even if the statement failed.
            db.close()
    else:
        raise AssertionError('%s cannot be tested properly!' % engine)
def mark_exists():
    """Return True when ``mark_table`` can be queried in the test database.

    Raises:
        AssertionError: for a sqlite in-memory database or a backend
            that is not handled here.
    """
    engine = get_db_engine()

    if engine == 'postgresql_psycopg2':
        outcome = run_cmd(
            'psql', TEST_DB_NAME, '-c', 'SELECT 1 FROM mark_table')
        # Any output on stdout counts as success.
        return bool(outcome.std_out)

    if engine == 'mysql':
        outcome = run_mysql(TEST_DB_NAME, '-e', 'SELECT 1 FROM mark_table')
        return outcome.status_code == 0

    if engine != 'sqlite3':
        raise AssertionError('%s cannot be tested properly!' % engine)

    if TEST_DB_NAME == ':memory:':
        raise AssertionError(
            'sqlite in-memory database cannot be checked for mark!')

    db = sqlite3.connect(TEST_DB_NAME)
    try:
        with db:
            db.execute('SELECT 1 FROM mark_table')
    except sqlite3.OperationalError:
        # Treated as "the mark table is not there".
        return False
    else:
        return True
    finally:
        # Always release the connection, whatever the outcome.
        db.close()
def noop_transactions():
    """Report whether transactions are disabled.

    Returns True if transactions are disabled and False if they are
    enabled.
    """
    # Newer versions of Django simply run standard tests in an atomic
    # block, so the connection's own flag is the answer.
    if hasattr(connection, 'in_atomic_block'):
        return connection.in_atomic_block

    # Fallback when the connection has no such flag: create a row, roll
    # back, and check whether the row is still there.
    with transaction.commit_manually():
        Item.objects.create(name='transaction_noop_test')
        transaction.rollback()

    try:
        leftover = Item.objects.get(name='transaction_noop_test')
    except Item.DoesNotExist:
        # The rollback removed the row, so transactions are enabled.
        return False

    # The row is still there: clean it up and report transactions as
    # disabled.
    leftover.delete()
    return True