Skip to content

Commit 62e8bc4

Browse files
committed
feat(data-pipeline-ffi): migrate export function to sidecar-ffi
Signed-off-by: Alexandre Rulleau <[email protected]>
1 parent 619410e commit 62e8bc4

File tree

5 files changed

+126
-123
lines changed

5 files changed

+126
-123
lines changed

Cargo.lock

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

data-pipeline-ffi/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,8 @@ datadog-trace-utils = { path = "../datadog-trace-utils" }
2929
[dependencies]
3030
data-pipeline = { path = "../data-pipeline" }
3131
datadog-ipc = { path = "../ipc" }
32-
datadog-sidecar = { path = "../sidecar" }
33-
datadog-sidecar-ffi = { path = "../sidecar-ffi" }
3432
datadog-trace-utils = { path = "../trace-utils" }
3533
ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false }
3634
ddtelemetry-ffi = { path = "../ddtelemetry-ffi", default-features = false }
3735
rmp-serde = "1.1.1"
3836
tinybytes = { path = "../tinybytes" }
39-
tracing = { version = "0.1", default-features = false }

data-pipeline-ffi/src/span.rs

Lines changed: 2 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,16 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use datadog_ipc::platform::{MappedMem, ShmHandle};
5-
use datadog_sidecar_ffi::{
6-
ddog_alloc_anon_shm_handle, ddog_map_shm, ddog_sidecar_send_trace_v04_bytes,
7-
ddog_sidecar_send_trace_v04_shm, ddog_unmap_shm, TracerHeaderTags,
8-
};
94
use datadog_trace_utils::span::{
105
AttributeAnyValueBytes, AttributeArrayValueBytes, SpanBytes, SpanEventBytes, SpanLinkBytes,
116
};
12-
use ddcommon_ffi::{
13-
ddog_Error_message,
14-
slice::{AsBytes, CharSlice},
15-
MaybeError,
16-
};
7+
use ddcommon_ffi::slice::{AsBytes, CharSlice};
178
use std::ffi::c_char;
189
use std::ffi::c_void;
1910
use std::io::Cursor;
2011
use std::slice;
2112
use tinybytes::{Bytes, BytesString};
2213

23-
use datadog_sidecar::service::{blocking::SidecarTransport, InstanceId};
24-
2514
// ---------------- Macros -------------------
2615

2716
// Set a BytesString field of the given pointer.
@@ -669,26 +658,6 @@ pub unsafe extern "C" fn ddog_add_event_attributes_float(
669658

670659
// ------------------- Export Functions -------------------
671660

672-
#[repr(C)]
673-
#[derive()]
674-
pub struct SenderParameters {
675-
pub tracer_headers_tags: TracerHeaderTags<'static>,
676-
pub transport: Box<SidecarTransport>,
677-
pub instance_id: *mut InstanceId,
678-
pub limit: usize,
679-
pub n_requests: i64,
680-
pub buffer_size: i64,
681-
pub url: CharSlice<'static>,
682-
}
683-
684-
unsafe fn check_error(msg: &str, maybe_error: MaybeError) -> bool {
685-
if maybe_error != MaybeError::None {
686-
tracing::error!("{}: {}", msg, ddog_Error_message(maybe_error.to_std_ref()));
687-
return false;
688-
}
689-
true
690-
}
691-
692661
#[no_mangle]
693662
#[allow(clippy::missing_safety_doc)]
694663
pub unsafe extern "C" fn ddog_serialize_trace_into_c_string(
@@ -712,7 +681,7 @@ pub unsafe extern "C" fn ddog_serialize_trace_into_c_string(
712681
}
713682
}
714683

715-
unsafe fn serialize_traces_into_mapped_memory(
684+
pub unsafe fn serialize_traces_into_mapped_memory(
716685
traces_ptr: *const TracesBytes,
717686
buf_ptr: *mut c_void,
718687
cap: usize,
@@ -731,89 +700,6 @@ unsafe fn serialize_traces_into_mapped_memory(
731700
}
732701
}
733702

734-
#[no_mangle]
735-
#[allow(clippy::missing_safety_doc)]
736-
pub unsafe extern "C" fn ddog_send_traces_to_sidecar(
737-
traces_ptr: *mut TracesBytes,
738-
parameters: &mut SenderParameters,
739-
) {
740-
if traces_ptr.is_null() {
741-
tracing::error!("Invalid traces pointer");
742-
return;
743-
}
744-
745-
let traces = &*traces_ptr;
746-
let size: usize = traces.iter().map(|trace| trace.len()).sum();
747-
748-
if parameters.transport.is_closed() {
749-
tracing::info!("Skipping flushing traces of size {} as connection to sidecar failed", size);
750-
return;
751-
}
752-
753-
let mut shm: *mut ShmHandle = std::ptr::null_mut();
754-
let mut mapped_shm: *mut MappedMem<ShmHandle> = std::ptr::null_mut();
755-
756-
if !check_error(
757-
"Failed allocating shared memory",
758-
ddog_alloc_anon_shm_handle(parameters.limit, &mut shm),
759-
) {
760-
return;
761-
}
762-
763-
let mut size: usize = 0;
764-
let mut pointer = std::ptr::null_mut();
765-
if !check_error(
766-
"Failed mapping shared memory",
767-
ddog_map_shm(Box::from_raw(shm), &mut mapped_shm, &mut pointer, &mut size),
768-
) {
769-
return;
770-
}
771-
772-
let boxed_mapped_shm = Box::from_raw(mapped_shm);
773-
774-
let written = serialize_traces_into_mapped_memory(traces, pointer, size);
775-
ddog_unmap_shm(boxed_mapped_shm);
776-
if !written == 0 {
777-
return;
778-
}
779-
780-
let mut size_hint = written;
781-
if parameters.n_requests > 0 {
782-
size_hint = size_hint.max((parameters.buffer_size / parameters.n_requests + 1) as usize);
783-
}
784-
785-
let send_error = ddog_sidecar_send_trace_v04_shm(
786-
&mut parameters.transport,
787-
&*parameters.instance_id,
788-
Box::from_raw(shm),
789-
size_hint,
790-
&parameters.tracer_headers_tags,
791-
);
792-
793-
loop {
794-
if send_error != MaybeError::None {
795-
let mut buffer = vec![0u8; written];
796-
pointer = buffer.as_mut_ptr() as *mut c_void;
797-
serialize_traces_into_mapped_memory(traces, pointer, written);
798-
799-
let retry_error = ddog_sidecar_send_trace_v04_bytes(
800-
&mut parameters.transport,
801-
&*parameters.instance_id,
802-
CharSlice::from_raw_parts(pointer.cast(), written),
803-
&parameters.tracer_headers_tags,
804-
);
805-
806-
if check_error("Failed sending traces to the sidecar", retry_error) {
807-
tracing::debug!("Failed sending traces via shm to sidecar: {}", ddog_Error_message(send_error.to_std_ref()));
808-
} else {
809-
break;
810-
}
811-
}
812-
813-
tracing::info!("Flushing traces of size {} to send-queue for {}", size, parameters.url);
814-
}
815-
}
816-
817703
// ------------------- Tests -------------------
818704

819705
#[cfg(test)]

datadog-sidecar-ffi/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ datadog-live-debugger = { path = "../datadog-live-debugger" }
2323
paste = "1"
2424
libc = "0.2"
2525
dogstatsd-client = { path = "../dogstatsd-client" }
26+
tracing = { version = "0.1", default-features = false }
27+
data-pipeline-ffi = { path = "../data-pipeline-ffi" }
28+
2629

2730
[target.'cfg(windows)'.dependencies]
2831
datadog-crashtracker-ffi = { path = "../datadog-crashtracker-ffi", features = ["collector", "collector_windows"] }

datadog-sidecar-ffi/src/lib.rs

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#![cfg_attr(not(test), deny(clippy::todo))]
88
#![cfg_attr(not(test), deny(clippy::unimplemented))]
99

10+
use data_pipeline_ffi::span::{serialize_traces_into_mapped_memory, TracesBytes};
1011
#[cfg(windows)]
1112
use datadog_crashtracker_ffi::Metadata;
1213
use datadog_ipc::platform::{
@@ -31,7 +32,7 @@ use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigRea
3132
use ddcommon::tag::Tag;
3233
use ddcommon::Endpoint;
3334
use ddcommon_ffi as ffi;
34-
use ddcommon_ffi::{CharSlice, MaybeError};
35+
use ddcommon_ffi::{ddog_Error_message, CharSlice, MaybeError};
3536
use ddtelemetry::{
3637
data::{self, Dependency, Integration},
3738
worker::{LifecycleAction, TelemetryActions},
@@ -999,6 +1000,123 @@ pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
9991000
.unwrap_or(ffi::CharSlice::empty())
10001001
}
10011002

1003+
#[macro_export]
1004+
macro_rules! check {
1005+
($failable:expr, $msg:expr) => {
1006+
match $failable {
1007+
Ok(o) => o,
1008+
Err(e) => {
1009+
tracing::error!("{}: {}", $msg, e);
1010+
return;
1011+
}
1012+
}
1013+
};
1014+
}
1015+
1016+
#[repr(C)]
1017+
#[derive()]
1018+
pub struct SenderParameters {
1019+
pub tracer_headers_tags: TracerHeaderTags<'static>,
1020+
pub transport: Box<SidecarTransport>,
1021+
pub instance_id: *mut InstanceId,
1022+
pub limit: usize,
1023+
pub n_requests: i64,
1024+
pub buffer_size: i64,
1025+
pub url: CharSlice<'static>,
1026+
}
1027+
1028+
#[no_mangle]
1029+
#[allow(clippy::missing_safety_doc)]
1030+
pub unsafe extern "C" fn ddog_send_traces_to_sidecar(
1031+
traces_ptr: *mut TracesBytes,
1032+
parameters: &mut SenderParameters,
1033+
) {
1034+
if traces_ptr.is_null() {
1035+
tracing::error!("Invalid traces pointer");
1036+
return;
1037+
}
1038+
1039+
let traces = &*traces_ptr;
1040+
let size: usize = traces.iter().map(|trace| trace.len()).sum();
1041+
1042+
// Check connection to the sidecar
1043+
if parameters.transport.is_closed() {
1044+
tracing::info!(
1045+
"Skipping flushing traces of size {} as connection to sidecar failed",
1046+
size
1047+
);
1048+
return;
1049+
}
1050+
1051+
// Create and map shared memory
1052+
let shm = check!(
1053+
ShmHandle::new(parameters.limit),
1054+
"Failed to create shared memory"
1055+
);
1056+
1057+
let mut mapped_shm = check!(shm.clone().map(), "Failed to map shared memory");
1058+
1059+
// Write traces to the shared memory
1060+
let slice = mapped_shm.as_slice_mut();
1061+
let pointer = slice as *mut [u8] as *mut c_void;
1062+
let size = slice.len();
1063+
1064+
let written = serialize_traces_into_mapped_memory(traces, pointer, size);
1065+
1066+
// Send traces to the sidecar via the shared memory handler
1067+
let mut size_hint = written;
1068+
if parameters.n_requests > 0 {
1069+
size_hint = size_hint.max((parameters.buffer_size / parameters.n_requests + 1) as usize);
1070+
}
1071+
1072+
let send_error = match blocking::send_trace_v04_shm(
1073+
&mut parameters.transport,
1074+
&*parameters.instance_id,
1075+
shm,
1076+
size_hint,
1077+
check!(
1078+
(&parameters.tracer_headers_tags).try_into(),
1079+
"Failed to convert tracer headers tags"
1080+
),
1081+
) {
1082+
Ok(_) => MaybeError::None,
1083+
Err(e) => MaybeError::Some(ddcommon_ffi::Error::from(format!("{:?}", e))),
1084+
};
1085+
1086+
// Retry sending traces via bytes if there was an error
1087+
loop {
1088+
if send_error != MaybeError::None {
1089+
let mut buffer = vec![0u8; written];
1090+
let pointer = buffer.as_mut_ptr() as *mut c_void;
1091+
serialize_traces_into_mapped_memory(traces, pointer, written);
1092+
1093+
match blocking::send_trace_v04_bytes(
1094+
&mut parameters.transport,
1095+
&*parameters.instance_id,
1096+
CharSlice::from_raw_parts(pointer.cast(), written)
1097+
.as_bytes()
1098+
.to_vec(),
1099+
check!(
1100+
(&parameters.tracer_headers_tags).try_into(),
1101+
"Failed to convert tracer headers tags"
1102+
),
1103+
) {
1104+
Ok(_) => break,
1105+
Err(_) => tracing::debug!(
1106+
"Failed sending traces via shm to sidecar: {}",
1107+
ddog_Error_message(send_error.to_std_ref())
1108+
),
1109+
};
1110+
}
1111+
1112+
tracing::info!(
1113+
"Flushing traces of size {} to send-queue for {}",
1114+
size,
1115+
parameters.url
1116+
);
1117+
}
1118+
}
1119+
10021120
/// Drops the agent info reader.
10031121
#[no_mangle]
10041122
#[allow(clippy::missing_safety_doc)]

0 commit comments

Comments
 (0)