@@ -70,29 +70,21 @@ def __init__(
70
70
self .server = server
71
71
self .adj = adj
72
72
self .outbufs = [OverflowableBuffer (adj .outbuf_overflow )]
73
+ self .total_outbufs_len = 0
73
74
self .creation_time = self .last_activity = time .time ()
74
75
75
76
# task_lock used to push/pop requests
76
77
self .task_lock = threading .Lock ()
77
78
# outbuf_lock used to access any outbuf
78
- self .outbuf_lock = threading .Lock ()
79
+ self .outbuf_lock = threading .RLock ()
79
80
80
81
wasyncore .dispatcher .__init__ (self , sock , map = map )
81
82
82
83
# Don't let wasyncore.dispatcher throttle self.addr on us.
83
84
self .addr = addr
84
85
85
86
def any_outbuf_has_data (self ):
86
- for outbuf in self .outbufs :
87
- if bool (outbuf ):
88
- return True
89
- return False
90
-
91
- def total_outbufs_len (self ):
92
- # genexpr == more funccalls
93
- # use b.__len__ rather than len(b) FBO of not getting OverflowError
94
- # on Python 2
95
- return sum ([b .__len__ () for b in self .outbufs ])
87
+ return self .total_outbufs_len > 0
96
88
97
89
def writable (self ):
98
90
# if there's data in the out buffer or we've been instructed to close
@@ -124,7 +116,7 @@ def handle_write(self):
124
116
# won't get done.
125
117
flush = self ._flush_some_if_lockable
126
118
self .force_flush = False
127
- elif (self .total_outbufs_len () >= self .adj .send_bytes ):
119
+ elif (self .total_outbufs_len >= self .adj .send_bytes ):
128
120
# 1. There's a running task, so we need to try to lock
129
121
# the outbuf before sending
130
122
# 2. Only try to send if the data in the out buffer is larger
@@ -196,7 +188,9 @@ def received(self, data):
196
188
if not self .sent_continue :
197
189
# there's no current task, so we don't need to try to
198
190
# lock the outbuf to append to it.
199
- self .outbufs [- 1 ].append (b'HTTP/1.1 100 Continue\r \n \r \n ' )
191
+ outbuf_payload = b'HTTP/1.1 100 Continue\r \n \r \n '
192
+ self .outbufs [- 1 ].append (outbuf_payload )
193
+ self .total_outbufs_len += len (outbuf_payload )
200
194
self .sent_continue = True
201
195
self ._flush_some ()
202
196
request .completed = False
@@ -261,6 +255,7 @@ def _flush_some(self):
261
255
outbuf .skip (num_sent , True )
262
256
outbuflen -= num_sent
263
257
sent += num_sent
258
+ self .total_outbufs_len -= num_sent
264
259
else :
265
260
dobreak = True
266
261
break
@@ -275,15 +270,15 @@ def _flush_some(self):
275
270
return False
276
271
277
272
def handle_close (self ):
278
- # avoid closing the outbufs while a task is potentially adding data
279
- # to them in write_soon
273
+ # NB: default to True for when asyncore calls this function directly
280
274
with self .outbuf_lock :
281
275
for outbuf in self .outbufs :
282
276
try :
283
277
outbuf .close ()
284
278
except :
285
279
self .logger .exception (
286
280
'Unknown exception while trying to close outbuf' )
281
+ self .total_outbufs_len = 0
287
282
self .connected = False
288
283
wasyncore .dispatcher .close (self )
289
284
@@ -330,11 +325,13 @@ def write_soon(self, data):
330
325
self .outbufs .append (nextbuf )
331
326
else :
332
327
self .outbufs [- 1 ].append (data )
328
+ num_bytes = len (data )
329
+ self .total_outbufs_len += num_bytes
333
330
# XXX We might eventually need to pull the trigger here (to
334
331
# instruct select to stop blocking), but it slows things down so
335
332
# much that I'll hold off for now; "server push" on otherwise
336
333
# unbusy systems may suffer.
337
- return len ( data )
334
+ return num_bytes
338
335
return 0
339
336
340
337
def service (self ):
0 commit comments