@@ -231,7 +231,7 @@ fn prepare_output(
231
231
array_addrs : jlongArray ,
232
232
schema_addrs : jlongArray ,
233
233
output_batch : RecordBatch ,
234
- exec_context : & mut ExecutionContext ,
234
+ validate : bool ,
235
235
) -> CometResult < jlong > {
236
236
let array_address_array = unsafe { JLongArray :: from_raw ( array_addrs) } ;
237
237
let num_cols = env. get_array_length ( & array_address_array) ? as usize ;
@@ -255,7 +255,7 @@ fn prepare_output(
255
255
) ) ) ;
256
256
}
257
257
258
- if exec_context . debug_native {
258
+ if validate {
259
259
// Validate the output arrays.
260
260
for array in results. iter ( ) {
261
261
let array_data = array. to_data ( ) ;
@@ -275,9 +275,6 @@ fn prepare_output(
275
275
i += 1 ;
276
276
}
277
277
278
- // Update metrics
279
- update_metrics ( env, exec_context) ?;
280
-
281
278
Ok ( num_rows as jlong )
282
279
}
283
280
@@ -356,22 +353,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
356
353
let next_item = exec_context. stream . as_mut ( ) . unwrap ( ) . next ( ) ;
357
354
let poll_output = exec_context. runtime . block_on ( async { poll ! ( next_item) } ) ;
358
355
356
+ // Update metrics
357
+ update_metrics ( & mut env, exec_context) ?;
358
+
359
359
match poll_output {
360
360
Poll :: Ready ( Some ( output) ) => {
361
+ // prepare output for FFI transfer
361
362
return prepare_output (
362
363
& mut env,
363
364
array_addrs,
364
365
schema_addrs,
365
366
output?,
366
- exec_context,
367
+ exec_context. debug_native ,
367
368
) ;
368
369
}
369
370
Poll :: Ready ( None ) => {
370
371
// Reaches EOF of output.
371
-
372
- // Update metrics
373
- update_metrics ( & mut env, exec_context) ?;
374
-
375
372
if exec_context. explain_native {
376
373
if let Some ( plan) = & exec_context. root_op {
377
374
let formatted_plan_str =
@@ -391,9 +388,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
391
388
// A poll pending means there are more than one blocking operators,
392
389
// we don't need go back-forth between JVM/Native. Just keeping polling.
393
390
Poll :: Pending => {
394
- // Update metrics
395
- update_metrics ( & mut env, exec_context) ?;
396
-
397
391
// Pull input batches
398
392
pull_input_batches ( exec_context) ?;
399
393
0 commit comments