Skip to content

Commit 6a3b04d

Browse files
authored
Add observable instruments to periodicreader tests (#2428)
1 parent 4e52554 commit 6a3b04d

File tree

1 file changed

+130
-0
lines changed

1 file changed

+130
-0
lines changed

opentelemetry-sdk/src/metrics/periodic_reader.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -782,4 +782,134 @@ mod tests {
782782
"Metrics should be available in exporter."
783783
);
784784
}
785+
786+
async fn some_async_function() -> u64 {
787+
// No dependency on any particular async runtime.
788+
std::thread::sleep(std::time::Duration::from_millis(1));
789+
1
790+
}
791+
792+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
793+
async fn async_inside_observable_callback_from_tokio_multi_with_one_worker() {
794+
async_inside_observable_callback_helper();
795+
}
796+
797+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
798+
async fn async_inside_observable_callback_from_tokio_multi_with_two_worker() {
799+
async_inside_observable_callback_helper();
800+
}
801+
802+
#[tokio::test(flavor = "current_thread")]
803+
async fn async_inside_observable_callback_from_tokio_current_thread() {
804+
async_inside_observable_callback_helper();
805+
}
806+
807+
#[test]
808+
fn async_inside_observable_callback_from_regular_main() {
809+
async_inside_observable_callback_helper();
810+
}
811+
812+
fn async_inside_observable_callback_helper() {
813+
let interval = std::time::Duration::from_millis(10);
814+
let exporter = InMemoryMetricExporter::default();
815+
let reader = PeriodicReader::builder(exporter.clone())
816+
.with_interval(interval)
817+
.build();
818+
819+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
820+
let meter = meter_provider.meter("test");
821+
let _gauge = meter
822+
.u64_observable_gauge("my_observable_gauge")
823+
.with_callback(|observer| {
824+
// using futures_executor::block_on intentionally and avoiding
825+
// any particular async runtime.
826+
let value = futures_executor::block_on(some_async_function());
827+
observer.observe(value, &[]);
828+
})
829+
.build();
830+
831+
meter_provider.force_flush().expect("flush should succeed");
832+
let exported_metrics = exporter
833+
.get_finished_metrics()
834+
.expect("this should not fail");
835+
assert!(
836+
!exported_metrics.is_empty(),
837+
"Metrics should be available in exporter."
838+
);
839+
}
840+
841+
async fn some_tokio_async_function() -> u64 {
842+
// Tokio specific async function
843+
tokio::time::sleep(Duration::from_millis(1)).await;
844+
1
845+
}
846+
847+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
848+
849+
async fn tokio_async_inside_observable_callback_from_tokio_multi_with_one_worker() {
850+
tokio_async_inside_observable_callback_helper(true);
851+
}
852+
853+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
854+
async fn tokio_async_inside_observable_callback_from_tokio_multi_with_two_worker() {
855+
tokio_async_inside_observable_callback_helper(true);
856+
}
857+
858+
#[tokio::test(flavor = "current_thread")]
859+
#[ignore] //TODO: Investigate if this can be fixed.
860+
async fn tokio_async_inside_observable_callback_from_tokio_current_thread() {
861+
tokio_async_inside_observable_callback_helper(true);
862+
}
863+
864+
#[test]
865+
fn tokio_async_inside_observable_callback_from_regular_main() {
866+
tokio_async_inside_observable_callback_helper(false);
867+
}
868+
869+
fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
870+
let interval = std::time::Duration::from_millis(10);
871+
let exporter = InMemoryMetricExporter::default();
872+
let reader = PeriodicReader::builder(exporter.clone())
873+
.with_interval(interval)
874+
.build();
875+
876+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
877+
let meter = meter_provider.meter("test");
878+
879+
if use_current_tokio_runtime {
880+
let rt = tokio::runtime::Handle::current().clone();
881+
let _gauge = meter
882+
.u64_observable_gauge("my_observable_gauge")
883+
.with_callback(move |observer| {
884+
// call tokio specific async function from here
885+
let value = rt.block_on(some_tokio_async_function());
886+
observer.observe(value, &[]);
887+
})
888+
.build();
889+
// rt here is a reference to the current tokio runtime.
890+
// Droppng it occurs when the tokio::main itself ends.
891+
} else {
892+
let rt = tokio::runtime::Runtime::new().unwrap();
893+
let _gauge = meter
894+
.u64_observable_gauge("my_observable_gauge")
895+
.with_callback(move |observer| {
896+
// call tokio specific async function from here
897+
let value = rt.block_on(some_tokio_async_function());
898+
observer.observe(value, &[]);
899+
})
900+
.build();
901+
// rt is not dropped here as it is moved to the closure,
902+
// and is dropped only when MeterProvider itself is dropped.
903+
// This works when called from normal main.
904+
};
905+
906+
meter_provider.force_flush().expect("flush should succeed");
907+
let exported_metrics = exporter
908+
.get_finished_metrics()
909+
.expect("this should not fail");
910+
assert!(
911+
!exported_metrics.is_empty(),
912+
"Metrics should be available in exporter."
913+
);
914+
}
785915
}

0 commit comments

Comments
 (0)