From 2f06aa551e46a032fcec21887101a0108ac20b26 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 26 Dec 2025 12:22:25 -0800 Subject: [PATCH 1/7] Add Builders for `TaskActivation` and `InflightActivation` --- Cargo.lock | 73 +++++++++++++ Cargo.toml | 1 + src/store/inflight_activation.rs | 115 ++++++++++++++++++-- src/store/inflight_activation_tests.rs | 71 ++++++------- src/store/mod.rs | 2 + src/store/task_activation.rs | 139 +++++++++++++++++++++++++ 6 files changed, 354 insertions(+), 47 deletions(-) create mode 100644 src/store/task_activation.rs diff --git a/Cargo.lock b/Cargo.lock index 2f33ebb3..ae5cb489 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -578,6 +578,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "debugid" version = "0.8.0" @@ -608,6 +643,37 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn", +] + [[package]] name = "digest" version = "0.10.7" @@ -1259,6 +1325,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -2910,6 +2982,7 @@ dependencies = [ "chrono", "clap", "criterion", + "derive_builder", "elegant-departure", "figment", "futures", diff --git a/Cargo.toml b/Cargo.toml index 6ef48e26..62732bb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1.0.92" bytes = "1.10.0" chrono = { version = "0.4.26" } clap = { version = "4.5.20", features = ["derive"] } +derive_builder = "0.20.2" elegant-departure = { version = "0.3.1", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "yaml", "test"] } futures = "0.3.31" diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 5c911a19..8e821e7e 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -1,7 +1,6 @@ -use std::{str::FromStr, time::Instant}; - use anyhow::{Error, anyhow}; use chrono::{DateTime, Utc}; +use derive_builder::Builder; use libsqlite3_sys::{ SQLITE_DBSTATUS_CACHE_HIT, SQLITE_DBSTATUS_CACHE_MISS, SQLITE_DBSTATUS_CACHE_SPILL, SQLITE_DBSTATUS_CACHE_USED, SQLITE_DBSTATUS_CACHE_USED_SHARED, SQLITE_DBSTATUS_CACHE_WRITE, @@ -10,7 +9,9 @@ use libsqlite3_sys::{ SQLITE_DBSTATUS_LOOKASIDE_USED, SQLITE_DBSTATUS_SCHEMA_USED, SQLITE_DBSTATUS_STMT_USED, SQLITE_OK, sqlite3_db_status, }; -use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivationStatus}; +use prost::Message; +use prost_types::Timestamp; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation, TaskActivationStatus}; use sqlx::{ ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite, Type, migrate::MigrateDatabase, @@ -20,6 +21,7 @@ use sqlx::{ SqliteRow, SqliteSynchronous, }, }; +use std::{str::FromStr, time::Instant}; use tracing::instrument; use crate::config::Config; @@ -63,57 +65,154 @@ impl From for InflightActivationStatus { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Builder)] +#[builder(pattern = "owned")] +#[builder_struct_attr(doc = r#" +Build `InflightActivation`s by only providing values you care about. + +### Required Fields +- `id` +- `namespace` +- `taskname` +- `activation` + +### Usage + +```rs +InflightActivationBuilder::default() + .id("task-123") + .namespace("my-namespace") + .taskname("my-task") + .activation( + TaskActivationBuilder::default() + .id(id) + .namespace("my-namespace") + .taskname("my-task") + .build() + ) + .build() + .unwrap() +``` + +The code above is equivalent to the snippet below. + +```rs +InflightActivation { + id: "task-123".to_string(), + namespace: "my-namespace".to_string(), + taskname: "my-task".to_string(), + activation: TaskActivation { + id: "task-123".to_string(), + namespace: "my-namespace".to_string(), + taskname: "my-task".to_string(), + parameters: "{}".to_string(), + headers: HashMap::new(), + processing_deadline_duration: 0, + received_at: None, + retry_state: None, + expires: None, + delay: None, + }.encode_to_vec(), + status: InflightActivationStatus::Pending, + partition: 0, + offset: 0, + added_at: Utc::now(), + received_at: Utc::now(), + processing_attempts: 0, + processing_deadline_duration: 0, + expires_at: None, + delay_until: None, + processing_deadline: None, + on_attempts_exceeded: OnAttemptsExceeded::Discard, + at_most_once: false +} +``` +"#)] pub struct InflightActivation { + #[builder(setter(into))] pub id: String, - /// The protobuf activation that was received from kafka + + /// The task namespace. + #[builder(setter(into))] + pub namespace: String, + + /// The task name. + #[builder(setter(into))] + pub taskname: String, + + /// The Protobuf activation that was received from Kafka. + #[builder(setter(custom))] pub activation: Vec, /// The current status of the activation + #[builder(default = InflightActivationStatus::Pending)] pub status: InflightActivationStatus, /// The partition the activation was received from + #[builder(default = 0)] pub partition: i32, /// The offset the activation had + #[builder(default = 0)] pub offset: i64, /// The timestamp when the activation was stored in activation store. + #[builder(default = Utc::now())] pub added_at: DateTime, /// The timestamp a task was stored in Kafka + #[builder(default = Utc::now())] + #[builder(setter(custom))] pub received_at: DateTime, /// The number of times the activation has been attempted to be processed. This counter is /// incremented everytime a task is reset from processing back to pending. When this /// exceeds max_processing_attempts, the task is discarded/deadlettered. + #[builder(default = 0)] pub processing_attempts: i32, /// The duration in seconds that a worker has to complete task execution. /// When an activation is moved from pending -> processing a result is expected /// in this many seconds. + #[builder(default = 0)] pub processing_deadline_duration: u32, /// If the task has specified an expiry, this is the timestamp after which the task should be removed from inflight store + #[builder(default = None)] pub expires_at: Option>, /// If the task has specified a delay, this is the timestamp after which the task can be sent to workers + #[builder(default = None)] pub delay_until: Option>, /// The timestamp for when processing should be complete + #[builder(default = None)] pub processing_deadline: Option>, /// What to do when the maximum number of attempts to complete a task is exceeded + #[builder(default = OnAttemptsExceeded::Discard)] pub on_attempts_exceeded: OnAttemptsExceeded, /// Whether or not the activation uses at_most_once. /// When enabled activations are not retried when processing_deadlines /// are exceeded. + #[builder(default = false)] pub at_most_once: bool, +} - /// Details about the task - pub namespace: String, - pub taskname: String, +impl InflightActivationBuilder { + /// Override the default setter generated for `activation`. + pub fn activation(mut self, activation: TaskActivation) -> Self { + self.activation = Some(activation.encode_to_vec()); + self + } + + pub fn received_at(mut self, received_at: Timestamp) -> Self { + self.received_at = Some( + DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32).expect(""), + ); + self + } } impl InflightActivation { diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 2ff605e3..fa959be0 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -1,5 +1,4 @@ -use prost::Message; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::io::Error; use std::path::Path; use std::sync::Arc; @@ -7,18 +6,17 @@ use std::time::Duration; use crate::config::Config; use crate::store::inflight_activation::{ - InflightActivation, InflightActivationStatus, InflightActivationStore, + InflightActivationBuilder, InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, QueryResult, create_sqlite_pool, }; +use crate::store::task_activation::TaskActivationBuilder; use crate::test_utils::{ StatusCount, assert_counts, create_integration_config, create_test_store, generate_temp_filename, generate_unique_namespace, make_activations, make_activations_with_namespace, replace_retry_state, }; -use chrono::{DateTime, SubsecRound, TimeZone, Utc}; -use sentry_protos::taskbroker::v1::{ - OnAttemptsExceeded, RetryState, TaskActivation, TaskActivationStatus, -}; +use chrono::{SubsecRound, TimeZone, Utc}; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivationStatus}; use sqlx::{QueryBuilder, Sqlite}; use std::fs; use tokio::sync::broadcast; @@ -45,6 +43,9 @@ fn test_inflightactivation_status_is_completion() { assert!(value.is_conclusion()); } +#[test] +fn test_inflightactivation_builder() {} + #[test] fn test_inflightactivation_status_from() { let mut value: InflightActivationStatus = TaskActivationStatus::Pending.into(); @@ -1089,44 +1090,36 @@ async fn test_remove_killswitched() { #[tokio::test] async fn test_clear() { let store = create_test_store().await; - let namespace = generate_unique_namespace(); #[allow(deprecated)] let received_at = prost_types::Timestamp { seconds: 0, nanos: 0, }; - let batch = vec![InflightActivation { - id: "id_0".into(), - activation: TaskActivation { - id: "id_0".into(), - namespace: namespace.clone(), - taskname: "taskname".into(), - parameters: "{}".into(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: Some(1), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .expect(""), - processing_attempts: 0, - processing_deadline_duration: 0, - on_attempts_exceeded: OnAttemptsExceeded::Discard, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".into(), - }]; + + let id = "id_0"; + let taskname = "taskname"; + let namespace = generate_unique_namespace(); + + let batch = vec![ + InflightActivationBuilder::default() + .id(id) + .namespace(&namespace) + .taskname(taskname) + .activation( + TaskActivationBuilder::default() + .id(id) + .namespace(&namespace) + .taskname(taskname) + .received_at(received_at) + .expires(1) + .build(), + ) + .received_at(received_at) + .build() + .unwrap(), + ]; + assert!(store.store(batch).await.is_ok()); assert_counts( StatusCount { diff --git a/src/store/mod.rs b/src/store/mod.rs index dcc0f255..1c60fd9c 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -1,3 +1,5 @@ pub mod inflight_activation; +pub mod task_activation; + #[cfg(test)] pub mod inflight_activation_tests; diff --git a/src/store/task_activation.rs b/src/store/task_activation.rs new file mode 100644 index 00000000..f2247d06 --- /dev/null +++ b/src/store/task_activation.rs @@ -0,0 +1,139 @@ +use std::collections::HashMap; + +use prost_types::Timestamp; +use sentry_protos::taskbroker::v1::{self, RetryState}; + +/// Build `TaskActivation`s by only providing values you care about. +/// +/// ### Required Fields +/// - `id` +/// - `namespace` +/// - `taskname` +/// +/// ### Usage +/// +/// ```rs +/// TaskActivationBuilder::new() +/// .id("task-123") +/// .namespace("my-namespace") +/// .taskname("my-task") +/// .build() +/// ``` +/// +/// The code above is equivalent to the snippet below. +/// +/// ```rs +/// TaskActivation { +/// id: "task-123".to_string(), +/// namespace: "my-namespace".to_string(), +/// taskname: "my-task".to_string(), +/// parameters: "{}".to_string(), +/// headers: HashMap::new(), +/// processing_deadline_duration: 0, +/// received_at: None, +/// retry_state: None, +/// expires: None, +/// delay: None, +/// } +/// ``` +/// +pub struct TaskActivationBuilder { + id: Option, + namespace: Option, + taskname: Option, + parameters: Option, + headers: Option>, + received_at: Option, + retry_state: Option, + processing_deadline_duration: Option, + expires: Option, + delay: Option, +} + +impl TaskActivationBuilder { + pub fn new() -> Self { + Self { + id: None, + namespace: None, + taskname: None, + parameters: None, + headers: None, + received_at: None, + retry_state: None, + processing_deadline_duration: None, + expires: None, + delay: None, + } + } + + pub fn id>(mut self, id: T) -> Self { + self.id = Some(id.into()); + self + } + + pub fn namespace>(mut self, namespace: T) -> Self { + self.namespace = Some(namespace.into()); + self + } + + pub fn taskname>(mut self, taskname: T) -> Self { + self.taskname = Some(taskname.into()); + self + } + + pub fn parameters>(mut self, parameters: T) -> Self { + self.parameters = Some(parameters.into()); + self + } + + pub fn headers(mut self, headers: HashMap) -> Self { + self.headers = Some(headers); + self + } + + pub fn received_at(mut self, received_at: Timestamp) -> Self { + self.received_at = Some(received_at); + self + } + + pub fn retry_state(mut self, retry_state: RetryState) -> Self { + self.retry_state = Some(retry_state); + self + } + + pub fn processing_deadline_duration(mut self, duration: u64) -> Self { + self.processing_deadline_duration = Some(duration); + self + } + + pub fn expires(mut self, expires: u64) -> Self { + self.expires = Some(expires); + self + } + + pub fn delay(mut self, delay: u64) -> Self { + self.delay = Some(delay); + self + } + + pub fn build(self) -> v1::TaskActivation { + v1::TaskActivation { + id: self.id.expect("id is required"), + namespace: self.namespace.expect("namespace is required"), + taskname: self.taskname.expect("taskname is required"), + parameters: self.parameters.unwrap_or_else(|| "{}".to_string()), + headers: self.headers.unwrap_or_else(|| HashMap::new()), + processing_deadline_duration: self.processing_deadline_duration.unwrap_or(0), + received_at: self.received_at, + retry_state: self.retry_state, + expires: self.expires, + delay: self.delay, + } + } +} + +impl Default for TaskActivationBuilder { + fn default() -> Self { + Self::new() + } +} From 63c1d0b93ff6c46c7cf9fef679afdb8aec9af50d Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Mon, 29 Dec 2025 17:03:14 -0800 Subject: [PATCH 2/7] Refactor Some `InflightActivation` Literals --- src/kafka/inflight_activation_batcher.rs | 310 ++++++------------- src/kafka/inflight_activation_writer.rs | 370 +++++++---------------- src/store/inflight_activation.rs | 10 +- src/store/inflight_activation_tests.rs | 15 +- 4 files changed, 218 insertions(+), 487 deletions(-) diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 94043d0e..dcaf9eec 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -225,7 +225,11 @@ mod tests { use std::sync::Arc; use crate::{ - store::inflight_activation::InflightActivationStatus, test_utils::generate_unique_namespace, + store::{ + inflight_activation::{InflightActivationBuilder, InflightActivationStatus}, + task_activation::TaskActivationBuilder, + }, + test_utils::generate_unique_namespace, }; #[tokio::test] @@ -248,36 +252,18 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::default() + .id("0") + .taskname("task_to_be_filtered") + .namespace(&namespace) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("task_to_be_filtered") + .namespace(namespace) + .build(), + ) + .build(); batcher.reduce(inflight_activation_0).await.unwrap(); assert_eq!(batcher.batch.len(), 0); @@ -296,36 +282,20 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: Some(Utc::now()), - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "task_to_be_filtered".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::default() + .id("0") + .taskname("task_to_be_filtered") + .namespace(&namespace) + .expires_at(Utc::now()) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("task_to_be_filtered") + .namespace(namespace) + .expires(0) + .build(), + ) + .build(); batcher.reduce(inflight_activation_0).await.unwrap(); assert_eq!(batcher.batch.len(), 0); @@ -347,36 +317,19 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "taskname".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::default() + .id("0") + .taskname("taskname") + .namespace(&namespace) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("taskname") + .namespace(&namespace) + .expires(0) + .build(), + ) + .build(); batcher.reduce(inflight_activation_0).await.unwrap(); assert!(batcher.is_full().await); @@ -400,67 +353,33 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "taskname".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; - - let inflight_activation_1 = InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "taskname".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::default() + .id("0") + .taskname("taskname") + .namespace(&namespace) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("taskname") + .namespace(&namespace) + .expires(0) + .build(), + ) + .build(); + + let inflight_activation_1 = InflightActivationBuilder::default() + .id("1") + .taskname("taskname") + .namespace(&namespace) + .activation( + TaskActivationBuilder::default() + .id("1") + .taskname("taskname") + .namespace(&namespace) + .expires(0) + .build(), + ) + .build(); batcher.reduce(inflight_activation_0).await.unwrap(); batcher.reduce(inflight_activation_1).await.unwrap(); @@ -491,67 +410,32 @@ demoted_topic: taskworker-demoted"#; assert_eq!(batcher.producer_cluster, config.kafka_cluster.clone()); - let inflight_activation_0 = InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: "bad_namespace".to_string(), - taskname: "task_to_be_filtered".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: "bad_namespace".to_string(), - taskname: "taskname".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; - - let inflight_activation_1 = InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: "good_namespace".to_string(), - taskname: "good_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: None, - retry_state: None, - processing_deadline_duration: 0, - expires: Some(0), - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: "good_namespace".to_string(), - taskname: "good_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let inflight_activation_0 = InflightActivationBuilder::default() + .id("0") + .taskname("task_to_be_filtered") + .namespace("bad_namespace") + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("task_to_be_filtered") + .namespace("bad_namespace") + .build(), + ) + .build(); + + let inflight_activation_1 = InflightActivationBuilder::default() + .id("1") + .taskname("good_task") + .namespace("good_namespace") + .activation( + TaskActivationBuilder::default() + .id("1") + .taskname("good_task") + .namespace("good_namespace") + .expires(0) + .build(), + ) + .build(); batcher.reduce(inflight_activation_0).await.unwrap(); batcher.reduce(inflight_activation_1).await.unwrap(); diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 3f498567..f7efb3f9 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -212,9 +212,11 @@ mod tests { use sentry_protos::taskbroker::v1::TaskActivation; use std::sync::Arc; + use crate::store::inflight_activation::InflightActivationBuilder; use crate::store::inflight_activation::{ InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, }; + use crate::store::task_activation::TaskActivationBuilder; use crate::test_utils::generate_unique_namespace; use crate::test_utils::make_activations; use crate::test_utils::{create_integration_config, generate_temp_filename}; @@ -249,74 +251,34 @@ mod tests { let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + InflightActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Delay, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + .build(), + InflightActivationBuilder::default() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + .build(), ]; writer.reduce(batch).await.unwrap(); @@ -498,74 +460,34 @@ mod tests { let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + InflightActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Delay, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + .build(), + InflightActivationBuilder::default() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + .build(), ]; writer.reduce(batch).await.unwrap(); @@ -610,74 +532,34 @@ mod tests { let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + InflightActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + .build(), + InflightActivationBuilder::default() + .id("1") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("1") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + .build(), ]; writer.reduce(batch).await.unwrap(); @@ -753,74 +635,34 @@ mod tests { let mut writer = InflightActivationWriter::new(store.clone(), writer_config); let batch = vec![ - InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + InflightActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, - InflightActivation { - id: "1".to_string(), - activation: TaskActivation { - id: "1".to_string(), - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp( - received_at.seconds, - received_at.nanos as u32, + .build(), + InflightActivationBuilder::default() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("1") + .taskname("delay_task") + .namespace(&namespace) + .received_at(received_at) + .build(), ) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "delay_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }, + .build(), ]; writer.reduce(batch).await.unwrap(); diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 8e821e7e..de5e725a 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -67,6 +67,7 @@ impl From for InflightActivationStatus { #[derive(Clone, Debug, PartialEq, Builder)] #[builder(pattern = "owned")] +#[builder(build_fn(name = "_build", private))] #[builder_struct_attr(doc = r#" Build `InflightActivation`s by only providing values you care about. @@ -178,7 +179,7 @@ pub struct InflightActivation { pub processing_deadline_duration: u32, /// If the task has specified an expiry, this is the timestamp after which the task should be removed from inflight store - #[builder(default = None)] + #[builder(default = None, setter(strip_option))] pub expires_at: Option>, /// If the task has specified a delay, this is the timestamp after which the task can be sent to workers @@ -213,6 +214,13 @@ impl InflightActivationBuilder { ); self } + + pub fn build(self) -> InflightActivation { + match self._build() { + Ok(activation) => activation, + Err(e) => panic!("Failed to build InflightActivation: {}", e), + } + } } impl InflightActivation { diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index fa959be0..8fa06e07 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -1097,27 +1097,24 @@ async fn test_clear() { nanos: 0, }; - let id = "id_0"; - let taskname = "taskname"; let namespace = generate_unique_namespace(); let batch = vec![ InflightActivationBuilder::default() - .id(id) + .id("id_0") + .taskname("taskname") .namespace(&namespace) - .taskname(taskname) + .received_at(received_at) .activation( TaskActivationBuilder::default() - .id(id) + .id("id_0") + .taskname("taskname") .namespace(&namespace) - .taskname(taskname) .received_at(received_at) .expires(1) .build(), ) - .received_at(received_at) - .build() - .unwrap(), + .build(), ]; assert!(store.store(batch).await.is_ok()); From a2562fd86a1bbce8441b45cb616b03d3b4f8c600 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 30 Dec 2025 10:12:32 -0800 Subject: [PATCH 3/7] Replace More `InflightActivation` Literals --- src/kafka/inflight_activation_writer.rs | 142 ++++++++---------------- src/store/inflight_activation.rs | 1 - src/store/task_activation.rs | 2 +- src/test_utils.rs | 68 ++++++------ 4 files changed, 81 insertions(+), 132 deletions(-) diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index f7efb3f9..543769c8 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -321,37 +321,22 @@ mod tests { let namespace = generate_unique_namespace(); - let batch = vec![InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .unwrap(), - processing_attempts: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - processing_deadline_duration: 0, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }]; + let batch = vec![ + InflightActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(), + ) + .build(), + ]; writer.reduce(batch).await.unwrap(); writer.flush().await.unwrap(); @@ -388,37 +373,23 @@ mod tests { let namespace = generate_unique_namespace(); - let batch = vec![InflightActivation { - id: "0".to_string(), - activation: TaskActivation { - id: "0".to_string(), - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Delay, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .unwrap(), - processing_attempts: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - processing_deadline_duration: 0, - at_most_once: false, - namespace: namespace.clone(), - taskname: "pending_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }]; + let batch = vec![ + InflightActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .status(InflightActivationStatus::Delay) + .activation( + TaskActivationBuilder::default() + .id("0") + .taskname("pending_task") + .namespace(&namespace) + .received_at(received_at) + .build(), + ) + .build(), + ]; writer.reduce(batch).await.unwrap(); writer.flush().await.unwrap(); @@ -600,37 +571,22 @@ mod tests { let namespace = generate_unique_namespace(); - let existing_activation = InflightActivation { - id: "existing".to_string(), - activation: TaskActivation { - id: "existing".to_string(), - namespace: namespace.clone(), - taskname: "existing_task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - received_at: Some(received_at), - retry_state: None, - processing_deadline_duration: 0, - expires: None, - delay: None, - } - .encode_to_vec(), - status: InflightActivationStatus::Processing, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32) - .unwrap(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - at_most_once: false, - namespace: namespace.clone(), - taskname: "existing_task".to_string(), - on_attempts_exceeded: OnAttemptsExceeded::Discard, - }; + let existing_activation = InflightActivationBuilder::default() + .id("existing") + .taskname("existing_task") + .namespace(&namespace) + .received_at(received_at) + .status(InflightActivationStatus::Processing) + .activation( + TaskActivationBuilder::default() + .id("existing") + .taskname("existing_task") + .namespace(&namespace) + .received_at(received_at) + .build(), + ) + .build(); + store.store(vec![existing_activation]).await.unwrap(); let mut writer = InflightActivationWriter::new(store.clone(), writer_config); diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index de5e725a..be2b11fc 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -202,7 +202,6 @@ pub struct InflightActivation { } impl InflightActivationBuilder { - /// Override the default setter generated for `activation`. pub fn activation(mut self, activation: TaskActivation) -> Self { self.activation = Some(activation.encode_to_vec()); self diff --git a/src/store/task_activation.rs b/src/store/task_activation.rs index f2247d06..729b31b3 100644 --- a/src/store/task_activation.rs +++ b/src/store/task_activation.rs @@ -122,7 +122,7 @@ impl TaskActivationBuilder { namespace: self.namespace.expect("namespace is required"), taskname: self.taskname.expect("taskname is required"), parameters: self.parameters.unwrap_or_else(|| "{}".to_string()), - headers: self.headers.unwrap_or_else(|| HashMap::new()), + headers: self.headers.unwrap_or_default(), processing_deadline_duration: self.processing_deadline_duration.unwrap_or(0), received_at: self.received_at, retry_state: self.retry_state, diff --git a/src/test_utils.rs b/src/test_utils.rs index 9ae23de5..218511c2 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -7,14 +7,17 @@ use rdkafka::{ consumer::{Consumer, StreamConsumer}, producer::FutureProducer, }; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use uuid::Uuid; use crate::{ config::Config, - store::inflight_activation::{ - InflightActivation, InflightActivationStatus, InflightActivationStore, - InflightActivationStoreConfig, + store::{ + inflight_activation::{ + InflightActivation, InflightActivationBuilder, InflightActivationStatus, + InflightActivationStore, InflightActivationStoreConfig, + }, + task_activation::TaskActivationBuilder, }, }; use chrono::{Timelike, Utc}; @@ -37,40 +40,31 @@ pub fn make_activations_with_namespace(namespace: String, count: u32) -> Vec Date: Tue, 30 Dec 2025 10:36:20 -0800 Subject: [PATCH 4/7] Reduce `TaskActivation` Builder Boilerplate w/Macros --- src/kafka/inflight_activation_batcher.rs | 9 +- src/kafka/inflight_activation_writer.rs | 9 +- src/store/inflight_activation.rs | 61 ------------ src/store/task_activation.rs | 117 +++++++---------------- 4 files changed, 36 insertions(+), 160 deletions(-) diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index dcaf9eec..94e00c05 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -213,21 +213,16 @@ impl Reducer for InflightActivationBatcher { #[cfg(test)] mod tests { use super::{ - ActivationBatcherConfig, Config, InflightActivation, InflightActivationBatcher, Reducer, - RuntimeConfigManager, + ActivationBatcherConfig, Config, InflightActivationBatcher, Reducer, RuntimeConfigManager, }; use chrono::Utc; - use std::collections::HashMap; use tokio::fs; - use prost::Message; - use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation}; use std::sync::Arc; use crate::{ store::{ - inflight_activation::{InflightActivationBuilder, InflightActivationStatus}, - task_activation::TaskActivationBuilder, + inflight_activation::InflightActivationBuilder, task_activation::TaskActivationBuilder, }, test_utils::generate_unique_namespace, }; diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 543769c8..33dadea5 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -202,14 +202,9 @@ impl Reducer for InflightActivationWriter { #[cfg(test)] mod tests { - use super::{ActivationWriterConfig, InflightActivation, InflightActivationWriter, Reducer}; - use chrono::{DateTime, Utc}; - use prost::Message; - use prost_types::Timestamp; - use std::collections::HashMap; + use super::{ActivationWriterConfig, InflightActivationWriter, Reducer}; - use sentry_protos::taskbroker::v1::OnAttemptsExceeded; - use sentry_protos::taskbroker::v1::TaskActivation; + use prost_types::Timestamp; use std::sync::Arc; use crate::store::inflight_activation::InflightActivationBuilder; diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index be2b11fc..d233ab52 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -68,67 +68,6 @@ impl From for InflightActivationStatus { #[derive(Clone, Debug, PartialEq, Builder)] #[builder(pattern = "owned")] #[builder(build_fn(name = "_build", private))] -#[builder_struct_attr(doc = r#" -Build `InflightActivation`s by only providing values you care about. - -### Required Fields -- `id` -- `namespace` -- `taskname` -- `activation` - -### Usage - -```rs -InflightActivationBuilder::default() - .id("task-123") - .namespace("my-namespace") - .taskname("my-task") - .activation( - TaskActivationBuilder::default() - .id(id) - .namespace("my-namespace") - .taskname("my-task") - .build() - ) - .build() - .unwrap() -``` - -The code above is equivalent to the snippet below. - -```rs -InflightActivation { - id: "task-123".to_string(), - namespace: "my-namespace".to_string(), - taskname: "my-task".to_string(), - activation: TaskActivation { - id: "task-123".to_string(), - namespace: "my-namespace".to_string(), - taskname: "my-task".to_string(), - parameters: "{}".to_string(), - headers: HashMap::new(), - processing_deadline_duration: 0, - received_at: None, - retry_state: None, - expires: None, - delay: None, - }.encode_to_vec(), - status: InflightActivationStatus::Pending, - partition: 0, - offset: 0, - added_at: Utc::now(), - received_at: Utc::now(), - processing_attempts: 0, - processing_deadline_duration: 0, - expires_at: None, - delay_until: None, - processing_deadline: None, - on_attempts_exceeded: OnAttemptsExceeded::Discard, - at_most_once: false -} -``` -"#)] pub struct InflightActivation { #[builder(setter(into))] pub id: String, diff --git a/src/store/task_activation.rs b/src/store/task_activation.rs index 729b31b3..e9ebbe25 100644 --- a/src/store/task_activation.rs +++ b/src/store/task_activation.rs @@ -1,42 +1,25 @@ +use prost_types::Timestamp; +use sentry_protos::taskbroker::v1; use std::collections::HashMap; -use prost_types::Timestamp; -use sentry_protos::taskbroker::v1::{self, RetryState}; +macro_rules! builder_setter { + // For types that should accept `impl Into` + ($field:ident: impl Into<$ty:ty>) => { + pub fn $field>(mut self, $field: T) -> Self { + self.$field = Some($field.into()); + self + } + }; + + // For types that should be used directly + ($field:ident: $ty:ty) => { + pub fn $field(mut self, $field: $ty) -> Self { + self.$field = Some($field); + self + } + }; +} -/// Build `TaskActivation`s by only providing values you care about. -/// -/// ### Required Fields -/// - `id` -/// - `namespace` -/// - `taskname` -/// -/// ### Usage -/// -/// ```rs -/// TaskActivationBuilder::new() -/// .id("task-123") -/// .namespace("my-namespace") -/// .taskname("my-task") -/// .build() -/// ``` -/// -/// The code above is equivalent to the snippet below. -/// -/// ```rs -/// TaskActivation { -/// id: "task-123".to_string(), -/// namespace: "my-namespace".to_string(), -/// taskname: "my-task".to_string(), -/// parameters: "{}".to_string(), -/// headers: HashMap::new(), -/// processing_deadline_duration: 0, -/// received_at: None, -/// retry_state: None, -/// expires: None, -/// delay: None, -/// } -/// ``` -/// pub struct TaskActivationBuilder { id: Option, namespace: Option, @@ -44,7 +27,7 @@ pub struct TaskActivationBuilder { parameters: Option, headers: Option>, received_at: Option, - retry_state: Option, + retry_state: Option, processing_deadline_duration: Option, expires: Option, delay: Option, @@ -66,55 +49,19 @@ impl TaskActivationBuilder { } } - pub fn id>(mut self, id: T) -> Self { - self.id = Some(id.into()); - self - } - - pub fn namespace>(mut self, namespace: T) -> Self { - self.namespace = Some(namespace.into()); - self - } - - pub fn taskname>(mut self, taskname: T) -> Self { - self.taskname = Some(taskname.into()); - self - } - - pub fn parameters>(mut self, parameters: T) -> Self { - self.parameters = Some(parameters.into()); - self - } - - pub fn headers(mut self, headers: HashMap) -> Self { - self.headers = Some(headers); - self - } - - pub fn received_at(mut self, received_at: Timestamp) -> Self { - self.received_at = Some(received_at); - self - } - - pub fn retry_state(mut self, retry_state: RetryState) -> Self { - self.retry_state = Some(retry_state); - self - } - - pub fn processing_deadline_duration(mut self, duration: u64) -> Self { - self.processing_deadline_duration = Some(duration); - self - } + // String fields that accept `impl Into` + builder_setter!(id: impl Into); + builder_setter!(namespace: impl Into); + builder_setter!(taskname: impl Into); + builder_setter!(parameters: impl Into); - pub fn expires(mut self, expires: u64) -> Self { - self.expires = Some(expires); - self - } - - pub fn delay(mut self, delay: u64) -> Self { - self.delay = Some(delay); - self - } + // Other fields + builder_setter!(headers: HashMap); + builder_setter!(received_at: Timestamp); + builder_setter!(retry_state: v1::RetryState); + builder_setter!(processing_deadline_duration: u64); + builder_setter!(expires: u64); + builder_setter!(delay: u64); pub fn build(self) -> v1::TaskActivation { v1::TaskActivation { From 5241f2171f42a80fb8f6c4e0dcf42fa3080f8e59 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 31 Dec 2025 11:01:30 -0800 Subject: [PATCH 5/7] Infer `TaskActivation` Values from `InflightActivationBuilder` --- src/kafka/inflight_activation_batcher.rs | 84 ++---------- src/kafka/inflight_activation_writer.rs | 168 ++++------------------- src/store/inflight_activation.rs | 68 +++++++-- src/store/inflight_activation_tests.rs | 26 +--- src/store/task_activation.rs | 20 +-- src/test_utils.rs | 22 +-- 6 files changed, 118 insertions(+), 270 deletions(-) diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 94e00c05..91fc3f5e 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -247,18 +247,11 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivationBuilder::default() + let inflight_activation_0 = InflightActivationBuilder::new() .id("0") .taskname("task_to_be_filtered") .namespace(&namespace) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("task_to_be_filtered") - .namespace(namespace) - .build(), - ) - .build(); + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); assert_eq!(batcher.batch.len(), 0); @@ -282,15 +275,7 @@ demoted_namespaces: .taskname("task_to_be_filtered") .namespace(&namespace) .expires_at(Utc::now()) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("task_to_be_filtered") - .namespace(namespace) - .expires(0) - .build(), - ) - .build(); + .build(TaskActivationBuilder::default()); batcher.reduce(inflight_activation_0).await.unwrap(); assert_eq!(batcher.batch.len(), 0); @@ -312,19 +297,11 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivationBuilder::default() + let inflight_activation_0 = InflightActivationBuilder::new() .id("0") .taskname("taskname") .namespace(&namespace) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("taskname") - .namespace(&namespace) - .expires(0) - .build(), - ) - .build(); + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); assert!(batcher.is_full().await); @@ -348,33 +325,17 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivationBuilder::default() + let inflight_activation_0 = InflightActivationBuilder::new() .id("0") .taskname("taskname") .namespace(&namespace) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("taskname") - .namespace(&namespace) - .expires(0) - .build(), - ) - .build(); - - let inflight_activation_1 = InflightActivationBuilder::default() + .build(TaskActivationBuilder::new()); + + let inflight_activation_1 = InflightActivationBuilder::new() .id("1") .taskname("taskname") .namespace(&namespace) - .activation( - TaskActivationBuilder::default() - .id("1") - .taskname("taskname") - .namespace(&namespace) - .expires(0) - .build(), - ) - .build(); + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); batcher.reduce(inflight_activation_1).await.unwrap(); @@ -405,32 +366,17 @@ demoted_topic: taskworker-demoted"#; assert_eq!(batcher.producer_cluster, config.kafka_cluster.clone()); - let inflight_activation_0 = InflightActivationBuilder::default() + let inflight_activation_0 = InflightActivationBuilder::new() .id("0") .taskname("task_to_be_filtered") .namespace("bad_namespace") - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("task_to_be_filtered") - .namespace("bad_namespace") - .build(), - ) - .build(); - - let inflight_activation_1 = InflightActivationBuilder::default() + .build(TaskActivationBuilder::new()); + + let inflight_activation_1 = InflightActivationBuilder::new() .id("1") .taskname("good_task") .namespace("good_namespace") - .activation( - TaskActivationBuilder::default() - .id("1") - .taskname("good_task") - .namespace("good_namespace") - .expires(0) - .build(), - ) - .build(); + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); batcher.reduce(inflight_activation_1).await.unwrap(); diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 33dadea5..7eb3611f 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -204,7 +204,7 @@ impl Reducer for InflightActivationWriter { mod tests { use super::{ActivationWriterConfig, InflightActivationWriter, Reducer}; - use prost_types::Timestamp; + use chrono::DateTime; use std::sync::Arc; use crate::store::inflight_activation::InflightActivationBuilder; @@ -238,42 +238,22 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivationBuilder::default() + InflightActivationBuilder::new() .id("0") .taskname("pending_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("pending_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), - InflightActivationBuilder::default() + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() .id("1") .taskname("delay_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("1") - .taskname("delay_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -309,28 +289,16 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivationBuilder::default() + InflightActivationBuilder::new() .id("0") .taskname("pending_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("pending_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -361,29 +329,17 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivationBuilder::default() + InflightActivationBuilder::new() .id("0") .taskname("pending_task") .namespace(&namespace) .received_at(received_at) .status(InflightActivationStatus::Delay) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("pending_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -418,42 +374,22 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivationBuilder::default() + InflightActivationBuilder::new() .id("0") .taskname("pending_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("pending_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), - InflightActivationBuilder::default() + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() .id("1") .taskname("delay_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("1") - .taskname("delay_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -490,42 +426,22 @@ mod tests { writer_config, ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivationBuilder::default() + InflightActivationBuilder::new() .id("0") .taskname("pending_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("pending_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), - InflightActivationBuilder::default() + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() .id("1") .taskname("pending_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("1") - .taskname("pending_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); @@ -559,11 +475,7 @@ mod tests { .unwrap(), ); - let received_at = Timestamp { - seconds: 0, - nanos: 0, - }; - + let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); let existing_activation = InflightActivationBuilder::default() @@ -572,48 +484,24 @@ mod tests { .namespace(&namespace) .received_at(received_at) .status(InflightActivationStatus::Processing) - .activation( - TaskActivationBuilder::default() - .id("existing") - .taskname("existing_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(); + .build(TaskActivationBuilder::new()); store.store(vec![existing_activation]).await.unwrap(); let mut writer = InflightActivationWriter::new(store.clone(), writer_config); let batch = vec![ - InflightActivationBuilder::default() + InflightActivationBuilder::new() .id("0") .taskname("pending_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("0") - .taskname("pending_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), - InflightActivationBuilder::default() + .build(TaskActivationBuilder::new()), + InflightActivationBuilder::new() .id("1") .taskname("delay_task") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("1") - .taskname("delay_task") - .namespace(&namespace) - .received_at(received_at) - .build(), - ) - .build(), + .build(TaskActivationBuilder::new()), ]; writer.reduce(batch).await.unwrap(); diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index d233ab52..68c26676 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -11,7 +11,7 @@ use libsqlite3_sys::{ }; use prost::Message; use prost_types::Timestamp; -use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation, TaskActivationStatus}; +use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivationStatus}; use sqlx::{ ConnectOptions, FromRow, Pool, QueryBuilder, Row, Sqlite, Type, migrate::MigrateDatabase, @@ -21,10 +21,13 @@ use sqlx::{ SqliteRow, SqliteSynchronous, }, }; -use std::{str::FromStr, time::Instant}; +use std::{ + str::FromStr, + time::{Instant, SystemTime}, +}; use tracing::instrument; -use crate::config::Config; +use crate::{config::Config, store::task_activation::TaskActivationBuilder}; /// The members of this enum should be synced with the members /// of InflightActivationStatus in sentry_protos @@ -122,11 +125,11 @@ pub struct InflightActivation { pub expires_at: Option>, /// If the task has specified a delay, this is the timestamp after which the task can be sent to workers - #[builder(default = None)] + #[builder(default = None, setter(strip_option))] pub delay_until: Option>, /// The timestamp for when processing should be complete - #[builder(default = None)] + #[builder(default = None, setter(strip_option))] pub processing_deadline: Option>, /// What to do when the maximum number of attempts to complete a task is exceeded @@ -141,19 +144,58 @@ pub struct InflightActivation { } impl InflightActivationBuilder { - pub fn activation(mut self, activation: TaskActivation) -> Self { - self.activation = Some(activation.encode_to_vec()); - self + pub fn new() -> Self { + Self::default() } - pub fn received_at(mut self, received_at: Timestamp) -> Self { - self.received_at = Some( - DateTime::from_timestamp(received_at.seconds, received_at.nanos as u32).expect(""), - ); + pub fn received_at(mut self, received_at: DateTime) -> Self { + self.received_at = Some(received_at); self } - pub fn build(self) -> InflightActivation { + pub fn build(mut self, builder: TaskActivationBuilder) -> InflightActivation { + // Grab required fields + let id = self.id.as_ref().expect("field 'id' is required"); + let namespace = self + .namespace + .as_ref() + .expect("field 'namespace' is required"); + let taskname = self + .taskname + .as_ref() + .expect("field 'taskname' is required"); + + // Grab fields with defaults + let received_at = self.received_at.unwrap_or_default(); + let processing_deadline_duration = self.processing_deadline_duration.unwrap_or_default(); + + // Infer 'expires' field + let expires = self + .expires_at + .flatten() + .map(|date_time| (date_time - received_at).num_seconds() as u64); + + // Infer 'delay' field + let delay = self + .delay_until + .flatten() + .map(|date_time| (date_time - received_at).num_seconds() as u64); + + // Build the activation + let mut activation = builder + .id(id) + .taskname(taskname) + .namespace(namespace) + .received_at(Timestamp::from(SystemTime::from(received_at))) + .processing_deadline_duration(processing_deadline_duration as u64) + .build(); + + // Set 'expiration' and 'delay' fields manually after activation has been build + activation.expires = expires; + activation.delay = delay; + + self.activation = Some(activation.encode_to_vec()); + match self._build() { Ok(activation) => activation, Err(e) => panic!("Failed to build InflightActivation: {}", e), diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 8fa06e07..fd5fd3ec 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -15,7 +15,7 @@ use crate::test_utils::{ generate_temp_filename, generate_unique_namespace, make_activations, make_activations_with_namespace, replace_retry_state, }; -use chrono::{SubsecRound, TimeZone, Utc}; +use chrono::{DateTime, SubsecRound, TimeZone, Utc}; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivationStatus}; use sqlx::{QueryBuilder, Sqlite}; use std::fs; @@ -43,9 +43,6 @@ fn test_inflightactivation_status_is_completion() { assert!(value.is_conclusion()); } -#[test] -fn test_inflightactivation_builder() {} - #[test] fn test_inflightactivation_status_from() { let mut value: InflightActivationStatus = TaskActivationStatus::Pending.into(); @@ -1091,30 +1088,19 @@ async fn test_remove_killswitched() { async fn test_clear() { let store = create_test_store().await; - #[allow(deprecated)] - let received_at = prost_types::Timestamp { - seconds: 0, - nanos: 0, - }; + let received_at = DateTime::from_timestamp_nanos(0); + let expires_at = received_at + Duration::from_secs(1); let namespace = generate_unique_namespace(); let batch = vec![ - InflightActivationBuilder::default() + InflightActivationBuilder::new() .id("id_0") .taskname("taskname") .namespace(&namespace) .received_at(received_at) - .activation( - TaskActivationBuilder::default() - .id("id_0") - .taskname("taskname") - .namespace(&namespace) - .received_at(received_at) - .expires(1) - .build(), - ) - .build(), + .expires_at(expires_at) + .build(TaskActivationBuilder::new()), ]; assert!(store.store(batch).await.is_ok()); diff --git a/src/store/task_activation.rs b/src/store/task_activation.rs index e9ebbe25..55675286 100644 --- a/src/store/task_activation.rs +++ b/src/store/task_activation.rs @@ -21,16 +21,16 @@ macro_rules! builder_setter { } pub struct TaskActivationBuilder { - id: Option, - namespace: Option, - taskname: Option, - parameters: Option, - headers: Option>, - received_at: Option, - retry_state: Option, - processing_deadline_duration: Option, - expires: Option, - delay: Option, + pub id: Option, + pub namespace: Option, + pub taskname: Option, + pub parameters: Option, + pub headers: Option>, + pub received_at: Option, + pub retry_state: Option, + pub processing_deadline_duration: Option, + pub expires: Option, + pub delay: Option, } impl TaskActivationBuilder { diff --git a/src/test_utils.rs b/src/test_utils.rs index 218511c2..3cd9f2b2 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -20,7 +20,7 @@ use crate::{ task_activation::TaskActivationBuilder, }, }; -use chrono::{Timelike, Utc}; +use chrono::Utc; use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivation}; /// Generate a unique filename for isolated SQLite databases. @@ -41,29 +41,15 @@ pub fn make_activations_with_namespace(namespace: String, count: u32) -> Vec Date: Wed, 31 Dec 2025 11:38:54 -0800 Subject: [PATCH 6/7] Minor Fixes --- src/kafka/inflight_activation_batcher.rs | 4 ++-- src/kafka/inflight_activation_writer.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/kafka/inflight_activation_batcher.rs b/src/kafka/inflight_activation_batcher.rs index 91fc3f5e..8848535f 100644 --- a/src/kafka/inflight_activation_batcher.rs +++ b/src/kafka/inflight_activation_batcher.rs @@ -270,12 +270,12 @@ demoted_namespaces: let namespace = generate_unique_namespace(); - let inflight_activation_0 = InflightActivationBuilder::default() + let inflight_activation_0 = InflightActivationBuilder::new() .id("0") .taskname("task_to_be_filtered") .namespace(&namespace) .expires_at(Utc::now()) - .build(TaskActivationBuilder::default()); + .build(TaskActivationBuilder::new()); batcher.reduce(inflight_activation_0).await.unwrap(); assert_eq!(batcher.batch.len(), 0); diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 7eb3611f..4cdf716b 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -478,7 +478,7 @@ mod tests { let received_at = DateTime::from_timestamp_nanos(0); let namespace = generate_unique_namespace(); - let existing_activation = InflightActivationBuilder::default() + let existing_activation = InflightActivationBuilder::new() .id("existing") .taskname("existing_task") .namespace(&namespace) From ea6b09fdb2c73881fbe05fa4d811ed85cc286025 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Wed, 31 Dec 2025 11:40:45 -0800 Subject: [PATCH 7/7] Remove Custom Received Setter --- src/store/inflight_activation.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 68c26676..e40a215c 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -105,7 +105,6 @@ pub struct InflightActivation { /// The timestamp a task was stored in Kafka #[builder(default = Utc::now())] - #[builder(setter(custom))] pub received_at: DateTime, /// The number of times the activation has been attempted to be processed. This counter is @@ -148,11 +147,6 @@ impl InflightActivationBuilder { Self::default() } - pub fn received_at(mut self, received_at: DateTime) -> Self { - self.received_at = Some(received_at); - self - } - pub fn build(mut self, builder: TaskActivationBuilder) -> InflightActivation { // Grab required fields let id = self.id.as_ref().expect("field 'id' is required");