Skip to content

Commit 8c76481

Browse files
committed
Replace DispatchSource in URLSession on Windows with custom event listener (swiftlang#4791)
1 parent 912a617 commit 8c76481

File tree

1 file changed

+175
-1
lines changed

1 file changed

+175
-1
lines changed

Sources/FoundationNetworking/URLSession/libcurl/MultiHandle.swift

+175-1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ fileprivate extension URLSession._MultiHandle {
127127
if let opaque = socketSourcePtr {
128128
Unmanaged<_SocketSources>.fromOpaque(opaque).release()
129129
}
130+
socketSources?.tearDown()
130131
socketSources = nil
131132
}
132133
if let ss = socketSources {
@@ -412,7 +413,7 @@ fileprivate extension URLSession._MultiHandle._Timeout {
412413
}
413414
}
414415

415-
416+
#if !os(Windows)
416417
/// Read and write libdispatch sources for a specific socket.
417418
///
418419
/// A simple helper that combines two sources -- both being optional.
@@ -470,6 +471,179 @@ extension _SocketSources {
470471
}
471472
}
472473
}
474+
475+
#else
476+
477+
private let threadpoolWaitCallback: PTP_WAIT_CALLBACK = { (inst, context, pwa, res) in
478+
guard let sources = _SocketSources.from(socketSourcePtr: context) else {
479+
fatalError("Context is not set in socket callback")
480+
}
481+
482+
sources.socketCallback()
483+
}
484+
485+
private class _SocketSources {
486+
struct SocketEvents: OptionSet {
487+
let rawValue: CLong
488+
489+
static let read = SocketEvents(rawValue: FD_READ)
490+
static let write = SocketEvents(rawValue: FD_WRITE)
491+
}
492+
493+
private var socket: SOCKET = INVALID_SOCKET
494+
private var queue: DispatchQueue?
495+
private var handler: DispatchWorkItem?
496+
497+
// Only the handlerCallout and callback properties are
498+
// accessed concurrently (from queue thread and ThreadpoolWait thread).
499+
// While callback property should not be raced due to specific
500+
// disarm logic, it is still guarded with lock for safety.
501+
private var handlerCallout: DispatchWorkItem?
502+
private var callback: (event: HANDLE, threadpoolWait: PTP_WAIT)?
503+
private let lock = NSLock()
504+
505+
private var networkEvents: CLong = 0
506+
private var events: SocketEvents = [] {
507+
didSet {
508+
guard oldValue != events else {
509+
return
510+
}
511+
triggerIO()
512+
}
513+
}
514+
515+
func triggerIO() {
516+
// Decide which network events we're interested in,
517+
// initialize callback lazily.
518+
let (networkEvents, event) = { () -> (CLong, HANDLE?) in
519+
guard !events.isEmpty else {
520+
return (0, nil)
521+
}
522+
let event = {
523+
if let callback = callback {
524+
return callback.event
525+
}
526+
guard let event = CreateEventW(nil, /* bManualReset */ false, /* bInitialState */ false, nil) else {
527+
fatalError("CreateEventW \(GetLastError())")
528+
}
529+
guard let threadpoolWait = CreateThreadpoolWait(threadpoolWaitCallback, Unmanaged.passUnretained(self).toOpaque(), /* PTP_CALLBACK_ENVIRON */ nil) else {
530+
fatalError("CreateThreadpoolWait \(GetLastError())")
531+
}
532+
SetThreadpoolWait(threadpoolWait, event, /* pftTimeout */ nil)
533+
callback = (event, threadpoolWait)
534+
return event
535+
}()
536+
return (FD_CLOSE | events.rawValue, event)
537+
}()
538+
539+
if self.networkEvents != networkEvents {
540+
guard WSAEventSelect(socket, event, networkEvents) == 0 else {
541+
fatalError("WSAEventSelect \(WSAGetLastError())")
542+
}
543+
self.networkEvents = networkEvents
544+
}
545+
546+
if events.contains(.write) {
547+
// FD_WRITE will only be signaled if the socket becomes writable after
548+
// a send() fails with WSAEWOULDBLOCK. If shis zero-byte send() doesn't fail,
549+
// we could immediately schedule the handler callout.
550+
if send(socket, "", 0, 0) == 0 {
551+
queue!.async(execute: handler!)
552+
}
553+
} else if events.isEmpty, let callback = callback {
554+
SetThreadpoolWait(callback.threadpoolWait, nil, nil)
555+
WaitForThreadpoolWaitCallbacks(callback.threadpoolWait, /* fCancelPendingCallbacks */ true)
556+
CloseThreadpoolWait(callback.threadpoolWait)
557+
CloseHandle(callback.event)
558+
559+
lock.lock()
560+
self.callback = nil
561+
handlerCallout?.cancel()
562+
handlerCallout = nil
563+
lock.unlock()
564+
565+
handler = nil
566+
}
567+
}
568+
569+
func createSources(with action: URLSession._MultiHandle._SocketRegisterAction, socket: CFURLSession_socket_t, queue: DispatchQueue, handler: DispatchWorkItem) {
570+
precondition(self.socket == INVALID_SOCKET || self.socket == socket, "Socket value changed")
571+
precondition(self.queue == nil || self.queue === queue, "Queue changed")
572+
573+
self.socket = socket
574+
self.queue = queue
575+
self.handler = handler
576+
577+
events = action.socketEvents
578+
}
579+
580+
func tearDown() {
581+
events = []
582+
}
583+
584+
func socketCallback() {
585+
// Note: this called on ThreadpoolWait thread.
586+
lock.lock()
587+
if let callback = callback {
588+
ResetEvent(callback.event)
589+
SetThreadpoolWait(callback.threadpoolWait, callback.event, /* pftTimeout */ nil)
590+
}
591+
lock.unlock()
592+
593+
performHandler()
594+
}
595+
596+
private func performHandler() {
597+
guard let queue = queue else {
598+
fatalError("Attempting callout without queue set")
599+
}
600+
601+
let handlerCallout = DispatchWorkItem {
602+
self.lock.lock()
603+
self.handlerCallout = nil
604+
self.lock.unlock()
605+
606+
if let handler = self.handler, !handler.isCancelled {
607+
handler.perform()
608+
}
609+
610+
// Check if new callout was scheduled while we were performing the handler.
611+
self.lock.lock()
612+
let hasCallout = self.handlerCallout != nil
613+
self.lock.unlock()
614+
guard !hasCallout, !self.events.isEmpty else {
615+
return
616+
}
617+
618+
self.triggerIO()
619+
}
620+
621+
// Simple callout merge implementation.
622+
// Just do not schedule additional work if there is pending item.
623+
lock.lock()
624+
if self.handlerCallout == nil {
625+
self.handlerCallout = handlerCallout
626+
queue.async(execute: handlerCallout)
627+
}
628+
lock.unlock()
629+
}
630+
631+
}
632+
633+
private extension URLSession._MultiHandle._SocketRegisterAction {
634+
var socketEvents: _SocketSources.SocketEvents {
635+
switch self {
636+
case .none: return []
637+
case .registerRead: return [.read]
638+
case .registerWrite: return [.write]
639+
case .registerReadAndWrite: return [.read, .write]
640+
case .unregister: return []
641+
}
642+
}
643+
}
644+
645+
#endif
646+
473647
extension _SocketSources {
474648
/// Unwraps the `SocketSources`
475649
///

0 commit comments

Comments
 (0)