@evanh evanh commented Jan 13, 2026

This adds a Postgres storage adapter for the taskbroker, along with a configuration option for choosing between the adapters. The adapter also works with AlloyDB.

In Postgres, `offset` is a reserved keyword, so that column is called `kafka_offset` in the PG tables and converted to `offset`.
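As a rough illustration of that rename, the row type can carry the Postgres column name and be converted on the way out. This is only a sketch: the struct names `PgActivationRow` and `Activation` and the `id` field are hypothetical and not taken from the diff; only `kafka_offset` and `offset` come from the description above.

```rust
/// Hypothetical shape of a row as read from Postgres, where the column is
/// named `kafka_offset` because `offset` is a reserved keyword there.
#[derive(Debug, Clone, PartialEq)]
struct PgActivationRow {
    id: String,
    kafka_offset: i64,
}

/// Hypothetical in-memory shape that keeps the original `offset` name.
#[derive(Debug, Clone, PartialEq)]
struct Activation {
    id: String,
    offset: i64,
}

impl From<PgActivationRow> for Activation {
    /// The rename happens in exactly one place, at the storage boundary.
    fn from(row: PgActivationRow) -> Self {
        Activation {
            id: row.id,
            offset: row.kafka_offset,
        }
    }
}
```

Keeping the conversion in a single `From` impl means callers never see the Postgres-specific column name.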

The tests were updated to run with both the SQLite and Postgres adapters using the rstest crate. The `create_test_store` function is now the standard for all tests and allows choosing between a SQLite and a Postgres DB.

A `remove_db` function was added to the trait and the existing adapters, since the tests create a unique PG database on every run that should be cleaned up.


@evanh evanh requested a review from a team as a code owner January 13, 2026 20:48
@evanh evanh force-pushed the evanh/feat/use-postgresql-interface branch from 6fe9265 to dcf8130 Compare January 13, 2026 21:35
@evanh evanh requested a review from a team January 13, 2026 21:35
@evanh evanh force-pushed the evanh/feat/use-postgresql-interface branch from dcf8130 to f70bfda Compare January 13, 2026 21:39
@fpacifici fpacifici left a comment

Should we start a Postgres instance as part of devservices?

@fpacifici fpacifici left a comment

I'd recommend reducing the duplication in the ActivationStore between the two stores. I don't know whether we will remove SQLite from the implementation for good. Either way, it will take some time before we get there, and having so much copy-paste in the meantime is quite dangerous.

processing_deadline TIMESTAMPTZ,
status TEXT NOT NULL,
at_most_once BOOLEAN NOT NULL DEFAULT FALSE,
application TEXT NOT NULL DEFAULT '',

This may not need to have a default now that it has been merged and rolled out.

}),
};
let response = service.set_task_status(Request::new(request)).await;
println!("response: {:?}", response);

Please remove this debug `println!`.

/// The number of ms for timeouts when publishing messages to kafka.
pub kafka_send_timeout_ms: u64,

pub database_adapter: &'static str,

Why is this not an enum?
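A minimal sketch of what an enum-based setting could look like, using only the standard library. The names `DatabaseAdapter` and `UnknownAdapter` and the accepted strings `"sqlite"` and `"postgres"` are assumptions for illustration; the PR does not show which values the config accepts.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical enum replacing the `&'static str` config field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DatabaseAdapter {
    Sqlite,
    Postgres,
}

/// Error returned when the configured value matches no known adapter.
#[derive(Debug, PartialEq, Eq)]
struct UnknownAdapter(String);

impl fmt::Display for UnknownAdapter {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "unknown database adapter: {}", self.0)
    }
}

impl std::error::Error for UnknownAdapter {}

impl FromStr for DatabaseAdapter {
    type Err = UnknownAdapter;

    /// Parse the adapter name case-insensitively (assumed spellings).
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "sqlite" => Ok(DatabaseAdapter::Sqlite),
            "postgres" => Ok(DatabaseAdapter::Postgres),
            other => Err(UnknownAdapter(other.to_string())),
        }
    }
}
```

With this shape, a typo in the config fails at parse time instead of at the point where the adapter is selected, and a `match` on the enum is checked for exhaustiveness by the compiler.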

/// When an activation is moved from pending -> processing a result is expected
/// in this many seconds.
- pub processing_deadline_duration: u32,
+ pub processing_deadline_duration: i32,

Any chance that we have values in the SQLite DB that would not fit an i32?
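For reference, `u32` covers 0 to 4,294,967,295 while `i32` tops out at 2,147,483,647, so only values above roughly 2.1 billion seconds would be affected. A checked conversion makes that case explicit rather than wrapping silently. The helper name `deadline_to_i32` below is hypothetical:

```rust
use std::num::TryFromIntError;

/// Convert a deadline stored under the old `u32` type to `i32`,
/// returning an error instead of wrapping when the value is too large.
fn deadline_to_i32(value: u32) -> Result<i32, TryFromIntError> {
    i32::try_from(value)
}
```

By contrast, a plain `value as i32` cast would turn an out-of-range value into a negative number.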

Comment on lines +404 to +405
/// Remove the database, used only in tests
async fn remove_db(&self) -> Result<(), Error>;

We should create an MCP in front of production taskbroker, and we absolutely need to expose this, which will certainly be used only in tests.

.fetch_all(&mut *conn)
.await?;

Ok(rows.into_iter().map(|row| row.into()).collect())

What happens if we update the rows here and then the broker crashes, because a nuclear strike targets the datacenter or, less likely, it suffers an OOM? Are these tasks stuck?

Comment on lines +356 to +369
if let Ok(row) = result {
    let received_at: DateTime<Utc> = row.get("received_at");
    let delay_until: Option<DateTime<Utc>> = row.get("delay_until");
    let millis = now.signed_duration_since(received_at).num_milliseconds()
        - delay_until.map_or(0, |delay_time| {
            delay_time
                .signed_duration_since(received_at)
                .num_milliseconds()
        });
    millis as f64 / 1000.0
} else {
    // If we couldn't find a row, there is no latency.
    0.0
}

Same as above: we can reduce the duplication by having a lower-level DB trait that is used by the activation store.
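One piece that could live outside both adapters regardless of how the trait is split is the latency arithmetic quoted above. A sketch, assuming timestamps are passed in as epoch milliseconds so the example needs no external crates; the function name `pending_latency_secs` is hypothetical:

```rust
/// Latency in seconds for a pending activation, mirroring the quoted
/// arithmetic: (now - received_at) minus the intentional delay
/// (delay_until - received_at), where a missing delay counts as zero.
fn pending_latency_secs(
    now_ms: i64,
    received_at_ms: i64,
    delay_until_ms: Option<i64>,
) -> f64 {
    let elapsed_ms = now_ms - received_at_ms;
    let delay_ms = delay_until_ms.map_or(0, |delay| delay - received_at_ms);
    (elapsed_ms - delay_ms) as f64 / 1000.0
}
```

Each adapter would then only fetch `received_at` and `delay_until` and hand them to the shared function.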

Comment on lines +379 to +386
async fn count_by_status(&self, status: InflightActivationStatus) -> Result<usize, Error> {
    let result =
        sqlx::query("SELECT COUNT(*) as count FROM inflight_taskactivations WHERE status = $1")
            .bind(status.to_string())
            .fetch_one(&self.read_pool)
            .await?;
    Ok(result.get::<i64, _>("count") as usize)
}

Maybe this could be a default method in the trait? That would be an alternative direction to the low-level DB trait.
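A minimal sketch of the default-method direction, simplified to synchronous code so it compiles with the standard library alone (the real trait is async). The names `Status`, `count_rows`, and `InMemoryStore` are hypothetical; only `count_by_status` comes from the diff.

```rust
/// Hypothetical stand-in for the activation status enum.
#[derive(Debug, Clone, Copy)]
enum Status {
    Pending,
    Processing,
}

impl Status {
    fn as_str(&self) -> &'static str {
        match self {
            Status::Pending => "Pending",
            Status::Processing => "Processing",
        }
    }
}

trait ActivationStore {
    /// Required: the only backend-specific piece, returning a raw count.
    fn count_rows(&self, status: &str) -> Result<i64, String>;

    /// Default method shared by every adapter: converts the status and
    /// checks the integer conversion in one place.
    fn count_by_status(&self, status: Status) -> Result<usize, String> {
        let count = self.count_rows(status.as_str())?;
        usize::try_from(count).map_err(|e| e.to_string())
    }
}

/// Toy backend used only to exercise the default method.
struct InMemoryStore {
    statuses: Vec<&'static str>,
}

impl ActivationStore for InMemoryStore {
    fn count_rows(&self, status: &str) -> Result<i64, String> {
        Ok(self.statuses.iter().filter(|s| **s == status).count() as i64)
    }
}
```

Each adapter implements only `count_rows`; the shared logic is written once in the trait.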

.bind(id)
.fetch_optional(&mut *conn)
.await?;
println!("result: {:?}", result);

Please remove this one too.

let now = Utc::now();
let mut atomic = self.write_pool.begin().await?;

// Idempotent tasks that fail their processing deadlines go directly to failure

`at_most_once` != idempotent. Idempotent means you can issue the task again safely. Is the comment wrong here?
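To make the distinction concrete, here is a sketch of the decision that the code comment seems to describe. It rests on two assumptions that the diff does not confirm: that the branch is keyed on the `at_most_once` flag, and that other tasks return to pending for a retry. The names `NextStatus` and `on_deadline_expired` are hypothetical.

```rust
/// Hypothetical outcome for a task whose processing deadline expired.
#[derive(Debug, PartialEq, Eq)]
enum NextStatus {
    Failure,
    Pending,
}

/// An `at_most_once` task must never run a second time, so an expired
/// deadline fails it outright. Any other task is assumed to be safe to
/// retry and goes back to pending (an assumption, not shown in the diff).
fn on_deadline_expired(at_most_once: bool) -> NextStatus {
    if at_most_once {
        NextStatus::Failure
    } else {
        NextStatus::Pending
    }
}
```

Under that reading, the task that skips the retry is the one that is not safe to re-issue, which is the opposite of idempotent.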

3 participants