# upload.py
import os
import math
import base64
import hashlib
import requests
import traceback
import threading
import concurrent.futures

thread_local = threading.local()


class FrameioUploader(object):
    def __init__(self, asset, file):
        self.asset = asset
        self.file = file
        self.chunk_size = None

    def _calculate_chunks(self, total_size, chunk_count):
        # Split the file into evenly sized chunks and return each chunk's byte offset
        self.chunk_size = int(math.ceil(total_size / chunk_count))

        chunk_offsets = list()
        for index in range(chunk_count):
            offset_amount = index * self.chunk_size
            chunk_offsets.append(offset_amount)

        return chunk_offsets

    def _get_content_md5(self, data):
        # Return the base64-encoded MD5 digest expected by the Content-MD5 header
        digest = hashlib.md5(data).digest()
        return base64.b64encode(digest).decode('utf-8')

    def _get_session(self):
        # requests.Session is not guaranteed to be thread-safe, so keep one per worker thread
        if not hasattr(thread_local, "session"):
            thread_local.session = requests.Session()
        return thread_local.session

    def _smart_read_chunk(self, chunk_offset, is_final_chunk):
        with open(os.path.realpath(self.file.name), "rb") as file:
            file.seek(chunk_offset, 0)
            if is_final_chunk:
                # Final chunk: read to the end of the file
                data = file.read()
            else:
                # Otherwise read exactly one chunk's worth of bytes
                data = file.read(self.chunk_size)
        return data

    def _upload_chunk(self, task):
        url, chunk_offset, chunk_id = task
        chunks_total = len(self.asset['upload_urls'])

        # The last chunk has no fixed length; it runs to the end of the file
        is_final_chunk = chunk_id + 1 == chunks_total

        session = self._get_session()
        chunk_data = self._smart_read_chunk(chunk_offset, is_final_chunk)
        chunk_md5 = self._get_content_md5(chunk_data)

        try:
            r = session.put(url, data=chunk_data, headers={
                'content-type': self.asset['filetype'],
                'x-amz-acl': 'private',
                'Content-MD5': chunk_md5
            })
            r.raise_for_status()
            print("Completed chunk {}, status: {}".format(chunk_id, r.status_code))
        except requests.RequestException as e:
            print(e)
            raise

    def upload(self):
        total_size = self.asset['filesize']
        upload_urls = self.asset['upload_urls']

        # One presigned URL per chunk, so the chunk count equals the URL count
        chunk_offsets = self._calculate_chunks(total_size, chunk_count=len(upload_urls))

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = []
            for i in range(len(upload_urls)):
                url = upload_urls[i]
                chunk_offset = chunk_offsets[i]
                task = (url, chunk_offset, i)
                futures.append(executor.submit(self._upload_chunk, task))

            # Propagate any exception raised in a worker thread
            for future in concurrent.futures.as_completed(futures):
                future.result()
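

# --- Usage sketch (illustrative; not part of the original module) ---
# Assumes `asset` is the dict returned by Frame.io's asset-creation endpoint,
# which, per the fields referenced above, carries 'upload_urls' (presigned
# per-chunk URLs), 'filesize', and 'filetype'. The file path and URL values
# below are hypothetical placeholders.
if __name__ == "__main__":
    path = "example.mov"  # placeholder local file
    asset = {
        "upload_urls": [
            "https://s3.example.com/part-1",  # placeholder presigned URLs
            "https://s3.example.com/part-2",
        ],
        "filesize": os.path.getsize(path),
        "filetype": "video/quicktime",
    }
    with open(path, "rb") as f:
        FrameioUploader(asset, f).upload()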