@@ -38,7 +38,11 @@ use socket2::{Domain, Protocol as SockProtocol, SockAddr, Socket, Type};
38
38
use std:: collections:: HashMap ;
39
39
use std:: convert:: Infallible ;
40
40
use std:: fmt:: { Debug , Display , Formatter , Write } ;
41
- use std:: io:: { self , BufRead , BufReader , ErrorKind , Write as IoWrite } ;
41
+ use std:: io:: { self , BufReader , ErrorKind , Write as IoWrite } ;
42
+
43
+ #[ cfg( feature = "sync-sender-tcp" ) ]
44
+ use std:: io:: BufRead ;
45
+
42
46
use std:: num:: NonZeroUsize ;
43
47
use std:: ops:: Deref ;
44
48
use std:: path:: PathBuf ;
@@ -350,12 +354,20 @@ fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
350
354
write_escaped_impl ( must_escape_quoted, |output| output. push ( b'"' ) , output, s)
351
355
}
352
356
353
- enum Connection {
357
+ #[ cfg( feature = "sync-sender-tcp" ) ]
358
+ enum SyncConnection {
354
359
Direct ( Socket ) ,
355
360
Tls ( Box < StreamOwned < ClientConnection , Socket > > ) ,
356
361
}
357
362
358
- impl Connection {
363
+ #[ cfg( feature = "async-sender-tcp" ) ]
364
+ enum AsyncConnection {
365
+ Direct ( tokio:: net:: TcpStream ) ,
366
+ Tls ( Box < tokio_rustls:: client:: TlsStream < tokio:: net:: TcpStream > > ) ,
367
+ }
368
+
369
+ #[ cfg( feature = "sync-sender-tcp" ) ]
370
+ impl SyncConnection {
359
371
fn send_key_id ( & mut self , key_id : & str ) -> Result < ( ) > {
360
372
writeln ! ( self , "{}" , key_id)
361
373
. map_err ( |io_err| map_io_to_socket_err ( "Failed to send key_id: " , io_err) ) ?;
@@ -419,16 +431,22 @@ impl Connection {
419
431
}
420
432
}
421
433
422
- #[ cfg( any( feature = "sync-sender-tcp" , feature = "sync-sender-http" ) ) ]
423
434
enum ProtocolHandler {
424
435
#[ cfg( feature = "sync-sender-tcp" ) ]
425
- Socket ( Connection ) ,
436
+ SyncTcp ( SyncConnection ) ,
426
437
427
438
#[ cfg( feature = "sync-sender-http" ) ]
428
- Http ( HttpHandlerState ) ,
439
+ SyncHttp ( SyncHttpHandlerState ) ,
440
+
441
+ #[ cfg( feature = "async-sender-tcp" ) ]
442
+ AsyncTcp ( AsyncConnection ) ,
443
+
444
+ #[ cfg( feature = "async-sender-http" ) ]
445
+ AsyncHttp ( )
429
446
}
430
447
431
- impl io:: Read for Connection {
448
+ #[ cfg( feature = "sync-sender-tcp" ) ]
449
+ impl io:: Read for SyncConnection {
432
450
fn read ( & mut self , buf : & mut [ u8 ] ) -> io:: Result < usize > {
433
451
match self {
434
452
Self :: Direct ( sock) => sock. read ( buf) ,
@@ -437,7 +455,8 @@ impl io::Read for Connection {
437
455
}
438
456
}
439
457
440
- impl io:: Write for Connection {
458
+ #[ cfg( feature = "sync-sender-tcp" ) ]
459
+ impl IoWrite for SyncConnection {
441
460
fn write ( & mut self , buf : & [ u8 ] ) -> io:: Result < usize > {
442
461
match self {
443
462
Self :: Direct ( sock) => sock. write ( buf) ,
@@ -1379,6 +1398,7 @@ impl Buffer {
1379
1398
}
1380
1399
}
1381
1400
1401
+ #[ cfg( feature = "_sync-sender" ) ]
1382
1402
/// Connects to a QuestDB instance and inserts data via the ILP protocol.
1383
1403
///
1384
1404
/// * To construct an instance, use [`Sender::from_conf`] or the [`SenderBuilder`].
@@ -1393,7 +1413,8 @@ pub struct Sender {
1393
1413
max_name_len : usize ,
1394
1414
}
1395
1415
1396
- impl std:: fmt:: Debug for Sender {
1416
+ #[ cfg( feature = "_sync-sender" ) ]
1417
+ impl Debug for Sender {
1397
1418
fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std:: result:: Result < ( ) , std:: fmt:: Error > {
1398
1419
f. write_str ( self . descr . as_str ( ) )
1399
1420
}
@@ -1867,7 +1888,7 @@ pub struct SenderBuilder {
1867
1888
1868
1889
tls_ca : ConfigSetting < CertificateAuthority > ,
1869
1890
tls_roots : ConfigSetting < Option < PathBuf > > ,
1870
-
1891
+
1871
1892
#[ cfg( feature = "_sender-http" ) ]
1872
1893
http : Option < HttpConfig > ,
1873
1894
}
@@ -2359,6 +2380,7 @@ impl SenderBuilder {
2359
2380
Ok ( self )
2360
2381
}
2361
2382
2383
+ #[ cfg( feature = "sync-sender-tcp" ) ]
2362
2384
fn connect_tcp ( & self , auth : & Option < AuthParams > ) -> Result < ProtocolHandler > {
2363
2385
let addr: SockAddr = gai:: resolve_host_port ( self . host . as_str ( ) , self . port . as_str ( ) ) ?;
2364
2386
let mut sock = Socket :: new ( Domain :: IPV4 , Type :: STREAM , Some ( SockProtocol :: TCP ) )
@@ -2439,16 +2461,16 @@ impl SenderBuilder {
2439
2461
}
2440
2462
} ) ?;
2441
2463
}
2442
- Connection :: Tls ( StreamOwned :: new ( tls_conn, sock) . into ( ) )
2464
+ SyncConnection :: Tls ( StreamOwned :: new ( tls_conn, sock) . into ( ) )
2443
2465
}
2444
- None => Connection :: Direct ( sock) ,
2466
+ None => SyncConnection :: Direct ( sock) ,
2445
2467
} ;
2446
2468
2447
2469
if let Some ( AuthParams :: Ecdsa ( auth) ) = auth {
2448
2470
conn. authenticate ( auth) ?;
2449
2471
}
2450
2472
2451
- Ok ( ProtocolHandler :: Socket ( conn) )
2473
+ Ok ( ProtocolHandler :: SyncTcp ( conn) )
2452
2474
}
2453
2475
2454
2476
fn build_auth ( & self ) -> Result < Option < AuthParams > > {
@@ -2545,6 +2567,7 @@ impl SenderBuilder {
2545
2567
}
2546
2568
}
2547
2569
2570
+ #[ cfg( feature = "_sync-sender" ) ]
2548
2571
/// Build the sender.
2549
2572
///
2550
2573
/// In the case of TCP, this synchronously establishes the TCP connection, and
@@ -2563,6 +2586,7 @@ impl SenderBuilder {
2563
2586
let auth = self . build_auth ( ) ?;
2564
2587
2565
2588
let handler = match self . protocol {
2589
+ #[ cfg( feature = "sync-sender-tcp" ) ]
2566
2590
Protocol :: Tcp | Protocol :: Tcps => self . connect_tcp ( & auth) ?,
2567
2591
#[ cfg( feature = "sync-sender-http" ) ]
2568
2592
Protocol :: Http | Protocol :: Https => {
@@ -2624,7 +2648,7 @@ impl SenderBuilder {
2624
2648
self . host. deref( ) ,
2625
2649
self . port. deref( )
2626
2650
) ;
2627
- ProtocolHandler :: Http ( HttpHandlerState {
2651
+ ProtocolHandler :: SyncHttp ( SyncHttpHandlerState {
2628
2652
agent,
2629
2653
url,
2630
2654
auth,
@@ -2642,7 +2666,7 @@ impl SenderBuilder {
2642
2666
Protocol :: Tcp | Protocol :: Tcps => ProtocolVersion :: V1 ,
2643
2667
#[ cfg( feature = "sync-sender-http" ) ]
2644
2668
Protocol :: Http | Protocol :: Https => {
2645
- if let ProtocolHandler :: Http ( http_state) = & handler {
2669
+ if let ProtocolHandler :: SyncHttp ( http_state) = & handler {
2646
2670
let settings_url = & format ! (
2647
2671
"{}://{}:{}/settings" ,
2648
2672
self . protocol. schema( ) ,
@@ -2837,6 +2861,7 @@ impl F64Serializer {
2837
2861
}
2838
2862
}
2839
2863
2864
+ #[ cfg( feature = "_sync-sender" ) ]
2840
2865
impl Sender {
2841
2866
/// Create a new `Sender` instance from the given configuration string.
2842
2867
///
@@ -2907,7 +2932,7 @@ impl Sender {
2907
2932
return Ok ( ( ) ) ;
2908
2933
}
2909
2934
match self . handler {
2910
- ProtocolHandler :: Socket ( ref mut conn) => {
2935
+ ProtocolHandler :: SyncTcp ( ref mut conn) => {
2911
2936
if transactional {
2912
2937
return Err ( error:: fmt!(
2913
2938
InvalidApiCall ,
@@ -2920,7 +2945,7 @@ impl Sender {
2920
2945
} ) ?;
2921
2946
}
2922
2947
#[ cfg( feature = "sync-sender-http" ) ]
2923
- ProtocolHandler :: Http ( ref state) => {
2948
+ ProtocolHandler :: SyncHttp ( ref state) => {
2924
2949
if transactional && !buf. transactional ( ) {
2925
2950
return Err ( error:: fmt!(
2926
2951
InvalidApiCall ,
0 commit comments