
Commit 04a9e24

feat(forester): gRPC-based event-driven processing for V2 trees
1 parent 1bbc3ee commit 04a9e24

File tree

26 files changed: +1647 −121 lines

Cargo.lock

Lines changed: 372 additions & 0 deletions
(Generated file; diff not rendered.)

cli/src/utils/processPhotonIndexer.ts

Lines changed: 2 additions & 0 deletions
@@ -57,6 +57,8 @@ export async function startIndexer(
     indexerPort.toString(),
     "--rpc-url",
     rpcUrl,
+    "--grpc-port",
+    "50051",
   ];
   if (photonDatabaseUrl) {
     args.push("--db-url", photonDatabaseUrl);
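This pins the local Photon indexer's gRPC endpoint to port 50051 (the customary gRPC default), giving the forester's new subscription client a predictable address to connect to in the dev environment.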

forester-utils/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -47,3 +47,7 @@ num-traits = { workspace = true }
 bb8 = { workspace = true }
 async-trait = { workspace = true }
 governor = { workspace = true }
+
+[dev-dependencies]
+tokio-postgres = "0.7"
+bs58 = { workspace = true }

forester-utils/src/instructions/address_batch_update.rs

Lines changed: 80 additions & 55 deletions
@@ -2,10 +2,7 @@ use std::{pin::Pin, sync::Arc, time::Duration};
 
 use account_compression::processor::initialize_address_merkle_tree::Pubkey;
 use async_stream::stream;
-use futures::{
-    stream::{FuturesOrdered, Stream},
-    StreamExt,
-};
+use futures::stream::Stream;
 use light_batched_merkle_tree::{
     constants::DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, merkle_tree::InstructionDataAddressAppendInputs,
 };
@@ -23,8 +20,8 @@ use tracing::{debug, error, info, warn};
 
 use crate::{error::ForesterUtilsError, rpc_pool::SolanaRpcPool, utils::wait_for_indexer};
 
-const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 500;
-const MAX_PROOFS_PER_TX: usize = 3;
+const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 1000;
+const MAX_PROOFS_PER_TX: usize = 4;
 
 pub struct AddressUpdateConfig<R: Rpc> {
     pub rpc_pool: Arc<SolanaRpcPool<R>>,
@@ -54,13 +51,14 @@ async fn stream_instruction_data<'a, R: Rpc>(
     let max_zkp_batches_per_call = calculate_max_zkp_batches_per_call(zkp_batch_size);
     let total_chunks = leaves_hash_chains.len().div_ceil(max_zkp_batches_per_call);
 
+    let mut next_queue_index: Option<u64> = None;
+
     for chunk_idx in 0..total_chunks {
         let chunk_start = chunk_idx * max_zkp_batches_per_call;
         let chunk_end = std::cmp::min(chunk_start + max_zkp_batches_per_call, leaves_hash_chains.len());
         let chunk_hash_chains = &leaves_hash_chains[chunk_start..chunk_end];
 
         let elements_for_chunk = chunk_hash_chains.len() * zkp_batch_size as usize;
-        let processed_items_offset = chunk_start * zkp_batch_size as usize;
 
         {
             if chunk_idx > 0 {
@@ -76,11 +74,15 @@ async fn stream_instruction_data<'a, R: Rpc>(
         let indexer_update_info = {
             let mut connection = rpc_pool.get_connection().await?;
             let indexer = connection.indexer_mut()?;
+            debug!(
+                "Requesting {} addresses from Photon for chunk {} with start_queue_index={:?}",
+                elements_for_chunk, chunk_idx, next_queue_index
+            );
             match indexer
                 .get_address_queue_with_proofs(
                     &merkle_tree_pubkey,
                     elements_for_chunk as u16,
-                    Some(processed_items_offset as u64),
+                    next_queue_index,
                     None,
                 )
                 .await {
@@ -92,6 +94,26 @@ async fn stream_instruction_data<'a, R: Rpc>(
             }
         };
 
+        // Log Photon response details
+        debug!(
+            "Photon response for chunk {}: received {} addresses, batch_start_index={}, first_queue_index={:?}, last_queue_index={:?}",
+            chunk_idx,
+            indexer_update_info.value.addresses.len(),
+            indexer_update_info.value.batch_start_index,
+            indexer_update_info.value.addresses.first().map(|a| a.queue_index),
+            indexer_update_info.value.addresses.last().map(|a| a.queue_index)
+        );
+
+        // Update next_queue_index for the next chunk based on the last address returned
+        if let Some(last_address) = indexer_update_info.value.addresses.last() {
+            next_queue_index = Some(last_address.queue_index + 1);
+            debug!(
+                "Setting next_queue_index={} for chunk {}",
+                next_queue_index.unwrap(),
+                chunk_idx + 1
+            );
+        }
+
         if chunk_idx == 0 {
             if let Some(first_proof) = indexer_update_info.value.non_inclusion_proofs.first() {
                 if first_proof.root != current_root {
@@ -121,56 +143,23 @@ async fn stream_instruction_data<'a, R: Rpc>(
         };
         current_root = new_current_root;
 
-        info!("Generating {} ZK proofs with hybrid approach for chunk {}", all_inputs.len(), chunk_idx + 1);
-
-        let mut futures_ordered = FuturesOrdered::new();
-        let mut proof_buffer = Vec::new();
-        let mut pending_count = 0;
+        info!("Generating {} zk proofs for batch_address chunk {} (parallel)", all_inputs.len(), chunk_idx + 1);
 
-        for (i, inputs) in all_inputs.into_iter().enumerate() {
+        // Generate ALL proofs in parallel using join_all
+        let proof_futures: Vec<_> = all_inputs.into_iter().enumerate().map(|(i, inputs)| {
             let client = Arc::clone(&proof_client);
-            futures_ordered.push_back(async move {
+            async move {
                 let result = client.generate_batch_address_append_proof(inputs).await;
                 (i, result)
-            });
-            pending_count += 1;
-
-            if pending_count >= MAX_PROOFS_PER_TX {
-                for _ in 0..MAX_PROOFS_PER_TX.min(pending_count) {
-                    if let Some((idx, result)) = futures_ordered.next().await {
-                        match result {
-                            Ok((compressed_proof, new_root)) => {
-                                let instruction_data = InstructionDataAddressAppendInputs {
-                                    new_root,
-                                    compressed_proof: CompressedProof {
-                                        a: compressed_proof.a,
-                                        b: compressed_proof.b,
-                                        c: compressed_proof.c,
-                                    },
-                                };
-                                proof_buffer.push(instruction_data);
-                            },
-                            Err(e) => {
-                                error!("Address proof failed to generate at index {}: {:?}", idx, e);
-                                yield Err(ForesterUtilsError::Prover(format!(
-                                    "Address proof generation failed at batch {} in chunk {}: {}",
-                                    idx, chunk_idx, e
-                                )));
-                                return;
-                            }
-                        }
-                        pending_count -= 1;
-                    }
-                }
-
-                if !proof_buffer.is_empty() {
-                    yield Ok(proof_buffer.clone());
-                    proof_buffer.clear();
-                }
             }
-        }
+        }).collect();
+
+        // Wait for all proofs to complete in parallel
+        let proof_results = futures::future::join_all(proof_futures).await;
 
-        while let Some((idx, result)) = futures_ordered.next().await {
+        // Process results and batch them into groups of MAX_PROOFS_PER_TX
+        let mut proof_buffer = Vec::new();
+        for (idx, result) in proof_results {
             match result {
                 Ok((compressed_proof, new_root)) => {
                     let instruction_data = InstructionDataAddressAppendInputs {
@@ -183,6 +172,7 @@ async fn stream_instruction_data<'a, R: Rpc>(
                     };
                     proof_buffer.push(instruction_data);
 
+                    // Yield when we have MAX_PROOFS_PER_TX proofs ready
                    if proof_buffer.len() >= MAX_PROOFS_PER_TX {
                         yield Ok(proof_buffer.clone());
                         proof_buffer.clear();
@@ -199,6 +189,7 @@ async fn stream_instruction_data<'a, R: Rpc>(
             }
         }
 
+        // Yield any remaining proofs
         if !proof_buffer.is_empty() {
             yield Ok(proof_buffer);
         }
@@ -249,27 +240,60 @@ fn get_all_circuit_inputs_for_chunk(
     for (batch_idx, leaves_hash_chain) in chunk_hash_chains.iter().enumerate() {
         let start_idx = batch_idx * batch_size as usize;
         let end_idx = start_idx + batch_size as usize;
+
+        let addresses_len = indexer_update_info.value.addresses.len();
+        if start_idx >= addresses_len {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient addresses: batch {} requires start_idx {} but only {} addresses available",
+                batch_idx, start_idx, addresses_len
+            )));
+        }
+        let safe_end_idx = std::cmp::min(end_idx, addresses_len);
+        if safe_end_idx - start_idx != batch_size as usize {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient addresses: batch {} requires {} addresses (indices {}..{}) but only {} available",
+                batch_idx, batch_size, start_idx, end_idx, safe_end_idx - start_idx
+            )));
+        }
+
         let batch_addresses: Vec<[u8; 32]> = indexer_update_info.value.addresses
-            [start_idx..end_idx]
+            [start_idx..safe_end_idx]
             .iter()
             .map(|x| x.address)
             .collect();
 
+        let proofs_len = indexer_update_info.value.non_inclusion_proofs.len();
+        if start_idx >= proofs_len {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient non-inclusion proofs: batch {} requires start_idx {} but only {} proofs available",
+                batch_idx, start_idx, proofs_len
+            )));
+        }
+        let safe_proofs_end_idx = std::cmp::min(end_idx, proofs_len);
+        if safe_proofs_end_idx - start_idx != batch_size as usize {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient non-inclusion proofs: batch {} requires {} proofs (indices {}..{}) but only {} available",
+                batch_idx, batch_size, start_idx, end_idx, safe_proofs_end_idx - start_idx
+            )));
+        }
+
         let mut low_element_values = Vec::new();
         let mut low_element_next_values = Vec::new();
         let mut low_element_indices = Vec::new();
         let mut low_element_next_indices = Vec::new();
         let mut low_element_proofs = Vec::new();
 
-        for proof in &indexer_update_info.value.non_inclusion_proofs[start_idx..end_idx] {
+        for proof in &indexer_update_info.value.non_inclusion_proofs[start_idx..safe_proofs_end_idx]
+        {
             low_element_values.push(proof.low_address_value);
             low_element_indices.push(proof.low_address_index as usize);
             low_element_next_indices.push(proof.low_address_next_index as usize);
             low_element_next_values.push(proof.low_address_next_value);
             low_element_proofs.push(proof.low_address_proof.to_vec());
         }
 
-        if create_hash_chain_from_slice(&batch_addresses)? != *leaves_hash_chain {
+        let computed_hash_chain = create_hash_chain_from_slice(&batch_addresses)?;
+        if computed_hash_chain != *leaves_hash_chain {
             return Err(ForesterUtilsError::Prover(
                 "Addresses hash chain does not match".into(),
             ));
@@ -323,6 +347,7 @@ pub async fn get_address_update_instruction_stream<'a, R: Rpc>(
     let (current_root, leaves_hash_chains, start_index, zkp_batch_size) = (
         merkle_tree_data.current_root,
         merkle_tree_data.leaves_hash_chains,
+        // merkle_tree_data.batch_start_index,
         merkle_tree_data.next_index,
         merkle_tree_data.zkp_batch_size,
    );
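The substantive fix above is pagination: each chunk used to request addresses at a locally computed offset (chunk_start * zkp_batch_size), and now resumes from the queue index of the last address Photon actually returned. A minimal sketch of this cursor pattern, with a hypothetical QueueItem type standing in for Photon's address entries:

    // Hypothetical stand-in for one entry of Photon's address-queue response.
    struct QueueItem {
        queue_index: u64,
    }

    /// Advance the cursor to just past the last item the server returned.
    /// A locally computed offset drifts as soon as the server skips or
    /// batches entries differently; the server-reported queue_index cannot.
    fn advance_cursor(items: &[QueueItem], cursor: &mut Option<u64>) {
        if let Some(last) = items.last() {
            *cursor = Some(last.queue_index + 1);
        }
    }

    fn main() {
        let page = vec![QueueItem { queue_index: 41 }, QueueItem { queue_index: 42 }];
        let mut cursor = None;
        advance_cursor(&page, &mut cursor);
        assert_eq!(cursor, Some(43)); // the next request starts at queue index 43
    }

Also worth noting: the proof-generation rewrite trades the old bounded pipeline (at most MAX_PROOFS_PER_TX proofs in flight via FuturesOrdered) for futures::future::join_all, which fires every proof request in the chunk at once; result order is still preserved because join_all yields results in input order.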

forester-utils/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ pub struct ParsedMerkleTreeData {
     pub pending_batch_index: u32,
     pub num_inserted_zkps: u64,
     pub current_zkp_batch_index: u64,
+    pub batch_start_index: u64,
     pub leaves_hash_chains: Vec<[u8; 32]>,
 }
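ParsedMerkleTreeData now carries batch_start_index, but its only would-be consumer in address_batch_update.rs above is still commented out (// merkle_tree_data.batch_start_index,); the stream continues to destructure next_index as its start_index.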

forester/Cargo.toml

Lines changed: 11 additions & 0 deletions
@@ -12,6 +12,7 @@ solana-client = { workspace = true }
 solana-account-decoder = { workspace = true }
 solana-program = { workspace = true }
 account-compression = { workspace = true }
+light-account-checks = { workspace = true }
 light-batched-merkle-tree = { workspace = true }
 light-compressed-account = { workspace = true, features = ["std"] }
 light-system-program-anchor = { workspace = true, features = ["cpi"] }
@@ -52,6 +53,16 @@ scopeguard = "1.2.0"
 itertools = "0.14.0"
 num-bigint = { workspace = true }
 
+# gRPC client for Photon queue subscriptions (match Photon versions)
+tonic = "0.14.2"
+prost = "0.14.1"
+prost-types = "0.14.1"
+tonic-prost = "0.14.2"
+tokio-stream = { version = "0.1", features = ["sync"] }
+
+[build-dependencies]
+tonic-prost-build = "0.14.2"
+
 [dev-dependencies]
 serial_test = { workspace = true }
 light-prover-client = { workspace = true, features = ["devenv"] }

forester/build.rs

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    tonic_prost_build::configure().compile_protos(&["proto/photon.proto"], &["proto"])?;
+    println!("cargo:rerun-if-changed=proto/photon.proto");
+
+    Ok(())
+}
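The build script compiles photon.proto into Rust bindings under OUT_DIR at build time. Those bindings still have to be included somewhere in the crate's source; a sketch of the conventional include, assuming prost's default output naming (photon.rs, after the proto package — the actual module location is not shown in this commit):

    // Pulls the generated Photon gRPC bindings into a module. The file name
    // follows prost's convention of naming generated code after the package.
    pub mod photon {
        include!(concat!(env!("OUT_DIR"), "/photon.rs"));
    }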

forester/proto/photon.proto

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+syntax = "proto3";
+
+package photon;
+
+// Queue information service
+service QueueService {
+  // Get current queue information for all or specific trees
+  rpc GetQueueInfo(GetQueueInfoRequest) returns (GetQueueInfoResponse);
+
+  // Subscribe to queue updates
+  rpc SubscribeQueueUpdates(SubscribeQueueUpdatesRequest) returns (stream QueueUpdate);
+}
+
+// Request message for GetQueueInfo
+message GetQueueInfoRequest {
+  // Optional list of tree pubkeys to filter by (base58 encoded)
+  // If empty, returns info for all trees
+  repeated string trees = 1;
+}
+
+// Response message for GetQueueInfo
+message GetQueueInfoResponse {
+  repeated QueueInfo queues = 1;
+  uint64 slot = 2;
+}
+
+// Information about a single queue
+message QueueInfo {
+  // Tree public key (base58 encoded)
+  string tree = 1;
+
+  // Queue public key (base58 encoded)
+  string queue = 2;
+
+  // Queue type: 3 = InputStateV2, 4 = AddressV2, 5 = OutputStateV2
+  uint32 queue_type = 3;
+
+  // Current number of items in the queue
+  uint64 queue_size = 4;
+}
+
+// Request message for SubscribeQueueUpdates
+message SubscribeQueueUpdatesRequest {
+  // Optional list of tree pubkeys to subscribe to (base58 encoded)
+  // If empty, subscribes to all trees
+  repeated string trees = 1;
+
+  // Whether to send initial state before streaming updates
+  bool send_initial_state = 2;
+}
+
+// Streamed queue update message
+message QueueUpdate {
+  // The queue that was updated
+  QueueInfo queue_info = 1;
+
+  // Slot at which the update occurred
+  uint64 slot = 2;
+
+  // Type of update
+  UpdateType update_type = 3;
+}
+
+// Type of queue update
+enum UpdateType {
+  UPDATE_TYPE_UNSPECIFIED = 0;
+  UPDATE_TYPE_INITIAL = 1;      // Initial state sent at subscription
+  UPDATE_TYPE_ITEM_ADDED = 2;   // Item added to queue
+  UPDATE_TYPE_ITEM_REMOVED = 3; // Item removed from queue
+}
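For orientation, a sketch of how the forester side might drive a tonic-generated client for this service. The photon module is the include sketched under build.rs above; the endpoint address is an assumption (it lines up with the CLI's --photon-grpc-url flag and the indexer's --grpc-port 50051), and the type and method names follow tonic's standard codegen conventions rather than anything shown in this commit:

    use photon::{queue_service_client::QueueServiceClient, SubscribeQueueUpdatesRequest};

    // Connects to a local Photon gRPC endpoint and logs queue sizes as
    // updates stream in. Sketch only; error handling is minimal.
    async fn watch_queues() -> Result<(), Box<dyn std::error::Error>> {
        let mut client = QueueServiceClient::connect("http://127.0.0.1:50051").await?;
        let request = SubscribeQueueUpdatesRequest {
            trees: vec![],            // empty = subscribe to all trees
            send_initial_state: true, // snapshot first, then live updates
        };
        let mut stream = client.subscribe_queue_updates(request).await?.into_inner();
        while let Some(update) = stream.message().await? {
            if let Some(info) = update.queue_info {
                println!("slot {}: tree {} queue_size {}", update.slot, info.tree, info.queue_size);
            }
        }
        Ok(())
    }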

forester/src/cli.rs

Lines changed: 3 additions & 0 deletions
@@ -68,6 +68,9 @@ pub struct StartArgs {
     #[arg(long, env = "FORESTER_PHOTON_API_KEY")]
     pub photon_api_key: Option<String>,
 
+    #[arg(long, env = "FORESTER_PHOTON_GRPC_URL")]
+    pub photon_grpc_url: Option<String>,
+
     #[arg(long, env = "FORESTER_INDEXER_BATCH_SIZE", default_value = "50")]
     pub indexer_batch_size: usize,
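The gRPC endpoint can be supplied either as --photon-grpc-url or through FORESTER_PHOTON_GRPC_URL. Since the field is an Option with no default, the event-driven path is opt-in; when unset, the forester presumably falls back to its existing polling behavior.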
