diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index f5cd19ec89e..8e66e71046b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5682,7 +5682,7 @@ checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" [[package]] name = "ownedbytes" version = "0.9.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "stable_deref_trait", ] @@ -7164,6 +7164,7 @@ dependencies = [ "quickwit-serve", "quickwit-storage", "rand 0.8.5", + "rdkafka", "reqwest 0.12.23", "rustls 0.23.31", "serde_json", @@ -9570,7 +9571,7 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "tantivy" version = "0.25.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "aho-corasick", "arc-swap", @@ -9625,7 +9626,7 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" version = "0.9.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "bitpacking", ] @@ -9633,7 +9634,7 @@ dependencies = [ [[package]] name = "tantivy-columnar" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "downcast-rs", "fastdivide", @@ -9648,7 +9649,7 @@ dependencies = [ [[package]] name = "tantivy-common" version = "0.10.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "async-trait", "byteorder", @@ -9671,7 +9672,7 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" version = "0.25.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "fnv", "nom 7.1.3", @@ -9683,7 +9684,7 @@ dependencies = [ [[package]] name = "tantivy-sstable" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "futures-util", "itertools 0.14.0", @@ -9696,7 +9697,7 @@ dependencies = [ [[package]] name = "tantivy-stacker" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = "git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "murmurhash32", "rand_distr", @@ -9706,7 +9707,7 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=dabcaa5#dabcaa58093a3f7f10e98a5a3b06cfe2370482f9" +source = 
"git+https://github.com/SekoiaLab/tantivy/?rev=bbdf83e#bbdf83e82e6ddc334727c35df6cc55452319ccc6" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 89e1429d9ab..e7ed41793c3 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -346,7 +346,7 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "dabcaa5", default-features = false, features = [ +tantivy = { git = "https://github.com/SekoiaLab/tantivy/", rev = "bbdf83e", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index da5521c0e8c..d32db8a9e45 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -552,6 +552,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> { sort_by, count_all: CountHits::CountAll, allow_failed_splits: false, + split_id: None, }; let search_request = search_request_from_api_request(vec![args.index_id], search_request_query_string)?; diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index bb8a17daaeb..ae46c331a01 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -268,6 +268,7 @@ pub struct SearcherConfig { pub split_footer_cache_capacity: ByteSize, pub partial_request_cache_capacity: ByteSize, pub max_num_concurrent_split_searches: usize, + pub max_total_split_searches: Option, // Deprecated: stream search requests are no longer supported. #[serde(alias = "max_num_concurrent_split_streams", default, skip_serializing)] pub _max_num_concurrent_split_streams: Option, @@ -325,6 +326,7 @@ impl Default for SearcherConfig { split_footer_cache_capacity: ByteSize::mb(500), partial_request_cache_capacity: ByteSize::mb(64), max_num_concurrent_split_searches: 100, + max_total_split_searches: None, _max_num_concurrent_split_streams: None, aggregation_memory_limit: ByteSize::mb(500), aggregation_bucket_limit: 65000, diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index b5f39ceb0ac..b81e25390a6 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -662,6 +662,7 @@ mod tests { split_footer_cache_capacity: ByteSize::gb(1), partial_request_cache_capacity: ByteSize::mb(64), max_num_concurrent_split_searches: 150, + max_total_split_searches: None, _max_num_concurrent_split_streams: Some(serde::de::IgnoredAny), split_cache: None, request_timeout_secs: NonZeroU64::new(30).unwrap(), diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index 24992209b9c..f22ecd98ab0 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -51,6 +51,9 @@ pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration = Duration::from_secs(30) }; +/// That's 80% of a pipeline capacity +const MAX_LOAD_PER_PIPELINE: CpuCapacity = CpuCapacity::from_cpu_millis(3_200); + #[derive(Debug, Clone, Default, Serialize)] pub struct IndexingSchedulerState { pub num_applied_physical_indexing_plan: usize, @@ -257,8 +260,12 @@ fn 
get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> { source_uid, source_type: SourceToScheduleType::NonSharded { num_pipelines: source_config.num_pipelines.get() as u32, - // FIXME - load_per_pipeline: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis()) + // FIXME: + // - implementing adaptive load contains the risk of generating + // rebalancing storms for sources like Kafka + // - this is coupled with the scheduling logic that misses the notion of + // pipeline + load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()) .unwrap(), }, params_fingerprint, diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md index f4ec64010a9..58ff4ae62d0 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/README.md @@ -15,8 +15,9 @@ We also want to observe some interesting properties such as: To simplify the logic and make it easier to test it, we first abstract this in the following optimization problem. In Quickwit, we have two types of source: -- The push api source: they have a given (changing) set of shards associated to them. - A shard is rate-limited to ensure their throughput is lower than `5MB/s` worth of +- The push api source: indexes have a given (changing) set of shards associated to them. + Shards are stored on indexer nodes and are spread randomly across them. A shard is + rate-limited to ensure its throughput is lower than `5MB/s` worth of uncompressed data. This guarantees that a given shard can be indexed by a single indexing pipeline. diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index ca5ac4d1fc7..eb7875097e0 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -757,8 +757,8 @@ mod tests { convert_scheduling_solution_to_physical_plan_single_node_single_source, }; use crate::indexing_plan::PhysicalIndexingPlan; - use crate::indexing_scheduler::get_shard_locality_metrics; use crate::indexing_scheduler::scheduling::assign_shards; + use crate::indexing_scheduler::{MAX_LOAD_PER_PIPELINE, get_shard_locality_metrics}; use crate::model::ShardLocations; fn source_id() -> SourceUid { @@ -939,6 +939,146 @@ mod tests { } } + #[test] + fn test_build_physical_plan_with_pipeline_limit() { + let indexer1 = "indexer1".to_string(); + let indexer2 = "indexer2".to_string(); + let source_uid0 = source_id(); + let source_uid1 = source_id(); + let source_0 = SourceToSchedule { + source_uid: source_uid0.clone(), + source_type: SourceToScheduleType::Sharded { + shard_ids: (0..16).map(ShardId::from).collect(), + load_per_shard: NonZeroU32::new(800).unwrap(), + }, + params_fingerprint: 0, + }; + let source_1 = SourceToSchedule { + source_uid: source_uid1.clone(), + source_type: SourceToScheduleType::NonSharded { + num_pipelines: 4, + load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(), + }, + params_fingerprint: 0, + }; + let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); + indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000)); + indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000)); + let shard_locations = ShardLocations::default(); + let indexing_plan = 
build_physical_indexing_plan( + &[source_0, source_1], + &indexer_id_to_cpu_capacities, + None, + &shard_locations, + ); + assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2); + + let node1_plan = indexing_plan.indexer(&indexer1).unwrap(); + let node2_plan = indexing_plan.indexer(&indexer2).unwrap(); + + let source_0_on_node1 = node1_plan + .iter() + .filter(|task| task.source_id == source_uid0.source_id) + .count(); + let source_0_on_node2 = node2_plan + .iter() + .filter(|task| task.source_id == source_uid0.source_id) + .count(); + assert!(source_0_on_node1 <= 3); + assert!(source_0_on_node2 <= 3); + assert_eq!(source_0_on_node1 + source_0_on_node2, 4); + + let source_1_on_node1 = node1_plan + .iter() + .filter(|task| task.source_id == source_uid1.source_id) + .count(); + let source_1_on_node2 = node2_plan + .iter() + .filter(|task| task.source_id == source_uid1.source_id) + .count(); + assert!(source_1_on_node1 <= 3); + assert!(source_1_on_node2 <= 3); + assert_eq!(source_1_on_node1 + source_1_on_node2, 4); + } + + #[test] + fn test_build_physical_plan_second_iteration() { + let indexer1 = "indexer1".to_string(); + let indexer2 = "indexer2".to_string(); + let indexer3 = "indexer3".to_string(); + let mut sources = Vec::new(); + for _ in 0..10 { + sources.push(SourceToSchedule { + source_uid: source_id(), + source_type: SourceToScheduleType::NonSharded { + num_pipelines: 4, + load_per_pipeline: NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(), + }, + params_fingerprint: 0, + }); + } + let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); + indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000)); + indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000)); + indexer_id_to_cpu_capacities.insert(indexer3.clone(), mcpu(16_000)); + let shard_locations = ShardLocations::default(); + let indexing_plan = build_physical_indexing_plan( + &sources, + &indexer_id_to_cpu_capacities, + None, + &shard_locations, + ); + assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 3); + + for source in &sources { + let pipelines_per_indexer_for_source = indexing_plan + .indexing_tasks_per_indexer() + .values() + .map(|tasks| { + tasks + .iter() + .filter(|t| t.source_id == source.source_uid.source_id) + .count() + }) + .collect_vec(); + assert!(pipelines_per_indexer_for_source.contains(&3)); + assert!(pipelines_per_indexer_for_source.contains(&1)); + assert!(pipelines_per_indexer_for_source.contains(&0)); + assert_eq!(pipelines_per_indexer_for_source.iter().sum::(), 4); + } + + for source in &mut sources { + if let SourceToScheduleType::NonSharded { num_pipelines, .. 
} = &mut source.source_type + { + *num_pipelines = 5; + } + } + + let new_indexing_plan = build_physical_indexing_plan( + &sources, + &indexer_id_to_cpu_capacities, + Some(&indexing_plan), + &shard_locations, + ); + + for source in &sources { + let pipelines_per_indexer_for_source = new_indexing_plan + .indexing_tasks_per_indexer() + .values() + .map(|tasks| { + tasks + .iter() + .filter(|t| t.source_id == source.source_uid.source_id) + .count() + }) + .collect_vec(); + assert!(pipelines_per_indexer_for_source.contains(&3)); + assert!(pipelines_per_indexer_for_source.contains(&2)); + assert!(pipelines_per_indexer_for_source.contains(&0)); + assert_eq!(pipelines_per_indexer_for_source.iter().sum::(), 5); + } + } + fn make_indexing_tasks( source_uid: &SourceUid, shards: &[(PipelineUid, &[ShardId])], diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs index a8a5c4f64c0..bb668d53204 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp::Reverse; +use std::cmp::{Ordering, Reverse}; use std::collections::BTreeMap; use std::collections::btree_map::Entry; -use itertools::Itertools; use quickwit_proto::indexing::CpuCapacity; use super::scheduling_logic_model::*; +use crate::indexing_scheduler::MAX_LOAD_PER_PIPELINE; use crate::indexing_scheduler::scheduling::inflate_node_capacities_if_necessary; // ------------------------------------------------------------------------------------ @@ -229,6 +229,44 @@ fn assert_enforce_nodes_cpu_capacity_post_condition( // If this algorithm fails to place all remaining shards, we inflate // the node capacities by 20% in the scheduling problem and start from the beginning. 
+#[derive(Debug, PartialEq, Eq)] +struct PlacementCandidate { + indexer_ord: IndexerOrd, + current_num_shards: u32, + available_capacity: CpuCapacity, + affinity: u32, +} + +impl Ord for PlacementCandidate { + fn cmp(&self, other: &Self) -> Ordering { + // Higher affinity is better + match self.affinity.cmp(&other.affinity) { + Ordering::Equal => {} + ordering => return ordering.reverse(), + } + // If tie, pick the node with shards already assigned first + let current_shard_presence = self.current_num_shards.clamp(0, 1); + let other_shard_presence = other.current_num_shards.clamp(0, 1); + match current_shard_presence.cmp(&other_shard_presence) { + Ordering::Equal => {} + ordering => return ordering.reverse(), + } + // If tie, pick the node with the highest available capacity + match self.available_capacity.cmp(&other.available_capacity) { + Ordering::Equal => {} + ordering => return ordering.reverse(), + } + // Final tie-breaker: indexer ID for deterministic ordering + self.indexer_ord.cmp(&other.indexer_ord).reverse() + } +} + +impl PartialOrd for PlacementCandidate { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + fn attempt_place_unassigned_shards( unassigned_shards: &[Source], problem: &SchedulingProblem, @@ -236,12 +274,27 @@ fn attempt_place_unassigned_shards( ) -> Result { let mut solution = partial_solution.clone(); for source in unassigned_shards { - let indexers_with_most_available_capacity = - compute_indexer_available_capacity(problem, &solution) - .sorted_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord))); + let mut placements: Vec = solution + .indexer_assignments + .iter() + .map(|indexer_assignment: &IndexerAssignment| { + let available_capacity = indexer_assignment.indexer_available_capacity(problem); + assert!(available_capacity >= 0i32); + let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32); + let current_num_shards = indexer_assignment.num_shards(source.source_ord); + PlacementCandidate { + affinity: 0, + current_num_shards, + available_capacity, + indexer_ord: indexer_assignment.indexer_ord, + } + }) + .collect(); + placements.sort(); place_unassigned_shards_single_source( source, - indexers_with_most_available_capacity, + &placements, + problem.num_indexers(), &mut solution, )?; } @@ -259,25 +312,29 @@ fn place_unassigned_shards_with_affinity( Reverse(load) }); for source in &unassigned_shards { - // List of indexer with a non-null affinity and some available capacity, sorted by - // (affinity, available capacity) in that order. 
- let indexers_with_affinity_and_available_capacity = source + let mut placements: Vec = source .affinities .iter() .filter(|&(_, &affinity)| affinity != 0u32) - .map(|(&indexer_ord, affinity)| { + .map(|(&indexer_ord, &affinity)| { let available_capacity = solution.indexer_assignments[indexer_ord].indexer_available_capacity(problem); - let capacity = CpuCapacity::from_cpu_millis(available_capacity as u32); - (indexer_ord, affinity, capacity) - }) - .sorted_by_key(|(indexer_ord, affinity, capacity)| { - Reverse((*affinity, *capacity, *indexer_ord)) + let available_capacity = CpuCapacity::from_cpu_millis(available_capacity as u32); + let current_num_shards = + solution.indexer_assignments[indexer_ord].num_shards(source.source_ord); + PlacementCandidate { + affinity, + current_num_shards, + available_capacity, + indexer_ord, + } }) - .map(|(indexer_ord, _, capacity)| (indexer_ord, capacity)); + .collect(); + placements.sort(); let _ = place_unassigned_shards_single_source( source, - indexers_with_affinity_and_available_capacity, + &placements, + problem.num_indexers(), solution, ); } @@ -350,22 +407,40 @@ struct NotEnoughCapacity; /// amongst the node with their given node capacity. fn place_unassigned_shards_single_source( source: &Source, - mut indexer_with_capacities: impl Iterator, + sorted_candidates: &[PlacementCandidate], + num_indexers: usize, solution: &mut SchedulingSolution, ) -> Result<(), NotEnoughCapacity> { let mut num_shards = source.num_shards; - while num_shards > 0 { - let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else { - return Err(NotEnoughCapacity); - }; - let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard; - let num_shards_to_place = num_placable_shards.min(num_shards); + // To ensure that merges can keep up, we try not to assign more than 3 + // pipelines per indexer for a source (except if there aren't enough nodes). + let target_limit_num_shards_per_indexer_per_source = + 3 * MAX_LOAD_PER_PIPELINE.cpu_millis() / source.load_per_shard.get(); + let limit_num_shards_per_indexer_per_source = target_limit_num_shards_per_indexer_per_source + .max(num_shards.div_ceil(num_indexers as u32)); + for PlacementCandidate { + indexer_ord, + available_capacity, + current_num_shards, + .. + } in sorted_candidates + { + let num_placable_shards_for_available_capacity = + available_capacity.cpu_millis() / source.load_per_shard; + let num_placable_shards_for_limit = + limit_num_shards_per_indexer_per_source.saturating_sub(*current_num_shards); + let num_shards_to_place = num_shards + .min(num_placable_shards_for_available_capacity) + .min(num_placable_shards_for_limit); // Update the solution, the shard load, and the number of shards to place. - solution.indexer_assignments[indexer_ord] + solution.indexer_assignments[*indexer_ord] .add_shards(source.source_ord, num_shards_to_place); num_shards -= num_shards_to_place; + if num_shards == 0 { + return Ok(()); + } } - Ok(()) + Err(NotEnoughCapacity) } /// Compute the sources/shards that have not been assigned to any indexer yet. @@ -394,30 +469,11 @@ fn compute_unassigned_sources( unassigned_sources.into_values().collect() } -/// Builds a BinaryHeap with the different indexer capacities. -/// -/// Panics if one of the indexer is over-assigned. 
-fn compute_indexer_available_capacity<'a>( - problem: &'a SchedulingProblem, - solution: &'a SchedulingSolution, -) -> impl Iterator + 'a { - solution - .indexer_assignments - .iter() - .map(|indexer_assignment| { - let available_capacity: i32 = indexer_assignment.indexer_available_capacity(problem); - assert!(available_capacity >= 0i32); - ( - indexer_assignment.indexer_ord, - CpuCapacity::from_cpu_millis(available_capacity as u32), - ) - }) -} - #[cfg(test)] mod tests { use std::num::NonZeroU32; + use itertools::Itertools; use proptest::prelude::*; use quickwit_proto::indexing::mcpu; @@ -602,6 +658,27 @@ mod tests { assert_eq!(solution.indexer_assignments[1].num_shards(0), 4); } + #[test] + fn test_placement_limit_with_affinity() { + let mut problem = + SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(16_000), mcpu(16_000)]); + let max_load_per_pipeline = NonZeroU32::new(MAX_LOAD_PER_PIPELINE.cpu_millis()).unwrap(); + problem.add_source(4, max_load_per_pipeline); + problem.add_source(4, max_load_per_pipeline); + problem.inc_affinity(0, 1); + problem.inc_affinity(0, 1); + problem.inc_affinity(0, 0); + problem.inc_affinity(1, 0); + let mut solution = problem.new_solution(); + place_unassigned_shards_with_affinity(&problem, &mut solution); + assert_eq!(solution.indexer_assignments[0].num_shards(1), 3); + assert_eq!(solution.indexer_assignments[0].num_shards(0), 1); + assert_eq!(solution.indexer_assignments[1].num_shards(0), 3); + // one shard was not placed because indexer 0 was full and it had no + // affinity with indexer 1 + assert_eq!(solution.indexer_assignments[1].num_shards(1), 0); + } + #[test] fn test_place_unassigned_shards_reach_capacity() { let mut problem = @@ -807,4 +884,35 @@ mod tests { assert_eq!(solution.capacity_scaling_iterations, 1); } + + #[test] + fn test_shard_fragmentation_when_iterating() { + // Create a problem where affinity constraints cause suboptimal placement + // requiring iterative scaling despite initial capacity scaling. 
+ let mut problem = + SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(3000), mcpu(3000)]); + problem.add_source(1, NonZeroU32::new(1000).unwrap()); + problem.add_source(1, NonZeroU32::new(1000).unwrap()); + problem.add_source(1, NonZeroU32::new(1000).unwrap()); + let empty_solution = problem.new_solution(); + let first_solution = solve(problem, empty_solution); + + let mut updated_problem = + SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(3000), mcpu(3000)]); + updated_problem.add_source(2, NonZeroU32::new(1000).unwrap()); + updated_problem.add_source(2, NonZeroU32::new(1000).unwrap()); + updated_problem.add_source(2, NonZeroU32::new(1000).unwrap()); + + let second_solution = solve(updated_problem, first_solution); + + for source in 0..2 { + let num_shards_per_indexer = second_solution + .indexer_assignments + .iter() + .map(|indexer_assignment| indexer_assignment.num_shards(source)) + .collect_vec(); + assert!(num_shards_per_indexer.contains(&2)); + assert!(num_shards_per_indexer.contains(&0)); + } + } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 5d307fb200b..10e2e692beb 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -31,7 +31,7 @@ use quickwit_common::io::Limiter; use quickwit_common::pubsub::EventBroker; use quickwit_common::{io, temp_dir}; use quickwit_config::{ - INGEST_API_SOURCE_ID, IndexConfig, IndexerConfig, SourceConfig, build_doc_mapper, + INGEST_API_SOURCE_ID, IndexConfig, IndexerConfig, SourceConfig, SourceParams, build_doc_mapper, indexing_pipeline_params_fingerprint, }; use quickwit_ingest::{ @@ -289,8 +289,26 @@ impl IndexingService { let message = format!("failed to spawn indexing pipeline: {error}"); IndexingError::Internal(message) })?; - let merge_policy = - crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings); + + let mut indexing_settings = index_config.indexing_settings.clone(); + if let SourceParams::Kafka(kafka_params) = &source_config.source_params { + if let Some(indexing_settings_value) = + kafka_params.client_params.get("indexing_settings") + { + if let Ok(indexing_pipeline) = + serde_json::from_value(indexing_settings_value.clone()) + { + indexing_settings = indexing_pipeline; + } else { + error!( + index_id = indexing_pipeline_id.index_uid.index_id, + source_id = indexing_pipeline_id.source_id, + "source level override of indexing_settings failed, deserialization error" + ); + } + } + } + let merge_policy = crate::merge_policy::merge_policy_from_settings(&indexing_settings); let retention_policy = index_config.retention_policy_opt.clone(); let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); @@ -345,7 +363,7 @@ impl IndexingService { // Indexing-related parameters doc_mapper, indexing_directory, - indexing_settings: index_config.indexing_settings.clone(), + indexing_settings, split_store, max_concurrent_split_uploads_index, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), diff --git a/quickwit/quickwit-indexing/src/source/kafka_source.rs b/quickwit/quickwit-indexing/src/source/kafka_source.rs index bc5b400a9bc..5086616ebed 100644 --- a/quickwit/quickwit-indexing/src/source/kafka_source.rs +++ b/quickwit/quickwit-indexing/src/source/kafka_source.rs @@ -714,6 +714,10 @@ fn parse_client_params(client_params: JsonValue) -> anyhow::Result }; let mut client_config = 
ClientConfig::new(); for (key, value_json) in params { + if key == "indexing_settings" { + // used for QW per source settings override workaround + continue; + } let value = match value_json { JsonValue::Bool(value_bool) => value_bool.to_string(), JsonValue::Number(value_number) => value_number.to_string(), @@ -834,6 +838,7 @@ mod kafka_broker_tests { { let producer: &FutureProducer = &ClientConfig::new() .set("bootstrap.servers", "localhost:9092") + .set("broker.address.family", "v4") .set("statistics.interval.ms", "500") .set("api.version.request", "true") .set("debug", "all") @@ -1169,7 +1174,7 @@ mod kafka_broker_tests { #[tokio::test] async fn test_kafka_source_suggest_truncate() { let admin_client = create_admin_client(); - let topic = append_random_suffix("test-kafka-source--suggest-truncate--topic"); + let topic = append_random_suffix("test--source--suggest-truncate--topic"); create_topic(&admin_client, &topic, 2).await.unwrap(); let metastore = metastore_for_test(); diff --git a/quickwit/quickwit-integration-tests/Cargo.toml b/quickwit/quickwit-integration-tests/Cargo.toml index 55308cff556..46c645ec10b 100644 --- a/quickwit/quickwit-integration-tests/Cargo.toml +++ b/quickwit/quickwit-integration-tests/Cargo.toml @@ -11,6 +11,9 @@ authors.workspace = true license.workspace = true [features] +kafka-broker-tests = [ + "quickwit-indexing/kafka", +] sqs-localstack-tests = [ "quickwit-indexing/sqs", "quickwit-indexing/sqs-localstack-tests", @@ -26,6 +29,7 @@ hyper = { workspace = true } hyper-util = { workspace = true } itertools = { workspace = true } rand = { workspace = true } +rdkafka = { workspace = true } reqwest = { workspace = true } rustls = { workspace = true } serde_json = { workspace = true } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 36fbadbbcea..b957bd8e348 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -48,7 +48,7 @@ use reqwest::Url; use serde_json::Value; use tempfile::TempDir; use tokio::net::TcpListener; -use tracing::debug; +use tracing::{debug, warn}; use super::shutdown::NodeShutdownHandle; @@ -178,7 +178,7 @@ impl ResolvedClusterConfig { pub async fn start(self) -> ClusterSandbox { rustls::crypto::ring::default_provider() .install_default() - .expect("rustls crypto ring default provider installation should not fail"); + .unwrap_or_else(|_| warn!("failed to install default ring crypto provider")); let mut node_shutdown_handles = Vec::new(); let runtimes_config = RuntimesConfig::light_for_tests(); diff --git a/quickwit/quickwit-integration-tests/src/tests/kafka_tests.rs b/quickwit/quickwit-integration-tests/src/tests/kafka_tests.rs new file mode 100644 index 00000000000..c2c95a3f769 --- /dev/null +++ b/quickwit/quickwit-integration-tests/src/tests/kafka_tests.rs @@ -0,0 +1,290 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use quickwit_common::rand::append_random_suffix; +use quickwit_common::test_utils::wait_until_predicate; +use quickwit_config::ConfigFormat; +use quickwit_config::service::QuickwitService; +use quickwit_metastore::SplitState; +use quickwit_serve::ListSplitsQueryParams; +use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use rdkafka::client::DefaultClientContext; +use rdkafka::config::ClientConfig; +use rdkafka::producer::{FutureProducer, FutureRecord}; + +use crate::test_utils::ClusterSandboxBuilder; + +fn create_admin_client() -> AdminClient { + ClientConfig::new() + .set("bootstrap.servers", "localhost:9092") + .set("broker.address.family", "v4") + .create() + .unwrap() +} + +async fn create_topic( + admin_client: &AdminClient, + topic: &str, + num_partitions: i32, +) -> anyhow::Result<()> { + admin_client + .create_topics( + &[NewTopic::new( + topic, + num_partitions, + TopicReplication::Fixed(1), + )], + &AdminOptions::new().operation_timeout(Some(Duration::from_secs(5))), + ) + .await? + .into_iter() + .collect::, _>>() + .map_err(|(topic, err_code)| { + anyhow::anyhow!( + "failed to create topic `{}`. error code: `{}`", + topic, + err_code + ) + })?; + Ok(()) +} + +async fn populate_topic(topic: &str) -> anyhow::Result<()> { + let producer: &FutureProducer = &ClientConfig::new() + .set("bootstrap.servers", "localhost:9092") + .set("broker.address.family", "v4") + .set("message.timeout.ms", "30000") + .create()?; + + let message = r#"{"message":"test","id":1}"#; + + producer + .send( + FutureRecord { + topic, + partition: None, + timestamp: None, + key: None::<&[u8]>, + payload: Some(message), + headers: None, + }, + Duration::from_secs(5), + ) + .await + .map_err(|(err, _)| err)?; + Ok(()) +} + +#[tokio::test] +async fn test_kafka_source() { + quickwit_common::setup_logging_for_tests(); + + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = append_random_suffix("test-kafka-source"); + let topic = append_random_suffix("test-kafka-source-topic"); + + let admin_client = create_admin_client(); + create_topic(&admin_client, &topic, 1).await.unwrap(); + + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: message + type: text + - name: id + type: i64 + indexing_settings: + commit_timeout_secs: 3 + "# + ); + + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config.clone(), ConfigFormat::Yaml, false) + .await + .unwrap(); + + let source_id = "test-kafka-source-no-override"; + let source_config = format!( + r#" + version: 0.7 + source_id: {source_id} + desired_num_pipelines: 1 + max_num_pipelines_per_indexer: 1 + source_type: kafka + params: + topic: {topic} + client_params: + bootstrap.servers: localhost:9092 + broker.address.family: v4 + auto.offset.reset: earliest + enable.auto.commit: false + input_format: json + "# + ); + + sandbox + .rest_client(QuickwitService::Indexer) + .sources(&index_id) + .create(source_config, ConfigFormat::Yaml) + .await + .unwrap(); + + populate_topic(&topic).await.unwrap(); + + let result = wait_until_predicate( + || async { + let splits_query_params = ListSplitsQueryParams { + split_states: Some(vec![SplitState::Published]), + ..Default::default() + }; + sandbox + .rest_client(QuickwitService::Indexer) + .splits(&index_id) + .list(splits_query_params) + .await + 
.map(|splits| !splits.is_empty()) + .unwrap_or(false) + }, + Duration::from_secs(15), + Duration::from_millis(500), + ) + .await; + + assert!( + result.is_ok(), + "Splits should be published within 15 seconds using index config settings" + ); + + sandbox.assert_hit_count(&index_id, "", 1).await; + + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(&index_id, false) + .await + .unwrap(); + + sandbox.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_kafka_source_with_indexing_settings_override() { + quickwit_common::setup_logging_for_tests(); + + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = append_random_suffix("test-kafka-indexing-settings-override"); + let topic = append_random_suffix("test-kafka-indexing-settings-override-topic"); + + let admin_client = create_admin_client(); + create_topic(&admin_client, &topic, 1).await.unwrap(); + + // Create index with high commit_timeout (300 seconds) + // This would normally mean splits take 5 minutes to commit + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: message + type: text + - name: id + type: i64 + indexing_settings: + commit_timeout_secs: 300 + "# + ); + + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config.clone(), ConfigFormat::Yaml, false) + .await + .unwrap(); + + // Create Kafka source with indexing_settings override to lower commit_timeout to 3 seconds + // This tests that the source-level override works correctly + let source_id = "test-kafka-source"; + let source_config = format!( + r#" + version: 0.7 + source_id: {source_id} + desired_num_pipelines: 1 + max_num_pipelines_per_indexer: 1 + source_type: kafka + params: + topic: {topic} + client_params: + bootstrap.servers: localhost:9092 + broker.address.family: v4 + auto.offset.reset: earliest + enable.auto.commit: false + indexing_settings: + commit_timeout_secs: 3 + input_format: json + "# + ); + + sandbox + .rest_client(QuickwitService::Indexer) + .sources(&index_id) + .create(source_config, ConfigFormat::Yaml) + .await + .unwrap(); + + populate_topic(&topic).await.unwrap(); + + let result = wait_until_predicate( + || async { + let splits_query_params = ListSplitsQueryParams { + split_states: Some(vec![SplitState::Published]), + ..Default::default() + }; + sandbox + .rest_client(QuickwitService::Indexer) + .splits(&index_id) + .list(splits_query_params) + .await + .map(|splits| !splits.is_empty()) + .unwrap_or(false) + }, + Duration::from_secs(15), + Duration::from_millis(500), + ) + .await; + + assert!( + result.is_ok(), + "Splits should be published within 15 seconds when using indexing_settings override. If \ + this test fails, the override may not be working correctly." 
+ ); + + sandbox.assert_hit_count(&index_id, "", 1).await; + + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .delete(&index_id, false) + .await + .unwrap(); + + sandbox.shutdown().await.unwrap(); +} diff --git a/quickwit/quickwit-integration-tests/src/tests/mod.rs b/quickwit/quickwit-integration-tests/src/tests/mod.rs index bbc5dcf814a..519537bb5fb 100644 --- a/quickwit/quickwit-integration-tests/src/tests/mod.rs +++ b/quickwit/quickwit-integration-tests/src/tests/mod.rs @@ -15,6 +15,8 @@ mod basic_tests; mod ingest_v1_tests; mod ingest_v2_tests; +#[cfg(feature = "kafka-broker-tests")] +mod kafka_tests; mod no_cp_tests; mod otlp_tests; #[cfg(feature = "sqs-localstack-tests")] diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index 04fa0cedf2e..7b543e9ed25 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -247,6 +247,8 @@ message SearchRequest { // When an exact index ID is provided (not a pattern), the query fails only if // that index is not found and this parameter is set to `false`. bool ignore_missing_indexes = 18; + + optional string split_id = 19; } enum CountHits { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index 07bbdb5ce21..bfba62dd35a 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -185,6 +185,8 @@ pub struct SearchRequest { /// that index is not found and this parameter is set to `false`. #[prost(bool, tag = "18")] pub ignore_missing_indexes: bool, + #[prost(string, optional, tag = "19")] + pub split_id: ::core::option::Option<::prost::alloc::string::String>, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Eq, Hash)] diff --git a/quickwit/quickwit-query/src/aggregations.rs b/quickwit/quickwit-query/src/aggregations.rs index eca275ad265..5318d7884c6 100644 --- a/quickwit/quickwit-query/src/aggregations.rs +++ b/quickwit/quickwit-query/src/aggregations.rs @@ -18,7 +18,8 @@ use tantivy::aggregation::Key as TantivyKey; use tantivy::aggregation::agg_result::{ AggregationResult as TantivyAggregationResult, AggregationResults as TantivyAggregationResults, BucketEntries as TantivyBucketEntries, BucketEntry as TantivyBucketEntry, - BucketResult as TantivyBucketResult, MetricResult as TantivyMetricResult, + BucketResult as TantivyBucketResult, CompositeBucketEntry as TantivyCompositeBucketEntry, + CompositeKey as TantivyCompositeKey, MetricResult as TantivyMetricResult, RangeBucketEntry as TantivyRangeBucketEntry, }; use tantivy::aggregation::metric::{ @@ -169,6 +170,13 @@ pub enum BucketResult { /// The upper bound error for the doc count of each term. 
doc_count_error_upper_bound: Option, }, + /// This is the composite aggregation result + Composite { + /// The buckets + buckets: Vec, + /// The key to start after when paginating + after_key: FxHashMap, + }, } impl From for BucketResult { @@ -189,6 +197,10 @@ impl From for BucketResult { sum_other_doc_count, doc_count_error_upper_bound, }, + TantivyBucketResult::Composite { buckets, after_key } => BucketResult::Composite { + buckets: buckets.into_iter().map(Into::into).collect(), + after_key: after_key.into_iter().map(|(k, v)| (k, v.into())).collect(), + }, } } } @@ -211,6 +223,10 @@ impl From for TantivyBucketResult { sum_other_doc_count, doc_count_error_upper_bound, }, + BucketResult::Composite { buckets, after_key } => TantivyBucketResult::Composite { + buckets: buckets.into_iter().map(Into::into).collect(), + after_key: after_key.into_iter().map(|(k, v)| (k, v.into())).collect(), + }, } } } @@ -410,3 +426,75 @@ impl From for TantivyPercentilesMetricResult { TantivyPercentilesMetricResult { values } } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum CompositeKey { + /// Boolean key + Bool(bool), + /// String key + Str(String), + /// `i64` key + I64(i64), + /// `u64` key + U64(u64), + /// `f64` key + F64(f64), + /// Null key + Null, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CompositeBucketEntry { + /// The identifier of the bucket. + pub key: FxHashMap, + /// Number of documents in the bucket. + pub doc_count: u64, + /// Sub-aggregations in this bucket. + pub sub_aggregation: AggregationResults, +} + +impl From for CompositeKey { + fn from(value: TantivyCompositeKey) -> CompositeKey { + match value { + TantivyCompositeKey::Bool(b) => CompositeKey::Bool(b), + TantivyCompositeKey::Str(s) => CompositeKey::Str(s), + TantivyCompositeKey::I64(i) => CompositeKey::I64(i), + TantivyCompositeKey::U64(u) => CompositeKey::U64(u), + TantivyCompositeKey::F64(f) => CompositeKey::F64(f), + TantivyCompositeKey::Null => CompositeKey::Null, + } + } +} + +impl From for TantivyCompositeKey { + fn from(value: CompositeKey) -> TantivyCompositeKey { + match value { + CompositeKey::Bool(b) => TantivyCompositeKey::Bool(b), + CompositeKey::Str(s) => TantivyCompositeKey::Str(s), + CompositeKey::I64(i) => TantivyCompositeKey::I64(i), + CompositeKey::U64(u) => TantivyCompositeKey::U64(u), + CompositeKey::F64(f) => TantivyCompositeKey::F64(f), + CompositeKey::Null => TantivyCompositeKey::Null, + } + } +} + +impl From for CompositeBucketEntry { + fn from(value: TantivyCompositeBucketEntry) -> CompositeBucketEntry { + CompositeBucketEntry { + key: value.key.into_iter().map(|(k, v)| (k, v.into())).collect(), + doc_count: value.doc_count, + sub_aggregation: value.sub_aggregation.into(), + } + } +} + +impl From for TantivyCompositeBucketEntry { + fn from(value: CompositeBucketEntry) -> TantivyCompositeBucketEntry { + TantivyCompositeBucketEntry { + key: value.key.into_iter().map(|(k, v)| (k, v.into())).collect(), + doc_count: value.doc_count, + sub_aggregation: value.sub_aggregation.into(), + } + } +} diff --git a/quickwit/quickwit-search/src/metrics_trackers.rs b/quickwit/quickwit-search/src/metrics_trackers.rs index 7f2f9fbbfb3..c48200acc0f 100644 --- a/quickwit/quickwit-search/src/metrics_trackers.rs +++ b/quickwit/quickwit-search/src/metrics_trackers.rs @@ -26,6 +26,7 @@ use crate::metrics::SEARCH_METRICS; // root +#[derive(Debug)] pub enum RootSearchMetricsStep { Plan, Exec { num_targeted_splits: usize }, @@ -85,14 +86,17 @@ impl PinnedDrop for RootSearchMetricsFuture { } } -impl 
Future for RootSearchMetricsFuture -where F: Future> +impl Future for RootSearchMetricsFuture +where F: Future> { - type Output = Result; + type Output = crate::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let response = ready!(this.tracked.poll(cx)); + if let Err(err) = &response { + tracing::error!(?err, step = ?this.step, "root search failed"); + } *this.is_success = Some(response.is_ok()); Poll::Ready(Ok(response?)) } @@ -141,10 +145,10 @@ where F: Future> fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let response = ready!(this.tracked.poll(cx)); - *this.status = if response.is_ok() { - Some("success") - } else { - Some("error") + *this.status = match &response { + Ok(resp) if !resp.failed_splits.is_empty() => Some("partial-success"), + Ok(_) => Some("success"), + Err(_) => Some("error"), }; Poll::Ready(Ok(response?)) } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 7d89bc1baa9..9dcbde4f7fe 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -367,6 +367,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result< // to recompute it afterward. count_hits: quickwit_proto::search::CountHits::Underestimate as i32, ignore_missing_indexes: req.ignore_missing_indexes, + split_id: req.split_id.clone(), }) } @@ -1140,6 +1141,18 @@ async fn refine_and_list_matches( metastore, ) .await?; + + if let Some(split_id) = &search_request.split_id { + for split_metadata in split_metadatas { + if &split_metadata.split_id == split_id { + return Ok(vec![split_metadata]); + } + } + return Err(SearchError::InvalidQuery(format!( + "split ID {split_id} not found for the given query" + ))); + } + Ok(split_metadatas) } @@ -1212,6 +1225,24 @@ pub async fn root_search( current_span.record("num_docs", num_docs); current_span.record("num_splits", num_splits); + if let Some(max_total_split_searches) = + searcher_context.searcher_config.max_total_split_searches + { + if max_total_split_searches < num_splits { + tracing::error!( + num_splits, + max_total_split_searches, + index=?search_request.index_id_patterns, + query=%search_request.query_ast, + "max total splits exceeded" + ); + return Err(SearchError::InvalidArgument(format!( + "Number of targeted splits {num_splits} exceeds the limit \ + {max_total_split_searches}" + ))); + } + } + let mut search_response_result = RootSearchMetricsFuture { start: start_instant, tracked: root_search_aux( diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 79ab0025ae3..607bfd17b6c 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -412,6 +412,7 @@ fn build_request_for_es_api( search_after, count_hits, ignore_missing_indexes, + split_id: None, }, has_doc_id_field, )) diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index cfdf46c61ef..671d7a6c2fa 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -210,6 +210,9 @@ pub struct SearchRequestQueryString { #[schema(value_type = bool)] #[serde(default)] pub allow_failed_splits: bool, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub split_id: Option, } mod 
count_hits_from_bool { @@ -265,6 +268,7 @@ pub fn search_request_from_api_request( search_after: None, count_hits: search_request.count_all.into(), ignore_missing_indexes: false, + split_id: search_request.split_id, }; Ok(search_request) } diff --git a/quickwit/quickwit-storage/src/lib.rs b/quickwit/quickwit-storage/src/lib.rs index f808ac83286..31bbddcdd89 100644 --- a/quickwit/quickwit-storage/src/lib.rs +++ b/quickwit/quickwit-storage/src/lib.rs @@ -29,6 +29,7 @@ mod cache; mod debouncer; mod file_descriptor_cache; mod metrics; +mod metrics_wrappers; mod storage; mod timeout_and_retry_storage; pub use debouncer::AsyncDebouncer; diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 43ef588e192..064448e0270 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -16,7 +16,7 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, + GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, new_gauge, new_histogram_vec, }; @@ -30,19 +30,13 @@ pub struct StorageMetrics { pub searcher_split_cache: CacheMetrics, pub get_slice_timeout_successes: [IntCounter; 3], pub get_slice_timeout_all_timeouts: IntCounter, - pub object_storage_get_total: IntCounter, - pub object_storage_get_errors_total: IntCounterVec<1>, + pub object_storage_requests_total: IntCounterVec<2>, + pub object_storage_request_duration: HistogramVec<2>, pub object_storage_get_slice_in_flight_count: IntGauge, pub object_storage_get_slice_in_flight_num_bytes: IntGauge, - pub object_storage_put_total: IntCounter, - pub object_storage_put_parts: IntCounter, - pub object_storage_download_num_bytes: IntCounter, - pub object_storage_upload_num_bytes: IntCounter, - - pub object_storage_delete_requests_total: IntCounter, - pub object_storage_bulk_delete_requests_total: IntCounter, - pub object_storage_delete_request_duration: Histogram, - pub object_storage_bulk_delete_request_duration: Histogram, + pub object_storage_download_num_bytes: IntCounterVec<1>, + pub object_storage_download_errors: IntCounterVec<1>, + pub object_storage_upload_num_bytes: IntCounterVec<1>, } impl Default for StorageMetrics { @@ -63,31 +57,6 @@ impl Default for StorageMetrics { let get_slice_timeout_all_timeouts = get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]); - let object_storage_requests_total = new_counter_vec( - "object_storage_requests_total", - "Total number of object storage requests performed.", - "storage", - &[], - ["action"], - ); - let object_storage_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_object"]); - let object_storage_bulk_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_objects"]); - - let object_storage_request_duration = new_histogram_vec( - "object_storage_request_duration_seconds", - "Duration of object storage requests in seconds.", - "storage", - &[], - ["action"], - vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], - ); - let object_storage_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_object"]); - let object_storage_bulk_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_objects"]); - StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), fd_cache_metrics: CacheMetrics::for_component("fd"), @@ -97,62 +66,63 @@ impl Default 
for StorageMetrics { split_footer_cache: CacheMetrics::for_component("splitfooter"), get_slice_timeout_successes, get_slice_timeout_all_timeouts, - object_storage_get_total: new_counter( - "object_storage_gets_total", - "Number of objects fetched. Might be lower than get_slice_timeout_outcome if \ - queries are debounced.", + object_storage_requests_total: new_counter_vec( + "object_storage_requests_total", + "Number of requests to the object store, by action and status. Requests are \ + recorded when the response headers are returned, download failures will not \ + appear as errors.", "storage", &[], + ["action", "status"], ), - object_storage_get_errors_total: new_counter_vec::<1>( - "object_storage_get_errors_total", - "Number of GetObject errors.", + object_storage_request_duration: new_histogram_vec( + "object_storage_request_duration", + "Durations until the response headers are returned from the object store, by \ + action and status. This does not measure the download time for the body content.", "storage", &[], - ["code"], + ["action", "status"], + vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], ), object_storage_get_slice_in_flight_count: new_gauge( "object_storage_get_slice_in_flight_count", - "Number of GetObject for which the memory was allocated but the download is still \ - in progress.", + "Number of get_object for which the memory was allocated but the download is \ + still in progress.", "storage", &[], ), object_storage_get_slice_in_flight_num_bytes: new_gauge( "object_storage_get_slice_in_flight_num_bytes", - "Memory allocated for GetObject requests that are still in progress.", + "Memory allocated for get_object requests that are still in progress.", "storage", &[], ), - object_storage_put_total: new_counter( - "object_storage_puts_total", - "Number of objects uploaded. May differ from object_storage_requests_parts due to \ - multipart upload.", + object_storage_download_num_bytes: new_counter_vec( + "object_storage_download_num_bytes", + "Amount of data downloaded from object storage.", "storage", &[], + ["status"], ), - object_storage_put_parts: new_counter( - "object_storage_puts_parts", - "Number of object parts uploaded.", - "", - &[], - ), - object_storage_download_num_bytes: new_counter( - "object_storage_download_num_bytes", - "Amount of data downloaded from an object storage.", + object_storage_download_errors: new_counter_vec( + "object_storage_download_errors", + // Download errors are recorded separately because the associated + // get_object requests were already recorded as successful in + // object_storage_requests_total + "Number of download requests that received successful response headers but failed \ + during download.", "storage", &[], + ["status"], ), - object_storage_upload_num_bytes: new_counter( + object_storage_upload_num_bytes: new_counter_vec( "object_storage_upload_num_bytes", - "Amount of data uploaded to an object storage.", + "Amount of data uploaded to object storage. 
The value recorded for failed and \ + aborted uploads is the full payload size.", "storage", &[], + ["status"], ), - object_storage_delete_requests_total, - object_storage_bulk_delete_requests_total, - object_storage_delete_request_duration, - object_storage_bulk_delete_request_duration, } } } diff --git a/quickwit/quickwit-storage/src/metrics_wrappers.rs b/quickwit/quickwit-storage/src/metrics_wrappers.rs new file mode 100644 index 00000000000..f09d1f0d92d --- /dev/null +++ b/quickwit/quickwit-storage/src/metrics_wrappers.rs @@ -0,0 +1,482 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll, ready}; +use std::time::Instant; + +use pin_project::{pin_project, pinned_drop}; +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use crate::STORAGE_METRICS; + +#[derive(Clone, Copy, Debug)] +pub enum ActionLabel { + AbortMultipartUpload, + CompleteMultipartUpload, + CreateMultipartUpload, + DeleteObject, + DeleteObjects, + GetObject, + HeadObject, + ListObjects, + PutObject, + UploadPart, +} + +impl ActionLabel { + fn as_str(&self) -> &'static str { + match self { + ActionLabel::AbortMultipartUpload => "abort_multipart_upload", + ActionLabel::CompleteMultipartUpload => "complete_multipart_upload", + ActionLabel::CreateMultipartUpload => "create_multipart_upload", + ActionLabel::DeleteObject => "delete_object", + ActionLabel::DeleteObjects => "delete_objects", + ActionLabel::GetObject => "get_object", + ActionLabel::HeadObject => "head_object", + ActionLabel::ListObjects => "list_objects", + ActionLabel::PutObject => "put_object", + ActionLabel::UploadPart => "upload_part", + } + } +} + +pub enum RequestStatus { + Pending, + // only useful on feature="azure" + #[allow(dead_code)] + Done, + Ready(String), +} + +/// Converts an object store client SDK Result<> to the [Status] that should be +/// recorded in the metrics. +/// +/// The `Marker` type is necessary to avoid conflicting implementations of the +/// trait. +pub trait AsRequestStatus { + fn as_status(&self) -> RequestStatus; +} + +/// Wrapper around object store requests to record metrics, including cancellation. 
+#[pin_project(PinnedDrop)] +pub struct RequestMetricsWrapper +where + F: Future, + F::Output: AsRequestStatus, +{ + #[pin] + tracked: F, + action: ActionLabel, + start: Option, + uploaded_bytes: Option, + status: RequestStatus, + _marker: PhantomData, +} + +#[pinned_drop] +impl PinnedDrop for RequestMetricsWrapper +where + F: Future, + F::Output: AsRequestStatus, +{ + fn drop(self: Pin<&mut Self>) { + let status = match &self.status { + RequestStatus::Pending => "cancelled", + RequestStatus::Done => return, + RequestStatus::Ready(s) => s.as_str(), + }; + let label_values = [self.action.as_str(), status]; + STORAGE_METRICS + .object_storage_requests_total + .with_label_values(label_values) + .inc(); + if let Some(start) = self.start { + STORAGE_METRICS + .object_storage_request_duration + .with_label_values(label_values) + .observe(start.elapsed().as_secs_f64()); + } + if let Some(bytes) = self.uploaded_bytes { + STORAGE_METRICS + .object_storage_upload_num_bytes + .with_label_values([status]) + .inc_by(bytes); + } + } +} + +impl Future for RequestMetricsWrapper +where + F: Future, + F::Output: AsRequestStatus, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.status = response.as_status(); + + Poll::Ready(response) + } +} + +pub trait RequestMetricsWrapperExt +where + F: Future, + F::Output: AsRequestStatus, +{ + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper; + + fn with_count_and_duration_metrics( + self, + action: ActionLabel, + ) -> RequestMetricsWrapper; + + fn with_count_and_upload_metrics( + self, + action: ActionLabel, + bytes: u64, + ) -> RequestMetricsWrapper; +} + +impl RequestMetricsWrapperExt for F +where + F: Future, + F::Output: AsRequestStatus, +{ + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: RequestStatus::Pending, + start: None, + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn with_count_and_duration_metrics( + self, + action: ActionLabel, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: RequestStatus::Pending, + start: Some(Instant::now()), + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn with_count_and_upload_metrics( + self, + action: ActionLabel, + bytes: u64, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: RequestStatus::Pending, + start: None, + uploaded_bytes: Some(bytes), + _marker: PhantomData, + } + } +} + +mod s3_impls { + use aws_sdk_s3::error::{ProvideErrorMetadata, SdkError}; + + use super::{AsRequestStatus, RequestStatus}; + + pub struct S3Marker; + + impl AsRequestStatus for Result> { + fn as_status(&self) -> RequestStatus { + let status_str = match self { + Ok(_) => "success".to_string(), + Err(SdkError::ConstructionFailure(_)) => "construction_failure".to_string(), + Err(SdkError::TimeoutError(_)) => "timeout_error".to_string(), + Err(SdkError::DispatchFailure(_)) => "dispatch_failure".to_string(), + Err(SdkError::ResponseError(_)) => "response_error".to_string(), + Err(e @ SdkError::ServiceError(_)) => e + .meta() + .code() + .unwrap_or("unknown_service_error") + .to_string(), + Err(_) => "unknown".to_string(), + }; + RequestStatus::Ready(status_str) + } + } +} + +#[cfg(feature = "azure")] +mod azure_impl { + use super::{AsRequestStatus, RequestStatus}; + + pub struct AzureMarker; + + impl 
AsRequestStatus for Result { + fn as_status(&self) -> RequestStatus { + let Err(err) = self else { + return RequestStatus::Ready("success".to_string()); + }; + let err_status_str = match err.kind() { + azure_storage::ErrorKind::HttpResponse { status, .. } => status.to_string(), + azure_storage::ErrorKind::Credential => "credential".to_string(), + azure_storage::ErrorKind::Io => "io".to_string(), + azure_storage::ErrorKind::DataConversion => "data_conversion".to_string(), + _ => "unknown".to_string(), + }; + RequestStatus::Ready(err_status_str) + } + } + + // The Azure SDK get_blob request returns Option because it chunks + // the download into a stream of get requests. + impl AsRequestStatus for Option> { + fn as_status(&self) -> RequestStatus { + match self { + None => RequestStatus::Done, + Some(res) => res.as_status(), + } + } + } +} + +pub enum DownloadStatus { + InProgress, + Done, + Failed(&'static str), +} + +/// Track io errors during downloads. +/// +/// Downloads are a bit different from other requests because the request might +/// fail while getting the bytes from the response body, long after getting a +/// successful response header. +#[pin_project(PinnedDrop)] +struct DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + #[pin] + tracked: copy_buf::CopyBuf<'a, R, W>, + status: DownloadStatus, +} + +#[pinned_drop] +impl<'a, R, W> PinnedDrop for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + fn drop(self: Pin<&mut Self>) { + let error_opt = match &self.status { + DownloadStatus::InProgress => Some("cancelled"), + DownloadStatus::Failed(e) => Some(*e), + DownloadStatus::Done => None, + }; + + STORAGE_METRICS + .object_storage_download_num_bytes + .with_label_values([error_opt.unwrap_or("success")]) + .inc_by(self.tracked.amt); + + if let Some(error) = error_opt { + STORAGE_METRICS + .object_storage_download_errors + .with_label_values([error]) + .inc(); + } + } +} + +impl<'a, R, W> Future for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.status = match &response { + Ok(_) => DownloadStatus::Done, + Err(e) => DownloadStatus::Failed(io_error_as_label(e.kind())), + }; + Poll::Ready(response) + } +} + +pub async fn copy_with_download_metrics<'a, R, W>( + reader: &'a mut R, + writer: &'a mut W, +) -> io::Result +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + DownloadMetricsWrapper { + tracked: copy_buf::CopyBuf { + reader, + writer, + amt: 0, + }, + status: DownloadStatus::InProgress, + } + .await +} + +/// This is a fork of `tokio::io::copy_buf` that enables tracking the number of +/// bytes transferred. This estimate should be accurate as long as the network +/// is the bottleneck. 
+mod copy_buf { + + use std::future::Future; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll, ready}; + + use tokio::io::{AsyncBufRead, AsyncWrite}; + + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + pub reader: &'a mut R, + pub writer: &'a mut W, + pub amt: u64, + } + + impl Future for CopyBuf<'_, R, W> + where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, + { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let me = &mut *self; + let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut self.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); + } + self.amt += i as u64; + Pin::new(&mut *self.reader).consume(i); + } + } + } +} + +fn io_error_as_label(error: io::ErrorKind) -> &'static str { + use io::ErrorKind::*; + // most of these variants are not expected to happen + match error { + AddrInUse => "addr_in_use", + AddrNotAvailable => "addr_not_available", + AlreadyExists => "already_exists", + ArgumentListTooLong => "argument_list_too_long", + BrokenPipe => "broken_pipe", + ConnectionAborted => "connection_aborted", + ConnectionRefused => "connection_refused", + ConnectionReset => "connection_reset", + CrossesDevices => "crosses_devices", + Deadlock => "deadlock", + DirectoryNotEmpty => "directory_not_empty", + ExecutableFileBusy => "executable_file_busy", + FileTooLarge => "file_too_large", + HostUnreachable => "host_unreachable", + Interrupted => "interrupted", + InvalidData => "invalid_data", + InvalidFilename => "invalid_filename", + InvalidInput => "invalid_input", + IsADirectory => "is_a_directory", + NetworkDown => "network_down", + NetworkUnreachable => "network_unreachable", + NotADirectory => "not_a_directory", + NotConnected => "not_connected", + NotFound => "not_found", + NotSeekable => "not_seekable", + Other => "other", + OutOfMemory => "out_of_memory", + PermissionDenied => "permission_denied", + QuotaExceeded => "quota_exceeded", + ReadOnlyFilesystem => "read_only_filesystem", + ResourceBusy => "resource_busy", + StaleNetworkFileHandle => "stale_network_file_handle", + StorageFull => "storage_full", + TimedOut => "timed_out", + TooManyLinks => "too_many_links", + UnexpectedEof => "unexpected_eof", + Unsupported => "unsupported", + WouldBlock => "would_block", + WriteZero => "write_zero", + _ => "uncategorized", + } +} + +#[cfg(feature = "gcs")] +pub mod opendal_helpers { + use quickwit_common::metrics::HistogramTimer; + + use super::*; + + /// Records a request occurrence for this action with unknown status. + pub fn record_request(action: ActionLabel) { + STORAGE_METRICS + .object_storage_requests_total + .with_label_values([action.as_str(), "unknown"]) + .inc(); + } + + /// Records an upload volume for this action with unknown status. + pub fn record_upload(bytes: u64) { + STORAGE_METRICS + .object_storage_upload_num_bytes + .with_label_values(["unknown"]) + .inc_by(bytes); + } + + /// Records a download volume for this action with unknown status. 
+ pub fn record_download(bytes: u64) { + STORAGE_METRICS + .object_storage_download_num_bytes + .with_label_values(["unknown"]) + .inc_by(bytes); + } + + /// Records a request occurrence for this action with unknown status. + pub fn record_request_with_timer(action: ActionLabel) -> HistogramTimer { + record_request(action); + STORAGE_METRICS + .object_storage_request_duration + .with_label_values([action.as_str(), "unknown"]) + .start_timer() + } +} diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index d4c9bd67d84..c92cd71e737 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -45,10 +45,11 @@ use tracing::{instrument, warn}; use crate::debouncer::DebouncedStorage; use crate::metrics::object_storage_get_slice_in_flight_guards; +use crate::metrics_wrappers::{ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage, - StorageError, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError, + StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, }; /// Azure object storage resolver. @@ -242,10 +243,6 @@ impl AzureBlobStorage { name: &'a str, payload: Box, ) -> StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(payload.len()); retry(&self.retry_params, || async { let data = Bytes::from(payload.read_all().await?.to_vec()); let hash = azure_storage_blobs::prelude::Hash::from(md5::compute(&data[..]).0); @@ -254,6 +251,7 @@ impl AzureBlobStorage { .put_block_blob(data) .hash(hash) .into_future() + .with_count_and_upload_metrics(ActionLabel::PutObject, payload.len()) .await?; Result::<(), AzureErrorWrapper>::Ok(()) }) @@ -278,10 +276,6 @@ impl AzureBlobStorage { .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(range.end - range.start); async move { retry(&self.retry_params, || async { // zero pad block ids to make them sortable as strings @@ -294,6 +288,10 @@ impl AzureBlobStorage { .put_block(block_id.clone(), data) .hash(hash) .into_future() + .with_count_and_upload_metrics( + ActionLabel::UploadPart, + range.end - range.start, + ) .await?; Result::<_, AzureErrorWrapper>::Ok(block_id) }) @@ -323,6 +321,7 @@ impl AzureBlobStorage { blob_client .put_block_list(block_list) .into_future() + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await .map_err(AzureErrorWrapper::from)?; @@ -339,6 +338,7 @@ impl Storage for AzureBlobStorage { .max_results(NonZeroU32::new(1u32).expect("1 is always non-zero.")) .into_stream() .next() + .with_count_metric(ActionLabel::ListObjects) .await { let _ = first_blob_result?; @@ -351,7 +351,6 @@ impl Storage for AzureBlobStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let name = self.blob_name(path); let total_len = payload.len(); let part_num_bytes = self.multipart_policy.part_num_bytes(total_len); @@ -369,7 +368,11 @@ impl Storage for AzureBlobStorage 
{ let name = self.blob_name(path); let mut output_stream = self.container_client.blob_client(name).get().into_stream(); - while let Some(chunk_result) = output_stream.next().await { + while let Some(chunk_result) = output_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?; let chunk_response_body_stream = chunk_response .data @@ -377,10 +380,7 @@ impl Storage for AzureBlobStorage { .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } output.flush().await?; Ok(()) @@ -393,6 +393,7 @@ impl Storage for AzureBlobStorage { .blob_client(blob_name) .delete() .into_future() + .with_count_metric(ActionLabel::DeleteObject) .await .map_err(|err| AzureErrorWrapper::from(err).into()); ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?; @@ -515,6 +516,7 @@ impl Storage for AzureBlobStorage { .blob_client(name) .get_properties() .into_future() + .with_count_metric(ActionLabel::HeadObject) .await; match properties_result { Ok(response) => Ok(response.blob.properties.content_length), @@ -537,7 +539,7 @@ async fn extract_range_data_and_hash( .await? .into_async_read(); let mut buf: Vec = Vec::with_capacity(range.count()); - tokio::io::copy(&mut reader, &mut buf).await?; + tokio::io::copy_buf(&mut reader, &mut buf).await?; let data = Bytes::from(buf); let hash = md5::compute(&data[..]); Ok((data, hash)) @@ -568,7 +570,11 @@ async fn download_all( output: &mut Vec, ) -> Result<(), AzureErrorWrapper> { output.clear(); - while let Some(chunk_result) = chunk_stream.next().await { + while let Some(chunk_result) = chunk_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result?; let chunk_response_body_stream = chunk_response .data @@ -576,10 +582,7 @@ async fn download_all( .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - crate::STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } // When calling `get_all`, the Vec capacity is not properly set. output.shrink_to_fit(); diff --git a/quickwit/quickwit-storage/src/object_storage/error.rs b/quickwit/quickwit-storage/src/object_storage/error.rs index 5f60fe1f944..8a7efc13332 100644 --- a/quickwit/quickwit-storage/src/object_storage/error.rs +++ b/quickwit/quickwit-storage/src/object_storage/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError}; +use aws_sdk_s3::error::{DisplayErrorContext, SdkError}; use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError; use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError; use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError; @@ -62,11 +62,6 @@ pub trait ToStorageErrorKind { impl ToStorageErrorKind for GetObjectError { fn to_storage_error_kind(&self) -> StorageErrorKind { - let error_code = self.code().unwrap_or("unknown"); - crate::STORAGE_METRICS - .object_storage_get_errors_total - .with_label_values([error_code]) - .inc(); match self { GetObjectError::InvalidObjectState(_) => StorageErrorKind::Service, GetObjectError::NoSuchKey(_) => StorageErrorKind::NotFound, diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index e1cdfd16b70..7f68375b3fb 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -45,11 +45,12 @@ use tokio::sync::Semaphore; use tracing::{info, instrument, warn}; use crate::metrics::object_storage_get_slice_in_flight_guards; +use crate::metrics_wrappers::{ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics}; use crate::object_storage::MultiPartPolicy; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, OwnedBytes, STORAGE_METRICS, Storage, StorageError, - StorageErrorKind, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind, + StorageResolverError, StorageResult, }; /// Semaphore to limit the number of concurrent requests to the object store. Some object stores @@ -288,11 +289,6 @@ impl S3CompatibleObjectStorage { .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(len); - self.s3_client .put_object() .bucket(bucket) @@ -300,6 +296,7 @@ impl S3CompatibleObjectStorage { .body(body) .content_length(len as i64) .send() + .with_count_and_upload_metrics(ActionLabel::PutObject, len) .await .map_err(|sdk_error| { if sdk_error.is_retryable() { @@ -334,6 +331,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .key(key) .send() + .with_count_metric(ActionLabel::CreateMultipartUpload) .await }) .await? 
@@ -423,11 +421,6 @@ impl S3CompatibleObjectStorage { .map_err(Retry::Permanent)?; let md5 = BASE64_STANDARD.encode(part.md5.0); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(part.len()); - let upload_part_output = self .s3_client .upload_part() @@ -439,6 +432,7 @@ impl S3CompatibleObjectStorage { .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() + .with_count_and_upload_metrics(ActionLabel::UploadPart, part.len()) .await .map_err(|s3_err| { if s3_err.is_retryable() { @@ -518,6 +512,7 @@ impl S3CompatibleObjectStorage { .multipart_upload(completed_upload.clone()) .upload_id(upload_id) .send() + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await }) .await?; @@ -532,6 +527,7 @@ impl S3CompatibleObjectStorage { .key(key) .upload_id(upload_id) .send() + .with_count_metric(ActionLabel::AbortMultipartUpload) .await }) .await?; @@ -546,8 +542,6 @@ impl S3CompatibleObjectStorage { let key = self.key(path); let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1)); - crate::STORAGE_METRICS.object_storage_get_total.inc(); - let get_object_output = self .s3_client .get_object() @@ -555,6 +549,7 @@ impl S3CompatibleObjectStorage { .key(key) .set_range(range_str) .send() + .with_count_and_duration_metrics(ActionLabel::GetObject) .await?; Ok(get_object_output) } @@ -642,17 +637,12 @@ impl S3CompatibleObjectStorage { for (path_chunk, delete) in &mut delete_requests_it { let delete_objects_res: StorageResult = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_bulk_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_bulk_delete_request_duration - .start_timer(); self.s3_client .delete_objects() .bucket(self.bucket.clone()) .delete(delete.clone()) .send() + .with_count_and_duration_metrics(ActionLabel::DeleteObjects) .await }) .await @@ -718,10 +708,7 @@ impl S3CompatibleObjectStorage { async fn download_all(byte_stream: ByteStream, output: &mut Vec) -> io::Result<()> { output.clear(); let mut body_stream_reader = BufReader::new(byte_stream.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; // When calling `get_all`, the Vec capacity is not properly set. 
output.shrink_to_fit(); Ok(()) @@ -737,6 +724,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .max_keys(1) .send() + .with_count_metric(ActionLabel::ListObjects) .await?; Ok(()) } @@ -746,7 +734,6 @@ impl Storage for S3CompatibleObjectStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let _permit = REQUEST_SEMAPHORE.acquire().await; let key = self.key(path); let total_len = payload.len(); @@ -765,10 +752,7 @@ impl Storage for S3CompatibleObjectStorage { let get_object_output = aws_retry(&self.retry_params, || self.get_object(path, None)).await?; let mut body_read = BufReader::new(get_object_output.body.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_read, output).await?; output.flush().await?; Ok(()) } @@ -778,17 +762,12 @@ impl Storage for S3CompatibleObjectStorage { let bucket = self.bucket.clone(); let key = self.key(path); let delete_res = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_delete_request_duration - .start_timer(); self.s3_client .delete_object() .bucket(&bucket) .key(&key) .send() + .with_count_and_duration_metrics(ActionLabel::DeleteObject) .await }) .await; @@ -869,6 +848,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(&bucket) .key(&key) .send() + .with_count_metric(ActionLabel::HeadObject) .await }) .await?; diff --git a/quickwit/quickwit-storage/src/opendal_storage/base.rs b/quickwit/quickwit-storage/src/opendal_storage/base.rs index 0466a42d1d6..e84ba4afcc0 100644 --- a/quickwit/quickwit-storage/src/opendal_storage/base.rs +++ b/quickwit/quickwit-storage/src/opendal_storage/base.rs @@ -24,6 +24,7 @@ use tokio::io::{AsyncRead, AsyncWriteExt as TokioAsyncWriteExt}; use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}; use crate::metrics::object_storage_get_slice_in_flight_guards; +use crate::metrics_wrappers::{ActionLabel, opendal_helpers}; use crate::storage::SendableAsync; use crate::{ BulkDeleteError, MultiPartPolicy, OwnedBytes, PutPayload, Storage, StorageError, @@ -79,7 +80,8 @@ impl Storage for OpendalStorage { } async fn put(&self, path: &Path, payload: Box) -> StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); + // we record one put even though it may involve multiple parts + opendal_helpers::record_request(ActionLabel::PutObject); let path = path.as_os_str().to_string_lossy(); let mut payload_reader = payload.byte_stream().await?.into_async_read(); @@ -92,9 +94,7 @@ impl Storage for OpendalStorage { .compat_write(); tokio::io::copy(&mut payload_reader, &mut storage_writer).await?; storage_writer.get_mut().close().await?; - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(payload.len()); + opendal_helpers::record_upload(payload.len()); Ok(()) } @@ -108,9 +108,7 @@ impl Storage for OpendalStorage { .await? 
.compat(); let num_bytes_copied = tokio::io::copy(&mut storage_reader, output).await?; - crate::STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + opendal_helpers::record_download(num_bytes_copied); output.flush().await?; Ok(()) } @@ -122,7 +120,7 @@ impl Storage for OpendalStorage { // Unlike other object store implementations, in flight requests are // recorded before issuing the query to the object store. let _inflight_guards = object_storage_get_slice_in_flight_guards(size); - crate::STORAGE_METRICS.object_storage_get_total.inc(); + opendal_helpers::record_request(ActionLabel::GetObject); let storage_content = self.op.read_with(&path).range(range).await?.to_vec(); Ok(OwnedBytes::new(storage_content)) } @@ -152,12 +150,7 @@ impl Storage for OpendalStorage { async fn delete(&self, path: &Path) -> StorageResult<()> { let path = path.as_os_str().to_string_lossy(); - crate::STORAGE_METRICS - .object_storage_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_delete_request_duration - .start_timer(); + let _timer = opendal_helpers::record_request_with_timer(ActionLabel::DeleteObject); self.op.delete(&path).await?; Ok(()) } @@ -173,12 +166,8 @@ impl Storage for OpendalStorage { { let mut bulk_error = BulkDeleteError::default(); for (index, path) in paths.iter().enumerate() { - crate::STORAGE_METRICS - .object_storage_bulk_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_bulk_delete_request_duration - .start_timer(); + let _timer = + opendal_helpers::record_request_with_timer(ActionLabel::DeleteObjects); let result = self.op.delete(&path.as_os_str().to_string_lossy()).await; if let Err(err) = result { let storage_error_kind = err.kind(); diff --git a/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml b/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml index f81c2215f40..63daf92db06 100644 --- a/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml +++ b/quickwit/rest-api-tests/scenarii/aggregations/0001-aggregations.yaml @@ -375,6 +375,7 @@ expected: aggregations: response_stats: sum_of_squares: 55300.0 +--- # Test term aggs number precision method: [GET] engines: @@ -393,3 +394,86 @@ expected: buckets: - doc_count: 1 key: 1769070189829214200 +--- +# Test composite aggregation +method: [GET] +engines: + - quickwit +endpoint: _elastic/aggregations/_search +json: + size: 0 + aggs: + host_name_composite: + composite: + size: 5 + sources: + - host: + terms: + field: "host" + missing_bucket: true + - name: + terms: + field: "name" + - response: + histogram: + field: "response" + interval: 50 +expected: + aggregations: + host_name_composite: + buckets: + - key: { "host": null, "name": "Bernhard", "response": 100.0 } + doc_count: 1 + - key: { "host": null, "name": "Fritz", "response": 0.0 } + doc_count: 2 + - key: { "host": "192.168.0.1", "name": "Fred", "response": 100.0 } + doc_count: 1 + - key: { "host": "192.168.0.1", "name": "Fritz", "response": 0.0 } + doc_count: 1 + - key: { "host": "192.168.0.10", "name": "Albert", "response": 100.0 } + doc_count: 1 + after_key: + host: "192.168.0.10" + name: "Albert" + response: 100.0 + +--- +# Test composite aggregation paging +method: [GET] +engines: + - quickwit +endpoint: _elastic/aggregations/_search +json: + size: 0 + aggs: + host_name_composite: + composite: + size: 5 + sources: + - host: + terms: + field: "host" + missing_bucket: true + - name: + terms: + field: "name" + - response: + 
histogram: + field: "response" + interval: 50 + after: + host: "192.168.0.10" + name: "Albert" + response: 100.0 +expected: + aggregations: + host_name_composite: + buckets: + - key: { "host": "192.168.0.10", "name": "Holger", "response": 0.0 } + doc_count: 1 + # Horst is missing because his response field is missing + - key: { "host": "192.168.0.10", "name": "Werner", "response": 0.0 } + doc_count: 1 + - key: { "host": "192.168.0.11", "name": "Manfred", "response": 100.0 } + doc_count: 1 +--- \ No newline at end of file
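
For illustration, here is a minimal sketch (not part of the diff) of how the new wrapper introduced in quickwit-storage/src/metrics_wrappers.rs is meant to be used from storage code. It assumes the snippet lives inside quickwit-storage, where ActionLabel and RequestMetricsWrapperExt are in scope; the helper name head_object_with_metrics is invented for the example and does not exist in this patch.

use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::head_object::{HeadObjectError, HeadObjectOutput};

use crate::metrics_wrappers::{ActionLabel, RequestMetricsWrapperExt};

// Hypothetical helper: wraps a HeadObject call so that
// `object_storage_requests_total` is incremented for this action with the
// resolved status once the SDK future completes. If the future is dropped
// before completion, the PinnedDrop impl records it with a "cancelled"
// status instead of losing the request.
async fn head_object_with_metrics(
    s3_client: &aws_sdk_s3::Client,
    bucket: &str,
    key: &str,
) -> Result<HeadObjectOutput, SdkError<HeadObjectError>> {
    s3_client
        .head_object()
        .bucket(bucket)
        .key(key)
        .send()
        // `with_count_and_duration_metrics` would additionally observe the
        // request latency; `with_count_and_upload_metrics` also tracks the
        // uploaded payload size for PutObject/UploadPart style requests.
        .with_count_metric(ActionLabel::HeadObject)
        .await
}

Because the status label is attached in PinnedDrop rather than only on successful completion, cancelled and failed requests still show up in the counters, which is the main behavioral difference from the per-operation counters this patch removes.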