Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ anyhow = "1.0.92"
bytes = "1.10.0"
chrono = { version = "0.4.26" }
clap = { version = "4.5.20", features = ["derive"] }
dashmap = "6.1.0"
elegant-departure = { version = "0.3.1", features = ["tokio"] }
figment = { version = "0.10.19", features = ["env", "yaml", "test"] }
futures = "0.3.31"
Expand All @@ -26,7 +27,7 @@ metrics = "0.24.0"
metrics-exporter-statsd = "0.9.0"
prost = "0.13"
prost-types = "0.13.3"
rand = "0.8.5"
rand = "0.9.2"
rdkafka = { version = "0.37.0", features = ["cmake-build", "ssl"] }
sentry = { version = "0.41.0", default-features = false, features = [
# default features, except `release-health` is disabled
Expand All @@ -39,7 +40,8 @@ sentry = { version = "0.41.0", default-features = false, features = [
"tracing",
"logs"
] }
sentry_protos = "0.4.10"
sentry_protos = { git = "https://github.com/getsentry/sentry-protos", branch = "george/push-broker-worker" }
itertools = "0.14.0"
serde = "1.0.214"
serde_yaml = "0.9.34"
sha2 = "0.10.8"
Expand Down
8 changes: 4 additions & 4 deletions benches/store_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use tokio::task::JoinSet;

async fn get_pending_activations(num_activations: u32, num_workers: u32) {
let url = if cfg!(feature = "bench-with-mnt-disk") {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
format!(
"/mnt/disks/sqlite/{}-{}.sqlite",
Utc::now(),
rng.r#gen::<u64>()
rng.random::<u64>()
)
} else {
generate_temp_filename()
Expand Down Expand Up @@ -78,11 +78,11 @@ async fn set_status(num_activations: u32, num_workers: u32) {
assert!(num_activations.is_multiple_of(num_workers));

let url = if cfg!(feature = "bench-with-mnt-disk") {
let mut rng = rand::thread_rng();
let mut rng = rand::rng();
format!(
"/mnt/disks/sqlite/{}-{}.sqlite",
Utc::now(),
rng.r#gen::<u64>()
rng.random::<u64>()
)
} else {
generate_temp_filename()
Expand Down
9 changes: 9 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
kafka_topic: "test-topic"
push: true
default_metrics_tags:
host: "127.0.0.1"
log_filter: "debug,sqlx=debug,librdkafka=warn,h2=off"
# workers:
# - "http://127.0.0.1:50052"
# - "http://127.0.0.1:50053"
# - "http://127.0.0.1:50054"
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ pub struct Config {

/// Enable additional metrics for the sqlite.
pub enable_sqlite_status_metrics: bool,

/// Enable push mode.
pub push: bool,

/// Worker addresses.
pub workers: Vec<String>,
Comment on lines +221 to +222
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we have a static list of workers? With the workers generally running behind a horizontal pod autoscaler, we may not know how many workers will be online, and the number and names of those workers will not be fixed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, workers can use the AddWorker and RemoveWorker RPC endpoints to add themselves on startup and to remove themselves on shutdown. This field is just something I've been using for testing; if it's useful, it could also serve as a list of initial workers we definitely want to connect to.

}

impl Default for Config {
Expand Down Expand Up @@ -279,6 +285,8 @@ impl Default for Config {
full_vacuum_on_upkeep: true,
vacuum_interval_ms: 30000,
enable_sqlite_status_metrics: true,
push: false,
workers: vec![],
}
}
}
Expand Down
42 changes: 40 additions & 2 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,57 @@ use chrono::Utc;
use prost::Message;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
use sentry_protos::taskbroker::v1::{
FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse,
AddWorkerRequest, AddWorkerResponse, FetchNextTask, GetTaskRequest, GetTaskResponse,
RemoveWorkerRequest, RemoveWorkerResponse, SetTaskStatusRequest, SetTaskStatusResponse,
TaskActivation, TaskActivationStatus,
};
use std::sync::Arc;
use std::time::Instant;
use tonic::{Request, Response, Status};

use crate::pool::WorkerPool;
use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore};
use tracing::{error, instrument};
use tracing::{debug, error, instrument};

/// State shared by the `ConsumerService` gRPC handlers implemented on this type.
pub struct TaskbrokerServer {
/// Inflight activation store that the task and status handlers read from and update.
pub store: Arc<InflightActivationStore>,
/// Worker pool; `add_worker` and `remove_worker` forward the requested address to it.
pub pool: Arc<WorkerPool>,
/// Push-mode flag. When `true`, `get_task` is rejected with a
/// failed-precondition status and the status-update handler returns early
/// with `task: None`.
pub push: bool,
}

#[tonic::async_trait]
impl ConsumerService for TaskbrokerServer {
/// Handles the `AddWorker` RPC.
///
/// Forwards the address carried by the request to the worker pool and, once
/// that call has completed, replies with an empty `AddWorkerResponse`.
#[instrument(skip_all)]
async fn add_worker(
    &self,
    request: Request<AddWorkerRequest>,
) -> Result<Response<AddWorkerResponse>, Status> {
    let payload = request.get_ref();

    // NOTE(review): whatever the pool call yields is discarded here — confirm
    // that `WorkerPool::add_worker` cannot fail in a way callers should see.
    self.pool.add_worker(&payload.address).await;

    let reply = Response::new(AddWorkerResponse {});
    Ok(reply)
}

/// Handles the `RemoveWorker` RPC.
///
/// Forwards the address carried by the request to the worker pool for removal
/// and replies with an empty `RemoveWorkerResponse`.
#[instrument(skip_all)]
async fn remove_worker(
    &self,
    request: Request<RemoveWorkerRequest>,
) -> Result<Response<RemoveWorkerResponse>, Status> {
    let payload = request.get_ref();
    self.pool.remove_worker(&payload.address);

    let reply = Response::new(RemoveWorkerResponse {});
    Ok(reply)
}

#[instrument(skip_all)]
async fn get_task(
&self,
request: Request<GetTaskRequest>,
) -> Result<Response<GetTaskResponse>, Status> {
if self.push {
return Err(Status::failed_precondition(
"get_task is not available in push mode",
));
}

let start_time = Instant::now();
let namespace = &request.get_ref().namespace;
let inflight = self
Expand Down Expand Up @@ -67,6 +97,8 @@ impl ConsumerService for TaskbrokerServer {
let start_time = Instant::now();
let id = request.get_ref().id.clone();

debug!("Received task status {} for {id}", request.get_ref().status);

let status: InflightActivationStatus =
TaskActivationStatus::try_from(request.get_ref().status)
.map_err(|e| {
Expand All @@ -83,6 +115,8 @@ impl ConsumerService for TaskbrokerServer {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}

debug!("Status of task {id} set to {:?}", status);

let update_result = self.store.set_status(&id, status).await;
if let Err(e) = update_result {
error!(
Expand All @@ -101,6 +135,10 @@ impl ConsumerService for TaskbrokerServer {
return Ok(Response::new(SetTaskStatusResponse { task: None }));
};

if self.push {
return Ok(Response::new(SetTaskStatusResponse { task: None }));
}

let start_time = Instant::now();
let res = match self
.store
Expand Down
45 changes: 39 additions & 6 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ use tonic::{Code, Request};

use crate::grpc::server::TaskbrokerServer;

use crate::test_utils::{create_test_store, make_activations};
use crate::test_utils::{create_pool, create_test_store, make_activations};

#[tokio::test]
async fn test_get_task() {
let store = create_test_store().await;
let service = TaskbrokerServer { store };
let pool = create_pool();

let service = TaskbrokerServer {
store,
pool,
push: false,
};
let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
assert!(response.is_err());
Expand All @@ -22,11 +28,18 @@ async fn test_get_task() {
#[allow(deprecated)]
async fn test_set_task_status() {
let store = create_test_store().await;
let service = TaskbrokerServer { store };
let pool = create_pool();

let service = TaskbrokerServer {
store,
pool,
push: false,
};
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
status: 5, // Complete
fetch_next_task: None,
address: "http://127.0.0.1:50052".into(),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand All @@ -38,11 +51,18 @@ async fn test_set_task_status() {
#[allow(deprecated)]
async fn test_set_task_status_invalid() {
let store = create_test_store().await;
let service = TaskbrokerServer { store };
let pool = create_pool();

let service = TaskbrokerServer {
store,
pool,
push: false,
};
let request = SetTaskStatusRequest {
id: "test_task".to_string(),
status: 1, // Invalid
fetch_next_task: None,
address: "http://127.0.0.1:50052".into(),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_err());
Expand All @@ -58,10 +78,16 @@ async fn test_set_task_status_invalid() {
#[allow(deprecated)]
async fn test_get_task_success() {
let store = create_test_store().await;
let pool = create_pool();

let activations = make_activations(1);
store.store(activations).await.unwrap();

let service = TaskbrokerServer { store };
let service = TaskbrokerServer {
store,
pool,
push: false,
};
let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
assert!(response.is_ok());
Expand All @@ -75,10 +101,16 @@ async fn test_get_task_success() {
#[allow(deprecated)]
async fn test_set_task_status_success() {
let store = create_test_store().await;
let pool = create_pool();

let activations = make_activations(2);
store.store(activations).await.unwrap();

let service = TaskbrokerServer { store };
let service = TaskbrokerServer {
store,
pool,
push: false,
};

let request = GetTaskRequest { namespace: None };
let response = service.get_task(Request::new(request)).await;
Expand All @@ -92,6 +124,7 @@ async fn test_set_task_status_success() {
id: "id_0".to_string(),
status: 5, // Complete
fetch_next_task: Some(FetchNextTask { namespace: None }),
address: "http://127.0.0.1:50052".into(),
};
let response = service.set_task_status(Request::new(request)).await;
assert!(response.is_ok());
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub mod grpc;
pub mod kafka;
pub mod logging;
pub mod metrics;
pub mod pool;
pub mod push;
pub mod runtime_config;
pub mod store;
pub mod test_utils;
Expand Down
Loading