airbyte pull more in a single page #1180
Conversation
Walkthrough

Added pagination (pageSize=100) and sorting (sortKey) parameters to list calls for sources, destinations, and web backend connections in airbyte_service.py, keeping existing response handling unchanged.
Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  participant Client
  participant AirbyteService
  participant AirbyteAPI as Airbyte API
  rect rgb(240,248,255)
  note right of AirbyteService: get_sources
  Client->>AirbyteService: get_sources(workspaceId)
  AirbyteService->>AirbyteAPI: POST /sources/list {workspaceId, pageSize:100, sortKey:"actorName_asc"}
  AirbyteAPI-->>AirbyteService: {sources:[...]}
  AirbyteService-->>Client: sources or []
  end
  rect rgb(245,255,240)
  note right of AirbyteService: get_destinations
  Client->>AirbyteService: get_destinations(workspaceId)
  AirbyteService->>AirbyteAPI: POST /destinations/list {workspaceId, pageSize:100, sortKey:"actorName_asc"}
  AirbyteAPI-->>AirbyteService: {destinations:[...]}
  AirbyteService-->>Client: destinations or []
  end
  rect rgb(255,250,240)
  note right of AirbyteService: get_webbackend_connections
  Client->>AirbyteService: get_webbackend_connections(workspaceId)
  AirbyteService->>AirbyteAPI: POST /web_backend/connections/list {workspaceId, pageSize:100, sortKey:"connectionName_asc"}
  AirbyteAPI-->>AirbyteService: {connections:[...]}
  AirbyteService-->>Client: connections or []
  end
```
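The three request bodies shown in the diagram can be sketched as plain dicts. This is illustrative only: `list_payload` is a hypothetical helper (not in `airbyte_service.py`), standing in for the payloads the real `abreq` calls post to the Airbyte API.

```python
# Hypothetical helper sketching the list-call bodies described above.
# The real code builds these dicts inline and passes them to abreq().

def list_payload(workspace_id: str, sort_key: str, page_size: int = 100) -> dict:
    """Build a list-endpoint body with the new pagination/sorting params."""
    return {"workspaceId": workspace_id, "pageSize": page_size, "sortKey": sort_key}

sources_body = list_payload("ws-1", "actorName_asc")
destinations_body = list_payload("ws-1", "actorName_asc")
connections_body = list_payload("ws-1", "connectionName_asc")

print(sources_body)
# → {'workspaceId': 'ws-1', 'pageSize': 100, 'sortKey': 'actorName_asc'}
```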
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes

Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@           Coverage Diff           @@
##             main    #1180   +/-   ##
=======================================
  Coverage   52.53%   52.53%
=======================================
  Files          97       97
  Lines       11463    11463
=======================================
  Hits         6022     6022
  Misses       5441     5441
```
Actionable comments posted: 3
🧹 Nitpick comments (1)
ddpui/ddpairbyte/airbyte_service.py (1)
668-678: Implement pagination for connection listing. Both `get_connections` and `get_webbackend_connections` call the same `connections/list` endpoint without pagination, and `get_webbackend_connections` even includes a `# TODO: move this to paginated apis`. If these endpoints can return large result sets, add pagination parameters (e.g. `pageSize`, `pageToken`) to both functions.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- `ddpui/ddpairbyte/airbyte_service.py` (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: checks (3.10, 6)
```python
res = abreq(
    "sources/list", {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"}
)
```
Fixed page size can cause silent data truncation.
Setting pageSize: 100 without pagination logic means workspaces with more than 100 sources will return incomplete data. This could lead to sources being invisible in the UI or missing from operations.
Additionally, get_destinations and get_webbackend_connections have TODO comments indicating these should be moved to paginated APIs, but this function doesn't. Consider adding a similar TODO comment for consistency, or implement proper pagination that fetches all pages.
If immediate pagination implementation isn't feasible, at minimum:
- Add a TODO comment for tracking
- Document the 100-item limitation in the function docstring
- Consider logging a warning when exactly 100 items are returned (likely indicates truncation)
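A minimal sketch of that last mitigation, assuming a module-level `logger` and the existing `abreq` helper (stubbed here so the snippet is self-contained; the real one makes an HTTP call):

```python
import logging

logger = logging.getLogger("airbyte_service")
PAGE_SIZE = 100

def abreq(endpoint: str, payload: dict) -> dict:
    # Stub standing in for the real HTTP helper; returns exactly a full page.
    return {"sources": [{"sourceId": str(i)} for i in range(PAGE_SIZE)]}

def get_sources(workspace_id: str) -> dict:
    """Fetch sources; NOTE: currently limited to the first PAGE_SIZE items."""
    # TODO: move this to paginated apis
    res = abreq(
        "sources/list",
        {"workspaceId": workspace_id, "pageSize": PAGE_SIZE, "sortKey": "actorName_asc"},
    )
    if len(res.get("sources", [])) == PAGE_SIZE:
        # Exactly a full page strongly suggests the result set was truncated
        logger.warning(
            "sources/list returned %d items for workspace %s; results may be truncated",
            PAGE_SIZE,
            workspace_id,
        )
    return res
```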
For a complete fix, implement pagination:
```diff
 def get_sources(workspace_id: str) -> List[Dict]:
     """Fetch all sources in an airbyte workspace"""
     if not isinstance(workspace_id, str):
         raise HttpError(400, "Invalid workspace ID")

+    all_sources = []
+    page_size = 100
+    offset = 0
+
+    while True:
-    res = abreq(
-        "sources/list", {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"}
-    )
+        res = abreq(
+            "sources/list",
+            {
+                "workspaceId": workspace_id,
+                "pageSize": page_size,
+                "sortKey": "actorName_asc",
+                "offset": offset
+            }
+        )
-    if "sources" not in res:
-        logger.error("Sources not found for workspace: %s", workspace_id)
-        raise HttpError(404, "sources not found for workspace")
-    return res
+        if "sources" not in res:
+            logger.error("Sources not found for workspace: %s", workspace_id)
+            raise HttpError(404, "sources not found for workspace")
+
+        all_sources.extend(res["sources"])
+
+        # Break if we got fewer items than page size (last page)
+        if len(res["sources"]) < page_size:
+            break
+
+        offset += page_size
+
+    return {"sources": all_sources}
```

Note: Verify if the Airbyte API uses `offset` or a different pagination mechanism (e.g., `pageToken`).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def get_sources(workspace_id: str) -> List[Dict]:
    """Fetch all sources in an airbyte workspace"""
    if not isinstance(workspace_id, str):
        raise HttpError(400, "Invalid workspace ID")

    all_sources = []
    page_size = 100
    offset = 0

    while True:
        res = abreq(
            "sources/list",
            {
                "workspaceId": workspace_id,
                "pageSize": page_size,
                "sortKey": "actorName_asc",
                "offset": offset
            }
        )

        if "sources" not in res:
            logger.error("Sources not found for workspace: %s", workspace_id)
            raise HttpError(404, "sources not found for workspace")

        all_sources.extend(res["sources"])

        # Break if we got fewer items than page size (last page)
        if len(res["sources"]) < page_size:
            break

        offset += page_size

    return {"sources": all_sources}
```
🤖 Prompt for AI Agents
In ddpui/ddpairbyte/airbyte_service.py around lines 270 to 272, the call to
abreq uses a fixed pageSize=100 which can silently truncate results for
workspaces with >100 sources; either implement proper pagination to iterate all
pages according to Airbyte's pagination scheme (confirm whether it uses
offset/page/limit or pageToken and accumulate results), or at minimum add a TODO
noting pagination is required, update the function docstring to document the
100-item limitation, and add a warning log when the API returns exactly 100
items (indicating likely truncation) so truncation is detectable in logs.
```python
# TODO: move this to paginated apis
res = abreq(
    "destinations/list",
    {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"},
)
```
Same pagination limitation applies here.
Like get_sources, this has the same critical issue where workspaces with more than 100 destinations will return incomplete data. The TODO comment acknowledges this is temporary, but ensure this is tracked in your issue tracker to prevent it from being forgotten.
Consider applying the same pagination solution as suggested for get_sources, adapting it for destinations. Also verify if the Airbyte API returns pagination metadata (like hasNext or totalCount) that could help implement robust pagination.
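One way to adapt that solution is a shared offset-based helper usable by all three list functions. This is a sketch, not the repository's code: `abreq` is stubbed with a fake dataset so it runs standalone, and the `offset` parameter is an assumption about Airbyte's pagination scheme that needs verifying.

```python
from typing import Dict, List

# Stand-in dataset and stubbed HTTP helper, for illustration only
FAKE_DESTINATIONS = [{"destinationId": str(i)} for i in range(250)]

def abreq(endpoint: str, payload: dict) -> dict:
    start = payload.get("offset", 0)
    return {"destinations": FAKE_DESTINATIONS[start : start + payload["pageSize"]]}

def fetch_all(
    endpoint: str, key: str, workspace_id: str, sort_key: str, page_size: int = 100
) -> List[Dict]:
    """Accumulate every page of an Airbyte list endpoint (assumed offset-based)."""
    items: List[Dict] = []
    offset = 0
    while True:
        res = abreq(
            endpoint,
            {
                "workspaceId": workspace_id,
                "pageSize": page_size,
                "sortKey": sort_key,
                "offset": offset,
            },
        )
        page = res.get(key, [])
        items.extend(page)
        if len(page) < page_size:  # short page => last page
            return items
        offset += page_size

all_dests = fetch_all("destinations/list", "destinations", "ws-1", "actorName_asc")
print(len(all_dests))  # → 250
```

The same helper would cover `sources/list` with `key="sources"` and `web_backend/connections/list` with `key="connections"`, keeping the page-walking logic in one place.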
```python
# TODO: move this to paginated apis
res = abreq(
    "web_backend/connections/list",
    {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "connectionName_asc"},
)
```
Pagination limitation in web backend connections.
This function has the same critical issue: workspaces with more than 100 connections will have incomplete data. The sort key connectionName_asc is appropriate for connections (vs actorName_asc for sources/destinations).
Note that unlike the other functions, this returns res["connections"] directly rather than the full response object. When implementing proper pagination, ensure the return type remains consistent.
🤖 Prompt for AI Agents
In ddpui/ddpairbyte/airbyte_service.py around lines 686-690, the code only
fetches the first 100 connections which drops data for workspaces with >100
connections; update this to call the "web_backend/connections/list" endpoint in
a paginated loop using pageSize (keep 100) and the pageToken/next page mechanism
returned by the API while preserving workspaceId and
sortKey="connectionName_asc", accumulate all res["connections"] across pages and
return the combined list (keep the current return type of res["connections"]);
ensure the loop stops when no next page token is returned and propagate any API
errors as before.
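The token-based loop that prompt describes might look like the sketch below. The `pageToken`/`nextPageToken` fields are assumptions about the API's pagination contract (to be confirmed against the Airbyte docs), and `abreq` is stubbed with canned pages so the snippet runs standalone; note the function still returns the bare connections list, matching the current return type.

```python
from typing import Dict, List, Optional

# Stubbed API pages: token -> (connections, next token). Illustrative only.
_PAGES = {
    None: ([{"connectionId": str(i)} for i in range(100)], "t1"),
    "t1": ([{"connectionId": str(i)} for i in range(100, 150)], None),
}

def abreq(endpoint: str, payload: dict) -> dict:
    conns, nxt = _PAGES[payload.get("pageToken")]
    out = {"connections": conns}
    if nxt:
        out["nextPageToken"] = nxt
    return out

def get_webbackend_connections(workspace_id: str) -> List[Dict]:
    """Combine connections across all pages; same return type as before."""
    all_conns: List[Dict] = []
    token: Optional[str] = None
    while True:
        payload = {
            "workspaceId": workspace_id,
            "pageSize": 100,
            "sortKey": "connectionName_asc",
        }
        if token:
            payload["pageToken"] = token
        res = abreq("web_backend/connections/list", payload)
        all_conns.extend(res["connections"])
        token = res.get("nextPageToken")
        if not token:  # no next page token => last page
            return all_conns

print(len(get_webbackend_connections("ws-1")))  # → 150
```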