Skip to content

Commit 97cbe0a

Browse files
authored
feat(logs): Improve logs with spans (#194)
1 parent 94bd0e6 commit 97cbe0a

File tree

11 files changed

+73
-64
lines changed

11 files changed

+73
-64
lines changed

api/src/main.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,8 @@ use telemetry::init_tracing;
77
use tracing::{error, info};
88

99
fn main() -> anyhow::Result<()> {
10-
let app_name = env!("CARGO_BIN_NAME");
11-
12-
// We pass emit_on_span_close = true to emit logs on span close
13-
// for the api because it is a web server, and we need to emit logs
14-
// for every closing request. This is a bit of a hack, but it works
15-
// for now. Ideally the tracing middleware should emit a log on
16-
// request end, but it doesn't do that yet.
17-
let _log_flusher = init_tracing(app_name, true)?;
10+
// Initialize tracing from the binary name
11+
let _log_flusher = init_tracing(env!("CARGO_BIN_NAME"))?;
1812

1913
// Initialize Sentry before the async runtime starts
2014
let _sentry_guard = init_sentry()?;

api/src/span_builder.rs

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,60 @@ use actix_web::{
33
body::MessageBody,
44
dev::{ServiceRequest, ServiceResponse},
55
};
6-
use tracing::Span;
6+
use tracing::{Span, info};
77
use tracing_actix_web::{DefaultRootSpanBuilder, RootSpanBuilder};
88

99
/// The `RootSpanBuilder` implementation for the API service.
10-
/// It extracts the project ref from the `tenant_id` header
11-
/// and sets it as a field in the root span. If the header is not
12-
/// present, it sets the field to `Empty` to allow handlers
13-
/// to set it later.
10+
///
11+
/// It extracts the project ref from the `tenant_id` header and sets it as a field in the root span.
12+
/// It also emits info logs for request start and completion.
13+
#[derive(Debug)]
1414
pub struct ApiRootSpanBuilder;
1515

1616
impl RootSpanBuilder for ApiRootSpanBuilder {
1717
fn on_request_start(request: &ServiceRequest) -> Span {
1818
let project = request.headers().get("tenant_id");
19-
match project {
19+
let span = match project {
2020
Some(project) => {
21-
// We convert lossily to a string to be able to read at least part of the
22-
// project ref in case of invalid UTF-8. This is useful for debugging.
23-
// This is anyways an extreme edge case, as the project ref is
24-
// generated by the system and should be valid UTF-8.
21+
// We convert lossily to a string to be able to read at least part of the project
22+
// ref in case of invalid UTF-8. This is useful for debugging.
23+
//
24+
// However, this is an edge case, as the project ref is generated by the system and
25+
// should be valid UTF-8.
2526
let project = String::from_utf8_lossy(project.as_bytes());
2627
let project = project.as_ref();
2728
tracing_actix_web::root_span!(request, project = project)
2829
}
2930
None => tracing_actix_web::root_span!(request, project = tracing::field::Empty),
31+
};
32+
33+
// We enter the span temporarily to log the request received. The span will be entered
34+
// automatically before the request is handled.
35+
{
36+
let _enter = span.enter();
37+
info!(
38+
method = %request.method(),
39+
uri = %request.uri(),
40+
path = %request.path(),
41+
query_string = %request.query_string(),
42+
version = ?request.version(),
43+
"HTTP request received"
44+
);
3045
}
46+
47+
span
3148
}
3249

3350
fn on_request_end<B: MessageBody>(span: Span, outcome: &Result<ServiceResponse<B>, Error>) {
51+
// We call the default builder to add more details to the current span.
3452
DefaultRootSpanBuilder::on_request_end(span, outcome);
53+
54+
// In case we have a positive response, we want to log the success. In case of error, it will
55+
// be logged automatically since we have the `emit_event_on_error` feature enabled.
56+
if let Ok(response) = outcome {
57+
if response.response().error().is_none() {
58+
info!("HTTP request completed successfully");
59+
}
60+
}
3561
}
3662
}

api/src/startup.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ pub async fn run(
233233
let openapi = ApiDoc::openapi();
234234

235235
let server = HttpServer::new(move || {
236-
let tracing_middleware = TracingLogger::<ApiRootSpanBuilder>::new();
236+
let tracing_logger = TracingLogger::<ApiRootSpanBuilder>::new();
237237
let authentication = HttpAuthentication::bearer(auth_validator);
238238
let app = App::new()
239239
.wrap(
@@ -242,7 +242,7 @@ pub async fn run(
242242
.start_transaction(true)
243243
.finish(),
244244
)
245-
.wrap(tracing_middleware)
245+
.wrap(tracing_logger)
246246
.service(health_check)
247247
.service(
248248
SwaggerUi::new("/swagger-ui/{_:.*}").url("/api-docs/openapi.json", openapi.clone()),

etl/src/clients/bigquery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl BigQueryClient {
177177
"".to_string()
178178
};
179179

180-
info!("Creating table {full_table_name} in BigQuery");
180+
info!("creating table {full_table_name} in BigQuery");
181181

182182
let query = format!("create table {full_table_name} {columns_spec} {max_staleness_option}");
183183

etl/src/pipeline.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ where
8686
}
8787
}
8888

89+
pub fn id(&self) -> PipelineId {
90+
self.id
91+
}
92+
8993
pub fn shutdown_tx(&self) -> ShutdownTx {
9094
self.shutdown_tx.clone()
9195
}

etl/src/replication/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ where
3737
info!("postgres connection terminated successfully")
3838
}
3939
.instrument(span);
40+
4041
tokio::spawn(task);
4142
}
4243

etl/src/workers/apply.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,11 @@ where
122122
async fn start(self) -> Result<ApplyWorkerHandle, Self::Error> {
123123
info!("starting apply worker");
124124

125-
let apply_worker_span = tracing::info_span!("apply_worker");
125+
let apply_worker_span = tracing::info_span!(
126+
"apply_worker",
127+
pipeline_id = self.pipeline_id,
128+
publication_name = self.config.publication_name
129+
);
126130
let apply_worker = async move {
127131
let start_lsn = get_start_lsn(self.pipeline_id, &self.replication_client).await?;
128132

@@ -151,7 +155,7 @@ where
151155

152156
Ok(())
153157
}
154-
.instrument(apply_worker_span);
158+
.instrument(apply_worker_span.or_current());
155159

156160
let handle = tokio::spawn(apply_worker);
157161

etl/src/workers/table_sync.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,8 +314,12 @@ where
314314
let state = TableSyncWorkerState::new(self.table_id, table_replication_phase);
315315

316316
let state_clone = state.clone();
317-
let table_sync_worker_span =
318-
tracing::info_span!("table_sync_worker", table_id = self.table_id);
317+
let table_sync_worker_span = tracing::info_span!(
318+
"table_sync_worker",
319+
pipeline_id = self.pipeline_id,
320+
publication_name = self.config.publication_name,
321+
table_id = self.table_id,
322+
);
319323
let table_sync_worker = async move {
320324
debug!(
321325
"waiting to acquire a running permit for table sync worker for table {}",

replicator/src/core.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use etl::state::store::postgres::PostgresStateStore;
1313
use etl::{destination::base::Destination, pipeline::PipelineId};
1414
use secrecy::ExposeSecret;
1515
use std::fmt;
16-
use tracing::{info, instrument, warn};
16+
use tracing::{info, warn};
1717

1818
pub async fn start_replicator() -> anyhow::Result<()> {
1919
info!("starting replicator service");
@@ -145,7 +145,7 @@ async fn init_state_store(
145145
Ok(PostgresStateStore::new(pipeline_id, pg_connection_config))
146146
}
147147

148-
#[instrument(skip(pipeline))]
148+
#[tracing::instrument(skip(pipeline), fields(pipeline_id = pipeline.id()))]
149149
async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
150150
where
151151
S: StateStore + Clone + Send + Sync + 'static,

replicator/src/main.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,8 @@ mod core;
1212
mod migrations;
1313

1414
fn main() -> anyhow::Result<()> {
15-
let app_name = env!("CARGO_BIN_NAME");
16-
17-
// We pass `emit_on_span_close = false` to avoid emitting logs on span close
18-
// for replicator because it is not a web server, and we don't need to emit logs
19-
// for every closing span.
20-
let _log_flusher = init_tracing(app_name, false)?;
15+
// Initialize tracing from the binary name
16+
let _log_flusher = init_tracing(env!("CARGO_BIN_NAME"))?;
2117

2218
// Initialize Sentry before the async runtime starts
2319
let _sentry_guard = init_sentry()?;

0 commit comments

Comments
 (0)