From 8e95a23828a5f99c33c329e1a496860002749830 Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Thu, 12 Mar 2026 17:28:08 +0530 Subject: [PATCH] refactor(metadata-db): drop redundant columns from jobs table --- ...0260307120000_drop_jobs_status_columns.sql | 9 ++++++++ crates/core/metadata-db/src/jobs.rs | 3 +-- crates/core/metadata-db/src/jobs/sql.rs | 6 ++--- .../src/jobs/tests/it_idempotency.rs | 23 ++++++------------- crates/core/metadata-db/src/tests/helpers.rs | 2 +- crates/services/controller/src/scheduler.rs | 2 +- 6 files changed, 21 insertions(+), 24 deletions(-) create mode 100644 crates/core/metadata-db/migrations/20260307120000_drop_jobs_status_columns.sql diff --git a/crates/core/metadata-db/migrations/20260307120000_drop_jobs_status_columns.sql b/crates/core/metadata-db/migrations/20260307120000_drop_jobs_status_columns.sql new file mode 100644 index 000000000..7f9cef945 --- /dev/null +++ b/crates/core/metadata-db/migrations/20260307120000_drop_jobs_status_columns.sql @@ -0,0 +1,9 @@ +-- Drop redundant columns from jobs; + +-- status/node_id/updated_at live in jobs_status +ALTER TABLE jobs DROP COLUMN IF EXISTS status; +ALTER TABLE jobs DROP COLUMN IF EXISTS node_id; +ALTER TABLE jobs DROP COLUMN IF EXISTS updated_at; + +-- descriptor lives in job_events.detail +ALTER TABLE jobs DROP COLUMN IF EXISTS descriptor; diff --git a/crates/core/metadata-db/src/jobs.rs b/crates/core/metadata-db/src/jobs.rs index 56fff7a67..432efbe06 100644 --- a/crates/core/metadata-db/src/jobs.rs +++ b/crates/core/metadata-db/src/jobs.rs @@ -39,12 +39,11 @@ use crate::{ pub async fn register<'c, E>( exe: E, idempotency_key: impl Into> + std::fmt::Debug, - node_id: impl Into> + std::fmt::Debug, ) -> Result where E: Executor<'c>, { - sql::insert(exe, &idempotency_key.into(), &node_id.into()) + sql::insert(exe, &idempotency_key.into()) .await .map_err(Error::Database) } diff --git a/crates/core/metadata-db/src/jobs/sql.rs b/crates/core/metadata-db/src/jobs/sql.rs index 72dc669d1..ed48cb642 100644 --- a/crates/core/metadata-db/src/jobs/sql.rs +++ b/crates/core/metadata-db/src/jobs/sql.rs @@ -30,21 +30,19 @@ pub struct JobWithRetryInfo { pub async fn insert<'c, E>( exe: E, idempotency_key: &IdempotencyKey<'_>, - node_id: &WorkerNodeId<'_>, ) -> Result where E: Executor<'c, Database = Postgres>, { let query = indoc::indoc! {r#" - INSERT INTO jobs (idempotency_key, node_id, created_at) - VALUES ($1, $2, timezone('UTC', now())) + INSERT INTO jobs (idempotency_key, created_at) + VALUES ($1, timezone('UTC', now())) ON CONFLICT ON CONSTRAINT jobs_idempotency_key_unique DO UPDATE SET idempotency_key = EXCLUDED.idempotency_key RETURNING id "#}; let res = sqlx::query_scalar(query) .bind(idempotency_key) - .bind(node_id) .fetch_one(exe) .await?; Ok(res) diff --git a/crates/core/metadata-db/src/jobs/tests/it_idempotency.rs b/crates/core/metadata-db/src/jobs/tests/it_idempotency.rs index 4df8ecae6..7c0fbb2c6 100644 --- a/crates/core/metadata-db/src/jobs/tests/it_idempotency.rs +++ b/crates/core/metadata-db/src/jobs/tests/it_idempotency.rs @@ -27,23 +27,14 @@ async fn register_with_same_key_returns_same_job_id() { async fn different_keys_produce_different_jobs() { //* Given let (_db, conn) = setup_test_db().await; - let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID); - let job_id_a = jobs::register( - &conn, - IdempotencyKey::from_ref_unchecked("key-a"), - &worker_id, - ) - .await - .expect("register key-a failed"); + let job_id_a = jobs::register(&conn, IdempotencyKey::from_ref_unchecked("key-a")) + .await + .expect("register key-a failed"); //* When - let job_id_b = jobs::register( - &conn, - IdempotencyKey::from_ref_unchecked("key-b"), - &worker_id, - ) - .await - .expect("register key-b failed"); + let job_id_b = jobs::register(&conn, IdempotencyKey::from_ref_unchecked("key-b")) + .await + .expect("register key-b failed"); //* Then assert_ne!( @@ -61,7 +52,7 @@ async fn get_by_idempotency_key_returns_job() { let key = IdempotencyKey::from_ref_unchecked("lookup-key"); let mut tx = conn.begin_txn().await.expect("begin txn failed"); - let job_id = jobs::register(&mut tx, key.clone(), &worker_id) + let job_id = jobs::register(&mut tx, key.clone()) .await .expect("register failed"); job_events::register( diff --git a/crates/core/metadata-db/src/tests/helpers.rs b/crates/core/metadata-db/src/tests/helpers.rs index 791742a69..6cd336c14 100644 --- a/crates/core/metadata-db/src/tests/helpers.rs +++ b/crates/core/metadata-db/src/tests/helpers.rs @@ -65,7 +65,7 @@ pub async fn register_job( let key_str = format!("test:{}", COUNTER.fetch_add(1, Ordering::Relaxed)); IdempotencyKey::from_owned_unchecked(key_str) }); - let job_id = jobs::register(&mut tx, idempotency_key, worker_id) + let job_id = jobs::register(&mut tx, idempotency_key) .await .expect("Failed to register job"); job_events::register(&mut tx, job_id, worker_id, status, Some(job_desc.clone())) diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index 94447c1b9..1769cd687 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -156,7 +156,7 @@ impl Scheduler { let scheduled_job_status = JobStatus::Scheduled; let detail: metadata_db::job_events::EventDetail<'static> = job_descriptor.into(); - let job_id: JobId = metadata_db::jobs::register(&mut tx, idempotency_key, &node_id) + let job_id: JobId = metadata_db::jobs::register(&mut tx, idempotency_key) .await .map(Into::into) .map_err(ScheduleJobError::RegisterJob)?;