@@ -31,18 +31,13 @@ use winapi;
31
31
use winapi:: { HANDLE , INVALID_HANDLE_VALUE , LPVOID } ;
32
32
use kernel32;
33
33
34
+ // some debug bump macros to better track what's going on in case of errors
34
35
lazy_static ! {
35
36
static ref CURRENT_PROCESS_ID : winapi:: ULONG = unsafe { kernel32:: GetCurrentProcessId ( ) } ;
36
37
static ref CURRENT_PROCESS_HANDLE : intptr_t = unsafe { kernel32:: GetCurrentProcess ( ) as intptr_t } ;
37
38
38
- static ref DD_ENABLED : bool = match env:: var_os( "DD" ) {
39
- Some ( _) => true ,
40
- None => false ,
41
- } ;
42
- static ref DD2_ENABLED : bool = match env:: var_os( "DD2" ) {
43
- Some ( _) => true ,
44
- None => false ,
45
- } ;
39
+ static ref DD_ENABLED : bool = env:: var_os( "IPC_CHANNEL_WIN_DEBUG_DUMP" ) . is_some( ) ;
40
+ static ref DD2_ENABLED : bool = env:: var_os( "IPC_CHANNEL_WIN_DEBUG_MORE_DUMP" ) . is_some( ) ;
46
41
}
47
42
48
43
macro_rules! dd { ( $( $rest: tt) * ) => { if * DD_ENABLED { println!( $( $rest) * ) ; } } }
@@ -111,8 +106,8 @@ impl OsIpcOutOfBandMessage {
111
106
}
112
107
113
108
fn needs_to_be_sent ( & self ) -> bool {
114
- self . channel_handles . len ( ) > 0 ||
115
- self . shmem_handles . len ( ) > 0 ||
109
+ ! self . channel_handles . is_empty ( ) ||
110
+ ! self . shmem_handles . is_empty ( ) ||
116
111
self . big_data_receiver_handle != 0
117
112
}
118
113
@@ -378,7 +373,8 @@ impl MessageReader {
378
373
379
374
// if the remote end closed...
380
375
if err != winapi:: ERROR_SUCCESS {
381
- panic ! ( "[$ {:?}:{:?}] *** notify_completion: need to handle error! {}" , self . iocp, self . handle, err) ;
376
+ // This should never happen
377
+ panic ! ( "[$ {:?}:{:?}] *** notify_completion: unhandled error reported! {}" , self . iocp, self . handle, err) ;
382
378
}
383
379
384
380
unsafe {
@@ -399,17 +395,18 @@ impl MessageReader {
399
395
}
400
396
401
397
dd2 ! ( "[$ {:?}:{:?}] start_read ov {:?}" , self . iocp, self . handle, self . ov_ptr( ) ) ;
402
- let mut bytes_read: u32 = 0 ;
403
398
404
- // if the buffer is full, add more space
405
399
let buf_len = self . read_buf . len ( ) ;
406
400
let mut buf_cap = self . read_buf . capacity ( ) ;
401
+ let mut bytes_read: u32 = 0 ;
402
+
403
+ // if the buffer is full, add more capacity
407
404
if buf_cap == buf_len {
408
- let more =
409
- if buf_cap == 0 { READ_BUFFER_SIZE }
410
- else if buf_cap < READ_BUFFER_MAX_GROWTH { buf_cap }
411
- else { READ_BUFFER_MAX_GROWTH } ;
412
- self . read_buf . reserve ( more ) ;
405
+ self . read_buf . reserve ( match buf_cap {
406
+ 0 => READ_BUFFER_SIZE ,
407
+ 1 ... READ_BUFFER_MAX_GROWTH => buf_cap,
408
+ _ => READ_BUFFER_MAX_GROWTH
409
+ } ) ;
413
410
buf_cap = self . read_buf . capacity ( ) ;
414
411
}
415
412
@@ -467,8 +464,8 @@ impl MessageReader {
467
464
468
465
// Err(false) -> something really failed
469
466
// Err(true) -> no message
470
- // XXX This is dumb, we should return
471
- // Result<Option<(...)>,WinError>
467
+ // FIXME This is dumb, we should probably make this return Result<Option<(...)>,WinError>
468
+ // so that we can pass through the error that's lost in map_err below
472
469
fn get_message ( & mut self ) -> Result < ( Vec < u8 > , Vec < OsOpaqueIpcChannel > , Vec < OsIpcSharedMemory > ) , bool > {
473
470
let message_lengths = self . message_length ( ) ;
474
471
if message_lengths. is_none ( ) {
@@ -480,7 +477,6 @@ impl MessageReader {
480
477
481
478
// remove this message's bytes from read_buf, or just take read_buf
482
479
// if it contains exactly one message
483
- //dd!("[$ {:?}:{:?}] rb {:?}", self.iocp, self.handle, self.read_buf);
484
480
let msg_buf = if self . read_buf . len ( ) == bytes_needed {
485
481
mem:: replace ( & mut self . read_buf , Vec :: with_capacity ( READ_BUFFER_SIZE ) )
486
482
} else {
@@ -525,7 +521,6 @@ impl MessageReader {
525
521
526
522
dd ! ( "[$ {:?}:{:?}] get_message success -> {} bytes, {} channels, {} shmems" ,
527
523
self . iocp, self . handle, buf_data. len( ) , channels. len( ) , shmems. len( ) ) ;
528
- //dd!("[$ {:?}:{:?}] bd {:?}", self.iocp, self.handle, buf_data);
529
524
Ok ( ( buf_data, channels, shmems) )
530
525
}
531
526
}
@@ -635,7 +630,7 @@ impl OsIpcReceiver {
635
630
// cancel any outstanding IO request
636
631
reader. cancel_io ( ) ;
637
632
// this is only okay if we have nothing in the read buf
638
- Ok ( reader. read_buf . len ( ) == 0 )
633
+ Ok ( reader. read_buf . is_empty ( ) )
639
634
}
640
635
641
636
pub fn consume ( & self ) -> OsIpcReceiver {
@@ -733,7 +728,7 @@ impl OsIpcReceiver {
733
728
iocp,
734
729
* self . handle as winapi:: ULONG_PTR ,
735
730
0 ) ;
736
- if ret == ptr :: null_mut ( ) {
731
+ if ret. is_null ( ) {
737
732
return Err ( WinError :: last ( "CreateIoCompletionPort" ) ) ;
738
733
}
739
734
@@ -825,7 +820,6 @@ unsafe fn write_buf(handle: HANDLE, bytes: &[u8]) -> Result<(),WinError> {
825
820
return Ok ( ( ) ) ;
826
821
}
827
822
let mut nwritten: u32 = 0 ;
828
- //dd!("[c {:?}] writing: {:?}", handle, bytes);
829
823
while nwritten < ntowrite {
830
824
let mut nwrote: u32 = 0 ;
831
825
if kernel32:: WriteFile ( handle,
@@ -839,7 +833,7 @@ unsafe fn write_buf(handle: HANDLE, bytes: &[u8]) -> Result<(),WinError> {
839
833
}
840
834
nwritten += nwrote;
841
835
ntowrite -= nwrote;
842
- //dd !("[c {:?}] ... wrote {} bytes, total {}/{} err {}", handle, nwrote, nwritten, bytes.len(), GetLastError());
836
+ dd2 ! ( "[c {:?}] ... wrote {} bytes, total {}/{} err {}" , handle, nwrote, nwritten, bytes. len( ) , GetLastError ( ) ) ;
843
837
}
844
838
845
839
Ok ( ( ) )
@@ -908,7 +902,7 @@ impl OsIpcSender {
908
902
let raw_handle = kernel32:: OpenProcess ( winapi:: PROCESS_DUP_HANDLE ,
909
903
winapi:: FALSE ,
910
904
server_pid as winapi:: DWORD ) ;
911
- if raw_handle == ptr :: null_mut ( ) {
905
+ if raw_handle. is_null ( ) {
912
906
return Err ( WinError :: last ( "OpenProcess" ) ) ;
913
907
}
914
908
@@ -1055,7 +1049,7 @@ impl OsIpcReceiverSet {
1055
1049
ptr:: null_mut ( ) ,
1056
1050
0 as winapi:: ULONG_PTR ,
1057
1051
0 ) ;
1058
- if iocp == ptr :: null_mut ( ) {
1052
+ if iocp. is_null ( ) {
1059
1053
return Err ( WinError :: last ( "CreateIoCompletionPort" ) ) ;
1060
1054
}
1061
1055
@@ -1070,9 +1064,18 @@ impl OsIpcReceiverSet {
1070
1064
// use this to identify the receiver
1071
1065
let receiver_handle = * receiver. handle ;
1072
1066
1073
- // XXX we'll need a mutex here... at least while we loop through
1074
- // receivers to find a matching handle when we get a IOCP
1075
1067
try!( receiver. add_to_iocp ( * self . iocp ) ) ;
1068
+
1069
+ // FIXME we *may* need a mutex to protect self.receivers --
1070
+ // one thread could be adding something to this Set while
1071
+ // another is calling select(); the add() could cause the
1072
+ // receivers array to reallocate while we're doing stuff with
1073
+ // it in select(). That would mean an add() would block while
1074
+ // a select() is blocking.
1075
+ //
1076
+ // A better option would be to have a mutex around a
1077
+ // self.receivers_to_add array, and have select drain those
1078
+ // and append to self.receivers whenever it's called.
1076
1079
self . receivers . push ( receiver) ;
1077
1080
1078
1081
dd ! ( "[# {:?}] ReceiverSet add {:?}" , * self . iocp, receiver_handle) ;
@@ -1081,7 +1084,7 @@ impl OsIpcReceiverSet {
1081
1084
}
1082
1085
1083
1086
pub fn select ( & mut self ) -> Result < Vec < OsIpcSelectionResult > , WinError > {
1084
- assert ! ( self . receivers. len ( ) > 0 , "selecting with no objects?" ) ;
1087
+ assert ! ( ! self . receivers. is_empty ( ) , "selecting with no objects?" ) ;
1085
1088
dd ! ( "[# {:?}] select() with {} receivers" , * self . iocp, self . receivers. len( ) ) ;
1086
1089
1087
1090
unsafe {
@@ -1107,7 +1110,7 @@ impl OsIpcReceiverSet {
1107
1110
} ) ;
1108
1111
1109
1112
// if we had prematurely closed elements, just process them first
1110
- if selection_results. len ( ) > 0 {
1113
+ if ! selection_results. is_empty ( ) {
1111
1114
return Ok ( selection_results) ;
1112
1115
}
1113
1116
@@ -1130,7 +1133,7 @@ impl OsIpcReceiverSet {
1130
1133
// function call itself failed or timed out.
1131
1134
// Otherwise, the async IO operation failed, and
1132
1135
// we want to hand io_err to notify_completion below.
1133
- if ov_ptr == ptr :: null_mut ( ) {
1136
+ if ov_ptr. is_null ( ) {
1134
1137
return Err ( WinError :: last ( "GetQueuedCompletionStatus" ) ) ;
1135
1138
}
1136
1139
@@ -1192,7 +1195,7 @@ impl OsIpcReceiverSet {
1192
1195
1193
1196
// if we didn't dequeue at least one complete message -- we need to loop through GetQueuedCS again;
1194
1197
// otherwise we're done.
1195
- if selection_results. len ( ) > 0 {
1198
+ if ! selection_results. is_empty ( ) {
1196
1199
break ;
1197
1200
}
1198
1201
}
@@ -1302,7 +1305,7 @@ impl OsIpcSharedMemory {
1302
1305
winapi:: FILE_MAP_ALL_ACCESS ,
1303
1306
0 , 0 , 0 )
1304
1307
} ;
1305
- if address == ptr :: null_mut ( ) {
1308
+ if address. is_null ( ) {
1306
1309
return Err ( WinError :: last ( "MapViewOfFile" ) ) ;
1307
1310
}
1308
1311
0 commit comments