Adds an opt-in `since` parameter to extract_pull_requests as the first slice toward incremental daily snapshots (pulling only PRs changed since the last run rather than re-pulling every PR every day).

When `since` is set, PRs are fetched sorted by updated descending, only PRs with updated_at >= since are yielded, pagination stops at the first older PR, and the per-PR commit/review/comment sub-fetches run only for the kept PRs. When `since` is None (default), behavior is unchanged (created/asc, full pagination), so this lands with no impact on the current pipeline.

Timestamp comparison parses GitHub ISO-8601 values into aware datetimes (via a new _parse_github_timestamp helper) to avoid fractional-second string-compare bugs; PRs with a missing/unparseable updated_at are kept and do not trigger the watermark stop.

Adds unit tests covering default vs incremental sort params, watermark early-stop, inclusive boundary, sub-fetch filtering, and missing updated_at handling.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
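The watermark filter described in this commit can be sketched as follows. This is a minimal illustration rather than the code in main.py: the function names `parse_github_timestamp` and `filter_updated_since` are hypothetical, and the input is assumed to be an already-fetched sequence of PR dicts sorted by `updated_at` descending.

```python
from datetime import datetime, timezone
from typing import Iterable, Iterator, Optional


def parse_github_timestamp(value: object) -> Optional[datetime]:
    """Return an aware UTC datetime, or None if the value is missing or unparseable."""
    if not isinstance(value, str) or not value:
        return None
    try:
        # Replacing a trailing "Z" keeps this working on Python versions before 3.11.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def filter_updated_since(prs: Iterable[dict], since: datetime) -> Iterator[dict]:
    """Yield PRs with updated_at >= since; assumes `prs` is sorted by updated desc."""
    for pr in prs:
        updated = parse_github_timestamp(pr.get("updated_at"))
        if updated is None:
            # Missing or unparseable timestamp: keep the PR and do not stop.
            yield pr
            continue
        if updated < since:
            # First PR older than the watermark: everything after it is older too.
            break
        yield pr
```

Comparing parsed datetimes rather than raw strings matters because `"2024-01-02T00:00:00.500Z"` sorts before `"2024-01-02T00:00:00Z"` as a string even though it is the later instant.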
Second slice toward incremental daily snapshots. Adds two BigQuery helpers that the incremental orchestration will use (nothing calls them yet, so pipeline behavior is unchanged):

- get_prior_snapshot_watermark: one aggregate query returning the most recent prior snapshot_date (as a YYYY-MM-DD string) and the MAX(date_modified) watermark (aware UTC datetime) for a repo. Returns (None, None) when there is no prior snapshot or the table is missing (first run); re-raises on a missing dataset, mirroring snapshot_exists.
- carry_forward_snapshot: per-table INSERT...SELECT that re-stamps a prior snapshot's rows with today's snapshot_date across all four tables, giving today a complete baseline to overlay changed PRs onto. Documents the ordering contract (callers must delete_existing_snapshot for today first so same-day re-runs stay idempotent).

Adds a module-level _TABLE_COLUMNS constant (ordered data columns per table, from data.yml) so the carry-forward statements avoid SELECT *; it will also back the reconcile delta in the next slice.

Adds tests/test_snapshot_helpers.py covering both helpers (params, SQL shape, no-prior-data, missing-table vs missing-dataset, null-watermark, per-table INSERT...SELECT, and a pull_requests column-list guard).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
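A carry-forward statement built from an explicit column list might look like the sketch below. The function name `build_carry_forward_sql`, the `repo` filter column, and the parameter names `@snapshot_date`, `@prior_snapshot_date`, and `@repo` are assumptions for illustration; only `snapshot_date` and the avoidance of `SELECT *` come from the commit message.

```python
from typing import Sequence


def build_carry_forward_sql(table_ref: str, columns: Sequence[str]) -> str:
    """Build an INSERT...SELECT that re-stamps a prior snapshot with a new date.

    `columns` is the ordered list of data columns (everything except
    snapshot_date), so the statement never relies on SELECT *.
    """
    if not columns:
        raise ValueError("at least one data column is required")
    column_list = ", ".join(columns)
    return (
        f"INSERT INTO `{table_ref}` (snapshot_date, {column_list})\n"
        f"SELECT @snapshot_date, {column_list}\n"
        f"FROM `{table_ref}`\n"
        # `repo` is a hypothetical column name used to scope the copy to one repo.
        f"WHERE snapshot_date = @prior_snapshot_date AND repo = @repo"
    )
```

Listing the columns explicitly keeps the insert and select sides aligned even if the table later gains a column that should not be carried forward.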
…pshot

Third slice toward incremental daily snapshots. Adds reconcile_delta, which takes the freshly fetched-and-transformed data for only the PRs that changed since the last run and replaces those PRs' carried-forward rows with the new data across all four tables. Nothing calls it yet, so pipeline behavior is unchanged.

Approach (delete-by-PR + reuse load_data, no physical staging table):

- The set of changed pull_request_ids (from the delta's pull_requests rows) is the reconcile key for every table, so a changed PR's stale child rows (e.g. a removed comment) are deleted even when the delta now has zero child rows for that PR.
- DELETE ... WHERE pull_request_id IN UNNEST(@pr_ids) removes those PRs' rows from today's snapshot for this repo, then load_data inserts the fresh delta (stamping snapshot_date and honoring the streaming toggle).
- New PRs created since the watermark fall through the same path: their delete matches nothing and their insert adds them.
- DELETE runs before INSERT and only touches carry_forward_snapshot's managed-storage rows, so there is no streaming-buffer conflict.

An empty delta is a no-op (the carried-forward snapshot already stands).

Adds tests/test_reconcile_delta.py covering the no-op short-circuit, per-table DELETE ... IN UNNEST, param binding (including the pr_ids array param), id dedupe/None filtering, and the load_data hand-off.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
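The delete-by-PR planning step can be sketched without a BigQuery client. The names `changed_pr_ids` and `plan_reconcile_deletes`, the table names other than pull_requests, and the `repo` column are assumptions; the sketch only returns the SQL text that would be run before load_data inserts the fresh delta.

```python
from typing import Dict, Iterable, List

# Hypothetical table names; the commit only says "all four tables".
TABLES = ("pull_requests", "commits", "reviews", "comments")


def changed_pr_ids(pull_request_rows: Iterable[dict]) -> List[int]:
    """Collect distinct, non-null pull_request_id values in first-seen order."""
    ids = (
        int(row["pull_request_id"])
        for row in pull_request_rows
        if row.get("pull_request_id") is not None
    )
    return list(dict.fromkeys(ids))


def plan_reconcile_deletes(dataset_ref: str, delta: Dict[str, List[dict]]) -> List[str]:
    """Return one DELETE per table, or an empty plan when the delta is empty."""
    if not changed_pr_ids(delta.get("pull_requests", [])):
        return []  # no-op: the carried-forward snapshot already stands
    return [
        f"DELETE FROM `{dataset_ref}.{table}` "
        "WHERE snapshot_date = @snapshot_date AND repo = @repo "
        "AND pull_request_id IN UNNEST(@pr_ids)"
        for table in TABLES
    ]
```

Because the key comes from the pull_requests rows alone, every table gets a DELETE even when the delta carries no child rows for a changed PR, which is what clears a removed comment.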
Final slice: process_repo now chooses per repo between a full export and incremental sync, turning the daily run from "re-pull every PR" into "carry forward yesterday's snapshot and overlay only changed PRs".

Per repo:

- Look up the most recent prior snapshot and its MAX(date_modified) watermark (get_prior_snapshot_watermark), then clear any partial rows for today (existing re-run safety).
- Full export (first run / new repo / no usable watermark, or when GITHUB_ETL_FULL_REFRESH is set): the existing chunk-streaming extract -> transform -> load_data loop, kept memory-bounded for large backfills.
- Incremental: carry_forward_snapshot copies the prior snapshot into today, then extract_pull_requests(since=watermark-lookback) fetches only changed PRs, which are accumulated and reconcile_delta'd once.

New configuration:

- GITHUB_ETL_LOOKBACK_HOURS (default 24): buffer subtracted from the watermark to absorb eventual-consistency lag and PRs whose updated_at wasn't bumped.
- GITHUB_ETL_FULL_REFRESH: force a full export for all repos (periodic drift correction / manual rebuild).

Also hardens get_prior_snapshot_watermark with an empty-result guard so a mocked/empty client yields (None, None) instead of IndexError.

Adds tests/test_process_repo.py (full vs incremental branching, since computation, delta accumulation) and _resolve_lookback_hours / _env_flag tests in test_main.py. Existing full-flow tests still pass via the full path (MagicMock client -> no prior snapshot).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
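The per-repo decision and the `since` computation from this commit can be sketched as below. The helper names, their signatures, and the set of strings treated as truthy are assumptions for illustration and are not taken from main.py; the default of 24 hours is the value stated in this commit.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def resolve_lookback_hours(raw: Optional[str], default: int = 24) -> int:
    """Parse GITHUB_ETL_LOOKBACK_HOURS, falling back to the default on bad input."""
    try:
        hours = int(raw) if raw is not None else default
    except ValueError:
        return default
    return hours if hours >= 0 else default


def env_flag(raw: Optional[str]) -> bool:
    """Treat a small, assumed set of strings as 'set' for GITHUB_ETL_FULL_REFRESH."""
    return (raw or "").strip().lower() in {"1", "true", "yes", "on"}


def choose_strategy(
    watermark: Optional[datetime], full_refresh: bool, lookback_hours: int
) -> Tuple[str, Optional[datetime]]:
    """Return ("full", None) or ("incremental", since)."""
    if full_refresh or watermark is None:
        return "full", None
    return "incremental", watermark - timedelta(hours=lookback_hours)
```

In a real run the raw values would come from `os.environ.get(...)`; passing them in as arguments keeps the helpers easy to test without patching the environment.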
Verified slices 2-4 end-to-end against the goccy BigQuery emulator and fixed two analyzer incompatibilities the mock-based unit tests could not catch. Both changes are no-ops against production BigQuery:

- carry_forward_snapshot: CAST(@snapshot_date AS DATE) in the INSERT...SELECT projection. The emulator infers a DATE-typed parameter as STRING and rejects inserting it into the DATE snapshot_date column.
- reconcile_delta: inline the validated integer pull_request_ids as literals (pull_request_id IN (1, 2, ...)) instead of an ArrayQueryParameter with IN UNNEST(@pr_ids). The emulator types an array param's elements as STRING, so IN UNNEST fails to match the INT64 column. The ids are ints (coerced via int()), so inlining is injection-safe.

Updates test_reconcile_delta.py to assert the inlined IN list.

Verification (manual, against `docker compose up bigquery-emulator`): seed a prior-day snapshot, carry forward, reconcile a delta of one changed PR plus one new PR, and confirm the changed PR is replaced (no duplicate), the untouched PR's carried-forward row remains, the new PR is added, and child rows (comments) are correspondingly replaced/retained.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
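The injection-safety argument for the inlined IN list rests on coercing every id through `int()` before it reaches the SQL text. A minimal sketch, with the hypothetical helper name `inline_id_predicate`:

```python
from typing import Iterable


def inline_id_predicate(pr_ids: Iterable[object]) -> str:
    """Render `pull_request_id IN (...)` from values that must coerce to int.

    int() raises ValueError for anything that is not an integer literal,
    so only digits (and an optional sign) can end up in the SQL text.
    """
    literals = [str(int(pr_id)) for pr_id in pr_ids]
    if not literals:
        raise ValueError("at least one pull_request_id is required")
    return "pull_request_id IN (" + ", ".join(literals) + ")"
```

For example, `inline_id_predicate([1, 2, "3"])` renders `pull_request_id IN (1, 2, 3)`, while a value such as `"1 OR 1=1"` raises `ValueError` before any SQL is built.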
Pull request overview
This PR introduces an incremental sync path intended to avoid re-pulling all pull requests daily per repo by carrying forward the prior snapshot in BigQuery and reconciling only PRs changed since a stored watermark.
Changes:
- Add incremental extraction support to `extract_pull_requests()` via a `since` watermark (updated/desc fetch + early stop at watermark).
- Add BigQuery snapshot helpers: look up prior snapshot + watermark, carry forward prior snapshot, and reconcile a changed-PR delta into today’s snapshot.
- Add env-configurable incremental knobs (`GITHUB_ETL_LOOKBACK_HOURS`, `GITHUB_ETL_FULL_REFRESH`) and comprehensive tests for the new behavior.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| main.py | Implements incremental strategy (watermark lookup, carry-forward, delta reconcile), env configuration, and incremental pull extraction. |
| tests/test_extract_pull_requests.py | Adds coverage for incremental extraction sort/stop/filter behavior. |
| tests/test_snapshot_helpers.py | Adds tests for prior snapshot watermark lookup and carry-forward SQL/params. |
| tests/test_reconcile_delta.py | Adds tests for delete+reload reconciliation logic and parameter binding. |
| tests/test_process_repo.py | Adds tests for strategy selection (full vs incremental) and delta accumulation across chunks. |
| tests/test_main.py | Adds tests for env parsing helpers (_resolve_lookback_hours, _env_flag). |
- Pin the carry-forward watermark to the same prior snapshot via a CTE so MAX(date_modified) can no longer be drawn from a different snapshot than MAX(snapshot_date), which could skip changed PRs.
- Treat an unparseable `since` as "not provided": warn and fall back to a full created/asc fetch instead of switching to updated/desc with the watermark filter disabled.
- Log the actual full-export reason when a prior snapshot exists but has no usable watermark, rather than "no prior snapshot".

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
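One way to pin the watermark to the most recent prior snapshot is sketched below as a query template. This is not the query from main.py: the function name, the `repo` column, and the `@repo` / `@today` parameters are assumptions; only `snapshot_date` and `date_modified` come from the commit messages.

```python
def build_watermark_sql(table_ref: str) -> str:
    """Build a query whose watermark is computed only over the latest prior snapshot."""
    return (
        "WITH prior AS (\n"
        "  SELECT MAX(snapshot_date) AS snapshot_date\n"
        f"  FROM `{table_ref}`\n"
        "  WHERE repo = @repo AND snapshot_date < @today\n"
        ")\n"
        "SELECT prior.snapshot_date, MAX(pr.date_modified) AS watermark\n"
        "FROM prior\n"
        # The join restricts MAX(date_modified) to rows of that one snapshot.
        f"LEFT JOIN `{table_ref}` AS pr\n"
        "  ON pr.snapshot_date = prior.snapshot_date AND pr.repo = @repo\n"
        "GROUP BY prior.snapshot_date"
    )
```

With no prior snapshot the CTE yields a single row with a NULL date, the join matches nothing, and the result is a (NULL, NULL) row that a caller can map to (None, None).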
shtrom left a comment
I think we should use the issues API endpoint to filter based on the watermark. It also returns PRs (with a pull_request key), and has a since query parameter that does exactly what we need.
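For reference, the suggestion could be sketched as below. The helper names are hypothetical, no HTTP call is made, and the parameter values are one plausible choice for the list-issues endpoint rather than anything taken from this PR.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Union


def issues_query_params(since: datetime) -> Dict[str, Union[str, int]]:
    """Query parameters for listing repository issues updated since a watermark.

    `since` must be an aware datetime; it is rendered as an ISO-8601 UTC timestamp.
    """
    return {
        "state": "all",
        "sort": "updated",
        "direction": "desc",
        "since": since.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "per_page": 100,
    }


def only_pull_requests(items: Iterable[dict]) -> List[dict]:
    """Keep the issue items that are pull requests (they carry a pull_request key)."""
    return [item for item in items if "pull_request" in item]
```

With this approach the server applies the watermark, so no client-side early stop is needed, but the filtered items are issue objects rather than full pull request records.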
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
- Lower default lookback from 24h to 1h: the watermark looks before data we already have, so the buffer only needs to cover same-second boundary misses.
- Force watermark to None in the full-export branch so a stale value cannot leak past the fork.
- Drop the now-unneeded Z->+00:00 normalization in _parse_github_timestamp (fromisoformat accepts a trailing Z on the 3.11+ runtime we target).
- Batch reconcile DELETE ids via itertools.batched so the inlined IN (...) list stays bounded regardless of daily PR volume.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
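The batching idea can be sketched as follows. `itertools.batched` exists from Python 3.12 onward, so this sketch uses a small `islice`-based equivalent to stay runnable on older interpreters; the batch size of 500 and the name `batched_delete_predicates` are assumptions.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def batched(iterable: Iterable[int], size: int) -> Iterator[Tuple[int, ...]]:
    """Yield tuples of at most `size` items, like itertools.batched (Python 3.12+)."""
    if size < 1:
        raise ValueError("size must be at least one")
    iterator = iter(iterable)
    while chunk := tuple(islice(iterator, size)):
        yield chunk


def batched_delete_predicates(pr_ids: Iterable[int], batch_size: int = 500) -> List[str]:
    """Render one bounded `pull_request_id IN (...)` predicate per batch of ids."""
    return [
        "pull_request_id IN (" + ", ".join(str(int(i)) for i in chunk) + ")"
        for chunk in batched(pr_ids, batch_size)
    ]
```

Each predicate would back its own DELETE statement, so the length of any single statement depends on the batch size rather than on how many PRs changed that day.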
…test
The "Potential fix for pull request finding" autofix removed the
`with patch("main.load_data"):` line but left its body indented, producing
an IndentationError. Restore the context manager to match the sibling tests.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Although it is a good idea, it might be better to revisit this later if these new changes do not work, as it would require significant changes and the issues API is missing some data that we need. We would still need to call the pulls API for each changed pull request to get the missing data.
Co-authored-by: Olivier Mehani <shtrom-github@ssji.net>