Skip to content

Commit 072b739

Browse files
committed
Merge branch 'main' into brian/streamline_cds_move_legacy_components
2 parents 723f940 + dd52cfe commit 072b739

File tree

9 files changed

+90
-4
lines changed

9 files changed

+90
-4
lines changed

airbyte_cdk/manifest_server/api_models/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
DiscoverResponse,
1313
FullResolveRequest,
1414
ManifestResponse,
15+
RequestContext,
1516
ResolveRequest,
1617
StreamTestReadRequest,
1718
)
@@ -30,6 +31,7 @@
3031
"ConnectorConfig",
3132
"Manifest",
3233
# Manifest request/response models
34+
"RequestContext",
3335
"FullResolveRequest",
3436
"ManifestResponse",
3537
"StreamTestReadRequest",

airbyte_cdk/manifest_server/api_models/manifest.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313
from .dicts import ConnectorConfig, Manifest
1414

1515

16+
class RequestContext(BaseModel):
17+
"""Optional context information for tracing and observability."""
18+
19+
workspace_id: Optional[str] = None
20+
project_id: Optional[str] = None
21+
22+
1623
class StreamTestReadRequest(BaseModel):
1724
"""Request to test read from a specific stream."""
1825

@@ -24,13 +31,15 @@ class StreamTestReadRequest(BaseModel):
2431
record_limit: int = Field(default=100, ge=1, le=5000)
2532
page_limit: int = Field(default=5, ge=1, le=20)
2633
slice_limit: int = Field(default=5, ge=1, le=20)
34+
context: Optional[RequestContext] = None
2735

2836

2937
class CheckRequest(BaseModel):
3038
"""Request to check a manifest."""
3139

3240
manifest: Manifest
3341
config: ConnectorConfig
42+
context: Optional[RequestContext] = None
3443

3544

3645
class CheckResponse(BaseModel):
@@ -45,6 +54,7 @@ class DiscoverRequest(BaseModel):
4554

4655
manifest: Manifest
4756
config: ConnectorConfig
57+
context: Optional[RequestContext] = None
4858

4959

5060
class DiscoverResponse(BaseModel):
@@ -57,6 +67,7 @@ class ResolveRequest(BaseModel):
5767
"""Request to resolve a manifest."""
5868

5969
manifest: Manifest
70+
context: Optional[RequestContext] = None
6071

6172

6273
class ManifestResponse(BaseModel):
@@ -71,3 +82,4 @@ class FullResolveRequest(BaseModel):
7182
manifest: Manifest
7283
config: ConnectorConfig
7384
stream_limit: int = Field(default=100, ge=1, le=100)
85+
context: Optional[RequestContext] = None

airbyte_cdk/manifest_server/api_models/stream.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
fixing type mismatches like slice_descriptor being a string rather than an object.
77
"""
88

9-
from typing import Any, Dict, List, Optional
9+
from typing import Any, Dict, List, Optional, Union
1010

1111
from pydantic import BaseModel
1212

@@ -59,7 +59,7 @@ class StreamReadSlices(BaseModel):
5959
"""Slices of data read from a stream."""
6060

6161
pages: List[StreamReadPages]
62-
slice_descriptor: Optional[str] # This is actually a string at runtime, not Dict[str, Any]
62+
slice_descriptor: Optional[Union[Dict[str, Any], str]] # We're seeing strings at runtime
6363
state: Optional[List[Dict[str, Any]]] = None
6464
auxiliary_requests: Optional[List[AuxiliaryRequest]] = None
6565

airbyte_cdk/manifest_server/helpers/__init__.py

Whitespace-only changes.
File renamed without changes.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import logging
2+
from typing import Optional
3+
4+
import ddtrace
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def apply_trace_tags_from_context(
10+
workspace_id: Optional[str] = None,
11+
project_id: Optional[str] = None,
12+
) -> None:
13+
"""Apply trace tags from context to the current span."""
14+
if not workspace_id and not project_id:
15+
return
16+
17+
# Log the trace IDs for observability
18+
log_parts = []
19+
if workspace_id:
20+
log_parts.append(f"workspace_id={workspace_id}")
21+
if project_id:
22+
log_parts.append(f"project_id={project_id}")
23+
24+
if log_parts:
25+
logger.info(f"Processing request with trace tags: {', '.join(log_parts)}")
26+
27+
try:
28+
span = ddtrace.tracer.current_span()
29+
if span:
30+
if workspace_id:
31+
span.set_tag("workspace_id", workspace_id)
32+
if project_id:
33+
span.set_tag("project_id", project_id)
34+
except Exception:
35+
# Silently ignore any ddtrace-related errors (e.g. if ddtrace.auto wasn't run)
36+
pass

airbyte_cdk/manifest_server/routers/manifest.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@
2727
StreamReadResponse,
2828
StreamTestReadRequest,
2929
)
30-
from ..auth import verify_jwt_token
3130
from ..command_processor.processor import ManifestCommandProcessor
3231
from ..command_processor.utils import build_catalog, build_source
32+
from ..helpers.auth import verify_jwt_token
33+
from ..helpers.tracing import apply_trace_tags_from_context
3334

3435

3536
def safe_build_source(
@@ -68,6 +69,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
6869
"""
6970
Test reading from a specific stream in the manifest.
7071
"""
72+
# Apply trace tags from context if provided
73+
if request.context:
74+
apply_trace_tags_from_context(
75+
workspace_id=request.context.workspace_id,
76+
project_id=request.context.project_id,
77+
)
78+
7179
config_dict = request.config.model_dump()
7280

7381
catalog = build_catalog(request.stream_name)
@@ -104,6 +112,13 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
104112
@router.post("/check", operation_id="check")
105113
def check(request: CheckRequest) -> CheckResponse:
106114
"""Check configuration against a manifest"""
115+
# Apply trace tags from context if provided
116+
if request.context:
117+
apply_trace_tags_from_context(
118+
workspace_id=request.context.workspace_id,
119+
project_id=request.context.project_id,
120+
)
121+
107122
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
108123
runner = ManifestCommandProcessor(source)
109124
success, message = runner.check_connection(request.config.model_dump())
@@ -113,6 +128,13 @@ def check(request: CheckRequest) -> CheckResponse:
113128
@router.post("/discover", operation_id="discover")
114129
def discover(request: DiscoverRequest) -> DiscoverResponse:
115130
"""Discover streams from a manifest"""
131+
# Apply trace tags from context if provided
132+
if request.context:
133+
apply_trace_tags_from_context(
134+
workspace_id=request.context.workspace_id,
135+
project_id=request.context.project_id,
136+
)
137+
116138
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
117139
runner = ManifestCommandProcessor(source)
118140
catalog = runner.discover(request.config.model_dump())
@@ -124,6 +146,13 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
124146
@router.post("/resolve", operation_id="resolve")
125147
def resolve(request: ResolveRequest) -> ManifestResponse:
126148
"""Resolve a manifest to its final configuration."""
149+
# Apply trace tags from context if provided
150+
if request.context:
151+
apply_trace_tags_from_context(
152+
workspace_id=request.context.workspace_id,
153+
project_id=request.context.project_id,
154+
)
155+
127156
source = safe_build_source(request.manifest.model_dump(), {})
128157
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))
129158

@@ -135,6 +164,13 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:
135164
136165
This is a similar operation to resolve, but has an extra step which generates streams from dynamic stream templates if the manifest contains any. This is used when a user clicks the generate streams button on a stream template in the Builder UI
137166
"""
167+
# Apply trace tags from context if provided
168+
if request.context:
169+
apply_trace_tags_from_context(
170+
workspace_id=request.context.workspace_id,
171+
project_id=request.context.project_id,
172+
)
173+
138174
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
139175
manifest = {**source.resolved_manifest}
140176
streams = manifest.get("streams", [])

unit_tests/manifest_server/helpers/__init__.py

Whitespace-only changes.

unit_tests/manifest_server/test_auth.py renamed to unit_tests/manifest_server/helpers/test_auth.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from fastapi import HTTPException
88
from fastapi.security import HTTPAuthorizationCredentials
99

10-
from airbyte_cdk.manifest_server.auth import verify_jwt_token
10+
from airbyte_cdk.manifest_server.helpers.auth import verify_jwt_token
1111

1212

1313
class TestVerifyJwtToken:

0 commit comments

Comments
 (0)