Skip to content

Commit 9ede809

Browse files
submit on leader schedule
more on leader rotation simpler publishing end-to-end latency tracking and backwards in time checking print price account name after add_price support (re)initialization of price accounts rust example to print all pyth price data on devnet track publish latency statistics and hit rates sort dashboard products by asset_type, symbol
1 parent 38c36e2 commit 9ede809

24 files changed

+1073
-121
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
build
33
debug
44
target
5+
Cargo.lock

CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ set( PC_SRC
3232
pc/mem_map.cpp;
3333
pc/misc.cpp;
3434
pc/net_socket.cpp;
35+
pc/pub_stats.cpp;
3536
pc/replay.cpp;
3637
pc/request.cpp;
3738
pc/rpc_client.cpp;

dashboard/dashboard.js

+9-2
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,17 @@ class Prices
7272
}
7373
get_product_list( res ) {
7474
let tab = document.getElementById( "prices" );
75+
let k = 1;
76+
res.result.sort( function(a,b) {
77+
return a['attr_dict']['symbol'] > b['attr_dict']['symbol'];
78+
} );
79+
res.result.sort( function(a,b) {
80+
return a['attr_dict']['asset_type'] > b['attr_dict']['asset_type'];
81+
} );
7582
for( let i = 0; i != res.result.length; ++i ) {
7683
let sym = res.result[i];
7784
let att = sym['attr_dict']
78-
for( let j=0; j != sym.price.length; ++j ) {
85+
for( let j=0; j != sym.price.length; ++j, ++k ) {
7986
let pxa = sym.price[j];
8087
let row = tab.insertRow(-1);
8188
row.appendChild( this.get_title( att, 'asset_type') );
@@ -96,7 +103,7 @@ class Prices
96103
'account' : pxa['account']
97104
}
98105
};
99-
pxa.idx = i+1;
106+
pxa.idx = k;
100107
this.send( msg, this.get_price.bind( this, pxa ) );
101108
}
102109
}

etc/products.json

-12
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,5 @@
125125
"nasdaq_symbol": "QQQ"
126126
}
127127
}
128-
{
129-
"account": "96NiWypEbNs56cBEiYACEzZtFTLqhV2TQ6wH2A2CqxbM",
130-
"attr_dict":{
131-
"symbol": "GBP/USD",
132-
"asset_type": "Currency",
133-
"country": "US",
134-
"description": "BRITISH POUND STERLING",
135-
"quote_currency" : "USD",
136-
"tenor": "Spot",
137-
"jlqd_symbol": "GBPUSD"
138-
}
139-
}
140128
]
141129

pc/manager.cpp

+68-63
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ using namespace pc;
77
#define PC_RPC_WEBSOCKET_PORT 8900
88
#define PC_RECONNECT_TIMEOUT (120L*1000000000L)
99
#define PC_BLOCKHASH_TIMEOUT 3
10+
#define PC_PUB_INTERVAL (293L*PC_NSECS_IN_MSEC)
1011

1112
///////////////////////////////////////////////////////////////////////////
1213
// manager_sub
@@ -39,17 +40,17 @@ manager::manager()
3940
status_( 0 ),
4041
num_sub_( 0 ),
4142
version_( PC_VERSION ),
42-
kidx_( 0 ),
43+
kidx_( (unsigned)-1 ),
4344
cts_( 0L ),
4445
ctimeout_( PC_NSECS_IN_SEC ),
45-
slot_ts_( 0L ),
46-
slot_int_( 0L ),
47-
slot_min_( 0L ),
4846
slot_( 0UL ),
4947
slot_cnt_( 0UL ),
48+
curr_ts_( 0L ),
49+
pub_ts_( 0L ),
50+
pub_int_( PC_PUB_INTERVAL ),
5051
wait_conn_( false ),
5152
do_cap_( false ),
52-
first_ack_( true )
53+
is_pub_( false )
5354
{
5455
breq_->set_sub( this );
5556
sreq_->set_sub( this );
@@ -105,6 +106,16 @@ std::string manager::get_capture_file() const
105106
return cap_.get_file();
106107
}
107108

109+
void manager::set_publish_interval( int64_t pub_int )
110+
{
111+
pub_int_ = pub_int * PC_NSECS_IN_MSEC;
112+
}
113+
114+
int64_t manager::get_publish_interval() const
115+
{
116+
return pub_int_ / PC_NSECS_IN_MSEC;
117+
}
118+
108119
void manager::set_do_capture( bool do_cap )
109120
{
110121
do_cap_ = do_cap;
@@ -155,6 +166,11 @@ manager_sub *manager::get_manager_sub() const
155166
return sub_;
156167
}
157168

169+
int64_t manager::get_curr_time() const
170+
{
171+
return curr_ts_;
172+
}
173+
158174
rpc_client *manager::get_rpc_client()
159175
{
160176
return &clnt_;
@@ -170,16 +186,6 @@ uint64_t manager::get_slot() const
170186
return slot_;
171187
}
172188

173-
int64_t manager::get_slot_time() const
174-
{
175-
return slot_ts_;
176-
}
177-
178-
int64_t manager::get_slot_interval() const
179-
{
180-
return slot_int_;
181-
}
182-
183189
void manager::teardown()
184190
{
185191
PC_LOG_INF( "pythd_teardown" ).end();
@@ -258,6 +264,7 @@ bool manager::init()
258264
PC_LOG_INF( "initialized" )
259265
.add( "version", version_ )
260266
.add( "capture_file", get_capture_file() )
267+
.add( "publish_interval(ms)", get_publish_interval() )
261268
.end();
262269

263270
return true;
@@ -347,24 +354,30 @@ void manager::poll( bool do_wait )
347354
// destroy any users scheduled for deletion
348355
teardown_users();
349356

350-
// check if we need to reconnect rpc services
351-
if ( PC_UNLIKELY( !has_status( PC_PYTH_RPC_CONNECTED ) ||
352-
hconn_.get_is_err() || wconn_.get_is_err() )) {
357+
// get current time
358+
curr_ts_ = get_now();
359+
360+
// submit new quotes while connected
361+
if ( has_status( PC_PYTH_RPC_CONNECTED ) &&
362+
!hconn_.get_is_err() &&
363+
!wconn_.get_is_err() ) {
364+
poll_schedule();
365+
} else {
353366
reconnect_rpc();
354-
return;
355367
}
368+
}
356369

357-
// schedule next price update but only if we're connected to rpc port
358-
int64_t ts = get_now();
359-
while ( kidx_ < kvec_.size() ) {
370+
void manager::poll_schedule()
371+
{
372+
while ( is_pub_ && kidx_ < kvec_.size() ) {
360373
price_sched *kptr = kvec_[kidx_];
361-
int64_t tot_ts = slot_min_ - ack_ts_ - ack_ts_;
362-
tot_ts = std::max( slot_min_/10, tot_ts );
363-
int64_t pub_ts = slot_ts_ + ( tot_ts * kptr->get_hash() ) /
374+
int64_t pub_ts = pub_ts_ + ( pub_int_ * kptr->get_hash() ) /
364375
price_sched::fraction;
365-
if ( ts > pub_ts ) {
376+
if ( curr_ts_ > pub_ts ) {
366377
kptr->schedule();
367-
++kidx_;
378+
if ( ++kidx_ >= kvec_.size() ) {
379+
is_pub_ = false;
380+
}
368381
} else {
369382
break;
370383
}
@@ -391,15 +404,17 @@ void manager::reconnect_rpc()
391404

392405
// reset state
393406
wait_conn_ = false;
394-
first_ack_ = true;
407+
is_pub_ = false;
408+
kidx_ = 0;
395409
ctimeout_ = PC_NSECS_IN_SEC;
396-
slot_ts_ = slot_int_ = 0L;
410+
pub_ts_ = 0L;
397411
slot_cnt_ = 0UL;
412+
slot_ = 0L;
398413
num_sub_ = 0;
399414
clnt_.reset();
400415
plist_.clear();
401416

402-
// resubmit slot subscription
417+
// subscribe to slots and get first block hash
403418
clnt_.send( sreq_ );
404419

405420
// resubscribe to mapping and symbol accounts
@@ -419,6 +434,13 @@ void manager::reconnect_rpc()
419434
add_map_sub();
420435
}
421436
}
437+
438+
// subscribe to mapping account if not done before
439+
pub_key *mpub = get_mapping_pub_key();
440+
if ( mvec_.empty() && mpub ) {
441+
add_mapping( *mpub );
442+
}
443+
422444
// callback user with connection status
423445
if ( sub_ ) {
424446
sub_->on_connect( this );
@@ -518,36 +540,31 @@ void manager::schedule( price_sched *kptr )
518540

519541
void manager::on_response( rpc::slot_subscribe *res )
520542
{
521-
if ( !slot_ts_ ) {
522-
slot_ts_ = res->get_recv_time();
543+
// ignore slots that go back in time
544+
uint64_t slot = res->get_slot();
545+
int64_t ts = res->get_recv_time();
546+
if ( slot <= slot_ ) {
523547
return;
524548
}
525-
slot_ = res->get_slot();
526-
slot_int_ = res->get_recv_time() - slot_ts_;
527-
slot_ts_ = res->get_recv_time();
549+
slot_ = slot;
550+
PC_LOG_DBG( "receive slot" ).add( "slot", slot_ ).end();
551+
552+
// submit block hash every N slots
528553
if ( slot_cnt_++ % PC_BLOCKHASH_TIMEOUT == 0 ) {
529-
// submit block hash every N slots
530554
clnt_.send( breq_ );
531555
}
532-
// reset scheduler
533-
kidx_ = 0;
534556

535-
// derive minimum slot interval
536-
if ( slot_min_ ) {
537-
slot_min_ = std::min( slot_int_, slot_min_ );
538-
} else {
539-
slot_min_ = slot_int_;
557+
// reset submit
558+
if ( !is_pub_ ) {
559+
kidx_ = 0;
560+
pub_ts_ = ts;
561+
is_pub_ = true;
540562
}
541563

542564
// flush capture
543565
if ( do_cap_ ) {
544566
cap_.flush();
545567
}
546-
PC_LOG_DBG( "receive slot" )
547-
.add( "slot_num", res->get_slot() )
548-
.add( "slot_int", 1e-6*slot_int_ )
549-
.add( "slot_min", 1e-6*slot_min_ )
550-
.end();
551568
}
552569

553570
void manager::on_response( rpc::get_recent_block_hash *m )
@@ -557,30 +574,18 @@ void manager::on_response( rpc::get_recent_block_hash *m )
557574
+ m->get_err_msg() + "]" );
558575
return;
559576
}
560-
static const double afactor = 2./(1+8.);
561577
int64_t ack_ts = m->get_recv_time() - m->get_sent_time();
562-
if ( !first_ack_ ) {
563-
ack_ts_ = (1.-afactor)*ack_ts_ + afactor * ack_ts;
564-
} else {
565-
ack_ts_ = ack_ts;
566-
first_ack_ = false;
567-
}
568578
if ( has_status( PC_PYTH_HAS_BLOCK_HASH ) ) {
569579
return;
570580
}
571581
// set initialized status for block hash
572582
set_status( PC_PYTH_HAS_BLOCK_HASH );
573583
PC_LOG_INF( "received_recent_block_hash" )
574-
.add( "slot", m->get_slot() )
575-
.add( "slot_interval(ms)", 1e-6*slot_int_ )
576-
.add( "rount_trip_time(ms)", 1e-6*ack_ts_ )
584+
.add( "curr_slot", slot_ )
585+
.add( "hash_slot", m->get_slot() )
586+
.add( "rount_trip_time(ms)", 1e-6*ack_ts )
577587
.end();
578588

579-
// subscribe to mapping account if not done before
580-
pub_key *mpub = get_mapping_pub_key();
581-
if ( mvec_.empty() && mpub ) {
582-
add_mapping( *mpub );
583-
}
584589
}
585590

586591
void manager::submit( request *req )

pc/manager.hpp

+12-11
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,17 @@ namespace pc
7373
void set_capture_file( const std::string& cap_file );
7474
std::string get_capture_file() const;
7575

76+
// override default publish interval (in milliseconds)
77+
void set_publish_interval( int64_t mill_secs );
78+
int64_t get_publish_interval() const;
79+
7680
// event subscription callback
7781
void set_manager_sub( manager_sub * );
7882
manager_sub *get_manager_sub() const;
7983

84+
// current time in manager in nanoseconds from epoch
85+
int64_t get_curr_time() const;
86+
8087
// rpc client interface
8188
rpc_client *get_rpc_client();
8289

@@ -86,12 +93,6 @@ namespace pc
8693
// get most recently processed slot
8794
uint64_t get_slot() const;
8895

89-
// slot start time esitimate
90-
int64_t get_slot_time() const;
91-
92-
// slot interval time estimate
93-
int64_t get_slot_interval() const;
94-
9596
// add and subscribe to new mapping account
9697
void add_mapping( const pub_key& );
9798

@@ -171,6 +172,7 @@ namespace pc
171172
void reconnect_rpc();
172173
void log_disconnect();
173174
void teardown_users();
175+
void poll_schedule();
174176
void reset_status( int );
175177

176178
net_loop nl_; // epoll loop
@@ -193,16 +195,15 @@ namespace pc
193195
uint32_t kidx_; // schedule index
194196
int64_t cts_; // (re)connect timestamp
195197
int64_t ctimeout_; // connection timeout
196-
int64_t slot_ts_; // slot start time estimate
197-
int64_t slot_int_; // slot interval
198-
int64_t slot_min_; // slot minimum interval
199198
uint64_t slot_; // current slot
200199
uint64_t slot_cnt_; // slot count
201-
int64_t ack_ts_; // ack time ema
200+
int64_t curr_ts_; // current time
201+
int64_t pub_ts_; // start publish time
202+
int64_t pub_int_; // publish interval
202203
kpx_vec_t kvec_; // symbol price scheduling
203204
bool wait_conn_;// waiting on connection
204205
bool do_cap_; // do capture flag
205-
bool first_ack_;// first ack flag
206+
bool is_pub_; // is publishing mode
206207
capture cap_; // aggregate price capture
207208

208209
// requests

pc/misc.hpp

+8
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#define PC_PACKED __attribute__((__packed__))
77
#define PC_UNLIKELY(ARG) __builtin_expect((ARG),1)
88
#define PC_NSECS_IN_SEC 1000000000L
9+
#define PC_NSECS_IN_MSEC 1000000L
910

1011
namespace pc
1112
{
@@ -44,6 +45,7 @@ namespace pc
4445
str( const std::string& );
4546
std::string as_string() const;
4647
bool operator==( const str& ) const;
48+
bool operator!=( const str& ) const;
4749
const char *str_;
4850
size_t len_;
4951
};
@@ -77,6 +79,12 @@ namespace pc
7779
0 == __builtin_strncmp( str_, obj.str_, len_);
7880
}
7981

82+
inline bool str::operator!=( const str& obj ) const
83+
{
84+
return len_ != obj.len_ ||
85+
0 != __builtin_strncmp( str_, obj.str_, len_);
86+
}
87+
8088
inline std::string str::as_string() const
8189
{
8290
return std::string( str_, len_ );

0 commit comments

Comments
 (0)