-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathutil.py
More file actions
170 lines (128 loc) · 5.09 KB
/
util.py
File metadata and controls
170 lines (128 loc) · 5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from typing import Dict, List
from solana.blockhash import Blockhash
from solana.keypair import Keypair
from solana.publickey import PublicKey
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Commitment
from solana.transaction import Transaction
from program_admin.types import (
Network,
PythMappingAccount,
ReferenceOverrides,
ReferencePermissions,
)
# Mapping account layout.
# https://github.com/pyth-network/pyth-client/blob/b49f73afe32ce8685a3d05e32d8f3bb51909b061/program/src/oracle/oracle.h#L88
MAPPING_ACCOUNT_SIZE = 20536
MAPPING_ACCOUNT_PRODUCT_LIMIT = 640

# Product account layout.
PRODUCT_ACCOUNT_SIZE = 512

# Price account layouts, one size/component-count pair per version.
# NOTE(review): sizes are presumably in bytes - confirm against oracle.h.
PRICE_ACCOUNT_V1_SIZE = 3312
PRICE_V1_COMP_COUNT = 32
PRICE_ACCOUNT_V2_SIZE = 12576
PRICE_V2_COMP_COUNT = 128

# 10^9; presumably the number of lamports in one SOL - confirm against callers.
SOL_LAMPORTS = 10**9
async def recent_blockhash(client: AsyncClient) -> Blockhash:
    """
    Asks the RPC client for the latest blockhash at "finalized" commitment
    and returns it wrapped in a Blockhash.

    Raises RuntimeError when the response carries no value.
    """
    finalized = Commitment("finalized")
    response = await client.get_latest_blockhash(commitment=finalized)

    latest = response.value
    if latest:
        return Blockhash(str(latest.blockhash))

    raise RuntimeError("Failed to get recent blockhash")
async def account_exists(rpc_endpoint: str, key: PublicKey) -> bool:
    """
    Returns whether the RPC endpoint reports an account for the given key.

    A dedicated client is opened for this single lookup and closed again
    before returning, so repeated calls do not accumulate open clients.
    """
    # The async context manager closes the client even if the request raises
    async with AsyncClient(rpc_endpoint) as client:
        response = await client.get_account_info(key)

    # A lookup that finds nothing comes back with a falsy `value`
    return bool(response.value)
def compute_transaction_size(transaction: Transaction) -> int:
    """
    Returns the length of the transaction's serialized form, i.e. the total
    size it takes up over the wire
    """
    wire_format = transaction.serialize()
    return len(wire_format)
def encode_product_metadata(data: Dict[str, str]) -> bytes:
    """
    Serializes product metadata into a single byte string.

    Every key/value pair is written, in the dict's iteration order, as:
    key length (1 byte), UTF-8 key, value length (1 byte), UTF-8 value.
    An empty dict encodes to an empty byte string.

    Raises OverflowError if an encoded key or value is longer than 255 bytes,
    because its length no longer fits in the single length byte.
    """
    chunks: List[bytes] = []

    for key, value in data.items():
        # Key first, then value; both use the same length-prefixed framing
        for text in (key, value):
            encoded = text.encode("utf8")
            # The prefix counts encoded bytes, not characters
            chunks.append(len(encoded).to_bytes(1, byteorder="little"))
            chunks.append(encoded)

    # Join once at the end instead of growing a bytes object pair by pair
    return b"".join(chunks)
def sort_mapping_account_keys(accounts: List[PythMappingAccount]) -> List[PublicKey]:
    """
    Takes a list of mapping accounts and returns a list of mapping account keys
    matching the order of the mapping linked list.

    Only keys reachable by walking backwards from the tail of the list are
    returned. If some accounts are not part of that chain (or appear more than
    once), the result is shorter than `accounts` rather than being padded with
    repeated keys.

    Returns an empty list for empty input. Raises RuntimeError if no account
    has a zero next key, i.e. the linked list has no end.
    """
    if not accounts:
        return []

    # We can easily tell which is the last key (its next key is 0), so start
    # from it and build a reverse linked list as a "previous keys" dict.
    null_key = PublicKey(0)
    previous_keys: Dict[PublicKey, PublicKey] = {}
    last_key = None

    for account in accounts:
        this_key = account.public_key
        next_key = account.data.next_mapping_account_key

        if next_key == null_key:
            last_key = this_key

        previous_keys[next_key] = this_key

    if last_key is None:
        raise RuntimeError("The linked list has no end")

    # Now traverse the inverted linked list from tail to head. The walk is
    # capped at len(accounts) steps so it always terminates.
    reversed_keys: List[PublicKey] = []
    current = last_key

    for _ in range(len(accounts)):
        reversed_keys.append(current)

        # There is no previous key to the first key: stop instead of
        # repeating the head to fill up the list
        previous = previous_keys.get(current)
        if previous is None:
            break

        current = previous

    # Collected tail-first; flip once to get head-first order
    reversed_keys.reverse()

    return reversed_keys
def get_available_mapping_account_key(
    mapping_accounts: List[PythMappingAccount],
) -> PublicKey:
    """
    Returns the key of the first mapping account, in the order produced by
    sort_mapping_account_keys, whose product count is still below
    MAPPING_ACCOUNT_PRODUCT_LIMIT.

    Raises RuntimeError when every mapping account is full.
    """
    # Look-up table: account key -> number of products it currently holds
    product_counts = {}
    for account in mapping_accounts:
        product_counts[account.public_key] = account.data.product_count

    ordered_keys = sort_mapping_account_keys(mapping_accounts)
    with_room = (
        key
        for key in ordered_keys
        if product_counts[key] < MAPPING_ACCOUNT_PRODUCT_LIMIT
    )

    first_available = next(with_room, None)
    if first_available is None:
        raise RuntimeError("All mapping accounts are full")

    return first_available
def apply_overrides(
    ref_permissions: ReferencePermissions,
    ref_overrides: ReferenceOverrides,
    network: Network,
) -> ReferencePermissions:
    """
    Returns a new permissions mapping with the overrides of the given network
    applied.

    A symbol that has a falsy override for this network keeps its account
    types but loses all publishers (each account type maps to an empty list).
    Every other symbol keeps its original permissions object unchanged.
    Networks without an overrides entry leave all symbols untouched.
    """
    overrides_for_network = ref_overrides.get(network, {})

    def _is_disabled(symbol) -> bool:
        # Only an explicit, falsy override switches a symbol off
        return symbol in overrides_for_network and not overrides_for_network[symbol]

    return {
        symbol: (
            # Remove all publishers from all account types for this symbol
            {account_type: [] for account_type in permissions}
            if _is_disabled(symbol)
            else permissions
        )
        for symbol, permissions in ref_permissions.items()
    }
def get_actual_signers(
    signers: List[Keypair], transaction: Transaction
) -> List[Keypair]:
    """
    Filters the given keypairs down to the ones that actually need to sign the
    transaction: a keypair is kept when its public key shows up, flagged as a
    signer, among the account keys of at least one instruction.

    The relative order of the input keypairs is preserved.
    """

    def _must_sign(signer) -> bool:
        # Scan the account metas of every instruction for this signer's key
        return any(
            signer.public_key == account.pubkey and account.is_signer
            for instruction in transaction.instructions
            for account in instruction.keys
        )

    return [signer for signer in signers if _must_sign(signer)]
def get_permissions_account(program_key, seed):
    """
    Derives the permissions account address for the given program from a
    single seed, using PublicKey.find_program_address.

    Only the address is returned; the bump value found together with it is
    discarded.
    """
    seeds = [seed]
    address, _ = PublicKey.find_program_address(seeds, program_key)
    return address