Skip to content

Commit 1b0dd65

Browse files
committed
Fix stuck thread on server shutdown
The connect namedpipe thread would be in a suspended state when shutdown on the server is called. Setting the event to a signalled state to wake the thread up so everything can shut down properly. Signed-off-by: James Sturtevant <[email protected]>
1 parent e957414 commit 1b0dd65

File tree

3 files changed

+58
-29
lines changed

3 files changed

+58
-29
lines changed

src/sync/client.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl Client {
4545
pub fn connect(sockaddr: &str) -> Result<Client> {
4646
let conn = ClientConnection::client_connect(sockaddr)?;
4747

48-
Ok(Self::new_client(conn))
48+
Self::new_client(conn)
4949
}
5050

5151
#[cfg(unix)]
@@ -56,15 +56,15 @@ impl Client {
5656
Self::new_client(conn)
5757
}
5858

59-
fn new_client(pipe_client: ClientConnection) -> Client {
59+
fn new_client(pipe_client: ClientConnection) -> Result<Client> {
6060
let client = Arc::new(pipe_client);
6161

6262
let (sender_tx, rx): (Sender, Receiver) = mpsc::channel();
6363
let recver_map_orig = Arc::new(Mutex::new(HashMap::new()));
6464

6565

6666
let receiver_map = recver_map_orig.clone();
67-
let connection = Arc::new(client.get_pipe_connection());
67+
let connection = Arc::new(client.get_pipe_connection()?);
6868
let sender_client = connection.clone();
6969

7070
//Sender
@@ -171,10 +171,10 @@ impl Client {
171171
trace!("Receiver quit");
172172
});
173173

174-
Client {
174+
Ok(Client {
175175
_connection: client,
176176
sender_tx,
177-
}
177+
})
178178
}
179179
pub fn request(&self, req: Request) -> Result<Response> {
180180
let buf = req.encode().map_err(err_to_others_err!(e, ""))?;
@@ -220,7 +220,11 @@ impl Drop for ClientConnection {
220220
#[cfg(windows)]
221221
impl Drop for PipeConnection {
222222
fn drop(&mut self) {
223-
self.close().unwrap();
223+
self.close().unwrap_or_else(|e| {
224+
trace!(
225+
"connection may already be closed: {}", e
226+
)
227+
});
224228
trace!("pipe connection is dropped");
225229
}
226230
}

src/sync/sys/unix/net.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,8 @@ impl ClientConnection {
305305
Ok(Some(()))
306306
}
307307

308-
pub fn get_pipe_connection(&self) -> PipeConnection {
309-
PipeConnection::new(self.fd)
308+
pub fn get_pipe_connection(&self) -> Result<PipeConnection> {
309+
Ok(PipeConnection::new(self.fd))
310310
}
311311

312312
pub fn close_receiver(&self) -> Result<()> {

src/sync/sys/windows/net.rs

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_
3030
use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX };
3131
use windows_sys::Win32::System::IO::{ GetOverlappedResult, OVERLAPPED };
3232
use windows_sys::Win32::System::Pipes::{ CreateNamedPipeW, ConnectNamedPipe,DisconnectNamedPipe, PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, PIPE_REJECT_REMOTE_CLIENTS };
33-
use windows_sys::Win32::System::Threading::CreateEventW;
33+
use windows_sys::Win32::System::Threading::{CreateEventW, SetEvent};
3434

3535
const PIPE_BUFFER_SIZE: u32 = 65536;
3636
const WAIT_FOR_EVENT: i32 = 1;
3737

3838
pub struct PipeListener {
3939
first_instance: AtomicBool,
4040
address: String,
41+
connection_event: isize,
4142
}
4243

4344
#[repr(C)]
@@ -54,22 +55,18 @@ impl Overlapped {
5455
ol
5556
}
5657

57-
fn new() -> Overlapped {
58-
Overlapped {
59-
inner: UnsafeCell::new(unsafe { std::mem::zeroed() }),
60-
}
61-
}
62-
6358
fn as_mut_ptr(&self) -> *mut OVERLAPPED {
6459
self.inner.get()
6560
}
6661
}
6762

6863
impl PipeListener {
6964
pub(crate) fn new(sockaddr: &str) -> Result<PipeListener> {
65+
let connection_event = create_event()?;
7066
Ok(PipeListener {
7167
first_instance: AtomicBool::new(true),
7268
address: sockaddr.to_string(),
69+
connection_event
7370
})
7471
}
7572

@@ -85,30 +82,40 @@ impl PipeListener {
8582
}
8683

8784
// Create a new pipe instance for every new client
88-
let np = self.new_instance().unwrap();
89-
let ol = Overlapped::new();
85+
let instance = self.new_instance()?;
86+
let np = match PipeConnection::new(instance) {
87+
Ok(np) => np,
88+
Err(e) => {
89+
return Err(io::Error::new(
90+
io::ErrorKind::Other,
91+
format!("failed to create new pipe instance: {:?}", e),
92+
));
93+
}
94+
};
95+
96+
let ol = Overlapped::new_with_event(self.connection_event);
9097

9198
trace!("listening for connection");
92-
let result = unsafe { ConnectNamedPipe(np, ol.as_mut_ptr())};
99+
let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())};
93100
if result != 0 {
94101
return Err(io::Error::last_os_error());
95102
}
96103

97104
match io::Error::last_os_error() {
98105
e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => {
99106
let mut bytes_transfered = 0;
100-
let res = unsafe {GetOverlappedResult(np, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
107+
let res = unsafe {GetOverlappedResult(np.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
101108
match res {
102109
0 => {
103110
return Err(io::Error::last_os_error());
104111
}
105112
_ => {
106-
Ok(Some(PipeConnection::new(np)))
113+
Ok(Some(np))
107114
}
108115
}
109116
}
110117
e if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => {
111-
Ok(Some(PipeConnection::new(np)))
118+
Ok(Some(np))
112119
}
113120
e => {
114121
return Err(io::Error::new(
@@ -145,7 +152,9 @@ impl PipeListener {
145152
}
146153

147154
pub fn close(&self) -> Result<()> {
148-
Ok(())
155+
// release the ConnectNamedPipe thread by signaling the event and clean up event handle
156+
set_event(self.connection_event)?;
157+
close_handle(self.connection_event)
149158
}
150159
}
151160

@@ -170,15 +179,15 @@ pub struct PipeConnection {
170179
// "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device."
171180
// "In this situation, there is no way to know which operation caused the object's state to be signaled."
172181
impl PipeConnection {
173-
pub(crate) fn new(h: isize) -> PipeConnection {
182+
pub(crate) fn new(h: isize) -> Result<PipeConnection> {
174183
trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32);
175-
let read_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
176-
let write_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
177-
PipeConnection {
184+
let read_event = create_event()?;
185+
let write_event = create_event()?;
186+
Ok(PipeConnection {
178187
named_pipe: h,
179188
read_event: read_event,
180189
write_event: write_event,
181-
}
190+
})
182191
}
183192

184193
pub(crate) fn id(&self) -> i32 {
@@ -275,6 +284,22 @@ fn close_handle(handle: isize) -> Result<()> {
275284
}
276285
}
277286

287+
fn create_event() -> Result<isize> {
288+
let result = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
289+
match result {
290+
0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())),
291+
_ => Ok(result),
292+
}
293+
}
294+
295+
fn set_event(event: isize) -> Result<()> {
296+
let result = unsafe { SetEvent(event) };
297+
match result {
298+
0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())),
299+
_ => Ok(()),
300+
}
301+
}
302+
278303
impl ClientConnection {
279304
pub fn client_connect(sockaddr: &str) -> Result<ClientConnection> {
280305
Ok(ClientConnection::new(sockaddr))
@@ -291,14 +316,14 @@ impl ClientConnection {
291316
Ok(Some(()))
292317
}
293318

294-
pub fn get_pipe_connection(&self) -> PipeConnection {
319+
pub fn get_pipe_connection(&self) -> Result<PipeConnection> {
295320
let mut opts = OpenOptions::new();
296321
opts.read(true)
297322
.write(true)
298323
.custom_flags(FILE_FLAG_OVERLAPPED);
299324
let file = opts.open(self.address.as_str());
300325

301-
PipeConnection::new(file.unwrap().into_raw_handle() as isize)
326+
return PipeConnection::new(file.unwrap().into_raw_handle() as isize)
302327
}
303328

304329
pub fn close_receiver(&self) -> Result<()> {

0 commit comments

Comments
 (0)