Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions ingest-router/src/ingest_router_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use crate::config;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is not new -- just moved from lib.rs

use crate::errors::IngestRouterError;
use crate::executor;
use crate::router;
use http_body_util::{BodyExt, Full};
use hyper::StatusCode;
use hyper::body::Bytes;
use hyper::service::Service;
use hyper::{Request, Response};
use shared::http::make_error_response;
use std::pin::Pin;

pub struct IngestRouterService {
router: router::Router,
executor: executor::Executor,
}

impl IngestRouterService {
pub fn new(router: router::Router, timeouts: config::RelayTimeouts) -> Self {
let executor = executor::Executor::new(timeouts);
Self { router, executor }
}
}

impl<B> Service<Request<B>> for IngestRouterService
where
B: BodyExt<Data = Bytes> + Send + Sync + 'static,
B::Error: std::error::Error + Send + Sync + 'static,
B: Unpin,
{
type Response = Response<Full<Bytes>>;
type Error = IngestRouterError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn call(&self, req: Request<B>) -> Self::Future {
let maybe_handler = self.router.resolve(&req);

match maybe_handler {
Some((handler, cells)) => {
let (parts, body) = req.into_parts();
let executor = self.executor.clone();

Box::pin(async move {
let body_bytes = match body.collect().await {
Ok(c) => c.to_bytes(),
Err(_) => {
return Ok(make_error_response(StatusCode::BAD_REQUEST).map(Full::new));
}
};
let request = Request::from_parts(parts, body_bytes);
let response = executor.execute(handler, request, cells).await;
Ok(response.map(Full::new))
})
}
None => {
Box::pin(
async move { Ok(make_error_response(StatusCode::BAD_REQUEST).map(Full::new)) },
)
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::api::project_config::ProjectConfigsResponse;
use crate::api::utils::deserialize_body;
use crate::config::{CellConfig, HandlerAction, HttpMethod, Match, Route};
use crate::testutils::create_test_locator;
use hyper::Method;
use hyper::header::HOST;
use std::collections::HashMap;
use std::net::TcpStream;
use std::process::{Child, Command};
use std::time::Duration;
use url::Url;

struct TestServer {
child: Child,
}

impl TestServer {
fn spawn() -> std::io::Result<Self> {
let child = Command::new("python")
.arg("../scripts/mock_relay_api.py")
.spawn()?;

// Wait for tcp
for _ in 0..10 {
if TcpStream::connect("127.0.0.1:8000").is_err() {
std::thread::sleep(Duration::from_millis(100));
} else {
return Ok(Self { child });
}
}

Ok(Self { child })
}
}

impl Drop for TestServer {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
}
}

#[tokio::test]
async fn test_ingest_router() {
let _relay_server = TestServer::spawn().expect("Failed to spawn test server");

let routes_config = vec![
Route {
r#match: Match {
host: Some("us.sentry.io".to_string()),
path: Some("/api/0/relays/projectconfigs/".to_string()),
method: Some(HttpMethod::Post),
},
action: HandlerAction::RelayProjectConfigs,
locale: "us".to_string(),
},
Route {
r#match: Match {
host: Some("us.sentry.io".to_string()),
path: Some("/api/0/relays/live/".to_string()),
method: Some(HttpMethod::Get),
},
action: HandlerAction::Health,
locale: "us".to_string(),
},
];

let locales = HashMap::from([(
"us".to_string(),
vec![CellConfig {
id: "us1".to_string(),
sentry_url: Url::parse("https://sentry.io/us1").unwrap(),
relay_url: Url::parse("http://localhost:8000").unwrap(),
}],
)]);

let locator = create_test_locator(HashMap::from([(
"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(),
"us1".to_string(),
)]))
.await;

let service = IngestRouterService::new(
router::Router::new(routes_config, locales, locator),
config::RelayTimeouts {
http_timeout_secs: 5000,
task_initial_timeout_secs: 10000,
task_subsequent_timeout_secs: 10000,
},
);

// Project configs request
let request = Request::builder()
.method(Method::POST)
.uri("/api/0/relays/projectconfigs/")
.header(HOST, "us.sentry.io")
.body(Full::new(Bytes::from(
r#"{"publicKeys": ["aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"], "global": 1}"#,
)))
.unwrap();

let response = service.call(request).await.unwrap();

let (parts, body) = response.into_parts();

assert_eq!(parts.status, 200);

// Convert BoxBody to Bytes for deserialize_body
let body_bytes = body.collect().await.unwrap().to_bytes();
let parsed: ProjectConfigsResponse = deserialize_body(body_bytes).unwrap();
assert_eq!(parsed.project_configs.len(), 1);
assert_eq!(parsed.pending_keys.len(), 0);
assert_eq!(parsed.extra_fields.len(), 2);

// Healthcheck
let request = Request::builder()
.method(Method::GET)
.uri("/api/0/relays/live/")
.header(HOST, "us.sentry.io")
.body(Full::new(Bytes::new()))
.unwrap();

let response = service.call(request).await.unwrap();
assert_eq!(response.status(), 200);
}
}
Loading