Skip to content

Commit ac8e4e5

Browse files
authored
Add wasm support to jaeger (open-telemetry#365)
1 parent 6249046 commit ac8e4e5

File tree

17 files changed

+323
-111
lines changed

17 files changed

+323
-111
lines changed

opentelemetry-jaeger/Cargo.toml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,41 @@ rustdoc-args = ["--cfg", "docsrs"]
2222
[dependencies]
2323
async-std = { version = "1.6", optional = true }
2424
async-trait = "0.1"
25+
base64 = { version = "0.13", optional = true }
26+
futures-util = { version = "0.3", optional = true }
2527
http = { version = "0.2", optional = true }
2628
isahc = { version = "0.9", default-features = false, optional = true }
29+
js-sys = { version = "0.3", optional = true }
2730
opentelemetry = { version = "0.10", default-features = false, features = ["trace"], path = "../opentelemetry" }
31+
pin-project = { version = "1.0", optional = true }
2832
thrift = "0.13"
2933
tokio = { version = "0.2", features = ["udp", "sync"], optional = true }
34+
wasm-bindgen = { version = "0.2", optional = true }
35+
wasm-bindgen-futures = { version = "0.4.18", optional = true }
36+
37+
[dependencies.web-sys]
38+
version = "0.3.4"
39+
features = [
40+
'Headers',
41+
'Request',
42+
'RequestCredentials',
43+
'RequestInit',
44+
'RequestMode',
45+
'Response',
46+
'Window',
47+
]
48+
optional = true
3049

3150
[features]
3251
default = []
3352
collector_client = ["isahc", "http"]
53+
wasm_collector_client = [
54+
"base64",
55+
"futures-util",
56+
"http",
57+
"js-sys",
58+
"pin-project",
59+
"wasm-bindgen",
60+
"wasm-bindgen-futures",
61+
"web-sys",
62+
]

opentelemetry-jaeger/src/collector.rs

Lines changed: 222 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,88 +1,247 @@
11
//! # HTTP Jaeger Collector Client
2-
use crate::thrift::jaeger;
3-
use http::{Request, Uri};
4-
use isahc::{
5-
auth::{Authentication, Credentials},
6-
config::Configurable,
7-
HttpClient,
8-
};
9-
use std::io::{self, Cursor};
10-
use std::sync::atomic::{AtomicUsize, Ordering};
11-
use thrift::protocol::TBinaryOutputProtocol;
2+
use http::Uri;
3+
use std::sync::atomic::AtomicUsize;
124

135
/// `CollectorAsyncClientHttp` implements an async version of the
146
/// `TCollectorSyncClient` interface over HTTP
157
#[derive(Debug)]
168
pub(crate) struct CollectorAsyncClientHttp {
179
endpoint: Uri,
18-
client: HttpClient,
10+
#[cfg(feature = "collector_client")]
11+
client: isahc::HttpClient,
12+
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
13+
client: WasmHttpClient,
1914
payload_size_estimate: AtomicUsize,
2015
}
2116

22-
impl CollectorAsyncClientHttp {
23-
/// Create a new HTTP collector client
24-
pub(crate) fn new(
25-
endpoint: Uri,
26-
username: Option<String>,
27-
password: Option<String>,
28-
) -> thrift::Result<Self> {
29-
let mut builder = HttpClient::builder();
30-
if let (Some(username), Some(password)) = (username, password) {
31-
builder = builder
32-
.authentication(Authentication::basic())
33-
.credentials(Credentials::new(username, password));
17+
#[cfg(feature = "wasm_collector_client")]
18+
#[derive(Debug)]
19+
struct WasmHttpClient {
20+
auth: Option<String>,
21+
}
22+
23+
#[cfg(feature = "collector_client")]
24+
mod collector_client {
25+
use super::*;
26+
use crate::thrift::jaeger;
27+
use http::{Request, Uri};
28+
use isahc::{
29+
auth::{Authentication, Credentials},
30+
config::Configurable,
31+
HttpClient,
32+
};
33+
use std::io::{self, Cursor};
34+
use std::sync::atomic::{AtomicUsize, Ordering};
35+
use thrift::protocol::TBinaryOutputProtocol;
36+
37+
impl CollectorAsyncClientHttp {
38+
/// Create a new HTTP collector client
39+
pub(crate) fn new(
40+
endpoint: Uri,
41+
username: Option<String>,
42+
password: Option<String>,
43+
) -> thrift::Result<Self> {
44+
let mut builder = HttpClient::builder();
45+
if let (Some(username), Some(password)) = (username, password) {
46+
builder = builder
47+
.authentication(Authentication::basic())
48+
.credentials(Credentials::new(username, password));
49+
}
50+
let client = builder
51+
.build()
52+
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
53+
let payload_size_estimate = AtomicUsize::new(512);
54+
55+
Ok(CollectorAsyncClientHttp {
56+
endpoint,
57+
client,
58+
payload_size_estimate,
59+
})
60+
}
61+
62+
/// Submit list of Jaeger batches
63+
pub(crate) async fn submit_batch(
64+
&self,
65+
batch: jaeger::Batch,
66+
) -> thrift::Result<jaeger::BatchSubmitResponse> {
67+
// estimate transport capacity based on last request
68+
let estimate = self.payload_size_estimate.load(Ordering::Relaxed);
69+
70+
// Write payload to transport buffer
71+
let transport = Cursor::new(Vec::with_capacity(estimate));
72+
let mut protocol = TBinaryOutputProtocol::new(transport, true);
73+
batch.write_to_out_protocol(&mut protocol)?;
74+
75+
// Use current batch capacity as new estimate
76+
self.payload_size_estimate
77+
.store(protocol.transport.get_ref().len(), Ordering::Relaxed);
78+
79+
// Build collector request
80+
let req = Request::builder()
81+
.method("POST")
82+
.uri(&self.endpoint)
83+
.header("Content-Type", "application/vnd.apache.thrift.binary")
84+
.body(protocol.transport.into_inner())
85+
.expect("request should always be valid");
86+
87+
// Send request to collector
88+
let res = self
89+
.client
90+
.send_async(req)
91+
.await
92+
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
93+
94+
if !res.status().is_success() {
95+
return Err(thrift::Error::from(io::Error::new(
96+
io::ErrorKind::Other,
97+
format!("Expected success response, got {:?}", res.status()),
98+
)));
99+
}
100+
101+
Ok(jaeger::BatchSubmitResponse { ok: true })
34102
}
35-
let client = builder
36-
.build()
37-
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
38-
let payload_size_estimate = AtomicUsize::new(512);
39-
40-
Ok(CollectorAsyncClientHttp {
41-
endpoint,
42-
client,
43-
payload_size_estimate,
44-
})
45103
}
104+
}
105+
106+
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
107+
mod wasm_collector_client {
108+
use super::*;
109+
use crate::thrift::jaeger;
110+
use futures_util::future;
111+
use http::Uri;
112+
use js_sys::Uint8Array;
113+
use std::future::Future;
114+
use std::io::{self, Cursor};
115+
use std::pin::Pin;
116+
use std::sync::atomic::{AtomicUsize, Ordering};
117+
use std::task::{Context, Poll};
118+
use thrift::protocol::TBinaryOutputProtocol;
119+
use wasm_bindgen::JsCast;
120+
use wasm_bindgen_futures::JsFuture;
121+
use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response};
122+
123+
impl CollectorAsyncClientHttp {
124+
/// Create a new HTTP collector client
125+
pub(crate) fn new(
126+
endpoint: Uri,
127+
username: Option<String>,
128+
password: Option<String>,
129+
) -> thrift::Result<Self> {
130+
let auth = if let (Some(username), Some(password)) = (username, password) {
131+
let mut auth = String::from("Basic ");
132+
base64::encode_config_buf(username, base64::STANDARD, &mut auth);
133+
auth.push(':');
134+
base64::encode_config_buf(password, base64::STANDARD, &mut auth);
135+
Some(auth)
136+
} else {
137+
None
138+
};
139+
let payload_size_estimate = AtomicUsize::new(512);
140+
141+
Ok(Self {
142+
endpoint,
143+
client: WasmHttpClient { auth },
144+
payload_size_estimate,
145+
})
146+
}
46147

47-
/// Submit list of Jaeger batches
48-
pub(crate) async fn submit_batch(
49-
&self,
50-
batch: jaeger::Batch,
51-
) -> thrift::Result<jaeger::BatchSubmitResponse> {
52-
// estimate transport capacity based on last request
53-
let estimate = self.payload_size_estimate.load(Ordering::Relaxed);
54-
55-
// Write payload to transport buffer
56-
let transport = Cursor::new(Vec::with_capacity(estimate));
57-
let mut protocol = TBinaryOutputProtocol::new(transport, true);
58-
batch.write_to_out_protocol(&mut protocol)?;
59-
60-
// Use current batch capacity as new estimate
61-
self.payload_size_estimate
62-
.store(protocol.transport.get_ref().len(), Ordering::Relaxed);
63-
64-
// Build collector request
65-
let req = Request::builder()
66-
.method("POST")
67-
.uri(&self.endpoint)
68-
.header("Content-Type", "application/vnd.apache.thrift.binary")
69-
.body(protocol.transport.into_inner())
70-
.expect("request should always be valid");
148+
/// Submit list of Jaeger batches
149+
pub(crate) fn submit_batch(
150+
&self,
151+
batch: jaeger::Batch,
152+
) -> impl Future<Output = thrift::Result<jaeger::BatchSubmitResponse>> + Send + 'static
153+
{
154+
self.build_request(batch)
155+
.map(post_request)
156+
.map(|fut| future::Either::Left(SubmitBatchFuture(fut)))
157+
.unwrap_or_else(|e| future::Either::Right(future::err(e)))
158+
}
159+
160+
fn build_request(&self, batch: jaeger::Batch) -> thrift::Result<Request> {
161+
// estimate transport capacity based on last request
162+
let estimate = self.payload_size_estimate.load(Ordering::Relaxed);
163+
164+
// Write payload to transport buffer
165+
let transport = Cursor::new(Vec::with_capacity(estimate));
166+
let mut protocol = TBinaryOutputProtocol::new(transport, true);
167+
batch.write_to_out_protocol(&mut protocol)?;
168+
169+
// Use current batch capacity as new estimate
170+
self.payload_size_estimate
171+
.store(protocol.transport.get_ref().len(), Ordering::Relaxed);
172+
173+
// Build collector request
174+
let mut options = RequestInit::new();
175+
options.method("POST");
176+
options.mode(RequestMode::Cors);
177+
178+
let body: Uint8Array = protocol.transport.get_ref().as_slice().into();
179+
options.body(Some(body.as_ref()));
180+
181+
if self.client.auth.is_some() {
182+
options.credentials(RequestCredentials::Include);
183+
}
184+
185+
let request = Request::new_with_str_and_init(&self.endpoint.to_string(), &options)
186+
.map_err(jsvalue_into_ioerror)?;
187+
let headers = request.headers();
188+
headers
189+
.set("Content-Type", "application/vnd.apache.thrift.binary")
190+
.map_err(jsvalue_into_ioerror)?;
191+
if let Some(auth) = self.client.auth.as_ref() {
192+
headers
193+
.set("Authorization", auth)
194+
.map_err(jsvalue_into_ioerror)?;
195+
}
196+
197+
Ok(request)
198+
}
199+
}
71200

201+
async fn post_request(request: Request) -> thrift::Result<jaeger::BatchSubmitResponse> {
72202
// Send request to collector
73-
let res = self
74-
.client
75-
.send_async(req)
203+
let window = web_sys::window().unwrap();
204+
let res_value = JsFuture::from(window.fetch_with_request(&request))
76205
.await
77-
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
206+
.map_err(jsvalue_into_ioerror)?;
207+
let res: Response = res_value.dyn_into().unwrap();
78208

79-
if !res.status().is_success() {
209+
if !res.ok() {
80210
return Err(thrift::Error::from(io::Error::new(
81211
io::ErrorKind::Other,
82-
format!("Expected success response, got {:?}", res.status()),
212+
format!(
213+
"Expected success response, got {} ({})",
214+
res.status(),
215+
res.status_text()
216+
),
83217
)));
84218
}
85219

86220
Ok(jaeger::BatchSubmitResponse { ok: true })
87221
}
222+
223+
/// Wrapper of web fetch API future marked as Send.
224+
///
225+
/// At the moment, the web APIs are single threaded. Since all opentelemetry futures are
226+
/// required to be Send, we mark this future as Send.
227+
#[pin_project::pin_project]
228+
struct SubmitBatchFuture<F>(#[pin] F);
229+
230+
unsafe impl<F> Send for SubmitBatchFuture<F> {}
231+
232+
impl<F: Future> Future for SubmitBatchFuture<F> {
233+
type Output = F::Output;
234+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
235+
self.project().0.poll(cx)
236+
}
237+
}
238+
239+
fn jsvalue_into_ioerror(value: wasm_bindgen::JsValue) -> io::Error {
240+
io::Error::new(
241+
io::ErrorKind::Other,
242+
js_sys::JSON::stringify(&value)
243+
.map(String::from)
244+
.unwrap_or_else(|_| "unknown error".to_string()),
245+
)
246+
}
88247
}

0 commit comments

Comments
 (0)