forked from Reecepbcups/interchain-indexer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
81 lines (60 loc) · 2.06 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import json
import os
from shutil import which
import httpx
# Directory that contains this module, with symlinks resolved. get_sender()
# writes its "no_sender_error.txt" log into this directory.
current_dir = os.path.split(os.path.realpath(__file__))[0]
def run_decode_file(
    COSMOS_BINARY_FILE: str, file_loc: str, output_file_loc: str
) -> dict:
    """Run ``<binary> tx decode-file`` and return the JSON it leaves on disk.

    Executes ``COSMOS_BINARY_FILE tx decode-file file_loc output_file_loc``
    without going through a shell, then parses ``output_file_loc`` as JSON.

    Args:
        COSMOS_BINARY_FILE: Command used to invoke the chain binary. It is
            split with shell-like quoting rules, so a value that carries
            extra arguments (e.g. ``"bin --flag"``) keeps working.
        file_loc: Path passed to the binary as the input file.
        output_file_loc: Path passed to the binary as the output file
            (presumably written by the binary); it is read back here.

    Returns:
        The parsed JSON content of ``output_file_loc``.

    Raises:
        FileNotFoundError: If the binary or ``output_file_loc`` is missing.
        subprocess.CalledProcessError: If the binary exits with a non-zero
            status. Without this check a stale output file from an earlier
            run could be returned as if it were fresh.
        json.JSONDecodeError: If the output file is not valid JSON.
    """
    # Imported locally so this function does not depend on edits to the
    # module's import block.
    import shlex
    import subprocess

    # TODO: What if I just return to stdout instead of an output file, and speed improvement? Need to benchmark
    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact and prevents them from being interpreted.
    command = [
        *shlex.split(COSMOS_BINARY_FILE),
        "tx",
        "decode-file",
        file_loc,
        output_file_loc,
    ]
    # stdout is discarded, as before; only the output file matters.
    subprocess.run(command, check=True, stdout=subprocess.DEVNULL)

    with open(output_file_loc, "r", encoding="utf-8") as f:
        return json.load(f)
def command_exists(cmd: str) -> bool:
    """Return True if ``cmd`` resolves to an executable via ``shutil.which``.

    Args:
        cmd: Command name (looked up on PATH) or path to an executable.

    Returns:
        True when ``which(cmd)`` finds an executable, False otherwise.
    """
    # Identity comparison is the correct way to test for None.
    return which(cmd) is not None
def get_sender(msg: dict, WALLET_PREFIX: str, VALOPER_PREFIX: str) -> str | None:
# MultibankSend not yet supported
keys = [
"sender",
"delegator_address",
"from_address",
"grantee",
"voter",
"signer",
"depositor",
"proposer",
]
for key in keys:
if key in msg.keys():
return msg[key]
# tries to find the sender in the msg even if the key is not found
for key, value in msg.items():
if not isinstance(value, str):
continue
if not (value.startswith(WALLET_PREFIX) or value.startswith(VALOPER_PREFIX)):
continue
# smart contracts are ignored. junovaloper1 = 50 characters.
if len(value) > 50:
continue
return value
# write error to file if there is no sender found (we need to add this type)
with open(os.path.join(current_dir, "no_sender_error.txt"), "a") as f:
f.write(str(msg) + "\n\n")
return None
def get_latest_chain_height(RPC_ARCHIVE: str) -> int:
    """Fetch the latest block height from an RPC node's ``abci_info`` endpoint.

    Sends a GET request to ``{RPC_ARCHIVE}/abci_info?`` and reads
    ``result.response.last_block_height`` from the JSON body.

    Args:
        RPC_ARCHIVE: Base URL of the RPC endpoint (no trailing slash).

    Returns:
        The reported height as an int, or ``-1`` when the field is missing
        from the response.

    Raises:
        SystemExit: With exit code 1 when the HTTP status is not 200.
    """
    r = httpx.get(f"{RPC_ARCHIVE}/abci_info?")

    if r.status_code != 200:
        # TODO: backup RPC?
        print(
            f"Error: get_latest_chain_height status_code: {r.status_code} @ {RPC_ARCHIVE}. Exiting..."
        )
        # Raise SystemExit directly: the bare ``exit`` helper is injected by
        # the ``site`` module and is not guaranteed to exist (e.g. under
        # ``python -S`` or in frozen builds).
        raise SystemExit(1)

    # Missing keys fall back to "-1" so the int() conversion still succeeds.
    current_height = (
        r.json().get("result", {}).get("response", {}).get("last_block_height", "-1")
    )
    print(f"Current Height: {current_height}")

    return int(current_height)