Skip to content

Commit ccea7f4

Browse files
Clean up histogram generator and add classic txn byte sizes (#4927)
Resolves #4869. Starts work toward #4858. We could merge the two csv files that use the same table. Technically, we could rewrite most of the Python logic in BigQuery (which has support for JSON + base64 decoding), but it may be easier to keep it as is with the simple queries and more xdr-specific logic in Python.
2 parents 9d1075b + 83f15f5 commit ccea7f4

File tree

5 files changed

+469
-114
lines changed

5 files changed

+469
-114
lines changed

scripts/.python-version

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.13

scripts/HistogramGenerator.py

Lines changed: 208 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -4,94 +4,127 @@
44
# under the Apache License, Version 2.0. See the COPYING file at the root
55
# of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
66

7+
import argparse
78
from base64 import b64decode
9+
from concurrent.futures import ProcessPoolExecutor
810
import csv
911
import json
10-
from multiprocessing.pool import Pool
11-
from typing import Any, Optional, Tuple
12+
from typing import Any, Callable
13+
from dataclasses import dataclass
1214
import subprocess
13-
import sys
1415

1516
import numpy as np
1617
import numpy.typing as npt
1718

18-
# Sample query to gather history_transactions data:
19-
# SELECT soroban_resources_instructions, soroban_resources_write_bytes, tx_envelope FROM `crypto-stellar.crypto_stellar.history_transactions` WHERE batch_run_date BETWEEN DATETIME("2024-06-24") AND DATETIME("2024-09-24") AND soroban_resources_instructions > 0
20-
21-
# Sample query to gather history_contract_events data:
22-
# SELECT topics_decoded, data_decoded FROM `crypto-stellar.crypto_stellar.history_contract_events` WHERE type = 2 AND TIMESTAMP_TRUNC(closed_at, MONTH) between TIMESTAMP("2024-06-27") AND TIMESTAMP("2024-09-27") AND contains_substr(topics_decoded, "write_entry")
23-
# NOTE: this query filters out anything that isn't a write_entry. This is
24-
# required for the script to work correctly!
25-
26-
# Threads to use for parallel processing
27-
WORKERS=9
28-
29-
# Maximum number of histogram bins to generate
30-
MAX_BINS=100
31-
32-
# Maximum number of histogram bins to output. This is much lower than MAX_BINS,
33-
# because most bins will be empty (and therefore pruned from the output). If
34-
# there are too many bins with nonzero values, the script will reduce the number
35-
# of bins until there are at most MAX_OUTPUT_BINS bins with nonzero values.
36-
MAX_OUTPUT_BINS=10
37-
38-
def decode_xdr(xdr: str) -> dict[str, Any]:
39-
""" Decode a TransactionEnvelope using the stellar-xdr tool. """
40-
decoded = subprocess.check_output(
41-
["stellar-xdr", "decode",
42-
"--type", "TransactionEnvelope",
43-
"--input", "single-base64",
44-
"--output", "json"
45-
],
46-
input=xdr.encode("utf-8"))
47-
return json.loads(decoded)
48-
49-
def process_history_row(row: dict[str, str]) -> Tuple[Optional[Tuple[int, int, int]], Optional[int]]:
19+
20+
@dataclass(slots=True)
21+
class WasmUpload:
22+
size: int
23+
24+
25+
@dataclass(slots=True)
26+
class InvokeTransaction:
27+
instructions: int
28+
write_bytes: int
29+
tx_size: int
30+
31+
32+
def make_decoder() -> Callable[[bytes], dict[str, Any]]:
33+
process = subprocess.Popen(
34+
[
35+
"stellar",
36+
"xdr",
37+
"decode",
38+
"--type",
39+
"TransactionEnvelope",
40+
"--input",
41+
"stream",
42+
"--output",
43+
"json",
44+
],
45+
stdin=subprocess.PIPE,
46+
stdout=subprocess.PIPE,
47+
)
48+
49+
# asserts and assignment to satisfy typechecker that these really are streams
50+
assert process.stdin
51+
assert process.stdout
52+
stdin = process.stdin
53+
stdout = process.stdout
54+
55+
def decode_xdr(xdr: bytes) -> dict[str, Any]:
56+
"""Decode a TransactionEnvelope using the stellar-xdr tool."""
57+
stdin.write(xdr)
58+
stdin.flush()
59+
return json.loads(stdout.readline())
60+
61+
return decode_xdr
62+
63+
64+
# Note: decode_xdr is a global that gets initialized in process_history_row so
65+
# that each Python subprocess runs an independent copy of stellar xdr decode
66+
decode_xdr = None
67+
68+
69+
def process_history_row(
70+
row: dict[str, str],
71+
) -> WasmUpload | InvokeTransaction:
5072
"""
51-
Process a row from the history_transactions table. Returns:
52-
* (None, None) if the row is not a transaction
53-
* (None, wasm_size) if the row is a wasm upload
54-
* ((instructions, write_bytes, tx_size), None) if the row is an invoke
55-
transaction
73+
Process a soroban row from the history_transactions table.
5674
"""
57-
envelope_xdr = row["tx_envelope"]
58-
assert isinstance(envelope_xdr, str)
75+
global decode_xdr
76+
if decode_xdr is None:
77+
decode_xdr = make_decoder()
78+
79+
envelope_xdr = b64decode(row["tx_envelope"], validate=True)
5980
envelope = decode_xdr(envelope_xdr)
60-
if "tx" not in envelope:
61-
# Skip anything that isn't a transaction (such as a fee bump)
62-
return (None, None)
81+
82+
# Grab inner transaction from fee bump frames
83+
if "tx_fee_bump" in envelope:
84+
envelope = envelope["tx_fee_bump"]["tx"]["inner_tx"]
6385
operations = envelope["tx"]["tx"]["operations"]
6486
assert len(operations) == 1
6587
body = operations[0]["body"]
66-
if "invoke_host_function" in body:
67-
ihf = body["invoke_host_function"]
68-
if "upload_contract_wasm" in ihf["host_function"]:
69-
# Count wasm bytes
70-
wasm = ihf["host_function"]["upload_contract_wasm"]
71-
return (None, len(bytes.fromhex(wasm)))
72-
else:
73-
# Treat as a "normal" invoke
74-
instructions = row["soroban_resources_instructions"]
75-
write_bytes = row["soroban_resources_write_bytes"]
76-
return ( ( int(instructions),
77-
int(write_bytes),
78-
len(b64decode(envelope_xdr, validate=True))),
79-
None)
80-
return (None, None)
88+
89+
ihf = body["invoke_host_function"]
90+
if "upload_contract_wasm" in ihf["host_function"]:
91+
# Count wasm bytes
92+
wasm = ihf["host_function"]["upload_contract_wasm"]
93+
return WasmUpload(len(bytes.fromhex(wasm)))
94+
else:
95+
# Treat as a "normal" invoke
96+
instructions = row["soroban_resources_instructions"]
97+
write_bytes = row["soroban_resources_write_bytes"]
98+
return InvokeTransaction(
99+
int(instructions),
100+
int(write_bytes),
101+
len(envelope_xdr),
102+
)
103+
81104

82105
def process_event_row(row: dict[str, str]) -> int:
83106
"""
84107
Process a row from the history_events table. Must already be filtered to
85108
contain only write entries. Returns an int with the number of write entries.
86109
"""
87-
return int(json.loads(row["data_decoded"])["value"])
110+
return int(json.loads(row["data_decoded"])["u64"])
111+
88112

89-
def to_normalized_histogram(data: npt.ArrayLike) -> None:
113+
def process_classic_transaction(row: dict[str, str]) -> int:
114+
"""
115+
Process a classic row from the history_transactions table.
116+
"""
117+
return int(row["envelope_size"])
118+
119+
120+
def to_normalized_histogram(
121+
data: npt.ArrayLike, max_bins: int, max_output_bins: int
122+
) -> None:
90123
"""
91124
Given a set of data, print a normalized histogram with at most
92125
MAX_OUTPUT_BINS bins formatted for easy pasting into supercluster.
93126
"""
94-
for i in range(MAX_BINS, MAX_OUTPUT_BINS-1, -1):
127+
for i in range(max_bins, max_output_bins - 1, -1):
95128
hist, bins = np.histogram(data, bins=i)
96129

97130
# Add up counts in each bin
@@ -103,10 +136,14 @@ def to_normalized_histogram(data: npt.ArrayLike) -> None:
103136
# Convert to ints
104137
normalized = normalized.round().astype(int)
105138

106-
if len([x for x in normalized if x != 0]) <= MAX_OUTPUT_BINS:
139+
if len([x for x in normalized if x != 0]) <= max_output_bins:
107140
break
108141
# We have too many non-zero output bins. Reduce the number of total bins
109142
# and try again.
143+
else:
144+
raise RuntimeError(
145+
f"Could not generate {max_output_bins} bins (make sure max_output_bins is less than max_bins)!"
146+
)
110147

111148
# Find midpoint of each bin
112149
midpoints = np.empty(len(bins) - 1)
@@ -125,40 +162,43 @@ def to_normalized_histogram(data: npt.ArrayLike) -> None:
125162
print(f"({point}, {count}); ", end="")
126163
print("]")
127164

128-
def process_soroban_history(history_transactions_csv: str) -> None:
129-
""" Generate histograms from data in the history_transactions table. """
165+
166+
def process_soroban_history(
167+
history_transactions_csv: str, workers: int, max_bins: int, max_output_bins: int
168+
) -> None:
169+
"""Generate histograms from data in the history_transactions table."""
130170
with open(history_transactions_csv) as f:
131171
reader = csv.DictReader(f)
132172

133173
# Decode XDR in parallel
134-
with Pool(WORKERS) as p:
135-
processed_rows = p.imap_unordered(process_history_row, reader)
136-
137-
# Filter to just valid rows
138-
valid = [row for row in processed_rows if row != (None, None)]
174+
with ProcessPoolExecutor(workers) as executor:
175+
processed_rows = list(executor.map(process_history_row, reader))
139176

140-
# Parse out invokes
141-
invokes = [i for (i, u) in valid if i is not None]
142-
wasms = [u for (i, u) in valid if u is not None]
177+
invokes = [x for x in processed_rows if isinstance(x, InvokeTransaction)]
178+
wasms = [x.size for x in processed_rows if isinstance(x, WasmUpload)]
143179

144-
# Decompose into instructions, write bytes, and tx size
145-
instructions, write_bytes, tx_size = zip(*invokes)
180+
print("Instructions:")
181+
to_normalized_histogram(
182+
[i.instructions for i in invokes], max_bins, max_output_bins
183+
)
146184

147-
print("Instructions:")
148-
to_normalized_histogram(instructions)
185+
# Convert write_bytes to kilobytes
186+
write_kilobytes = (
187+
(np.array([i.write_bytes for i in invokes]) / 1024).round().astype(int)
188+
)
189+
print("\nI/O Kilobytes:")
190+
to_normalized_histogram(write_kilobytes, max_bins, max_output_bins)
149191

150-
# Convert write_bytes to kilobytes
151-
write_kilobytes = (np.array(write_bytes) / 1024).round().astype(int)
152-
print("\nI/O Kilobytes:")
153-
to_normalized_histogram(write_kilobytes)
192+
print("\nTransaction Size Bytes:")
193+
to_normalized_histogram([i.tx_size for i in invokes], max_bins, max_output_bins)
154194

155-
print("\nTransaction Size Bytes:")
156-
to_normalized_histogram(tx_size)
195+
print("\nWasm Size Bytes:")
196+
to_normalized_histogram(wasms, max_bins, max_output_bins)
157197

158-
print("\nWasm Size Bytes:")
159-
to_normalized_histogram(wasms)
160198

161-
def process_soroban_events(history_contract_events_csv) -> None:
199+
def process_soroban_events(
200+
history_contract_events_csv: str, workers: int, max_bins: int, max_output_bins: int
201+
) -> None:
162202
"""
163203
Generate a histogram for data entries from data in the
164204
history_contract_events table.
@@ -167,28 +207,93 @@ def process_soroban_events(history_contract_events_csv) -> None:
167207
reader = csv.DictReader(f)
168208

169209
# Process CSV in parallel
170-
with Pool(WORKERS) as p:
171-
processed_rows = list(p.imap_unordered(process_event_row, reader))
210+
with ProcessPoolExecutor(workers) as executor:
211+
processed_rows = list(executor.map(process_event_row, reader))
212+
213+
print("Data Entries:")
214+
to_normalized_histogram(processed_rows, max_bins, max_output_bins)
215+
216+
217+
def process_classic_transactions(
218+
history_contract_events_csv: str, workers: int, max_bins: int, max_output_bins: int
219+
) -> None:
220+
"""
221+
Generate a histogram for classic data entries from data in the
222+
history_transactions table.
223+
"""
224+
with open(history_contract_events_csv) as f:
225+
reader = csv.DictReader(f)
226+
227+
# Process CSV in parallel
228+
with ProcessPoolExecutor(workers) as executor:
229+
processed_rows = list(executor.map(process_classic_transaction, reader))
172230

173-
print("Data Entries:")
174-
to_normalized_histogram(processed_rows)
231+
print("Classic Transaction Sizes:")
232+
to_normalized_histogram(processed_rows, max_bins, max_output_bins)
175233

176-
def help_and_exit() -> None:
177-
print(f"Usage: {sys.argv[0]} <history_transactions data> "
178-
"<history_contract_events data>")
179-
print("See the comments at the top of this file for sample Hubble queries "
180-
"to generate the appropriate data.")
181-
sys.exit(1)
182234

183235
def main() -> None:
184-
if len(sys.argv) != 3:
185-
help_and_exit()
236+
parser = argparse.ArgumentParser(
237+
description="See the comments at the end of this help for sample Hubble queries to generate the appropriate data.",
238+
epilog="""You can use the following sample queries as a jumping off point for writing your own queries to generate these CSV files:
239+
240+
history_transactions sample query
241+
SELECT soroban_resources_instructions, soroban_resources_write_bytes, tx_envelope FROM `crypto-stellar.crypto_stellar.history_transactions` WHERE batch_run_date BETWEEN DATETIME("2024-06-24") AND DATETIME("2024-09-24") AND soroban_resources_instructions > 0
242+
243+
history_contract_events sample query
244+
SELECT topics_decoded, data_decoded FROM `crypto-stellar.crypto_stellar.history_contract_events` WHERE type = 2 AND TIMESTAMP_TRUNC(closed_at, MONTH) between TIMESTAMP("2024-06-27") AND TIMESTAMP("2024-09-27") AND contains_substr(topics_decoded, "write_entry")
245+
246+
NOTE: this query filters out anything that isn't a write_entry. This is required for the script to work correctly!
247+
248+
classic_transactions sample query
249+
SELECT LENGTH(FROM_BASE64(tx_envelope)) as envelope_size FROM `crypto-stellar.crypto_stellar.history_transactions` WHERE batch_run_date BETWEEN DATETIME("2025-09-09") AND DATETIME("2025-09-09") AND soroban_resources_instructions = 0
250+
""",
251+
formatter_class=argparse.RawDescriptionHelpFormatter,
252+
)
253+
parser.add_argument(
254+
"history_transactions", type=str, help="history_transactions csv file"
255+
)
256+
parser.add_argument(
257+
    "history_contract_events", type=str, help="history_contract_events csv file."
258+
)
259+
parser.add_argument(
260+
"classic_transactions", type=str, help="classic_transactions csv file."
261+
)
262+
parser.add_argument(
263+
"-j",
264+
"--workers",
265+
type=int,
266+
default=9,
267+
help="Number of Python subprocesses to run in parallel",
268+
)
269+
parser.add_argument(
270+
"--max-bins",
271+
type=int,
272+
default=100,
273+
help="Maximum number of histogram bins to generate",
274+
)
275+
parser.add_argument(
276+
"--max-output-bins",
277+
type=int,
278+
default=10,
279+
help="Maximum number of histogram bins to output. This is much lower than MAX_BINS, because most bins will be empty (and therefore pruned from the output). If there are too many bins with nonzero values, the script will reduce the number of bins until there are at most MAX_OUTPUT_BINS bins with nonzero values.",
280+
)
281+
args = parser.parse_args()
186282

187283
print("Processing data. This might take a few minutes...")
188284

189-
process_soroban_history(sys.argv[1])
190-
print("")
191-
process_soroban_events(sys.argv[2])
285+
process_soroban_history(
286+
args.history_transactions, args.workers, args.max_bins, args.max_output_bins
287+
)
288+
print()
289+
process_soroban_events(
290+
args.history_contract_events, args.workers, args.max_bins, args.max_output_bins
291+
)
292+
print()
293+
process_classic_transactions(
294+
args.classic_transactions, args.workers, args.max_bins, args.max_output_bins
295+
)
296+
192297

193298
if __name__ == "__main__":
194-
main()
299+
main()

0 commit comments

Comments
 (0)