Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: add span command to measure command latency #3391

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#### Changes
* Core: Add `opentelemetry` protocols support ([#3191](https://github.com/valkey-io/valkey-glide/pull/3191))
* Node: Create openTelemetry span to measure command latency ([#3391](https://github.com/valkey-io/valkey-glide/pull/3391))
* Node: Fix ZADD, enabling `+inf` and `-inf` as score ([#3370](https://github.com/valkey-io/valkey-glide/pull/3370))
* Go: Add JSON.SET and JSON.GET ([#3115](https://github.com/valkey-io/valkey-glide/pull/3115))
* Csharp: updating xUnit, adding xUnit analyser rules and guidelines ([#3035](https://github.com/valkey-io/valkey-glide/pull/3035))
Expand Down
8 changes: 6 additions & 2 deletions glide-core/redis-rs/redis/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use futures_util::{
};
#[cfg(feature = "aio")]
use std::pin::Pin;
use std::sync::Arc;
use std::{fmt, io};

use crate::connection::ConnectionLike;
Expand Down Expand Up @@ -370,8 +371,11 @@ impl Cmd {
///
/// A span is used by an OpenTelemetry backend to track the lifetime of the command
#[inline]
pub fn with_span(&mut self, name: &str) -> &mut Cmd {
self.span = Some(telemetrylib::GlideOpenTelemetry::new_span(name));
pub fn with_span_by_ptr(&mut self, span_ptr: u64) -> &mut Cmd {
unsafe {
Arc::increment_strong_count(span_ptr as *const GlideSpan);
self.span = Some((*Arc::from_raw(span_ptr as *const GlideSpan)).clone());
}
self
}

Expand Down
1 change: 1 addition & 0 deletions glide-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pub use client::ConnectionRequest;
pub mod cluster_scan_container;
pub mod request_type;
pub use telemetrylib::Telemetry;
pub use telemetrylib::{GlideOpenTelemetry, GlideSpan};
1 change: 1 addition & 0 deletions glide-core/src/protobuf/command_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -526,4 +526,5 @@ message CommandRequest {
UpdateConnectionPassword update_connection_password = 7;
}
Routes route = 8;
optional uint64 span_command = 9;
}
1 change: 1 addition & 0 deletions glide-core/src/protobuf/response.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message Response {
string closing_error = 5;
}
bool is_push = 6;
optional uint64 span_command = 7;
}

enum ConstantResponse {
Expand Down
38 changes: 32 additions & 6 deletions glide-core/src/socket_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ async fn write_result(
resp_result: ClientUsageResult<Value>,
callback_index: u32,
writer: &Rc<Writer>,
span_command_ptr: Option<u64>,
) -> Result<(), io::Error> {
let mut response = Response::new();
response.callback_idx = callback_index;
response.is_push = false;
response.span_command = span_command_ptr;
response.value = match resp_result {
Ok(Value::Okay) => Some(response::response::Value::ConstantResponse(
response::ConstantResponse::OK.into(),
Expand Down Expand Up @@ -308,8 +310,25 @@ async fn send_command(
.send_command(&cmd, routing)
.await
.map_err(|err| err.into());
if let Some(child_span) = child_span {
child_span.end();
match child_span {
Some(Ok(child_span)) => {
child_span.end();
}
Some(Err(error_msg)) => {
log_error(
"OpenTelemetry error",
format!(
"The child span `send command` for command {:?} was failed with error: {:?}",
cmd, error_msg
),
);
}
None => {
log_info(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove

"OpenTelemetry",
"No span created - this is expected as we only sample part of requests for tracing",
);
}
}
res
}
Expand Down Expand Up @@ -474,18 +493,25 @@ fn handle_request(request: CommandRequest, mut client: Client, writer: Rc<Writer
true => match request.command {
Some(action) => match action {
command_request::Command::ClusterScan(cluster_scan_command) => {
//ToDo: handle scan command
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO reminder

cluster_scan(cluster_scan_command, client).await
}
command_request::Command::SingleCommand(command) => {
match get_redis_command(&command) {
Ok(cmd) => match get_route(request.route.0, Some(&cmd)) {
Ok(routes) => send_command(cmd, client, routes).await,
Ok(mut cmd) => match get_route(request.route.0, Some(&cmd)) {
Ok(routes) => {
if let Some(span_command) = request.span_command {
cmd.with_span_by_ptr(span_command);
}
send_command(cmd, client, routes).await
}
Err(e) => Err(e),
},
Err(e) => Err(e),
}
}
command_request::Command::Transaction(transaction) => {
//ToDo: handle Batch command
match get_route(request.route.0, None) {
Ok(routes) => send_transaction(transaction, &mut client, routes).await,
Err(e) => Err(e),
Expand Down Expand Up @@ -551,7 +577,7 @@ fn handle_request(request: CommandRequest, mut client: Client, writer: Rc<Writer
client_clone.release_inflight_request();
}

let _res = write_result(result, request.callback_idx, &writer).await;
let _res = write_result(result, request.callback_idx, &writer, request.span_command).await;
});
}

Expand Down Expand Up @@ -581,7 +607,7 @@ async fn create_client(
Ok(client) => client,
Err(err) => return Err(ClientCreationError::ConnectionError(err)),
};
write_result(Ok(Value::Okay), 0, writer).await?;
write_result(Ok(Value::Okay), 0, writer, None).await?;
Ok(client)
}

Expand Down
106 changes: 91 additions & 15 deletions glide-core/telemetry/src/open_telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use opentelemetry_sdk::runtime::Tokio;
use opentelemetry_sdk::trace::{BatchConfig, BatchSpanProcessor, TracerProvider};
use std::io::{Error, ErrorKind};
use std::path::PathBuf;
#[cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use thiserror::Error;
use url::Url;
Expand Down Expand Up @@ -90,6 +92,9 @@ fn parse_endpoint(endpoint: &str) -> Result<GlideOpenTelemetryTraceExporter, Err
#[derive(Clone, Debug)]
struct GlideSpanInner {
span: Arc<RwLock<opentelemetry::global::BoxedSpan>>,
span_name: String,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is span name ever in used?

#[cfg(test)]
reference_count: Arc<AtomicUsize>,
}

impl GlideSpanInner {
Expand All @@ -102,15 +107,25 @@ impl GlideSpanInner {
.with_kind(SpanKind::Client)
.start(&tracer),
));
GlideSpanInner { span }

GlideSpanInner {
span,
span_name: name.to_string(),
#[cfg(test)]
reference_count: Arc::new(AtomicUsize::new(1)),
}
}

pub fn print_span_name(&self) {
println!("The span name is: {}", self.span_name);
}

/// Create new span as a child of `parent`.
pub fn new_with_parent(name: &str, parent: &GlideSpanInner) -> Self {
/// Create new span as a child of `parent`, returning an error if the parent span lock is poisoned.
pub fn new_with_parent(name: &str, parent: &GlideSpanInner) -> Result<Self, TraceError> {
let parent_span_ctx = parent
.span
.read()
.expect(SPAN_READ_LOCK_ERR)
.map_err(|_| TraceError::from(SPAN_READ_LOCK_ERR))?
.span_context()
.clone();

Expand All @@ -124,7 +139,12 @@ impl GlideSpanInner {
.with_kind(SpanKind::Client)
.start_with_context(&tracer, &parent_context),
));
GlideSpanInner { span }
Ok(GlideSpanInner {
span,
span_name: name.to_string(),
#[cfg(test)]
reference_count: Arc::new(AtomicUsize::new(1)),
})
}

/// Attach event with name and list of attributes to this span.
Expand Down Expand Up @@ -164,17 +184,21 @@ impl GlideSpanInner {
}
}

/// Create new span, add it as a child to this span and return it
pub fn add_span(&self, name: &str) -> GlideSpanInner {
let child = GlideSpanInner::new_with_parent(name, self);
/// Create new span, add it as a child to this span and return it.
/// Returns an error if the child span creation fails.
pub fn add_span(&self, name: &str) -> Result<GlideSpanInner, TraceError> {
let child = GlideSpanInner::new_with_parent(name, self)?;
{
let child_span = child.span.read().expect(SPAN_WRITE_LOCK_ERR);
let child_span = child
.span
.read()
.map_err(|_| TraceError::from(SPAN_READ_LOCK_ERR))?;
self.span
.write()
.expect(SPAN_WRITE_LOCK_ERR)
.add_link(child_span.span_context().clone(), Vec::default());
}
child
Ok(child)
}

/// Return the span ID
Expand All @@ -191,6 +215,34 @@ impl GlideSpanInner {
pub fn end(&self) {
self.span.write().expect(SPAN_READ_LOCK_ERR).end()
}

#[cfg(test)]
pub fn get_reference_count(&self) -> usize {
self.reference_count.load(Ordering::SeqCst)
}

#[cfg(test)]
pub fn increment_reference_count(&self) {
self.reference_count.fetch_add(1, Ordering::SeqCst);
}

#[cfg(test)]
pub fn decrement_reference_count(&self) {
self.reference_count.fetch_sub(1, Ordering::SeqCst);
}
}

#[cfg(test)]
impl Drop for GlideSpanInner {
fn drop(&mut self) {
// Only print debug info if the reference count is non-zero
let current_count = self.reference_count.load(Ordering::SeqCst);
if current_count > 0 {
self.decrement_reference_count();
} else {
panic!("Span reference count is 0");
}
}
}

#[derive(Clone, Debug)]
Expand All @@ -205,6 +257,10 @@ impl GlideSpan {
}
}

pub fn print_span(&self) {
self.inner.print_span_name();
}

/// Attach event with name to this span.
pub fn add_event(&self, name: &str) {
self.inner.add_event(name, None)
Expand All @@ -220,10 +276,12 @@ impl GlideSpan {
}

/// Add child span to this span and return it
pub fn add_span(&self, name: &str) -> GlideSpan {
GlideSpan {
inner: self.inner.add_span(name),
}
pub fn add_span(&self, name: &str) -> Result<GlideSpan, TraceError> {
let inner_span = self.inner.add_span(name).map_err(|err| {
TraceError::from(format!("Failed to create child span '{}': {}", name, err))
})?;

Ok(GlideSpan { inner: inner_span })
}

pub fn id(&self) -> String {
Expand All @@ -234,6 +292,16 @@ impl GlideSpan {
pub fn end(&self) {
self.inner.end()
}

#[cfg(test)]
pub fn add_reference(&self) {
self.inner.increment_reference_count();
}

#[cfg(test)]
pub fn get_reference_count(&self) -> usize {
self.inner.get_reference_count()
}
}

/// OpenTelemetry configuration object. Use `GlideOpenTelemetryConfigBuilder` to construct it:
Expand Down Expand Up @@ -371,7 +439,7 @@ mod tests {
span.add_event("Event1");
span.set_status(GlideSpanStatus::Ok);

let child1 = span.add_span("Network_Span");
let child1 = span.add_span("Network_Span").unwrap();

// Simulate some work
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -474,4 +542,12 @@ mod tests {
create_test_spans().await;
});
}

#[test]
fn test_span_reference_count() {
let span = GlideOpenTelemetry::new_span("Root_Span_1");
span.add_reference();
assert_eq!(span.get_reference_count(), 2);
drop(span);
}
}
20 changes: 18 additions & 2 deletions node/rust-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

use glide_core::Telemetry;
use glide_core::{GlideOpenTelemetry, GlideSpan, Telemetry};
use redis::GlideConnectionOptions;

#[cfg(not(target_env = "msvc"))]
Expand All @@ -14,7 +14,6 @@ use byteorder::{LittleEndian, WriteBytesExt};
use bytes::Bytes;
use glide_core::start_socket_listener;
use glide_core::MAX_REQUEST_ARGS_LENGTH;
#[cfg(feature = "testing_utilities")]
use napi::bindgen_prelude::BigInt;
use napi::bindgen_prelude::Either;
use napi::bindgen_prelude::Uint8Array;
Expand All @@ -26,6 +25,7 @@ use redis::{aio::MultiplexedConnection, AsyncCommands, Value};
use std::collections::HashMap;
use std::ptr::from_mut;
use std::str;
use std::sync::Arc;
use tokio::runtime::{Builder, Runtime};
#[napi]
pub enum Level {
Expand Down Expand Up @@ -410,6 +410,22 @@ pub fn create_leaked_double(float: f64) -> [u32; 2] {
split_pointer(pointer)
}

/// Creates an open telemetry span with the given name and returns a pointer to the span
#[napi(ts_return_type = "[number, number]")]
pub fn create_leaked_otel_span(name: String) -> [u32; 2] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not to return a BigInt?

let span = GlideOpenTelemetry::new_span(&name);
let s = Arc::into_raw(Arc::new(span)) as *mut GlideSpan;
split_pointer(s)
}

#[napi]
pub fn drop_otel_span(span_ptr: BigInt) {
let span_ptr = span_ptr.get_u64().1;
if span_ptr != 0 {
unsafe { Arc::from_raw(span_ptr as *const GlideSpan) };
}
}

#[napi]
/// A wrapper for a script object. As long as this object is alive, the script's code is saved in memory, and can be resent to the server.
struct Script {
Expand Down
Loading
Loading