Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 58 additions & 25 deletions packages/commands/src/query/cctx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import {
} from "../../../../src/constants/commands/cctx";
import { cctxOptionsSchema } from "../../../../src/schemas/commands/cctx";
import type { CrossChainTx } from "../../../../types/trackCCTX.types";
import { fetchFromApi, sleep } from "../../../../utils";
import {
getCctxByHash,
getCctxDataByInboundHash,
sleep,
} from "../../../../utils";

/**
* Event map:
Expand All @@ -24,10 +28,6 @@ export const cctxEmitter = new EventEmitter<CctxEvents>();

type CctxOptions = z.infer<typeof cctxOptionsSchema>;

interface CctxResponse {
CrossChainTxs: CrossChainTx[];
}

/**
* True if the CCTX is still in-flight and may mutate on‑chain.
*/
Expand Down Expand Up @@ -57,6 +57,9 @@ const gatherCctxs = async (
// Track which indexes we've *ever* queried so we still fetch each once
const queriedOnce = new Set<string>();

// Stay in discovery mode until the first CCTX is found
let awaitingRootDiscovery = true;

// eslint-disable-next-line no-constant-condition
while (true) {
// Check if we've exceeded the timeout (skip if timeout is 0)
Expand All @@ -70,31 +73,61 @@ const gatherCctxs = async (
await Promise.all(
[...frontier].map(async (hash) => {
try {
const endpoint = `/zeta-chain/crosschain/inboundHashToCctxData/${hash}`;
const response = await fetchFromApi<CctxResponse>(rpc, endpoint);
const cctxs = response.CrossChainTxs;

if (cctxs.length === 0) {
// Still 404 – keep trying
nextFrontier.add(hash);
return;
}
// In the very first round, try both endpoints so the root hash
// can be either an inbound tx hash or a CCTX index/hash.
if (awaitingRootDiscovery) {
let discovered: CrossChainTx[] = [];

// Try cctx single endpoint FIRST
const single = await getCctxByHash(rpc, hash);
if (single) {
discovered.push(single);
} else {
// If not found via cctx, fall back to inboundHashToCctxData
const byInbound = await getCctxDataByInboundHash(rpc, hash);
if (byInbound.length > 0) {
discovered = discovered.concat(byInbound);
}
}

for (const tx of cctxs) {
// Store latest version
results.set(tx.index, tx);
if (discovered.length === 0) {
// Still not found – keep retrying the root hash
nextFrontier.add(hash);
return;
}

// Always query this index at least once
if (!queriedOnce.has(tx.index)) {
nextFrontier.add(tx.index);
// We discovered at least one CCTX; switch to inbound-only mode next rounds
awaitingRootDiscovery = false;

for (const tx of discovered) {
results.set(tx.index, tx);
if (!queriedOnce.has(tx.index)) {
nextFrontier.add(tx.index);
}
if (isPending(tx)) {
nextFrontier.add(tx.inbound_params.observed_hash);
}
queriedOnce.add(tx.index);
}
} else {
const cctxs = await getCctxDataByInboundHash(rpc, hash);

// Keep querying while pending
if (isPending(tx)) {
nextFrontier.add(tx.inbound_params.observed_hash);
if (cctxs.length === 0) {
// Still 404 – keep trying
nextFrontier.add(hash);
return;
}

queriedOnce.add(tx.index);
for (const tx of cctxs) {
results.set(tx.index, tx);
if (!queriedOnce.has(tx.index)) {
nextFrontier.add(tx.index);
}
if (isPending(tx)) {
nextFrontier.add(tx.inbound_params.observed_hash);
}
queriedOnce.add(tx.index);
}
}
Comment on lines +112 to 131
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: Post-discovery uses inbound-hash API for CCTX indexes, causing endless 404 loops.
After discovery you enqueue both tx.index (CCTX id) and inbound observed hashes into frontier, but the else-branch always calls getCctxDataByInboundHash for any hash. When hash is actually a CCTX index, this returns 404 forever and re-enqueues the same value.

Use a simple discriminator: if results already contains the key, treat it as CCTX index and refresh via getCctxByHash; otherwise treat it as inbound hash.

-          } else {
-            const cctxs = await getCctxDataByInboundHash(rpc, hash);
-
-            if (cctxs.length === 0) {
-              // Still 404 – keep trying
-              nextFrontier.add(hash);
-              return;
-            }
-
-            for (const tx of cctxs) {
-              results.set(tx.index, tx);
-              if (!queriedOnce.has(tx.index)) {
-                nextFrontier.add(tx.index);
-              }
-              if (isPending(tx)) {
-                nextFrontier.add(tx.inbound_params.observed_hash);
-              }
-              queriedOnce.add(tx.index);
-            }
-          }
+          } else {
+            // If we've seen this key as an index before, refresh via single CCTX endpoint.
+            if (results.has(hash)) {
+              const single = await getCctxByHash(rpc, hash);
+              if (!single) {
+                nextFrontier.add(hash);
+                return;
+              }
+              const tx = single;
+              results.set(tx.index, tx);
+              if (!queriedOnce.has(tx.index)) {
+                nextFrontier.add(tx.index);
+                queriedOnce.add(tx.index);
+              }
+              if (isPending(tx) && tx.inbound_params.observed_hash) {
+                nextFrontier.add(tx.inbound_params.observed_hash);
+              }
+            } else {
+              const cctxs = await getCctxDataByInboundHash(rpc, hash);
+              if (cctxs.length === 0) {
+                nextFrontier.add(hash);
+                return;
+              }
+              for (const tx of cctxs) {
+                results.set(tx.index, tx);
+                if (!queriedOnce.has(tx.index)) {
+                  nextFrontier.add(tx.index);
+                }
+                if (isPending(tx) && tx.inbound_params.observed_hash) {
+                  nextFrontier.add(tx.inbound_params.observed_hash);
+                }
+                queriedOnce.add(tx.index);
+              }
+            }
+          }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} else {
const cctxs = await getCctxDataByInboundHash(rpc, hash);
// Keep querying while pending
if (isPending(tx)) {
nextFrontier.add(tx.inbound_params.observed_hash);
if (cctxs.length === 0) {
// Still 404 – keep trying
nextFrontier.add(hash);
return;
}
queriedOnce.add(tx.index);
for (const tx of cctxs) {
results.set(tx.index, tx);
if (!queriedOnce.has(tx.index)) {
nextFrontier.add(tx.index);
}
if (isPending(tx)) {
nextFrontier.add(tx.inbound_params.observed_hash);
}
queriedOnce.add(tx.index);
}
}
} else {
// If we've seen this key as an index before, refresh via the single‐CCTX endpoint.
if (results.has(hash)) {
const single = await getCctxByHash(rpc, hash);
if (!single) {
// Still 404 – keep trying
nextFrontier.add(hash);
return;
}
const tx = single;
results.set(tx.index, tx);
if (!queriedOnce.has(tx.index)) {
nextFrontier.add(tx.index);
queriedOnce.add(tx.index);
}
if (isPending(tx) && tx.inbound_params.observed_hash) {
nextFrontier.add(tx.inbound_params.observed_hash);
}
} else {
// Otherwise treat it as an inbound‐hash lookup
const cctxs = await getCctxDataByInboundHash(rpc, hash);
if (cctxs.length === 0) {
// Still 404 – keep trying
nextFrontier.add(hash);
return;
}
for (const tx of cctxs) {
results.set(tx.index, tx);
if (!queriedOnce.has(tx.index)) {
nextFrontier.add(tx.index);
}
if (isPending(tx) && tx.inbound_params.observed_hash) {
nextFrontier.add(tx.inbound_params.observed_hash);
}
queriedOnce.add(tx.index);
}
}
}
🤖 Prompt for AI Agents
In packages/commands/src/query/cctx.ts around lines 112 to 131, the code always
calls getCctxDataByInboundHash(rpc, hash) causing 404 loops when the frontier
item is actually a CCTX index; change the branch to discriminate by whether
results already has the hash (i.e., results.has(hash) means this is a CCTX
index) and in that case call getCctxByHash(rpc, hash) to refresh the CCTX;
otherwise call getCctxDataByInboundHash for inbound hashes. Mirror the existing
handling for pushing tx.index, tx.inbound_params.observed_hash, queriedOnce and
nextFrontier in each path so you don't re-enqueue the same 404-producing value
and avoid infinite retry loops.

} catch (err) {
nextFrontier.add(hash); // retry on error
Expand Down Expand Up @@ -257,7 +290,7 @@ const main = async (options: CctxOptions) => {

export const cctxCommand = new Command("cctx")
.description("Query cross-chain transaction data in real-time")
.requiredOption("-h, --hash <hash>", "Inbound transaction hash")
.requiredOption("-h, --hash <hash>", "Inbound tx hash or CCTX hash")
.option("-r, --rpc <rpc>", "RPC endpoint", DEFAULT_API_URL)
.option(
"-d, --delay <ms>",
Expand Down
30 changes: 30 additions & 0 deletions utils/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import axios, { AxiosRequestConfig, isAxiosError } from "axios";

import {
CrossChainTx,
CrossChainTxResponse,
InboundHashToCctxResponseReturnType,
PendingNonce,
Expand Down Expand Up @@ -87,6 +88,35 @@ export const getCctxByInboundHash = async (
}
};

/**
* Fetch full CCTX objects by inbound transaction hash.
* Returns [] on 404/400, logs only unexpected errors.
*/
export const getCctxDataByInboundHash = async (
api: string,
hash: string
): Promise<CrossChainTx[]> => {
try {
const data = await fetchFromApi<{ CrossChainTxs: CrossChainTx[] }>(
api,
`/zeta-chain/crosschain/inboundHashToCctxData/${hash}`
);
return Array.isArray(data.CrossChainTxs) ? data.CrossChainTxs : [];
} catch (error) {
if (
isAxiosError(error) &&
(error.response?.status === 404 || error.response?.status === 400)
) {
return [];
}
handleError({
context: "Failed to fetch CCTX data by inbound hash",
error,
});
return [];
}
};

/**
* Fetch pending nonces for a specific TSS
*/
Expand Down