Skip to content

Commit bab881e

Browse files
committed
Cancel DispatchSource before closing socket (swiftlang#4791)
Extends socket lifetime enough to let DispatchSource cancel properly. Also prevents from creating new DispatchSources while other are in the middle of cancelling. Also includes tests (see swiftlang#4854 for test details).
1 parent 7258a8d commit bab881e

File tree

5 files changed

+290
-28
lines changed

5 files changed

+290
-28
lines changed

CoreFoundation/URL.subproj/CFURLSessionInterface.c

+4
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull
111111
return MakeEasyCode(curl_easy_setopt(curl, option.value, a));
112112
}
113113

114+
CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a) {
115+
return MakeEasyCode(curl_easy_setopt(curl, option.value, a));
116+
}
117+
114118
CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a) {
115119
return MakeEasyCode(curl_easy_getinfo(curl, info.value, a));
116120
}

CoreFoundation/URL.subproj/CFURLSessionInterface.h

+2
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,8 @@ typedef int (CFURLSessionSeekCallback)(void *_Nullable userp, long long offset,
625625
CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_seek(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionSeekCallback * _Nullable a);
626626
typedef int (CFURLSessionTransferInfoCallback)(void *_Nullable userp, long long dltotal, long long dlnow, long long ultotal, long long ulnow);
627627
CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_tc(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionTransferInfoCallback * _Nullable a);
628+
typedef int (CFURLSessionCloseSocketCallback)(void *_Nullable clientp, CFURLSession_socket_t item);
629+
CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_setopt_scl(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionOption option, CFURLSessionCloseSocketCallback * _Nullable a);
628630

629631
CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_long(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, long *_Nonnull a);
630632
CF_EXPORT CFURLSessionEasyCode CFURLSession_easy_getinfo_double(CFURLSessionEasyHandle _Nonnull curl, CFURLSessionInfo info, double *_Nonnull a);

Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift

+140-12
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ extension URLSession {
4545
let queue: DispatchQueue
4646
let group = DispatchGroup()
4747
fileprivate var easyHandles: [_EasyHandle] = []
48+
fileprivate var socketReferences: [CFURLSession_socket_t: _SocketReference] = [:]
4849
fileprivate var timeoutSource: _TimeoutSource? = nil
4950
private var reentrantInUpdateTimeoutTimer = false
5051

@@ -127,13 +128,14 @@ fileprivate extension URLSession._MultiHandle {
127128
if let opaque = socketSourcePtr {
128129
Unmanaged<_SocketSources>.fromOpaque(opaque).release()
129130
}
131+
socketSources?.tearDown(handle: self, socket: socket, queue: queue)
130132
socketSources = nil
131133
}
132134
if let ss = socketSources {
133135
let handler = DispatchWorkItem { [weak self] in
134136
self?.performAction(for: socket)
135137
}
136-
ss.createSources(with: action, socket: socket, queue: queue, handler: handler)
138+
ss.createSources(with: action, handle: self, socket: socket, queue: queue, handler: handler)
137139
}
138140
return 0
139141
}
@@ -161,9 +163,104 @@ extension Collection where Element == _EasyHandle {
161163
}
162164
}
163165

166+
private extension URLSession._MultiHandle {
167+
class _SocketReference {
168+
let socket: CFURLSession_socket_t
169+
var shouldClose: Bool
170+
var workItem: DispatchWorkItem?
171+
172+
init(socket: CFURLSession_socket_t) {
173+
self.socket = socket
174+
shouldClose = false
175+
}
176+
177+
deinit {
178+
if shouldClose {
179+
#if os(Windows)
180+
closesocket(socket)
181+
#else
182+
close(socket)
183+
#endif
184+
}
185+
}
186+
}
187+
188+
/// Creates and stores socket reference. Reentrancy is not supported.
189+
/// Trying to begin operation for same socket twice would mean something
190+
/// went horribly wrong, or our assumptions about CURL register/unregister
191+
/// action flow are nor correct.
192+
func beginOperation(for socket: CFURLSession_socket_t) -> _SocketReference {
193+
let reference = _SocketReference(socket: socket)
194+
precondition(socketReferences.updateValue(reference, forKey: socket) == nil, "Reentrancy is not supported for socket operations")
195+
return reference
196+
}
197+
198+
/// Removes socket reference from the shared store. If there is work item scheduled,
199+
/// executes it on the current thread.
200+
func endOperation(for socketReference: _SocketReference) {
201+
precondition(socketReferences.removeValue(forKey: socketReference.socket) != nil, "No operation associated with the socket")
202+
if let workItem = socketReference.workItem, !workItem.isCancelled {
203+
// CURL never asks for socket close without unregistering first, and
204+
// we should cancel pending work when unregister action is requested.
205+
precondition(!socketReference.shouldClose, "Socket close was scheduled, but there is some pending work left")
206+
workItem.perform()
207+
}
208+
}
209+
210+
/// Marks this reference to close socket on deinit. This allows us
211+
/// to extend socket lifecycle by keeping the reference alive.
212+
func scheduleClose(for socket: CFURLSession_socket_t) {
213+
let reference = socketReferences[socket] ?? _SocketReference(socket: socket)
214+
reference.shouldClose = true
215+
}
216+
217+
/// Schedules work to be performed when an operation ends for the socket,
218+
/// or performs it immediately if there is no operation in progress.
219+
///
220+
/// We're using this to postpone Dispatch Source creation when
221+
/// previous Dispatch Source is not cancelled yet.
222+
func schedule(_ workItem: DispatchWorkItem, for socket: CFURLSession_socket_t) {
223+
guard let socketReference = socketReferences[socket] else {
224+
workItem.perform()
225+
return
226+
}
227+
// CURL never asks for register without pairing it with unregister later,
228+
// and we're cancelling pending work item on unregister.
229+
// But it is safe to just drop existing work item anyway,
230+
// and replace it with the new one.
231+
socketReference.workItem = workItem
232+
}
233+
234+
/// Cancels pending work for socket operation. Does nothing if
235+
/// there is no operation in progress or no pending work item.
236+
///
237+
/// CURL may become not interested in Dispatch Sources
238+
/// we have planned to create. In this case we should just cancel
239+
/// scheduled work.
240+
func cancelWorkItem(for socket: CFURLSession_socket_t) {
241+
guard let socketReference = socketReferences[socket] else {
242+
return
243+
}
244+
socketReference.workItem?.cancel()
245+
socketReference.workItem = nil
246+
}
247+
248+
}
249+
164250
internal extension URLSession._MultiHandle {
165251
/// Add an easy handle -- start its transfer.
166252
func add(_ handle: _EasyHandle) {
253+
// Set CLOSESOCKETFUNCTION. Note that while the option belongs to easy_handle,
254+
// the connection cache is managed by CURL multi_handle, and sockets can actually
255+
// outlive easy_handle (even after curl_easy_cleanup call). That's why
256+
// socket management lives in _MultiHandle.
257+
try! CFURLSession_easy_setopt_ptr(handle.rawHandle, CFURLSessionOptionCLOSESOCKETDATA, UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())).asError()
258+
try! CFURLSession_easy_setopt_scl(handle.rawHandle, CFURLSessionOptionCLOSESOCKETFUNCTION) { (clientp: UnsafeMutableRawPointer?, item: CFURLSession_socket_t) in
259+
guard let handle = URLSession._MultiHandle.from(callbackUserData: clientp) else { fatalError() }
260+
handle.scheduleClose(for: item)
261+
return 0
262+
}.asError()
263+
167264
// If this is the first handle being added, we need to `kick` the
168265
// underlying multi handle by calling `timeoutTimerFired` as
169266
// described in
@@ -448,25 +545,56 @@ fileprivate class _SocketSources {
448545
s.resume()
449546
}
450547

451-
func tearDown() {
452-
if let s = readSource {
453-
s.cancel()
548+
func tearDown(handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue) {
549+
handle.cancelWorkItem(for: socket) // There could be pending register action which needs to be cancelled
550+
551+
guard readSource != nil || writeSource != nil else {
552+
// This means that we have posponed (and already abandoned)
553+
// sources creation.
554+
return
454555
}
455-
readSource = nil
456-
if let s = writeSource {
457-
s.cancel()
556+
557+
// Socket is guaranteed to not to be closed as long as we keeping
558+
// the reference.
559+
let socketReference = handle.beginOperation(for: socket)
560+
let cancelHandlerGroup = DispatchGroup()
561+
[readSource, writeSource].compactMap({ $0 }).forEach { source in
562+
cancelHandlerGroup.enter()
563+
source.setCancelHandler {
564+
cancelHandlerGroup.leave()
565+
}
566+
source.cancel()
567+
}
568+
cancelHandlerGroup.notify(queue: queue) {
569+
handle.endOperation(for: socketReference)
458570
}
571+
572+
readSource = nil
459573
writeSource = nil
460574
}
461575
}
462576
extension _SocketSources {
463577
/// Create a read and/or write source as specified by the action.
464-
func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
465-
if action.needsReadSource {
466-
createReadSource(socket: socket, queue: queue, handler: handler)
578+
func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, handle: URLSession._MultiHandle, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
579+
// CURL casually requests to unregister and register handlers for same
580+
// socket in a row. There is (pretty low) chance of overlapping tear-down operation
581+
// with "register" request. Bad things could happen if we create
582+
// a new Dispatch Source while other is being cancelled for the same socket.
583+
// We're using `_MultiHandle.schedule(_:for:)` here to postpone sources creation until
584+
// pending operation is finished (if there is none, submitted work item is performed
585+
// immediately).
586+
// Also, CURL may request unregister even before we perform any postponed work,
587+
// so we have to cancel such work in such case. See
588+
let createSources = DispatchWorkItem {
589+
if action.needsReadSource {
590+
self.createReadSource(socket: socket, queue: queue, handler: handler)
591+
}
592+
if action.needsWriteSource {
593+
self.createWriteSource(socket: socket, queue: queue, handler: handler)
594+
}
467595
}
468-
if action.needsWriteSource {
469-
createWriteSource(socket: socket, queue: queue, handler: handler)
596+
if (action.needsReadSource || action.needsWriteSource) {
597+
handle.schedule(createSources, for: socket)
470598
}
471599
}
472600
}

Tests/Foundation/HTTPServer.swift

+43-11
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class _TCPSocket: CustomStringConvertible {
9999
listening = false
100100
}
101101

102-
init(port: UInt16?) throws {
102+
init(port: UInt16?, backlog: Int32) throws {
103103
listening = true
104104
self.port = 0
105105

@@ -124,7 +124,7 @@ class _TCPSocket: CustomStringConvertible {
124124
try socketAddress.withMemoryRebound(to: sockaddr.self, capacity: MemoryLayout<sockaddr>.size, {
125125
let addr = UnsafePointer<sockaddr>($0)
126126
_ = try attempt("bind", valid: isZero, bind(_socket, addr, socklen_t(MemoryLayout<sockaddr>.size)))
127-
_ = try attempt("listen", valid: isZero, listen(_socket, SOMAXCONN))
127+
_ = try attempt("listen", valid: isZero, listen(_socket, backlog))
128128
})
129129

130130
var actualSA = sockaddr_in()
@@ -295,8 +295,8 @@ class _HTTPServer: CustomStringConvertible {
295295
let tcpSocket: _TCPSocket
296296
var port: UInt16 { tcpSocket.port }
297297

298-
init(port: UInt16?) throws {
299-
tcpSocket = try _TCPSocket(port: port)
298+
init(port: UInt16?, backlog: Int32 = SOMAXCONN) throws {
299+
tcpSocket = try _TCPSocket(port: port, backlog: backlog)
300300
}
301301

302302
init(socket: _TCPSocket) {
@@ -1094,15 +1094,32 @@ enum InternalServerError : Error {
10941094
case badHeaders
10951095
}
10961096

1097+
extension LoopbackServerTest {
1098+
struct Options {
1099+
var serverBacklog: Int32
1100+
var isAsynchronous: Bool
1101+
1102+
static let `default` = Options(serverBacklog: SOMAXCONN, isAsynchronous: true)
1103+
}
1104+
}
10971105

10981106
class LoopbackServerTest : XCTestCase {
10991107
private static let staticSyncQ = DispatchQueue(label: "org.swift.TestFoundation.HTTPServer.StaticSyncQ")
11001108

11011109
private static var _serverPort: Int = -1
11021110
private static var _serverActive = false
11031111
private static var testServer: _HTTPServer? = nil
1104-
1105-
1112+
private static var _options: Options = .default
1113+
1114+
static var options: Options {
1115+
get {
1116+
return staticSyncQ.sync { _options }
1117+
}
1118+
set {
1119+
staticSyncQ.sync { _options = newValue }
1120+
}
1121+
}
1122+
11061123
static var serverPort: Int {
11071124
get {
11081125
return staticSyncQ.sync { _serverPort }
@@ -1119,27 +1136,42 @@ class LoopbackServerTest : XCTestCase {
11191136

11201137
override class func setUp() {
11211138
super.setUp()
1139+
Self.startServer()
1140+
}
11221141

1142+
override class func tearDown() {
1143+
Self.stopServer()
1144+
super.tearDown()
1145+
}
1146+
1147+
static func startServer() {
11231148
var _serverPort = 0
11241149
let dispatchGroup = DispatchGroup()
11251150

11261151
func runServer() throws {
1127-
testServer = try _HTTPServer(port: nil)
1152+
testServer = try _HTTPServer(port: nil, backlog: options.serverBacklog)
11281153
_serverPort = Int(testServer!.port)
11291154
serverActive = true
11301155
dispatchGroup.leave()
11311156

11321157
while serverActive {
11331158
do {
11341159
let httpServer = try testServer!.listen()
1135-
globalDispatchQueue.async {
1160+
1161+
func handleRequest() {
11361162
let subServer = TestURLSessionServer(httpServer: httpServer)
11371163
do {
11381164
try subServer.readAndRespond()
11391165
} catch {
11401166
NSLog("readAndRespond: \(error)")
11411167
}
11421168
}
1169+
1170+
if options.isAsynchronous {
1171+
globalDispatchQueue.async(execute: handleRequest)
1172+
} else {
1173+
handleRequest()
1174+
}
11431175
} catch {
11441176
if (serverActive) { // Ignore errors thrown on shutdown
11451177
NSLog("httpServer: \(error)")
@@ -1165,11 +1197,11 @@ class LoopbackServerTest : XCTestCase {
11651197
fatalError("Timedout waiting for server to be ready")
11661198
}
11671199
serverPort = _serverPort
1200+
debugLog("Listening on \(serverPort)")
11681201
}
1169-
1170-
override class func tearDown() {
1202+
1203+
static func stopServer() {
11711204
serverActive = false
11721205
try? testServer?.stop()
1173-
super.tearDown()
11741206
}
11751207
}

0 commit comments

Comments
 (0)