@@ -25,11 +25,11 @@ use common::{
25
25
LogLines ,
26
26
} ,
27
27
log_streaming:: {
28
- EventSource ,
28
+ self ,
29
29
FunctionEventSource ,
30
30
LogEvent ,
31
31
LogSender ,
32
- LogTopic ,
32
+ StructuredLogEvent ,
33
33
} ,
34
34
runtime:: {
35
35
Runtime ,
@@ -212,68 +212,47 @@ impl FunctionExecution {
212
212
LogLine :: Unstructured ( _) => self . unix_timestamp ,
213
213
LogLine :: Structured { timestamp, .. } => * timestamp,
214
214
} ;
215
- let JsonValue :: Object ( payload) = json ! ( {
216
- "message" : line. clone( ) . to_pretty_string( )
217
- } ) else {
218
- anyhow:: bail!( "could not create JSON object" ) ;
219
- } ;
220
- Ok :: < _ , anyhow:: Error > ( LogEvent {
221
- topic : LogTopic :: Console ,
222
- source : EventSource :: Function ( self . event_source ( ) ) ,
223
- payload,
215
+ LogEvent {
224
216
timestamp,
225
- } )
226
- } )
227
- . filter_map ( |event| match event {
228
- Err ( mut e) => {
229
- tracing:: error!( "Dropping log event due to failed payload conversion: {e:?}" ) ;
230
- report_error ( & mut e) ;
231
- None
232
- } ,
233
- Ok ( ev) => Some ( ev) ,
217
+ event : StructuredLogEvent :: Console {
218
+ source : self . event_source ( ) ,
219
+ log_line : line. clone ( ) ,
220
+ } ,
221
+ }
234
222
} )
235
223
. collect ( )
236
224
}
237
225
238
226
fn udf_execution_record_log_events ( & self ) -> anyhow:: Result < Vec < LogEvent > > {
239
- let execution_time: u64 = Duration :: from_secs_f64 ( self . execution_time )
240
- . as_millis ( )
241
- . try_into ( ) ?;
242
- let ( reason, status) = match self . params . err ( ) {
243
- Some ( err) => ( json ! ( err. to_string( ) ) , "failure" ) ,
244
- None => ( JsonValue :: Null , "success" ) ,
245
- } ;
246
-
247
- let JsonValue :: Object ( payload) = json ! ( {
248
- "status" : status,
249
- "reason" : reason,
250
- "executionTimeMs" : execution_time,
251
- "databaseReadBytes" : self . usage_stats. database_read_bytes,
252
- "databaseWriteBytes" : self . usage_stats. database_write_bytes,
253
- "storageReadBytes" : self . usage_stats. storage_read_bytes,
254
- "storageWriteBytes" : self . usage_stats. storage_write_bytes,
255
- } ) else {
256
- anyhow:: bail!( "could not create JSON object for UdfExecutionRecord" ) ;
257
- } ;
227
+ let execution_time = Duration :: from_secs_f64 ( self . execution_time ) ;
258
228
259
229
let mut events = vec ! [ LogEvent {
260
- topic: LogTopic :: UdfExecutionRecord ,
261
230
timestamp: self . unix_timestamp,
262
- source: EventSource :: Function ( self . event_source( ) ) ,
263
- payload,
231
+ event: StructuredLogEvent :: FunctionExecution {
232
+ source: self . event_source( ) ,
233
+ error: self . params. err( ) . cloned( ) ,
234
+ execution_time,
235
+ usage_stats: log_streaming:: AggregatedFunctionUsageStats {
236
+ database_read_bytes: self . usage_stats. database_read_bytes,
237
+ database_write_bytes: self . usage_stats. database_write_bytes,
238
+ storage_read_bytes: self . usage_stats. storage_read_bytes,
239
+ storage_write_bytes: self . usage_stats. storage_write_bytes,
240
+ vector_index_read_bytes: self . usage_stats. vector_index_read_bytes,
241
+ vector_index_write_bytes: self . usage_stats. vector_index_write_bytes,
242
+ } ,
243
+ } ,
264
244
} ] ;
265
245
266
246
if let Some ( err) = self . params . err ( ) {
267
- events. push ( LogEvent :: construct_exception (
268
- err,
269
- self . unix_timestamp ,
270
- EventSource :: Function ( self . event_source ( ) ) ,
271
- self . udf_server_version
272
- . as_ref ( )
273
- . map ( |v| v. to_string ( ) )
274
- . as_deref ( ) ,
275
- & self . identity ,
276
- ) ?) ;
247
+ events. push ( LogEvent {
248
+ timestamp : self . unix_timestamp ,
249
+ event : StructuredLogEvent :: Exception {
250
+ error : err. clone ( ) ,
251
+ user_identifier : self . identity . user_identifier ( ) . cloned ( ) ,
252
+ source : self . event_source ( ) ,
253
+ udf_server_version : self . udf_server_version . clone ( ) ,
254
+ } ,
255
+ } ) ;
277
256
}
278
257
279
258
Ok ( events)
@@ -304,25 +283,13 @@ impl FunctionExecutionProgress {
304
283
LogLine :: Unstructured ( _) => self . function_start_timestamp ,
305
284
LogLine :: Structured { timestamp, .. } => * timestamp,
306
285
} ;
307
- let JsonValue :: Object ( payload) = json ! ( {
308
- "message" : line. to_pretty_string( )
309
- } ) else {
310
- anyhow:: bail!( "could not create JSON object" ) ;
311
- } ;
312
- Ok :: < _ , anyhow:: Error > ( LogEvent {
313
- topic : LogTopic :: Console ,
314
- source : EventSource :: Function ( self . event_source . clone ( ) ) ,
315
- payload,
286
+ LogEvent {
316
287
timestamp,
317
- } )
318
- } )
319
- . filter_map ( |event| match event {
320
- Err ( mut e) => {
321
- tracing:: error!( "Dropping log event due to failed payload conversion: {e:?}" ) ;
322
- report_error ( & mut e) ;
323
- None
324
- } ,
325
- Ok ( ev) => Some ( ev) ,
288
+ event : StructuredLogEvent :: Console {
289
+ source : self . event_source . clone ( ) ,
290
+ log_line : line,
291
+ } ,
292
+ }
326
293
} )
327
294
. collect ( )
328
295
}
0 commit comments