@@ -45,6 +45,7 @@ extension URLSession {
45
45
let queue : DispatchQueue
46
46
let group = DispatchGroup ( )
47
47
fileprivate var easyHandles : [ _EasyHandle ] = [ ]
48
+ fileprivate var socketReferences : [ CFURLSession_socket_t : _SocketReference ] = [ : ]
48
49
fileprivate var timeoutSource : _TimeoutSource ? = nil
49
50
private var reentrantInUpdateTimeoutTimer = false
50
51
@@ -127,13 +128,14 @@ fileprivate extension URLSession._MultiHandle {
127
128
if let opaque = socketSourcePtr {
128
129
Unmanaged < _SocketSources > . fromOpaque ( opaque) . release ( )
129
130
}
131
+ socketSources? . tearDown ( handle: self , socket: socket, queue: queue)
130
132
socketSources = nil
131
133
}
132
134
if let ss = socketSources {
133
135
let handler = DispatchWorkItem { [ weak self] in
134
136
self ? . performAction ( for: socket)
135
137
}
136
- ss. createSources ( with: action, socket: socket, queue: queue, handler: handler)
138
+ ss. createSources ( with: action, handle : self , socket: socket, queue: queue, handler: handler)
137
139
}
138
140
return 0
139
141
}
@@ -161,9 +163,104 @@ extension Collection where Element == _EasyHandle {
161
163
}
162
164
}
163
165
166
+ private extension URLSession . _MultiHandle {
167
+ class _SocketReference {
168
+ let socket : CFURLSession_socket_t
169
+ var shouldClose : Bool
170
+ var workItem : DispatchWorkItem ?
171
+
172
+ init ( socket: CFURLSession_socket_t ) {
173
+ self . socket = socket
174
+ shouldClose = false
175
+ }
176
+
177
+ deinit {
178
+ if shouldClose {
179
+ #if os(Windows)
180
+ closesocket ( socket)
181
+ #else
182
+ close ( socket)
183
+ #endif
184
+ }
185
+ }
186
+ }
187
+
188
+ /// Creates and stores socket reference. Reentrancy is not supported.
189
+ /// Trying to begin operation for same socket twice would mean something
190
+ /// went horribly wrong, or our assumptions about CURL register/unregister
191
+ /// action flow are nor correct.
192
+ func beginOperation( for socket: CFURLSession_socket_t ) -> _SocketReference {
193
+ let reference = _SocketReference ( socket: socket)
194
+ precondition ( socketReferences. updateValue ( reference, forKey: socket) == nil , " Reentrancy is not supported for socket operations " )
195
+ return reference
196
+ }
197
+
198
+ /// Removes socket reference from the shared store. If there is work item scheduled,
199
+ /// executes it on the current thread.
200
+ func endOperation( for socketReference: _SocketReference ) {
201
+ precondition ( socketReferences. removeValue ( forKey: socketReference. socket) != nil , " No operation associated with the socket " )
202
+ if let workItem = socketReference. workItem, !workItem. isCancelled {
203
+ // CURL never asks for socket close without unregistering first, and
204
+ // we should cancel pending work when unregister action is requested.
205
+ precondition ( !socketReference. shouldClose, " Socket close was scheduled, but there is some pending work left " )
206
+ workItem. perform ( )
207
+ }
208
+ }
209
+
210
+ /// Marks this reference to close socket on deinit. This allows us
211
+ /// to extend socket lifecycle by keeping the reference alive.
212
+ func scheduleClose( for socket: CFURLSession_socket_t ) {
213
+ let reference = socketReferences [ socket] ?? _SocketReference ( socket: socket)
214
+ reference. shouldClose = true
215
+ }
216
+
217
+ /// Schedules work to be performed when an operation ends for the socket,
218
+ /// or performs it immediately if there is no operation in progress.
219
+ ///
220
+ /// We're using this to postpone Dispatch Source creation when
221
+ /// previous Dispatch Source is not cancelled yet.
222
+ func schedule( _ workItem: DispatchWorkItem , for socket: CFURLSession_socket_t ) {
223
+ guard let socketReference = socketReferences [ socket] else {
224
+ workItem. perform ( )
225
+ return
226
+ }
227
+ // CURL never asks for register without pairing it with unregister later,
228
+ // and we're cancelling pending work item on unregister.
229
+ // But it is safe to just drop existing work item anyway,
230
+ // and replace it with the new one.
231
+ socketReference. workItem = workItem
232
+ }
233
+
234
+ /// Cancels pending work for socket operation. Does nothing if
235
+ /// there is no operation in progress or no pending work item.
236
+ ///
237
+ /// CURL may become not interested in Dispatch Sources
238
+ /// we have planned to create. In this case we should just cancel
239
+ /// scheduled work.
240
+ func cancelWorkItem( for socket: CFURLSession_socket_t ) {
241
+ guard let socketReference = socketReferences [ socket] else {
242
+ return
243
+ }
244
+ socketReference. workItem? . cancel ( )
245
+ socketReference. workItem = nil
246
+ }
247
+
248
+ }
249
+
164
250
internal extension URLSession . _MultiHandle {
165
251
/// Add an easy handle -- start its transfer.
166
252
func add( _ handle: _EasyHandle ) {
253
+ // Set CLOSESOCKETFUNCTION. Note that while the option belongs to easy_handle,
254
+ // the connection cache is managed by CURL multi_handle, and sockets can actually
255
+ // outlive easy_handle (even after curl_easy_cleanup call). That's why
256
+ // socket management lives in _MultiHandle.
257
+ try ! CFURLSession_easy_setopt_ptr ( handle. rawHandle, CFURLSessionOptionCLOSESOCKETDATA, UnsafeMutableRawPointer ( Unmanaged . passUnretained ( self ) . toOpaque ( ) ) ) . asError ( )
258
+ try ! CFURLSession_easy_setopt_scl ( handle. rawHandle, CFURLSessionOptionCLOSESOCKETFUNCTION) { ( clientp: UnsafeMutableRawPointer ? , item: CFURLSession_socket_t ) in
259
+ guard let handle = URLSession . _MultiHandle. from ( callbackUserData: clientp) else { fatalError ( ) }
260
+ handle. scheduleClose ( for: item)
261
+ return 0
262
+ } . asError ( )
263
+
167
264
// If this is the first handle being added, we need to `kick` the
168
265
// underlying multi handle by calling `timeoutTimerFired` as
169
266
// described in
@@ -426,6 +523,7 @@ fileprivate class _SocketSources {
426
523
427
524
func createReadSource( socket: CFURLSession_socket_t , queue: DispatchQueue , handler: DispatchWorkItem ) {
428
525
guard readSource == nil else { return }
526
+
429
527
#if os(Windows)
430
528
let s = DispatchSource . makeReadSource ( handle: HANDLE ( bitPattern: Int ( socket) ) !, queue: queue)
431
529
#else
@@ -448,25 +546,56 @@ fileprivate class _SocketSources {
448
546
s. resume ( )
449
547
}
450
548
451
- func tearDown( ) {
452
- if let s = readSource {
453
- s. cancel ( )
549
+ func tearDown( handle: URLSession . _MultiHandle , socket: CFURLSession_socket_t , queue: DispatchQueue ) {
550
+ handle. cancelWorkItem ( for: socket) // There could be pending register action which needs to be cancelled
551
+
552
+ guard readSource != nil , writeSource != nil else {
553
+ // This means that we have posponed (and already abandoned)
554
+ // sources creation.
555
+ return
454
556
}
455
- readSource = nil
456
- if let s = writeSource {
457
- s. cancel ( )
557
+
558
+ // Socket is guaranteed to not to be closed as long as we keeping
559
+ // the reference.
560
+ let socketReference = handle. beginOperation ( for: socket)
561
+ let cancelHandlerGroup = DispatchGroup ( )
562
+ [ readSource, writeSource] . compactMap ( { $0 } ) . forEach { source in
563
+ cancelHandlerGroup. enter ( )
564
+ source. setCancelHandler {
565
+ cancelHandlerGroup. leave ( )
566
+ }
567
+ source. cancel ( )
568
+ }
569
+ cancelHandlerGroup. notify ( queue: queue) {
570
+ handle. endOperation ( for: socketReference)
458
571
}
572
+
573
+ readSource = nil
459
574
writeSource = nil
460
575
}
461
576
}
462
577
extension _SocketSources {
463
578
/// Create a read and/or write source as specified by the action.
464
- func createSources( with action: URLSession . _MultiHandle . _SocketRegisterAction , socket: CFURLSession_socket_t , queue: DispatchQueue , handler: DispatchWorkItem ) {
465
- if action. needsReadSource {
466
- createReadSource ( socket: socket, queue: queue, handler: handler)
579
+ func createSources( with action: URLSession . _MultiHandle . _SocketRegisterAction , handle: URLSession . _MultiHandle , socket: CFURLSession_socket_t , queue: DispatchQueue , handler: DispatchWorkItem ) {
580
+ // CURL casually requests to unregister and register handlers for same
581
+ // socket in a row. There is (pretty low) chance of overlapping tear-down operation
582
+ // with "register" request. Bad things could happen if we create
583
+ // a new Dispatch Source while other is being cancelled for the same socket.
584
+ // We're using `_MultiHandle.schedule(_:for:)` here to postpone sources creation until
585
+ // pending operation is finished (if there is none, submitted work item is performed
586
+ // immediately).
587
+ // Also, CURL may request unregister even before we perform any postponed work,
588
+ // so we have to cancel such work in such case. See
589
+ let createSources = DispatchWorkItem {
590
+ if action. needsReadSource {
591
+ self . createReadSource ( socket: socket, queue: queue, handler: handler)
592
+ }
593
+ if action. needsWriteSource {
594
+ self . createWriteSource ( socket: socket, queue: queue, handler: handler)
595
+ }
467
596
}
468
- if action. needsWriteSource {
469
- createWriteSource ( socket : socket , queue : queue , handler : handler )
597
+ if ( action. needsReadSource || action . needsWriteSource) {
598
+ handle . schedule ( createSources , for : socket )
470
599
}
471
600
}
472
601
}
0 commit comments