@@ -1580,4 +1580,176 @@ mod tests {
1580
1580
1581
1581
assert_eq ! ( exporter. len( ) , 1 ) ;
1582
1582
}
1583
+
1584
+ #[ test]
1585
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1586
+ fn test_build_batch_log_processor_builder_rt ( ) {
1587
+ let mut env_vars = vec ! [
1588
+ ( OTEL_BLRP_MAX_EXPORT_BATCH_SIZE , Some ( "500" ) ) ,
1589
+ ( OTEL_BLRP_SCHEDULE_DELAY , Some ( "I am not number" ) ) ,
1590
+ ( OTEL_BLRP_EXPORT_TIMEOUT , Some ( "2046" ) ) ,
1591
+ ] ;
1592
+ temp_env:: with_vars ( env_vars. clone ( ) , || {
1593
+ let builder = BatchLogProcessorWithAsyncRuntime :: builder (
1594
+ InMemoryLogExporter :: default ( ) ,
1595
+ runtime:: Tokio ,
1596
+ ) ;
1597
+
1598
+ assert_eq ! ( builder. config. max_export_batch_size, 500 ) ;
1599
+ assert_eq ! (
1600
+ builder. config. scheduled_delay,
1601
+ Duration :: from_millis( OTEL_BLRP_SCHEDULE_DELAY_DEFAULT )
1602
+ ) ;
1603
+ assert_eq ! (
1604
+ builder. config. max_queue_size,
1605
+ OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT
1606
+ ) ;
1607
+ assert_eq ! (
1608
+ builder. config. max_export_timeout,
1609
+ Duration :: from_millis( 2046 )
1610
+ ) ;
1611
+ } ) ;
1612
+
1613
+ env_vars. push ( ( OTEL_BLRP_MAX_QUEUE_SIZE , Some ( "120" ) ) ) ;
1614
+
1615
+ temp_env:: with_vars ( env_vars, || {
1616
+ let builder = BatchLogProcessorWithAsyncRuntime :: builder (
1617
+ InMemoryLogExporter :: default ( ) ,
1618
+ runtime:: Tokio ,
1619
+ ) ;
1620
+ assert_eq ! ( builder. config. max_export_batch_size, 120 ) ;
1621
+ assert_eq ! ( builder. config. max_queue_size, 120 ) ;
1622
+ } ) ;
1623
+ }
1624
+
1625
+ #[ test]
1626
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1627
+ fn test_build_batch_log_processor_builder_rt_with_custom_config ( ) {
1628
+ let expected = BatchConfigBuilder :: default ( )
1629
+ . with_max_export_batch_size ( 1 )
1630
+ . with_scheduled_delay ( Duration :: from_millis ( 2 ) )
1631
+ . with_max_export_timeout ( Duration :: from_millis ( 3 ) )
1632
+ . with_max_queue_size ( 4 )
1633
+ . build ( ) ;
1634
+
1635
+ let builder = BatchLogProcessorWithAsyncRuntime :: builder (
1636
+ InMemoryLogExporter :: default ( ) ,
1637
+ runtime:: Tokio ,
1638
+ )
1639
+ . with_batch_config ( expected) ;
1640
+
1641
+ let actual = & builder. config ;
1642
+ assert_eq ! ( actual. max_export_batch_size, 1 ) ;
1643
+ assert_eq ! ( actual. scheduled_delay, Duration :: from_millis( 2 ) ) ;
1644
+ assert_eq ! ( actual. max_export_timeout, Duration :: from_millis( 3 ) ) ;
1645
+ assert_eq ! ( actual. max_queue_size, 4 ) ;
1646
+ }
1647
+
1648
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1649
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
1650
+ async fn test_set_resource_batch_processor_rt ( ) {
1651
+ let exporter = MockLogExporter {
1652
+ resource : Arc :: new ( Mutex :: new ( None ) ) ,
1653
+ } ;
1654
+ let processor = BatchLogProcessorWithAsyncRuntime :: new (
1655
+ Box :: new ( exporter. clone ( ) ) ,
1656
+ BatchConfig :: default ( ) ,
1657
+ runtime:: Tokio ,
1658
+ ) ;
1659
+ let provider = LoggerProvider :: builder ( )
1660
+ . with_log_processor ( processor)
1661
+ . with_resource ( Resource :: new ( vec ! [
1662
+ KeyValue :: new( "k1" , "v1" ) ,
1663
+ KeyValue :: new( "k2" , "v3" ) ,
1664
+ KeyValue :: new( "k3" , "v3" ) ,
1665
+ KeyValue :: new( "k4" , "v4" ) ,
1666
+ KeyValue :: new( "k5" , "v5" ) ,
1667
+ ] ) )
1668
+ . build ( ) ;
1669
+ tokio:: time:: sleep ( Duration :: from_secs ( 2 ) ) . await ; // set resource in batch span processor is not blocking. Should we make it blocking?
1670
+ assert_eq ! ( exporter. get_resource( ) . unwrap( ) . into_iter( ) . count( ) , 5 ) ;
1671
+ let _ = provider. shutdown ( ) ;
1672
+ }
1673
+
1674
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1675
+ #[ tokio:: test( flavor = "multi_thread" ) ]
1676
+ async fn test_batch_shutdown_rt ( ) {
1677
+ // assert we will receive an error
1678
+ // setup
1679
+ let exporter = InMemoryLogExporterBuilder :: default ( )
1680
+ . keep_records_on_shutdown ( )
1681
+ . build ( ) ;
1682
+ let processor = BatchLogProcessorWithAsyncRuntime :: new (
1683
+ Box :: new ( exporter. clone ( ) ) ,
1684
+ BatchConfig :: default ( ) ,
1685
+ runtime:: Tokio ,
1686
+ ) ;
1687
+
1688
+ let mut record = LogRecord :: default ( ) ;
1689
+ let instrumentation = InstrumentationScope :: default ( ) ;
1690
+
1691
+ processor. emit ( & mut record, & instrumentation) ;
1692
+ processor. force_flush ( ) . unwrap ( ) ;
1693
+ processor. shutdown ( ) . unwrap ( ) ;
1694
+ // todo: expect to see errors here. How should we assert this?
1695
+ processor. emit ( & mut record, & instrumentation) ;
1696
+ assert_eq ! ( 1 , exporter. get_emitted_logs( ) . unwrap( ) . len( ) )
1697
+ }
1698
+
1699
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1700
+ #[ tokio:: test( flavor = "current_thread" ) ]
1701
+ #[ ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/1968" ]
1702
+ async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_multi_thread ( ) {
1703
+ let exporter = InMemoryLogExporterBuilder :: default ( ) . build ( ) ;
1704
+ let processor = BatchLogProcessorWithAsyncRuntime :: new (
1705
+ Box :: new ( exporter. clone ( ) ) ,
1706
+ BatchConfig :: default ( ) ,
1707
+ runtime:: Tokio ,
1708
+ ) ;
1709
+
1710
+ //
1711
+ // deadloack happens in shutdown with tokio current_thread runtime
1712
+ //
1713
+ processor. shutdown ( ) . unwrap ( ) ;
1714
+ }
1715
+
1716
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1717
+ #[ tokio:: test( flavor = "current_thread" ) ]
1718
+ async fn test_batch_log_processor_rt_shutdown_with_async_runtime_current_flavor_current_thread ( )
1719
+ {
1720
+ let exporter = InMemoryLogExporterBuilder :: default ( ) . build ( ) ;
1721
+ let processor = BatchLogProcessorWithAsyncRuntime :: new (
1722
+ Box :: new ( exporter. clone ( ) ) ,
1723
+ BatchConfig :: default ( ) ,
1724
+ runtime:: TokioCurrentThread ,
1725
+ ) ;
1726
+
1727
+ processor. shutdown ( ) . unwrap ( ) ;
1728
+ }
1729
+
1730
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1731
+ #[ tokio:: test( flavor = "multi_thread" ) ]
1732
+ async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_multi_thread ( ) {
1733
+ let exporter = InMemoryLogExporterBuilder :: default ( ) . build ( ) ;
1734
+ let processor = BatchLogProcessorWithAsyncRuntime :: new (
1735
+ Box :: new ( exporter. clone ( ) ) ,
1736
+ BatchConfig :: default ( ) ,
1737
+ runtime:: Tokio ,
1738
+ ) ;
1739
+
1740
+ processor. shutdown ( ) . unwrap ( ) ;
1741
+ }
1742
+
1743
+ #[ cfg( feature = "experimental_logs_batch_log_processor_with_async_runtime" ) ]
1744
+ #[ tokio:: test( flavor = "multi_thread" ) ]
1745
+ async fn test_batch_log_processor_rt_shutdown_with_async_runtime_multi_flavor_current_thread ( ) {
1746
+ let exporter = InMemoryLogExporterBuilder :: default ( ) . build ( ) ;
1747
+ let processor = BatchLogProcessorWithAsyncRuntime :: new (
1748
+ Box :: new ( exporter. clone ( ) ) ,
1749
+ BatchConfig :: default ( ) ,
1750
+ runtime:: TokioCurrentThread ,
1751
+ ) ;
1752
+
1753
+ processor. shutdown ( ) . unwrap ( ) ;
1754
+ }
1583
1755
}
0 commit comments