Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Drop redundant columns from jobs;

-- status/node_id/updated_at live in jobs_status
ALTER TABLE jobs DROP COLUMN IF EXISTS status;
ALTER TABLE jobs DROP COLUMN IF EXISTS node_id;
ALTER TABLE jobs DROP COLUMN IF EXISTS updated_at;

-- descriptor lives in job_events.detail
ALTER TABLE jobs DROP COLUMN IF EXISTS descriptor;
3 changes: 1 addition & 2 deletions crates/core/metadata-db/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ use crate::{
pub async fn register<'c, E>(
exe: E,
idempotency_key: impl Into<IdempotencyKey<'_>> + std::fmt::Debug,
node_id: impl Into<WorkerNodeId<'_>> + std::fmt::Debug,
) -> Result<JobId, Error>
where
E: Executor<'c>,
{
sql::insert(exe, &idempotency_key.into(), &node_id.into())
sql::insert(exe, &idempotency_key.into())
.await
.map_err(Error::Database)
}
Expand Down
6 changes: 2 additions & 4 deletions crates/core/metadata-db/src/jobs/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,19 @@ pub struct JobWithRetryInfo {
pub async fn insert<'c, E>(
exe: E,
idempotency_key: &IdempotencyKey<'_>,
node_id: &WorkerNodeId<'_>,
) -> Result<JobId, sqlx::Error>
where
E: Executor<'c, Database = Postgres>,
{
let query = indoc::indoc! {r#"
INSERT INTO jobs (idempotency_key, node_id, created_at)
VALUES ($1, $2, timezone('UTC', now()))
INSERT INTO jobs (idempotency_key, created_at)
VALUES ($1, timezone('UTC', now()))
ON CONFLICT ON CONSTRAINT jobs_idempotency_key_unique
DO UPDATE SET idempotency_key = EXCLUDED.idempotency_key
RETURNING id
"#};
let res = sqlx::query_scalar(query)
.bind(idempotency_key)
.bind(node_id)
.fetch_one(exe)
.await?;
Ok(res)
Expand Down
23 changes: 7 additions & 16 deletions crates/core/metadata-db/src/jobs/tests/it_idempotency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,14 @@ async fn register_with_same_key_returns_same_job_id() {
async fn different_keys_produce_different_jobs() {
//* Given
let (_db, conn) = setup_test_db().await;
let worker_id = WorkerNodeId::from_ref_unchecked(TEST_WORKER_ID);
let job_id_a = jobs::register(
&conn,
IdempotencyKey::from_ref_unchecked("key-a"),
&worker_id,
)
.await
.expect("register key-a failed");
let job_id_a = jobs::register(&conn, IdempotencyKey::from_ref_unchecked("key-a"))
.await
.expect("register key-a failed");

//* When
let job_id_b = jobs::register(
&conn,
IdempotencyKey::from_ref_unchecked("key-b"),
&worker_id,
)
.await
.expect("register key-b failed");
let job_id_b = jobs::register(&conn, IdempotencyKey::from_ref_unchecked("key-b"))
.await
.expect("register key-b failed");

//* Then
assert_ne!(
Expand All @@ -61,7 +52,7 @@ async fn get_by_idempotency_key_returns_job() {
let key = IdempotencyKey::from_ref_unchecked("lookup-key");

let mut tx = conn.begin_txn().await.expect("begin txn failed");
let job_id = jobs::register(&mut tx, key.clone(), &worker_id)
let job_id = jobs::register(&mut tx, key.clone())
.await
.expect("register failed");
job_events::register(
Expand Down
2 changes: 1 addition & 1 deletion crates/core/metadata-db/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn register_job(
let key_str = format!("test:{}", COUNTER.fetch_add(1, Ordering::Relaxed));
IdempotencyKey::from_owned_unchecked(key_str)
});
let job_id = jobs::register(&mut tx, idempotency_key, worker_id)
let job_id = jobs::register(&mut tx, idempotency_key)
.await
.expect("Failed to register job");
job_events::register(&mut tx, job_id, worker_id, status, Some(job_desc.clone()))
Expand Down
2 changes: 1 addition & 1 deletion crates/services/controller/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl Scheduler {
let scheduled_job_status = JobStatus::Scheduled;
let detail: metadata_db::job_events::EventDetail<'static> = job_descriptor.into();

let job_id: JobId = metadata_db::jobs::register(&mut tx, idempotency_key, &node_id)
let job_id: JobId = metadata_db::jobs::register(&mut tx, idempotency_key)
.await
.map(Into::into)
.map_err(ScheduleJobError::RegisterJob)?;
Expand Down
Loading