diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs index 15aaaff18c708..28bbb0432bbe2 100644 --- a/src/adapter/src/flags.rs +++ b/src/adapter/src/flags.rs @@ -147,7 +147,7 @@ pub fn storage_config(config: &SystemVars) -> StorageParameters { enable_dependency_read_hold_asserts: config.enable_dependency_read_hold_asserts(), user_storage_managed_collections_batch_duration: config .user_storage_managed_collections_batch_duration(), - dyncfg_updates: Some(config.dyncfg_updates()), + dyncfg_updates: config.dyncfg_updates(), } } diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index a1456957f7b6e..2dac1ed8209c7 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -264,9 +264,7 @@ where fn update_parameters(&mut self, config_params: StorageParameters) { // We serialize the dyncfg updates in StorageParameters, but configure // persist separately. - if let Some(updates) = &config_params.dyncfg_updates { - updates.apply(self.persist.cfg()); - } + config_params.dyncfg_updates.apply(self.persist.cfg()); for client in self.clients.values_mut() { client.send(StorageCommand::UpdateConfiguration(config_params.clone())); diff --git a/src/storage-types/src/configuration.rs b/src/storage-types/src/configuration.rs index 88e543990d32c..b42d4169cb90d 100644 --- a/src/storage-types/src/configuration.rs +++ b/src/storage-types/src/configuration.rs @@ -60,12 +60,10 @@ impl StorageConfiguration { &self.config_set } - pub fn update(&mut self, mut parameters: StorageParameters) { + pub fn update(&mut self, parameters: StorageParameters) { // We serialize the dyncfg updates in StorageParameters, but store the config set // top-level. Eventually, all of `StorageParameters` goes away. - if let Some(updates) = parameters.dyncfg_updates.take() { - updates.apply(&self.config_set); - } + parameters.dyncfg_updates.apply(&self.config_set); self.parameters.update(parameters); } } diff --git a/src/storage-types/src/parameters.rs b/src/storage-types/src/parameters.rs index b926ae94bceaf..73258be3a39dd 100644 --- a/src/storage-types/src/parameters.rs +++ b/src/storage-types/src/parameters.rs @@ -72,9 +72,8 @@ pub struct StorageParameters { /// Duration that we wait to batch rows for user owned, storage managed, collections. pub user_storage_managed_collections_batch_duration: Duration, - /// Updates used to update `StorageConfiguration::config_set`. `None` when - /// not being moved over the wire. - pub dyncfg_updates: Option, + /// Updates used to update `StorageConfiguration::config_set`. + pub dyncfg_updates: mz_dyncfg::ConfigUpdates, } pub const STATISTICS_INTERVAL_DEFAULT: Duration = Duration::from_secs(60); @@ -108,7 +107,7 @@ impl Default for StorageParameters { enable_dependency_read_hold_asserts: true, user_storage_managed_collections_batch_duration: STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, - dyncfg_updates: None, + dyncfg_updates: Default::default(), } } } @@ -206,7 +205,7 @@ impl StorageParameters { pg_snapshot_config, enable_dependency_read_hold_asserts, user_storage_managed_collections_batch_duration, - dyncfg_updates: _, + dyncfg_updates, }: StorageParameters, ) { self.pg_source_tcp_timeouts = pg_source_tcp_timeouts; @@ -234,9 +233,7 @@ impl StorageParameters { self.enable_dependency_read_hold_asserts = enable_dependency_read_hold_asserts; self.user_storage_managed_collections_batch_duration = user_storage_managed_collections_batch_duration; - - // The storage controller and storage state don't need `dyncfg_updates` maintained. - self.dyncfg_updates = None; + self.dyncfg_updates.extend(dyncfg_updates); } } @@ -279,7 +276,7 @@ impl RustType for StorageParameters { self.user_storage_managed_collections_batch_duration .into_proto(), ), - dyncfg_updates: self.dyncfg_updates.clone(), + dyncfg_updates: Some(self.dyncfg_updates.clone()), } } @@ -348,7 +345,9 @@ impl RustType for StorageParameters { .into_rust_if_some( "ProtoStorageParameters::user_storage_managed_collections_batch_duration", )?, - dyncfg_updates: proto.dyncfg_updates, + dyncfg_updates: proto.dyncfg_updates.ok_or_else(|| { + TryFromProtoError::missing_field("ProtoStorageParameters::dyncfg_updates") + })?, }) } } diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs index 585e7e9d53d4a..23a9b9fa2adc4 100644 --- a/src/storage/src/storage_state.rs +++ b/src/storage/src/storage_state.rs @@ -870,6 +870,14 @@ impl<'w, A: Allocate> Worker<'w, A> { self.storage_state .storage_configuration .update(storage_parameters); + + // Clear out the updates as we no longer forward them to anyone else to process. + // We clone `StorageState::storage_configuration` many times during rendering + // and want to avoid cloning these unused updates. + self.storage_state + .storage_configuration + .parameters + .dyncfg_updates = Default::default(); } InternalStorageCommand::StatisticsUpdate { sources, sinks } => self .storage_state @@ -1202,9 +1210,7 @@ impl StorageState { // We serialize the dyncfg updates in StorageParameters, but configure // persist separately. - if let Some(updates) = ¶ms.dyncfg_updates { - updates.apply(self.persist_clients.cfg()); - } + params.dyncfg_updates.apply(self.persist_clients.cfg()); params.tracing.apply(self.tracing_handle.as_ref());