13 changes: 7 additions & 6 deletions .github/workflows/service-replay.yml
@@ -24,6 +24,7 @@ jobs:
       contents: read
       packages: write
       id-token: write
+      deployments: write
     strategy:
       matrix:
         include:
@@ -69,8 +70,8 @@ jobs:
           tags: "${{matrix.tag}}:${{ env.GITHUB_REF_SLUG }}"
           load: true
           context: .
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
+          # cache-from: type=gha
+          # cache-to: type=gha,mode=max
 
       # Login against a Docker registry except on PR
       # https://github.com/docker/login-action
@@ -100,8 +101,8 @@ jobs:
           tags: "${{matrix.tag}}:${{ env.TAG }}"
           push: true
           context: .
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
+          # cache-from: type=gha
+          # cache-to: type=gha,mode=max
 
       - name: Configure Docker rolling release image tag ${{matrix.tag}}:${{ env.GITHUB_REF_SLUG }}
         if: github.ref == 'refs/heads/main' || github.ref == 'refs/tags/*'
@@ -121,5 +122,5 @@
           tags: "${{matrix.tag}}:${{ env.SAVANT_RS_VERSION }}-rolling"
           push: true
           context: .
-          cache-from: type=gha
-          cache-to: type=gha,mode=max
+          # cache-from: type=gha
+          # cache-to: type=gha,mode=max
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -21,7 +21,7 @@ default-members = [
 ]
 
 [workspace.package]
-version = "1.0.5"
+version = "1.1.0"
 edition = "2021"
 authors = ["Ivan Kudriavtsev <[email protected]>"]
 description = "Savant Rust core functions library"
92 changes: 14 additions & 78 deletions docs/source/services/replay/2_installation.rst
@@ -70,84 +70,8 @@ Configuration File
 
 The configuration file is a JSON file that contains the following parameters:
 
-.. code-block:: json
-
-    {
-      "common": {
-        "pass_metadata_only": false,
-        "management_port": 8080,
-        "stats_period": {
-          "secs": 60,
-          "nanos": 0
-        },
-        "job_writer_cache_max_capacity": 1000,
-        "job_writer_cache_ttl": {
-          "secs": 60,
-          "nanos": 0
-        },
-        "job_eviction_ttl": {
-          "secs": 60,
-          "nanos": 0
-        },
-        "default_job_sink_options": {
-          "send_timeout": {
-            "secs": 1,
-            "nanos": 0
-          },
-          "send_retries": 3,
-          "receive_timeout": {
-            "secs": 1,
-            "nanos": 0
-          },
-          "receive_retries": 3,
-          "send_hwm": 1000,
-          "receive_hwm": 100,
-          "inflight_ops": 100
-        }
-      },
-      "in_stream": {
-        "url": "router+bind:tcp://0.0.0.0:5555",
-        "options": {
-          "receive_timeout": {
-            "secs": 1,
-            "nanos": 0
-          },
-          "receive_hwm": 1000,
-          "topic_prefix_spec": {
-            "none": null
-          },
-          "source_cache_size": 1000,
-          "inflight_ops": 100
-        }
-      },
-      "out_stream": {
-        "url": "pub+bind:tcp://0.0.0.0:5556",
-        "options": {
-          "send_timeout": {
-            "secs": 1,
-            "nanos": 0
-          },
-          "send_retries": 3,
-          "receive_timeout": {
-            "secs": 1,
-            "nanos": 0
-          },
-          "receive_retries": 3,
-          "send_hwm": 1000,
-          "receive_hwm": 100,
-          "inflight_ops": 100
-        }
-      },
-      "storage": {
-        "rocksdb": {
-          "path": "${DB_PATH:-/tmp/rocksdb}",
-          "data_expiration_ttl": {
-            "secs": 60,
-            "nanos": 0
-          }
-        }
-      }
-    }
+.. literalinclude:: ../../../../services/replay/replay/assets/test.json
+   :language: json

 The above-mentioned configuration file is used by default when you launch Replay without specifying a configuration file. You can override the default configuration by providing your own configuration file and specifying it in the launch command:

@@ -197,6 +121,18 @@ Configuration Parameters
      - Default sink options to be applied to jobs if they don't specify their own options. If not set, jobs must provide their own sink options.
      - ``null``
      - See ``out_stream.options`` format.
+   * - ``common.telemetry_config_file``
+     - The path to a file containing telemetry configuration. When set, the service loads telemetry settings from this file.
+     - ``null``
+     - ``"/opt/telemetry_config.json"``
+   * - ``common.stats_frame_period``
+     - Defines how frequently the service should report statistics based on the number of frames processed. When set, statistics are logged after processing the specified number of frames.
+     - ``null``
+     - ``1000``
+   * - ``common.stats_timestamp_period``
+     - Defines how frequently the service should report statistics based on elapsed time. Controls the time interval between statistics reports.
+     - ``null``
+     - ``{"secs": 10, "nanos": 0}``
    * - ``in_stream.url``
      - The URL for the data ingress in Savant ZMQ format.
      - ``router+bind:tcp://0.0.0.0:5555``
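For orientation, the three new `common.*` parameters documented above would sit in the configuration file roughly as follows. This fragment is illustrative only (values copied from the example column of the table), not taken from the shipped default config:

```json
{
  "common": {
    "telemetry_config_file": "/opt/telemetry_config.json",
    "stats_frame_period": 1000,
    "stats_timestamp_period": {
      "secs": 10,
      "nanos": 0
    }
  }
}
```

All three default to `null`, i.e. telemetry-from-file and both stats triggers are disabled unless explicitly configured.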
2 changes: 1 addition & 1 deletion savant_core/src/lib.rs
@@ -59,7 +59,7 @@ pub fn fast_hash(bytes: &[u8]) -> u32 {
 
 #[inline]
 pub fn get_tracer() -> BoxedTracer {
-    global::tracer("video_pipeline")
+    global::tracer("savant-tracer")
 }
 
 pub mod rust {
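With this rename, spans minted through `get_tracer()` are attributed to the `savant-tracer` instrumentation scope instead of `video_pipeline`. A minimal sketch of a call site (the function and span names here are illustrative, not from the diff):

```rust
use opentelemetry::trace::{Span, Tracer};
use savant_core::get_tracer;

fn traced_work() {
    // Resolves a tracer from the global provider; after this change the
    // instrumentation scope is named "savant-tracer".
    let tracer = get_tracer();
    let mut span = tracer.start("traced_work");
    // ... do the actual work here ...
    span.end();
}
```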
4 changes: 2 additions & 2 deletions savant_core/src/metrics/pipeline_metric_builder.rs
@@ -1,6 +1,6 @@
 use crate::metrics::{get_or_create_counter_family, get_or_create_gauge_family};
+use crate::pipeline::get_registered_pipelines;
 use crate::rust::FrameProcessingStatRecordType;
-use crate::webserver::get_registered_pipelines;
 use log::debug;
 
 #[derive(Debug)]
@@ -29,7 +29,7 @@ impl PipelineMetricBuilder {
         let stage_latency_label_names =
             ["record_type", "destination_stage_name", "source_stage_name"].as_slice();
 
-        let registered_pipelines = get_registered_pipelines().await;
+        let registered_pipelines = get_registered_pipelines();
         debug!(
             "Found {} registered pipeline(s)",
             registered_pipelines.len()
72 changes: 65 additions & 7 deletions savant_core/src/pipeline.rs
@@ -3,7 +3,11 @@ use std::time::SystemTime;
 
 use anyhow::Result;
 use hashbrown::HashMap;
+use log::debug;
+use log::error;
+use log::info;
 use opentelemetry::Context;
+use parking_lot::Mutex;
 
 pub use implementation::PipelineConfiguration;
 pub use implementation::PipelineConfigurationBuilder;
@@ -15,10 +19,57 @@ use crate::primitives::frame::VideoFrameProxy;
 use crate::primitives::frame_batch::VideoFrameBatch;
 use crate::primitives::frame_update::VideoFrameUpdate;
 use crate::primitives::object::BorrowedVideoObject;
-use crate::webserver::{register_pipeline, unregister_pipeline};
+use lazy_static::lazy_static;
 
 const MAX_TRACKED_STREAMS: usize = 8192; // defines how many streams are tracked for the frame ordering
 
+//
+
+// pipelines: Arc<Mutex<HashMap<String, Arc<implementation::Pipeline>>>>,
+
+lazy_static! {
+    static ref PIPELINES: Arc<Mutex<HashMap<String, Arc<implementation::Pipeline>>>> =
+        Arc::new(Mutex::new(HashMap::new()));
+}
+
+pub(crate) fn register_pipeline(pipeline: Arc<implementation::Pipeline>) {
+    let pipelines = PIPELINES.clone();
+    let mut bind = pipelines.lock();
+    let name = pipeline.get_name();
+    let entry = bind.get(&name);
+    if entry.is_some() {
+        let message = format!("Pipeline with name {} already exists in registry.", &name);
+        error!("{}", message);
+        panic!("{}", message);
+    }
+    bind.insert(name.clone(), pipeline.clone());
+    info!("Pipeline {} registered.", name);
+}
+
+pub(crate) fn unregister_pipeline(pipeline: Arc<implementation::Pipeline>) {
+    let stats = PIPELINES.clone();
+    let pipeline_name = pipeline.get_name();
+    let mut bind = stats.lock();
+    let prev_len = bind.len();
+    debug!("Removing pipeline {} from stats.", &pipeline_name);
+    bind.remove(&pipeline_name);
+    if bind.len() == prev_len {
+        let message = format!("Failed to remove pipeline {} from stats.", &pipeline_name);
+        error!("{}", message);
+        panic!("{}", message);
+    }
+}
+
+pub(crate) fn get_registered_pipelines() -> HashMap<String, Arc<implementation::Pipeline>> {
+    let s = PIPELINES.lock();
+    s.clone()
+}
+
+pub fn get_pipeline(name: &str) -> Option<Arc<implementation::Pipeline>> {
+    let s = PIPELINES.lock();
+    s.get(name).cloned()
+}
+
 pub mod stage;
 pub mod stage_function_loader;
 pub mod stage_plugin_sample;
@@ -293,6 +344,7 @@ pub(super) mod implementation {
         id_counter: AtomicI64,
         frame_counter: AtomicI64,
         root_spans: SavantRwLock<HashMap<i64, Context>>,
+        outer_spans: SavantRwLock<HashMap<i64, Context>>,
         stages: Vec<PipelineStage>,
         frame_locations: SavantRwLock<HashMap<i64, usize>>,
         frame_ordering: SavantRwLock<LruCache<String, i64>>,
@@ -311,6 +363,7 @@
                 id_counter: AtomicI64::new(0),
                 frame_counter: AtomicI64::new(0),
                 root_spans: SavantRwLock::new(HashMap::new()),
+                outer_spans: SavantRwLock::new(HashMap::new()),
                 stages: Vec::new(),
                 frame_locations: SavantRwLock::new(HashMap::new()),
                 frame_ordering: SavantRwLock::new(LruCache::new(
@@ -588,6 +641,9 @@ pub(super) mod implementation {
                     .write()
                     .insert(id_counter, Context::current_with_span(span));
             }
+
+            self.outer_spans.write().insert(id_counter, parent_ctx);
+
             let source_id_compatibility_hash = frame.stream_compatibility_hash();
             let mut ordering = self.frame_ordering.write();
             let prev_ordering_seq = ordering.get(&source_id);
@@ -676,17 +732,18 @@
                 bail!("Object {} is not found in the stage {}", id, stage.name)
             }
 
-            let mut bind = self.root_spans.write();
+            // let mut bind = self.root_spans.write();
             match removed.unwrap() {
                 PipelinePayload::Frame(frame, _, ctx, _, _) => {
                     self.stats.register_frame(frame.get_object_count());
                     self.add_frame_json(&frame, &ctx);
                     ctx.span().end();
-                    let root_ctx = bind.remove(&id).unwrap();
-                    Ok(HashMap::from([(id, root_ctx)]))
+                    self.root_spans.write().remove(&id).unwrap();
+                    let outer_ctx = self.outer_spans.write().remove(&id).unwrap();
+                    Ok(HashMap::from([(id, outer_ctx)]))
                 }
                 PipelinePayload::Batch(batch, _, contexts, _, _) => Ok({
-                    let mut bind = self.root_spans.write();
+                    //let mut bind = self.root_spans.write();
                     contexts
                         .into_iter()
                         .map(|(frame_id, ctx)| {
@@ -703,8 +760,9 @@
                                 )
                             }
                             ctx.span().end();
-                            let root_ctx = bind.remove(&id).unwrap();
-                            Ok((id, root_ctx))
+                            self.root_spans.write().remove(&id).unwrap();
+                            let outer_ctx = self.outer_spans.write().remove(&id).unwrap();
+                            Ok((id, outer_ctx))
                         })
                         .collect::<Result<HashMap<_, _>, _>>()?
                 }),
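The net effect of this file's changes is a process-wide, synchronous pipeline registry owned by `pipeline.rs` itself rather than the webserver module. Only `get_pipeline` is public; the register/unregister helpers are `pub(crate)` and their call sites are outside this diff. A sketch of how the public entry point could be used (assuming `savant_core` exposes the `pipeline` module at the crate root; the pipeline name is hypothetical):

```rust
use savant_core::pipeline::get_pipeline;

// Any component in the process (webserver handlers, the metric builder)
// can look a pipeline up by name without an async runtime.
fn inspect(name: &str) {
    match get_pipeline(name) {
        // The registry hands out a cloned Arc, so the pipeline stays
        // alive for as long as this handle is held.
        Some(_pipeline) => println!("pipeline '{}' is registered", name),
        None => println!("pipeline '{}' is not registered", name),
    }
}

fn main() {
    inspect("my-pipeline"); // hypothetical pipeline name
}
```

Note that `get_registered_pipelines()` clones the whole map while holding the `parking_lot` mutex; that keeps the lock window short at the cost of one map clone per metrics pass, which is why the metric builder above can now call it without `.await`.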
11 changes: 8 additions & 3 deletions savant_core/src/telemetry.rs
@@ -1,4 +1,3 @@
-use crate::get_or_init_async_runtime;
 use log::error;
 use opentelemetry::global;
 use opentelemetry_jaeger_propagator::Propagator;
@@ -14,6 +13,8 @@ use std::fs;
 use std::time::Duration;
 use twelf::{config, Layer};
 
+use crate::get_or_init_async_runtime;
+
 #[derive(Debug, Serialize, Deserialize, Clone)]
 pub enum ContextPropagationFormat {
     #[serde(rename = "jaeger")]
@@ -251,8 +252,12 @@ pub fn init(config: &TelemetryConfiguration) {
     match configurator.get() {
         Some(_) => panic!("Open Telemetry has been configured"),
         None => {
-            let runtime = get_or_init_async_runtime();
-            let c = runtime.block_on(async { Configurator::new("savant", config) });
+            let c = if tokio::runtime::Handle::try_current().is_ok() {
+                Configurator::new("savant", config)
+            } else {
+                let rt = get_or_init_async_runtime();
+                rt.block_on(async { Configurator::new("savant", config) })
+            };
             let result = configurator.set(c);
             if result.is_err() {
                 // should not happen
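The new branch exists because `tokio::runtime::Runtime::block_on` panics when invoked from inside a runtime; checking `Handle::try_current()` first makes `init` callable from both sync and async contexts. A sketch of the two call shapes this now supports (configuration construction is elided; assumes `init` and `TelemetryConfiguration` are exported as shown in the diff):

```rust
use savant_core::telemetry::{init, TelemetryConfiguration};

// From plain synchronous code: Handle::try_current() returns Err, so
// init() falls back to the shared runtime and block_on, as before.
fn init_from_sync(config: &TelemetryConfiguration) {
    init(config);
}

// From inside a tokio runtime: Handle::try_current() returns Ok, so the
// Configurator is built directly; the old unconditional block_on would
// have panicked here.
async fn init_from_async(config: &TelemetryConfiguration) {
    init(config);
}
```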