fix(cat-gateway): Correct Service Health logic #1974

Open · wants to merge 54 commits into base: main

Changes from 51 commits

Commits (54)
3e87af3
wip(rust/signed_doc): add atomic variables for live check, update end…
saibatizoku Feb 20, 2025
4b1a6aa
wip(rust/signed_doc): reset live counter
saibatizoku Feb 20, 2025
4e0d843
fix(cat-gateway): typo
saibatizoku Feb 20, 2025
03d7e7d
fix(cat-gateway): use timestamp for LIVE_COUNTER, update logic
saibatizoku Feb 25, 2025
87b4a2b
fix(cat-gateway): refactor code into health utilities
saibatizoku Feb 25, 2025
2933466
fix(cat-gateway): fix panic catcher to disable live service flag
saibatizoku Feb 25, 2025
0c6a28b
fix(cat-gateway): remove unused atomic-counter crate
saibatizoku Feb 25, 2025
c69eb77
fix(cat-gateway): refactor health::live utilities into proper module
saibatizoku Feb 25, 2025
e8196c4
fix(cat-gateway): restore live counter logic, set env var for threshold
saibatizoku Feb 25, 2025
92e20c2
fix(cat-gateway): code format
saibatizoku Feb 25, 2025
090a503
fix(cat-gateway): return service unavailable with proper error
saibatizoku Feb 26, 2025
5d4b5d9
fix(cat-gateway): refactor health::start utilities into proper module
saibatizoku Feb 26, 2025
7ddb1bc
feat(cat-gateway): add service::utilities::health::ready module
saibatizoku Feb 25, 2025
8b848ee
wip(rust/signed_doc): add atomic variables for live check, update end…
saibatizoku Feb 20, 2025
e24d875
wip(rust/signed_doc): reset live counter
saibatizoku Feb 20, 2025
9c6468e
fix(cat-gateway): typo
saibatizoku Feb 20, 2025
adb2cff
fix(cat-gateway): use timestamp for LIVE_COUNTER, update logic
saibatizoku Feb 25, 2025
fa0a94b
fix(cat-gateway): refactor code into health utilities
saibatizoku Feb 25, 2025
f7650f3
fix(cat-gateway): fix panic catcher to disable live service flag
saibatizoku Feb 25, 2025
400d04f
fix(cat-gateway): remove unused atomic-counter crate
saibatizoku Feb 25, 2025
d7890dd
fix(cat-gateway): refactor health::live utilities into proper module
saibatizoku Feb 25, 2025
7da190d
fix(cat-gateway): restore live counter logic, set env var for threshold
saibatizoku Feb 25, 2025
2fe0c92
fix(cat-gateway): code format
saibatizoku Feb 25, 2025
bfe01f9
fix(cat-gateway): return service unavailable with proper error
saibatizoku Feb 26, 2025
f006db3
Merge branch 'fix/service-liveness-check' into fix/service-started-check
saibatizoku Feb 26, 2025
5000831
Merge branch 'fix/service-started-check' into fix/service-readiness-c…
saibatizoku Feb 26, 2025
3482a78
fix(cat-gateway): add atomic vars to health::started to keep track an…
saibatizoku Feb 26, 2025
a8d0508
fix(cat-gateway): add atomic var to health::started to keep track of …
saibatizoku Feb 27, 2025
7284980
Merge remote-tracking branch 'origin/main' into fix/service-started-c…
saibatizoku Feb 27, 2025
759df0a
fix(cat-gateway): implement logic for health::started flags
saibatizoku Feb 27, 2025
0bbde73
Merge remote-tracking branch 'origin/main' into fix/service-started-c…
saibatizoku Feb 27, 2025
8412c25
Merge branch 'fix/service-started-check' into fix/service-readiness-c…
saibatizoku Feb 27, 2025
2832b38
Merge remote-tracking branch 'origin/main' into fix/service-started-c…
saibatizoku Mar 4, 2025
8d1b0d7
Merge branch 'fix/service-started-check' into fix/service-readiness-c…
saibatizoku Mar 4, 2025
b9f33a9
Merge remote-tracking branch 'origin/main' into fix/health-probe-logic
saibatizoku Mar 6, 2025
82b0760
fix(cat-gateway): add middleware to check DB connections
saibatizoku Mar 6, 2025
8c970c5
fix(cat-gateway): attempt DB reconnect if health/ready check fails
saibatizoku Mar 6, 2025
30d6172
fix(cat-gateway): implement logic for health::ready endpoint
saibatizoku Mar 7, 2025
3bf2daf
Merge remote-tracking branch 'origin/main' into fix/health-probe-logic
saibatizoku Mar 7, 2025
7b81d79
fix(cat-gateway): handle DB errors at endpoints
saibatizoku Mar 7, 2025
e2b11bd
Merge remote-tracking branch 'origin/main' into fix/health-probe-logic
saibatizoku Mar 7, 2025
de54387
Merge branch 'main' into fix/health-probe-logic
saibatizoku Mar 17, 2025
bc1b9a2
chore(cat-gateway): fix doc comments
saibatizoku Mar 18, 2025
e089bdc
fix(cat-gateway): add suggested fixes
saibatizoku Mar 18, 2025
9fd1958
Merge branch 'main' into fix/health-probe-logic
saibatizoku Mar 18, 2025
e8647ac
fix(cat-gateway): simplify boolean logic, correctly set flag when fol…
saibatizoku Mar 18, 2025
d76bd59
Merge remote-tracking branch 'origin/main' into fix/health-probe-logic
saibatizoku Mar 18, 2025
33b8b11
fix(cat-gateway): add suggested fix
saibatizoku Mar 18, 2025
d759cbe
chore(docs): fix doc comment
saibatizoku Mar 18, 2025
f7e3f9d
Merge branch 'main' into fix/health-probe-logic
saibatizoku Mar 19, 2025
f583340
fix(cat-gateway): update doc comments for health endpoints
saibatizoku Mar 20, 2025
dd4d1c7
Merge branch 'main' into fix/health-probe-logic
saibatizoku Mar 24, 2025
6402dd9
fix(cat-gateway): add suggested fix to doc comments
saibatizoku Mar 24, 2025
af2b4d5
Merge remote-tracking branch 'origin/main' into fix/health-probe-logic
saibatizoku Mar 24, 2025
39 changes: 37 additions & 2 deletions catalyst-gateway/bin/src/cardano/mod.rs
@@ -23,6 +23,9 @@ use crate::{
},
session::CassandraSession,
},
service::utilities::health::{
follower_has_first_reached_tip, set_follower_first_reached_tip, set_index_db_liveness,
},
settings::{chain_follower, Settings},
};

@@ -33,6 +36,20 @@ pub(crate) mod util;
/// How long we wait between checks for connection to the indexing DB to be ready.
const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);

/// Wait for the Cassandra Indexing DB to be ready before continuing.
///
/// Returns a boolean that is `true` if the connection to the Indexing DB is `OK`.
///
/// NOTE: This function updates the Indexing DB liveness variables.
pub async fn index_db_is_ready() -> bool {
let is_ready = CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true)
.await
.is_ok();
// Set the Indexing DB service liveness flag
set_index_db_liveness(is_ready);
is_ready
}

/// Start syncing a particular network
async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
let chain = cfg.chain;
@@ -246,7 +263,9 @@ fn sync_subchain(
params.backoff().await;

// Wait for indexing DB to be ready before continuing.
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
if !index_db_is_ready().await {
error!(chain=%params.chain, params=%params,"Indexing DB connection failed");
}
info!(chain=%params.chain, params=%params,"Indexing DB is ready");

let mut first_indexed_block = params.first_indexed_block.clone();
@@ -435,6 +454,12 @@ impl SyncTask {
/// Primary Chain Follower task.
///
/// This continuously runs in the background, and never terminates.
///
/// Sets the Index DB liveness flag to true if it is not already set.
///
/// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
/// set.
#[allow(clippy::too_many_lines)]
async fn run(&mut self) {
// We can't sync until the local chain data is synced.
// This call will wait until we sync.
@@ -453,7 +478,12 @@
// Wait for indexing DB to be ready before continuing.
// We do this after the above, because other nodes may have finished already, and we don't
// want to redo any work they already completed while we were fetching the blockchain.
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
//
// After waiting, we set the liveness flag to true if it is not already set.
if !index_db_is_ready().await {
error!(chain=%self.cfg.chain, "Indexing DB connection failed");
}

info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
self.sync_status = get_sync_status().await;
debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
@@ -501,6 +531,11 @@
);

self.start_immutable_followers();

// Update flag if this is the first time reaching TIP.
if !follower_has_first_reached_tip() {
set_follower_first_reached_tip();
}
} else {
error!(chain=%self.cfg.chain, report=%finished,
"The TIP follower failed, restarting it.");
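Note: the `service::utilities::health` helpers imported above (`set_index_db_liveness`, `follower_has_first_reached_tip`, `set_follower_first_reached_tip`) are defined outside this diff. A minimal sketch of what such flag helpers could look like, assuming plain `AtomicBool` statics; the bodies are illustrative, not the PR's actual implementation:

    use std::sync::atomic::{AtomicBool, Ordering};

    /// Liveness flag for the Index DB (illustrative only).
    static LIVE_INDEX_DB: AtomicBool = AtomicBool::new(false);
    /// Set once the chain follower reaches the chain tip for the first time.
    static FOLLOWER_FIRST_REACHED_TIP: AtomicBool = AtomicBool::new(false);

    /// Record whether the Indexing DB connection is currently live.
    pub(crate) fn set_index_db_liveness(flag: bool) {
        LIVE_INDEX_DB.store(flag, Ordering::SeqCst);
    }

    /// Has the follower reached tip at least once?
    pub(crate) fn follower_has_first_reached_tip() -> bool {
        FOLLOWER_FIRST_REACHED_TIP.load(Ordering::SeqCst)
    }

    /// One-way transition: mark that tip has been reached.
    pub(crate) fn set_follower_first_reached_tip() {
        FOLLOWER_FIRST_REACHED_TIP.store(true, Ordering::SeqCst);
    }
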
25 changes: 20 additions & 5 deletions catalyst-gateway/bin/src/cli.rs
@@ -1,5 +1,5 @@
//! CLI interpreter for the service
use std::{io::Write, path::PathBuf};
use std::{io::Write, path::PathBuf, time::Duration};

use clap::Parser;
use tracing::{error, info};
@@ -8,8 +8,10 @@ use crate::{
cardano::start_followers,
db::{self, index::session::CassandraSession},
service::{
self, started,
utilities::health::{is_live, live_counter_reset},
self,
utilities::health::{
condition_for_started, is_live, live_counter_reset, service_has_started, set_to_started,
},
},
settings::{ServiceSettings, Settings},
};
@@ -49,13 +51,15 @@ impl Cli {

info!("Catalyst Gateway - Starting");

// Start the DB's
// Start the DB's.
CassandraSession::init();

db::event::establish_connection();

// Start the chain indexing follower.
start_followers().await?;

// Start the API service.
let handle = tokio::spawn(async move {
match service::run().await {
Ok(()) => info!("Endpoints started ok"),
@@ -66,6 +70,7 @@
});
tasks.push(handle);

// Start task to reset the service 'live counter' at a regular interval.
let handle = tokio::spawn(async move {
while is_live() {
tokio::time::sleep(Settings::service_live_timeout_interval()).await;
@@ -74,8 +79,18 @@
});
tasks.push(handle);

started();
// Start task to wait for the service 'started' flag to be `true`.
let handle = tokio::spawn(async move {
while !service_has_started() {
tokio::time::sleep(Duration::from_secs(1)).await;
if condition_for_started() {
set_to_started();
}
}
});
tasks.push(handle);

// Run all asynchronous tasks to completion.
for task in tasks {
task.await?;
}
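The started-check helpers polled by the task above (`service_has_started`, `condition_for_started`, `set_to_started`) are likewise defined in the health module, not shown in this diff. A sketch under the assumption that "started" means every subsystem flag is set; the exact predicate is an assumption:

    use std::sync::atomic::{AtomicBool, Ordering};

    /// One-way flag: `true` once the service has fully started.
    static STARTED: AtomicBool = AtomicBool::new(false);

    pub(crate) fn service_has_started() -> bool {
        STARTED.load(Ordering::SeqCst)
    }

    /// Hypothetical predicate: both DBs are live and the follower has reached
    /// tip at least once (`index_db_is_live` and `event_db_is_live` are the
    /// module's other flag getters). The real condition may differ.
    pub(crate) fn condition_for_started() -> bool {
        index_db_is_live() && event_db_is_live() && follower_has_first_reached_tip()
    }

    pub(crate) fn set_to_started() {
        STARTED.store(true, Ordering::SeqCst);
    }
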
11 changes: 10 additions & 1 deletion catalyst-gateway/bin/src/db/event/mod.rs
@@ -14,7 +14,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
use tokio_postgres::{types::ToSql, NoTls, Row};
use tracing::{debug, debug_span, error, Instrument};

use crate::settings::Settings;
use crate::{service::utilities::health::set_event_db_liveness, settings::Settings};

pub(crate) mod common;
pub(crate) mod config;
@@ -160,6 +160,11 @@ impl EventDB {
Ok(())
}

/// Checks that connection to `EventDB` is available.
pub(crate) fn connection_is_ok() -> bool {
EVENT_DB_POOL.get().is_some()
}

/// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
async fn explain_analyze_rollback(
stmt: &str, params: &[&(dyn ToSql + Sync)],
@@ -239,6 +244,8 @@
///
/// The env var "`DATABASE_URL`" can be set directly as an env var, or in a
/// `.env` file.
///
/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
pub fn establish_connection() {
let (url, user, pass) = Settings::event_db_settings();

@@ -261,5 +268,7 @@

if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
error!("Failed to set event db pool. Called Twice?");
} else {
set_event_db_liveness(true);
}
}
62 changes: 61 additions & 1 deletion catalyst-gateway/bin/src/service/api/health/live_get.rs
@@ -1,4 +1,64 @@
//! Implementation of the `GET /health/live` endpoint.
//! # Implementation of the `GET /health/live` endpoint.
//!
//! This module provides an HTTP endpoint to monitor the liveness of the API service using
//! a simple counter mechanism. It uses an atomic boolean named `IS_LIVE` to track whether
//! the service is operational. The `IS_LIVE` boolean is initially set to `true`.
//!
//! ## Key Features
//!
//! 1. **Atomic Counter**: The endpoint maintains an atomic counter that increments
//! every time the endpoint is accessed. This counter helps track the number of
//! requests made to the endpoint.
//!
//! 2. **Counter Reset**: Every 30 seconds, the counter is automatically reset to zero.
//! This ensures that the count reflects recent activity rather than cumulative usage
//! over a long period.
//!
//! 3. **Threshold Check**: If the counter reaches a predefined threshold (e.g., 100),
//! the `IS_LIVE` boolean is set to `false`. This indicates that the service is no
//! longer operational. Once `IS_LIVE` is set to `false`, it cannot be changed back to
//! `true`.
//!
//! 4. **Response Logic**:
//! - If `IS_LIVE` is `true`, the endpoint returns a `204 No Content` response,
//! indicating that the service is healthy and operational.
//! - If `IS_LIVE` is `false`, the endpoint returns a `503 Service Unavailable`
//! response, indicating that the service is no longer operational.
//!
//! ## How It Works
//!
//! - When the endpoint is called, the atomic counter increments by 1.
//! - Every 30 seconds, the counter is reset to 0 to ensure it only reflects recent
//! activity.
//! - If the counter reaches the threshold (e.g., 100), the `IS_LIVE` boolean is set to
//! `false`.
//! - Once `IS_LIVE` is `false`, the endpoint will always respond with `503 Service
//! Unavailable`.
//! - If `IS_LIVE` is `true`, the endpoint responds with `204 No Content`.
//!
//! ## Example Scenarios
//!
//! 1. **Normal Operation**:
//! - The counter is below the threshold.
//! - `IS_LIVE` remains `true`.
//! - The endpoint returns `204 No Content`.
//!
//! 2. **Threshold Exceeded**:
//! - The counter reaches 100.
//! - `IS_LIVE` is set to `false`.
//! - The endpoint returns `503 Service Unavailable`.
//!
//! 3. **After Threshold Exceeded**:
//! - The counter is reset to 0, but `IS_LIVE` remains `false`.
//! - The endpoint continues to return `503 Service Unavailable`.
//!
//! ## Notes
//!
//! - The `IS_LIVE` boolean is atomic, meaning it is thread-safe and can be accessed
//! concurrently without issues.
//!
//! This endpoint is useful for monitoring service liveness and automatically marking it
//! as unavailable if it becomes overloaded or encounters issues.

use poem_openapi::ApiResponse;

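The counter machinery this doc comment describes (`is_live` and `live_counter_reset`, both driven from `cli.rs` above) lives elsewhere in the health module. A minimal sketch of the described mechanism, assuming an atomic request counter plus a one-way `IS_LIVE` flag; the threshold constant here is illustrative, while the PR reads it from an env var:

    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    /// One-way liveness flag; starts `true` and can only be cleared.
    static IS_LIVE: AtomicBool = AtomicBool::new(true);
    /// Requests seen since the last reset.
    static LIVE_COUNTER: AtomicU64 = AtomicU64::new(0);
    /// Illustrative threshold; the real value comes from an env var.
    const LIVE_THRESHOLD: u64 = 100;

    /// Called on each `GET /health/live` request: bump the counter, trip the
    /// flag if the threshold is reached, and report current liveness.
    pub(crate) fn check_live() -> bool {
        let count = LIVE_COUNTER.fetch_add(1, Ordering::SeqCst).saturating_add(1);
        if count >= LIVE_THRESHOLD {
            // One-way transition: once unhealthy, stay unhealthy.
            IS_LIVE.store(false, Ordering::SeqCst);
        }
        IS_LIVE.load(Ordering::SeqCst)
    }

    /// Reset the counter; called periodically (e.g. every 30s) by the
    /// background task spawned in `cli.rs`.
    pub(crate) fn live_counter_reset() {
        LIVE_COUNTER.store(0, Ordering::SeqCst);
    }

    pub(crate) fn is_live() -> bool {
        IS_LIVE.load(Ordering::SeqCst)
    }
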
1 change: 0 additions & 1 deletion catalyst-gateway/bin/src/service/api/health/mod.rs
@@ -7,7 +7,6 @@ mod inspection_get;
mod live_get;
mod ready_get;
mod started_get;
pub(crate) use started_get::started;

/// Health API Endpoints
pub(crate) struct HealthApi;
130 changes: 127 additions & 3 deletions catalyst-gateway/bin/src/service/api/health/ready_get.rs
@@ -1,7 +1,93 @@
//! Implementation of the GET /health/ready endpoint
//! # Implementation of the GET /health/ready endpoint
//!
//! This module provides an HTTP endpoint to monitor the readiness of the service's
//! database connections and attempt to reconnect to any databases that are not currently
//! live. It uses the `LIVE_INDEX_DB` and `LIVE_EVENT_DB` atomic booleans defined in the
//! parent module to track the status of the Index DB and Event DB, respectively.
//!
//! ## Key Features
//!
//! 1. **Reconnection Logic**:
//! - If either `LIVE_INDEX_DB` or `LIVE_EVENT_DB` is `false`, the service will attempt
//! to reconnect to the corresponding database.
//! - If the reconnection attempt is successful, the respective flag (`LIVE_INDEX_DB`
//! or `LIVE_EVENT_DB`) is set to `true`.
//! - If the reconnection attempt fails, the flag remains `false`.
//!
//! 2. **Readiness Check Logic**:
//! - After attempting to reconnect to any non-live databases, the endpoint checks the
//! status of both `LIVE_INDEX_DB` and `LIVE_EVENT_DB`.
//! - If both flags are `true`, the endpoint returns a `204 No Content` response,
//! indicating that all databases are live and the service is healthy.
//! - If either flag is `false`, the endpoint returns a `503 Service Unavailable`
//! response, indicating that at least one database is not live and the service is
//! unhealthy.
//!
//! ## How It Works
//!
//! - When the endpoint is called, it first checks the status of `LIVE_INDEX_DB` and
//! `LIVE_EVENT_DB`.
//! - For any database that is not live (i.e., its flag is `false`), the service attempts
//! to reconnect to that database.
//! - If the reconnection attempt is successful, the corresponding flag is set to `true`.
//! - After attempting to reconnect, the endpoint checks the status of both flags:
//! - If both `LIVE_INDEX_DB` and `LIVE_EVENT_DB` are `true`, the endpoint returns `204
//! No Content`.
//! - If either flag is `false`, the endpoint returns `503 Service Unavailable`.
//!
//! ## Example Scenarios
//!
//! 1. **Both Databases Live**:
//! - `LIVE_INDEX_DB` and `LIVE_EVENT_DB` are both `true`.
//! - No reconnection attempts are made.
//! - The endpoint returns `204 No Content`.
//!
//! 2. **Index DB Not Live, Reconnection Successful**:
//! - `LIVE_INDEX_DB` is `false`, and `LIVE_EVENT_DB` is `true`.
//! - The service attempts to reconnect to the Index DB and succeeds.
//! - `LIVE_INDEX_DB` is set to `true`.
//! - The endpoint returns `204 No Content`.
//!
//! 3. **Event DB Not Live, Reconnection Fails**:
//! - `LIVE_INDEX_DB` is `true`, and `LIVE_EVENT_DB` is `false`.
//! - The service attempts to reconnect to the Event DB but fails.
//! - `LIVE_EVENT_DB` remains `false`.
//! - The endpoint returns `503 Service Unavailable`.
//!
//! 4. **Both Databases Not Live, Reconnection Partially Successful**:
//! - `LIVE_INDEX_DB` and `LIVE_EVENT_DB` are both `false`.
//! - The service attempts to reconnect to both databases.
//! - Reconnection to the Index DB succeeds (`LIVE_INDEX_DB` is set to `true`), but
//! reconnection to the Event DB fails (`LIVE_EVENT_DB` remains `false`).
//! - The endpoint returns `503 Service Unavailable`.
//!
//! ## Notes
//!
//! - The reconnection logic ensures that the service actively attempts to restore
//! connectivity to any non-live databases, improving robustness and reliability.
//! - The atomic booleans (`LIVE_INDEX_DB` and `LIVE_EVENT_DB`) are thread-safe, allowing
//! concurrent access without issues.
//! - This endpoint is useful for monitoring and automatically recovering from transient
//! database connectivity issues, ensuring that the service remains operational whenever
//! possible.
//!
//! This endpoint complements the initialization and readiness monitoring endpoints by
//! providing ongoing connectivity checks and recovery attempts for the service's
//! databases.
use poem_openapi::ApiResponse;
use tracing::error;

use crate::service::common::responses::WithErrorResponses;
use crate::{
cardano::index_db_is_ready,
db::{
event::{establish_connection, EventDB},
index::session::CassandraSession,
},
service::{
common::{responses::WithErrorResponses, types::headers::retry_after::RetryAfterOption},
utilities::health::{event_db_is_live, index_db_is_live, set_event_db_liveness},
},
};

/// Endpoint responses.
#[derive(ApiResponse)]
@@ -35,5 +121,43 @@ pub(crate) type AllResponses = WithErrorResponses<Responses>;
/// service that are ready.
#[allow(clippy::unused_async)]
pub(crate) async fn endpoint() -> AllResponses {
Responses::NoContent.into()
// Check Event DB connection
let event_db_live = event_db_is_live();

// When check fails, attempt to re-connect
if !event_db_live {
establish_connection();
// Re-check, and update Event DB service liveness flag
set_event_db_liveness(EventDB::connection_is_ok());
};

// Check Index DB connection
let index_db_live = index_db_is_live();

// When check fails, attempt to re-connect
if !index_db_live {
CassandraSession::init();
// Re-check connection to Indexing DB (internally updates the liveness flag)
if !index_db_is_ready().await {
error!("Index DB re-connection failed readiness check");
}
}

let success_response = Responses::NoContent.into();

// Return 204 response if check passed initially.
if index_db_live && event_db_live {
return success_response;
}

// Otherwise, re-check, and return 204 response if all is good.
if index_db_is_live() && event_db_is_live() {
return success_response;
}

// Otherwise, return 503 response.
AllResponses::service_unavailable(
&anyhow::anyhow!("Service is not ready, do not send other requests."),
RetryAfterOption::Default,
)
}
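Taken together, `/health/started`, `/health/live`, and `/health/ready` can be smoke-tested against a locally running gateway. A hypothetical probe using `reqwest`; the base URL and port are assumptions, not taken from this PR:

    // Assumes: reqwest = "0.12", tokio = { version = "1", features = ["full"] }
    #[tokio::main]
    async fn main() -> Result<(), reqwest::Error> {
        for path in ["/health/started", "/health/live", "/health/ready"] {
            // Adjust host/port to your deployment.
            let url = format!("http://localhost:3030{path}");
            let status = reqwest::get(&url).await?.status();
            // 204 No Content => healthy; 503 Service Unavailable => not ready.
            println!("{url} -> {status}");
        }
        Ok(())
    }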