Skip to content

Commit d2e3afc

Browse files
committed
Better download connection handling: detect server errors earlier, fall back to a single connection if possible
1 parent 9264524 commit d2e3afc

File tree

8 files changed

+150
-51
lines changed

8 files changed

+150
-51
lines changed

module/Api.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,13 @@
2929
from remote import activated
3030

3131
if activated:
32-
from remote.thriftbackend.thriftgen.pyload.ttypes import *
33-
from remote.thriftbackend.thriftgen.pyload.Pyload import Iface
34-
BaseObject = TBase
32+
try:
33+
from remote.thriftbackend.thriftgen.pyload.ttypes import *
34+
from remote.thriftbackend.thriftgen.pyload.Pyload import Iface
35+
BaseObject = TBase
36+
except ImportError:
37+
print "Thrift not imported"
38+
from remote.socketbackend.ttypes import *
3539
else:
3640
from remote.socketbackend.ttypes import *
3741

module/network/HTTPChunk.py

+20-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
1717
@author: RaNaN
1818
"""
19-
from os import remove, stat
19+
from os import remove, stat, fsync
2020
from os.path import exists
2121
from time import sleep
2222
from re import search
@@ -146,6 +146,9 @@ def __init__(self, id, parent, range=None, resume=False):
146146
self.sleep = 0.000
147147
self.lastSize = 0
148148

149+
def __repr__(self):
150+
return "<HTTPChunk id=%d, size=%d, arrived=%d>" % (self.id, self.size, self.arrived)
151+
149152
@property
150153
def cj(self):
151154
return self.p.cj
@@ -157,7 +160,7 @@ def getHandle(self):
157160
self.c.setopt(pycurl.WRITEFUNCTION, self.writeBody)
158161
self.c.setopt(pycurl.HEADERFUNCTION, self.writeHeader)
159162

160-
# request one byte more, since some servers in russia seems to have a defect arihmetic unit
163+
# request all bytes, since some servers in russia seems to have a defect arihmetic unit
161164

162165
if self.resume:
163166
self.fp = open(self.p.info.getChunkName(self.id), "ab")
@@ -259,10 +262,25 @@ def parseHeader(self):
259262

260263
self.headerParsed = True
261264

265+
def stop(self):
266+
"""The download will not proceed after next call of writeBody"""
267+
self.range = [0,0]
268+
self.size = 0
269+
270+
def resetRange(self):
271+
""" Reset the range, so the download will load all data available """
272+
self.range = None
273+
262274
def setRange(self, range):
263275
self.range = range
264276
self.size = range[1] - range[0]
265277

278+
def flushFile(self):
279+
""" flush and close file """
280+
self.fp.flush()
281+
fsync(self.fp.fileno()) #make sure everything was written to disk
282+
self.fp.close() #needs to be closed, or merging chunks will fail
283+
266284
def close(self):
267285
""" closes everything, unusable after this """
268286
if self.fp: self.fp.close()

module/network/HTTPDownload.py

+66-25
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def download(self, chunks=1, resume=False):
140140

141141
return self._download(chunks, False)
142142
else:
143-
raise e
143+
raise
144144
finally:
145145
self.close()
146146

@@ -161,7 +161,7 @@ def _download(self, chunks, resume):
161161

162162
lastFinishCheck = 0
163163
lastTimeCheck = 0
164-
chunksDone = set()
164+
chunksDone = set() # list of curl handles that are finished
165165
chunksCreated = False
166166
done = False
167167
if self.info.getCount() > 1: # This is a resume, if we were chunked originally assume still can
@@ -202,32 +202,76 @@ def _download(self, chunks, resume):
202202
t = time()
203203

204204
# reduce these calls
205-
while lastFinishCheck + 1 < t:
205+
while lastFinishCheck + 0.5 < t:
206+
# list of failed curl handles
207+
failed = []
208+
ex = None # save only last exception, we can only raise one anyway
209+
206210
num_q, ok_list, err_list = self.m.info_read()
207211
for c in ok_list:
208-
chunksDone.add(c)
212+
chunk = self.findChunk(c)
213+
try: # check if the header implies success, else add it to failed list
214+
chunk.verifyHeader()
215+
except BadHeader, e:
216+
self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(e)))
217+
failed.append(chunk)
218+
ex = e
219+
else:
220+
chunksDone.add(c)
221+
209222
for c in err_list:
210223
curl, errno, msg = c
211-
#test if chunk was finished, otherwise raise the exception
224+
chunk = self.findChunk(curl)
225+
#test if chunk was finished
212226
if errno != 23 or "0 !=" not in msg:
213-
raise pycurl.error(errno, msg)
214-
215-
#@TODO KeyBoardInterrupts are seen as finished chunks,
216-
#but normally not handled to this process, only in the testcase
227+
failed.append(chunk)
228+
ex = pycurl.error(errno, msg)
229+
self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(ex)))
230+
continue
231+
232+
try: # check if the header implies success, else add it to failed list
233+
chunk.verifyHeader()
234+
except BadHeader, e:
235+
self.log.debug("Chunk %d failed: %s" % (chunk.id + 1, str(e)))
236+
failed.append(chunk)
237+
ex = e
238+
else:
239+
chunksDone.add(curl)
240+
if not num_q: # no more infos to get
241+
242+
# check if init is not finished so we reset download connections
243+
# note that other chunks are closed and downloaded with init too
244+
if failed and init not in failed and init.c not in chunksDone:
245+
self.log.error(_("Download chunks failed, fallback to single connection | %s" % (str(ex))))
246+
247+
#list of chunks to clean and remove
248+
to_clean = filter(lambda x: x is not init, self.chunks)
249+
for chunk in to_clean:
250+
self.closeChunk(chunk)
251+
self.chunks.remove(chunk)
252+
remove(self.info.getChunkName(chunk.id))
253+
254+
#let first chunk load the rest and update the info file
255+
init.resetRange()
256+
self.info.clear()
257+
self.info.addChunk("%s.chunk0" % self.filename, (0, self.size))
258+
self.info.save()
259+
elif failed:
260+
raise ex
217261

218-
chunksDone.add(curl)
219-
if not num_q:
220262
lastFinishCheck = t
221263

222-
if len(chunksDone) == len(self.chunks):
223-
done = True #all chunks loaded
264+
if len(chunksDone) >= len(self.chunks):
265+
if len(chunksDone) > len(self.chunks):
266+
self.log.warning("Finished download chunks size incorrect, please report bug.")
267+
done = True #all chunks loaded
224268

225269
break
226270

227271
if done:
228272
break #all chunks loaded
229273

230-
# calc speed once per second
274+
# calc speed once per second, averaging over 3 seconds
231275
if lastTimeCheck + 1 < t:
232276
diff = [c.arrived - (self.lastArrived[i] if len(self.lastArrived) > i else 0) for i, c in
233277
enumerate(self.chunks)]
@@ -247,15 +291,7 @@ def _download(self, chunks, resume):
247291

248292
failed = False
249293
for chunk in self.chunks:
250-
try:
251-
chunk.verifyHeader()
252-
except BadHeader, e:
253-
failed = e.code
254-
remove(self.info.getChunkName(chunk.id))
255-
256-
chunk.fp.flush()
257-
fsync(chunk.fp.fileno()) #make sure everything was written to disk
258-
chunk.fp.close() #needs to be closed, or merging chunks will fail
294+
chunk.flushFile() #make sure downloads are written to disk
259295

260296
if failed: raise BadHeader(failed)
261297

@@ -265,11 +301,16 @@ def updateProgress(self):
265301
if self.progressNotify:
266302
self.progressNotify(self.percent)
267303

304+
def findChunk(self, handle):
305+
""" linear search to find a chunk (should be ok since chunk size is usually low) """
306+
for chunk in self.chunks:
307+
if chunk.c == handle: return chunk
308+
268309
def closeChunk(self, chunk):
269310
try:
270311
self.m.remove_handle(chunk.c)
271-
except pycurl.error:
272-
self.log.debug("Error removing chunk")
312+
except pycurl.error, e:
313+
self.log.debug("Error removing chunk: %s" % str(e))
273314
finally:
274315
chunk.close()
275316

module/network/HTTPRequest.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
def myquote(url):
3131
return quote(url, safe="%/:=&?~#+!$,;'@()*[]")
3232

33+
bad_headers = range(400, 404) + range(405, 418) + range(500, 506)
3334

3435
class BadHeader(Exception):
3536
def __init__(self, code, content=""):
@@ -211,11 +212,15 @@ def load(self, url, get={}, post={}, referer=True, cookies=True, just_header=Fal
211212
def verifyHeader(self):
212213
""" raise an exceptions on bad headers """
213214
code = int(self.c.getinfo(pycurl.RESPONSE_CODE))
214-
if code in range(400, 404) or code in range(405, 418) or code in range(500, 506):
215+
if code in bad_headers:
215216
#404 will NOT raise an exception
216217
raise BadHeader(code, self.getResponse())
217218
return code
218219

220+
def checkHeader(self):
221+
""" check if header indicates failure"""
222+
return int(self.c.getinfo(pycurl.RESPONSE_CODE)) not in bad_headers
223+
219224
def getResponse(self):
220225
""" retrieve response from string io """
221226
if self.rep is None: return ""

module/remote/socketbackend/create_ttypes.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class BaseObject(object):
6868
#create init
6969
args = ["self"] + ["%s=None" % x for x in klass.__slots__]
7070

71-
f.write("\tdef init(%s):\n" % ", ".join(args))
71+
f.write("\tdef __init__(%s):\n" % ", ".join(args))
7272
for attr in klass.__slots__:
7373
f.write("\t\tself.%s = %s\n" % (attr, attr))
7474

0 commit comments

Comments (0)