Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ sentry_protos = "0.4.11"
serde = "1.0.214"
serde_yaml = "0.9.34"
sha2 = "0.10.8"
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "chrono"] }
sqlx = { version = "0.8.3", features = ["sqlite", "runtime-tokio", "chrono", "postgres"] }
tokio = { version = "1.43.1", features = ["full"] }
tokio-stream = { version = "0.1.16", features = ["full"] }
tokio-util = "0.7.12"
Expand All @@ -61,6 +61,7 @@ uuid = { version = "1.11.0", features = ["v4"] }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
rstest = "0.23"

[[bench]]
name = "store_bench"
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# recent enough version of protobuf-compiler
FROM rust:1-bookworm AS build

RUN apt-get update && apt-get upgrade -y
RUN apt-get update && apt-get upgrade -y
RUN apt-get install -y cmake pkg-config libssl-dev librdkafka-dev protobuf-compiler

RUN USER=root cargo new --bin taskbroker
Expand All @@ -17,6 +17,7 @@ ENV TASKBROKER_VERSION=$TASKBROKER_GIT_REVISION
COPY ./Cargo.lock ./Cargo.lock
COPY ./Cargo.toml ./Cargo.toml
COPY ./migrations ./migrations
COPY ./pg_migrations ./pg_migrations
COPY ./benches ./benches

# Build dependencies in a way they can be cached
Expand Down
1 change: 1 addition & 0 deletions default_migrations/0001_create_database.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE DATABASE taskbroker;
20 changes: 20 additions & 0 deletions pg_migrations/0001_create_inflight_activations.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- PostgreSQL equivalent of the inflight_taskactivations table
CREATE TABLE IF NOT EXISTS inflight_taskactivations (
id TEXT NOT NULL PRIMARY KEY,
activation BYTEA NOT NULL,
partition INTEGER NOT NULL,
kafka_offset BIGINT NOT NULL,
added_at TIMESTAMPTZ NOT NULL,
received_at TIMESTAMPTZ NOT NULL,
processing_attempts INTEGER NOT NULL,
expires_at TIMESTAMPTZ,
delay_until TIMESTAMPTZ,
processing_deadline_duration INTEGER NOT NULL,
processing_deadline TIMESTAMPTZ,
status TEXT NOT NULL,
at_most_once BOOLEAN NOT NULL DEFAULT FALSE,
application TEXT NOT NULL DEFAULT '',

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not need to have a default now that it has been merged and rolled out.

namespace TEXT NOT NULL,
taskname TEXT NOT NULL,
on_attempts_exceeded INTEGER NOT NULL DEFAULT 1
);
11 changes: 11 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ pub struct Config {
/// The number of ms for timeouts when publishing messages to kafka.
pub kafka_send_timeout_ms: u64,

pub database_adapter: &'static str,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not an enum ?


/// The url of the postgres database to use for the inflight activation store.
pub pg_url: String,

/// The name of the postgres database to use for the inflight activation store.
pub pg_database_name: String,

/// The path to the sqlite database
pub db_path: String,

Expand Down Expand Up @@ -256,6 +264,9 @@ impl Default for Config {
kafka_auto_offset_reset: "latest".to_owned(),
kafka_send_timeout_ms: 500,
db_path: "./taskbroker-inflight.sqlite".to_owned(),
database_adapter: "sqlite",
pg_url: "postgres://postgres:password@sentry-postgres-1:5432/".to_owned(),
pg_database_name: "taskbroker".to_owned(),
db_write_failure_backoff_ms: 4000,
db_insert_batch_max_len: 256,
db_insert_batch_max_size: 16_000_000,
Expand Down
73 changes: 52 additions & 21 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::grpc::server::TaskbrokerServer;
use crate::store::inflight_activation::InflightActivationStore;
use prost::Message;
use rstest::rstest;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
use sentry_protos::taskbroker::v1::{
FetchNextTask, GetTaskRequest, SetTaskStatusRequest, TaskActivation,
Expand All @@ -10,8 +10,11 @@ use tonic::{Code, Request};
use crate::test_utils::{create_test_store, make_activations};

#[tokio::test]
async fn test_get_task() {
let store = create_test_store().await;
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
async fn test_get_task(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let service = TaskbrokerServer { store };
let request = GetTaskRequest {
namespace: None,
Expand All @@ -25,9 +28,12 @@ async fn test_get_task() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_task_status() {
let store = create_test_store().await;
async fn test_set_task_status(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let service = TaskbrokerServer { store };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
Expand All @@ -41,9 +47,12 @@ async fn test_set_task_status() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_task_status_invalid() {
let store = create_test_store().await;
async fn test_set_task_status_invalid(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let service = TaskbrokerServer { store };
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
Expand All @@ -61,9 +70,12 @@ async fn test_set_task_status_invalid() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_get_task_success() {
let store = create_test_store().await;
async fn test_get_task_success(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let activations = make_activations(1);
store.store(activations).await.unwrap();

Expand All @@ -81,9 +93,12 @@ async fn test_get_task_success() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_get_task_with_application_success() {
let store = create_test_store().await;
async fn test_get_task_with_application_success(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let mut activations = make_activations(2);

let mut payload = TaskActivation::decode(&activations[1].activation as &[u8]).unwrap();
Expand All @@ -108,9 +123,12 @@ async fn test_get_task_with_application_success() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_get_task_with_namespace_requires_application() {
let store = create_test_store().await;
async fn test_get_task_with_namespace_requires_application(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let activations = make_activations(2);
let namespace = activations[0].namespace.clone();

Expand All @@ -129,9 +147,12 @@ async fn test_get_task_with_namespace_requires_application() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_task_status_success() {
let store = create_test_store().await;
async fn test_set_task_status_success(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let activations = make_activations(2);
store.store(activations).await.unwrap();

Expand All @@ -157,6 +178,7 @@ async fn test_set_task_status_success() {
}),
};
let response = service.set_task_status(Request::new(request)).await;
println!("response: {:?}", response);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove

assert!(response.is_ok());
let resp = response.unwrap();
assert!(resp.get_ref().task.is_some());
Expand All @@ -165,9 +187,12 @@ async fn test_set_task_status_success() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_task_status_with_application() {
let store = create_test_store().await;
async fn test_set_task_status_with_application(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let mut activations = make_activations(2);

let mut payload = TaskActivation::decode(&activations[1].activation as &[u8]).unwrap();
Expand Down Expand Up @@ -199,9 +224,12 @@ async fn test_set_task_status_with_application() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_task_status_with_application_no_match() {
let store = create_test_store().await;
async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let mut activations = make_activations(2);

let mut payload = TaskActivation::decode(&activations[1].activation as &[u8]).unwrap();
Expand All @@ -228,9 +256,12 @@ async fn test_set_task_status_with_application_no_match() {
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
#[allow(deprecated)]
async fn test_set_task_status_with_namespace_requires_application() {
let store = create_test_store().await;
async fn test_set_task_status_with_namespace_requires_application(#[case] adapter: &str) {
let store = create_test_store(adapter).await;
let activations = make_activations(2);
let namespace = activations[0].namespace.clone();

Expand Down
2 changes: 1 addition & 1 deletion src/kafka/deserialize_activation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub fn new(
added_at: Utc::now(),
received_at: activation_time,
processing_deadline: None,
processing_deadline_duration: activation.processing_deadline_duration as u32,
processing_deadline_duration: activation.processing_deadline_duration as i32,
processing_attempts: 0,
expires_at,
delay_until,
Expand Down
Loading
Loading