Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-- =============================================================
-- Migration: Add jobs_status projection table
-- =============================================================
-- 1. Create jobs_status table (one row per job, latest state)
-- 2. Backfill from jobs table
-- 3. Add foreign key constraint (guarded so reruns are safe)
-- 4. Add indexes for common query patterns
-- =============================================================
-- NOTE: every step is idempotent (IF NOT EXISTS / ON CONFLICT DO
-- NOTHING / catalog guard), so the script can be re-run safely if
-- a partial application needs to be repeated.

-- 1. Create jobs_status table.
--    The primary key on job_id enforces the one-row-per-job invariant.
CREATE TABLE IF NOT EXISTS jobs_status (
    job_id     BIGINT      NOT NULL,
    node_id    TEXT        NOT NULL,
    status     TEXT        NOT NULL,
    -- Timestamps are normalized to UTC at insertion time.
    updated_at TIMESTAMPTZ NOT NULL DEFAULT timezone('UTC', now()),
    CONSTRAINT jobs_status_pkey PRIMARY KEY (job_id)
);

-- 2. Backfill from existing jobs.
--    DO NOTHING preserves rows already projected by a previous
--    (possibly partial) run instead of erroring on the PK.
INSERT INTO jobs_status (job_id, node_id, status, updated_at)
SELECT id, node_id, status, updated_at
FROM jobs
ON CONFLICT (job_id) DO NOTHING;

-- 3. Add foreign key so projection rows are removed with their job.
--    Postgres has no "ADD CONSTRAINT IF NOT EXISTS", so check the
--    pg_constraint catalog first; otherwise this step alone would
--    fail on a re-run while the surrounding steps succeed.
DO $$
BEGIN
    IF NOT EXISTS (
        SELECT 1
        FROM pg_constraint
        WHERE conname = 'fk_jobs_status_job_id'
          AND conrelid = 'jobs_status'::regclass
    ) THEN
        ALTER TABLE jobs_status
            ADD CONSTRAINT fk_jobs_status_job_id
            FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE;
    END IF;
END $$;

-- 4. Add indexes for common query patterns.
CREATE INDEX IF NOT EXISTS idx_jobs_status_status ON jobs_status (status);
CREATE INDEX IF NOT EXISTS idx_jobs_status_node_id_status ON jobs_status (node_id, status);
2 changes: 1 addition & 1 deletion crates/core/metadata-db/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub enum Error {
///
/// See `JobStatusUpdateError` for specific transition validation errors.
#[error("Job status update error: {0}")]
JobStatusUpdate(#[source] crate::jobs::JobStatusUpdateError),
JobStatusUpdate(#[source] crate::job_status::JobStatusUpdateError),

/// Failed to get active physical table by location ID
///
Expand Down
66 changes: 7 additions & 59 deletions crates/core/metadata-db/src/job_events/tests/it_job_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@ use pgtemp::PgTempDB;
use crate::{
config::DEFAULT_POOL_MAX_CONNECTIONS,
job_events,
jobs::{self, JobDescriptorRaw, JobStatus},
jobs::JobStatus,
tests::common::{raw_descriptor, register_job},
workers::{self, WorkerInfo, WorkerNodeId},
};

fn raw_descriptor(value: &serde_json::Value) -> JobDescriptorRaw<'static> {
let raw = serde_json::value::to_raw_value(value).expect("Failed to serialize to raw value");
JobDescriptorRaw::from_owned_unchecked(raw)
}

#[tokio::test]
async fn register_inserts_event() {
//* Given
Expand All @@ -29,14 +25,9 @@ async fn register_inserts_event() {
.expect("Failed to register worker");

let job_desc = raw_descriptor(&serde_json::json!({"test": "event"}));
let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc)
.await
.expect("Failed to insert job");

//* When
job_events::register(&conn, job_id, &worker_id, JobStatus::Scheduled)
.await
.expect("Failed to register event");
let job_id = register_job(&conn, &job_desc, &worker_id, None).await;

//* Then
let attempts = job_events::get_attempts_for_job(&conn, job_id)
Expand All @@ -62,14 +53,9 @@ async fn get_attempts_returns_only_scheduled_events() {
.expect("Failed to register worker");

let job_desc = raw_descriptor(&serde_json::json!({"test": "filter"}));
let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc)
.await
.expect("Failed to insert job");

// Insert a mix of event types
job_events::register(&conn, job_id, &worker_id, JobStatus::Scheduled)
.await
.expect("Failed to register SCHEDULED");
let job_id = register_job(&conn, &job_desc, &worker_id, None).await;
job_events::register(&conn, job_id, &worker_id, JobStatus::Running)
.await
.expect("Failed to register RUNNING");
Expand All @@ -91,34 +77,6 @@ async fn get_attempts_returns_only_scheduled_events() {
assert_eq!(attempts[1].retry_index, 1);
}

// Verifies that a freshly inserted job with no job events registered
// yields an empty attempts list from `get_attempts_for_job`.
// NOTE(review): uses `jobs::sql::insert_with_default_status` directly
// (no `job_events::register` call), so no SCHEDULED event exists.
#[tokio::test]
async fn get_attempts_returns_empty_for_no_events() {
//* Given
// Spin up a throwaway Postgres instance for test isolation.
let temp_db = PgTempDB::new();
let conn =
crate::connect_pool_with_retry(&temp_db.connection_uri(), DEFAULT_POOL_MAX_CONNECTIONS)
.await
.expect("Failed to connect to metadata db");

// A worker must exist before a job can reference it.
let worker_id = WorkerNodeId::from_ref_unchecked("test-worker");
workers::register(&conn, worker_id.clone(), WorkerInfo::default())
.await
.expect("Failed to register worker");

// Insert a job but deliberately register no job events for it.
let job_desc = raw_descriptor(&serde_json::json!({"test": "empty"}));
let job_id = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc)
.await
.expect("Failed to insert job");

//* When
let attempts = job_events::get_attempts_for_job(&conn, job_id)
.await
.expect("Failed to get attempts");

//* Then
// No events registered => no attempts reported.
assert!(attempts.is_empty());
}

#[tokio::test]
async fn get_attempts_scoped_to_job() {
//* Given
Expand All @@ -135,23 +93,13 @@ async fn get_attempts_scoped_to_job() {

let job_desc = raw_descriptor(&serde_json::json!({"test": "scope"}));

let job_id_1 = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc)
.await
.expect("Failed to insert job 1");
let job_id_2 = jobs::sql::insert_with_default_status(&conn, worker_id.clone(), &job_desc)
.await
.expect("Failed to insert job 2");

// Insert events for both jobs
job_events::register(&conn, job_id_1, &worker_id, JobStatus::Scheduled)
.await
.expect("Failed to register event for job 1");
let job_id_1 = register_job(&conn, &job_desc, &worker_id, None).await;
job_events::register(&conn, job_id_1, &worker_id, JobStatus::Scheduled)
.await
.expect("Failed to register second event for job 1");
job_events::register(&conn, job_id_2, &worker_id, JobStatus::Scheduled)
.await
.expect("Failed to register event for job 2");

let job_id_2 = register_job(&conn, &job_desc, &worker_id, None).await;

//* When
let attempts_1 = job_events::get_attempts_for_job(&conn, job_id_1)
Expand Down
Loading
Loading