Skip to content

Commit 06293cc

Browse files
feat: add querier json (#1288)
This PR adds the basic framework required to support multiple query nodes on the Enterprise version. This PR doesn't change any behavior for OSS mode. --------- Signed-off-by: Nikhil Sinha <[email protected]>
1 parent 0140c58 commit 06293cc

28 files changed

+933
-807
lines changed

src/analytics.rs

+47-6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636
http::{
3737
base_path_without_preceding_slash,
3838
cluster::{self, utils::check_liveness},
39+
modal::{NodeMetadata, NodeType},
3940
},
4041
STREAM_NAME_HEADER_KEY,
4142
},
@@ -74,6 +75,10 @@ pub struct Report {
7475
commit_hash: String,
7576
active_ingestors: u64,
7677
inactive_ingestors: u64,
78+
active_indexers: u64,
79+
inactive_indexers: u64,
80+
active_queriers: u64,
81+
inactive_queriers: u64,
7782
stream_count: usize,
7883
total_events_count: u64,
7984
total_json_bytes: u64,
@@ -106,7 +111,32 @@ impl Report {
106111
mem_total = info.total_memory();
107112
}
108113
let ingestor_metrics = fetch_ingestors_metrics().await?;
114+
let mut active_indexers = 0;
115+
let mut inactive_indexers = 0;
116+
let mut active_queriers = 0;
117+
let mut inactive_queriers = 0;
118+
119+
// check liveness of indexers
120+
// get the count of active and inactive indexers
121+
let indexer_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Indexer).await?;
122+
for indexer in indexer_infos {
123+
if check_liveness(&indexer.domain_name).await {
124+
active_indexers += 1;
125+
} else {
126+
inactive_indexers += 1;
127+
}
128+
}
109129

130+
// check liveness of queriers
131+
// get the count of active and inactive queriers
132+
let query_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Querier).await?;
133+
for query in query_infos {
134+
if check_liveness(&query.domain_name).await {
135+
active_queriers += 1;
136+
} else {
137+
inactive_queriers += 1;
138+
}
139+
}
110140
Ok(Self {
111141
deployment_id: storage::StorageMetadata::global().deployment_id,
112142
uptime: upt,
@@ -122,6 +152,10 @@ impl Report {
122152
commit_hash: current().commit_hash,
123153
active_ingestors: ingestor_metrics.0,
124154
inactive_ingestors: ingestor_metrics.1,
155+
active_indexers,
156+
inactive_indexers,
157+
active_queriers,
158+
inactive_queriers,
125159
stream_count: ingestor_metrics.2,
126160
total_events_count: ingestor_metrics.3,
127161
total_json_bytes: ingestor_metrics.4,
@@ -224,11 +258,14 @@ async fn fetch_ingestors_metrics(
224258
let mut vec = vec![];
225259
let mut active_ingestors = 0u64;
226260
let mut offline_ingestors = 0u64;
227-
if PARSEABLE.options.mode == Mode::Query {
261+
262+
// for OSS, Query mode fetches the analytics report
263+
// for Enterprise, Prism mode fetches the analytics report
264+
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
228265
// send analytics for ingest servers
229266

230267
// ingestor infos should be valid here; if not, something is wrong
231-
let ingestor_infos = cluster::get_ingestor_info().await.unwrap();
268+
let ingestor_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor).await?;
232269

233270
for im in ingestor_infos {
234271
if !check_liveness(&im.domain_name).await {
@@ -250,10 +287,14 @@ async fn fetch_ingestors_metrics(
250287
.send()
251288
.await
252289
.expect("should respond");
253-
254-
let data = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?)?;
255-
vec.push(data);
256-
active_ingestors += 1;
290+
// check if the response is valid
291+
if let Ok(data) = serde_json::from_slice::<NodeMetrics>(&resp.bytes().await?) {
292+
active_ingestors += 1;
293+
vec.push(data);
294+
} else {
295+
offline_ingestors += 1;
296+
continue;
297+
}
257298
}
258299

259300
node_metrics.accumulate(&mut vec);

src/catalog/mod.rs

+16-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,13 @@ use tracing::{error, info};
2828

2929
use crate::{
3030
event::DEFAULT_TIMESTAMP_KEY,
31-
handlers::{self, http::base_path_without_preceding_slash},
31+
handlers::{
32+
self,
33+
http::{
34+
base_path_without_preceding_slash,
35+
modal::{NodeMetadata, NodeType},
36+
},
37+
},
3238
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
3339
option::Mode,
3440
parseable::PARSEABLE,
@@ -335,15 +341,19 @@ pub async fn remove_manifest_from_snapshot(
335341
meta.first_event_at = None;
336342
storage.put_snapshot(stream_name, meta.snapshot).await?;
337343
}
344+
345+
// retention is initiated from the querier
346+
// request is forwarded to all ingestors to clean up their manifests
347+
// no action required for the Index or Prism nodes
338348
match PARSEABLE.options.mode {
339349
Mode::All | Mode::Ingest => {
340350
Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
341351
}
342352
Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
343-
Mode::Index => Err(ObjectStorageError::UnhandledError(Box::new(
353+
Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
344354
std::io::Error::new(
345355
std::io::ErrorKind::Unsupported,
346-
"Can't remove manifest from within Index server",
356+
"Can't remove manifest from within Index or Prism server",
347357
),
348358
))),
349359
}
@@ -356,7 +366,6 @@ pub async fn get_first_event(
356366
) -> Result<Option<String>, ObjectStorageError> {
357367
let mut first_event_at: String = String::default();
358368
match PARSEABLE.options.mode {
359-
Mode::Index => unimplemented!(),
360369
Mode::All | Mode::Ingest => {
361370
// get current snapshot
362371
let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
@@ -406,8 +415,8 @@ pub async fn get_first_event(
406415
}
407416
}
408417
Mode::Query => {
409-
let ingestor_metadata =
410-
handlers::http::cluster::get_ingestor_info()
418+
let ingestor_metadata: Vec<NodeMetadata> =
419+
handlers::http::cluster::get_node_info(NodeType::Ingestor)
411420
.await
412421
.map_err(|err| {
413422
error!("Fatal: failed to get ingestor info: {:?}", err);
@@ -437,6 +446,7 @@ pub async fn get_first_event(
437446
}
438447
first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
439448
}
449+
_ => {}
440450
}
441451

442452
Ok(Some(first_event_at))

src/cli.rs

+75-62
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,14 @@ pub struct Options {
328328
)]
329329
pub indexer_endpoint: String,
330330

331+
#[arg(
332+
long,
333+
env = "P_QUERIER_ENDPOINT",
334+
default_value = "",
335+
help = "URL to connect to this specific querier. Default is the address of the server"
336+
)]
337+
pub querier_endpoint: String,
338+
331339
#[command(flatten)]
332340
pub oidc: Option<OidcConfig>,
333341

@@ -439,83 +447,88 @@ impl Options {
439447
}
440448
}
441449

442-
/// TODO: refactor and document
450+
/// get the address of the server
451+
/// based on the mode
443452
pub fn get_url(&self, mode: Mode) -> Url {
444-
let (endpoint, env_var) = match mode {
445-
Mode::Ingest => {
446-
if self.ingestor_endpoint.is_empty() {
447-
return format!(
448-
"{}://{}",
449-
self.get_scheme(),
450-
self.address
451-
)
452-
.parse::<Url>() // if the value was improperly set, this will panic before hand
453-
.unwrap_or_else(|err| {
454-
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
455-
});
456-
}
457-
(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
458-
}
459-
Mode::Index => {
460-
if self.indexer_endpoint.is_empty() {
461-
return format!(
462-
"{}://{}",
463-
self.get_scheme(),
464-
self.address
465-
)
466-
.parse::<Url>() // if the value was improperly set, this will panic before hand
467-
.unwrap_or_else(|err| {
468-
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
469-
});
470-
}
471-
(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
472-
}
473-
_ => panic!("Invalid mode"),
453+
let endpoint = match mode {
454+
Mode::Ingest => self.get_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
455+
Mode::Index => self.get_endpoint(&self.indexer_endpoint, "P_INDEXER_ENDPOINT"),
456+
Mode::Query => self.get_endpoint(&self.querier_endpoint, "P_QUERIER_ENDPOINT"),
457+
_ => return self.build_url(&self.address),
474458
};
475459

476-
if endpoint.starts_with("http") {
477-
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
478-
}
479-
480-
let addr_from_env = endpoint.split(':').collect::<Vec<&str>>();
460+
self.parse_endpoint(&endpoint)
461+
}
481462

482-
if addr_from_env.len() != 2 {
483-
panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
463+
/// get the endpoint for the server
464+
/// if env var is empty, use the address, else use the env var
465+
fn get_endpoint(&self, endpoint: &str, env_var: &str) -> String {
466+
if endpoint.is_empty() {
467+
self.address.to_string()
468+
} else {
469+
if endpoint.starts_with("http") {
470+
panic!(
471+
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
472+
endpoint, env_var
473+
);
474+
}
475+
endpoint.to_string()
484476
}
477+
}
485478

486-
let mut hostname = addr_from_env[0].to_string();
487-
let mut port = addr_from_env[1].to_string();
479+
/// parse the endpoint to get the address and port
480+
/// if the address is an env var, resolve it
481+
/// if the port is an env var, resolve it
482+
fn parse_endpoint(&self, endpoint: &str) -> Url {
483+
let addr_parts: Vec<&str> = endpoint.split(':').collect();
484+
485+
if addr_parts.len() != 2 {
486+
panic!(
487+
"Invalid value `{}`, please set the environment variable to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.",
488+
endpoint
489+
);
490+
}
488491

489-
// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
490-
// fetch the value from the specified env vars
491-
if hostname.starts_with('$') {
492-
let var_hostname = hostname[1..].to_string();
493-
hostname = env::var(&var_hostname).unwrap_or_default();
492+
let hostname = self.resolve_env_var(addr_parts[0]);
493+
let port = self.resolve_env_var(addr_parts[1]);
494494

495-
if hostname.is_empty() {
496-
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
497-
}
498-
if hostname.starts_with("http") {
499-
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
500-
} else {
501-
hostname = format!("{}://{}", self.get_scheme(), hostname);
502-
}
503-
}
495+
self.build_url(&format!("{}:{}", hostname, port))
496+
}
504497

505-
if port.starts_with('$') {
506-
let var_port = port[1..].to_string();
507-
port = env::var(&var_port).unwrap_or_default();
498+
/// resolve the env var
499+
/// if the env var is not set, panic
500+
/// if the env var is set, return its value; a value without a leading `$` is returned unchanged
501+
fn resolve_env_var(&self, value: &str) -> String {
502+
if let Some(env_var) = value.strip_prefix('$') {
503+
let resolved_value = env::var(env_var).unwrap_or_else(|_| {
504+
panic!(
505+
"The environment variable `{}` is not set. Please set it to a valid value. Refer to the documentation: https://logg.ing/env for more details.",
506+
env_var
507+
);
508+
});
508509

509-
if port.is_empty() {
510+
if resolved_value.starts_with("http") {
510511
panic!(
511-
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
512-
var_port
512+
"Invalid value `{}`, please set the environment variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.",
513+
resolved_value, env_var
513514
);
514515
}
516+
517+
resolved_value
518+
} else {
519+
value.to_string()
515520
}
521+
}
516522

517-
format!("{}://{}:{}", self.get_scheme(), hostname, port)
523+
/// build the url from the address
524+
fn build_url(&self, address: &str) -> Url {
525+
format!("{}://{}", self.get_scheme(), address)
518526
.parse::<Url>()
519-
.expect("Valid URL")
527+
.unwrap_or_else(|err| {
528+
panic!(
529+
"{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.",
530+
address
531+
);
532+
})
520533
}
521534
}

src/handlers/airplane.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use futures_util::{Future, TryFutureExt};
3333
use tonic::transport::{Identity, Server, ServerTlsConfig};
3434
use tonic_web::GrpcWebLayer;
3535

36-
use crate::handlers::http::cluster::get_ingestor_info;
36+
use crate::handlers::http::cluster::get_node_info;
37+
use crate::handlers::http::modal::{NodeMetadata, NodeType};
3738
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
3839
use crate::handlers::livetail::cross_origin_config;
3940
use crate::metrics::QUERY_EXECUTE_TIME;
@@ -179,7 +180,7 @@ impl FlightService for AirServiceImpl {
179180
})
180181
.to_string();
181182

182-
let ingester_metadatas = get_ingestor_info()
183+
let ingester_metadatas: Vec<NodeMetadata> = get_node_info(NodeType::Ingestor)
183184
.await
184185
.map_err(|err| Status::failed_precondition(err.to_string()))?;
185186
let mut minute_result: Vec<RecordBatch> = vec![];

src/handlers/http/about.rs

+1-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use serde_json::{json, Value};
2121

2222
use crate::{
2323
about::{self, get_latest_release},
24-
option::Mode,
2524
parseable::PARSEABLE,
2625
storage::StorageMetadata,
2726
};
@@ -63,11 +62,7 @@ pub async fn about() -> Json<Value> {
6362
let commit = current_release.commit_hash;
6463
let deployment_id = meta.deployment_id.to_string();
6564
let mode = PARSEABLE.get_server_mode_string();
66-
let staging = if PARSEABLE.options.mode == Mode::Query {
67-
"".to_string()
68-
} else {
69-
PARSEABLE.options.staging_dir().display().to_string()
70-
};
65+
let staging = PARSEABLE.options.staging_dir().display().to_string();
7166
let grpc_port = PARSEABLE.options.grpc_port;
7267

7368
let store_endpoint = PARSEABLE.storage.get_endpoint();

0 commit comments

Comments
 (0)