@@ -24,8 +24,8 @@ use aptos_data_client::{
24
24
use aptos_id_generator:: { IdGenerator , U64IdGenerator } ;
25
25
use aptos_infallible:: Mutex ;
26
26
use aptos_logger:: prelude:: * ;
27
- use channel :: { aptos_channel , message_queues :: QueueStyle } ;
28
- use futures:: { stream:: FusedStream , Stream } ;
27
+ use futures :: channel :: mpsc ;
28
+ use futures:: { stream:: FusedStream , SinkExt , Stream } ;
29
29
use std:: {
30
30
collections:: { BTreeMap , VecDeque } ,
31
31
pin:: Pin ,
@@ -79,7 +79,7 @@ pub struct DataStream<T> {
79
79
notifications_to_responses : BTreeMap < NotificationId , ResponseContext > ,
80
80
81
81
// The channel on which to send data notifications when they are ready.
82
- notification_sender : channel :: aptos_channel :: Sender < ( ) , DataNotification > ,
82
+ notification_sender : mpsc :: Sender < DataNotification > ,
83
83
84
84
// A unique notification ID generator
85
85
notification_id_generator : Arc < U64IdGenerator > ,
@@ -108,11 +108,8 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
108
108
advertised_data : & AdvertisedData ,
109
109
) -> Result < ( Self , DataStreamListener ) , Error > {
110
110
// Create a new data stream listener
111
- let ( notification_sender, notification_receiver) = aptos_channel:: new (
112
- QueueStyle :: FIFO , // If the stream overflows, drop the new messages
113
- config. max_data_stream_channel_sizes as usize ,
114
- None ,
115
- ) ;
111
+ let ( notification_sender, notification_receiver) =
112
+ mpsc:: channel ( config. max_data_stream_channel_sizes as usize ) ;
116
113
let data_stream_listener = DataStreamListener :: new ( data_stream_id, notification_receiver) ;
117
114
118
115
// Create a new stream engine
@@ -276,8 +273,13 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
276
273
pending_client_response
277
274
}
278
275
279
- fn send_data_notification ( & mut self , data_notification : DataNotification ) -> Result < ( ) , Error > {
280
- if let Err ( error) = self . notification_sender . push ( ( ) , data_notification) {
276
+ // TODO(joshlind): this function shouldn't be blocking when trying to send! If there are
277
+ // multiple streams, a single blocked stream could cause them all to block.
278
+ async fn send_data_notification (
279
+ & mut self ,
280
+ data_notification : DataNotification ,
281
+ ) -> Result < ( ) , Error > {
282
+ if let Err ( error) = self . notification_sender . send ( data_notification) . await {
281
283
let error = Error :: UnexpectedErrorEncountered ( error. to_string ( ) ) ;
282
284
warn ! (
283
285
( LogSchema :: new( LogEntry :: StreamNotification )
@@ -298,7 +300,7 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
298
300
self . send_failure
299
301
}
300
302
301
- fn send_end_of_stream_notification ( & mut self ) -> Result < ( ) , Error > {
303
+ async fn send_end_of_stream_notification ( & mut self ) -> Result < ( ) , Error > {
302
304
// Create end of stream notification
303
305
let notification_id = self . notification_id_generator . next ( ) ;
304
306
let data_notification = DataNotification {
@@ -314,12 +316,12 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
314
316
. message( "Sent the end of stream notification" ) )
315
317
) ;
316
318
self . stream_end_notification_id = Some ( notification_id) ;
317
- self . send_data_notification ( data_notification)
319
+ self . send_data_notification ( data_notification) . await
318
320
}
319
321
320
322
/// Processes any data client responses that have been received. Note: the
321
323
/// responses must be processed in FIFO order.
322
- pub fn process_data_responses (
324
+ pub async fn process_data_responses (
323
325
& mut self ,
324
326
global_data_summary : GlobalDataSummary ,
325
327
) -> Result < ( ) , Error > {
@@ -328,25 +330,26 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
328
330
|| self . send_failure
329
331
{
330
332
if !self . send_failure && self . stream_end_notification_id . is_none ( ) {
331
- self . send_end_of_stream_notification ( ) ?;
333
+ self . send_end_of_stream_notification ( ) . await ?;
332
334
}
333
335
return Ok ( ( ) ) ; // There's nothing left to do
334
336
}
335
337
336
338
// Process any ready data responses
337
339
for _ in 0 ..self . get_max_concurrent_requests ( ) {
338
340
if let Some ( pending_response) = self . pop_pending_response_queue ( ) {
339
- let mut pending_response = pending_response. lock ( ) ;
340
341
let client_response = pending_response
342
+ . lock ( )
341
343
. client_response
342
344
. take ( )
343
345
. expect ( "The client response should be ready!" ) ;
344
- let client_request = & pending_response. client_request ;
346
+ let client_request = & pending_response. lock ( ) . client_request . clone ( ) ;
345
347
346
348
match client_response {
347
349
Ok ( client_response) => {
348
350
if sanity_check_client_response ( client_request, & client_response) {
349
- self . send_data_notification_to_client ( client_request, client_response) ?;
351
+ self . send_data_notification_to_client ( client_request, client_response)
352
+ . await ?;
350
353
} else {
351
354
self . handle_sanity_check_failure (
352
355
client_request,
@@ -457,7 +460,7 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
457
460
}
458
461
459
462
/// Sends a data notification to the client along the stream
460
- fn send_data_notification_to_client (
463
+ async fn send_data_notification_to_client (
461
464
& mut self ,
462
465
data_client_request : & DataClientRequest ,
463
466
data_client_response : Response < ResponsePayload > ,
@@ -487,7 +490,7 @@ impl<T: AptosDataClient + Send + Clone + 'static> DataStream<T> {
487
490
notification_id
488
491
) ) )
489
492
) ;
490
- self . send_data_notification ( data_notification) ?;
493
+ self . send_data_notification ( data_notification) . await ?;
491
494
492
495
// Reset the failure count. We've sent a notification and can move on.
493
496
self . request_failure_count = 0 ;
@@ -606,7 +609,7 @@ impl<T> Drop for DataStream<T> {
606
609
#[ derive( Debug ) ]
607
610
pub struct DataStreamListener {
608
611
pub data_stream_id : DataStreamId ,
609
- notification_receiver : channel :: aptos_channel :: Receiver < ( ) , DataNotification > ,
612
+ notification_receiver : mpsc :: Receiver < DataNotification > ,
610
613
611
614
/// Stores the number of consecutive timeouts encountered when listening to this stream
612
615
pub num_consecutive_timeouts : u64 ,
@@ -615,7 +618,7 @@ pub struct DataStreamListener {
615
618
impl DataStreamListener {
616
619
pub fn new (
617
620
data_stream_id : DataStreamId ,
618
- notification_receiver : channel :: aptos_channel :: Receiver < ( ) , DataNotification > ,
621
+ notification_receiver : mpsc :: Receiver < DataNotification > ,
619
622
) -> Self {
620
623
Self {
621
624
data_stream_id,
0 commit comments