feat(logs): Improve logs with spans #194

Merged: 7 commits, Jul 17, 2025
Changes from 4 commits
2 changes: 1 addition & 1 deletion api/Cargo.toml
@@ -45,7 +45,7 @@ sqlx = { workspace = true, features = [
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing = { workspace = true, default-features = false }
tracing-actix-web = { workspace = true, features = ["emit_event_on_error"] }
Contributor Author: We emit errors on our own (the root span builder below logs a warn! when a request ends with an error).

tracing-actix-web = { workspace = true }
utoipa = { workspace = true, features = ["actix_extras"] }
utoipa-swagger-ui = { workspace = true, features = ["actix-web", "reqwest"] }
uuid = { version = "1.10.0", features = ["v4"] }
17 changes: 17 additions & 0 deletions api/configuration/prod.yaml
@@ -0,0 +1,17 @@
database:
host: "localhost"
port: 5430
name: "postgres"
username: "postgres"
password: "postgres"
tls:
enabled: false
trusted_root_certs: ""
require_ssl: false
application:
host: "127.0.0.1"
port: 8000
encryption_key:
id: 0
key: BlK9AlrzqRnCZy53j42uE1p2qGBiF7HYZjZYFaZObqg=
api_key: XOUbHmWbt9h7nWl15wWwyWQnctmFGNjpawMc3lT5CFs=
2 changes: 1 addition & 1 deletion api/src/k8s_client.rs
@@ -620,4 +620,4 @@ impl K8sClient for HttpK8sClient {

Ok(logs)
}
}
}
10 changes: 2 additions & 8 deletions api/src/main.rs
@@ -7,14 +7,8 @@ use telemetry::init_tracing;
use tracing::{error, info};

fn main() -> anyhow::Result<()> {
let app_name = env!("CARGO_BIN_NAME");

// We pass emit_on_span_close = true to emit logs on span close
// for the api because it is a web server, and we need to emit logs
// for every closing request. This is a bit of a hack, but it works
// for now. Ideally the tracing middleware should emit a log on
// request end, but it doesn't do that yet.
let _log_flusher = init_tracing(app_name, true)?;
// Initialize tracing from the binary name
let _log_flusher = init_tracing(env!("CARGO_BIN_NAME"))?;

// Initialize Sentry before the async runtime starts
let _sentry_guard = init_sentry()?;
75 changes: 63 additions & 12 deletions api/src/span_builder.rs
@@ -3,34 +3,85 @@ use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
};
use tracing::Span;
use tracing_actix_web::{DefaultRootSpanBuilder, RootSpanBuilder};
use tracing::{Span, info, warn};
use tracing_actix_web::RootSpanBuilder;

/// The `RootSpanBuilder` implementation for the API service.
/// It extracts the project ref from the `tenant_id` header
/// and sets it as a field in the root span. If the header is not
/// present, it sets the field to `Empty` to allow handlers
/// to set it later.
///
/// It extracts the project ref from the `tenant_id` header and sets it as a field in the root span.
/// It also emits info logs for request start and completion.
#[derive(Debug)]
pub struct ApiRootSpanBuilder;

impl RootSpanBuilder for ApiRootSpanBuilder {
fn on_request_start(request: &ServiceRequest) -> Span {
let project = request.headers().get("tenant_id");
match project {
let span = match project {
Some(project) => {
// We convert lossily to a string to be able to read at least part of the
// project ref in case of invalid UTF-8. This is useful for debugging.
// This is anyways an extreme edge case, as the project ref is
// generated by the system and should be valid UTF-8.
// We convert lossily to a string to be able to read at least part of the project
// ref in case of invalid UTF-8. This is useful for debugging.
//
// However, this is an edge case, as the project ref is generated by the system and
// should be valid UTF-8.
let project = String::from_utf8_lossy(project.as_bytes());
let project = project.as_ref();
tracing_actix_web::root_span!(request, project = project)
}
None => tracing_actix_web::root_span!(request, project = tracing::field::Empty),
};

// Log request start within the span
{
let _enter = span.enter();
info!(
method = %request.method(),
uri = %request.uri(),
path = %request.path(),
query_string = %request.query_string(),
version = ?request.version(),
"HTTP request received"
);
}

span
}

fn on_request_end<B: MessageBody>(span: Span, outcome: &Result<ServiceResponse<B>, Error>) {
DefaultRootSpanBuilder::on_request_end(span, outcome);
// Log request completion within the span
{
let _enter = span.enter();

match outcome {
Ok(response) => {
let status_code = response.status().as_u16();
let response_size = response
.headers()
.get("content-length")
.and_then(|h| h.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);

let response_content_type = response
.headers()
.get("content-type")
.and_then(|h| h.to_str().ok())
.unwrap_or("unknown");

info!(
status_code = status_code,
response_size = response_size,
response_content_type = %response_content_type,
"HTTP request completed successfully"
);
}
Err(error) => {
warn!(
error = %error,
error_type = %std::any::type_name_of_val(error),
"HTTP request completed with error"
);
}
}
}
}
}
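For context, a minimal sketch of how this builder could be exercised, assuming a hypothetical actix-web test harness (the route, handler, and test name are illustrative and not part of this PR; ApiRootSpanBuilder is assumed to be in scope): a request carrying a tenant_id header gets its value recorded as the project field on the root span, alongside the "HTTP request received" and "HTTP request completed successfully" events.

use actix_web::{App, HttpResponse, test, web};
use tracing_actix_web::TracingLogger;

#[actix_web::test]
async fn tenant_id_header_becomes_project_span_field() {
    let app = test::init_service(
        App::new()
            .wrap(TracingLogger::<ApiRootSpanBuilder>::new())
            .route("/health", web::get().to(|| async { HttpResponse::Ok().finish() })),
    )
    .await;

    // The header value is recorded as `project` on the root span, so both the
    // request-start and request-end events carry it.
    let request = test::TestRequest::get()
        .uri("/health")
        .insert_header(("tenant_id", "abcdefghijklmnop"))
        .to_request();
    let response = test::call_service(&app, request).await;

    assert!(response.status().is_success());
}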
4 changes: 2 additions & 2 deletions api/src/startup.rs
@@ -233,7 +233,7 @@ pub async fn run(
let openapi = ApiDoc::openapi();

let server = HttpServer::new(move || {
let tracing_middleware = TracingLogger::<ApiRootSpanBuilder>::new();
let tracing_logger = TracingLogger::<ApiRootSpanBuilder>::new();
let authentication = HttpAuthentication::bearer(auth_validator);
let app = App::new()
.wrap(
@@ -242,7 +242,7 @@
.start_transaction(true)
.finish(),
)
.wrap(tracing_middleware)
.wrap(tracing_logger)
.service(health_check)
.service(
SwaggerUi::new("/swagger-ui/{_:.*}").url("/api-docs/openapi.json", openapi.clone()),
2 changes: 1 addition & 1 deletion etl/src/clients/bigquery.rs
@@ -177,7 +177,7 @@ impl BigQueryClient {
"".to_string()
};

info!("Creating table {full_table_name} in BigQuery");
info!("creating table {full_table_name} in BigQuery");

let query = format!("create table {full_table_name} {columns_spec} {max_staleness_option}");

4 changes: 4 additions & 0 deletions etl/src/pipeline.rs
@@ -86,6 +86,10 @@ where
}
}

pub fn id(&self) -> PipelineId {
self.id
}

pub fn shutdown_tx(&self) -> ShutdownTx {
self.shutdown_tx.clone()
}
1 change: 1 addition & 0 deletions etl/src/replication/client.rs
@@ -37,6 +37,7 @@ where
info!("postgres connection terminated successfully")
}
.instrument(span);

tokio::spawn(task);
}

8 changes: 6 additions & 2 deletions etl/src/workers/apply.rs
@@ -122,7 +122,11 @@ where
async fn start(self) -> Result<ApplyWorkerHandle, Self::Error> {
info!("starting apply worker");

let apply_worker_span = tracing::info_span!("apply_worker");
let apply_worker_span = tracing::info_span!(
"apply_worker",
pipeline_id = self.pipeline_id,
publication_name = self.config.publication_name
Contributor: Are pipeline_id and publication_name useful for every log line within the span? There's going to be only one of each while the replicator is running, as opposed to something like a table_id, which will be different.

Contributor Author: My idea was that we want a reliable way to get a view of all logs of a certain pipeline for debugging a single customer.
);
let apply_worker = async move {
let start_lsn = get_start_lsn(self.pipeline_id, &self.replication_client).await?;

@@ -151,7 +155,7 @@

Ok(())
}
.instrument(apply_worker_span);
.instrument(apply_worker_span.or_current());

let handle = tokio::spawn(apply_worker);

8 changes: 6 additions & 2 deletions etl/src/workers/table_sync.rs
@@ -314,8 +314,12 @@ where
let state = TableSyncWorkerState::new(self.table_id, table_replication_phase);

let state_clone = state.clone();
let table_sync_worker_span =
tracing::info_span!("table_sync_worker", table_id = self.table_id);
let table_sync_worker_span = tracing::info_span!(
"table_sync_worker",
pipeline_id = self.pipeline_id,
publication_name = self.config.publication_name,
table_id = self.table_id,
);
let table_sync_worker = async move {
debug!(
"waiting to acquire a running permit for table sync worker for table {}",
4 changes: 2 additions & 2 deletions replicator/src/core.rs
@@ -13,7 +13,7 @@ use etl::state::store::postgres::PostgresStateStore;
use etl::{destination::base::Destination, pipeline::PipelineId};
use secrecy::ExposeSecret;
use std::fmt;
use tracing::{info, instrument, warn};
use tracing::{info, warn};

pub async fn start_replicator() -> anyhow::Result<()> {
info!("starting replicator service");
@@ -145,7 +145,7 @@ async fn init_state_store(
Ok(PostgresStateStore::new(pipeline_id, pg_connection_config))
}

#[instrument(skip(pipeline))]
#[tracing::instrument(skip(pipeline), fields(pipeline_id = pipeline.id()))]
Contributor: Same here, is pipeline_id needed?

async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
where
S: StateStore + Clone + Send + Sync + 'static,
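For reference, a standalone sketch (hypothetical Pipeline type, not this crate's) of what the attribute form above does: #[tracing::instrument] wraps the function in a span, skip(pipeline) keeps the whole struct out of the span fields, and fields(pipeline_id = pipeline.id()) records just the id, so events emitted inside the function carry it.

use tracing::{info, instrument};

struct Pipeline {
    id: u64,
}

impl Pipeline {
    fn id(&self) -> u64 {
        self.id
    }
}

#[instrument(skip(pipeline), fields(pipeline_id = pipeline.id()))]
async fn start_pipeline(pipeline: Pipeline) {
    // Emitted inside the `start_pipeline` span, so it carries `pipeline_id`.
    info!("pipeline started");
}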
8 changes: 2 additions & 6 deletions replicator/src/main.rs
@@ -12,12 +12,8 @@ mod core;
mod migrations;

fn main() -> anyhow::Result<()> {
let app_name = env!("CARGO_BIN_NAME");

// We pass `emit_on_span_close = false` to avoid emitting logs on span close
// for replicator because it is not a web server, and we don't need to emit logs
// for every closing span.
let _log_flusher = init_tracing(app_name, false)?;
// Initialize tracing from the binary name
let _log_flusher = init_tracing(env!("CARGO_BIN_NAME"))?;

// Initialize Sentry before the async runtime starts
let _sentry_guard = init_sentry()?;
42 changes: 11 additions & 31 deletions telemetry/src/lib.rs
@@ -12,10 +12,7 @@ use tracing_appender::{
rolling::{self, InitError},
};
use tracing_log::{LogTracer, log_tracer::SetLoggerError};
use tracing_subscriber::{
EnvFilter, FmtSubscriber,
fmt::{self, format::FmtSpan},
};
use tracing_subscriber::{EnvFilter, FmtSubscriber, fmt};

#[derive(Debug, Error)]
pub enum TracingError {
@@ -52,41 +49,37 @@ pub fn init_test_tracing() {
// and we need to log to terminal when `ENABLE_TRACING` env var is set.
Environment::Dev.set();
let _log_flusher =
init_tracing("test", false).expect("Failed to initialize tracing for tests");
init_tracing("test").expect("Failed to initialize tracing for tests");
}
});
}

/// Initializes tracing for the application.
pub fn init_tracing(app_name: &str, emit_on_span_close: bool) -> Result<LogFlusher, TracingError> {
pub fn init_tracing(app_name: &str) -> Result<LogFlusher, TracingError> {
// Initialize the log tracer to capture logs from the `log` crate
// and send them to the `tracing` subscriber. This captures logs
// from libraries that use the `log` crate.
LogTracer::init()?;

let is_prod = Environment::load()?.is_prod();

// Set the default log level to `info` if not specified in the RUST_LOG environment variable.
// Set the default log level to `info` if not specified in the `RUST_LOG` environment variable.
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into());

let log_flusher = if is_prod {
configure_prod_tracing(filter, app_name, emit_on_span_close)?
configure_prod_tracing(filter, app_name)?
} else {
configure_dev_tracing(filter, emit_on_span_close)?
configure_dev_tracing(filter)?
};

set_tracing_panic_hook();

// Return the log flusher to ensure logs are flushed before the application exits
// without this the logs in memory may not be flushed to the file
// without this the logs in memory may not be flushed to the file.
Ok(log_flusher)
}

fn configure_prod_tracing(
filter: EnvFilter,
app_name: &str,
emit_on_span_close: bool,
) -> Result<LogFlusher, TracingError> {
fn configure_prod_tracing(filter: EnvFilter, app_name: &str) -> Result<LogFlusher, TracingError> {
let filename_suffix = "log";
let log_dir = "logs";
let file_appender = rolling::Builder::new()
@@ -113,25 +106,18 @@ fn configure_prod_tracing(
.event_format(format)
.with_writer(file_appender)
.json()
.with_current_span(true)
.with_span_list(true)
.with_env_filter(filter);

let subscriber_builder = if emit_on_span_close {
subscriber_builder.with_span_events(FmtSpan::CLOSE)
} else {
subscriber_builder
};

let subscriber = subscriber_builder.finish();

set_global_default(subscriber)?;

Ok(LogFlusher::Flusher(guard))
}

fn configure_dev_tracing(
filter: EnvFilter,
emit_on_span_close: bool,
) -> Result<LogFlusher, TracingError> {
fn configure_dev_tracing(filter: EnvFilter) -> Result<LogFlusher, TracingError> {
let format = fmt::format()
// Emit the log level in the log output
.with_level(true)
@@ -149,12 +135,6 @@ fn configure_dev_tracing(
.event_format(format)
.with_env_filter(filter);

let subscriber_builder = if emit_on_span_close {
subscriber_builder.with_span_events(FmtSpan::CLOSE)
} else {
subscriber_builder
};

let subscriber = subscriber_builder.finish();

set_global_default(subscriber)?;
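To make the effect of with_current_span(true) and with_span_list(true) concrete, here is a minimal standalone sketch (assumed setup; the span name, field, and message are illustrative) of a subscriber built the same way as the production one above: each JSON event then includes a "span" object with the current span's name and fields, plus a "spans" array for the full span hierarchy.

use tracing::{info, info_span};

fn main() {
    let subscriber = tracing_subscriber::fmt()
        .json()
        .with_current_span(true)
        .with_span_list(true)
        .finish();
    tracing::subscriber::set_global_default(subscriber).expect("failed to set subscriber");

    let span = info_span!("apply_worker", pipeline_id = 7_u64);
    let _guard = span.enter();

    // Printed as JSON containing a "span" object ({"name":"apply_worker","pipeline_id":7})
    // and a "spans" array listing every enclosing span.
    info!("hello from inside the span");
}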