Skip to content

Commit caecb70

Browse files
committed
Merge pull request #22 from robertengel/extended_unload_and_copy_commands
Extended unload and copy commands
2 parents 3deb772 + 034951a commit caecb70

File tree

5 files changed

+155
-28
lines changed

5 files changed

+155
-28
lines changed

redshift_sqlalchemy/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
__version__ = '0.4'
1+
__version__ = '0.5.1a'
22

33
from sqlalchemy.dialects import registry
44

redshift_sqlalchemy/dialect.py

+99-16
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def _fetch_redshift_column_attributes(self, column):
111111
text += " SORTKEY"
112112
return text
113113

114+
114115
class RedshiftDialect(PGDialect_psycopg2):
115116
name = 'redshift'
116117
ddl_compiler = RedShiftDDLCompiler
@@ -182,37 +183,119 @@ class UnloadFromSelect(Executable, ClauseElement):
182183
''' Prepares a RedShift unload statement to drop a query to Amazon S3
183184
http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD_command_examples.html
184185
'''
185-
def __init__(self, select, bucket, access_key, secret_key, parallel='on'):
186+
def __init__(self, select, unload_location, access_key, secret_key, session_token='', options={}):
186187
''' Initializes an UnloadFromSelect instance
187188
188189
Args:
189190
self: An instance of UnloadFromSelect
190191
select: The select statement to be unloaded
191-
bucket: The Amazon S3 bucket where the result will be stored
192-
access_key: The Amazon Access Key ID
193-
secret_key: The Amazon Secret Access Key
194-
parallel: If 'ON' the result will be written to multiple files. If
195-
'OFF' the result will write to one (1) file up to 6.2GB before
196-
splitting
192+
unload_location: The Amazon S3 bucket where the result will be stored
193+
access_key - AWS Access Key (required)
194+
secret_key - AWS Secret Key (required)
195+
session_token - AWS STS Session Token (optional)
196+
options - Set of optional parameters to modify the UNLOAD sql
197+
parallel: If 'ON' the result will be written to multiple files. If
198+
'OFF' the result will write to one (1) file up to 6.2GB before
199+
splitting
200+
add_quotes: Boolean value for ADDQUOTES; defaults to True
201+
null_as: optional string that represents a null value in unload output
202+
delimiter - File delimiter. Defaults to ','
197203
'''
198204
self.select = select
199-
self.bucket = bucket
205+
self.unload_location = unload_location
200206
self.access_key = access_key
201207
self.secret_key = secret_key
202-
self.parallel = parallel
208+
self.session_token = session_token
209+
self.options = options
203210

204211

205212
@compiles(UnloadFromSelect)
def visit_unload_from_select(element, compiler, **kw):
    ''' Returns the actual sql query for the UnloadFromSelect class.

    Builds an UNLOAD ... TO ... CREDENTIALS ... statement from the element's
    attributes and its ``options`` dict (parallel, add_quotes, null_as,
    delimiter). Optional clauses that do not apply are omitted entirely
    instead of leaving blank placeholder lines in the emitted SQL.
    '''
    # Render the wrapped SELECT with literal bind values so the query can be
    # embedded verbatim inside the UNLOAD string literal.
    query = compiler.process(element.select, unload_select=True, literal_binds=True)
    options = element.options
    # Look each option up once instead of repeating .get() in the template.
    null_as = options.get('null_as')
    token = ';token=%s' % element.session_token if element.session_token else ''
    clauses = [
        "UNLOAD ('%s') TO '%s'" % (query, element.unload_location),
        # NOTE(review): credentials are interpolated into the SQL text, as in
        # the original; callers must not pass untrusted values here.
        "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s%s'" % (
            element.access_key, element.secret_key, token),
        "DELIMITER '%s'" % options.get('delimiter', ','),
    ]
    if bool(options.get('add_quotes', True)):
        clauses.append('ADDQUOTES')
    if null_as:
        clauses.append("NULL '%s'" % null_as)
    clauses.append('ALLOWOVERWRITE')
    clauses.append('PARALLEL %s;' % options.get('parallel', 'ON'))
    return '\n'.join(clauses) + '\n'
235+
236+
237+
class CopyCommand(Executable, ClauseElement):
    ''' Prepares a RedShift COPY statement
    '''
    def __init__(self, schema_name, table_name, data_location, access_key, secret_key, session_token='', options=None):
        ''' Initializes a CopyCommand instance

        Args:
            self: An instance of CopyCommand
            schema_name: Schema associated with the table_name
            table_name: The table to copy the data into
            data_location: The Amazon S3 location from where to copy - or a manifest file if 'manifest' option is used
            access_key: AWS Access Key (required)
            secret_key: AWS Secret Key (required)
            session_token: AWS STS Session Token (optional)
            options: Dict of optional parameters to modify the COPY sql:
                delimiter: File delimiter; defaults to ','
                ignore_header: Integer value of number of lines to skip at the start of each file
                null: Optional string value denoting what to interpret as a NULL value from the file
                manifest: Boolean value denoting whether data_location is a manifest file; defaults to False
                empty_as_null: Boolean value denoting whether to load VARCHAR fields with
                    empty values as NULL instead of empty string; defaults to True
                blanks_as_null: Boolean value denoting whether to load VARCHAR fields with
                    whitespace only values as NULL instead of whitespace; defaults to True
        '''
        self.schema_name = schema_name
        self.table_name = table_name
        self.data_location = data_location
        self.access_key = access_key
        self.secret_key = secret_key
        self.session_token = session_token
        # A mutable default argument (options={}) would be shared across every
        # instance created without an explicit dict; use a None sentinel and
        # build a fresh dict per instance instead.
        self.options = {} if options is None else options
269+
270+
@compiles(CopyCommand)
def visit_copy_command(element, compiler, **kw):
    ''' Returns the actual sql query for the CopyCommand class.

    Builds a COPY ... FROM ... CREDENTIALS ... statement from the element's
    attributes and its ``options`` dict (delimiter, ignore_header, null,
    manifest, empty_as_null, blanks_as_null). Optional clauses that do not
    apply are omitted entirely instead of leaving blank placeholder lines
    in the emitted SQL.
    '''
    options = element.options
    # Look each option up once instead of repeating .get() in the template.
    null = options.get('null')
    token = ';token=%s' % element.session_token if element.session_token else ''
    clauses = [
        "COPY %s.%s FROM '%s'" % (element.schema_name, element.table_name, element.data_location),
        # NOTE(review): credentials are interpolated into the SQL text, as in
        # the original; callers must not pass untrusted values here.
        "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s%s'" % (
            element.access_key, element.secret_key, token),
        'CSV',
        'TRUNCATECOLUMNS',
        "DELIMITER '%s'" % options.get('delimiter', ','),
        'IGNOREHEADER %s' % options.get('ignore_header', 0),
    ]
    if null:
        clauses.append("NULL '%s'" % null)
    if bool(options.get('manifest', False)):
        clauses.append('MANIFEST')
    if bool(options.get('empty_as_null', True)):
        clauses.append('EMPTYASNULL')
    if bool(options.get('blanks_as_null', True)):
        clauses.append('BLANKSASNULL')
    return '\n'.join(clauses) + ';\n'
298+
216299

217300
@compiles(BindParameter)
218301
def visit_bindparam(bindparam, compiler, **kw):

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ def run_tests(self):
1010

1111
setup(
1212
name='redshift-sqlalchemy',
13-
version='0.4.1',
13+
version='0.5.1a',
1414
description='Amazon Redshift Dialect for sqlalchemy',
1515
long_description=open("README.rst").read(),
1616
author='Matt George',

tests/test_copy_command.py

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from unittest import TestCase
2+
from redshift_sqlalchemy.dialect import CopyCommand
3+
import re
4+
5+
6+
class TestCopyCommand(TestCase):
    ''' Exercises the CopyCommand SQL construct. '''

    def setUp(self):
        pass

    def test_basic_copy_case(self):
        '''
        Tests that the simplest type of CopyCommand works
        '''
        # Build the command first, then compare both sides after collapsing
        # all runs of whitespace, so formatting of the SQL is irrelevant.
        copy = CopyCommand('schema1', 't1', 's3://mybucket/data/listing/', 'cookies', 'cookies')
        actual = re.sub(r'\s+', ' ', str(copy)).strip()

        expected = re.sub(r'\s+', ' ',
                          "COPY schema1.t1 FROM 's3://mybucket/data/listing/' "
                          "CREDENTIALS 'aws_access_key_id=cookies;aws_secret_access_key=cookies' "
                          "CSV TRUNCATECOLUMNS DELIMITER ',' IGNOREHEADER 0 EMPTYASNULL BLANKSASNULL;").strip()

        self.assertEqual(expected, actual)

tests/test_unload_from_select.py

+31-10
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,46 @@
22
from sqlalchemy import Table, Column, Integer, String, MetaData
33
from sqlalchemy.sql import select, func
44
from redshift_sqlalchemy.dialect import UnloadFromSelect
5+
import re
56

67

78
class TestUnloadFromSelect(TestCase):
    ''' Exercises the UnloadFromSelect SQL construct. '''

    def setUp(self):
        '''
        Sets up a table and associated metadata for the test queries to build against
        '''
        self.metadata = MetaData()
        self.t1 = Table('t1', self.metadata, Column('id', Integer, primary_key=True), Column('name', String))

    def test_basic_unload_case(self):
        '''
        Tests that the simplest type of UnloadFromSelect works
        '''
        # Compare after collapsing whitespace so the SQL's formatting is irrelevant.
        unload = UnloadFromSelect(select([func.count(self.t1.c.id)]), 's3://bucket/key', 'cookies', 'cookies',
                                  'cookies')
        actual = re.sub(r'\s+', ' ', str(unload)).strip()

        expected = re.sub(r'\s+', ' ',
                          "UNLOAD ('SELECT count(t1.id) AS count_1 \nFROM t1') TO 's3://bucket/key' "
                          "CREDENTIALS 'aws_access_key_id=cookies;aws_secret_access_key=cookies;token=cookies' "
                          "DELIMITER ',' ADDQUOTES ALLOWOVERWRITE PARALLEL ON;").strip()

        self.assertEqual(expected, actual)

    def test_unload_with_options(self):
        '''
        Tests that UnloadFromSelect handles options correctly
        '''
        unload = UnloadFromSelect(select([func.count(self.t1.c.id)]), 's3://bucket/key', 'cookies', 'cookies',
                                  'cookies', {'parallel': 'OFF', 'null_as': '---'})
        actual = re.sub(r'\s+', ' ', str(unload)).strip()

        expected = re.sub(r'\s+', ' ',
                          "UNLOAD ('SELECT count(t1.id) AS count_1 \nFROM t1') TO 's3://bucket/key' "
                          "CREDENTIALS 'aws_access_key_id=cookies;aws_secret_access_key=cookies;token=cookies' "
                          "DELIMITER ',' ADDQUOTES NULL '---' ALLOWOVERWRITE PARALLEL OFF;").strip()

        self.assertEqual(expected, actual)

0 commit comments

Comments
 (0)