1
- import sys
1
+ from tornado_mysql import ProgrammingError
2
+ from tornado import gen
3
+ from tornado .testing import gen_test
4
+ from tornado_mysql import NotSupportedError
2
5
3
- from tornado_mysql .tests import base
4
6
import tornado_mysql .cursors
7
+ from tornado_mysql .tests import base
5
8
6
9
7
10
class TestSSCursor (base .PyMySQLTestCase ):
11
+
12
+ data = [
13
+ ('America' , '' , 'America/Jamaica' ),
14
+ ('America' , '' , 'America/Los_Angeles' ),
15
+ ('America' , '' , 'America/Lima' ),
16
+ ('America' , '' , 'America/New_York' ),
17
+ ('America' , '' , 'America/Menominee' ),
18
+ ('America' , '' , 'America/Havana' ),
19
+ ('America' , '' , 'America/El_Salvador' ),
20
+ ('America' , '' , 'America/Costa_Rica' ),
21
+ ('America' , '' , 'America/Denver' ),
22
+ ('America' , '' , 'America/Detroit' ),]
23
+
24
+ @gen_test
8
25
def test_SSCursor (self ):
9
26
affected_rows = 18446744073709551615
10
27
11
28
conn = self .connections [0 ]
12
- data = [
13
- ('America' , '' , 'America/Jamaica' ),
14
- ('America' , '' , 'America/Los_Angeles' ),
15
- ('America' , '' , 'America/Lima' ),
16
- ('America' , '' , 'America/New_York' ),
17
- ('America' , '' , 'America/Menominee' ),
18
- ('America' , '' , 'America/Havana' ),
19
- ('America' , '' , 'America/El_Salvador' ),
20
- ('America' , '' , 'America/Costa_Rica' ),
21
- ('America' , '' , 'America/Denver' ),
22
- ('America' , '' , 'America/Detroit' ),]
23
29
24
30
try :
25
31
cursor = conn .cursor (tornado_mysql .cursors .SSCursor )
26
32
27
33
# Create table
28
- cursor .execute (('CREATE TABLE tz_data ('
29
- 'region VARCHAR(64),'
30
- 'zone VARCHAR(64),'
31
- 'name VARCHAR(64))' ))
34
+ yield cursor .execute ('DROP TABLE IF EXISTS tz_data;' )
35
+ yield cursor .execute (('CREATE TABLE tz_data ('
36
+ 'region VARCHAR(64),'
37
+ 'zone VARCHAR(64),'
38
+ 'name VARCHAR(64))' ))
32
39
33
40
# Test INSERT
34
- for i in data :
35
- cursor .execute ('INSERT INTO tz_data VALUES (%s, %s, %s)' , i )
41
+ for i in self . data :
42
+ yield cursor .execute ('INSERT INTO tz_data VALUES (%s, %s, %s)' , i )
36
43
self .assertEqual (conn .affected_rows (), 1 , 'affected_rows does not match' )
37
- conn .commit ()
38
-
44
+ yield conn .commit ()
39
45
# Test fetchone()
40
46
iter = 0
41
- cursor .execute ('SELECT * FROM tz_data' )
47
+ yield cursor .execute ('SELECT * FROM tz_data; ' )
42
48
while True :
43
- row = cursor .fetchone ()
49
+ row = yield cursor .fetchone ()
50
+
44
51
if row is None :
45
52
break
46
53
iter += 1
47
54
48
55
# Test cursor.rowcount
49
56
self .assertEqual (cursor .rowcount , affected_rows ,
50
- 'cursor.rowcount != %s' % (str (affected_rows )))
57
+ 'cursor.rowcount != %s' % (str (affected_rows )))
51
58
52
59
# Test cursor.rownumber
53
60
self .assertEqual (cursor .rownumber , iter ,
54
- 'cursor.rowcount != %s' % (str (iter )))
61
+ 'cursor.rowcount != %s' % (str (iter )))
55
62
56
63
# Test row came out the same as it went in
57
- self .assertEqual ((row in data ), True ,
58
- 'Row not found in source data' )
64
+ self .assertEqual ((row in self . data ), True ,
65
+ 'Row not found in source self. data' )
59
66
60
67
# Test fetchall
61
- cursor .execute ('SELECT * FROM tz_data' )
62
- self .assertEqual (len (cursor .fetchall ()), len (data ),
63
- 'fetchall failed. Number of rows does not match' )
68
+ yield cursor .execute ('SELECT * FROM tz_data' )
69
+ r = yield cursor .fetchall ()
70
+ self .assertEqual (len (r ), len (self .data ),
71
+ 'fetchall failed. Number of rows does not match' )
64
72
65
73
# Test fetchmany
66
- cursor .execute ('SELECT * FROM tz_data' )
67
- self .assertEqual (len (cursor .fetchmany (2 )), 2 ,
68
- 'fetchmany failed. Number of rows does not match' )
74
+ yield cursor .execute ('SELECT * FROM tz_data' )
75
+ r = yield cursor .fetchmany (2 )
76
+ self .assertEqual (len (r ), 2 ,
77
+ 'fetchmany failed. Number of rows does not match' )
69
78
70
79
# So MySQLdb won't throw "Commands out of sync"
71
80
while True :
72
- res = cursor .fetchone ()
81
+ res = yield cursor .fetchone ()
73
82
if res is None :
74
83
break
75
84
76
85
# Test update, affected_rows()
77
- cursor .execute ('UPDATE tz_data SET zone = %s' , ['Foo' ])
78
- conn .commit ()
79
- self .assertEqual (cursor .rowcount , len (data ),
80
- 'Update failed. affected_rows != %s' % (str (len (data ))))
86
+ yield cursor .execute ('UPDATE tz_data SET zone = %s' , ['Foo' ])
87
+ yield conn .commit ()
88
+ self .assertEqual (cursor .rowcount , len (self . data ),
89
+ 'Update failed. affected_rows != %s' % (str (len (self . data ))))
81
90
82
91
# Test executemany
83
- cursor .executemany ('INSERT INTO tz_data VALUES (%s, %s, %s)' , data )
84
- self .assertEqual (cursor .rowcount , len (data ),
85
- 'executemany failed. cursor.rowcount != %s' % (str (len (data ))))
92
+ yield cursor .executemany ('INSERT INTO tz_data VALUES (%s, %s, %s)' , self . data )
93
+ self .assertEqual (cursor .rowcount , len (self . data ),
94
+ 'executemany failed. cursor.rowcount != %s' % (str (len (self . data ))))
86
95
87
96
# Test multiple datasets
88
97
cursor .execute ('SELECT 1; SELECT 2; SELECT 3' )
@@ -94,11 +103,88 @@ def test_SSCursor(self):
94
103
self .assertFalse (cursor .nextset ())
95
104
96
105
finally :
97
- cursor .execute ('DROP TABLE tz_data' )
98
- cursor .close ()
106
+ yield cursor .execute ('DROP TABLE tz_data' )
107
+ yield cursor .close ()
108
+
109
+ @gen .coroutine
110
+ def _prepare (self ):
111
+ conn = self .connections [0 ]
112
+ cursor = conn .cursor ()
113
+ yield cursor .execute ('DROP TABLE IF EXISTS tz_data;' )
114
+ yield cursor .execute ('CREATE TABLE tz_data ('
115
+ 'region VARCHAR(64),'
116
+ 'zone VARCHAR(64),'
117
+ 'name VARCHAR(64))' )
118
+
119
+ yield cursor .executemany (
120
+ 'INSERT INTO tz_data VALUES (%s, %s, %s)' , self .data )
121
+ yield conn .commit ()
122
+ yield cursor .close ()
123
+
124
+ @gen .coroutine
125
+ def _cleanup (self ):
126
+ conn = self .connections [0 ]
127
+ cursor = conn .cursor ()
128
+ yield cursor .execute ('DROP TABLE IF EXISTS tz_data;' )
129
+
130
+ @gen_test
131
+ def test_sscursor_executemany (self ):
132
+ conn = self .connections [0 ]
133
+ yield self ._prepare ()
134
+ cursor = conn .cursor (tornado_mysql .cursors .SSCursor )
135
+ # Test executemany
136
+ yield cursor .executemany (
137
+ 'INSERT INTO tz_data VALUES (%s, %s, %s)' , self .data )
138
+ msg = 'executemany failed. cursor.rowcount != %s'
139
+ self .assertEqual (cursor .rowcount , len (self .data ),
140
+ msg % (str (len (self .data ))))
141
+ yield self ._cleanup ()
142
+
143
+ @gen_test
144
+ def test_sscursor_scroll_relative (self ):
145
+ conn = self .connections [0 ]
146
+ yield self ._prepare ()
147
+ cursor = conn .cursor (tornado_mysql .cursors .SSCursor )
148
+ yield cursor .execute ('SELECT * FROM tz_data;' )
149
+ yield cursor .scroll (1 )
150
+ ret = yield cursor .fetchone ()
151
+ self .assertEqual (('America' , '' , 'America/Los_Angeles' ), ret )
152
+ yield self ._cleanup ()
153
+
154
+ @gen_test
155
+ def test_sscursor_scroll_absolute (self ):
156
+ conn = self .connections [0 ]
157
+ yield self ._prepare ()
158
+ cursor = conn .cursor (tornado_mysql .cursors .SSCursor )
159
+ yield cursor .execute ('SELECT * FROM tz_data;' )
160
+ yield cursor .scroll (2 , mode = 'absolute' )
161
+ ret = yield cursor .fetchone ()
162
+ self .assertEqual (('America' , '' , 'America/Lima' ), ret )
163
+ yield self ._cleanup ()
164
+
165
+ @gen_test
166
+ def test_sscursor_scroll_errors (self ):
167
+ yield self ._prepare ()
168
+ conn = self .connections [0 ]
169
+ cursor = conn .cursor (tornado_mysql .cursors .SSCursor )
170
+
171
+ yield cursor .execute ('SELECT * FROM tz_data;' )
172
+
173
+ with self .assertRaises (NotSupportedError ):
174
+ yield cursor .scroll (- 2 , mode = 'relative' )
175
+
176
+ yield cursor .scroll (2 , mode = 'absolute' )
177
+
178
+ with self .assertRaises (NotSupportedError ):
179
+ yield cursor .scroll (1 , mode = 'absolute' )
180
+ with self .assertRaises (ProgrammingError ):
181
+ yield cursor .scroll (3 , mode = 'not_valid_mode' )
182
+ yield self ._cleanup ()
183
+
99
184
100
185
__all__ = ["TestSSCursor" ]
101
186
187
+
102
188
if __name__ == "__main__" :
103
189
import unittest
104
190
unittest .main ()
0 commit comments