diff --git a/Cargo.lock b/Cargo.lock index cf3f6400..ca469317 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -552,6 +552,20 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "debugid" version = "0.8.0" @@ -919,6 +933,12 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.5" @@ -936,7 +956,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" dependencies = [ - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -1255,7 +1275,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2481980430f9f78649238835720ddccc57e52df14ffce1c6f37391d61b563e9" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -2431,8 +2451,7 @@ dependencies = [ [[package]] name = "sentry_protos" version = "0.4.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db6b0a26106f3a2fae5791618daafbdde92502c09dcbf48006db07c7fa0ba733" +source = "git+https://github.com/getsentry/sentry-protos?branch=george%2Fpush-broker-worker#a4b97fe5ae594996e20509f4053db360203e9c3c" dependencies = [ "prost", "prost-types", @@ -2646,7 +2665,7 @@ dependencies = [ "futures-intrusive", "futures-io", "futures-util", - "hashbrown", + "hashbrown 0.15.5", "hashlink", "indexmap", "log", @@ -2877,6 +2896,7 @@ dependencies = [ "chrono", "clap", "criterion", + "dashmap", "elegant-departure", "figment", "futures", @@ -2885,12 +2905,13 @@ dependencies = [ "hmac", "http", "http-body-util", + "itertools 0.14.0", "libsqlite3-sys", "metrics", "metrics-exporter-statsd", "prost", "prost-types", - "rand 0.8.5", + "rand 0.9.2", "rdkafka", "sentry", "sentry_protos", diff --git a/Cargo.toml b/Cargo.toml index 4ca8308f..a50b32d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1.0.92" bytes = "1.10.0" chrono = { version = "0.4.26" } clap = { version = "4.5.20", features = ["derive"] } +dashmap = "6.1.0" elegant-departure = { version = "0.3.1", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "yaml", "test"] } futures = "0.3.31" @@ -26,7 +27,7 @@ metrics = "0.24.0" metrics-exporter-statsd = "0.9.0" prost = "0.13" prost-types = "0.13.3" -rand = "0.8.5" +rand = "0.9.2" rdkafka = { version = "0.37.0", features = ["cmake-build", "ssl"] } sentry = { version = "0.41.0", default-features = false, features = [ # default features, except `release-health` is disabled @@ -39,7 +40,8 @@ sentry = { version = "0.41.0", default-features = false, features = [ "tracing", "logs" ] } -sentry_protos = "0.4.10" +sentry_protos = { git = "https://github.com/getsentry/sentry-protos", branch = "george/push-broker-worker" } +itertools = "0.14.0" serde = "1.0.214" serde_yaml = "0.9.34" sha2 = "0.10.8" diff --git a/benches/store_bench.rs b/benches/store_bench.rs index af5ce5f5..0b767c50 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -15,11 +15,11 @@ use tokio::task::JoinSet; async fn get_pending_activations(num_activations: u32, num_workers: u32) { let url = if cfg!(feature = "bench-with-mnt-disk") { - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); format!( "/mnt/disks/sqlite/{}-{}.sqlite", Utc::now(), - rng.r#gen::() + rng.random::() ) } else { generate_temp_filename() @@ -78,11 +78,11 @@ async fn set_status(num_activations: u32, num_workers: u32) { assert!(num_activations.is_multiple_of(num_workers)); let url = if cfg!(feature = "bench-with-mnt-disk") { - let mut rng = rand::thread_rng(); + let mut rng = rand::rng(); format!( "/mnt/disks/sqlite/{}-{}.sqlite", Utc::now(), - rng.r#gen::() + rng.random::() ) } else { generate_temp_filename() diff --git a/config.yaml b/config.yaml new file mode 100644 index 00000000..f7015921 --- /dev/null +++ b/config.yaml @@ -0,0 +1,9 @@ +kafka_topic: "test-topic" +push: true +default_metrics_tags: + host: "127.0.0.1" +log_filter: "debug,sqlx=debug,librdkafka=warn,h2=off" +# workers: +# - "http://127.0.0.1:50052" +# - "http://127.0.0.1:50053" +# - "http://127.0.0.1:50054" diff --git a/src/config.rs b/src/config.rs index d4bbaf2b..d2b94804 100644 --- a/src/config.rs +++ b/src/config.rs @@ -214,6 +214,12 @@ pub struct Config { /// Enable additional metrics for the sqlite. pub enable_sqlite_status_metrics: bool, + + /// Enable push mode. + pub push: bool, + + /// Worker addresses. + pub workers: Vec, } impl Default for Config { @@ -279,6 +285,8 @@ impl Default for Config { full_vacuum_on_upkeep: true, vacuum_interval_ms: 30000, enable_sqlite_status_metrics: true, + push: false, + workers: vec![], } } } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 99fe03d8..94e6b0f4 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -2,27 +2,57 @@ use chrono::Utc; use prost::Message; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; use sentry_protos::taskbroker::v1::{ - FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse, + AddWorkerRequest, AddWorkerResponse, FetchNextTask, GetTaskRequest, GetTaskResponse, + RemoveWorkerRequest, RemoveWorkerResponse, SetTaskStatusRequest, SetTaskStatusResponse, TaskActivation, TaskActivationStatus, }; use std::sync::Arc; use std::time::Instant; use tonic::{Request, Response, Status}; +use crate::pool::WorkerPool; use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore}; -use tracing::{error, instrument}; +use tracing::{debug, error, instrument}; pub struct TaskbrokerServer { pub store: Arc, + pub pool: Arc, + pub push: bool, } #[tonic::async_trait] impl ConsumerService for TaskbrokerServer { + #[instrument(skip_all)] + async fn add_worker( + &self, + request: Request, + ) -> Result, Status> { + let address = &request.get_ref().address; + self.pool.add_worker(address).await; + Ok(Response::new(AddWorkerResponse {})) + } + + #[instrument(skip_all)] + async fn remove_worker( + &self, + request: Request, + ) -> Result, Status> { + let address = &request.get_ref().address; + self.pool.remove_worker(address); + Ok(Response::new(RemoveWorkerResponse {})) + } + #[instrument(skip_all)] async fn get_task( &self, request: Request, ) -> Result, Status> { + if self.push { + return Err(Status::failed_precondition( + "get_task is not available in push mode", + )); + } + let start_time = Instant::now(); let namespace = &request.get_ref().namespace; let inflight = self @@ -67,6 +97,8 @@ impl ConsumerService for TaskbrokerServer { let start_time = Instant::now(); let id = request.get_ref().id.clone(); + debug!("Received task status {} for {id}", request.get_ref().status); + let status: InflightActivationStatus = TaskActivationStatus::try_from(request.get_ref().status) .map_err(|e| { @@ -83,6 +115,8 @@ impl ConsumerService for TaskbrokerServer { metrics::counter!("grpc_server.set_status.failure").increment(1); } + debug!("Status of task {id} set to {:?}", status); + let update_result = self.store.set_status(&id, status).await; if let Err(e) = update_result { error!( @@ -101,6 +135,10 @@ impl ConsumerService for TaskbrokerServer { return Ok(Response::new(SetTaskStatusResponse { task: None })); }; + if self.push { + return Ok(Response::new(SetTaskStatusResponse { task: None })); + } + let start_time = Instant::now(); let res = match self .store diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 6387b44d..19eb653d 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -4,12 +4,18 @@ use tonic::{Code, Request}; use crate::grpc::server::TaskbrokerServer; -use crate::test_utils::{create_test_store, make_activations}; +use crate::test_utils::{create_pool, create_test_store, make_activations}; #[tokio::test] async fn test_get_task() { let store = create_test_store().await; - let service = TaskbrokerServer { store }; + let pool = create_pool(); + + let service = TaskbrokerServer { + store, + pool, + push: false, + }; let request = GetTaskRequest { namespace: None }; let response = service.get_task(Request::new(request)).await; assert!(response.is_err()); @@ -22,11 +28,18 @@ async fn test_get_task() { #[allow(deprecated)] async fn test_set_task_status() { let store = create_test_store().await; - let service = TaskbrokerServer { store }; + let pool = create_pool(); + + let service = TaskbrokerServer { + store, + pool, + push: false, + }; let request = SetTaskStatusRequest { id: "test_task".to_string(), status: 5, // Complete fetch_next_task: None, + address: "http://127.0.0.1:50052".into(), }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -38,11 +51,18 @@ async fn test_set_task_status() { #[allow(deprecated)] async fn test_set_task_status_invalid() { let store = create_test_store().await; - let service = TaskbrokerServer { store }; + let pool = create_pool(); + + let service = TaskbrokerServer { + store, + pool, + push: false, + }; let request = SetTaskStatusRequest { id: "test_task".to_string(), status: 1, // Invalid fetch_next_task: None, + address: "http://127.0.0.1:50052".into(), }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_err()); @@ -58,10 +78,16 @@ async fn test_set_task_status_invalid() { #[allow(deprecated)] async fn test_get_task_success() { let store = create_test_store().await; + let pool = create_pool(); + let activations = make_activations(1); store.store(activations).await.unwrap(); - let service = TaskbrokerServer { store }; + let service = TaskbrokerServer { + store, + pool, + push: false, + }; let request = GetTaskRequest { namespace: None }; let response = service.get_task(Request::new(request)).await; assert!(response.is_ok()); @@ -75,10 +101,16 @@ async fn test_get_task_success() { #[allow(deprecated)] async fn test_set_task_status_success() { let store = create_test_store().await; + let pool = create_pool(); + let activations = make_activations(2); store.store(activations).await.unwrap(); - let service = TaskbrokerServer { store }; + let service = TaskbrokerServer { + store, + pool, + push: false, + }; let request = GetTaskRequest { namespace: None }; let response = service.get_task(Request::new(request)).await; @@ -92,6 +124,7 @@ async fn test_set_task_status_success() { id: "id_0".to_string(), status: 5, // Complete fetch_next_task: Some(FetchNextTask { namespace: None }), + address: "http://127.0.0.1:50052".into(), }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); diff --git a/src/lib.rs b/src/lib.rs index 33567944..6798fd05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,8 @@ pub mod grpc; pub mod kafka; pub mod logging; pub mod metrics; +pub mod pool; +pub mod push; pub mod runtime_config; pub mod store; pub mod test_utils; diff --git a/src/main.rs b/src/main.rs index 3998e116..483d6a0d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,8 @@ use std::{sync::Arc, time::Duration}; use taskbroker::kafka::inflight_activation_batcher::{ ActivationBatcherConfig, InflightActivationBatcher, }; +use taskbroker::pool::WorkerPool; +use taskbroker::push::TaskPusher; use taskbroker::upkeep::upkeep; use tokio::signal::unix::SignalKind; use tokio::task::JoinHandle; @@ -57,6 +59,8 @@ async fn main() -> Result<(), Error> { let runtime_config_manager = Arc::new(RuntimeConfigManager::new(config.runtime_config_path.clone()).await); + let pool = Arc::new(WorkerPool::new(config.workers.clone())); + println!("taskbroker starting"); println!("version: {}", get_version().trim()); @@ -177,10 +181,29 @@ async fn main() -> Result<(), Error> { } }); + // Push task loop (conditionally enabled) + let push_task = if config.push { + info!("Running in PUSH mode"); + + let push_task_store = store.clone(); + let push_task_config = config.clone(); + let push_task_pool = pool.clone(); + + Some(tokio::spawn(async move { + let pusher = TaskPusher::new(push_task_store, push_task_config, push_task_pool); + pusher.start().await + })) + } else { + info!("Running in PULL mode"); + None + }; + // GRPC server let grpc_server_task = tokio::spawn({ let grpc_store = store.clone(); let grpc_config = config.clone(); + let grpc_pool = pool.clone(); + async move { let addr = format!("{}:{}", grpc_config.grpc_addr, grpc_config.grpc_port) .parse() @@ -195,6 +218,8 @@ async fn main() -> Result<(), Error> { .layer(layers) .add_service(ConsumerServiceServer::new(TaskbrokerServer { store: grpc_store, + pool: grpc_pool, + push: config.push, })) .add_service(health_service.clone()) .serve(addr); @@ -225,7 +250,7 @@ async fn main() -> Result<(), Error> { } }); - elegant_departure::tokio::depart() + let mut depart = elegant_departure::tokio::depart() .on_termination() .on_sigint() .on_signal(SignalKind::hangup()) @@ -233,8 +258,14 @@ async fn main() -> Result<(), Error> { .on_completion(log_task_completion("consumer", consumer_task)) .on_completion(log_task_completion("grpc_server", grpc_server_task)) .on_completion(log_task_completion("upkeep_task", upkeep_task)) - .on_completion(log_task_completion("maintenance_task", maintenance_task)) - .await; + .on_completion(log_task_completion("maintenance_task", maintenance_task)); + + // Only register push_task if it was spawned + if let Some(task) = push_task { + depart = depart.on_completion(log_task_completion("push_task", task)); + } + + depart.await; Ok(()) } diff --git a/src/pool.rs b/src/pool.rs new file mode 100644 index 00000000..242832d5 --- /dev/null +++ b/src/pool.rs @@ -0,0 +1,142 @@ +use std::cmp::Ordering; + +use anyhow::Result; +use dashmap::DashMap; +use rand::Rng; +use rand::seq::IteratorRandom; +use sentry_protos::taskworker::v1::{PushTaskRequest, worker_service_client::WorkerServiceClient}; +use tonic::transport::{Channel, Error}; +use tracing::{info, warn}; + +pub struct WorkerPool { + /// Maps every worker address to its client. + /// Uses DashMap for concurrent access without external locking. + clients: DashMap, +} + +#[derive(Clone)] +struct WorkerClient { + /// The actual RPC client connection. + connection: WorkerServiceClient, + + /// The worker address. + address: String, + + /// The worker's last known queue size. + queue_size: u32, +} + +impl WorkerClient { + pub fn new(connection: WorkerServiceClient, address: String, queue_size: u32) -> Self { + Self { + connection, + address, + queue_size, + } + } +} + +impl WorkerPool { + /// Create a new `WorkerPool` instance. + pub fn new>(_addresses: T) -> Self { + Self { + clients: DashMap::new(), + } + } + + /// Register worker address and attempt to connect immediately. + /// Only adds the worker to the pool if the connection succeeds. + pub async fn add_worker>(&self, address: T) { + let address = address.into(); + info!("Adding worker {address}"); + + // Only add to the pool if we can connect + match connect(&address).await { + Ok(connection) => { + info!("Connected to {address}"); + + let client = WorkerClient::new(connection, address.clone(), 0); + + self.clients.insert(address.clone(), client); + } + + Err(e) => { + warn!( + "Did not register worker {address} due to connection error - {:?}", + e + ); + } + } + } + + /// Unregister worker address during execution. + pub fn remove_worker(&self, address: &String) { + info!("Removing worker {address}"); + self.clients.remove(address); + } + + /// Try pushing a task to the best worker using P2C (Power of Two Choices). + pub async fn push(&self, request: PushTaskRequest) -> Result<()> { + let candidate = { + let mut rng = rand::rng(); + + self.clients + .iter() + .map(|entry| entry.value().clone()) + .choose_multiple(&mut rng, 2) + .into_iter() + .min_by(|a, b| { + match a.queue_size.cmp(&b.queue_size) { + // When two workers are the same, we pick one randomly to avoid hammering one worker repeatedly + Ordering::Equal => { + if rng.random::() { + Ordering::Less + } else { + Ordering::Greater + } + } + other => other, + } + }) + }; + + let Some(mut client) = candidate else { + return Err(anyhow::anyhow!("No connected workers")); + }; + + let address = client.address.clone(); + + match client.connection.push_task(request).await { + Ok(response) => { + let response = response.into_inner(); + + if !response.added { + return Err(anyhow::anyhow!("Selected worker was full")); + } + + // Update this worker's queue size + client.queue_size = response.queue_size; + self.clients.insert(address, client); + + Ok(()) + } + + Err(e) => { + warn!( + "Removing worker {address} from pool due to RPC error - {:?}", + e + ); + + // Remove this unhealthy worker + self.clients.remove(&address); + + Err(e.into()) + } + } + } +} + +#[inline] +async fn connect>(address: T) -> Result, Error> { + WorkerServiceClient::connect(address.into()).await +} diff --git a/src/push.rs b/src/push.rs new file mode 100644 index 00000000..5fbb0135 --- /dev/null +++ b/src/push.rs @@ -0,0 +1,153 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use prost::Message; +use sentry_protos::{taskbroker::v1::TaskActivation, taskworker::v1::PushTaskRequest}; +use tokio::time::sleep; +use tracing::{debug, error, info}; + +use crate::config::Config; +use crate::pool::WorkerPool; +use crate::store::inflight_activation::{InflightActivation, InflightActivationStore}; + +pub struct TaskPusher { + /// Pool of workers through which we will push tasks. + pool: Arc, + + /// Broker configuration. + config: Arc, + + /// Inflight activation store. + store: Arc, +} + +impl TaskPusher { + /// Create a new `TaskPusher` instance. + pub fn new( + store: Arc, + config: Arc, + pool: Arc, + ) -> Self { + Self { + store, + config, + pool, + } + } + + /// Start the worker update and push task loops. + pub async fn start(self) -> Result<()> { + info!("Push task loop starting..."); + + let guard = elegant_departure::get_shutdown_guard().shutdown_on_drop(); + + loop { + tokio::select! { + _ = guard.wait() => { + info!("Push task loop received shutdown signal"); + break; + } + + _ = async { + debug!("About to process next task..."); + self.process_next_task().await; + } => {} + } + } + + info!("Push task loop shutting down..."); + Ok(()) + } +} + +impl TaskPusher { + /// Grab the next pending task from the store. + async fn process_next_task(&self) { + debug!("Getting the next task..."); + + match self.store.peek_pending_activation().await { + Ok(Some(inflight)) => { + let id = inflight.id.clone(); + debug!("Found task {id} with status {:?}", inflight.status); + + if let Err(e) = self.handle_task_push(inflight).await { + debug!("Pushing task {id} resulted in error - {:?}", e); + } else { + debug!("Task {id} was pushed!"); + } + } + + Ok(None) => { + debug!("No pending tasks, sleeping briefly"); + sleep(milliseconds(100)).await; + } + + Err(e) => { + error!("Failed to fetch pending activation - {:?}", e); + sleep(milliseconds(100)).await; + } + } + } + + /// Decode task activation and push it to a worker. + async fn handle_task_push(&self, inflight: InflightActivation) -> Result<()> { + let task_id = inflight.id.clone(); + + let activation = TaskActivation::decode(&inflight.activation as &[u8]).map_err(|e| { + error!("Failed to decode activation {task_id}: {:?}", e); + e + })?; + + self.push_to_worker(activation, &task_id).await + } + + /// Build an RPC request and send it to the worker pool to be pushed. + async fn push_to_worker(&self, activation: TaskActivation, task_id: &str) -> Result<()> { + let request = PushTaskRequest { + task: Some(activation), + callback_url: format!( + "{}:{}", + self.config.default_metrics_tags["host"], self.config.grpc_port + ), + }; + + let result = self.pool.push(request).await; + + match result { + Ok(()) => { + debug!("Pushed task {task_id}"); + self.mark_task_as_processing(task_id).await; + Ok(()) + } + + Err(e) => { + debug!("Could not push task {task_id} - {:?}", e); + Err(e) + } + } + } + + /// Mark task with id `task_id` as processing if it's still pending. + async fn mark_task_as_processing(&self, task_id: &str) { + match self.store.mark_as_processing_if_pending(task_id).await { + Ok(true) => { + debug!("Task {} pushed and marked as processing", task_id); + } + + Ok(false) => { + error!("Task {task_id} was already taken by another process (race condition)"); + } + + Err(e) => { + error!("Failed to mark task {task_id} as processing - {:?}", e); + sleep(milliseconds(100)).await; + } + } + } +} + +#[inline] +fn milliseconds(i: u64) -> Duration { + Duration::from_millis(i) +} diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 5c911a19..900e5270 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -604,6 +604,70 @@ impl InflightActivationStore { meta_result } + #[instrument(skip_all)] + pub async fn peek_pending_activation(&self) -> Result, Error> { + let now = Utc::now(); + + let row_result: Option = sqlx::query_as( + " + SELECT id, + activation, + partition, + offset, + added_at, + received_at, + processing_attempts, + expires_at, + delay_until, + processing_deadline_duration, + processing_deadline, + status, + at_most_once, + namespace, + taskname, + on_attempts_exceeded + FROM inflight_taskactivations + WHERE status = $1 + AND (expires_at IS NULL OR expires_at > $2) + ORDER BY added_at + LIMIT 1 + ", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .fetch_optional(&self.read_pool) + .await?; + + Ok(row_result.map(|row| row.into())) + } + + #[instrument(skip_all)] + pub async fn mark_as_processing_if_pending(&self, id: &str) -> Result { + let grace_period = self.config.processing_deadline_grace_sec; + let mut conn = self + .acquire_write_conn_metric("mark_as_processing_if_pending") + .await?; + + let result: Option = sqlx::query_as(&format!( + "UPDATE inflight_taskactivations + SET + processing_deadline = unixepoch( + 'now', '+' || (processing_deadline_duration + {grace_period}) || ' seconds' + ), + status = $1 + WHERE id = $2 + AND status = $3 + RETURNING *" + )) + .bind(InflightActivationStatus::Processing) + .bind(id) + .bind(InflightActivationStatus::Pending) + .fetch_optional(&mut *conn) + .await?; + + Ok(result.is_some()) + } + #[instrument(skip_all)] pub async fn get_pending_activation( &self, diff --git a/src/test_utils.rs b/src/test_utils.rs index 9ae23de5..7b9a3436 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -12,6 +12,7 @@ use uuid::Uuid; use crate::{ config::Config, + pool::WorkerPool, store::inflight_activation::{ InflightActivation, InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig, @@ -22,8 +23,8 @@ use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, RetryState, TaskActivati /// Generate a unique filename for isolated SQLite databases. pub fn generate_temp_filename() -> String { - let mut rng = rand::thread_rng(); - format!("/var/tmp/{}-{}.sqlite", Utc::now(), rng.r#gen::()) + let mut rng = rand::rng(); + format!("/var/tmp/{}-{}.sqlite", Utc::now(), rng.random::()) } /// Generate a unique alphanumeric string for namespaces (and possibly other purposes). @@ -87,6 +88,11 @@ pub fn create_config() -> Arc { Arc::new(Config::default()) } +/// Create a basic [`WorkerPool`]. +pub fn create_pool() -> Arc { + Arc::new(WorkerPool::new(["127.0.0.1:50052".into()])) +} + /// Create an InflightActivationStore instance pub async fn create_test_store() -> Arc { Arc::new(