@@ -31,15 +31,10 @@ use winapi;
31
31
use winapi:: { HANDLE , INVALID_HANDLE_VALUE , LPVOID } ;
32
32
use kernel32;
33
33
34
+ // some debug bump macros to better track what's going on in case of errors
34
35
lazy_static ! {
35
- static ref DD_ENABLED : bool = match env:: var_os( "DD" ) {
36
- Some ( _) => true ,
37
- None => false ,
38
- } ;
39
- static ref DD2_ENABLED : bool = match env:: var_os( "DD2" ) {
40
- Some ( _) => true ,
41
- None => false ,
42
- } ;
36
+ static ref DD_ENABLED : bool = env:: var_os( "IPC_CHANNEL_WIN_DEBUG_DUMP" ) . is_some( ) ;
37
+ static ref DD2_ENABLED : bool = env:: var_os( "IPC_CHANNEL_WIN_DEBUG_MORE_DUMP" ) . is_some( ) ;
43
38
}
44
39
45
40
macro_rules! dd { ( $( $rest: tt) * ) => { if * DD_ENABLED { println!( $( $rest) * ) ; } } }
@@ -108,8 +103,8 @@ impl OsIpcOutOfBandMessage {
108
103
}
109
104
110
105
fn needs_to_be_sent ( & self ) -> bool {
111
- self . channel_handles . len ( ) > 0 ||
112
- self . shmem_handles . len ( ) > 0 ||
106
+ ! self . channel_handles . is_empty ( ) ||
107
+ ! self . shmem_handles . is_empty ( ) ||
113
108
self . big_data_receiver_handle != 0
114
109
}
115
110
@@ -374,7 +369,8 @@ impl MessageReader {
374
369
375
370
// if the remote end closed...
376
371
if err != winapi:: ERROR_SUCCESS {
377
- panic ! ( "[$ {:?}:{:?}] *** notify_completion: need to handle error! {}" , self . iocp, self . handle, err) ;
372
+ // This should never happen
373
+ panic ! ( "[$ {:?}:{:?}] *** notify_completion: unhandled error reported! {}" , self . iocp, self . handle, err) ;
378
374
}
379
375
380
376
unsafe {
@@ -395,17 +391,18 @@ impl MessageReader {
395
391
}
396
392
397
393
dd2 ! ( "[$ {:?}:{:?}] start_read ov {:?}" , self . iocp, self . handle, self . ov_ptr( ) ) ;
398
- let mut bytes_read: u32 = 0 ;
399
394
400
- // if the buffer is full, add more space
401
395
let buf_len = self . read_buf . len ( ) ;
402
396
let mut buf_cap = self . read_buf . capacity ( ) ;
397
+ let mut bytes_read: u32 = 0 ;
398
+
399
+ // if the buffer is full, add more capacity
403
400
if buf_cap == buf_len {
404
- let more =
405
- if buf_cap == 0 { READ_BUFFER_SIZE }
406
- else if buf_cap < READ_BUFFER_MAX_GROWTH { buf_cap }
407
- else { READ_BUFFER_MAX_GROWTH } ;
408
- self . read_buf . reserve ( more ) ;
401
+ self . read_buf . reserve ( match buf_cap {
402
+ 0 => READ_BUFFER_SIZE ,
403
+ 1 ... READ_BUFFER_MAX_GROWTH => buf_cap,
404
+ _ => READ_BUFFER_MAX_GROWTH
405
+ } ) ;
409
406
buf_cap = self . read_buf . capacity ( ) ;
410
407
}
411
408
@@ -463,8 +460,8 @@ impl MessageReader {
463
460
464
461
// Err(false) -> something really failed
465
462
// Err(true) -> no message
466
- // XXX This is dumb, we should return
467
- // Result<Option<(...)>,WinError>
463
+ // FIXME This is dumb, we should probably make this return Result<Option<(...)>,WinError>
464
+ // so that we can pass through the error that's lost in map_err below
468
465
fn get_message ( & mut self ) -> Result < ( Vec < u8 > , Vec < OsOpaqueIpcChannel > , Vec < OsIpcSharedMemory > ) , bool > {
469
466
let message_lengths = self . message_length ( ) ;
470
467
if message_lengths. is_none ( ) {
@@ -476,7 +473,6 @@ impl MessageReader {
476
473
477
474
// remove this message's bytes from read_buf, or just take read_buf
478
475
// if it contains exactly one message
479
- //dd!("[$ {:?}:{:?}] rb {:?}", self.iocp, self.handle, self.read_buf);
480
476
let msg_buf = if self . read_buf . len ( ) == bytes_needed {
481
477
mem:: replace ( & mut self . read_buf , Vec :: with_capacity ( READ_BUFFER_SIZE ) )
482
478
} else {
@@ -515,7 +511,6 @@ impl MessageReader {
515
511
516
512
dd ! ( "[$ {:?}:{:?}] get_message success -> {} bytes, {} channels, {} shmems" ,
517
513
self . iocp, self . handle, buf_data. len( ) , channels. len( ) , shmems. len( ) ) ;
518
- //dd!("[$ {:?}:{:?}] bd {:?}", self.iocp, self.handle, buf_data);
519
514
Ok ( ( buf_data, channels, shmems) )
520
515
}
521
516
}
@@ -621,7 +616,7 @@ impl OsIpcReceiver {
621
616
// cancel any outstanding IO request
622
617
reader. cancel_io ( ) ;
623
618
// this is only okay if we have nothing in the read buf
624
- Ok ( reader. read_buf . len ( ) == 0 )
619
+ Ok ( reader. read_buf . is_empty ( ) )
625
620
}
626
621
627
622
pub fn consume ( & self ) -> OsIpcReceiver {
@@ -719,7 +714,7 @@ impl OsIpcReceiver {
719
714
iocp,
720
715
* self . handle as winapi:: ULONG_PTR ,
721
716
0 ) ;
722
- if ret == ptr :: null_mut ( ) {
717
+ if ret. is_null ( ) {
723
718
return Err ( WinError :: last ( "CreateIoCompletionPort" ) ) ;
724
719
}
725
720
@@ -811,7 +806,6 @@ unsafe fn write_buf(handle: HANDLE, bytes: &[u8]) -> Result<(),WinError> {
811
806
return Ok ( ( ) ) ;
812
807
}
813
808
let mut nwritten: u32 = 0 ;
814
- //dd!("[c {:?}] writing: {:?}", handle, bytes);
815
809
while nwritten < ntowrite {
816
810
let mut nwrote: u32 = 0 ;
817
811
if kernel32:: WriteFile ( handle,
@@ -825,7 +819,7 @@ unsafe fn write_buf(handle: HANDLE, bytes: &[u8]) -> Result<(),WinError> {
825
819
}
826
820
nwritten += nwrote;
827
821
ntowrite -= nwrote;
828
- //dd !("[c {:?}] ... wrote {} bytes, total {}/{} err {}", handle, nwrote, nwritten, bytes.len(), GetLastError());
822
+ dd2 ! ( "[c {:?}] ... wrote {} bytes, total {}/{} err {}" , handle, nwrote, nwritten, bytes. len( ) , GetLastError ( ) ) ;
829
823
}
830
824
831
825
Ok ( ( ) )
@@ -880,7 +874,7 @@ impl OsIpcSender {
880
874
let raw_handle = kernel32:: OpenProcess ( winapi:: PROCESS_DUP_HANDLE ,
881
875
winapi:: FALSE ,
882
876
server_pid as winapi:: DWORD ) ;
883
- if raw_handle == ptr :: null_mut ( ) {
877
+ if raw_handle. is_null ( ) {
884
878
return Err ( WinError :: last ( "OpenProcess" ) ) ;
885
879
}
886
880
@@ -924,7 +918,7 @@ impl OsIpcSender {
924
918
assert ! ( data. len( ) < INVALID_HEADER_DATA_SIZE as usize ) ;
925
919
926
920
let server_process_handle =
927
- if ports. len ( ) > 0 || shared_memory_regions. len ( ) > 0 {
921
+ if ! ports. is_empty ( ) || ! shared_memory_regions. is_empty ( ) {
928
922
try!( self . get_pipe_server_process_handle ( ) )
929
923
} else {
930
924
WinHandle :: invalid ( )
@@ -1016,7 +1010,7 @@ impl OsIpcReceiverSet {
1016
1010
ptr:: null_mut ( ) ,
1017
1011
0 as winapi:: ULONG_PTR ,
1018
1012
0 ) ;
1019
- if iocp == ptr :: null_mut ( ) {
1013
+ if iocp. is_null ( ) {
1020
1014
return Err ( WinError :: last ( "CreateIoCompletionPort" ) ) ;
1021
1015
}
1022
1016
@@ -1031,9 +1025,18 @@ impl OsIpcReceiverSet {
1031
1025
// use this to identify the receiver
1032
1026
let receiver_handle = * receiver. handle ;
1033
1027
1034
- // XXX we'll need a mutex here... at least while we loop through
1035
- // receivers to find a matching handle when we get a IOCP
1036
1028
try!( receiver. add_to_iocp ( * self . iocp ) ) ;
1029
+
1030
+ // FIXME we *may* need a mutex to protect self.receivers --
1031
+ // one thread could be adding something to this Set while
1032
+ // another is calling select(); the add() could cause the
1033
+ // receivers array to reallocate while we're doing stuff with
1034
+ // it in select(). That would mean an add() would block while
1035
+ // a select() is blocking.
1036
+ //
1037
+ // A better option would be to have a mutex around a
1038
+ // self.receivers_to_add array, and have select drain those
1039
+ // and append to self.receivers whenever it's called.
1037
1040
self . receivers . push ( receiver) ;
1038
1041
1039
1042
dd ! ( "[# {:?}] ReceiverSet add {:?}" , * self . iocp, receiver_handle) ;
@@ -1042,7 +1045,7 @@ impl OsIpcReceiverSet {
1042
1045
}
1043
1046
1044
1047
pub fn select ( & mut self ) -> Result < Vec < OsIpcSelectionResult > , WinError > {
1045
- assert ! ( self . receivers. len ( ) > 0 , "selecting with no objects?" ) ;
1048
+ assert ! ( ! self . receivers. is_empty ( ) , "selecting with no objects?" ) ;
1046
1049
dd ! ( "[# {:?}] select() with {} receivers" , * self . iocp, self . receivers. len( ) ) ;
1047
1050
1048
1051
unsafe {
@@ -1068,7 +1071,7 @@ impl OsIpcReceiverSet {
1068
1071
} ) ;
1069
1072
1070
1073
// if we had prematurely closed elements, just process them first
1071
- if selection_results. len ( ) > 0 {
1074
+ if ! selection_results. is_empty ( ) {
1072
1075
return Ok ( selection_results) ;
1073
1076
}
1074
1077
@@ -1091,7 +1094,7 @@ impl OsIpcReceiverSet {
1091
1094
// function call itself failed or timed out.
1092
1095
// Otherwise, the async IO operation failed, and
1093
1096
// we want to hand io_err to notify_completion below.
1094
- if ov_ptr == ptr :: null_mut ( ) {
1097
+ if ov_ptr. is_null ( ) {
1095
1098
return Err ( WinError :: last ( "GetQueuedCompletionStatus" ) ) ;
1096
1099
}
1097
1100
@@ -1153,7 +1156,7 @@ impl OsIpcReceiverSet {
1153
1156
1154
1157
// if we didn't dequeue at least one complete message -- we need to loop through GetQueuedCS again;
1155
1158
// otherwise we're done.
1156
- if selection_results. len ( ) > 0 {
1159
+ if ! selection_results. is_empty ( ) {
1157
1160
break ;
1158
1161
}
1159
1162
}
@@ -1263,7 +1266,7 @@ impl OsIpcSharedMemory {
1263
1266
winapi:: FILE_MAP_ALL_ACCESS ,
1264
1267
0 , 0 , 0 )
1265
1268
} ;
1266
- if address == ptr :: null_mut ( ) {
1269
+ if address. is_null ( ) {
1267
1270
return Err ( WinError :: last ( "MapViewOfFile" ) ) ;
1268
1271
}
1269
1272
@@ -1435,7 +1438,6 @@ impl From<WinError> for DeserializeError {
1435
1438
1436
1439
impl From < WinError > for Error {
1437
1440
fn from ( mpsc_error : WinError ) -> Error {
1438
- //Error::new(ErrorKind::Other, format!("Win channel error ({} from {})", mpsc_error.0, mpsc_error.1))
1439
1441
Error :: new ( ErrorKind :: Other , format ! ( "Win channel error ({})" , mpsc_error. 0 ) )
1440
1442
}
1441
1443
}
0 commit comments