feat(logs): Run vector as sidecar container #195

Closed
93 changes: 44 additions & 49 deletions api/src/k8s_client.rs
@@ -332,7 +332,7 @@ impl K8sClient for HttpK8sClient {
"kind": "StatefulSet",
"metadata": {
"name": stateful_set_name,
"namespace": NAMESPACE_NAME,
"namespace": NAMESPACE_NAME
},
"spec": {
"replicas": 1,
@@ -348,22 +348,33 @@ impl K8sClient for HttpK8sClient {
}
},
"spec": {
"volumes": [
"terminationGracePeriodSeconds": 60,
"initContainers": [
{
"name": REPLICATOR_CONFIG_FILE_VOLUME_NAME,
"configMap": {
"name": replicator_config_map_name
}
},
{
"name": VECTOR_CONFIG_FILE_VOLUME_NAME,
"configMap": {
"name": VECTOR_CONFIG_MAP_NAME
}
},
{
"name": LOGS_VOLUME_NAME,
"emptyDir": {}
"name": vector_container_name,
"image": VECTOR_IMAGE_NAME,
"restartPolicy": "Always",
"env": [
{
"name": "LOGFLARE_API_KEY",
"valueFrom": {
"secretKeyRef": {
"name": LOGFLARE_SECRET_NAME,
"key": "key"
}
}
}
],
"volumeMounts": [
{
"name": VECTOR_CONFIG_FILE_VOLUME_NAME,
"mountPath": "/etc/vector"
},
{
"name": LOGS_VOLUME_NAME,
"mountPath": "/var/log"
}
]
}
],
"containers": [
@@ -402,42 +413,26 @@ impl K8sClient for HttpK8sClient {
{
"name": LOGS_VOLUME_NAME,
"mountPath": "/app/logs"
},
}
]
}
],
"volumes": [
{
"name": REPLICATOR_CONFIG_FILE_VOLUME_NAME,
"configMap": {
"name": replicator_config_map_name
}
},
{
"name": vector_container_name,
"image": VECTOR_IMAGE_NAME,
"env": [
{
"name": "LOGFLARE_API_KEY",
"valueFrom": {
"secretKeyRef": {
"name": LOGFLARE_SECRET_NAME,
"key": "key"
}
}
}
],
"resources": {
"limits": {
"memory": "200Mi",
},
"requests": {
"memory": "200Mi",
"cpu": "100m"
}
},
"volumeMounts": [
{
"name": VECTOR_CONFIG_FILE_VOLUME_NAME,
"mountPath": "/etc/vector"
},
{
"name": LOGS_VOLUME_NAME,
"mountPath": "/var/log"
}
],
"name": VECTOR_CONFIG_FILE_VOLUME_NAME,
"configMap": {
"name": VECTOR_CONFIG_MAP_NAME
}
},
{
"name": LOGS_VOLUME_NAME,
"emptyDir": {}
}
]
}
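
For readers unfamiliar with the pattern used in this file: the StatefulSet change moves Vector out of the regular containers list and into initContainers with restartPolicy: Always, Kubernetes' native sidecar mechanism (enabled by default since Kubernetes 1.29), so Vector starts before the replicator, restarts independently, and is shut down after it. Below is a minimal sketch of the resulting pod spec shape, written with serde_json::json! as in the code above; the container names, image tags, and volume names are illustrative stand-ins, not the crate's actual constants.

    use serde_json::json;

    fn main() {
        // Minimal sketch of the pod spec shape produced by the diff above.
        // All names and images below are illustrative, not the crate's constants.
        let pod_spec = json!({
            "terminationGracePeriodSeconds": 60,
            // Native sidecar: an init container with `restartPolicy: Always` keeps
            // running alongside the main containers and is terminated after them.
            "initContainers": [{
                "name": "vector",
                "image": "timberio/vector:latest-alpine",
                "restartPolicy": "Always",
                "env": [{
                    "name": "LOGFLARE_API_KEY",
                    "valueFrom": { "secretKeyRef": { "name": "logflare", "key": "key" } }
                }],
                "volumeMounts": [
                    { "name": "vector-config", "mountPath": "/etc/vector" },
                    { "name": "logs", "mountPath": "/var/log" }
                ]
            }],
            // The replicator writes log files into the shared emptyDir, which
            // Vector tails from its /var/log mount and ships to Logflare.
            "containers": [{
                "name": "replicator",
                "image": "replicator:latest",
                "volumeMounts": [{ "name": "logs", "mountPath": "/app/logs" }]
            }],
            "volumes": [
                { "name": "vector-config", "configMap": { "name": "vector-config" } },
                { "name": "logs", "emptyDir": {} }
            ]
        });

        println!("{}", serde_json::to_string_pretty(&pod_spec).unwrap());
    }
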
18 changes: 11 additions & 7 deletions etl/src/pipeline.rs
@@ -208,18 +208,20 @@ where
if let Err(err) = apply_worker_result {
errors.push(err);

info!("apply worker completed with an error, shutting down table sync workers");

// TODO: in the future we might build a system based on the `ReactiveFuture` that
// automatically sends a shutdown signal to table sync workers on apply worker failure.
// If there was an error in the apply worker, we want to shut down all table sync
// workers, since without an apply worker they are lost.
if let Err(err) = self.shutdown_tx.shutdown() {
info!(
"shut down signal could not be delivered, likely because no workers are running: {:?}",
err
);
//
// If shutdown fails, it means that no receivers of the shutdown signal were active. In our
// case the receivers are either the apply worker or table sync workers, and since the apply
// worker has already terminated, a failed send implies that there are no active table sync
// workers.
if self.shutdown_tx.shutdown().is_err() {
info!("there are no table sync workers to shutdown");
}

info!("apply worker completed with an error, shutting down table sync workers");
} else {
info!("apply worker completed successfully");
}
@@ -242,6 +244,8 @@ where
)));
}

info!("pipeline completed successfully");

Ok(())
}
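
The new comment in pipeline.rs reasons that a failed shutdown send implies there are no table sync workers left. That relies on a property of the shutdown channel: sending fails only when every receiver has been dropped. The concrete ShutdownTx type is not shown in this diff; the following is a minimal sketch of that property, assuming a tokio::sync::watch channel underneath (an assumption, not necessarily what the crate uses).

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        // Hypothetical stand-in for the pipeline's shutdown channel.
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        // While at least one worker still holds a receiver, the signal is delivered.
        assert!(shutdown_tx.send(true).is_ok());

        // Once every receiver is gone (the apply worker has terminated and no table
        // sync workers exist), sending fails -- the case the new code logs as
        // "there are no table sync workers to shutdown".
        drop(shutdown_rx);
        assert!(shutdown_tx.send(true).is_err());
    }
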

2 changes: 2 additions & 0 deletions etl/src/replication/apply.rs
@@ -356,6 +356,7 @@ where
// the table sync workers to get stuck if there are no changes in the cdc stream.
if !state.handling_transaction() {
debug!("processing syncing tables after a period of inactivity of {} seconds", REFRESH_INTERVAL.as_secs());

let continue_loop = hook.process_syncing_tables(state.next_status_update.flush_lsn, true).await?;
if !continue_loop {
break Ok(ApplyLoopResult::ApplyStopped);
@@ -462,6 +463,7 @@ where
"updating lsn for next status update to {}",
last_commit_end_lsn
);

state
.next_status_update
.update_flush_lsn(last_commit_end_lsn);
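
The pre-existing comment in apply.rs describes the loop's idle behavior: if no change arrives on the CDC stream within REFRESH_INTERVAL, syncing tables are processed anyway so table sync workers cannot get stuck waiting on a quiet stream. Below is a rough sketch of that shape, with an mpsc channel standing in for the replication stream; all names and the interval value are illustrative, not taken from the crate.

    use std::time::Duration;
    use tokio::sync::mpsc;
    use tokio::time::timeout;

    // Illustrative value; the real REFRESH_INTERVAL lives in apply.rs.
    const REFRESH_INTERVAL: Duration = Duration::from_secs(1);

    async fn apply_loop(mut cdc_events: mpsc::Receiver<String>) {
        loop {
            match timeout(REFRESH_INTERVAL, cdc_events.recv()).await {
                Ok(Some(event)) => println!("applying event: {event}"),
                Ok(None) => break, // stream closed, stop the loop
                // No changes arrived within REFRESH_INTERVAL: still advance table
                // syncing so table sync workers are not left waiting indefinitely.
                Err(_) => println!(
                    "processing syncing tables after {}s of inactivity",
                    REFRESH_INTERVAL.as_secs()
                ),
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(8);
        tx.send("insert into users".to_string()).await.unwrap();
        drop(tx); // close the stream so the sketch terminates
        apply_loop(rx).await;
    }
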
9 changes: 4 additions & 5 deletions replicator/src/core.rs
@@ -16,7 +16,8 @@ use std::fmt;
use tracing::{info, instrument, warn};

pub async fn start_replicator() -> anyhow::Result<()> {
info!("starting replicator service");
info!("starting replicator");

let replicator_config = load_replicator_config()?;

log_config(&replicator_config);
@@ -68,7 +69,8 @@ pub async fn start_replicator() -> anyhow::Result<()> {
}
}

info!("replicator service completed");
info!("replicator completed successfully");

Ok(())
}

@@ -177,10 +179,7 @@

if let Err(e) = shutdown_tx.shutdown() {
warn!("failed to send shutdown signal: {:?}", e);
return;
}

info!("pipeline shutdown successfully")
});

// Wait for the pipeline to finish (either normally or via shutdown).