perf_ledger.py
import json
import math
import os
import time
import traceback
import datetime
from collections import defaultdict
from copy import deepcopy
from enum import Enum
from typing import List
import bittensor as bt
from pydantic import BaseModel, ConfigDict
from setproctitle import setproctitle
from vali_objects.utils.position_source import PositionSourceManager, PositionSource
from shared_objects.sn8_multiprocessing import ParallelizationMode, get_spark_session, get_multiprocessing_pool
from shared_objects.mock_metagraph import MockMetagraph
from time_util.time_util import MS_IN_8_HOURS, MS_IN_24_HOURS, timeme
import vali_objects.position as position_file
from shared_objects.cache_controller import CacheController
from time_util.time_util import TimeUtil, UnifiedMarketCalendar
from vali_objects.utils.elimination_manager import EliminationManager, EliminationReason
from vali_objects.utils.position_manager import PositionManager
from vali_objects.vali_config import ValiConfig
from vali_objects.position import Position
from vali_objects.utils.live_price_fetcher import LivePriceFetcher
from vali_objects.utils.vali_bkp_utils import ValiBkpUtils
from vali_objects.utils.vali_utils import ValiUtils
TP_ID_PORTFOLIO = 'portfolio'
class ShortcutReason(Enum):
NO_SHORTCUT = 0
NO_OPEN_POSITIONS = 1
OUTSIDE_WINDOW = 2
ZERO_TIME_DELTA = 3
class FeeCache():
def __init__(self):
self.spread_fee: float = 1.0
self.spread_fee_last_order_processed_ms: int = 0
self.carry_fee: float = 1.0 # product of all individual interval fees.
self.carry_fee_next_increase_time_ms: int = 0 # Compute fees based off the prior interval
def get_spread_fee(self, position: Position, current_time_ms: int) -> (float, bool):
        # Don't use the cache at or after SLIPPAGE_V1_TIME_MS; from then on the fee is simply 1.
if current_time_ms < position_file.SLIPPAGE_V1_TIME_MS:
if position.orders[-1].processed_ms == self.spread_fee_last_order_processed_ms:
return self.spread_fee, False
if position.is_closed_position:
current_time_ms = min(current_time_ms, position.close_ms)
self.spread_fee = position.get_spread_fee(current_time_ms)
self.spread_fee_last_order_processed_ms = position.orders[-1].processed_ms
return self.spread_fee, True
def get_carry_fee(self, current_time_ms, position: Position) -> (float, bool):
        # Calculate the number of times a new day occurred (UTC). If a position is opened at 23:59:58 and this function is
        # called at 00:00:02, the carry fee will be calculated as if a day has passed. Another example: if a position is
        # opened at 23:59:58 and this function is called at 23:59:59, the carry fee will be calculated as if 0 days have passed.
if position.is_closed_position:
current_time_ms = min(current_time_ms, position.close_ms)
# cache hit?
if position.trade_pair.is_crypto:
start_time_cache_hit = self.carry_fee_next_increase_time_ms - MS_IN_8_HOURS
elif position.trade_pair.is_forex or position.trade_pair.is_indices or position.trade_pair.is_equities:
start_time_cache_hit = self.carry_fee_next_increase_time_ms - MS_IN_24_HOURS
else:
raise Exception(f"Unknown trade pair type: {position.trade_pair}")
if start_time_cache_hit <= current_time_ms < self.carry_fee_next_increase_time_ms:
return self.carry_fee, False
# cache miss
carry_fee, next_update_time_ms = position.get_carry_fee(current_time_ms)
assert next_update_time_ms > current_time_ms, [TimeUtil.millis_to_verbose_formatted_date_str(x) for x in (self.carry_fee_next_increase_time_ms, next_update_time_ms, current_time_ms)] + [carry_fee, position] + [self.carry_fee_next_increase_time_ms, next_update_time_ms, current_time_ms]
assert carry_fee >= 0, (carry_fee, next_update_time_ms, position)
self.carry_fee = carry_fee
self.carry_fee_next_increase_time_ms = next_update_time_ms
return self.carry_fee, True
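# Illustrative usage sketch (not part of the production flow; `position` and
# `now_ms` are hypothetical). FeeCache memoizes the per-position fee math so
# repeated ledger updates avoid recomputing it every tick:
#
#     cache = FeeCache()
#     fee, recomputed = cache.get_carry_fee(now_ms, position)      # cache miss: recomputed is True
#     fee, recomputed = cache.get_carry_fee(now_ms + 1, position)  # same accrual interval: recomputed is False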
# Ordered enum for a trade pair's state at a given tick: no open positions, market not open, market open with no price change, or market open with a price change.
class TradePairReturnStatus(Enum):
TP_NO_OPEN_POSITIONS = 0
TP_MARKET_NOT_OPEN = 1
TP_MARKET_OPEN_NO_PRICE_CHANGE = 2
TP_MARKET_OPEN_PRICE_CHANGE = 3
    # Define the greater-than operator so statuses can be ranked by severity.
def __gt__(self, other):
return self.value > other.value
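# Only __gt__ is defined; "<" comparisons used elsewhere in this file still work
# via Python's reflected-operator fallback. Ordering sketch (illustrative):
#
#     assert TradePairReturnStatus.TP_MARKET_OPEN_PRICE_CHANGE > TradePairReturnStatus.TP_MARKET_NOT_OPEN
#     assert TradePairReturnStatus.TP_NO_OPEN_POSITIONS < TradePairReturnStatus.TP_MARKET_NOT_OPEN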
class PerfCheckpoint(BaseModel):
last_update_ms: int
prev_portfolio_ret: float
prev_portfolio_spread_fee: float = 1.0
prev_portfolio_carry_fee: float = 1.0
accum_ms: int = 0
open_ms: int = 0
n_updates: int = 0
gain: float = 0.0
loss: float = 0.0
spread_fee_loss: float = 0.0
carry_fee_loss: float = 0.0
mdd: float = 1.0
mpv: float = 0.0
model_config = ConfigDict(extra="allow")
def __str__(self):
return str(self.to_dict())
def to_dict(self):
return self.__dict__
@property
def lowerbound_time_created_ms(self):
        # accum_ms boundary alignment makes this a lower bound for the first cp.
return self.last_update_ms - self.accum_ms
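# Worked sketch of lowerbound_time_created_ms (hypothetical values): a checkpoint
# last updated at t = 43_200_000 ms that has accumulated 3_600_000 ms was created
# no earlier than 39_600_000 ms:
#
#     cp = PerfCheckpoint(last_update_ms=43_200_000, prev_portfolio_ret=1.0, accum_ms=3_600_000)
#     assert cp.lowerbound_time_created_ms == 39_600_000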
class PerfLedger():
def __init__(self, initialization_time_ms: int=0, max_return:float=1.0,
target_cp_duration_ms:int=ValiConfig.TARGET_CHECKPOINT_DURATION_MS,
target_ledger_window_ms=ValiConfig.TARGET_LEDGER_WINDOW_MS, cps: list[PerfCheckpoint]=None,
tp_id: str=TP_ID_PORTFOLIO):
if cps is None:
cps = []
self.max_return = float(max_return)
self.target_cp_duration_ms = int(target_cp_duration_ms)
self.target_ledger_window_ms = target_ledger_window_ms
self.initialization_time_ms = int(initialization_time_ms)
self.tp_id = str(tp_id)
self.cps = cps
def to_dict(self):
return {
"initialization_time_ms": self.initialization_time_ms,
"max_return": self.max_return,
"target_cp_duration_ms": self.target_cp_duration_ms,
"target_ledger_window_ms": self.target_ledger_window_ms,
"cps": [cp.to_dict() for cp in self.cps]
}
@classmethod
def from_dict(cls, x):
assert isinstance(x, dict), x
x['cps'] = [PerfCheckpoint(**cp) for cp in x['cps']]
x.pop('global_worst_mdd', None) # Remove legacy field if present
x.pop('last_known_prices', None) # Remove legacy field if present
instance = cls(**x)
return instance
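    # Round-trip sketch (illustrative; `ledger` is hypothetical): from_dict(to_dict())
    # reconstructs an equivalent ledger, with legacy keys such as 'global_worst_mdd'
    # silently dropped on load:
    #
    #     restored = PerfLedger.from_dict(json.loads(json.dumps(ledger.to_dict())))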
@property
def mdd(self):
return min(cp.mdd for cp in self.cps) if self.cps else 1.0
@property
def total_open_ms(self):
if len(self.cps) == 0:
return 0
return sum(cp.open_ms for cp in self.cps)
@property
def last_update_ms(self):
        if len(self.cps) == 0:  # Important to return 0 as the default; otherwise the update flow won't trigger after init.
return 0
return self.cps[-1].last_update_ms
@property
def prev_portfolio_ret(self):
if len(self.cps) == 0:
return 1.0 # Initial value
return self.cps[-1].prev_portfolio_ret
@property
def start_time_ms(self):
if len(self.cps) == 0:
return 0
elif self.initialization_time_ms != 0: # 0 default value for old ledgers that haven't rebuilt as of this update.
return self.initialization_time_ms
else:
return self.cps[0].lowerbound_time_created_ms # legacy calculation that will stop being used in ~24 hrs
def init_max_portfolio_value(self):
if self.cps:
self.max_return = max(x.mpv for x in self.cps)
# Initial portfolio value is 1.0
self.max_return = max(self.max_return, 1.0)
def init_with_first_order(self, order_processed_ms: int, point_in_time_dd: float, current_portfolio_value: float, current_portfolio_fee_spread:float, current_portfolio_carry:float):
        # Figure out how many ms to initialize the checkpoint with so that once self.target_cp_duration_ms is
        # reached, the cp ends at 00:00:00 UTC or 12:00:00 UTC (12 hr cp case). This may change based on self.target_cp_duration_ms.
        # |----x------midday-----------| -> accum_ms_for_utc_alignment = (distance between start of day and x) = x - start_of_day_ms
        # |-----------midday-----x-----| -> accum_ms_for_utc_alignment = (distance between midday and x) = x - midday_ms
        # By calculating the initial accum_ms this way, the cp will always end at midday or 00:00:00 the next day.
datetime_representation = TimeUtil.millis_to_datetime(order_processed_ms)
assert self.target_cp_duration_ms == 43200000, f'self.target_cp_duration_ms is not 12 hours {self.target_cp_duration_ms}'
midday = datetime_representation.replace(hour=12, minute=0, second=0, microsecond=0)
midday_ms = int(midday.timestamp() * 1000)
if order_processed_ms < midday_ms:
start_of_day = datetime_representation.replace(hour=0, minute=0, second=0, microsecond=0)
start_of_day_ms = int(start_of_day.timestamp() * 1000)
accum_ms_for_utc_alignment = order_processed_ms - start_of_day_ms
else:
accum_ms_for_utc_alignment = order_processed_ms - midday_ms
        # open_ms starts at its default of 0 and accumulates on subsequent updates while the market is open.
new_cp = PerfCheckpoint(last_update_ms=order_processed_ms, prev_portfolio_ret=current_portfolio_value,
mdd=point_in_time_dd, prev_portfolio_spread_fee=current_portfolio_fee_spread,
prev_portfolio_carry_fee=current_portfolio_carry, accum_ms=accum_ms_for_utc_alignment, mpv=1.0)
self.cps.append(new_cp)
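    # Worked example of the alignment above (hypothetical timestamp): a first order
    # processed at 15:30:00 UTC falls after midday, so accum_ms_for_utc_alignment
    # = 3.5 h = 12_600_000 ms; the checkpoint reaches the full 12 h (43_200_000 ms)
    # exactly at 00:00:00 UTC the next day.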
def compute_delta_between_ticks(self, cur: float, prev: float):
return math.log(cur / prev)
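    # compute_delta_between_ticks returns a log return, so per-tick deltas add
    # rather than multiply when accumulated into gain/loss. Sketch (hypothetical
    # values):
    #
    #     up = math.log(1.02 / 1.00)     # ~ +0.0198
    #     down = math.log(1.00 / 1.02)   # ~ -0.0198
    #     assert abs(up + down) < 1e-12  # a round trip nets out to ~zero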
def purge_old_cps(self):
while self.get_total_ledger_duration_ms() > self.target_ledger_window_ms:
bt.logging.trace(
f"Purging old perf cp {self.cps[0]}. Total ledger duration: {self.get_total_ledger_duration_ms()}. Target ledger window: {self.target_ledger_window_ms}")
self.cps = self.cps[1:] # Drop the first cp (oldest)
def trim_checkpoints(self, cutoff_ms: int):
new_cps = []
any_changes = False
for cp in self.cps:
if cp.lowerbound_time_created_ms + self.target_cp_duration_ms >= cutoff_ms:
any_changes = True
continue
new_cps.append(cp)
if any_changes:
self.cps = new_cps
self.init_max_portfolio_value()
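    # Note the asymmetry between the two pruning methods: purge_old_cps drops whole
    # checkpoints from the oldest end until total accum_ms fits the target window,
    # while trim_checkpoints drops any checkpoint whose full span would end at or
    # after cutoff_ms and then re-derives max_return. Sketch (hypothetical,
    # contiguous 12 h checkpoints):
    #
    #     ledger.cps = [cp0, cp1, cp2]
    #     ledger.trim_checkpoints(cutoff_ms=cp1.lowerbound_time_created_ms + 1)
    #     # cp1 and cp2 are removed (their spans end at/after the cutoff); cp0 survives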
def update_pl(self, current_portfolio_value: float, now_ms: int, miner_hotkey: str, any_open: TradePairReturnStatus,
current_portfolio_fee_spread: float, current_portfolio_carry: float, tp_debug=None, debug_dict=None):
if len(self.cps) == 0:
self.init_with_first_order(now_ms, point_in_time_dd=1.0, current_portfolio_value=1.0,
current_portfolio_fee_spread=1.0, current_portfolio_carry=1.0)
prev_max_return = self.max_return
last_portfolio_return = self.cps[-1].prev_portfolio_ret
prev_mdd = CacheController.calculate_drawdown(last_portfolio_return, prev_max_return)
self.max_return = max(self.max_return, current_portfolio_value)
point_in_time_dd = CacheController.calculate_drawdown(current_portfolio_value, self.max_return)
if not point_in_time_dd:
time_formatted = TimeUtil.millis_to_verbose_formatted_date_str(now_ms)
raise Exception(f'point_in_time_dd is {point_in_time_dd} at time {time_formatted}. '
f'any_open: {any_open}, prev_portfolio_value {self.cps[-1].prev_portfolio_ret}, '
f'current_portfolio_value: {current_portfolio_value}, self.max_return: {self.max_return}, debug_dict: {debug_dict}')
if len(self.cps) == 0:
self.init_with_first_order(now_ms, point_in_time_dd, current_portfolio_value, current_portfolio_fee_spread,
current_portfolio_carry)
return
time_since_last_update_ms = now_ms - self.cps[-1].last_update_ms
assert time_since_last_update_ms >= 0, self.cps
"""
if time_since_last_update_ms == 0 and last_portfolio_return != current_portfolio_value:
bt.logging.warning(f"Update called with no time delta since last update for miner {miner_hotkey}. n_cps: {len(self.cps)}, tp_debug {tp_debug} "
f"last_update_ms: {self.cps[-1].last_update_ms}, "
f"current_portfolio_value: {current_portfolio_value}, "
f"prev_portfolio_ret: {last_portfolio_return}, "
f"any_open: {any_open}, current_portfolio_fee_spread: {current_portfolio_fee_spread}, "
f"current_portfolio_carry: {current_portfolio_carry}")
return
"""
if time_since_last_update_ms + self.cps[-1].accum_ms > self.target_cp_duration_ms:
# Need to fill void - complete current checkpoint and create new ones
# Validate that we're working with 12-hour checkpoints
if self.target_cp_duration_ms != 43200000: # 12 hours in milliseconds
raise Exception(f"Checkpoint boundary alignment only supports 12-hour checkpoints, "
f"but target_cp_duration_ms is {self.target_cp_duration_ms} ms "
f"({self.target_cp_duration_ms / 3600000:.1f} hours)")
# Step 1: Complete the current checkpoint by aligning to 12-hour boundary
# Find the next 12-hour boundary
next_boundary = TimeUtil.align_to_12hour_checkpoint_boundary(self.cps[-1].last_update_ms)
if next_boundary > now_ms:
raise Exception(
f"Cannot align checkpoint: next boundary {next_boundary} ({TimeUtil.millis_to_formatted_date_str(next_boundary)}) "
f"exceeds current time {now_ms} ({TimeUtil.millis_to_formatted_date_str(now_ms)})")
# Update the current checkpoint to end at the boundary
delta_to_boundary = self.target_cp_duration_ms - self.cps[-1].accum_ms
self.cps[-1].last_update_ms = next_boundary
self.cps[-1].accum_ms = self.target_cp_duration_ms
# Complete the current checkpoint using last_portfolio_return (no change in value during void)
# The current checkpoint should be filled to the boundary but without value changes
# Only the final checkpoint after void filling gets the new portfolio value
if any_open > TradePairReturnStatus.TP_MARKET_NOT_OPEN:
self.cps[-1].open_ms += delta_to_boundary
# Step 2: Create full 12-hour checkpoints for the void period
current_boundary = next_boundary
# During void periods, portfolio value remains constant at last_portfolio_return
# Do NOT update last_portfolio_return to current_portfolio_value yet
while now_ms - current_boundary > self.target_cp_duration_ms:
current_boundary += self.target_cp_duration_ms
new_cp = PerfCheckpoint(
last_update_ms=current_boundary,
prev_portfolio_ret=last_portfolio_return, # Keep constant during void
prev_portfolio_spread_fee=self.cps[-1].prev_portfolio_spread_fee,
prev_portfolio_carry_fee=self.cps[-1].prev_portfolio_carry_fee,
accum_ms=self.target_cp_duration_ms,
open_ms=0, # No market data for void periods
mdd=prev_mdd,
mpv=last_portfolio_return
)
assert new_cp.last_update_ms % self.target_cp_duration_ms == 0, f"Checkpoint not aligned: {new_cp.last_update_ms}"
self.cps.append(new_cp)
# Step 3: Create final partial checkpoint from last boundary to now
time_since_boundary = now_ms - current_boundary
assert 0 <= time_since_boundary <= self.target_cp_duration_ms
final_open_ms = time_since_boundary if any_open > TradePairReturnStatus.TP_MARKET_NOT_OPEN else 0
# Calculate MDD for this checkpoint period based on the change from boundary to now
# MDD should be the worst decline within this checkpoint period
new_cp = PerfCheckpoint(
last_update_ms=now_ms,
prev_portfolio_ret=last_portfolio_return, # old for now, update below
prev_portfolio_spread_fee=self.cps[-1].prev_portfolio_spread_fee, # old for now update below
prev_portfolio_carry_fee=self.cps[-1].prev_portfolio_carry_fee, # old for now update below
carry_fee_loss=0, # 0 for now, update below
spread_fee_loss=0, # 0 for now, update below
n_updates = 0, # 0 for now, update below
gain=0, # 0 for now, update below
loss=0, # 0 for now, update below
mdd=prev_mdd, # old for now update below
mpv=last_portfolio_return, # old for now, update below
accum_ms=time_since_boundary,
open_ms=final_open_ms,
)
self.cps.append(new_cp)
else:
# Nominal update. No void to fill
current_cp = self.cps[-1]
# Calculate time since this checkpoint's last update
time_to_accumulate = now_ms - current_cp.last_update_ms
if time_to_accumulate < 0:
bt.logging.error(f"Negative accumulated time: {time_to_accumulate} for miner {miner_hotkey}."
f" start_time_ms: {self.start_time_ms}, now_ms: {now_ms}")
time_to_accumulate = 0
current_cp.accum_ms += time_to_accumulate
# Update open_ms only when market is actually open
if any_open > TradePairReturnStatus.TP_MARKET_NOT_OPEN:
current_cp.open_ms += time_to_accumulate
current_cp = self.cps[-1] # Get the current checkpoint after updates
current_cp.mdd = min(current_cp.mdd, point_in_time_dd)
# Update gains/losses based on portfolio value change
n_updates = 1
delta_return = self.compute_delta_between_ticks(current_portfolio_value, current_cp.prev_portfolio_ret)
if delta_return > 0:
current_cp.gain += delta_return
elif delta_return < 0:
current_cp.loss += delta_return
else:
n_updates = 0
# Update fee losses
if current_cp.prev_portfolio_carry_fee != current_portfolio_carry:
current_cp.carry_fee_loss += self.compute_delta_between_ticks(current_portfolio_carry,
current_cp.prev_portfolio_carry_fee)
if current_cp.prev_portfolio_spread_fee != current_portfolio_fee_spread:
current_cp.spread_fee_loss += self.compute_delta_between_ticks(current_portfolio_fee_spread,
current_cp.prev_portfolio_spread_fee)
# Update portfolio values
current_cp.prev_portfolio_ret = current_portfolio_value
current_cp.last_update_ms = now_ms
current_cp.prev_portfolio_spread_fee = current_portfolio_fee_spread
current_cp.prev_portfolio_carry_fee = current_portfolio_carry
current_cp.mpv = max(current_cp.mpv, current_portfolio_value)
current_cp.n_updates += n_updates
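    # Void-filling sketch for update_pl (hypothetical times, 12 h checkpoints): if
    # the last checkpoint ends at day0 04:00 with accum_ms = 4 h and the next update
    # arrives at day1 20:00, the current checkpoint is first filled to the day0
    # 12:00 boundary, two full void checkpoints (day0 12:00 -> day1 00:00 and
    # day1 00:00 -> day1 12:00) are appended with the portfolio value held constant,
    # and a final partial checkpoint with accum_ms = 8 h carries the new value at
    # day1 20:00.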
def count_events(self):
        # Return the number of checkpoints currently stored.
return len(self.cps)
def get_product_of_gains(self):
cumulative_gains = sum(cp.gain for cp in self.cps)
return math.exp(cumulative_gains)
def get_product_of_loss(self):
cumulative_loss = sum(cp.loss for cp in self.cps)
return math.exp(cumulative_loss)
def get_total_product(self):
cumulative_gains = sum(cp.gain for cp in self.cps)
cumulative_loss = sum(cp.loss for cp in self.cps)
return math.exp(cumulative_gains + cumulative_loss)
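    # The three accessors above rely on the identity
    # exp(sum(gains) + sum(losses)) == exp(sum(gains)) * exp(sum(losses)), so
    # get_total_product() == get_product_of_gains() * get_product_of_loss() up to
    # floating-point error.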
def get_total_ledger_duration_ms(self):
return sum(cp.accum_ms for cp in self.cps)
class PerfLedgerManager(CacheController):
def __init__(self, metagraph, ipc_manager=None, running_unit_tests=False, shutdown_dict=None,
perf_ledger_hks_to_invalidate=None, live_price_fetcher=None, position_manager=None,
use_slippage=None,
enable_rss=True, is_backtesting=False, parallel_mode=ParallelizationMode.SERIAL, secrets=None,
build_portfolio_ledgers_only=False, target_ledger_window_ms=ValiConfig.TARGET_LEDGER_WINDOW_MS,
is_testing=False):
super().__init__(metagraph=metagraph, running_unit_tests=running_unit_tests, is_backtesting=is_backtesting)
self.shutdown_dict = shutdown_dict
self.live_price_fetcher = live_price_fetcher
self.running_unit_tests = running_unit_tests
self.enable_rss = enable_rss
self.parallel_mode = parallel_mode
self.use_slippage = use_slippage
self.is_testing = is_testing
position_file.ALWAYS_USE_SLIPPAGE = use_slippage
self.build_portfolio_ledgers_only = build_portfolio_ledgers_only
if perf_ledger_hks_to_invalidate is None:
self.perf_ledger_hks_to_invalidate = {}
else:
self.perf_ledger_hks_to_invalidate = perf_ledger_hks_to_invalidate
if ipc_manager:
self.pl_elimination_rows = ipc_manager.list()
self.hotkey_to_perf_bundle = ipc_manager.dict()
else:
self.pl_elimination_rows = []
self.hotkey_to_perf_bundle = {}
self.running_unit_tests = running_unit_tests
self.position_manager = position_manager
self.pds = live_price_fetcher.polygon_data_service if live_price_fetcher else None # Load it later once the process starts so ipc works.
self.live_price_fetcher = live_price_fetcher # For unit tests only
# Every update, pick a hotkey to rebuild in case polygon 1s candle data changed.
self.trade_pair_to_price_info = {'second':{}, 'minute':{}}
self.trade_pair_to_position_ret = {}
self.random_security_screenings = set()
self.market_calendar = UnifiedMarketCalendar()
self.n_api_calls = 0
self.POLYGON_MAX_CANDLE_LIMIT = 49999
self.UPDATE_LOOKBACK_MS = 600000 # 10 minutes ago. Want to give Polygon time to create candles on the backend.
self.UPDATE_LOOKBACK_S = self.UPDATE_LOOKBACK_MS // 1000
self.now_ms = 0 # The largest timestamp we want to buffer candles for. time.time() - UPDATE_LOOKBACK_S
#self.base_dd_stats = {'worst_dd':1.0, 'last_dd':0, 'mrpv':1.0, 'n_closed_pos':0, 'n_checks':0, 'current_portfolio_return': 1.0}
#self.hk_to_dd_stats = defaultdict(lambda: deepcopy(self.base_dd_stats))
self.candidate_pl_elimination_rows = []
self.hk_to_last_order_processed_ms = {}
self.mode_to_n_updates = {}
self.update_to_n_open_positions = {}
self.position_uuid_to_cache = defaultdict(FeeCache)
self.target_ledger_window_ms = target_ledger_window_ms
if self.is_backtesting or self.parallel_mode != ParallelizationMode.SERIAL:
pass
else:
bt.logging.info(f"Running performance ledger manager with mode {self.parallel_mode.name}")
initial_perf_ledgers = self.get_perf_ledgers(from_disk=True, portfolio_only=False)
for k, v in initial_perf_ledgers.items():
self.hotkey_to_perf_bundle[k] = v
# ipc list does not update the object without using __setitem__
temp = self.get_perf_ledger_eliminations(first_fetch=True)
self.pl_elimination_rows.extend(temp)
for i, x in enumerate(temp):
self.pl_elimination_rows[i] = x
if secrets:
self.secrets = secrets
else:
self.secrets = ValiUtils.get_secrets(running_unit_tests=self.running_unit_tests)
@staticmethod
def print_bundles(ans: dict[str, dict[str, PerfLedger]]):
for hk, bundle in ans.items():
print(f'-----------({hk})-----------')
PerfLedgerManager.print_bundle(hk, bundle)
@staticmethod
def print_bundle(hk:str, bundle: dict[str, PerfLedger]):
bt.logging.success(f'Hotkey: {hk}. Max return: {bundle[TP_ID_PORTFOLIO].max_return}. Initialization time: {TimeUtil.millis_to_timestamp(bundle[TP_ID_PORTFOLIO].initialization_time_ms)}')
for tp_id, pl in sorted(bundle.items(), key=lambda x: 1 if x[0] == TP_ID_PORTFOLIO else ord(x[0][0]) / 27):
bt.logging.info(f' --{tp_id}-- ')
for idx, x in enumerate(pl.cps):
last_update_formatted = TimeUtil.millis_to_timestamp(x.last_update_ms)
if 1:#idx == 0 or idx == len(pl.cps) - 1:
bt.logging.info(f' {idx} {last_update_formatted} {x}')
bt.logging.info(tp_id, f'max_perf_ledger_return: {pl.max_return}')
def _is_v1_perf_ledger(self, ledger_value):
if self.build_portfolio_ledgers_only:
return False
ans = False
if 'initialization_time_ms' in ledger_value:
ans = True
# "Faked" v2 ledger
elif 'portfolio' in ledger_value and len(ledger_value) == 1:
ans = True
return ans
def get_perf_ledgers(self, portfolio_only=True, from_disk=False) -> dict[str, dict[str, PerfLedger]] | dict[str, PerfLedger]:
ret = {}
if from_disk:
file_path = ValiBkpUtils.get_perf_ledgers_path(self.running_unit_tests)
if not os.path.exists(file_path):
return ret
with open(file_path, 'r') as file:
data = json.load(file)
for hk, possible_bundles in data.items():
if self._is_v1_perf_ledger(possible_bundles):
if portfolio_only:
ret[hk] = PerfLedger.from_dict(possible_bundles) # v1 is portfolio ledgers. Fake it.
else:
# Incompatible but we can fake it for now.
if 'initialization_time_ms' in possible_bundles:
ret[hk] = {TP_ID_PORTFOLIO: PerfLedger.from_dict(possible_bundles)}
elif TP_ID_PORTFOLIO in possible_bundles:
ret[hk] = {TP_ID_PORTFOLIO: PerfLedger.from_dict(possible_bundles[TP_ID_PORTFOLIO])}
else:
if portfolio_only:
ret[hk] = PerfLedger.from_dict(possible_bundles[TP_ID_PORTFOLIO])
else:
ret[hk] = {k: PerfLedger.from_dict(v) for k, v in possible_bundles.items()}
return ret
# Everything here is in v2 format
if portfolio_only:
dat = dict(self.hotkey_to_perf_bundle)
return {hk: bundle[TP_ID_PORTFOLIO] for hk, bundle in dat.items()}
else:
return dict(self.hotkey_to_perf_bundle)
def filtered_ledger_for_scoring(
self,
portfolio_only: bool = False,
hotkeys: List[str] = None
) -> dict[str, PerfLedger]:
"""
Filter the ledger for a set of hotkeys.
"""
if hotkeys is None:
hotkeys = self.metagraph.hotkeys
# Note, eliminated miners will not appear in the dict below
filtered_ledger = {}
for hotkey, miner_portfolio_ledger in self.get_perf_ledgers(portfolio_only=False).items():
if hotkey not in hotkeys:
continue
if hotkey in self.perf_ledger_hks_to_invalidate:
bt.logging.warning(f"Skipping hotkey {hotkey} in filtered_ledger_for_scoring due to invalidation.")
continue
if miner_portfolio_ledger is None:
continue
        miner_overall_ledger = miner_portfolio_ledger.get(TP_ID_PORTFOLIO, PerfLedger())
if len(miner_overall_ledger.cps) == 0:
continue
if portfolio_only:
filtered_ledger[hotkey] = miner_overall_ledger
else:
filtered_ledger[hotkey] = miner_portfolio_ledger
return filtered_ledger
def clear_perf_ledgers_from_disk(self):
assert self.running_unit_tests, 'this is only valid for unit tests'
self.hotkey_to_perf_bundle = {}
file_path = ValiBkpUtils.get_perf_ledgers_path(self.running_unit_tests)
if os.path.exists(file_path):
ValiBkpUtils.write_file(file_path, {})
for k in list(self.hotkey_to_perf_bundle.keys()):
del self.hotkey_to_perf_bundle[k]
@staticmethod
def clear_perf_ledgers_from_disk_autosync(hotkeys:list):
file_path = ValiBkpUtils.get_perf_ledgers_path()
filtered_data = {}
if os.path.exists(file_path):
with open(file_path, 'r') as file:
existing_data = json.load(file)
for hk, bundles in existing_data.items():
if hk in hotkeys:
filtered_data[hk] = bundles
ValiBkpUtils.write_file(file_path, filtered_data)
def run_update_loop(self):
setproctitle(f"vali_{self.__class__.__name__}")
bt.logging.enable_info()
while not self.shutdown_dict:
try:
if self.refresh_allowed(ValiConfig.PERF_LEDGER_REFRESH_TIME_MS):
self.update()
self.set_last_update_time(skip_message=True)
except Exception as e:
# Handle exceptions or log errors
bt.logging.error(f"Error during perf ledger update: {e}. Please alert a team member ASAP!")
bt.logging.error(traceback.format_exc())
time.sleep(30)
time.sleep(1)
def get_historical_position(self, position: Position, timestamp_ms: int):
hk = position.miner_hotkey # noqa: F841
new_orders = []
position_at_start_timestamp = deepcopy(position)
position_at_end_timestamp = deepcopy(position)
for o in position.orders:
if o.processed_ms <= timestamp_ms:
new_orders.append(o)
position_at_start_timestamp.orders = new_orders[:-1]
position_at_start_timestamp.rebuild_position_with_updated_orders()
position_at_end_timestamp.orders = new_orders
position_at_end_timestamp.rebuild_position_with_updated_orders()
# Handle position that was forced closed due to realtime data (liquidated)
if len(new_orders) == len(position.orders) and position.return_at_close == 0:
position_at_end_timestamp.return_at_close = 0
position_at_end_timestamp.close_out_position(position.close_ms)
return position_at_start_timestamp, position_at_end_timestamp
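    # Sketch (hypothetical): for a position with orders processed at t=1, t=2, and
    # t=3 and timestamp_ms=2, the returned "start" snapshot holds only the t=1
    # order and the "end" snapshot holds the t=1 and t=2 orders, each rebuilt so
    # derived fields reflect exactly those orders.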
def generate_order_timeline(self, positions: list[Position], now_ms: int, hk: str) -> (list[tuple], int):
        # Orders identify the timestamps that need checking; positions provide the returns at each timestamp (adjusted later).
# (order, position)
time_sorted_orders = []
last_event_time_ms = 0
for p in positions:
last_event_time_ms = max(p.orders[-1].processed_ms, last_event_time_ms)
if p.is_closed_position and len(p.orders) < 2:
bt.logging.info(f"perf ledger generate_order_timeline. Skipping closed position for hk {hk} with < 2 orders: {p}")
continue
for o in p.orders:
if o.processed_ms <= now_ms:
time_sorted_orders.append((o, p))
# sort
time_sorted_orders.sort(key=lambda x: x[0].processed_ms)
return time_sorted_orders, last_event_time_ms
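    # Sketch (hypothetical): two positions whose orders were processed at t=5, t=1,
    # and t=3 yield time_sorted_orders ordered [t=1, t=3, t=5] with
    # last_event_time_ms == 5, regardless of which position each order belongs to.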
    def _can_shortcut(self, tp_to_historical_positions: dict[str, list[Position]], end_time_ms: int,
                      realtime_position_to_pop: Position | None, start_time_ms: int, perf_ledger_bundle: dict[str, PerfLedger]) -> (ShortcutReason, dict, dict, dict, TradePairReturnStatus):
tp_to_return = {}
tp_to_spread_fee = {}
tp_to_carry_fee = {}
for k in list(tp_to_historical_positions.keys()) + [TP_ID_PORTFOLIO]:
tp_to_return[k] = 1.0
tp_to_spread_fee[k] = 1.0
tp_to_carry_fee[k] = 1.0
n_open_positions = 0
# Set now_ms to end_time_ms when backtesting for historical perf ledger generation
if self.is_backtesting:
ledger_cutoff_ms = end_time_ms
else:
ledger_cutoff_ms = TimeUtil.now_in_millis() - perf_ledger_bundle[TP_ID_PORTFOLIO].target_ledger_window_ms
n_positions = 0
n_closed_positions = 0
n_positions_newly_opened = 0
any_open : TradePairReturnStatus = TradePairReturnStatus.TP_MARKET_NOT_OPEN
for tp_id, historical_positions in tp_to_historical_positions.items():
for i, historical_position in enumerate(historical_positions):
n_positions += 1
if len(historical_position.orders) == 0:
n_positions_newly_opened += 1
elif historical_position.is_open_position:
n_open_positions += 1
else:
n_closed_positions += 1
if realtime_position_to_pop and tp_id == realtime_position_to_pop.trade_pair.trade_pair_id and i == len(historical_positions) - 1:
historical_position = realtime_position_to_pop
for tp_id in [TP_ID_PORTFOLIO, tp_id]:
csf, _ = self.position_uuid_to_cache[historical_position.position_uuid].get_spread_fee(historical_position, end_time_ms)
tp_to_spread_fee[tp_id] *= csf
ccf, _ = self.position_uuid_to_cache[historical_position.position_uuid].get_carry_fee(end_time_ms, historical_position)
tp_to_carry_fee[tp_id] *= ccf
tp_to_return[tp_id] *= historical_position.return_at_close
self.trade_pair_to_position_ret[tp_id] = historical_position.return_at_close
assert tp_to_carry_fee[TP_ID_PORTFOLIO] > 0, (tp_to_carry_fee[TP_ID_PORTFOLIO], tp_to_spread_fee[TP_ID_PORTFOLIO])
reason = ''
ans = ShortcutReason.NO_SHORTCUT
        # When building from orders, we will always have at least one open position. When opening a position after a
        # period of all closed positions, we can shortcut by identifying that the new position is the only open position
        # and all other positions are closed. In the time before this period, only closed positions exist.
        # Alternatively, we may be building the ledger after all orders have been accounted for. In that case,
        # we simply need to check whether all positions are closed.
if n_open_positions == 0:
if n_positions_newly_opened not in (0, 1):
for tp, historical_positions in tp_to_historical_positions.items():
for i, historical_position in enumerate(historical_positions):
if len(historical_position.orders) == 0:
print(historical_position)
raise Exception(f'n_positions_newly_opened should be 0 or 1 but got {n_positions_newly_opened}')
reason += 'No open positions. '
ans = ShortcutReason.NO_OPEN_POSITIONS
any_open = TradePairReturnStatus.TP_NO_OPEN_POSITIONS
# This window would be dropped anyway
if (end_time_ms < ledger_cutoff_ms):
reason += 'Ledger cutoff. '
ans = ShortcutReason.OUTSIDE_WINDOW
# simultaneous orders were placed
if start_time_ms == end_time_ms:
reason += 'start_time_ms == end_time_ms. Simultaneous orders.'
ans = ShortcutReason.ZERO_TIME_DELTA
#print('start and end time the same.')
#for tp, positions in tp_to_historical_positions.items():
# for p in positions:
# if realtime_position_to_pop and realtime_position_to_pop.trade_pair == p.trade_pair and p.is_open_position:
# p = realtime_position_to_pop
# if any(o.processed_ms == end_time_ms for o in p.orders):
# p2 = deepcopy(p.__dict__)
# orders = p2.pop('orders')
# print(f' tp {tp} position {p2}')
# for o in orders:
# print(f' order {o}')
if 0 and ans != ShortcutReason.NO_SHORTCUT:
for tp_id, historical_positions in tp_to_historical_positions.items():
positions = []
for i, historical_position in enumerate(historical_positions):
if realtime_position_to_pop and tp_id == realtime_position_to_pop.trade_pair.trade_pair_id and i == len(
historical_positions) - 1:
historical_position = realtime_position_to_pop
foo = True
else:
foo = False
positions.append((historical_position.position_uuid, [x.price for x in historical_position.orders],
historical_position.return_at_close, foo, historical_position.is_open_position))
print(f'{tp_id}: {positions}')
final_cp = None
if perf_ledger_bundle and TP_ID_PORTFOLIO in perf_ledger_bundle and perf_ledger_bundle[TP_ID_PORTFOLIO].cps:
final_cp = perf_ledger_bundle[TP_ID_PORTFOLIO].cps[-1]
print('---------------------------------------------------------------------')
print(f' Skipping ({reason}) with n_positions: {n_positions} n_open_positions: {n_open_positions} n_closed_positions: '
f'{n_closed_positions}, n_positions_newly_opened: {n_positions_newly_opened}, '
f'start_time_ms: {TimeUtil.millis_to_formatted_date_str(start_time_ms)} ({start_time_ms}) , '
f'end_time_ms: {TimeUtil.millis_to_formatted_date_str(end_time_ms)} ({end_time_ms}) , '
f'portfolio_value: {tp_to_return[TP_ID_PORTFOLIO]} '
f'ledger_cutoff_ms: {TimeUtil.millis_to_formatted_date_str(ledger_cutoff_ms)}, '
f'realtime_position_to_pop.trade_pair.trade_pair: {realtime_position_to_pop.trade_pair.trade_pair if realtime_position_to_pop else None}, '
f'trade_pair_to_position_ret: {self.trade_pair_to_position_ret} '
f'final portfolio cp {final_cp}')
print('---------------------------------------------------------------------')
return ans, tp_to_return, tp_to_spread_fee, tp_to_carry_fee, any_open
def new_window_intersects_old_window(self, start_time_ms, end_time_ms, existing_lb_ms, existing_ub_ms):
# Check if new window intersects with the old window
# An intersection occurs if the start of the new window is before the end of the old window,
# and the end of the new window is after the start of the old window
return start_time_ms <= existing_ub_ms and end_time_ms >= existing_lb_ms
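    # Standard closed-interval overlap test. Sketch (hypothetical ms values):
    #
    #     assert self.new_window_intersects_old_window(0, 10, 10, 20)     # touching endpoints intersect
    #     assert not self.new_window_intersects_old_window(0, 9, 10, 20)  # disjoint windows do not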
def align_t_ms_to_mode(self, t_ms, mode):
if mode == 'second':
return t_ms - (t_ms % 1000)
elif mode == 'minute':
return t_ms - (t_ms % 60000)
else:
raise Exception(f"Unknown mode: {mode}")
def refresh_price_info(self, t_ms, end_time_ms, tp, mode):
def populate_price_info(pi, price_info_raw):
for a in price_info_raw:
pi[a.timestamp] = a.close
min_candles_per_request = 3600 if mode == 'second' else 1440
existing_lb_ms = None
existing_ub_ms = None
existing_window_ms = None
if tp.trade_pair_id in self.trade_pair_to_price_info[mode]:
price_info = self.trade_pair_to_price_info[mode][tp.trade_pair_id]
existing_ub_ms = price_info['ub_ms']
existing_lb_ms = price_info['lb_ms']
existing_window_ms = existing_ub_ms - existing_lb_ms
if existing_lb_ms <= t_ms <= existing_ub_ms: # No refresh needed
return
#else:
# print('11111', tp.trade_pair, trade_pair_to_price_info.keys())
start_time_ms = t_ms
requested_milliseconds = end_time_ms - start_time_ms
n_candles_requested = requested_milliseconds // 1000 if mode == 'second' else requested_milliseconds // 60000
if n_candles_requested > self.POLYGON_MAX_CANDLE_LIMIT: # Polygon limit
end_time_ms = start_time_ms + self.POLYGON_MAX_CANDLE_LIMIT * 1000 if mode == 'second' else start_time_ms + self.POLYGON_MAX_CANDLE_LIMIT * 60000
elif n_candles_requested < min_candles_per_request: # Get a batch of candles to minimize number of fetches
offset = min_candles_per_request * 1000 if mode == 'second' else min_candles_per_request * 60000
end_time_ms = start_time_ms + offset
end_time_ms = min(int(self.now_ms), end_time_ms) # Don't fetch candles beyond check time or will fill in null.
#t0 = time.time()
#print(f"Starting #{requested_seconds} candle fetch for {tp.trade_pair}")
if self.pds is None:
if self.is_testing:
# Create a minimal mock data service for testing
from unittest.mock import Mock
self.pds = Mock()
self.pds.unified_candle_fetcher.return_value = []
self.pds.tp_to_mfs = {}
else:
# Production path - create real price fetcher
live_price_fetcher = LivePriceFetcher(self.secrets, disable_ws=True)
self.pds = live_price_fetcher.polygon_data_service
price_info_raw = self.pds.unified_candle_fetcher(
trade_pair=tp, start_timestamp_ms=start_time_ms, end_timestamp_ms=end_time_ms, timespan=mode)
self.tp_to_mfs.update(self.pds.tp_to_mfs)
self.n_api_calls += 1
#print(f'Fetched candles for tp {tp.trade_pair} for window {TimeUtil.millis_to_formatted_date_str(start_time_ms)} to {TimeUtil.millis_to_formatted_date_str(end_time_ms)}')
#print(f'Got {len(price_info)} candles after request of {requested_seconds} candles for tp {tp.trade_pair} in {time.time() - t0}s')
#assert lb_ms >= start_time_ms, (lb_ms, start_time_ms)
#assert ub_ms <= end_time_ms, (ub_ms, end_time_ms)
# Can we build on top of existing data or should we wipe?
perform_wipe = True
if tp.trade_pair_id in self.trade_pair_to_price_info[mode]:
new_window_size_ms = end_time_ms - start_time_ms
candidate_window_size = new_window_size_ms + existing_window_ms
candidate_n_candles_in_memory = candidate_window_size // 1000 if mode == 'second' else candidate_window_size // 60000
if candidate_n_candles_in_memory < self.POLYGON_MAX_CANDLE_LIMIT and \
self.new_window_intersects_old_window(start_time_ms, end_time_ms, existing_lb_ms, existing_ub_ms):
perform_wipe = False
if perform_wipe:
price_info = {}
populate_price_info(price_info, price_info_raw)
self.trade_pair_to_price_info[mode][tp.trade_pair_id] = price_info
self.trade_pair_to_price_info[mode][tp.trade_pair_id]['lb_ms'] = start_time_ms
self.trade_pair_to_price_info[mode][tp.trade_pair_id]['ub_ms'] = end_time_ms
else:
self.trade_pair_to_price_info[mode][tp.trade_pair_id]['ub_ms'] = max(existing_ub_ms, end_time_ms)
self.trade_pair_to_price_info[mode][tp.trade_pair_id]['lb_ms'] = min(existing_lb_ms, start_time_ms)
populate_price_info(self.trade_pair_to_price_info[mode][tp.trade_pair_id], price_info_raw)
#print(f'Fetched {requested_seconds} s of candles for tp {tp.trade_pair} in {time.time() - t0}s')
#print('22222', tp.trade_pair, trade_pair_to_price_info.keys())
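    # Fetch-sizing sketch for refresh_price_info (hypothetical, 'second' mode): a
    # request spanning 10 minutes (600 candles) is widened to min_candles_per_request
    # (3600 candles, i.e. one hour) to amortize API calls, while a request spanning
    # more than POLYGON_MAX_CANDLE_LIMIT seconds is truncated to 49_999 candles;
    # either way the end is clamped to self.now_ms.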
    def positions_to_portfolio_return(self, possible_tp_ids, tp_to_historical_positions_dense: dict[str, list[Position]],
                                      t_ms, mode, end_time_ms, tp_to_initial_return, tp_to_initial_spread_fee,
                                      tp_to_initial_carry_fee):
# Answers "What is the portfolio return at this time t_ms?"
tp_to_any_open : dict[str, TradePairReturnStatus] = {x: TradePairReturnStatus.TP_NO_OPEN_POSITIONS for x in possible_tp_ids}
tp_to_return = tp_to_initial_return.copy()
tp_to_spread_fee = tp_to_initial_spread_fee.copy()
tp_to_carry_fee = tp_to_initial_carry_fee.copy()
t_ms = self.align_t_ms_to_mode(t_ms, mode)
for tp_id, historical_positions in tp_to_historical_positions_dense.items():
assert len(historical_positions) < 2, ('maybe a recently opened position?', historical_positions)
# Determine which IDs to update for this trade pair
tp_ids_to_build = [TP_ID_PORTFOLIO] if self.build_portfolio_ledgers_only else [tp_id, TP_ID_PORTFOLIO]
for historical_position in historical_positions:
if self.shutdown_dict:
return tp_to_return, tp_to_any_open, tp_to_spread_fee, tp_to_carry_fee
# Calculate fees for this position
position_spread_fee, psf_updated = self.position_uuid_to_cache[historical_position.position_uuid].get_spread_fee(historical_position, t_ms)
position_carry_fee, pcf_updated = self.position_uuid_to_cache[historical_position.position_uuid].get_carry_fee(t_ms, historical_position)
# Apply fees to the appropriate IDs
for x in tp_ids_to_build:
tp_to_spread_fee[x] *= position_spread_fee
tp_to_carry_fee[x] *= position_carry_fee
# Check if market is open
if not self.market_calendar.is_market_open(historical_position.trade_pair, t_ms):
for x in tp_ids_to_build:
tp_to_return[x] *= historical_position.return_at_close
# Only update to MARKET_NOT_OPEN if we haven't seen any open positions yet
if tp_to_any_open[x] == TradePairReturnStatus.TP_NO_OPEN_POSITIONS:
tp_to_any_open[x] = TradePairReturnStatus.TP_MARKET_NOT_OPEN
continue
# Market is open - fetch price info
self.refresh_price_info(t_ms, end_time_ms, historical_position.trade_pair, mode)
price_at_t_ms = self.trade_pair_to_price_info[mode][tp_id].get(t_ms)
# Determine if price changed
price_changed = False
if price_at_t_ms is not None:
prev_price = self.tp_to_last_price.get(tp_id, None)
price_changed = price_at_t_ms != prev_price
self.tp_to_last_price[tp_id] = price_at_t_ms
# Update position returns based on whether price changed
if price_changed:
historical_position.set_returns(price_at_t_ms, time_ms=t_ms, total_fees=position_spread_fee * position_carry_fee)
self.trade_pair_to_position_ret[tp_id] = historical_position.return_at_close
else:
historical_position.set_returns_with_updated_fees(position_spread_fee * position_carry_fee, t_ms)
# Update returns for all relevant IDs
for x in tp_ids_to_build:
tp_to_return[x] *= historical_position.return_at_close
# Update status based on price change
# Use the enum ordering to ensure we keep the highest priority status
if price_changed:
for x in tp_ids_to_build:
if tp_to_any_open[x] < TradePairReturnStatus.TP_MARKET_OPEN_PRICE_CHANGE:
tp_to_any_open[x] = TradePairReturnStatus.TP_MARKET_OPEN_PRICE_CHANGE
else:
# Market is open but no price change
for x in tp_ids_to_build:
if tp_to_any_open[x] < TradePairReturnStatus.TP_MARKET_OPEN_NO_PRICE_CHANGE:
tp_to_any_open[x] = TradePairReturnStatus.TP_MARKET_OPEN_NO_PRICE_CHANGE
return tp_to_return, tp_to_any_open, tp_to_spread_fee, tp_to_carry_fee
def check_liquidated(self, miner_hotkey, portfolio_return, t_ms, tp_to_historical_positions):
if portfolio_return == 0:
bt.logging.warning(f"Portfolio value is {portfolio_return} for miner {miner_hotkey} at {t_ms}. Eliminating miner.")
elimination_row = self.generate_elimination_row(miner_hotkey, 0.0, EliminationReason.LIQUIDATED.value, t_ms=t_ms, price_info=self.tp_to_last_price, return_info={'dd_stats': {}, 'returns': self.trade_pair_to_position_ret})
self.candidate_pl_elimination_rows.append(elimination_row)
self.candidate_pl_elimination_rows[-1] = elimination_row # Trigger the update on the multiprocessing Manager
#self.hk_to_dd_stats[miner_hotkey]['eliminated'] = True
for _, v in tp_to_historical_positions.items():
for pos in v:
print(
f" time {TimeUtil.millis_to_formatted_date_str(t_ms)} hk {miner_hotkey[-5:]} {pos.trade_pair.trade_pair_id} return {pos.current_return} return_at_close {pos.return_at_close} closed@{'NA' if pos.is_open_position else TimeUtil.millis_to_formatted_date_str(pos.orders[-1].processed_ms)}")
return True
return False
    def init_tp_to_last_price(self, tp_to_historical_positions: dict[str, list[Position]]):
self.tp_to_last_price = {}
for k, v in tp_to_historical_positions.items():
last_pos = v[-1]
if not last_pos:
continue
orders = last_pos.orders
if not orders:
continue
last_order_price = orders[-1].price
self.tp_to_last_price[k] = last_order_price
    def condense_positions(self, tp_ids_to_build, tp_to_historical_positions: dict[str, list[Position]]) -> (float, float, float, dict[str, list[Position]]):
tp_to_initial_return = {x: 1.0 for x in tp_ids_to_build}
tp_to_initial_spread_fee = {x: 1.0 for x in tp_ids_to_build}
tp_to_initial_carry_fee = {x: 1.0 for x in tp_ids_to_build}
tp_to_historical_positions_dense = {}
open_positions_tp_ids = set()
for tp_id, historical_positions in tp_to_historical_positions.items():
dense_positions = []
for historical_position in historical_positions:
if historical_position.is_closed_position:
tp_ids_to_build = [TP_ID_PORTFOLIO] if self.build_portfolio_ledgers_only else [tp_id, TP_ID_PORTFOLIO]
for x in tp_ids_to_build:
tp_to_initial_return[x] *= historical_position.return_at_close
tp_to_initial_spread_fee[x] *= self.position_uuid_to_cache[historical_position.position_uuid].get_spread_fee(historical_position, historical_position.orders[-1].processed_ms)[0]
tp_to_initial_carry_fee[x] *= self.position_uuid_to_cache[historical_position.position_uuid].get_carry_fee(historical_position.orders[-1].processed_ms, historical_position)[0]
elif len(historical_position.orders) == 0:
continue