[codex] Add query progress reporting #18649
Pull request overview
Adds end-to-end query progress reporting for long-running Pinot queries, exposing a unified progress model (processed work units / total work units) across SSE (segment-based) and MSE (operator/stage-based) execution paths, and surfacing it via REST/gRPC, Query Console UI, and Pinot CLI.
Changes:
- Introduces `QueryProgressStats` in `pinot-spi`, plus progress counters in `QueryExecutionContext`.
- Implements progress tracking and retrieval across servers/brokers/controller (including new REST endpoints and a new gRPC `Progress` RPC for MSE).
- Adds polling + rendering in Query Console and an interactive CLI progress line / progress bar.
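To make the unified "processed work units / total work units" model concrete, here is a minimal sketch of such a pair with a derived percentage and a simple aggregation. The class name `ProgressSketch` and its methods are hypothetical and are not the PR's `QueryProgressStats` API; reporting 0% for an unknown total and clamping to [0, 100] are assumptions made for illustration.

```java
import java.util.List;

/** Hypothetical stand-in for a progress model; not the PR's QueryProgressStats. */
final class ProgressSketch {
  final long processedWorkUnits;
  final long totalWorkUnits;

  ProgressSketch(long processedWorkUnits, long totalWorkUnits) {
    this.processedWorkUnits = processedWorkUnits;
    this.totalWorkUnits = totalWorkUnits;
  }

  /** Percent in [0, 100]; reports 0 when the total is not yet known (assumption). */
  double percent() {
    if (totalWorkUnits <= 0) {
      return 0.0;
    }
    double raw = 100.0 * processedWorkUnits / totalWorkUnits;
    return Math.max(0.0, Math.min(100.0, raw));
  }

  /** Sums processed and total units across parts, e.g. OFFLINE and REALTIME. */
  static ProgressSketch aggregate(List<ProgressSketch> parts) {
    long processed = 0;
    long total = 0;
    for (ProgressSketch part : parts) {
      processed += part.processedWorkUnits;
      total += part.totalWorkUnits;
    }
    return new ProgressSketch(processed, total);
  }
}
```

Summing numerators and denominators separately, rather than averaging percentages, keeps a large part from being outweighed by a small one.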
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pinot-spi/src/test/java/org/apache/pinot/spi/query/QueryProgressStatsTest.java | Adds unit tests for percent calculation, aggregation, JSON round-trip, and execution context accumulation. |
| pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryProgressStats.java | New progress stats model with JSON support, aggregation, and derived percent. |
| pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java | Adds atomic progress counters and APIs to mutate/read progress. |
| pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java | Adds server REST endpoint to fetch per-query progress and aggregate OFFLINE/REALTIME. |
| pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java | Adds test coverage for progress tracking on completed op-chains. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java | Adds gRPC Progress RPC handler for MSE worker progress. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java | Adds broker-side dispatch logic to query MSE workers for progress and aggregate responses. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java | Adds client call implementation for the new gRPC progress RPC. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java | Exposes execution-context tracking and progress retrieval via OpChainSchedulerService. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java | Plumbs a shared QueryExecutionContext into leaf-stage ServerQueryRequests for progress attribution. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java | Tracks execution contexts and increments processed work units on op-chain completion/failure. |
| pinot-core/src/test/java/org/apache/pinot/core/transport/InstanceRequestHandlerTest.java | Updates tests for renamed/cached execution-context retrieval API. |
| pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java | Uses cached execution context and exposes server-side progress stats lookup. |
| pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java | Uses cached execution context when opening QueryThreadContext. |
| pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java | Adds execution-context caching + setter to support shared context plumbing. |
| pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java | Adds total segment accounting to drive SSE progress denominators. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java | Marks segments as processed during combine execution to advance progress. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java | Marks segments as processed for sequential sorted group-by combine. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java | Marks segments as processed (including skipped segments) for progress accuracy. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByCombineOperator.java | Marks processed segments during group-by combine. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseSingleBlockCombineOperator.java | Marks segments as processed when producing results blocks. |
| pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java | Adds shared helper to increment processed-segment progress via thread context. |
| pinot-controller/src/main/resources/app/requests/index.ts | Adds Query Console API call for controller clientQueryId progress endpoint. |
| pinot-controller/src/main/resources/app/pages/Query.tsx | Adds clientQueryId injection, progress polling, and progress UI (numbers + bar). |
| pinot-controller/src/main/resources/app/Models.ts | Adds QueryProgressStats type to UI model definitions. |
| pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java | Adds controller REST endpoint to fetch progress by clientQueryId by polling brokers. |
| pinot-common/src/main/proto/worker.proto | Adds gRPC Progress RPC and request/response messages for MSE worker progress. |
| pinot-clients/pinot-cli/src/main/java/org/apache/pinot/cli/PinotCli.java | Adds CLI progress polling/rendering, config + flag, and clientQueryId injection. |
| pinot-clients/pinot-cli/README.md | Documents CLI/query-console progress behavior and usage examples. |
| pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java | Tracks MSE execution contexts and aggregates broker+server progress for MSE queries. |
| pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java | Routes broker progress requests to MSE handler first, then SSE handler. |
| pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java | Extends broker handler interface with getQueryProgressStats(...). |
| pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java | Implements SSE progress retrieval by polling servers’ new progress endpoint. |
| pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java | Adds default getQueryProgressStats(...) method stub + precondition for clientQueryId mapping. |
| pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java | Adds broker REST endpoint to fetch progress by internal requestId or clientQueryId. |
Force-pushed from 6cb4cc3 to d601f43.
Codecov Report
❌ Patch coverage is …
Additional details and impacted files
Coverage Diff
| | master | #18649 | +/- |
|---|---|---|---|
| Coverage | 64.39% | 64.44% | +0.04% |
| Complexity | 1291 | 1291 | |
| Files | 3364 | 3372 | +8 |
| Lines | 207935 | 208973 | +1038 |
| Branches | 32467 | 32638 | +171 |
| Hits | 133906 | 134675 | +769 |
| Misses | 63255 | 63499 | +244 |
| Partials | 10774 | 10799 | +25 |
Force-pushed from d601f43 to 1940943.
xiangfu0 left a comment:
Found one high-signal issue; see inline comment.
Force-pushed from 5dda15c to 44ae161.
xiangfu0 left a comment:
Found 1 high-signal issue; see inline comment.
I'll take a look today, but remember I have also been working on #18458, which should help produce more precise reports for MSE.
This is a really useful feature: having progress while a query runs is something users ask for constantly. A few thoughts on how it interacts with #18458 (SubmitWithStream bidi stats), which I think is worth addressing before merge since the two PRs modify some of the same infrastructure.
Merge conflict in …
Both PRs add fields and lifecycle logic to … This PR adds: …
#18458 adds: …
The … I'd suggest deferring this PR until after #18458 merges, then replacing …
The current MSE progress model counts op-chains as work units, so progress only moves when an op-chain finishes. In a typical pipeline, leaf scan op-chains finish early while join and aggregation op-chains run for the full query duration. With op-chain counting, progress reads 2/5 = 40% for most of the query, then reaches 5/5 = 100% in rapid succession. The bar sits still for the vast majority of the query duration.
// At query start: use leaf segment count as the denominator (exact, known upfront)
ctx.addTotalWorkUnits(totalLeafSegments);
// In OpChainCompletionListener (fires per op-chain, with stats):
if (isLeafStage(opChainId)) {
long rowsScanned = stats.get(LeafOperator.StatKey.NUM_DOCS_SCANNED);
ctx.addProcessedWorkUnits(rowsScanned);
}
// Non-leaf op-chains don't contribute; they're bounded by what the leaves produce
This makes progress increase smoothly as leaf segments are scanned, which is both more accurate and more informative. It also naturally fixes the double-counting issue where …
Rows-per-second as the primary signal (optional)
A related idea worth considering: rather than a percentage (which requires a reliable denominator), expose … This is the model that both Trino and ClickHouse have converged on: …
Both approaches show that rows/s is valuable even without a perfect denominator. When a percentage is available (Trino has … For Pinot, the simplest path is the Trino approach: add …
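The difference between the two counting models can be illustrated with a small sketch. Everything here is hypothetical (`MseProgressSketch` and its methods are not Pinot APIs), and it assumes leaf progress is measured in segments for both numerator and denominator so the units match.

```java
/** Hypothetical comparison of two ways to measure MSE progress. */
final class MseProgressSketch {
  /** Op-chain counting: progress moves only when a whole op-chain finishes. */
  static double opChainPercent(int finishedOpChains, int totalOpChains) {
    return totalOpChains == 0 ? 0.0 : 100.0 * finishedOpChains / totalOpChains;
  }

  /** Leaf-based counting: progress moves as each leaf segment is scanned. */
  static double leafPercent(long scannedLeafSegments, long totalLeafSegments) {
    return totalLeafSegments == 0 ? 0.0 : 100.0 * scannedLeafSegments / totalLeafSegments;
  }

  public static void main(String[] args) {
    // 5 op-chains, of which only the 2 leaf scans have finished.
    System.out.println(opChainPercent(2, 5));
    // The same kind of query measured by leaf segments, halfway through 200 segments.
    System.out.println(leafPercent(100, 200));
  }
}
```

With op-chain counting the value stays at 40% until a join or aggregation op-chain completes, whereas the leaf-based value advances with every scanned segment. The trade-off is that it reaches 100% once the leaves finish, even if non-leaf stages are still running.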
The polling chain this PR introduces is functional but has a cost-multiplier property that becomes significant at scale. Flagging it here as a design discussion point rather than a blocker, since fixing it is a larger change that can be done in a follow-up.
The problem with polling
When a client calls … For a 30-second query with a 1-second poll interval and 3 servers: … At 100 concurrent queries: ~9,000 extra calls/minute. Each tick also requires the server to have live progress state accessible at any time (the Guava caches in …
A push alternative
The natural fix is to flip the direction: the client opens one persistent connection and the broker pushes events as they arrive. Cost for the same 30-second query: … The broker SSE (Server-Sent Events, not the single-stage engine) endpoint just fans out events it already holds in …
Why this is feasible with #18458 in place
#18458 introduces a long-lived gRPC bidi channel ( …
// In StreamingQuerySession, when OpChainComplete arrives (already called by #18458):
public void onOpChainComplete(...) {
mergeStats(...); // existing #18458 logic
broadcastProgressSnapshot(); // new: push to SSE subscribers
}
// New broker endpoint:
@GET @Path("query/{id}/progress/stream") @Produces(SERVER_SENT_EVENTS)
public void streamProgress(@PathParam("id") long queryId, @Context SseEventSink sink) {
_queryDispatcher.subscribeProgressStream(queryId, sink);
}
Connection cleanup is handled automatically: when the SSE connection drops, …
What this PR should do now
This is a non-trivial change that I wouldn't block the current PR on. But it would be worth: …
The main thing to avoid is designing the server-side state (Guava caches, eviction timings) in a way that makes it hard to remove when the push path lands. The ref-counted …
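The fan-out part of the push model can be sketched in plain Java without any HTTP or gRPC machinery. This is only an illustration under stated assumptions: `ProgressBroadcasterSketch`, its methods, and the JSON shape are hypothetical and do not come from this PR or from #18458. A real endpoint would wrap each subscriber around its Server-Sent Events connection.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical per-query fan-out of progress snapshots to subscribers. */
final class ProgressBroadcasterSketch {
  // Copy-on-write keeps iteration safe while subscribers come and go.
  private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

  /** Registers a subscriber; running the returned handle removes it again. */
  Runnable subscribe(Consumer<String> subscriber) {
    subscribers.add(subscriber);
    return () -> subscribers.remove(subscriber);
  }

  /** Pushes one snapshot to every current subscriber. */
  void broadcast(long processed, long total) {
    String snapshot = "{\"processed\":" + processed + ",\"total\":" + total + "}";
    for (Consumer<String> subscriber : subscribers) {
      subscriber.accept(snapshot);
    }
  }

  int subscriberCount() {
    return subscribers.size();
  }
}
```

In this shape the broker does work only when new progress arrives, and a dropped connection is cleaned up by running the handle returned from `subscribe`.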
Force-pushed from 44ae161 to 5cb38b9.
Addressed the adaptive progress display path in the latest push.
I kept the rows/s metric and broker-pushed progress stream as follow-up scope. The current shape keeps the aggregate fields backward-compatible while allowing richer MSE status when the response includes details.
xiangfu0 left a comment:
Found one correctness issue; see inline comment.
    }
  }
  if (!serverProgressStats.isEmpty()) {
    return QueryProgressStats.aggregate(serverProgressStats);
This still returns a partial aggregate over only the SSE servers that replied with 200. If one targeted server times out or is temporarily unreachable, its unfinished work disappears from the denominator and the broker can report inflated progress, including 100%, even though the query is still blocked on that server. The MSE path now avoids that by treating missing servers as unknown progress; SSE needs the same treatment here (or retained last-known totals) instead of returning a partial aggregate.
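One way to express the "missing servers mean unknown progress" rule is sketched below. It is a hypothetical illustration, not the PR's code: `PartialAggregateSketch`, the `long[]{processed, total}` encoding, and the choice to return an empty result instead of retaining last-known totals are all assumptions.

```java
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;

/** Hypothetical aggregation that refuses to report a partial percentage. */
final class PartialAggregateSketch {
  /**
   * Returns the overall percent only when every targeted server has reported.
   * Otherwise returns empty so the caller can show "unknown" instead of an
   * inflated number. Each value in reported is {processed, total}.
   */
  static OptionalDouble percent(Set<String> targetedServers, Map<String, long[]> reported) {
    long processed = 0;
    long total = 0;
    for (String server : targetedServers) {
      long[] stats = reported.get(server);
      if (stats == null) {
        // A silent server still owns unfinished work, so the total is unknown.
        return OptionalDouble.empty();
      }
      processed += stats[0];
      total += stats[1];
    }
    return total == 0 ? OptionalDouble.empty() : OptionalDouble.of(100.0 * processed / total);
  }
}
```

Iterating over the targeted set rather than over the responses is the key point: a server that did not answer cannot silently drop out of the denominator.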
Summary
Adds query progress reporting for long-running Pinot queries across the broker, controller, server, V1 execution, V2 execution, query console, and Pinot CLI.
The progress model reports processed work units over total work units. V1 uses server segment progress; V2 estimates work from multi-stage operators and stage execution progress. The controller exposes progress by `clientQueryId`, the query console polls it while a query is running, and the CLI renders adaptive progress: SSE/simple responses stay one compact line, while MSE responses can render aggregate progress plus labeled component rows.
User impact
- … `RUNNING` state.
- `pinot-cli` supports `--progress-interval-ms` and config key `progress-interval-ms`.
- … `--progress-interval-ms=0` and is only rendered for interactive terminals, so redirected output/logs stay clean.
Screenshot
Query console progress while a V2 quickstart query is running:
Notes
The CLI injects a generated `clientQueryId` as a quoted query option so progress polling can correlate the client request with running query state.
For MSE, progress keeps missing or non-responsive workers as labeled unknown rows instead of dropping them from the aggregate denominator. That prevents a partially reported query from falsely showing 100% complete.
Validation
- `./mvnw -pl pinot-controller,pinot-clients/pinot-cli -am -DskipTests -DskipITs -Dmaven.javadoc.skip=true compile`
- `./mvnw -pl pinot-broker -am -DskipTests -DskipITs -Dmaven.javadoc.skip=true compile`
- `./mvnw -pl pinot-spi -Dtest=QueryProgressStatsTest test`
- `./mvnw -pl pinot-query-runtime -am -Dtest=OpChainSchedulerServiceTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `./mvnw -pl pinot-core -am -Dtest=InstanceRequestHandlerTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `./mvnw -pl pinot-clients/pinot-cli -DskipTests -DskipITs -Dmaven.javadoc.skip=true package`
- `spotless:apply`, `license:format`, `license:check`, and `checkstyle:check` on affected modules
- `git diff --check`
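For the compact one-line CLI progress display described in the summary, a rendering helper might look roughly like the sketch below. The class `ProgressLineSketch`, the bar format, and the terminal check are assumptions for illustration and are not taken from `PinotCli`. `System.console()` is used as a common heuristic for an interactive terminal, and its exact behavior varies by JDK version.

```java
/** Hypothetical renderer for a single-line progress bar. */
final class ProgressLineSketch {
  /** Renders a bar of the given width followed by percent and raw counts. */
  static String render(long processed, long total, int width) {
    // Clamp to [0, 1] and treat an unknown total as no progress (assumption).
    double fraction = total <= 0 ? 0.0 : Math.min(1.0, Math.max(0.0, (double) processed / total));
    int filled = (int) Math.floor(fraction * width);
    StringBuilder bar = new StringBuilder("[");
    for (int i = 0; i < width; i++) {
      bar.append(i < filled ? '=' : ' ');
    }
    bar.append("] ").append((int) Math.floor(fraction * 100)).append("% (")
        .append(processed).append('/').append(total).append(')');
    return bar.toString();
  }

  /** Heuristic: a non-null console usually means output is not redirected. */
  static boolean isInteractive() {
    return System.console() != null;
  }
}
```

A caller would print `"\r" + render(...)` only when `isInteractive()` is true, which keeps redirected output and logs free of progress lines.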