Skip to content

Commit 7f753c2

Browse files
sshader authored and Convex, Inc. committed
Start logging structured log lines (#23391)
We've landed the code to handle reading either structured or unstructured log lines (always persisting them as unstructured), so it should now be safe to start logging structured log lines! For now, the only user-visible change is the timestamps used in log streams, but pretty soon we can surface these on the dashboard or in the CLI (e.g. show timestamps per log line on the dashboard logs page).

GitOrigin-RevId: 426d238f77f0bd9cfa657005f36551778b6f0f94
1 parent 69a3f3f commit 7f753c2

File tree

18 files changed

+540
-358
lines changed

18 files changed

+540
-358
lines changed

crates/application/src/function_log.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ use common::{
2020
execution_context::ExecutionContext,
2121
identity::InertIdentity,
2222
knobs::MAX_UDF_EXECUTION,
23-
log_lines::LogLines,
23+
log_lines::{
24+
LogLine,
25+
LogLines,
26+
},
2427
log_streaming::{
2528
EventSource,
2629
FunctionEventSource,
@@ -201,6 +204,10 @@ impl FunctionExecution {
201204
self.log_lines
202205
.iter()
203206
.map(|line| {
207+
let timestamp = match &line {
208+
LogLine::Unstructured(_) => self.unix_timestamp,
209+
LogLine::Structured { timestamp, .. } => *timestamp,
210+
};
204211
let JsonValue::Object(payload) = json!({
205212
"message": line.clone().to_pretty_string()
206213
}) else {
@@ -210,7 +217,7 @@ impl FunctionExecution {
210217
topic: LogTopic::Console,
211218
source: EventSource::Function(self.event_source()),
212219
payload,
213-
timestamp: self.unix_timestamp,
220+
timestamp,
214221
})
215222
})
216223
.filter_map(|event| match event {
@@ -288,7 +295,11 @@ impl FunctionExecutionProgress {
288295
fn console_log_events(self) -> Vec<LogEvent> {
289296
self.log_lines
290297
.into_iter()
291-
.map(|line| {
298+
.map(|line: LogLine| {
299+
let timestamp = match &line {
300+
LogLine::Unstructured(_) => self.function_start_timestamp,
301+
LogLine::Structured { timestamp, .. } => *timestamp,
302+
};
292303
let JsonValue::Object(payload) = json!({
293304
"message": line.to_pretty_string()
294305
}) else {
@@ -298,7 +309,7 @@ impl FunctionExecutionProgress {
298309
topic: LogTopic::Console,
299310
source: EventSource::Function(self.event_source.clone()),
300311
payload,
301-
timestamp: self.function_start_timestamp,
312+
timestamp,
302313
})
303314
})
304315
.filter_map(|event| match event {
@@ -1078,8 +1089,7 @@ impl<RT: Runtime> Inner<RT> {
10781089
event_source,
10791090
function_start_timestamp,
10801091
};
1081-
// TODO: this should ideally use a timestamp on the log lines themselves, but
1082-
// for now use the start timestamp of the function
1092+
10831093
let log_events = progress.clone().console_log_events();
10841094
self.log_manager.send_logs(log_events);
10851095
self.log

crates/common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#![feature(async_closure)]
2222
#![feature(error_iter)]
2323
#![feature(impl_trait_in_assoc_type)]
24+
#![feature(round_char_boundary)]
2425

2526
pub mod async_compat;
2627
pub mod auth;

crates/common/src/log_lines.rs

Lines changed: 105 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use value::{
3232
HeapSize,
3333
WithHeapSize,
3434
},
35+
obj,
3536
remove_boolean,
3637
remove_int64,
3738
remove_nullable_object,
@@ -44,6 +45,7 @@ use value::{
4445
use crate::runtime::UnixTimestamp;
4546

4647
pub const TRUNCATED_LINE_SUFFIX: &str = " (truncated due to length)";
48+
pub const MAX_LOG_LINE_LENGTH: usize = 32768;
4749
/// List of log lines from a Convex function execution.
4850
pub type LogLines = WithHeapSize<Vec<LogLine>>;
4951
pub type RawLogLines = WithHeapSize<Vec<String>>;
@@ -95,7 +97,7 @@ impl HeapSize for LogLevel {
9597
#[derive(Clone, Debug, PartialEq)]
9698
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
9799
pub struct SystemLogMetadata {
98-
code: String,
100+
pub code: String,
99101
}
100102

101103
impl HeapSize for SystemLogMetadata {
@@ -114,6 +116,14 @@ impl TryFrom<ConvexObject> for SystemLogMetadata {
114116
}
115117
}
116118

119+
impl TryFrom<SystemLogMetadata> for ConvexValue {
120+
type Error = anyhow::Error;
121+
122+
fn try_from(value: SystemLogMetadata) -> Result<Self, Self::Error> {
123+
Ok(ConvexValue::Object(obj!("code" => value.code)?))
124+
}
125+
}
126+
117127
#[derive(Clone, Debug, PartialEq)]
118128
pub enum LogLine {
119129
Unstructured(String),
@@ -141,17 +151,21 @@ impl Arbitrary for LogLine {
141151
prop::collection::vec(any::<String>(), 1..4),
142152
any::<LogLevel>(),
143153
any::<bool>(),
144-
any::<u64>(),
154+
any::<i64>(),
145155
any::<Option<SystemLogMetadata>>()
146156
)
147-
.prop_map(
157+
.prop_filter_map(
158+
"Invalid LogLine",
148159
|(messages, level, is_truncated, timestamp_ms, system_metadata)| {
149-
LogLine::Structured {
150-
messages: messages.into(),
151-
level,
152-
is_truncated,
153-
timestamp: UnixTimestamp::from_millis(timestamp_ms),
154-
system_metadata,
160+
match u64::try_from(timestamp_ms) {
161+
Ok(timestamp_ms) => Some(LogLine::Structured {
162+
messages: messages.into(),
163+
level,
164+
is_truncated,
165+
timestamp: UnixTimestamp::from_millis(timestamp_ms),
166+
system_metadata,
167+
}),
168+
Err(_) => None,
155169
}
156170
}
157171
)
@@ -178,6 +192,61 @@ impl LogLine {
178192
},
179193
}
180194
}
195+
196+
pub fn new_developer_log_line(
197+
level: LogLevel,
198+
messages: Vec<String>,
199+
timestamp: UnixTimestamp,
200+
) -> Self {
201+
// total length of messages joined by a space
202+
let total_length = messages.iter().map(|m| m.len() + 1).sum::<usize>() - 1;
203+
if total_length <= MAX_LOG_LINE_LENGTH {
204+
return LogLine::Structured {
205+
messages: messages.into(),
206+
level,
207+
is_truncated: false,
208+
timestamp,
209+
system_metadata: None,
210+
};
211+
}
212+
let mut total_length = 0;
213+
let mut truncated_messages: Vec<String> = vec![];
214+
for message in messages {
215+
let remaining_space = MAX_LOG_LINE_LENGTH - TRUNCATED_LINE_SUFFIX.len() - total_length;
216+
if message.len() <= remaining_space {
217+
total_length += message.len() + 1;
218+
truncated_messages.push(message);
219+
} else {
220+
let last_message =
221+
message[..message.floor_char_boundary(remaining_space)].to_string();
222+
truncated_messages.push(last_message);
223+
break;
224+
}
225+
}
226+
LogLine::Structured {
227+
messages: truncated_messages.into(),
228+
level,
229+
is_truncated: true,
230+
timestamp,
231+
system_metadata: None,
232+
}
233+
}
234+
235+
pub fn new_system_log_line(
236+
level: LogLevel,
237+
messages: Vec<String>,
238+
timestamp: UnixTimestamp,
239+
system_log_metadata: SystemLogMetadata,
240+
) -> Self {
241+
// Never truncate system log lines
242+
LogLine::Structured {
243+
messages: messages.into(),
244+
level,
245+
is_truncated: false,
246+
timestamp,
247+
system_metadata: Some(system_log_metadata),
248+
}
249+
}
181250
}
182251

183252
impl HeapSize for LogLine {
@@ -216,7 +285,8 @@ impl TryFrom<ConvexValue> for LogLine {
216285
let level = remove_string(&mut fields, "level")?;
217286

218287
let timestamp = remove_int64(&mut fields, "timestamp")?;
219-
let system_metadata = remove_nullable_object(&mut fields, "system_metadata")?;
288+
let system_metadata: Option<SystemLogMetadata> =
289+
remove_nullable_object(&mut fields, "system_metadata")?;
220290

221291
LogLine::Structured {
222292
messages: messages.clone().into(),
@@ -236,7 +306,30 @@ impl TryFrom<LogLine> for ConvexValue {
236306
type Error = anyhow::Error;
237307

238308
fn try_from(value: LogLine) -> Result<Self, Self::Error> {
239-
Ok(ConvexValue::String(value.to_pretty_string().try_into()?))
309+
let result = match value {
310+
LogLine::Unstructured(v) => v.try_into()?,
311+
LogLine::Structured {
312+
messages,
313+
level,
314+
is_truncated,
315+
timestamp,
316+
system_metadata,
317+
} => {
318+
let timestamp_ms: i64 = timestamp.as_ms_since_epoch()?.try_into()?;
319+
let system_metadata_value = match system_metadata {
320+
Some(m) => ConvexValue::try_from(m)?,
321+
None => ConvexValue::Null,
322+
};
323+
ConvexValue::Object(obj!(
324+
"messages" => messages.into_iter().map(ConvexValue::try_from).try_collect::<Vec<_>>()?,
325+
"level" => level.to_string(),
326+
"is_truncated" => is_truncated,
327+
"timestamp" => timestamp_ms,
328+
"system_metadata" => system_metadata_value,
329+
)?)
330+
},
331+
};
332+
Ok(result)
240333
}
241334
}
242335

@@ -426,9 +519,7 @@ mod tests {
426519
)]
427520
#[test]
428521
fn test_structured_round_trips(log_line in any::<LogLine>()) {
429-
let pretty = log_line.clone().to_pretty_string();
430-
let val = LogLine::try_from(ConvexValue::try_from(log_line).unwrap()).unwrap();
431-
assert_eq!(val, LogLine::Unstructured(pretty));
522+
assert_roundtrips::<LogLine, ConvexValue>(log_line);
432523
}
433524

434525
#[test]

0 commit comments

Comments
 (0)