Skip to content

Commit df403fb

Browse files
committed
Fix stuck thread on server shutdown
The connect namedpipe thread would be in a suspended state when shutdown on the server is called. Setting the event to a signalled state to wake the thread up so everything can shut down properly. Signed-off-by: James Sturtevant <[email protected]>
1 parent 746ddd1 commit df403fb

File tree

3 files changed

+58
-29
lines changed

3 files changed

+58
-29
lines changed

src/sync/client.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl Client {
4545
pub fn connect(sockaddr: &str) -> Result<Client> {
4646
let conn = ClientConnection::client_connect(sockaddr)?;
4747

48-
Ok(Self::new_client(conn))
48+
Self::new_client(conn)
4949
}
5050

5151
#[cfg(unix)]
@@ -56,15 +56,15 @@ impl Client {
5656
Self::new_client(conn)
5757
}
5858

59-
fn new_client(pipe_client: ClientConnection) -> Client {
59+
fn new_client(pipe_client: ClientConnection) -> Result<Client> {
6060
let client = Arc::new(pipe_client);
6161

6262
let (sender_tx, rx): (Sender, Receiver) = mpsc::channel();
6363
let recver_map_orig = Arc::new(Mutex::new(HashMap::new()));
6464

6565

6666
let receiver_map = recver_map_orig.clone();
67-
let connection = Arc::new(client.get_pipe_connection());
67+
let connection = Arc::new(client.get_pipe_connection()?);
6868
let sender_client = connection.clone();
6969

7070
//Sender
@@ -171,10 +171,10 @@ impl Client {
171171
trace!("Receiver quit");
172172
});
173173

174-
Client {
174+
Ok(Client {
175175
_connection: client,
176176
sender_tx,
177-
}
177+
})
178178
}
179179
pub fn request(&self, req: Request) -> Result<Response> {
180180
let buf = req.encode().map_err(err_to_others_err!(e, ""))?;
@@ -220,7 +220,11 @@ impl Drop for ClientConnection {
220220
#[cfg(windows)]
221221
impl Drop for PipeConnection {
222222
fn drop(&mut self) {
223-
self.close().unwrap();
223+
self.close().unwrap_or_else(|e| {
224+
trace!(
225+
"connection may already be closed: {}", e
226+
)
227+
});
224228
trace!("pipe connection is dropped");
225229
}
226230
}

src/sync/sys/unix/net.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,8 @@ impl ClientConnection {
304304
Ok(Some(()))
305305
}
306306

307-
pub fn get_pipe_connection(&self) -> PipeConnection {
308-
PipeConnection::new(self.fd)
307+
pub fn get_pipe_connection(&self) -> Result<PipeConnection> {
308+
Ok(PipeConnection::new(self.fd))
309309
}
310310

311311
pub fn close_receiver(&self) -> Result<()> {

src/sync/sys/windows/net.rs

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_
3030
use windows_sys::Win32::Storage::FileSystem::{ ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX };
3131
use windows_sys::Win32::System::IO::{ GetOverlappedResult, OVERLAPPED };
3232
use windows_sys::Win32::System::Pipes::{ CreateNamedPipeW, ConnectNamedPipe,DisconnectNamedPipe, PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, PIPE_REJECT_REMOTE_CLIENTS };
33-
use windows_sys::Win32::System::Threading::CreateEventW;
33+
use windows_sys::Win32::System::Threading::{CreateEventW, SetEvent};
3434

3535
const PIPE_BUFFER_SIZE: u32 = 65536;
3636
const WAIT_FOR_EVENT: i32 = 1;
3737

3838
pub struct PipeListener {
3939
first_instance: AtomicBool,
4040
address: String,
41+
connection_event: isize,
4142
}
4243

4344
#[repr(C)]
@@ -54,22 +55,18 @@ impl Overlapped {
5455
ol
5556
}
5657

57-
fn new() -> Overlapped {
58-
Overlapped {
59-
inner: UnsafeCell::new(unsafe { std::mem::zeroed() }),
60-
}
61-
}
62-
6358
fn as_mut_ptr(&self) -> *mut OVERLAPPED {
6459
self.inner.get()
6560
}
6661
}
6762

6863
impl PipeListener {
6964
pub(crate) fn new(sockaddr: &str) -> Result<PipeListener> {
65+
let connection_event = create_event()?;
7066
Ok(PipeListener {
7167
first_instance: AtomicBool::new(true),
7268
address: sockaddr.to_string(),
69+
connection_event
7370
})
7471
}
7572

@@ -83,30 +80,40 @@ impl PipeListener {
8380
}
8481

8582
// Create a new pipe instance for every new client
86-
let np = self.new_instance().unwrap();
87-
let ol = Overlapped::new();
83+
let instance = self.new_instance()?;
84+
let np = match PipeConnection::new(instance) {
85+
Ok(np) => np,
86+
Err(e) => {
87+
return Err(io::Error::new(
88+
io::ErrorKind::Other,
89+
format!("failed to create new pipe instance: {:?}", e),
90+
));
91+
}
92+
};
93+
94+
let ol = Overlapped::new_with_event(self.connection_event);
8895

8996
trace!("listening for connection");
90-
let result = unsafe { ConnectNamedPipe(np, ol.as_mut_ptr())};
97+
let result = unsafe { ConnectNamedPipe(np.named_pipe, ol.as_mut_ptr())};
9198
if result != 0 {
9299
return Err(io::Error::last_os_error());
93100
}
94101

95102
match io::Error::last_os_error() {
96103
e if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => {
97104
let mut bytes_transfered = 0;
98-
let res = unsafe {GetOverlappedResult(np, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
105+
let res = unsafe {GetOverlappedResult(np.named_pipe, ol.as_mut_ptr(), &mut bytes_transfered, WAIT_FOR_EVENT) };
99106
match res {
100107
0 => {
101108
return Err(io::Error::last_os_error());
102109
}
103110
_ => {
104-
Ok(Some(PipeConnection::new(np)))
111+
Ok(Some(np))
105112
}
106113
}
107114
}
108115
e if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => {
109-
Ok(Some(PipeConnection::new(np)))
116+
Ok(Some(np))
110117
}
111118
e => {
112119
return Err(io::Error::new(
@@ -143,7 +150,9 @@ impl PipeListener {
143150
}
144151

145152
pub fn close(&self) -> Result<()> {
146-
Ok(())
153+
// release the ConnectNamedPipe thread by signaling the event and clean up event handle
154+
set_event(self.connection_event)?;
155+
close_handle(self.connection_event)
147156
}
148157
}
149158

@@ -168,15 +177,15 @@ pub struct PipeConnection {
168177
// "It is safer to use an event object because of the confusion that can occur when multiple simultaneous overlapped operations are performed on the same file, named pipe, or communications device."
169178
// "In this situation, there is no way to know which operation caused the object's state to be signaled."
170179
impl PipeConnection {
171-
pub(crate) fn new(h: isize) -> PipeConnection {
180+
pub(crate) fn new(h: isize) -> Result<PipeConnection> {
172181
trace!("creating events for thread {:?} on pipe instance {}", std::thread::current().id(), h as i32);
173-
let read_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
174-
let write_event = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
175-
PipeConnection {
182+
let read_event = create_event()?;
183+
let write_event = create_event()?;
184+
Ok(PipeConnection {
176185
named_pipe: h,
177186
read_event: read_event,
178187
write_event: write_event,
179-
}
188+
})
180189
}
181190

182191
pub(crate) fn id(&self) -> i32 {
@@ -273,6 +282,22 @@ fn close_handle(handle: isize) -> Result<()> {
273282
}
274283
}
275284

285+
fn create_event() -> Result<isize> {
286+
let result = unsafe { CreateEventW(std::ptr::null_mut(), 0, 1, std::ptr::null_mut()) };
287+
match result {
288+
0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())),
289+
_ => Ok(result),
290+
}
291+
}
292+
293+
fn set_event(event: isize) -> Result<()> {
294+
let result = unsafe { SetEvent(event) };
295+
match result {
296+
0 => Err(Error::Windows(io::Error::last_os_error().raw_os_error().unwrap())),
297+
_ => Ok(()),
298+
}
299+
}
300+
276301
impl ClientConnection {
277302
pub fn client_connect(sockaddr: &str) -> Result<ClientConnection> {
278303
Ok(ClientConnection::new(sockaddr))
@@ -289,14 +314,14 @@ impl ClientConnection {
289314
Ok(Some(()))
290315
}
291316

292-
pub fn get_pipe_connection(&self) -> PipeConnection {
317+
pub fn get_pipe_connection(&self) -> Result<PipeConnection> {
293318
let mut opts = OpenOptions::new();
294319
opts.read(true)
295320
.write(true)
296321
.custom_flags(FILE_FLAG_OVERLAPPED);
297322
let file = opts.open(self.address.as_str());
298323

299-
PipeConnection::new(file.unwrap().into_raw_handle() as isize)
324+
return PipeConnection::new(file.unwrap().into_raw_handle() as isize)
300325
}
301326

302327
pub fn close_receiver(&self) -> Result<()> {

0 commit comments

Comments
 (0)