
Commit 1ecf9ed

Author: hearsaydev (committed)
Merge pull request #1 from robertengel/master
Extend redshift_sqlalchemy by UNLOAD and COPY support
2 parents: ac0edff + 902246c

File tree: 5 files changed (+127 −24 lines)


redshift_sqlalchemy/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-__version__ = '0.4'
+__version__ = '0.5.0a'
 
 from sqlalchemy.dialects import registry
 
redshift_sqlalchemy/dialect.py

Lines changed: 93 additions & 16 deletions
@@ -108,6 +108,7 @@ def _fetch_redshift_column_attributes(self, column):
             text += " SORTKEY"
         return text
 
+
 class RedshiftDialect(PGDialect_psycopg2):
     name = 'redshift'
     ddl_compiler = RedShiftDDLCompiler
@@ -179,37 +180,113 @@ class UnloadFromSelect(Executable, ClauseElement):
     ''' Prepares a RedShift unload statement to drop a query to Amazon S3
     http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD_command_examples.html
     '''
-    def __init__(self, select, bucket, access_key, secret_key, parallel='on'):
+    def __init__(self, select, unload_location, access_key, secret_key, session_token='', options={}):
         ''' Initializes an UnloadFromSelect instance
 
         Args:
             self: An instance of UnloadFromSelect
             select: The select statement to be unloaded
-            bucket: The Amazon S3 bucket where the result will be stored
-            access_key: The Amazon Access Key ID
-            secret_key: The Amazon Secret Access Key
-            parallel: If 'ON' the result will be written to multiple files. If
-                'OFF' the result will write to one (1) file up to 6.2GB before
-                splitting
+            unload_location: The Amazon S3 bucket where the result will be stored
+            access_key - AWS Access Key (required)
+            secret_key - AWS Secret Key (required)
+            session_token - AWS STS Session Token (optional)
+            options - Set of optional parameters to modify the UNLOAD sql
+                parallel: If 'ON' the result will be written to multiple files. If
+                    'OFF' the result will write to one (1) file up to 6.2GB before
+                    splitting
+                add_quotes: Boolean value for ADDQUOTES; defaults to True
+                delimiter - File delimiter. Defaults to ','
         '''
         self.select = select
-        self.bucket = bucket
+        self.unload_location = unload_location
         self.access_key = access_key
         self.secret_key = secret_key
-        self.parallel = parallel
+        self.session_token = session_token
+        self.options = options
 
 
 @compiles(UnloadFromSelect)
 def visit_unload_from_select(element, compiler, **kw):
     ''' Returns the actual sql query for the UnloadFromSelect class
+
+    '''
+    return """
+           UNLOAD ('%(query)s') TO '%(unload_location)s'
+           CREDENTIALS 'aws_access_key_id=%(access_key)s;aws_secret_access_key=%(secret_key)s%(session_token)s'
+           DELIMITER '%(delimiter)s'
+           %(add_quotes)s
+           ALLOWOVERWRITE
+           PARALLEL %(parallel)s;
+           """ % \
+           {'query': compiler.process(element.select, unload_select=True, literal_binds=True),
+            'unload_location': element.unload_location,
+            'access_key': element.access_key,
+            'secret_key': element.secret_key,
+            'session_token': ';token=%s' % element.session_token if element.session_token else '',
+            'add_quotes': 'ADDQUOTES' if bool(element.options.get('add_quotes', True)) else '',
+            'delimiter': element.options.get('delimiter', ','),
+            'parallel': element.options.get('parallel', 'ON')}
+
+
+class CopyCommand(Executable, ClauseElement):
+    ''' Prepares a RedShift COPY statement
+    '''
+    def __init__(self, schema_name, table_name, data_location, access_key, secret_key, session_token='', options={}):
+        ''' Initializes a CopyCommand instance
+
+        Args:
+            self: An instance of CopyCommand
+            schema_name - Schema associated with the table_name
+            table_name: The table to copy the data into
+            data_location - The Amazon S3 location from where to copy
+            access_key - AWS Access Key (required)
+            secret_key - AWS Secret Key (required)
+            session_token - AWS STS Session Token (optional)
+            options - Set of optional parameters to modify the COPY sql
+                delimiter - File delimiter; defaults to ','
+                ignore_header - Integer value of number of lines to skip at the start of each file
+                null - String value denoting what to interpret as a NULL value from the file; defaults to '---'
+                empty_as_null - Boolean value denoting whether to load VARCHAR fields with
+                    empty values as NULL instead of empty string; defaults to True
+                blanks_as_null - Boolean value denoting whether to load VARCHAR fields with
+                    whitespace only values as NULL instead of whitespace; defaults to True
+        '''
+        self.schema_name = schema_name
+        self.table_name = table_name
+        self.data_location = data_location
+        self.access_key = access_key
+        self.secret_key = secret_key
+        self.session_token = session_token
+        self.options = options
+
+
+@compiles(CopyCommand)
+def visit_copy_command(element, compiler, **kw):
+    ''' Returns the actual sql query for the CopyCommand class
     '''
-    return "unload ('%(query)s') to '%(bucket)s' credentials 'aws_access_key_id=%(access_key)s;aws_secret_access_key=%(secret_key)s' delimiter ',' addquotes allowoverwrite parallel %(parallel)s" % {
-        'query': compiler.process(element.select, unload_select=True, literal_binds=True),
-        'bucket': element.bucket,
-        'access_key': element.access_key,
-        'secret_key': element.secret_key,
-        'parallel': element.parallel,
-    }
+    return """
+           COPY %(schema_name)s.%(table_name)s FROM '%(data_location)s'
+           CREDENTIALS 'aws_access_key_id=%(access_key)s;aws_secret_access_key=%(secret_key)s%(session_token)s'
+           CSV
+           TRUNCATECOLUMNS
+           DELIMITER '%(delimiter)s'
+           IGNOREHEADER %(ignore_header)s
+           NULL '%(null)s'
+           %(empty_as_null)s
+           %(blanks_as_null)s;
+           """ % \
+           {'schema_name': element.schema_name,
+            'table_name': element.table_name,
+            'data_location': element.data_location,
+            'access_key': element.access_key,
+            'secret_key': element.secret_key,
+            'session_token': ';token=%s' % element.session_token if element.session_token else '',
+            'null': element.options.get('null', '---'),
+            'delimiter': element.options.get('delimiter', ','),
+            'ignore_header': element.options.get('ignore_header', 0),
+            'empty_as_null': 'EMPTYASNULL' if bool(element.options.get('empty_as_null', True)) else '',
+            'blanks_as_null': 'BLANKSASNULL' if bool(element.options.get('blanks_as_null', True)) else ''}
+
 
 @compiles(BindParameter)
 def visit_bindparam(bindparam, compiler, **kw):
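
For orientation, here is a minimal usage sketch of the two clause elements this diff introduces. It is not part of the commit: the engine URL, schema, table, S3 paths, and credentials are placeholders, and executing the clauses through a plain SQLAlchemy connection is an assumption based on both classes inheriting from Executable.

# Usage sketch (not part of this commit); DSN, schema/table names, S3 paths
# and credentials below are placeholders.
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, func, select
from redshift_sqlalchemy.dialect import CopyCommand, UnloadFromSelect

engine = create_engine('redshift+psycopg2://user:password@host:5439/db')
metadata = MetaData()
events = Table('events', metadata, Column('id', Integer), schema='schema1')

# UNLOAD a query result to S3; the options dict maps to the new UNLOAD keywords.
unload = UnloadFromSelect(select([func.count(events.c.id)]),
                          's3://mybucket/unload/events_',
                          'ACCESS_KEY', 'SECRET_KEY',
                          options={'parallel': 'OFF', 'delimiter': '|'})

# COPY the staged files from S3 into a table.
copy = CopyCommand('schema1', 'events_copy', 's3://mybucket/unload/events_',
                   'ACCESS_KEY', 'SECRET_KEY',
                   options={'delimiter': '|', 'ignore_header': 0})

with engine.connect() as conn:
    conn.execute(unload)
    conn.execute(copy)

As the new tests below do, str(unload) or str(copy) can be used to inspect the rendered SQL without a connection.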

setup.py

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ def run_tests(self):
 
 setup(
     name='redshift-sqlalchemy',
-    version='0.4.1',
+    version='0.5.0a',
     description='Amazon Redshift Dialect for sqlalchemy',
     long_description=open("README.rst").read(),
     author='Matt George',

tests/test_copy_command.py

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+from unittest import TestCase
+from redshift_sqlalchemy.dialect import CopyCommand
+
+
+class TestCopyCommand(TestCase):
+    def setUp(self):
+        pass
+
+    def test_basic_copy_case(self):
+        ''' Tests that the simplest type of CopyCommand works
+        '''
+        expected_result = "COPY schema1.t1 FROM 's3://mybucket/data/listing/'\n" \
+                          " CREDENTIALS 'aws_access_key_id=cookies;aws_secret_access_key=cookies'\n" \
+                          " CSV\n TRUNCATECOLUMNS\n DELIMITER ','\n" \
+                          " IGNOREHEADER 0\n NULL '---'\n EMPTYASNULL\n" \
+                          " BLANKSASNULL;"
+        insert = CopyCommand('schema1', 't1', 's3://mybucket/data/listing/', 'cookies', 'cookies')
+        self.assertEqual(expected_result, str(insert).strip())
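
The new test only exercises the CopyCommand defaults. As a hedged illustration of the options documented in dialect.py (the option values here are invented for the example), one might write:

from redshift_sqlalchemy.dialect import CopyCommand

# Illustrative option values only; the keys are the ones documented in dialect.py.
copy = CopyCommand('schema1', 't1', 's3://mybucket/data/listing/',
                   'ACCESS_KEY', 'SECRET_KEY',
                   options={'delimiter': '|',        # field separator in the staged files
                            'ignore_header': 1,      # skip one header line per file
                            'null': 'NULL',          # literal interpreted as SQL NULL
                            'empty_as_null': False,  # keep empty VARCHAR values as empty strings
                            'blanks_as_null': False})  # keep whitespace-only VARCHAR values as-is
print(str(copy))  # renders the COPY ... FROM 's3://...' statement without a connection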

tests/test_unload_from_select.py

Lines changed: 14 additions & 6 deletions
@@ -14,13 +14,21 @@ def setUp(self):
     def test_basic_unload_case(self):
         ''' Tests that the simplest type of UnloadFromSelect works
         '''
-        expected_result = "unload ('SELECT count(t1.id) AS count_1 \nFROM t1') to 'cookies' credentials 'aws_access_key_id=cookies;aws_secret_access_key=cookies' delimiter ',' addquotes allowoverwrite parallel on"
-        insert = UnloadFromSelect(select([func.count(self.t1.c.id)]), 'cookies', 'cookies', 'cookies')
-        self.assertEqual(expected_result, str(insert))
+        expected_result = "UNLOAD ('SELECT count(t1.id) AS count_1 \nFROM t1') TO 's3://bucket/key'\n " \
+                          "CREDENTIALS 'aws_access_key_id=cookies;aws_secret_access_key=cookies;token=cookies'\n" \
+                          " DELIMITER ','\n ADDQUOTES\n ALLOWOVERWRITE\n " \
+                          " PARALLEL ON;"
+        insert = UnloadFromSelect(select([func.count(self.t1.c.id)]), 's3://bucket/key', 'cookies', 'cookies',
+                                  'cookies')
+        self.assertEqual(expected_result, str(insert).strip())
 
     def test_parallel_off_unload_case(self):
         ''' Tests that UnloadFromSelect handles parallel being set to off
         '''
-        expected_result = "unload ('SELECT count(t1.id) AS count_1 \nFROM t1') to 'cookies' credentials 'aws_access_key_id=cookies;aws_secret_access_key=cookies' delimiter ',' addquotes allowoverwrite parallel off"
-        insert = UnloadFromSelect(select([func.count(self.t1.c.id)]), 'cookies', 'cookies', 'cookies', 'off')
-        self.assertEqual(expected_result, str(insert))
+        expected_result = "UNLOAD ('SELECT count(t1.id) AS count_1 \nFROM t1') TO 's3://bucket/key'\n " \
+                          "CREDENTIALS 'aws_access_key_id=cookies;aws_secret_access_key=cookies;token=cookies'\n" \
+                          " DELIMITER ','\n ADDQUOTES\n ALLOWOVERWRITE\n " \
+                          " PARALLEL OFF;"
+        insert = UnloadFromSelect(select([func.count(self.t1.c.id)]), 's3://bucket/key', 'cookies', 'cookies',
+                                  'cookies', {'parallel': 'OFF'})
+        self.assertEqual(expected_result, str(insert).strip())
