 from io import BytesIO
 import errno

-from boto.s3.connection import S3Connection
-from boto.s3.key import Key
-from boto.s3.connection import NoHostProvided
+import boto3
 import math
 from logging import getLogger
 log = getLogger('datalake-archive')
@@ -210,7 +208,26 @@ def push(self, f):
         return self.url_from_file(f)

     def _upload_file(self, f):
-        key = self._s3_key_from_metadata(f)
+
+        # Implementation inspired by https://stackoverflow.com/a/60892027
+        obj = self._s3_object_from_metadata(f)
+
+        # NB: we have an opportunity to turn on threading here, which may
+        # improve performance. However, in some cases (i.e., the queue-based
+        # uploader) we already use threads. So let's add it later as a
+        # configuration option if/when we want to experiment.
+        config = boto3.s3.transfer.TransferConfig(
+            # All sizes are bytes
+            multipart_threshold=CHUNK_SIZE(),
+            use_threads=False,
+            multipart_chunksize=CHUNK_SIZE(),
+        )
+
+        extra = {
+            'Metadata': {
+                METADATA_NAME: json.dumps(f.metadata)
+            }
+        }

         spos = f.tell()
         f.seek(0, os.SEEK_END)
@@ -220,38 +237,22 @@ def _upload_file(self, f):

         num_chunks = int(math.ceil(f_size / float(CHUNK_SIZE())))
         log.info("Uploading {} ({} B / {} chunks)".format(
-            key.name, f_size, num_chunks))
-        if num_chunks <= 1:
-            key.set_metadata(METADATA_NAME, json.dumps(f.metadata))
-            completed_size = key.set_contents_from_file(f)
-            log.info("Upload of {} complete (1 part / {} B).".format(
-                key.name, completed_size))
-            return
-        completed_size = 0
+            obj.key, f_size, num_chunks))
+
         chunk = 0
-        mp = key.bucket.initiate_multipart_upload(
-            key.name, metadata={
-                METADATA_NAME: json.dumps(f.metadata)
-            })
-        try:
-            for chunk in range(1, num_chunks + 1):
-                part = mp.upload_part_from_file(
-                    f, chunk, size=CHUNK_SIZE())
-                completed_size += part.size
-                log.debug("Uploaded chunk {}/{} ({}B)".format(
-                    chunk, num_chunks, part.size))
-        except:  # NOQA
-            # On any exception we want to attempt cancel_upload; otherwise
-            # AWS will bill us every month indefinitely for storing the
-            # partially-uploaded chunks.
-            log.exception("Upload of {} failed on chunk {}".format(
-                key.name, chunk))
-            mp.cancel_upload()
-            raise
-        else:
-            completed = mp.complete_upload()
-            log.info("Upload of {} complete ({} parts / {} B).".format(
-                completed.key_name, chunk, completed_size))
+
+        def _progress(number_of_bytes):
+            nonlocal chunk
+            chunk += 1
+            log.info("Uploaded chunk {}/{} ({}B)".format(
+                chunk, num_chunks, number_of_bytes))
+
+        # NB: deep under the hood, upload_fileobj creates a
+        # CreateMultipartUploadTask. And that object cleans up after itself:
+        # https://github.com/boto/s3transfer/blob/develop/s3transfer/tasks.py#L353-L360 # noqa
+        obj.upload_fileobj(f, ExtraArgs=extra, Config=config,
+                           Callback=_progress)
+        obj.wait_until_exists()

     def url_from_file(self, f):
         return self._get_s3_url(f)
@@ -279,12 +280,11 @@ def _is_valid_http_url(self, url):
         return url.startswith('http') and url.endswith('/data')

     def _fetch_s3_url(self, url, stream=False):
-        k = self._get_key_from_url(url)
-        m = self._get_metadata_from_key(k)
+        obj, m = self._get_object_from_url(url)
         if stream:
-            return StreamingFile(k, **m)
+            return StreamingFile(obj._datalake_details['Body'], **m)
         fd = BytesIO()
-        k.get_contents_to_file(fd)
+        self._s3_bucket.download_fileobj(obj.key, fd)
         fd.seek(0)
         return File(fd, **m)

@@ -331,8 +331,7 @@ def fetch_to_filename(self, url, filename_template=None):
         '''
         k = None
         if url.startswith('s3://'):
-            k = self._get_key_from_url(url)
-            m = self._get_metadata_from_key(k)
+            obj, m = self._get_object_from_url(url)
         else:
             m = self._get_metadata_from_http_url(url)
         fname = self._get_filename_from_template(filename_template, m)
@@ -357,18 +356,19 @@ def _mkdirs(self, path):
         else:
             raise

-    def _get_key_from_url(self, url):
+    def _get_object_from_url(self, url):
         self._validate_fetch_url(url)
         key_name = self._get_key_name_from_url(url)
-        k = self._s3_bucket.get_key(key_name)
-        if k is None:
+        obj = self._s3.Object(self._s3_bucket_name, key_name)
+        try:
+            # cache the results of the get on the obj to avoid superfluous
+            # network calls.
+            obj._datalake_details = obj.get()
+            m = obj._datalake_details['Metadata'].get(METADATA_NAME)
+        except self._s3.meta.client.exceptions.NoSuchKey:
             msg = 'Failed to find {} in the datalake.'.format(url)
             raise InvalidDatalakePath(msg)
-        return k
-
-    def _get_metadata_from_key(self, key):
-        m = key.get_metadata(METADATA_NAME)
-        return Metadata.from_json(m)
+        return obj, Metadata.from_json(m)

     def _get_filename_from_template(self, template, metadata):
         if template is None:
@@ -388,7 +388,12 @@ def _get_key_name_from_url(self, url):
             msg = '{} is not a valid datalake url'.format(url)
             raise InvalidDatalakePath(msg)

-        return parts.path
+        # NB: under boto 2 we did not need this lstrip; boto2 explicitly
+        # stripped the leading slash for us:
+        # https://groups.google.com/g/boto-users/c/mv--NMPUXoU ...but boto3
+        # does not. So we must take care to strip it whenever we parse a URL
+        # to get a key name.
+        return parts.path.lstrip('/')

     def _validate_fetch_url(self, url):
         valid_base_urls = (self.storage_url, self.http_url)
@@ -398,51 +403,47 @@ def _validate_fetch_url(self, url):
             raise InvalidDatalakePath(msg)

     def _get_s3_url(self, f):
-        key = self._s3_key_from_metadata(f)
+        obj = self._s3_object_from_metadata(f)
         return self._URL_FORMAT.format(bucket=self._s3_bucket_name,
-                                       key=key.name)
+                                       key=obj.key)

     @property
     def _s3_bucket_name(self):
         return self._parsed_storage_url.netloc

     @memoized_property
     def _s3_bucket(self):
-        # Note: we pass validate=False because we may just have push
-        # permissions. If validate is not False, boto tries to list the
-        # bucket. And this will 403.
-        return self._s3_conn.get_bucket(self._s3_bucket_name, validate=False)
+        return self._s3.Bucket(self._s3_bucket_name)

     _KEY_FORMAT = '{id}/data'

-    def _s3_key_from_metadata(self, f):
-        # For performance reasons, s3 keys should start with a short random
-        # sequence:
-        # https://aws.amazon.com/blogs/aws/amazon-s3-performance-tips-tricks-seattle-hiring-event/
-        # http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html
+    def _s3_object_from_metadata(self, f):
         key_name = self._KEY_FORMAT.format(**f.metadata)
-        return Key(self._s3_bucket, name=key_name)
+        return self._s3_bucket.Object(key_name)

     @property
     def _s3_host(self):
         h = environ.get('AWS_S3_HOST')
         if h is not None:
-            return h
+            return 'https://' + h
         r = environ.get('AWS_REGION') or environ.get('AWS_DEFAULT_REGION')
         if r is not None:
-            return 's3-' + r + '.amazonaws.com'
+            return 'https://s3-' + r + '.amazonaws.com'
         else:
-            return NoHostProvided
+            return None

-    @property
-    def _s3_conn(self):
-        if not hasattr(self, '_conn'):
-            k = environ.get('AWS_ACCESS_KEY_ID')
-            s = environ.get('AWS_SECRET_ACCESS_KEY')
-            self._conn = S3Connection(aws_access_key_id=k,
-                                      aws_secret_access_key=s,
-                                      host=self._s3_host)
-        return self._conn
+    @memoized_property
+    def _s3(self):
+        # boto3 uses AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
+        # boto3 will use AWS_DEFAULT_REGION if AWS_REGION is not set
+        return boto3.resource('s3',
+                              region_name=environ.get('AWS_REGION'),
+                              endpoint_url=self._s3_host)
+
+    @memoized_property
+    def _s3_client(self):
+        boto_session = boto3.Session()
+        return boto_session.client('s3')

     def _requests_get(self, url, **kwargs):
         return self._session.get(url, timeout=TIMEOUT(), **kwargs)
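
For reference, here is a minimal standalone sketch of the boto3 upload pattern this patch adopts: a TransferConfig sets the multipart threshold and chunk size, and Object.upload_fileobj sends the file with user metadata and a progress callback. The bucket name, key, chunk size, local file, and the 'datalake' metadata key are illustrative assumptions, not values from the datalake codebase.

import json

import boto3
from boto3.s3.transfer import TransferConfig

CHUNK = 8 * 1024 * 1024  # assumed stand-in for CHUNK_SIZE(), in bytes

s3 = boto3.resource('s3')
obj = s3.Object('example-bucket', 'example-id/data')  # hypothetical bucket/key

config = TransferConfig(
    multipart_threshold=CHUNK,   # files larger than this use multipart upload
    multipart_chunksize=CHUNK,
    use_threads=False,
)


def progress(bytes_transferred):
    # s3transfer invokes this callback as bytes are sent to S3.
    print('uploaded {} bytes'.format(bytes_transferred))


with open('example.dat', 'rb') as f:  # hypothetical local file
    obj.upload_fileobj(
        f,
        ExtraArgs={'Metadata': {'datalake': json.dumps({'id': 'example-id'})}},
        Config=config,
        Callback=progress,
    )

# Block until S3 reports the object exists, as the patch does after uploading.
obj.wait_until_exists()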
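
And a minimal sketch of the corresponding fetch pattern: a single Object.get() returns both the user metadata and a streaming body, and a missing key surfaces as the S3 client's NoSuchKey error. Names are again illustrative assumptions.

import boto3

s3 = boto3.resource('s3')
obj = s3.Object('example-bucket', 'example-id/data')  # hypothetical bucket/key

try:
    details = obj.get()  # one GET returns metadata and body together
except s3.meta.client.exceptions.NoSuchKey:
    raise RuntimeError('no such object in the datalake')

metadata = details['Metadata'].get('datalake')  # user metadata set at upload
body = details['Body']                          # botocore StreamingBody
print(metadata, len(body.read()))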