@@ -38,7 +38,11 @@ use socket2::{Domain, Protocol as SockProtocol, SockAddr, Socket, Type};
38
38
use std:: collections:: HashMap ;
39
39
use std:: convert:: Infallible ;
40
40
use std:: fmt:: { Debug , Display , Formatter , Write } ;
41
- use std:: io:: { self , BufRead , BufReader , ErrorKind , Write as IoWrite } ;
41
+ use std:: io:: { self , BufReader , ErrorKind , Write as IoWrite } ;
42
+
43
+ #[ cfg( feature = "sync-sender-tcp" ) ]
44
+ use std:: io:: BufRead ;
45
+
42
46
use std:: num:: NonZeroUsize ;
43
47
use std:: ops:: Deref ;
44
48
use std:: path:: PathBuf ;
@@ -356,12 +360,20 @@ fn write_escaped_quoted(output: &mut Vec<u8>, s: &str) {
356
360
write_escaped_impl ( must_escape_quoted, |output| output. push ( b'"' ) , output, s)
357
361
}
358
362
359
- enum Connection {
363
+ #[ cfg( feature = "sync-sender-tcp" ) ]
364
+ enum SyncConnection {
360
365
Direct ( Socket ) ,
361
366
Tls ( Box < StreamOwned < ClientConnection , Socket > > ) ,
362
367
}
363
368
364
- impl Connection {
369
+ #[ cfg( feature = "async-sender-tcp" ) ]
370
+ enum AsyncConnection {
371
+ Direct ( tokio:: net:: TcpStream ) ,
372
+ Tls ( Box < tokio_rustls:: client:: TlsStream < tokio:: net:: TcpStream > > ) ,
373
+ }
374
+
375
+ #[ cfg( feature = "sync-sender-tcp" ) ]
376
+ impl SyncConnection {
365
377
fn send_key_id ( & mut self , key_id : & str ) -> Result < ( ) > {
366
378
writeln ! ( self , "{}" , key_id)
367
379
. map_err ( |io_err| map_io_to_socket_err ( "Failed to send key_id: " , io_err) ) ?;
@@ -425,16 +437,22 @@ impl Connection {
425
437
}
426
438
}
427
439
428
- #[ cfg( any( feature = "sync-sender-tcp" , feature = "sync-sender-http" ) ) ]
429
440
enum ProtocolHandler {
430
441
#[ cfg( feature = "sync-sender-tcp" ) ]
431
- Socket ( Connection ) ,
442
+ SyncTcp ( SyncConnection ) ,
432
443
433
444
#[ cfg( feature = "sync-sender-http" ) ]
434
- Http ( HttpHandlerState ) ,
445
+ SyncHttp ( SyncHttpHandlerState ) ,
446
+
447
+ #[ cfg( feature = "async-sender-tcp" ) ]
448
+ AsyncTcp ( AsyncConnection ) ,
449
+
450
+ #[ cfg( feature = "async-sender-http" ) ]
451
+ AsyncHttp ( )
435
452
}
436
453
437
- impl io:: Read for Connection {
454
+ #[ cfg( feature = "sync-sender-tcp" ) ]
455
+ impl io:: Read for SyncConnection {
438
456
fn read ( & mut self , buf : & mut [ u8 ] ) -> io:: Result < usize > {
439
457
match self {
440
458
Self :: Direct ( sock) => sock. read ( buf) ,
@@ -443,7 +461,8 @@ impl io::Read for Connection {
443
461
}
444
462
}
445
463
446
- impl io:: Write for Connection {
464
+ #[ cfg( feature = "sync-sender-tcp" ) ]
465
+ impl IoWrite for SyncConnection {
447
466
fn write ( & mut self , buf : & [ u8 ] ) -> io:: Result < usize > {
448
467
match self {
449
468
Self :: Direct ( sock) => sock. write ( buf) ,
@@ -1385,6 +1404,7 @@ impl Buffer {
1385
1404
}
1386
1405
}
1387
1406
1407
+ #[ cfg( feature = "_sync-sender" ) ]
1388
1408
/// Connects to a QuestDB instance and inserts data via the ILP protocol.
1389
1409
///
1390
1410
/// * To construct an instance, use [`Sender::from_conf`] or the [`SenderBuilder`].
@@ -1399,7 +1419,8 @@ pub struct Sender {
1399
1419
max_name_len : usize ,
1400
1420
}
1401
1421
1402
- impl std:: fmt:: Debug for Sender {
1422
+ #[ cfg( feature = "_sync-sender" ) ]
1423
+ impl Debug for Sender {
1403
1424
fn fmt ( & self , f : & mut Formatter < ' _ > ) -> std:: result:: Result < ( ) , std:: fmt:: Error > {
1404
1425
f. write_str ( self . descr . as_str ( ) )
1405
1426
}
@@ -1873,7 +1894,7 @@ pub struct SenderBuilder {
1873
1894
1874
1895
tls_ca : ConfigSetting < CertificateAuthority > ,
1875
1896
tls_roots : ConfigSetting < Option < PathBuf > > ,
1876
-
1897
+
1877
1898
#[ cfg( feature = "_sender-http" ) ]
1878
1899
http : Option < HttpConfig > ,
1879
1900
}
@@ -2365,6 +2386,7 @@ impl SenderBuilder {
2365
2386
Ok ( self )
2366
2387
}
2367
2388
2389
+ #[ cfg( feature = "sync-sender-tcp" ) ]
2368
2390
fn connect_tcp ( & self , auth : & Option < AuthParams > ) -> Result < ProtocolHandler > {
2369
2391
let addr: SockAddr = gai:: resolve_host_port ( self . host . as_str ( ) , self . port . as_str ( ) ) ?;
2370
2392
let mut sock = Socket :: new ( Domain :: IPV4 , Type :: STREAM , Some ( SockProtocol :: TCP ) )
@@ -2445,16 +2467,16 @@ impl SenderBuilder {
2445
2467
}
2446
2468
} ) ?;
2447
2469
}
2448
- Connection :: Tls ( StreamOwned :: new ( tls_conn, sock) . into ( ) )
2470
+ SyncConnection :: Tls ( StreamOwned :: new ( tls_conn, sock) . into ( ) )
2449
2471
}
2450
- None => Connection :: Direct ( sock) ,
2472
+ None => SyncConnection :: Direct ( sock) ,
2451
2473
} ;
2452
2474
2453
2475
if let Some ( AuthParams :: Ecdsa ( auth) ) = auth {
2454
2476
conn. authenticate ( auth) ?;
2455
2477
}
2456
2478
2457
- Ok ( ProtocolHandler :: Socket ( conn) )
2479
+ Ok ( ProtocolHandler :: SyncTcp ( conn) )
2458
2480
}
2459
2481
2460
2482
fn build_auth ( & self ) -> Result < Option < AuthParams > > {
@@ -2551,6 +2573,7 @@ impl SenderBuilder {
2551
2573
}
2552
2574
}
2553
2575
2576
+ #[ cfg( feature = "_sync-sender" ) ]
2554
2577
/// Build the sender.
2555
2578
///
2556
2579
/// In the case of TCP, this synchronously establishes the TCP connection, and
@@ -2569,6 +2592,7 @@ impl SenderBuilder {
2569
2592
let auth = self . build_auth ( ) ?;
2570
2593
2571
2594
let handler = match self . protocol {
2595
+ #[ cfg( feature = "sync-sender-tcp" ) ]
2572
2596
Protocol :: Tcp | Protocol :: Tcps => self . connect_tcp ( & auth) ?,
2573
2597
#[ cfg( feature = "sync-sender-http" ) ]
2574
2598
Protocol :: Http | Protocol :: Https => {
@@ -2630,7 +2654,7 @@ impl SenderBuilder {
2630
2654
self . host. deref( ) ,
2631
2655
self . port. deref( )
2632
2656
) ;
2633
- ProtocolHandler :: Http ( HttpHandlerState {
2657
+ ProtocolHandler :: SyncHttp ( SyncHttpHandlerState {
2634
2658
agent,
2635
2659
url,
2636
2660
auth,
@@ -2648,7 +2672,7 @@ impl SenderBuilder {
2648
2672
Protocol :: Tcp | Protocol :: Tcps => ProtocolVersion :: V1 ,
2649
2673
#[ cfg( feature = "sync-sender-http" ) ]
2650
2674
Protocol :: Http | Protocol :: Https => {
2651
- if let ProtocolHandler :: Http ( http_state) = & handler {
2675
+ if let ProtocolHandler :: SyncHttp ( http_state) = & handler {
2652
2676
let settings_url = & format ! (
2653
2677
"{}://{}:{}/settings" ,
2654
2678
self . protocol. schema( ) ,
@@ -2843,6 +2867,7 @@ impl F64Serializer {
2843
2867
}
2844
2868
}
2845
2869
2870
+ #[ cfg( feature = "_sync-sender" ) ]
2846
2871
impl Sender {
2847
2872
/// Create a new `Sender` instance from the given configuration string.
2848
2873
///
@@ -2913,7 +2938,7 @@ impl Sender {
2913
2938
return Ok ( ( ) ) ;
2914
2939
}
2915
2940
match self . handler {
2916
- ProtocolHandler :: Socket ( ref mut conn) => {
2941
+ ProtocolHandler :: SyncTcp ( ref mut conn) => {
2917
2942
if transactional {
2918
2943
return Err ( error:: fmt!(
2919
2944
InvalidApiCall ,
@@ -2926,7 +2951,7 @@ impl Sender {
2926
2951
} ) ?;
2927
2952
}
2928
2953
#[ cfg( feature = "sync-sender-http" ) ]
2929
- ProtocolHandler :: Http ( ref state) => {
2954
+ ProtocolHandler :: SyncHttp ( ref state) => {
2930
2955
if transactional && !buf. transactional ( ) {
2931
2956
return Err ( error:: fmt!(
2932
2957
InvalidApiCall ,
0 commit comments