55# --------------------------------------------------------------------------
66import threading
77
8- from azure .storage .common ._error import _ERROR_NO_SINGLE_THREAD_CHUNKING
9-
108
def _download_blob_chunks(blob_service, container_name, blob_name, snapshot,
                          download_size, block_size, progress, start_range, end_range,
                          stream, max_connections, progress_callback, validate_content,
                          lease_id, if_modified_since, if_unmodified_since, if_match,
                          if_none_match, timeout, operation_context):
    """Download a blob to *stream* in chunks of *block_size* bytes.

    When max_connections > 1 the chunks are fetched concurrently on a thread
    pool (requires a seekable stream); otherwise they are fetched one at a
    time, in order, so the stream does not need to be seekable.
    """
    # pick the downloader variant that matches the requested concurrency
    downloader_class = _ParallelBlobChunkDownloader if max_connections > 1 else _SequentialBlobChunkDownloader

    # NOTE(review): argument list reconstructed from the shared
    # _BlobChunkDownloader.__init__ signature — block_size maps to chunk_size
    downloader = downloader_class(
        blob_service,
        container_name,
        blob_name,
        snapshot,
        download_size,
        block_size,
        progress,
        start_range,
        end_range,
        stream,
        progress_callback,
        validate_content,
        lease_id,
        if_modified_since,
        if_unmodified_since,
        if_match,
        if_none_match,
        timeout,
        operation_context,
    )

    if max_connections > 1:
        import concurrent.futures
        # use the executor as a context manager so the worker threads are
        # joined and released even when a chunk raises — the previous code
        # never shut the pool down (thread leak)
        with concurrent.futures.ThreadPoolExecutor(max_connections) as executor:
            # list() drains the map so any worker exception propagates here
            list(executor.map(downloader.process_chunk, downloader.get_chunk_offsets()))
    else:
        for chunk in downloader.get_chunk_offsets():
            downloader.process_chunk(chunk)
4446
4547
4648class _BlobChunkDownloader (object ):
4749 def __init__ (self , blob_service , container_name , blob_name , snapshot , download_size ,
4850 chunk_size , progress , start_range , end_range , stream ,
4951 progress_callback , validate_content , lease_id , if_modified_since ,
5052 if_unmodified_since , if_match , if_none_match , timeout , operation_context ):
53+ # identifiers for the blob
5154 self .blob_service = blob_service
5255 self .container_name = container_name
5356 self .blob_name = blob_name
5457 self .snapshot = snapshot
55- self .chunk_size = chunk_size
5658
59+ # information on the download range/chunk size
60+ self .chunk_size = chunk_size
5761 self .download_size = download_size
5862 self .start_index = start_range
5963 self .blob_end = end_range
6064
65+ # the destination that we will write to
6166 self .stream = stream
62- self . stream_start = stream . tell ()
63- self . stream_lock = threading . Lock ()
67+
68+ # progress related
6469 self .progress_callback = progress_callback
6570 self .progress_total = progress
66- self .progress_lock = threading .Lock ()
71+
72+ # parameters for each get blob operation
6773 self .timeout = timeout
6874 self .operation_context = operation_context
69-
7075 self .validate_content = validate_content
7176 self .lease_id = lease_id
7277 self .if_modified_since = if_modified_since
@@ -92,17 +97,13 @@ def process_chunk(self, chunk_start):
9297 self ._write_to_stream (chunk_data , chunk_start )
9398 self ._update_progress (length )
9499
100+ # should be provided by the subclass
95101 def _update_progress (self , length ):
96- if self .progress_callback is not None :
97- with self .progress_lock :
98- self .progress_total += length
99- total = self .progress_total
100- self .progress_callback (total , self .download_size )
102+ pass
101103
104+ # should be provided by the subclass
102105 def _write_to_stream (self , chunk_data , chunk_start ):
103- with self .stream_lock :
104- self .stream .seek (self .stream_start + (chunk_start - self .start_index ))
105- self .stream .write (chunk_data )
106+ pass
106107
107108 def _download_chunk (self , chunk_start , chunk_end ):
108109 response = self .blob_service ._get_blob (
@@ -125,3 +126,53 @@ def _download_chunk(self, chunk_start, chunk_end):
125126 # that subsequent downloads are to an unmodified blob
126127 self .if_match = response .properties .etag
127128 return response
129+
130+
class _ParallelBlobChunkDownloader(_BlobChunkDownloader):
    """Chunk downloader used when max_connections > 1.

    Chunks complete out of order on worker threads, so every write seeks
    to the chunk's absolute position, and both the destination stream and
    the progress counter are guarded by locks.
    """

    def __init__(self, blob_service, container_name, blob_name, snapshot, download_size,
                 chunk_size, progress, start_range, end_range, stream,
                 progress_callback, validate_content, lease_id, if_modified_since,
                 if_unmodified_since, if_match, if_none_match, timeout, operation_context):
        super(_ParallelBlobChunkDownloader, self).__init__(
            blob_service, container_name, blob_name, snapshot, download_size,
            chunk_size, progress, start_range, end_range, stream,
            progress_callback, validate_content, lease_id, if_modified_since,
            if_unmodified_since, if_match, if_none_match, timeout, operation_context)

        # a parallel download requires a seekable stream; remember where it
        # started so out-of-order chunks can be placed at the right offset
        self.stream_start = stream.tell()

        # writes and progress updates race across worker threads,
        # so each gets its own lock
        self.stream_lock = threading.Lock()
        self.progress_lock = threading.Lock()

    def _update_progress(self, length):
        # skip the lock entirely when nobody is listening
        if self.progress_callback is None:
            return
        with self.progress_lock:
            self.progress_total += length
            running_total = self.progress_total
            self.progress_callback(running_total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        # seek + write must be atomic with respect to the other workers
        with self.stream_lock:
            target = self.stream_start + (chunk_start - self.start_index)
            self.stream.seek(target)
            self.stream.write(chunk_data)
165+
166+
class _SequentialBlobChunkDownloader(_BlobChunkDownloader):
    """Chunk downloader used when max_connections == 1.

    Chunks arrive strictly in order on a single thread, so no locking is
    needed and the destination is written to append-style — the stream
    does not have to be seekable.
    """

    def __init__(self, *args):
        super(_SequentialBlobChunkDownloader, self).__init__(*args)

    def _update_progress(self, length):
        if self.progress_callback is None:
            return
        self.progress_total += length
        self.progress_callback(self.progress_total, self.download_size)

    def _write_to_stream(self, chunk_data, chunk_start):
        # chunk_start is ignored: chunks arrive in order, and the stream
        # may not support seeking anyway
        self.stream.write(chunk_data)
0 commit comments