Skip to content

Commit 1c0ec08

Browse files
fix edge-case where we might see a very large ack round-trip time
1 parent a743429 commit 1c0ec08

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

pc/manager.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ using namespace pc;
66
#define PC_RPC_HTTP_PORT 8899
77
#define PC_RPC_WEBSOCKET_PORT 8900
88
#define PC_RECONNECT_TIMEOUT (120L*1000000000L)
9-
#define PC_BLOCKHASH_TIMEOUT 5
9+
#define PC_BLOCKHASH_TIMEOUT 3
1010

1111
///////////////////////////////////////////////////////////////////////////
1212
// manager_sub
@@ -47,10 +47,9 @@ manager::manager()
4747
slot_min_( 0L ),
4848
slot_( 0UL ),
4949
slot_cnt_( 0UL ),
50-
cum_ack_( 0L ),
51-
num_ack_( 0L ),
5250
wait_conn_( false ),
53-
do_cap_( false )
51+
do_cap_( false ),
52+
first_ack_( true )
5453
{
5554
breq_->set_sub( this );
5655
sreq_->set_sub( this );
@@ -356,8 +355,8 @@ void manager::poll( bool do_wait )
356355
int64_t ts = get_now();
357356
while ( kidx_ < kvec_.size() ) {
358357
price_sched *kptr = kvec_[kidx_];
359-
int64_t ack_ts = num_ack_?(cum_ack_/num_ack_):0L;
360-
int64_t tot_ts = slot_min_ - ack_ts - ack_ts;
358+
int64_t tot_ts = slot_min_ - ack_ts_ - ack_ts_;
359+
tot_ts = std::max( slot_min_/10, tot_ts );
361360
int64_t pub_ts = slot_ts_ + ( tot_ts * kptr->get_hash() ) /
362361
price_sched::fraction;
363362
if ( ts > pub_ts ) {
@@ -392,6 +391,7 @@ void manager::reconnect_rpc()
392391

393392
// reset state
394393
wait_conn_ = false;
394+
first_ack_ = true;
395395
ctimeout_ = PC_NSECS_IN_SEC;
396396
slot_ts_ = slot_int_ = 0L;
397397
slot_cnt_ = 0UL;
@@ -551,9 +551,14 @@ void manager::on_response( rpc::get_recent_block_hash *m )
551551
+ m->get_err_msg() + "]" );
552552
return;
553553
}
554+
static const double afactor = 2./(1+8.);
554555
int64_t ack_ts = m->get_recv_time() - m->get_sent_time();
555-
cum_ack_ += ack_ts;
556-
++num_ack_;
556+
if ( !first_ack_ ) {
557+
ack_ts_ = (1.-afactor)*ack_ts_ + afactor * ack_ts;
558+
} else {
559+
ack_ts_ = ack_ts;
560+
first_ack_ = false;
561+
}
557562
if ( has_status( PC_PYTH_HAS_BLOCK_HASH ) ) {
558563
return;
559564
}
@@ -562,7 +567,7 @@ void manager::on_response( rpc::get_recent_block_hash *m )
562567
PC_LOG_INF( "received_recent_block_hash" )
563568
.add( "slot", m->get_slot() )
564569
.add( "slot_interval(ms)", 1e-6*slot_int_ )
565-
.add( "rount_trip_time(ms)", 1e-6*ack_ts )
570+
.add( "rount_trip_time(ms)", 1e-6*ack_ts_ )
566571
.end();
567572

568573
// subscribe to mapping account if not done before

pc/manager.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,11 @@ namespace pc
219219
int64_t slot_min_; // slot minimum interval
220220
uint64_t slot_; // current slot
221221
uint64_t slot_cnt_; // slot count
222-
int64_t cum_ack_; // cumulative block hash ack times
223-
int64_t num_ack_; // number of block hash acks
222+
int64_t ack_ts_; // ack time ema
224223
kpx_vec_t kvec_; // symbol price scheduling
225224
bool wait_conn_;// waiting on connection
226225
bool do_cap_; // do capture flag
226+
bool first_ack_;// first ack flag
227227
capture cap_; // aggregate price capture
228228

229229
// requests

0 commit comments

Comments
 (0)