Skip to content

Commit c3066d7

Browse files
unsubcribe on error; added how to get contributing price in callback
1 parent 1c0ec08 commit c3066d7

File tree

1 file changed

+42
-3
lines changed

1 file changed

+42
-3
lines changed

pctest/test_publish.cpp

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ class test_publish : public pc::request_sub,
4545
void on_response( pc::price_sched *, uint64_t ) override;
4646

4747
private:
48+
void unsubscribe();
49+
4850
pc::request_sub_set sub_; // request subscriptions for this object
4951
int64_t px_; // price to submit
5052
uint64_t sprd_; // confidence interval or bid-ask spread
@@ -106,18 +108,43 @@ bool test_connect::get_is_init() const
106108

107109
test_publish::~test_publish()
108110
{
111+
unsubscribe();
112+
}
113+
114+
void test_publish::unsubscribe()
115+
{
116+
// unsubscribe to callbacks
109117
sub_.del( sid1_ ); // unsubscribe price updates
110118
sub_.del( sid2_ ); // unsubscribe price schedule updates
111119
}
112120

113-
void test_publish::on_response( pc::price *sym, uint64_t sub_id )
121+
void test_publish::on_response( pc::price *sym, uint64_t )
114122
{
123+
// check if currently in error
115124
if ( sym->get_is_err() ) {
116125
PC_LOG_ERR( "error receiving aggregate price" )
117126
.add( "err", sym->get_err_msg() )
118127
.end();
128+
unsubscribe();
119129
return;
120130
}
131+
132+
// get my own contribution to the aggregate
133+
pc::symbol_status my_status = pc::symbol_status::e_unknown;
134+
double my_price = 0., my_conf = 0.;
135+
uint64_t my_slot = 0UL;
136+
pc::pub_key *my_key = sym->get_manager()->get_publish_pub_key();
137+
for(unsigned i=0; i != sym->get_num_publisher(); ++i ) {
138+
const pc::pub_key *ikey = sym->get_publisher( i );
139+
if ( *my_key == *ikey ) {
140+
my_price = expo_ * (double)sym->get_publisher_price( i );
141+
my_conf = expo_ * (double)sym->get_publisher_conf( i );
142+
my_slot = sym->get_publisher_slot( i );
143+
my_status = sym->get_publisher_status( i );
144+
break;
145+
}
146+
}
147+
121148
// received aggregate price update for this symbol
122149
double price = expo_ * (double)sym->get_price();
123150
double spread = expo_ * (double)sym->get_conf();
@@ -129,14 +156,26 @@ void test_publish::on_response( pc::price *sym, uint64_t sub_id )
129156
.add( "agg_spread", spread )
130157
.add( "valid_slot", sym->get_valid_slot() )
131158
.add( "pub_slot", sym->get_pub_slot() )
132-
.add( "sub_id", sub_id )
159+
.add( "my_price", my_price )
160+
.add( "my_conf", my_conf )
161+
.add( "my_status", pc::symbol_status_to_str( my_status ) )
162+
.add( "my_slot", my_slot )
133163
.end();
134164
}
135165

136166
void test_publish::on_response( pc::price_sched *ptr, uint64_t sub_id )
137167
{
138-
// submit next price to block chain for this symbol
168+
// check if currently in error
139169
pc::price *sym = ptr->get_price();
170+
if ( sym->get_is_err() ) {
171+
PC_LOG_ERR( "aggregate price in error" )
172+
.add( "err", sym->get_err_msg() )
173+
.end();
174+
unsubscribe();
175+
return;
176+
}
177+
178+
// submit next price to block chain for this symbol
140179
if ( !sym->update( px_, sprd_, pc::symbol_status::e_trading ) ) {
141180
PC_LOG_ERR( "failed to submit price" )
142181
.add( "symbol", *sym->get_symbol() )

0 commit comments

Comments
 (0)