feat(entity-caching-1): raw event pipeline (draft)#2827
Conversation
Walkthrough
Adds a complete entity-cache event export pipeline: new proto schema, router-side builders/exporter/sink, GraphQL-metrics ingest service with batch processor and ClickHouse writer, ClickHouse migrations, config and wiring, bearer-auth client option and retry predicate, CI/codegen updates, and comprehensive tests and integration wiring.
Changes: Entity Cache Events Export
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Codecov Report ✅ All modified and coverable lines are covered by tests.

@@ Coverage Diff @@
## milinda/entity-intelligence-0 #2827 +/- ##
================================================================
  Coverage     ?   63.66%
================================================================
  Files        ?      325
  Lines        ?    45816
  Branches     ?     4764
================================================================
  Hits         ?    29167
  Misses       ?    16511
  Partials     ?      138
The raw-event pipeline previously depended on graphql-go-tools fields that only exist on the unmerged engine branch (per-event Timestamp, DataSource on MutationEvent/EntityFieldHash, FieldPath on EntityFieldHash, and the ForceHashAnalyticsKeys planner override). CI builds against the pinned go.mod version, which does not yet have those fields, so the package failed to compile. Use a single build-time timestamp for every event in a snapshot, drop the fields that the pinned engine does not expose, and rely on per-entity SDL configuration for KeyHash population until the engine bump lands. Also wire proto/wg/cosmo/cacheevents into make generate-go so the router and graphqlmetrics gen dirs stay in sync with `make generate`.
Router image scan passed: ✅ No security vulnerabilities found in image.
The graphqlmetrics CI workflow runs its own buf generate command rather than make generate-go, so the Makefile fix did not cover it. Add the new cacheevents proto path so CI does not delete the committed gen files.
@claude review once
Code review is billed via overage credits. To resume reviews, an organization admin can raise the monthly limit at claude.ai/admin-settings/claude-code.
Once credits are available, comment @claude review on this pull request to trigger a review.
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@.github/workflows/graphqlmetrics-ci.yaml`:
- Line 47: The CI workflow's pull_request path filters for the graphqlmetrics
job don't include changes under proto/wg/cosmo/cacheevents, so edits there won't
trigger the run that executes the command containing proto/wg/cosmo/cacheevents;
update the workflow's on.pull_request.paths to add the cacheevents proto path
(e.g., include proto/wg/cosmo/cacheevents/** or the specific proto files) so
changes to proto/wg/cosmo/cacheevents will trigger the job that runs the buf
generate command referencing proto/wg/cosmo/cacheevents.
In `@graphqlmetrics/cacheevents/service.go`:
- Around line 63-68: The current handler calls
s.processor.Push(BatchItem{Events: req.Msg.Events, Claims: claims}) and logs
s.logger.Warn on failure but still returns success; change this so that when
s.processor.Push returns an error (e.g., queue full or stopping) you
propagate/return a non-success error to the caller instead of acknowledging the
batch, so the router can retry/backoff; locate the Push call and surrounding
block in service.go and replace the log-only path with returning an appropriate
error (and still log context with zap.Error(err)) when Push fails.
- Around line 54-57: Replace the plain error return after calling
utils.GetClaims(ctx) so the handler returns a Connect unauthenticated error: use
connect.NewError(connect.CodeUnauthenticated, ...) (including the original err
or a descriptive message) instead of errNotAuthenticated; update the call site
that currently does "return nil, errNotAuthenticated" (the block using
utils.GetClaims and errNotAuthenticated) and add the connect package import if
missing.
In `@graphqlmetrics/cacheevents/writer.go`:
- Around line 43-84: The batch retry can create duplicate rows because
PrepareBatch/Send is retried; generate a single stable deduplication token
(e.g., UUID) once per logical batch before calling retry.Do and reuse that token
across retries, then include it in the ClickHouse INSERT by adding SETTINGS
insert_deduplicate=1 and insert_deduplication_token='...token...' to the query
used with w.conn.PrepareBatch (the insertCacheEventsRawQuery or its caller) so
that repeated batch.Send calls with the same token are deduplicated; ensure the
token is created outside the retry closure and passed into the PrepareBatch call
so appendCacheEventRow, items loop and retry logic remain unchanged.
In `@router-tests/entity_caching/cache_events_export_test.go`:
- Around line 102-107: The Eventually check is too weak (only checks len(events)
> 0); change the predicate to wait until all expected events are present by
comparing len(events) to the expected count used in the test setup (e.g.,
compute expectedEvents from the setup variables or constants) and return
len(events) == expectedEvents (or >= expectedEvents if duplicates/timing vary),
for example by replacing the closure passed to require.Eventually that calls
handler.snapshot() with one that checks equality against the test's
expectedEvents value so the periodic exporter test only proceeds once all items
have been exported.
In `@router/core/router.go`:
- Around line 986-991: The new cache events exporter is assigned to
r.cacheEventsExporter before bootstrap() completes, but if later bootstrap steps
fail its background goroutines keep running; update the bootstrap error paths to
perform partial-initialization cleanup by stopping/closing the exporter when
returning on error (mirror the pattern used elsewhere in router/core): after
setting r.cacheEventsExporter = ce, ensure any subsequent early returns call the
exporter's shutdown method (e.g. Close(), Stop(), or Shutdown() depending on the
cacheevents.Exporter API) and nil out r.cacheEventsExporter before returning;
also ensure Shutdown() (the top-level shutdown path) still works if the exporter
was already cleaned up.
- Around line 962-971: The code creates cache-events client even when both
r.entityCachingConfig.EventsExport.Endpoint and
r.graphqlMetricsConfig.CollectorEndpoint are empty, causing startup to succeed
but later fail; update the initialization near where
cacheeventsv1connect.NewCacheEventsServiceClient is called to validate the
resolved endpoint and fail fast: after computing endpoint (from
r.entityCachingConfig.EventsExport.Endpoint or fallback to
r.graphqlMetricsConfig.CollectorEndpoint) return an error or disable the
exporter if endpoint == "" and log a clear startup error rather than
constructing the client, ensuring the router startup surfaces the
invalid/missing endpoint immediately.
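The endpoint fail-fast from the second router.go comment can be sketched as a pure function; `resolveEndpoint` is a hypothetical helper, not the actual router API:

```go
package main

import (
	"errors"
	"fmt"
)

// resolveEndpoint prefers the dedicated events-export endpoint, falls back
// to the collector endpoint, and fails fast when both are empty instead of
// constructing a client that can only fail at export time.
func resolveEndpoint(eventsExportEndpoint, collectorEndpoint string) (string, error) {
	endpoint := eventsExportEndpoint
	if endpoint == "" {
		endpoint = collectorEndpoint
	}
	if endpoint == "" {
		return "", errors.New("cache events export enabled but no endpoint configured")
	}
	return endpoint, nil
}

func main() {
	ep, err := resolveEndpoint("", "https://metrics.example.com")
	fmt.Println(ep, err)
	if _, err := resolveEndpoint("", ""); err != nil {
		fmt.Println("startup error surfaced:", err)
	}
}
```

Calling this before `cacheeventsv1connect.NewCacheEventsServiceClient` turns the silent late failure into an immediate, clearly logged startup error.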
In `@router/internal/exporter/retry.go`:
- Around line 12-28: In IsRetryableConnectError, non-Connect errors are
currently treated as retryable which wrongly retries on context.Canceled and
context.DeadlineExceeded; update the function to explicitly short-circuit
(return false) when errors.Is(err, context.Canceled) or errors.Is(err,
context.DeadlineExceeded) before falling back to the connect.Error handling,
ensuring you import the context package and use errors.Is for comparison so
cancelled/expired contexts are not retried.
In `@router/internal/graphqlmetrics/graphql_metrics_sink.go`:
- Around line 26-30: NewGraphQLMetricsSink currently dereferences cfg.Logger via
cfg.Logger.With which panics if cfg.Logger is nil; update the constructor
(NewGraphQLMetricsSink) to guard cfg.Logger and use a fallback (e.g.
zap.NewNop()) before calling With so GraphQLMetricsSink.client and
GraphQLMetricsSink.logger are always set safely; locate the GraphQLMetricsSink
struct/constructor and replace the direct cfg.Logger.With call with logic that
sets logger := cfg.Logger; if logger == nil { logger = zap.NewNop() } then call
logger = logger.With(...) and assign to GraphQLMetricsSink.logger.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0098fdfb-4531-416f-a45c-f1f1f2605bc5
⛔ Files ignored due to path filters (4)
- graphqlmetrics/gen/proto/wg/cosmo/cacheevents/v1/cacheevents.pb.go is excluded by !**/*.pb.go, !**/gen/**
- graphqlmetrics/gen/proto/wg/cosmo/cacheevents/v1/cacheeventsv1connect/cacheevents.connect.go is excluded by !**/gen/**
- router/gen/proto/wg/cosmo/cacheevents/v1/cacheevents.pb.go is excluded by !**/*.pb.go, !**/gen/**
- router/gen/proto/wg/cosmo/cacheevents/v1/cacheeventsv1connect/cacheevents.connect.go is excluded by !**/gen/**
📒 Files selected for processing (39)
- .github/workflows/graphqlmetrics-ci.yaml
- Makefile
- controlplane/clickhouse/migrations/20260427120000_create_gql_cache_events.sql
- graphqlmetrics/cacheevents/processor.go
- graphqlmetrics/cacheevents/processor_test.go
- graphqlmetrics/cacheevents/schema.go
- graphqlmetrics/cacheevents/schema_test.go
- graphqlmetrics/cacheevents/service.go
- graphqlmetrics/cacheevents/service_test.go
- graphqlmetrics/cacheevents/writer.go
- graphqlmetrics/cacheevents/writer_test.go
- graphqlmetrics/cmd/main.go
- graphqlmetrics/core/server.go
- proto/wg/cosmo/cacheevents/v1/cacheevents.proto
- router-tests/entity_caching/cache_events_export_test.go
- router-tests/observability/graphql_metrics_test.go
- router/core/factoryresolver.go
- router/core/graph_server.go
- router/core/graphql_handler.go
- router/core/router.go
- router/core/router_config.go
- router/internal/cacheevents/aggregate.go
- router/internal/cacheevents/builder.go
- router/internal/cacheevents/builder_test.go
- router/internal/cacheevents/exporter.go
- router/internal/cacheevents/sink.go
- router/internal/cacheevents/sink_test.go
- router/internal/exporter/auth.go
- router/internal/exporter/auth_test.go
- router/internal/exporter/retry.go
- router/internal/exporter/retry_test.go
- router/internal/graphqlmetrics/graphql_exporter.go
- router/internal/graphqlmetrics/graphql_exporter_test.go
- router/internal/graphqlmetrics/graphql_metrics_sink.go
- router/internal/graphqlmetrics/graphql_metrics_sink_test.go
- router/pkg/config/config.go
- router/pkg/config/config.schema.json
- router/pkg/config/testdata/config_defaults.json
- router/pkg/config/testdata/config_full.json
💤 Files with no reviewable changes (1)
- router/internal/graphqlmetrics/graphql_exporter_test.go
…nd writer

Adds tests for the previously uncovered surface of the raw event pipeline:
- router/internal/cacheevents/aggregate_test.go — BuildRequest behaviour for nil/empty/populated batches, including the non-copy invariant.
- router/internal/cacheevents/exporter_test.go — NewExporter constructs cleanly with valid settings and rejects invalid batch/queue sizes.
- router/internal/cacheevents/builder_test.go — dedicated tests for cacheOpKindFromString, verdictFromKind, fetchSourceFromGoTools (incl. unknown values), OperationType lowercasing, the single-timestamp invariant, and field-level event mappings (ByteSize, TtlMs, IsShadow, CacheOpKind, etc.). Also pins the FIELD_HASH PII guard (KeyHash=0 with KeyRaw set must drop) and the SHADOW STALE/FRESH verdict mapping.
- graphqlmetrics/cacheevents/writer_test.go — appendCacheEventRow column positions (Timestamp, OrgID, FedGraphID, OperationType, FieldPath), TimestampUnixNano fallback to insertTime, CacheOpKind precedence over the legacy CacheOp string, and ProcessBatch lifecycle (empty input, nil claims, nil events, cross-item rows in one batch).
@CodeRabbit review
✅ Actions performed: Review triggered.
Actionable comments posted: 2
♻️ Duplicate comments (1)
router/core/router.go (1)
989-994: ⚠️ Potential issue | 🟠 Major
Clean up cacheEventsExporter on later bootstrap failures.
After r.cacheEventsExporter = ce, bootstrap() still has several error exits below this block. If any of them fail, the exporter goroutine keeps running even though startup failed, because it is only stopped from Shutdown(). Please mirror the existing partial-initialization cleanup pattern here and nil out the field after a cleanup shutdown.
Based on learnings: in router/core, ensure error paths clean up partially initialized resources before returning.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@router/core/router.go` around lines 989 - 994, The cache events exporter created by cacheevents.NewExporter is assigned to r.cacheEventsExporter inside bootstrap(), but on subsequent bootstrap errors the exporter goroutine continues running; change the error paths after r.cacheEventsExporter = ce to call the same shutdown/cleanup used elsewhere (invoke the exporter shutdown/Stop method or the same cleanup helper used for partial-initialized resources), wait for it to finish, and then set r.cacheEventsExporter = nil before returning the error so the partially-initialized resource is cleaned up (mirror the existing partial-initialization cleanup pattern used in bootstrap() and ensure Shutdown() semantics are preserved).
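A compressed sketch of the partial-initialization cleanup pattern the finding asks for; the `router` and `exporter` types here are stand-ins for the real ones in router/core, and `failLater` stands in for any bootstrap step after the exporter is assigned:

```go
package main

import (
	"errors"
	"fmt"
)

// exporter is a stand-in with the shutdown semantics the comment describes.
type exporter struct{ stopped bool }

func (e *exporter) Stop() { e.stopped = true }

type router struct{ cacheEventsExporter *exporter }

// bootstrap shows the cleanup pattern: once the exporter is assigned, every
// later error path stops it and nils the field before returning, so no
// goroutines outlive a failed startup and Shutdown() stays a safe no-op.
func (r *router) bootstrap(failLater bool) error {
	ce := &exporter{}
	r.cacheEventsExporter = ce

	cleanup := func() {
		if r.cacheEventsExporter != nil {
			r.cacheEventsExporter.Stop()
			r.cacheEventsExporter = nil
		}
	}

	if failLater { // stand-in for any subsequent bootstrap step failing
		cleanup()
		return errors.New("bootstrap step failed")
	}
	return nil
}

func main() {
	r := &router{}
	fmt.Println(r.bootstrap(true), r.cacheEventsExporter == nil)
	fmt.Println(r.bootstrap(false), r.cacheEventsExporter != nil)
}
```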
🧹 Nitpick comments (1)
graphqlmetrics/cacheevents/service.go (1)
75-78: ⚡ Quick win | Surface drain failures during shutdown.
StopAndWait can fail on timeout/cancellation, but this method drops that signal entirely. If queued cache events are abandoned during shutdown, operators currently get no indication that the drain was incomplete.
Suggested fix:

```diff
 func (s *Service) Shutdown(timeout time.Duration) {
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
-	_ = s.processor.StopAndWait(ctx)
+	if err := s.processor.StopAndWait(ctx); err != nil {
+		s.logger.Warn("Cache events processor did not drain cleanly on shutdown", zap.Error(err))
+	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@graphqlmetrics/cacheevents/service.go` around lines 75 - 78, The Shutdown method currently drops the error from s.processor.StopAndWait and thus hides drain failures; change Shutdown to return an error (e.g. func (s *Service) Shutdown(timeout time.Duration) error), call ctx, cancel := context.WithTimeout(...), call err := s.processor.StopAndWait(ctx), and if err != nil return a wrapped error (or at least log it) so callers/operators are informed of timeout/cancellation failures during drain; keep the context cancel and ensure the returned error includes context that Shutdown on Service and StopAndWait failed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@graphqlmetrics/cacheevents/writer.go`:
- Around line 70-73: The appendCacheEventRow errors are currently logged and
skipped, which allows ProcessBatch to proceed with a partial batch; change the
behavior in the loop that calls appendCacheEventRow so that on the first error
you return that error (propagate it up) instead of logging and continuing.
Locate the loop that calls appendCacheEventRow(batch, insertTime,
item.Claims.OrganizationID, item.Claims.FederatedGraphID, ev) and replace the
continue path with an immediate return of the error (e.g., return err) so
ProcessBatch fails fast and lets the existing retry/ack logic handle retries;
keep the existing zap.Error logging if you want but ensure you also return the
error from the enclosing function.
In `@router/core/router.go`:
- Around line 969-974: The client construction in the router (around the
CacheEventsService client creation) always adds
exporter.WithBearerAuth(r.graphApiToken) even when r.graphApiToken is empty,
causing an empty "Authorization: Bearer " header; fix by building the options
slice/varargs for cacheeventsv1connect.NewCacheEventsServiceClient (the call
that creates ceClient) and only append exporter.WithBearerAuth(r.graphApiToken)
when r.graphApiToken != "" (leave connect.WithSendGzip() and other required
options always present), then pass that options slice to
NewCacheEventsServiceClient so no Authorization header is sent for anonymous
collectors.
---
Duplicate comments:
In `@router/core/router.go`:
- Around line 989-994: The cache events exporter created by
cacheevents.NewExporter is assigned to r.cacheEventsExporter inside bootstrap(),
but on subsequent bootstrap errors the exporter goroutine continues running;
change the error paths after r.cacheEventsExporter = ce to call the same
shutdown/cleanup used elsewhere (invoke the exporter shutdown/Stop method or the
same cleanup helper used for partial-initialized resources), wait for it to
finish, and then set r.cacheEventsExporter = nil before returning the error so
the partially-initialized resource is cleaned up (mirror the existing
partial-initialization cleanup pattern used in bootstrap() and ensure Shutdown()
semantics are preserved).
---
Nitpick comments:
In `@graphqlmetrics/cacheevents/service.go`:
- Around line 75-78: The Shutdown method currently drops the error from
s.processor.StopAndWait and thus hides drain failures; change Shutdown to return
an error (e.g. func (s *Service) Shutdown(timeout time.Duration) error), call
ctx, cancel := context.WithTimeout(...), call err :=
s.processor.StopAndWait(ctx), and if err != nil return a wrapped error (or at
least log it) so callers/operators are informed of timeout/cancellation failures
during drain; keep the context cancel and ensure the returned error includes
context that Shutdown on Service and StopAndWait failed.
📒 Files selected for processing (10)
- .github/workflows/graphqlmetrics-ci.yaml
- controlplane/clickhouse/migrations/20260427120000_create_gql_cache_events.sql
- graphqlmetrics/cacheevents/service.go
- graphqlmetrics/cacheevents/writer.go
- graphqlmetrics/cacheevents/writer_test.go
- router-tests/entity_caching/cache_events_export_test.go
- router/core/router.go
- router/internal/cacheevents/aggregate_test.go
- router/internal/cacheevents/builder_test.go
- router/internal/cacheevents/exporter_test.go
✅ Files skipped from review due to trivial changes (1)
- router/internal/cacheevents/aggregate_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- controlplane/clickhouse/migrations/20260427120000_create_gql_cache_events.sql
dbmate's ClickHouse driver rejects multi-statement files with "Multi-statements are not allowed", causing make migrate to fail. Reduce the migration to just the gql_cache_events_raw table; the rolled-up SummingMergeTree tables and materialized views can be added later in their own per-statement migration files (the repo convention). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ents_raw schema

FieldName, FieldHash, FieldPath, EntityCount, EntityUniqueKeys live at columns 16–20 in the migration but were appended at positions 40–44, so the positional batch insert silently shifted every column. The driver hit FieldPath (Array) at the wrong position with a uint32 and failed with "converting reflect.Value to Array is unsupported", dropping every batch on retry. Reorder the Append call to match the schema.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This PR has the following
Summary by CodeRabbit
New Features
Chores
Checklist
Open Source AI Manifesto
This project follows the principles of the Open Source AI Manifesto. Please ensure your contribution aligns with its principles.