Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ http = "1.3.1"
http-body-util = "0.1.3"
hyper = { version = "1.7.0", features = ["full"] }
hyper-util = { version = "0.1.17", features = ["full"] }
indexmap = "2.13.0"
metrics = "0.24.2"
metrics-exporter-statsd = "0.9.0"
reqwest = { version = "0.12.23", features = ["json"] }
Expand Down
1 change: 1 addition & 0 deletions ingest-router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ http = { workspace = true }
http-body-util = { workspace = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
indexmap = { workspace = true }
locator = { path = "../locator" }
metrics = { workspace = true }
reqwest = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion ingest-router/src/api/any_cell_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl Handler for AnyCellHandler {
// Send the request to all cells
let cell_requests = cells
.cell_list()
.iter()
.map(|cell_id| {
let req = Request::from_parts(parts.clone(), body.clone());
(cell_id.clone(), req)
Expand Down
102 changes: 96 additions & 6 deletions ingest-router/src/api/project_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
//!
//! - **`configs`**: Map of public keys to their project configurations
//! - Configs are passed through unchanged from upstream Sentry instances
//! - They will add the relevant processing relay URL in the response
//! - The ingest router adds the `upstreamRelayUrl` field to each config based on the cell it came from
//! - **`pending`**: Array of public keys for which configs are being computed asynchronously
//! - Relay should retry the request later to fetch these configs
//! - Also used when upstreams fail/timeout (graceful degradation)
Expand Down Expand Up @@ -96,8 +96,8 @@
//!
//! ### Configs (HashMap merge)
//! - Merge all `configs` HashMaps from all upstreams
//! - Configs are passed through unchanged (no modifications)
//! - relay_url is expected to be added in the upstream response
//! - Add `upstreamRelayUrl` to each config based on which cell it came from
//! - Other config fields are passed through unchanged from upstream
//!
//! ### Pending (Array concatenation)
//! - Concatenate all `pending` arrays from all upstream responses
Expand Down Expand Up @@ -297,6 +297,8 @@ use serde_json::Value as JsonValue;
use shared::http::make_error_response;
use std::collections::HashMap;

const UPSTREAM_RELAY_FIELD: &str = "upstreamRelayUrl";

/// Request format for the relay project configs endpoint.
///
/// # Example
Expand Down Expand Up @@ -378,6 +380,8 @@ struct ProjectConfigsMetadata {
cell_to_keys: HashMap<CellId, Vec<String>>,
// keys that couldn't be assigned to any cell
unassigned_keys: Vec<String>,
// mapping from cell_id to relay_url for adding upstreamRelayUrl to configs
cell_to_relay_url: HashMap<CellId, String>,
}

/// Handler for the Relay Project Configs endpoint
Expand Down Expand Up @@ -441,6 +445,19 @@ impl Handler for ProjectConfigsHandler {
}
}

// Build cell_to_relay_url mapping for adding upstreamRelayUrl to configs during merge
let mut cell_to_relay_url: HashMap<CellId, String> = HashMap::new();
for cell_id in cell_to_keys.keys() {
if let Some(upstream) = cells.get_upstream(cell_id) {
cell_to_relay_url.insert(cell_id.clone(), upstream.relay_url.to_string());
} else {
tracing::warn!(
cell_id = %cell_id,
"Cell has keys but no upstream configured; upstreamRelayUrl will not be added to configs"
);
}
}

let cell_requests = cell_to_keys
.iter()
.map(|(cell_id, keys)| {
Expand All @@ -458,6 +475,7 @@ impl Handler for ProjectConfigsHandler {
let metadata = Box::new(ProjectConfigsMetadata {
cell_to_keys,
unassigned_keys: pending,
cell_to_relay_url,
});
Ok((cell_requests, metadata))
}
Expand Down Expand Up @@ -511,7 +529,24 @@ impl Handler for ProjectConfigsHandler {
}

if let Ok(parsed) = deserialize_body::<ProjectConfigsResponse>(body) {
merged.project_configs.extend(parsed.project_configs);
let relay_url = meta.cell_to_relay_url.get(&cell_id);

// Add upstreamRelayUrl to each config from this cell
let mut configs_with_relay_url = HashMap::new();
for (key, mut config) in parsed.project_configs {
// Add upstreamRelayUrl to the config if we have it
if let (Some(relay_url_str), Some(config_obj)) =
(relay_url, config.as_object_mut())
{
config_obj.insert(
UPSTREAM_RELAY_FIELD.to_string(),
serde_json::Value::String(relay_url_str.clone()),
);
}
configs_with_relay_url.insert(key, config);
}

merged.project_configs.extend(configs_with_relay_url);
merged.extra_fields.extend(parsed.extra_fields);
merged.pending_keys.extend(parsed.pending_keys);
} else {
Expand Down Expand Up @@ -729,13 +764,40 @@ mod tests {
("us2".to_string(), Ok(response2)),
];

let metadata: SplitMetadata = Box::new(Vec::<String>::new());
let metadata: SplitMetadata = Box::new(ProjectConfigsMetadata {
cell_to_keys: HashMap::from([
("us1".to_string(), vec!["key1".to_string()]),
("us2".to_string(), vec!["key2".to_string()]),
]),
unassigned_keys: Vec::new(),
cell_to_relay_url: HashMap::from([
("us1".to_string(), "http://relay-us1:8090".to_string()),
("us2".to_string(), "http://relay-us2:8090".to_string()),
]),
});
let merged = handler.merge_responses(results, metadata).await;

let parsed: ProjectConfigsResponse = deserialize_body(merged.into_body()).unwrap();

assert!(parsed.project_configs.contains_key("key1"));
assert!(parsed.project_configs.contains_key("key2"));

// Verify upstreamRelayUrl field was added to each config
let key1_config = parsed.project_configs.get("key1").unwrap();
assert!(key1_config.is_object());
let key1_obj = key1_config.as_object().unwrap();
assert_eq!(
key1_obj.get(UPSTREAM_RELAY_FIELD),
Some(&serde_json::json!("http://relay-us1:8090"))
);

let key2_config = parsed.project_configs.get("key2").unwrap();
assert!(key2_config.is_object());
let key2_obj = key2_config.as_object().unwrap();
assert_eq!(
key2_obj.get(UPSTREAM_RELAY_FIELD),
Some(&serde_json::json!("http://relay-us2:8090"))
);
}

#[tokio::test]
Expand Down Expand Up @@ -768,12 +830,19 @@ mod tests {

// Pending from split phase (routing failures)
let pending_from_split: ProjectConfigsMetadata = ProjectConfigsMetadata {
cell_to_keys: HashMap::new(),
cell_to_keys: HashMap::from([
("us1".to_string(), vec!["key1".to_string()]),
("us2".to_string(), vec!["key2".to_string()]),
]),
unassigned_keys: vec![
"key_routing_failed".to_string(),
"key_from_failed_cell1".to_string(),
"key_from_failed_cell2".to_string(),
],
cell_to_relay_url: HashMap::from([
("us1".to_string(), "http://relay-us1:8090".to_string()),
("us2".to_string(), "http://relay-us2:8090".to_string()),
]),
};

let metadata: SplitMetadata = Box::new(pending_from_split);
Expand All @@ -782,6 +851,27 @@ mod tests {
// Parse merged response body so we can assert on pending keys
let parsed: ProjectConfigsResponse = deserialize_body(merged.into_body()).unwrap();

// Should have configs from successful responses
assert!(parsed.project_configs.contains_key("key1"));
assert!(parsed.project_configs.contains_key("key2"));

// Verify upstreamRelayUrl field was added to each config
let key1_config = parsed.project_configs.get("key1").unwrap();
assert!(key1_config.is_object());
let key1_obj = key1_config.as_object().unwrap();
assert_eq!(
key1_obj.get(UPSTREAM_RELAY_FIELD),
Some(&serde_json::json!("http://relay-us1:8090"))
);

let key2_config = parsed.project_configs.get("key2").unwrap();
assert!(key2_config.is_object());
let key2_obj = key2_config.as_object().unwrap();
assert_eq!(
key2_obj.get(UPSTREAM_RELAY_FIELD),
Some(&serde_json::json!("http://relay-us2:8090"))
);

// Should have pending keys from split phase
assert_eq!(parsed.pending_keys.len(), 3);
assert!(
Expand Down
3 changes: 1 addition & 2 deletions ingest-router/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ async fn send_to_cell(
) -> Result<Response<Bytes>, IngestRouterError> {
// Look up the upstream for this cell
let upstream = cells
.cell_to_upstreams()
.get(cell_id)
.get_upstream(cell_id)
.ok_or_else(|| IngestRouterError::InternalError(format!("Unknown cell: {}", cell_id)))?;

// Wrap Bytes in Full for the HTTP client
Expand Down
Loading