From 87073a47f2f56c4de4bfe8e35b0a9b65286f3416 Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Thu, 21 Mar 2024 11:48:15 -0400 Subject: [PATCH 1/6] storage: Deny localhost/private-ips in source/sink connections --- src/kafka-util/src/client.rs | 22 +++- src/mysql-util/src/tunnel.rs | 35 ++++-- src/ore/src/netio.rs | 2 + src/ore/src/netio/dns.rs | 67 +++++++++++ src/postgres-util/src/tunnel.rs | 18 ++- src/storage-types/src/connections.rs | 115 +++++++++++++++---- src/storage-types/src/dyncfgs.rs | 12 +- src/storage-types/src/errors.rs | 7 ++ test/mysql-cdc/mysql-cdc.td | 13 +++ test/pg-cdc/pg-cdc.td | 16 ++- test/ssh-connection/ssh-connections.td | 25 ++++ test/testdrive/connection-create-external.td | 77 +++++++++++++ 12 files changed, 373 insertions(+), 36 deletions(-) create mode 100644 src/ore/src/netio/dns.rs create mode 100644 test/testdrive/connection-create-external.td diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index 621bddebffd76..5aed2c099eb30 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -23,6 +23,7 @@ use crossbeam::channel::{unbounded, Receiver, Sender}; use mz_ore::collections::CollectionExt; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; +use mz_ore::netio::resolve_external_address; use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig, SshTunnelStatus}; use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager}; use rdkafka::client::{BrokerAddr, Client, NativeClient, OAuthToken}; @@ -319,6 +320,7 @@ pub struct TunnelingClientContext { ssh_tunnel_manager: SshTunnelManager, ssh_timeout_config: SshTimeoutConfig, runtime: Handle, + enforce_external_addresses: bool, } impl TunnelingClientContext { @@ -329,6 +331,7 @@ impl TunnelingClientContext { ssh_tunnel_manager: SshTunnelManager, ssh_timeout_config: SshTimeoutConfig, in_task: InTask, + enforce_external_addresses: bool, ) -> TunnelingClientContext { TunnelingClientContext { inner, @@ -338,6 +341,7 @@ impl TunnelingClientContext { ssh_tunnel_manager, ssh_timeout_config, runtime, + enforce_external_addresses, } } @@ -532,7 +536,23 @@ where host: host.to_owned(), port: addr.port, }, - TunnelConfig::None => addr, + TunnelConfig::None => { + // If no rewrite is specified, we still should check that this potentially + // new broker address is a global address. + let rewrite = self.runtime.block_on(async { + let resolved = resolve_external_address( + &addr.host, + self.enforce_external_addresses, + ) + .await + .unwrap(); + BrokerRewriteHandle::Simple(BrokerRewrite { + host: resolved.to_string(), + port: addr.port.parse().ok(), + }) + }); + return_rewrite(&rewrite) + } } } Some(rewrite) => return_rewrite(&rewrite), diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs index dc2ae7e3332dd..5139aef9ec10b 100644 --- a/src/mysql-util/src/tunnel.rs +++ b/src/mysql-util/src/tunnel.rs @@ -27,7 +27,7 @@ use crate::MySqlError; #[derive(Debug, PartialEq, Clone)] pub enum TunnelConfig { /// Establish a direct TCP connection to the database host. - Direct, + Direct { tcp_host_override: Option }, /// Establish a TCP connection to the database via an SSH tunnel. /// This means first establishing an SSH connection to a bastion host, /// and then opening a separate connection from that host to the database. 
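The new `tcp_host_override: Option<String>` field turns `Direct` from a unit variant into a struct variant, so every construction site has to change. A minimal sketch of the before/after shape at a call site (the `resolved_ip` binding is invented for illustration; in this patch the value comes from `resolve_external_address`):

    // Before this patch: a bare unit variant.
    let tunnel = TunnelConfig::Direct;

    // After this patch: the TCP connection can be pointed at a resolved
    // address while TLS verification still uses the original hostname.
    let tunnel = TunnelConfig::Direct {
        tcp_host_override: Some(resolved_ip.to_string()),
    };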
@@ -237,12 +237,33 @@ impl Config {
         ssh_tunnel_manager: &SshTunnelManager,
     ) -> Result<MySqlConn, MySqlError> {
         match &self.tunnel {
-            TunnelConfig::Direct => Ok(MySqlConn {
-                conn: Conn::new(self.inner.clone())
-                    .await
-                    .map_err(MySqlError::from)?,
-                _ssh_tunnel_handle: None,
-            }),
+            TunnelConfig::Direct { tcp_host_override } => {
+                // Override the connection host for the actual TCP connection to point to
+                // the privatelink hostname instead.
+                let mut opts_builder = OptsBuilder::from_opts(self.inner.clone());
+
+                if let Some(tcp_override) = tcp_host_override {
+                    opts_builder = opts_builder.ip_or_hostname(tcp_override);
+
+                    if let Some(ssl_opts) = self.inner.ssl_opts() {
+                        if !ssl_opts.skip_domain_validation() {
+                            // If the TLS configuration will validate the hostname, we need to set
+                            // the TLS hostname back to the actual upstream host and not the
+                            // TCP hostname.
+                            opts_builder = opts_builder.ssl_opts(Some(
+                                ssl_opts.clone().with_tls_hostname_override(Some(
+                                    self.inner.ip_or_hostname().to_string(),
+                                )),
+                            ));
+                        }
+                    }
+                };
+
+                Ok(MySqlConn {
+                    conn: Conn::new(opts_builder).await.map_err(MySqlError::from)?,
+                    _ssh_tunnel_handle: None,
+                })
+            }
             TunnelConfig::Ssh { config } => {
                 let (host, port) = self.address();
                 let tunnel = ssh_tunnel_manager
diff --git a/src/ore/src/netio.rs b/src/ore/src/netio.rs
index 3537e7eb54e5e..3a1cfcddd075a 100644
--- a/src/ore/src/netio.rs
+++ b/src/ore/src/netio.rs
@@ -16,11 +16,13 @@
 //! Network I/O utilities.
 
 mod async_ready;
+mod dns;
 mod framed;
 mod read_exact;
 mod socket;
 
 pub use crate::netio::async_ready::AsyncReady;
+pub use crate::netio::dns::resolve_external_address;
 pub use crate::netio::framed::{FrameTooBig, MAX_FRAME_SIZE};
 pub use crate::netio::read_exact::{read_exact_or_eof, ReadExactOrEof};
 pub use crate::netio::socket::{Listener, SocketAddr, SocketAddrType, Stream, UnixSocketAddr};
diff --git a/src/ore/src/netio/dns.rs b/src/ore/src/netio/dns.rs
new file mode 100644
index 0000000000000..03ad4f6976832
--- /dev/null
+++ b/src/ore/src/netio/dns.rs
@@ -0,0 +1,67 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License in the LICENSE file at the
+// root of this repository, or online at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io;
+use std::net::IpAddr;
+
+use tokio::net::lookup_host;
+
+const DUMMY_PORT: u16 = 11111;
+
+/// Resolves a host address and ensures it is a global address when `enforce_global` is set.
+/// This parameter is useful when connecting to user-defined unverified addresses.
+pub async fn resolve_external_address(
+    mut host: &str,
+    enforce_global: bool,
+) -> Result<IpAddr, io::Error> {
+    // `net::lookup_host` requires a port to be specified, but we don't care about the port.
+    let mut port = DUMMY_PORT;
+    // If a port is already specified, use it and remove it from the host.
+ if let Some(idx) = host.find(':') { + if let Ok(p) = host[idx + 1..].parse() { + port = p; + host = &host[..idx]; + } + } + + let mut addrs = lookup_host((host, port)).await?; + + if let Some(addr) = addrs.next() { + let ip = addr.ip(); + if enforce_global && !is_global(ip) { + Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "address is not global", + )) + } else { + Ok(ip) + } + } else { + Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "no addresses found", + )) + } +} + +fn is_global(addr: IpAddr) -> bool { + // TODO: Switch to `addr.is_global()` once stable: https://github.com/rust-lang/rust/issues/27709 + match addr { + IpAddr::V4(ip) => { + !(ip.is_unspecified() || ip.is_private() || ip.is_loopback() || ip.is_link_local()) + } + IpAddr::V6(ip) => !(ip.is_loopback() || ip.is_unspecified()), + } +} diff --git a/src/postgres-util/src/tunnel.rs b/src/postgres-util/src/tunnel.rs index 57ebfdba52f52..e6680728d97fe 100644 --- a/src/postgres-util/src/tunnel.rs +++ b/src/postgres-util/src/tunnel.rs @@ -38,7 +38,7 @@ macro_rules! bail_generic { #[derive(Debug, PartialEq, Clone)] pub enum TunnelConfig { /// Establish a direct TCP connection to the database host. - Direct, + Direct { tcp_host_override: Option }, /// Establish a TCP connection to the database via an SSH tunnel. /// This means first establishing an SSH connection to a bastion host, /// and then opening a separate connection from that host to the database. @@ -226,7 +226,21 @@ impl Config { })?; match &self.tunnel { - TunnelConfig::Direct => { + TunnelConfig::Direct { tcp_host_override } => { + // Override the TCP host we connect to, leaving the host used for TLS verification + // as the original host + if let Some(tcp_override) = tcp_host_override { + let (host, _) = self.address()?; + postgres_config.tls_verify_host(host); + + match postgres_config.get_hosts_mut() { + [Host::Tcp(host)] => *host = tcp_override.clone(), + _ => bail_generic!( + "only TCP connections to a single PostgreSQL server are supported" + ), + } + }; + let (client, connection) = async move { postgres_config.connect(tls).await } .run_in_task_if(self.in_task, || "pg_connect".to_string()) .await?; diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 4a3e3bcf52b15..3a357441edccc 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -24,6 +24,7 @@ use mz_kafka_util::client::{ }; use mz_ore::error::ErrorExt; use mz_ore::future::{InTask, OreFutureExt}; +use mz_ore::netio::resolve_external_address; use mz_proto::tokio_postgres::any_ssl_mode; use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError}; use mz_repr::url::any_url; @@ -51,7 +52,7 @@ use url::Url; use crate::configuration::StorageConfiguration; use crate::connections::aws::{AwsConnection, AwsConnectionValidationError}; use crate::controller::AlterError; -use crate::dyncfgs::KAFKA_CLIENT_ID_ENRICHMENT_RULES; +use crate::dyncfgs::{KAFKA_CLIENT_ID_ENRICHMENT_RULES, ENFORCE_EXTERNAL_ADDRESSES}; use crate::errors::{ContextCreationError, CsrConnectError}; use crate::AlterCompatible; @@ -670,6 +671,7 @@ impl KafkaConnection { .clone(), storage_configuration.parameters.ssh_timeout_config, in_task, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), ); match &self.default_tunnel { @@ -710,15 +712,25 @@ impl KafkaConnection { }; match &broker.tunnel { Tunnel::Direct => { - // By default, don't override broker address lookup. - // - // N.B. 
- // // We _could_ pre-setup the default ssh tunnel for all known brokers here, but // we avoid doing because: // - Its not necessary. // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path // in the `TunnelingClientContext`. + + // Ensure any broker address we connect to is resolved to an external address. + let resolved = resolve_external_address( + &addr.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; + context.add_broker_rewrite( + addr, + BrokerRewrite { + host: resolved.to_string(), + port: None, + }, + ) } Tunnel::AwsPrivatelink(aws_privatelink) => { let host = mz_cloud_resources::vpc_endpoint_host( @@ -735,11 +747,17 @@ impl KafkaConnection { ); } Tunnel::Ssh(ssh_tunnel) => { + // Ensure any SSH bastion address we connect to is resolved to an external address. + let ssh_host_resolved = resolve_external_address( + &ssh_tunnel.connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; context .add_ssh_tunnel( addr, SshTunnelConfig { - host: ssh_tunnel.connection.host.clone(), + host: ssh_host_resolved.to_string(), port: ssh_tunnel.connection.port, user: ssh_tunnel.connection.user.clone(), key_pair: SshKeyPair::from_bytes( @@ -1034,15 +1052,23 @@ impl CsrConnection { // incorrectly starts using this port. const DUMMY_PORT: u16 = 11111; + // TODO: use types to enforce that the URL has a string hostname. + let host = self + .url + .host_str() + .ok_or_else(|| anyhow!("url missing host"))?; match &self.tunnel { - Tunnel::Direct => {} + Tunnel::Direct => { + // Ensure any host we connect to is resolved to an external address. + let resolved = resolve_external_address( + host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; + client_config = + client_config.resolve_to_addrs(host, &[SocketAddr::new(resolved, DUMMY_PORT)]) + } Tunnel::Ssh(ssh_tunnel) => { - // TODO: use types to enforce that the URL has a string hostname. - let host = self - .url - .host_str() - .ok_or_else(|| anyhow!("url missing host"))?; - let ssh_tunnel = ssh_tunnel .connect( storage_configuration, @@ -1093,11 +1119,6 @@ impl CsrConnection { Tunnel::AwsPrivatelink(connection) => { assert!(connection.port.is_none()); - // TODO: use types to enforce that the URL has a string hostname. - let host = self - .url - .host_str() - .ok_or_else(|| anyhow!("url missing host"))?; let privatelink_host = mz_cloud_resources::vpc_endpoint_host( connection.connection_id, connection.availability_zone.as_deref(), @@ -1332,7 +1353,17 @@ impl PostgresConnection { } let tunnel = match &self.tunnel { - Tunnel::Direct => mz_postgres_util::TunnelConfig::Direct, + Tunnel::Direct => { + // Ensure any host we connect to is resolved to an external address. + let resolved = resolve_external_address( + &self.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; + mz_postgres_util::TunnelConfig::Direct { + tcp_host_override: Some(resolved.to_string()), + } + } Tunnel::Ssh(SshTunnel { connection_id, connection, @@ -1341,9 +1372,15 @@ impl PostgresConnection { .read_in_task_if(in_task, *connection_id) .await?; let key_pair = SshKeyPair::from_bytes(&secret)?; + // Ensure any ssh-bastion host we connect to is resolved to an external address. 
+                let resolved = resolve_external_address(
+                    &connection.host,
+                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
+                )
+                .await?;
                 mz_postgres_util::TunnelConfig::Ssh {
                     config: SshTunnelConfig {
-                        host: connection.host.clone(),
+                        host: resolved.to_string(),
                         port: connection.port,
                         user: connection.user.clone(),
                         key_pair,
@@ -1696,7 +1733,17 @@ impl MySqlConnection {
         opts = opts.ssl_opts(ssl_opts);
 
         let tunnel = match &self.tunnel {
-            Tunnel::Direct => mz_mysql_util::TunnelConfig::Direct,
+            Tunnel::Direct => {
+                // Ensure any host we connect to is resolved to an external address.
+                let resolved = resolve_external_address(
+                    &self.host,
+                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
+                )
+                .await?;
+                mz_mysql_util::TunnelConfig::Direct {
+                    tcp_host_override: Some(resolved.to_string()),
+                }
+            }
             Tunnel::Ssh(SshTunnel {
                 connection_id,
                 connection,
@@ -1705,9 +1752,15 @@ impl MySqlConnection {
                     .read_in_task_if(in_task, *connection_id)
                     .await?;
                 let key_pair = SshKeyPair::from_bytes(&secret)?;
+                // Ensure any ssh-bastion host we connect to is resolved to an external address.
+                let resolved = resolve_external_address(
+                    &connection.host,
+                    ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
+                )
+                .await?;
                 mz_mysql_util::TunnelConfig::Ssh {
                     config: SshTunnelConfig {
-                        host: connection.host.clone(),
+                        host: resolved.to_string(),
                         port: connection.port,
                         user: connection.user.clone(),
                         key_pair,
@@ -1987,12 +2040,18 @@ impl SshTunnel {
         remote_port: u16,
         in_task: InTask,
     ) -> Result<ManagedSshTunnelHandle, anyhow::Error> {
+        // Ensure any ssh-bastion host we connect to is resolved to an external address.
+        let resolved = resolve_external_address(
+            &self.connection.host,
+            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
+        )
+        .await?;
         storage_configuration
             .connection_context
             .ssh_tunnel_manager
             .connect(
                 SshTunnelConfig {
-                    host: self.connection.host.clone(),
+                    host: resolved.to_string(),
                     port: self.connection.port,
                     user: self.connection.user.clone(),
                     key_pair: SshKeyPair::from_bytes(
@@ -2060,8 +2119,16 @@ impl SshConnection {
         )
         .await?;
         let key_pair = SshKeyPair::from_bytes(&secret)?;
+
+        // Ensure any ssh-bastion host we connect to is resolved to an external address.
+        let resolved = resolve_external_address(
+            &self.host,
+            ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
+        )
+        .await?;
+
         let config = SshTunnelConfig {
-            host: self.host.clone(),
+            host: resolved.to_string(),
             port: self.port,
             user: self.user.clone(),
             key_pair,
diff --git a/src/storage-types/src/dyncfgs.rs b/src/storage-types/src/dyncfgs.rs
index 0ff0e70bbf869..e19a5cd914328 100644
--- a/src/storage-types/src/dyncfgs.rs
+++ b/src/storage-types/src/dyncfgs.rs
@@ -48,6 +48,15 @@ pub const KAFKA_CLIENT_ID_ENRICHMENT_RULES: Config<fn() -> serde_json::Value> =
     "Rules for enriching the `client.id` property of Kafka clients with additional data.",
 );
 
+/// Whether or not to enforce that external connection addresses are global
+/// (not private or local) when resolving them.
+pub const ENFORCE_EXTERNAL_ADDRESSES: Config<bool> = Config::new(
+    "storage_enforce_external_addresses",
+    false,
+    "Whether or not to enforce that external connection addresses are global \
+    (not private or local) when resolving them",
+);
+
 /// Whether or not to prevent buffering the entire _upstream_ snapshot in
 /// memory when processing it in memory. This is generally understood to reduce
 /// memory consumption.
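Taken together, the dyncfg pattern used here is: declare a typed `Config<bool>` constant with a default, register it in `all_dyncfgs` (next hunk), and query it at each connection site. A condensed sketch of the read side, with `storage_configuration` assumed to be the `StorageConfiguration` handle threaded through the connection code above:

    // Reads the current value of the flag; this stays `false` until an
    // `ALTER SYSTEM SET storage_enforce_external_addresses = true` (as in
    // the testdrive files below) propagates into the ConfigSet.
    let enforce = ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set());
    let resolved = resolve_external_address(&connection.host, enforce).await?;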
@@ -78,12 +87,13 @@ pub const STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING: Config> = C "Limit snapshot buffering in upsert.", ); -/// Adds the full set of all compute `Config`s. +/// Adds the full set of all storage `Config`s. pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs .add(&DELAY_SOURCES_PAST_REHYDRATION) .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION) .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES) + .add(&ENFORCE_EXTERNAL_ADDRESSES) .add(&STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING) .add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING) } diff --git a/src/storage-types/src/errors.rs b/src/storage-types/src/errors.rs index eb8033d2d3b18..d21932a623258 100644 --- a/src/storage-types/src/errors.rs +++ b/src/storage-types/src/errors.rs @@ -1003,6 +1003,8 @@ pub enum ContextCreationError { KafkaError(#[from] KafkaError), #[error(transparent)] Other(#[from] anyhow::Error), + #[error(transparent)] + Io(#[from] std::io::Error), } /// An extension trait for `Result` that makes producing `ContextCreationError`s easier. @@ -1044,6 +1046,9 @@ where ContextCreationError::KafkaError(e) => { ContextCreationError::Other(anyhow!(anyhow!(e).context(msg))) } + ContextCreationError::Io(e) => { + ContextCreationError::Other(anyhow!(anyhow!(e).context(msg))) + } } }) } @@ -1072,6 +1077,8 @@ pub enum CsrConnectError { #[error(transparent)] Openssl(#[from] openssl::error::ErrorStack), #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] Other(#[from] anyhow::Error), } diff --git a/test/mysql-cdc/mysql-cdc.td b/test/mysql-cdc/mysql-cdc.td index 8df080f555f06..8aff860b55ed5 100644 --- a/test/mysql-cdc/mysql-cdc.td +++ b/test/mysql-cdc/mysql-cdc.td @@ -222,6 +222,19 @@ contains:Connection refused ) contains:Access denied for user 'root' +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = true + +! CREATE CONNECTION private_address TO MYSQL ( + HOST mysql, + USER root, + PASSWORD SECRET mysqlpass + ) +contains:address is not global + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = false + ! CREATE SOURCE "mz_source" IN CLUSTER cdc_cluster FROM MYSQL CONNECTION mysql_conn diff --git a/test/pg-cdc/pg-cdc.td b/test/pg-cdc/pg-cdc.td index c08726d68559e..d15a18e3b0d65 100644 --- a/test/pg-cdc/pg-cdc.td +++ b/test/pg-cdc/pg-cdc.td @@ -190,7 +190,7 @@ ALTER SYSTEM SET pg_source_snapshot_statement_timeout = 0 USER postgres, PASSWORD SECRET pgpass ) -contains:error connecting to server: failed to lookup address information: Name or service not known: failed to lookup address +contains:failed to lookup address information: Name or service not known ! CREATE CONNECTION no_such_port TO POSTGRES ( HOST postgres, @@ -226,6 +226,20 @@ contains:password authentication failed for user "postgres" ) contains:database "no_such_dbname" does not exist +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = true + +! CREATE CONNECTION private_address TO POSTGRES ( + HOST postgres, + DATABASE postgres, + USER postgres, + PASSWORD SECRET pgpass + ) +contains:address is not global + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = false + ! 
CREATE SOURCE "no_such_publication" IN CLUSTER cdc_cluster FROM POSTGRES CONNECTION pg (PUBLICATION 'no_such_publication'); diff --git a/test/ssh-connection/ssh-connections.td b/test/ssh-connection/ssh-connections.td index 7aad8e0fde7c6..c1b3ecb169aaf 100644 --- a/test/ssh-connection/ssh-connections.td +++ b/test/ssh-connection/ssh-connections.td @@ -120,3 +120,28 @@ contains: still depended upon > SELECT name, type FROM mz_connections; name type ---------------- + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = true + +> CREATE CONNECTION omega TO SSH TUNNEL ( + HOST 'chaos.example.com', + USER 'omega', + PORT 22 + ); + +# error is not consistent +! VALIDATE CONNECTION omega; +contains:failed to + +> CREATE CONNECTION local TO SSH TUNNEL ( + HOST 'ssh-bastion-host', + USER 'omega', + PORT 22 + ); + +! VALIDATE CONNECTION local; +contains:address is not global + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET storage_enforce_external_addresses = false diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td new file mode 100644 index 0000000000000..f13bf2e66ad29 --- /dev/null +++ b/test/testdrive/connection-create-external.td @@ -0,0 +1,77 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ set-arg-default default-storage-size=1 +$ set-arg-default single-replica-cluster=quickstart + +# +# Test enforcement that connections are made to global non-private IPs + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET enable_connection_validation_syntax = true +ALTER SYSTEM SET storage_enforce_external_addresses = true + +! CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true) +contains:address is not global + +# Setup kafka topic with schema +# must be a subset of the keys in the rows +$ set keyschema={ + "type": "record", + "name": "Key", + "fields": [ + {"name": "id", "type": "long"} + ] + } + +$ set schema={ + "type" : "record", + "name" : "envelope", + "fields" : [ + { + "name": "before", + "type": [ + { + "name": "row", + "type": "record", + "fields": [ + { + "name": "id", + "type": "long" + }, + { + "name": "creature", + "type": "string" + }] + }, + "null" + ] + }, + { + "name": "after", + "type": ["row", "null"] + } + ] + } + +$ kafka-create-topic topic=csr_test partitions=1 + +$ kafka-ingest format=avro topic=csr_test key-format=avro key-schema=${keyschema} schema=${schema} timestamp=1 +{"id": 1} {"before": {"row": {"id": 1, "creature": "fish"}}, "after": {"row": {"id": 1, "creature": "mudskipper"}}} +{"id": 1} {"before": {"row": {"id": 1, "creature": "mudskipper"}}, "after": {"row": {"id": 1, "creature": "salamander"}}} +{"id": 1} {"before": {"row": {"id": 1, "creature": "salamander"}}, "after": {"row": {"id": 1, "creature": "lizard"}}} + + +! 
CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
+    URL '${testdrive.schema-registry-url}'
+  ) WITH (VALIDATE = true);
+contains:address is not global
+
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET storage_enforce_external_addresses = false

From 07049d6743d75d978014243996222bb30a17ed57 Mon Sep 17 00:00:00 2001
From: Roshan Jobanputra
Date: Thu, 21 Mar 2024 13:42:13 -0400
Subject: [PATCH 2/6] Fix other error checks to use dns error response;
 properly report errors from kafka client dns resolution

---
 src/kafka-util/src/client.rs                  | 121 +++++++++++++++---
 src/mysql-util/src/tunnel.rs                  |   2 +-
 src/storage-types/src/connections.rs          |  32 ++---
 src/storage-types/src/errors.rs               |   2 +-
 src/storage/src/source/kafka.rs               |  37 ++++--
 test/kafka-auth/test-kafka-ssl.td             |   2 +-
 test/source-sink-errors/mzcompose.py          |  14 +-
 ...ource-after-ssh-failure-restart-replica.td |   1 -
 .../pg-source-after-ssh-failure.td            |   2 +-
 test/testdrive/connection-create-external.td  |   2 +-
 test/testdrive/connection-validation.td       |   4 +-
 test/testdrive/kafka-source-errors.td         |   2 +-
 12 files changed, 164 insertions(+), 57 deletions(-)

diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs
index 5aed2c099eb30..ff26cd0ac2237 100644
--- a/src/kafka-util/src/client.rs
+++ b/src/kafka-util/src/client.rs
@@ -296,6 +296,9 @@ enum BrokerRewriteHandle {
     /// For _default_ ssh tunnels, we store an error if _creation_
     /// of the tunnel failed, so that `tunnel_status` can return it.
     FailedDefaultSshTunnel(String),
+    /// We store an error if DNS resolution fails when resolving
+    /// a new broker host.
+    FailedDNSResolution(String),
 }
 
 /// Tunneling clients
@@ -310,6 +313,24 @@ pub enum TunnelConfig {
     None,
 }
 
+/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`.
+#[derive(Clone)]
+pub struct TunnelingClientStatus {
+    /// Status of all active ssh tunnels.
+    pub ssh_status: SshTunnelStatus,
+    /// Status of direct broker connections.
+    pub broker_status: BrokerStatus,
+}
+
+/// Status of direct broker connections for a `TunnelingClientContext`.
+#[derive(Clone)]
+pub enum BrokerStatus {
+    /// The broker connections are nominal.
+    Nominal,
+    /// At least one broker connection has failed.
+    Failed(String),
+}
+
 /// A client context that supports rewriting broker addresses.
 #[derive(Clone)]
 pub struct TunnelingClientContext<C> {
@@ -396,19 +417,22 @@ impl<C> TunnelingClientContext<C> {
         &self.inner
     }
 
-    /// Returns a _consolidated_ `SshTunnelStatus` that communicates the status
-    /// of all active ssh tunnels `self` knows about.
-    pub fn tunnel_status(&self) -> SshTunnelStatus {
-        self.rewrites
-            .lock()
-            .expect("poisoned")
+    /// Returns a `TunnelingClientStatus` that contains a _consolidated_ `SshTunnelStatus` that
+    /// communicates the status of all active ssh tunnels `self` knows about, and a `BrokerStatus`
+    /// that contains a _consolidated_ status of all direct broker connections.
+ pub fn tunnel_status(&self) -> TunnelingClientStatus { + let rewrites = self.rewrites.lock().expect("poisoned"); + + let ssh_status = rewrites .values() .map(|handle| match handle { BrokerRewriteHandle::SshTunnel(s) => s.check_status(), BrokerRewriteHandle::FailedDefaultSshTunnel(e) => { SshTunnelStatus::Errored(e.clone()) } - BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running, + BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDNSResolution(_) => { + SshTunnelStatus::Running + } }) .fold(SshTunnelStatus::Running, |acc, status| { match (acc, status) { @@ -423,7 +447,27 @@ impl TunnelingClientContext { SshTunnelStatus::Running } } + }); + + let broker_status = rewrites + .values() + .map(|handle| match handle { + BrokerRewriteHandle::FailedDNSResolution(e) => BrokerStatus::Failed(e.clone()), + _ => BrokerStatus::Nominal, }) + .fold(BrokerStatus::Nominal, |acc, status| match (acc, status) { + (BrokerStatus::Nominal, BrokerStatus::Failed(e)) + | (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e), + (BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => { + BrokerStatus::Failed(format!("{}, {}", err, e)) + } + (BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal, + }); + + TunnelingClientStatus { + ssh_status, + broker_status, + } } } @@ -446,7 +490,8 @@ where port: Some(addr.port()), } } - BrokerRewriteHandle::FailedDefaultSshTunnel(_) => { + BrokerRewriteHandle::FailedDefaultSshTunnel(_) + | BrokerRewriteHandle::FailedDNSResolution(_) => { unreachable!() } }; @@ -468,16 +513,30 @@ where let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned(); match rewrite { - None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => { + None + | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) + | Some(BrokerRewriteHandle::FailedDNSResolution(_)) => { match &self.default_tunnel { TunnelConfig::Ssh(default_tunnel) => { // Multiple users could all run `connect` at the same time; only one ssh // tunnel will ever be connected, and only one will be inserted into the // map. let ssh_tunnel = self.runtime.block_on(async { + // Ensure the default tunnel host is resolved to an external address. + let resolved_tunnel_addr = resolve_external_address( + &default_tunnel.host, + self.enforce_external_addresses, + ) + .await?; + let tunnel_config = SshTunnelConfig { + host: resolved_tunnel_addr.to_string(), + port: default_tunnel.port, + user: default_tunnel.user.clone(), + key_pair: default_tunnel.key_pair.clone(), + }; self.ssh_tunnel_manager .connect( - default_tunnel.clone(), + tunnel_config, &addr.host, addr.port.parse().unwrap(), self.ssh_timeout_config, @@ -493,6 +552,7 @@ where if matches!( o.get(), BrokerRewriteHandle::FailedDefaultSshTunnel(_) + | BrokerRewriteHandle::FailedDNSResolution(_) ) => { o.insert(BrokerRewriteHandle::SshTunnel( @@ -539,19 +599,42 @@ where TunnelConfig::None => { // If no rewrite is specified, we still should check that this potentially // new broker address is a global address. 
- let rewrite = self.runtime.block_on(async { - let resolved = resolve_external_address( + self.runtime.block_on(async { + match resolve_external_address( &addr.host, self.enforce_external_addresses, ) .await - .unwrap(); - BrokerRewriteHandle::Simple(BrokerRewrite { - host: resolved.to_string(), - port: addr.port.parse().ok(), - }) - }); - return_rewrite(&rewrite) + { + Ok(resolved) => { + let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite { + host: resolved.to_string(), + port: addr.port.parse().ok(), + }); + return_rewrite(&rewrite) + } + Err(e) => { + warn!( + "failed to resolve external address for {:?}: {}", + addr, + e.display_with_causes() + ); + // Write an error if no one else has already written one. + let mut rewrites = self.rewrites.lock().expect("poisoned"); + rewrites.entry(addr.clone()).or_insert_with(|| { + BrokerRewriteHandle::FailedDNSResolution( + e.to_string_with_causes(), + ) + }); + // We have to give rdkafka an address, as this callback can't fail. + BrokerAddr { + host: "failed-dns-resolution.dev.materialize.com" + .to_string(), + port: 1337.to_string(), + } + } + } + }) } } } diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs index 5139aef9ec10b..d0c6245ba2046 100644 --- a/src/mysql-util/src/tunnel.rs +++ b/src/mysql-util/src/tunnel.rs @@ -251,7 +251,7 @@ impl Config { // the TLS hostname back to the actual upstream host and not the // TCP hostname. opts_builder = opts_builder.ssl_opts(Some( - ssl_opts.clone().with_tls_hostname_override(Some( + ssl_opts.clone().with_danger_tls_hostname_override(Some( self.inner.ip_or_hostname().to_string(), )), )); diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index 3a357441edccc..ddeeac30c2497 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -692,8 +692,14 @@ impl KafkaConnection { .await?; let key_pair = SshKeyPair::from_bytes(&secret)?; + // Ensure any ssh-bastion address we connect to is resolved to an external address. + let resolved = resolve_external_address( + &ssh_tunnel.connection.host, + ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), + ) + .await?; context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig { - host: ssh_tunnel.connection.host.clone(), + host: resolved.to_string(), port: ssh_tunnel.connection.port, user: ssh_tunnel.connection.user.clone(), key_pair, @@ -712,25 +718,21 @@ impl KafkaConnection { }; match &broker.tunnel { Tunnel::Direct => { + // By default, don't override broker address lookup. + // + // N.B. + // // We _could_ pre-setup the default ssh tunnel for all known brokers here, but // we avoid doing because: // - Its not necessary. // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path // in the `TunnelingClientContext`. - - // Ensure any broker address we connect to is resolved to an external address. - let resolved = resolve_external_address( - &addr.host, - ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), - ) - .await?; - context.add_broker_rewrite( - addr, - BrokerRewrite { - host: resolved.to_string(), - port: None, - }, - ) + // + // NOTE that we do not need to use the `resolve_external_address` method to + // validate the broker address here since it will be validated when the + // connection is established in `src/kafka-util/src/client.rs`, and we do not + // want to specify any BrokerRewrite that would override any default-tunnel + // settings. 
} Tunnel::AwsPrivatelink(aws_privatelink) => { let host = mz_cloud_resources::vpc_endpoint_host( diff --git a/src/storage-types/src/errors.rs b/src/storage-types/src/errors.rs index d21932a623258..a4270d6eac799 100644 --- a/src/storage-types/src/errors.rs +++ b/src/storage-types/src/errors.rs @@ -1026,7 +1026,7 @@ where cx: &TunnelingClientContext, ) -> Result { self.map_err(|e| { - if let SshTunnelStatus::Errored(e) = cx.tunnel_status() { + if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status { ContextCreationError::Ssh(anyhow!(e)) } else { ContextCreationError::from(e) diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index db33fd741f4bb..9afe8dd60221d 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -20,7 +20,9 @@ use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Collection}; use futures::StreamExt; use maplit::btreemap; -use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext}; +use mz_kafka_util::client::{ + get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext, +}; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle}; @@ -390,13 +392,29 @@ impl SourceRender for KafkaSourceConnection { "kafka metadata thread: updated partition metadata info", ); - // Clear all the health namespaces we know about. - // Note that many kafka sources's don't have an ssh tunnel, but - // the `health_operator` handles this fine. - *status_report.lock().unwrap() = HealthStatus { - kafka: Some(HealthStatusUpdate::running()), - ssh: Some(HealthStatusUpdate::running()), - }; + // Check to see if any broker errors have been hit + match consumer.client().context().tunnel_status().broker_status + { + BrokerStatus::Failed(err) => { + let status = HealthStatusUpdate::stalled( + format!("broker error: {}", err), + None, + ); + *status_report.lock().unwrap() = HealthStatus { + kafka: Some(status), + ssh: None, + }; + } + BrokerStatus::Nominal => { + // Clear all the health namespaces we know about. + // Note that many kafka sources's don't have an ssh tunnel, but + // the `health_operator` handles this fine. + *status_report.lock().unwrap() = HealthStatus { + kafka: Some(HealthStatusUpdate::running()), + ssh: Some(HealthStatusUpdate::running()), + }; + } + } } Err(e) => { let kafka_status = Some(HealthStatusUpdate::stalled( @@ -404,7 +422,8 @@ impl SourceRender for KafkaSourceConnection { None, )); - let ssh_status = consumer.client().context().tunnel_status(); + let ssh_status = + consumer.client().context().tunnel_status().ssh_status; let ssh_status = match ssh_status { SshTunnelStatus::Running => { Some(HealthStatusUpdate::running()) diff --git a/test/kafka-auth/test-kafka-ssl.td b/test/kafka-auth/test-kafka-ssl.td index 886471c34d08c..e0fd6c7d49f85 100644 --- a/test/kafka-auth/test-kafka-ssl.td +++ b/test/kafka-auth/test-kafka-ssl.td @@ -121,7 +121,7 @@ running # ALTER CONNECTION for Kafka + SSH ! ALTER CONNECTION testdrive_no_reset_connections.public.ssh SET (HOST = 'abcd') WITH (VALIDATE = true); -contains:Could not resolve hostname abcd +contains:failed to lookup address information: Name or service not known ! 
ALTER CONNECTION testdrive_no_reset_connections.public.ssh RESET (HOST); contains:HOST option is required diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py index bff689245df98..5e6ea2fc8fd69 100644 --- a/test/source-sink-errors/mzcompose.py +++ b/test/source-sink-errors/mzcompose.py @@ -45,8 +45,7 @@ def schema() -> str: class Disruption(Protocol): name: str - def run_test(self, c: Composition) -> None: - ... + def run_test(self, c: Composition) -> None: ... @dataclass @@ -182,8 +181,13 @@ def populate(self, c: Composition) -> None: $ kafka-ingest topic=source-topic format=bytes ABC + # Specify a faster metadata refresh interval so errors are detected every second + # instead of every minute > CREATE SOURCE source1 - FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-source-topic-${testdrive.seed}') + FROM KAFKA CONNECTION kafka_conn ( + TOPIC 'testdrive-source-topic-${testdrive.seed}', + TOPIC METADATA REFRESH INTERVAL '1s' + ) FORMAT BYTES ENVELOPE NONE # WITH ( REMOTE 'clusterd:2100' ) https://github.com/MaterializeInc/materialize/issues/16582 @@ -466,7 +470,7 @@ def assert_recovery(self, c: Composition) -> None: "redpanda", "rpk", "topic", "delete", f"testdrive-source-topic-{seed}" ), expected_error="UnknownTopicOrPartition|topic", - fixage=None + fixage=None, # Re-creating the topic does not restart the source # fixage=lambda c,seed: redpanda_topics(c, "create", seed), ), @@ -499,7 +503,7 @@ def assert_recovery(self, c: Composition) -> None: PgDisruption( name="kill-postgres", breakage=lambda c, _: c.kill("postgres"), - expected_error="error connecting to server|connection closed|deadline has elapsed", + expected_error="error connecting to server|connection closed|deadline has elapsed|failed to lookup address information", fixage=lambda c, _: c.up("postgres"), ), PgDisruption( diff --git a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td index fb8f327289bb6..8a1aa0b5c6223 100644 --- a/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td +++ b/test/ssh-connection/kafka-source-after-ssh-failure-restart-replica.td @@ -14,7 +14,6 @@ > SELECT status FROM mz_internal.mz_source_statuses st JOIN mz_sources s ON st.id = s.id - WHERE error LIKE 'ssh:%' AND s.name in ('dynamic_text', 'fixed_text', 'fixed_plus_csr', 'dynamic_plus_csr') stalled stalled diff --git a/test/ssh-connection/pg-source-after-ssh-failure.td b/test/ssh-connection/pg-source-after-ssh-failure.td index 67442f72e93d8..1a9da9cd86538 100644 --- a/test/ssh-connection/pg-source-after-ssh-failure.td +++ b/test/ssh-connection/pg-source-after-ssh-failure.td @@ -11,5 +11,5 @@ > SELECT status FROM mz_internal.mz_source_statuses st JOIN mz_sources s ON st.id = s.id - WHERE s.name = 'mz_source' AND error LIKE 'ssh:%' + WHERE s.name = 'mz_source' stalled diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td index f13bf2e66ad29..e1b98520cdbed 100644 --- a/test/testdrive/connection-create-external.td +++ b/test/testdrive/connection-create-external.td @@ -18,7 +18,7 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true ALTER SYSTEM SET storage_enforce_external_addresses = true ! 
CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true) -contains:address is not global +contains:Failed to resolve hostname # Setup kafka topic with schema # must be a subset of the keys in the rows diff --git a/test/testdrive/connection-validation.td b/test/testdrive/connection-validation.td index 36db1e9876dbc..7692368ac1116 100644 --- a/test/testdrive/connection-validation.td +++ b/test/testdrive/connection-validation.td @@ -37,10 +37,10 @@ ALTER SYSTEM SET enable_connection_validation_syntax = true > CREATE CONNECTION invalid_tunnel TO SSH TUNNEL (HOST 'invalid', USER 'invalid', PORT 22) ! CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) -contains:failed to connect to the remote host +contains:failed to lookup address information # Create the connection without validation and validate later > CREATE CONNECTION invalid_kafka_conn TO KAFKA (BROKERS ('${testdrive.kafka-addr}' USING SSH TUNNEL invalid_tunnel), SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = false) ! VALIDATE CONNECTION invalid_kafka_conn -contains:failed to connect to the remote host +contains:failed to lookup address information diff --git a/test/testdrive/kafka-source-errors.td b/test/testdrive/kafka-source-errors.td index 9a848f89d44fd..fd89a44104efa 100644 --- a/test/testdrive/kafka-source-errors.td +++ b/test/testdrive/kafka-source-errors.td @@ -34,7 +34,7 @@ contains:Failed to resolve hostname ! CREATE CONNECTION IF NOT EXISTS fawlty_csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL 'http://non-existent-csr:8081' ); -contains:failed to lookup address information +contains:failed to lookup address information: Name or service not known # Check that for all tables clause is rejected ! 
CREATE SOURCE bad_definition1 From 108e666f27b496d34105f060b78c54981254e6b6 Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Tue, 26 Mar 2024 10:10:58 -0400 Subject: [PATCH 3/6] Provide all resolved IPs to mysql, postgres, and ssh clients --- Cargo.lock | 14 +++--- Cargo.toml | 3 ++ src/kafka-util/src/client.rs | 43 +++++++---------- src/mysql-util/src/tunnel.rs | 27 ++--------- src/ore/src/netio.rs | 2 +- src/ore/src/netio/dns.rs | 19 ++++---- src/postgres-util/src/tunnel.rs | 42 +++++++++++------ src/ssh-util/src/tunnel.rs | 70 +++++++++++++++++----------- src/ssh-util/src/tunnel_manager.rs | 18 +++++-- src/storage-types/src/connections.rs | 50 +++++++++++--------- src/workspace-hack/Cargo.toml | 4 +- test/source-sink-errors/mzcompose.py | 3 +- 12 files changed, 162 insertions(+), 133 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae4c436e5c7f6..8828d67aff8c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3476,8 +3476,7 @@ dependencies = [ [[package]] name = "mysql_async" version = "0.34.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbfe87d7e35cb72363326216cc1712b865d8d4f70abf3b2d2e6b251fb6b2f427" +source = "git+https://github.com/MaterializeInc/mysql_async?rev=6afd2136181f2ec93b7ca7de524f6d02b6f10c1d#6afd2136181f2ec93b7ca7de524f6d02b6f10c1d" dependencies = [ "bytes", "crossbeam", @@ -7040,7 +7039,7 @@ checksum = "15eb2c6e362923af47e13c23ca5afb859e83d54452c55b0b9ac763b8f7c1ac16" [[package]] name = "postgres" version = "0.19.5" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "bytes", "fallible-iterator", @@ -7053,7 +7052,7 @@ dependencies = [ [[package]] name = "postgres-openssl" version = "0.5.0" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "openssl", "tokio", @@ -7064,7 +7063,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.5" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "base64 0.21.5", "byteorder", @@ -7081,7 +7080,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.5" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "bytes", "chrono", @@ -9113,7 +9112,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.8" -source = "git+https://github.com/MaterializeInc/rust-postgres#b759caa33610403aa74b1cfdd37f45eb3100c9af" +source = "git+https://github.com/MaterializeInc/rust-postgres#91522e47643ebb6d6a5e392957b2319e5bb522ad" dependencies = [ "async-trait", "byteorder", @@ -9128,6 +9127,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "postgres-types", + "rand", "serde", "socket2 0.5.3", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 5680cf5963ad3..0716a0ee67494 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -189,6 +189,9 @@ postgres-types = { git = "https://github.com/MaterializeInc/rust-postgres" } postgres-openssl = { git = 
"https://github.com/MaterializeInc/rust-postgres" } postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" } +# Waiting on https://github.com/blackbeam/mysql_async/pull/300 +mysql_async = { git = "https://github.com/MaterializeInc/mysql_async", rev = "6afd2136181f2ec93b7ca7de524f6d02b6f10c1d" } + # Waiting on https://github.com/MaterializeInc/serde-value/pull/35. serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" } diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index ff26cd0ac2237..f44ae6c04ea6d 100644 --- a/src/kafka-util/src/client.rs +++ b/src/kafka-util/src/client.rs @@ -23,7 +23,7 @@ use crossbeam::channel::{unbounded, Receiver, Sender}; use mz_ore::collections::CollectionExt; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; -use mz_ore::netio::resolve_external_address; +use mz_ore::netio::resolve_address; use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig, SshTunnelStatus}; use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager}; use rdkafka::client::{BrokerAddr, Client, NativeClient, OAuthToken}; @@ -298,7 +298,7 @@ enum BrokerRewriteHandle { FailedDefaultSshTunnel(String), /// We store an error if DNS resolution fails when resolving /// a new broker host. - FailedDNSResolution(String), + FailedDnsResolution(String), } /// Tunneling clients @@ -430,7 +430,7 @@ impl TunnelingClientContext { BrokerRewriteHandle::FailedDefaultSshTunnel(e) => { SshTunnelStatus::Errored(e.clone()) } - BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDNSResolution(_) => { + BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDnsResolution(_) => { SshTunnelStatus::Running } }) @@ -452,7 +452,7 @@ impl TunnelingClientContext { let broker_status = rewrites .values() .map(|handle| match handle { - BrokerRewriteHandle::FailedDNSResolution(e) => BrokerStatus::Failed(e.clone()), + BrokerRewriteHandle::FailedDnsResolution(e) => BrokerStatus::Failed(e.clone()), _ => BrokerStatus::Nominal, }) .fold(BrokerStatus::Nominal, |acc, status| match (acc, status) { @@ -491,7 +491,7 @@ where } } BrokerRewriteHandle::FailedDefaultSshTunnel(_) - | BrokerRewriteHandle::FailedDNSResolution(_) => { + | BrokerRewriteHandle::FailedDnsResolution(_) => { unreachable!() } }; @@ -515,28 +515,18 @@ where match rewrite { None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) - | Some(BrokerRewriteHandle::FailedDNSResolution(_)) => { + | Some(BrokerRewriteHandle::FailedDnsResolution(_)) => { match &self.default_tunnel { TunnelConfig::Ssh(default_tunnel) => { // Multiple users could all run `connect` at the same time; only one ssh // tunnel will ever be connected, and only one will be inserted into the // map. let ssh_tunnel = self.runtime.block_on(async { - // Ensure the default tunnel host is resolved to an external address. - let resolved_tunnel_addr = resolve_external_address( - &default_tunnel.host, - self.enforce_external_addresses, - ) - .await?; - let tunnel_config = SshTunnelConfig { - host: resolved_tunnel_addr.to_string(), - port: default_tunnel.port, - user: default_tunnel.user.clone(), - key_pair: default_tunnel.key_pair.clone(), - }; self.ssh_tunnel_manager .connect( - tunnel_config, + // We know the default_tunnel has already been validated by the `resolve_address` + // method when it was provided to the client, so we don't need to check it again. 
+ default_tunnel.clone(), &addr.host, addr.port.parse().unwrap(), self.ssh_timeout_config, @@ -552,7 +542,7 @@ where if matches!( o.get(), BrokerRewriteHandle::FailedDefaultSshTunnel(_) - | BrokerRewriteHandle::FailedDNSResolution(_) + | BrokerRewriteHandle::FailedDnsResolution(_) ) => { o.insert(BrokerRewriteHandle::SshTunnel( @@ -600,15 +590,14 @@ where // If no rewrite is specified, we still should check that this potentially // new broker address is a global address. self.runtime.block_on(async { - match resolve_external_address( - &addr.host, - self.enforce_external_addresses, - ) - .await + match resolve_address(&addr.host, self.enforce_external_addresses).await { Ok(resolved) => { let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite { - host: resolved.to_string(), + // `resolve_address` will always return at least one address. + // TODO: Once we have a way to provide multiple hosts to rdkafka, we should + // return all resolved addresses here. + host: resolved.first().unwrap().to_string(), port: addr.port.parse().ok(), }); return_rewrite(&rewrite) @@ -622,7 +611,7 @@ where // Write an error if no one else has already written one. let mut rewrites = self.rewrites.lock().expect("poisoned"); rewrites.entry(addr.clone()).or_insert_with(|| { - BrokerRewriteHandle::FailedDNSResolution( + BrokerRewriteHandle::FailedDnsResolution( e.to_string_with_causes(), ) }); diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs index d0c6245ba2046..91274fa72bd18 100644 --- a/src/mysql-util/src/tunnel.rs +++ b/src/mysql-util/src/tunnel.rs @@ -7,6 +7,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::net::IpAddr; use std::ops::{Deref, DerefMut}; use std::time::Duration; @@ -27,7 +28,7 @@ use crate::MySqlError; #[derive(Debug, PartialEq, Clone)] pub enum TunnelConfig { /// Establish a direct TCP connection to the database host. - Direct { tcp_host_override: Option }, + Direct { resolved_ips: Option> }, /// Establish a TCP connection to the database via an SSH tunnel. /// This means first establishing an SSH connection to a bastion host, /// and then opening a separate connection from that host to the database. @@ -237,27 +238,9 @@ impl Config { ssh_tunnel_manager: &SshTunnelManager, ) -> Result { match &self.tunnel { - TunnelConfig::Direct { tcp_host_override } => { - // Override the connection host for the actual TCP connection to point to - // the privatelink hostname instead. - let mut opts_builder = OptsBuilder::from_opts(self.inner.clone()); - - if let Some(tcp_override) = tcp_host_override { - opts_builder = opts_builder.ip_or_hostname(tcp_override); - - if let Some(ssl_opts) = self.inner.ssl_opts() { - if !ssl_opts.skip_domain_validation() { - // If the TLS configuration will validate the hostname, we need to set - // the TLS hostname back to the actual upstream host and not the - // TCP hostname. 
-                            opts_builder = opts_builder.ssl_opts(Some(
-                                ssl_opts.clone().with_danger_tls_hostname_override(Some(
-                                    self.inner.ip_or_hostname().to_string(),
-                                )),
-                            ));
-                        }
-                    }
-                };
+            TunnelConfig::Direct { resolved_ips } => {
+                let opts_builder =
+                    OptsBuilder::from_opts(self.inner.clone()).resolved_ips(resolved_ips.clone());
 
                 Ok(MySqlConn {
                     conn: Conn::new(opts_builder).await.map_err(MySqlError::from)?,
                     _ssh_tunnel_handle: None,
diff --git a/src/ore/src/netio.rs b/src/ore/src/netio.rs
index 3a1cfcddd075a..e1268a7155a3c 100644
--- a/src/ore/src/netio.rs
+++ b/src/ore/src/netio.rs
@@ -22,7 +22,7 @@ mod read_exact;
 mod socket;
 
 pub use crate::netio::async_ready::AsyncReady;
-pub use crate::netio::dns::resolve_external_address;
+pub use crate::netio::dns::resolve_address;
 pub use crate::netio::framed::{FrameTooBig, MAX_FRAME_SIZE};
 pub use crate::netio::read_exact::{read_exact_or_eof, ReadExactOrEof};
 pub use crate::netio::socket::{Listener, SocketAddr, SocketAddrType, Stream, UnixSocketAddr};
diff --git a/src/ore/src/netio/dns.rs b/src/ore/src/netio/dns.rs
index 03ad4f6976832..7ca1b4b243cfd 100644
--- a/src/ore/src/netio/dns.rs
+++ b/src/ore/src/netio/dns.rs
@@ -22,10 +22,10 @@ const DUMMY_PORT: u16 = 11111;
 
 /// Resolves a host address and ensures it is a global address when `enforce_global` is set.
 /// This parameter is useful when connecting to user-defined unverified addresses.
-pub async fn resolve_external_address(
+pub async fn resolve_address(
     mut host: &str,
     enforce_global: bool,
-) -> Result<IpAddr, io::Error> {
+) -> Result<Vec<IpAddr>, io::Error> {
     // `net::lookup_host` requires a port to be specified, but we don't care about the port.
     let mut port = DUMMY_PORT;
     // If a port is already specified, use it and remove it from the host.
@@ -37,23 +37,26 @@ pub async fn resolve_address(
     }
 
     let mut addrs = lookup_host((host, port)).await?;
-
-    if let Some(addr) = addrs.next() {
+    let mut ips = Vec::new();
+    while let Some(addr) = addrs.next() {
         let ip = addr.ip();
         if enforce_global && !is_global(ip) {
             Err(io::Error::new(
                 io::ErrorKind::AddrNotAvailable,
                 "address is not global",
-            ))
+            ))?
         } else {
-            Ok(ip)
+            ips.push(ip);
         }
-    } else {
+    }
+
+    if ips.len() == 0 {
         Err(io::Error::new(
             io::ErrorKind::AddrNotAvailable,
             "no addresses found",
-        ))
+        ))?
     }
+    Ok(ips)
 }
 
 fn is_global(addr: IpAddr) -> bool {
diff --git a/src/postgres-util/src/tunnel.rs b/src/postgres-util/src/tunnel.rs
index e6680728d97fe..3ab41c62231f5 100644
--- a/src/postgres-util/src/tunnel.rs
+++ b/src/postgres-util/src/tunnel.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::net::IpAddr;
 use std::time::Duration;
 
 use mz_ore::future::{InTask, OreFutureExt};
@@ -38,7 +39,7 @@ macro_rules! bail_generic {
 #[derive(Debug, PartialEq, Clone)]
 pub enum TunnelConfig {
     /// Establish a direct TCP connection to the database host.
-    Direct { tcp_host_override: Option<String> },
+    Direct { resolved_ips: Option<Vec<IpAddr>> },
     /// Establish a TCP connection to the database via an SSH tunnel.
     /// This means first establishing an SSH connection to a bastion host,
     /// and then opening a separate connection from that host to the database.
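The hunk below relies on the convention that the number of `host` and `hostaddr` entries in a `tokio_postgres::Config` must match: each resolved IP gets its own pair, with the hostname kept for TLS verification and the `hostaddr` used for the actual dial. A minimal sketch of the config shape this produces, assuming the forked `tokio_postgres` with the `hostaddr` setter used in this patch, and invented example values:

    use std::net::IpAddr;

    // Two hypothetical IPs resolved for one upstream host.
    let ips: Vec<IpAddr> = vec![
        "203.0.113.10".parse().unwrap(),
        "203.0.113.11".parse().unwrap(),
    ];
    let mut config = tokio_postgres::Config::new();
    config.host("db.example.com"); // hostname retained for TLS verification
    for (idx, ip) in ips.iter().enumerate() {
        if idx != 0 {
            config.host("db.example.com"); // keep host/hostaddr counts equal
        }
        config.hostaddr(*ip); // address actually dialed
    }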
@@ -226,19 +227,22 @@ impl Config { })?; match &self.tunnel { - TunnelConfig::Direct { tcp_host_override } => { - // Override the TCP host we connect to, leaving the host used for TLS verification - // as the original host - if let Some(tcp_override) = tcp_host_override { - let (host, _) = self.address()?; - postgres_config.tls_verify_host(host); - - match postgres_config.get_hosts_mut() { - [Host::Tcp(host)] => *host = tcp_override.clone(), + TunnelConfig::Direct { resolved_ips } => { + if let Some(ips) = resolved_ips { + let host = match postgres_config.get_hosts() { + [Host::Tcp(host)] => host, _ => bail_generic!( "only TCP connections to a single PostgreSQL server are supported" ), } + .to_owned(); + // The number of 'host' and 'hostaddr' values must be the same. + for (idx, ip) in ips.iter().enumerate() { + if idx != 0 { + postgres_config.host(&host); + } + postgres_config.hostaddr(ip.clone()); + } }; let (client, connection) = async move { postgres_config.connect(tls).await } @@ -291,17 +295,25 @@ impl Config { // `tokio_postgres::Config` to do this is somewhat confusing, and requires we edit // the singular host in place. - let (host, _) = self.address()?; - postgres_config.tls_verify_host(host); - let privatelink_host = mz_cloud_resources::vpc_endpoint_name(*connection_id); + let privatelink_addrs = tokio::net::lookup_host(privatelink_host).await?; - match postgres_config.get_hosts_mut() { - [Host::Tcp(host)] => *host = privatelink_host, + // Override the actual IPs to connect to for the TCP connection, leaving the original host in-place + // for TLS verification + let host = match postgres_config.get_hosts() { + [Host::Tcp(host)] => host, _ => bail_generic!( "only TCP connections to a single PostgreSQL server are supported" ), } + .to_owned(); + // The number of 'host' and 'hostaddr' values must be the same. + for (idx, addr) in privatelink_addrs.enumerate() { + if idx != 0 { + postgres_config.host(&host); + } + postgres_config.hostaddr(addr.ip()); + } let (client, connection) = async move { postgres_config.connect(tls).await } .run_in_task_if(self.in_task, || "pg_connect".to_string()) diff --git a/src/ssh-util/src/tunnel.rs b/src/ssh-util/src/tunnel.rs index 0c9e9ac7ed79d..19a3f36a5401f 100644 --- a/src/ssh-util/src/tunnel.rs +++ b/src/ssh-util/src/tunnel.rs @@ -64,8 +64,9 @@ impl Default for SshTimeoutConfig { /// Specifies an SSH tunnel. #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct SshTunnelConfig { - /// The hostname of the SSH bastion server. - pub host: String, + /// The hostname/IP of the SSH bastion server. + /// If multiple hosts are specified, they are tried in order. + pub host: Vec, /// The port to connect to. pub port: u16, /// The name of the user to connect as. @@ -108,7 +109,11 @@ impl SshTunnelConfig { ) -> Result { let tunnel_id = format!( "{}:{} via {}@{}:{}", - remote_host, remote_port, self.user, self.host, self.port, + remote_host, + remote_port, + self.user, + self.host.join(","), + self.port, ); // N.B. @@ -236,31 +241,44 @@ async fn connect( // Mostly helpful to ensure the file is not accidentally overwritten. tempfile.set_permissions(std::fs::Permissions::from_mode(0o400))?; - // Bastion hosts (and therefore keys) tend to change, so we don't want - // to lock ourselves into trusting only the first we see. In any case, - // recording a known host would only last as long as the life of a - // storage pod, so it doesn't offer any protection. 
- let session = openssh::SessionBuilder::default() - .known_hosts_check(openssh::KnownHosts::Accept) - .user_known_hosts_file("/dev/null") - .user(config.user.clone()) - .port(config.port) - .keyfile(&path) - .server_alive_interval(timeout_config.keepalives_idle) - .connect_timeout(timeout_config.connect_timeout) - .connect_mux(config.host.clone()) - .await?; - - // Delete the private key for safety: since `ssh` still has an open - // handle to it, it still has access to the key. - drop(tempfile); - fs::remove_file(&path)?; - drop(tempdir); + // Try connecting to each host in turn. + let mut connect_err = None; + for host in &config.host { + // Bastion hosts (and therefore keys) tend to change, so we don't want + // to lock ourselves into trusting only the first we see. In any case, + // recording a known host would only last as long as the life of a + // storage pod, so it doesn't offer any protection. + match openssh::SessionBuilder::default() + .known_hosts_check(openssh::KnownHosts::Accept) + .user_known_hosts_file("/dev/null") + .user(config.user.clone()) + .port(config.port) + .keyfile(&path) + .server_alive_interval(timeout_config.keepalives_idle) + .connect_timeout(timeout_config.connect_timeout) + .connect_mux(host.clone()) + .await + { + Ok(session) => { + // Delete the private key for safety: since `ssh` still has an open + // handle to it, it still has access to the key. + drop(tempfile); + fs::remove_file(&path)?; + drop(tempdir); - // Ensure session is healthy. - session.check().await?; + // Ensure session is healthy. + session.check().await?; - Ok(session) + return Ok(session); + } + Err(err) => { + connect_err = Some(err); + } + } + } + Err(connect_err + .map(Into::into) + .unwrap_or_else(|| anyhow::anyhow!("no hosts to connect to"))) } async fn port_forward(session: &Session, host: &str, port: u16) -> Result { diff --git a/src/ssh-util/src/tunnel_manager.rs b/src/ssh-util/src/tunnel_manager.rs index 547d4fae1c16b..b61f02656e58d 100644 --- a/src/ssh-util/src/tunnel_manager.rs +++ b/src/ssh-util/src/tunnel_manager.rs @@ -110,7 +110,11 @@ impl SshTunnelManager { error!( "not using existing ssh tunnel \ ({}:{} via {}@{}:{}) because it's broken: {e}", - remote_host, remote_port, config.user, config.host, config.port, + remote_host, + remote_port, + config.user, + config.host.join(","), + config.port, ); // This is bit unfortunate, as this method returns an @@ -124,7 +128,11 @@ impl SshTunnelManager { info!( "reusing existing ssh tunnel ({}:{} via {}@{}:{})", - remote_host, remote_port, config.user, config.host, config.port, + remote_host, + remote_port, + config.user, + config.host.join(","), + config.port, ); return Ok(handle); } @@ -147,7 +155,11 @@ impl SshTunnelManager { // Try to connect. 
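The connection loop above is first-reachable-host-wins, keeping only the most recent error for diagnostics. The same shape in isolation (hypothetical helper, synchronous and generic for brevity; `try_connect` stands in for the `openssh::SessionBuilder` call):

    fn connect_any<H, T>(
        hosts: &[H],
        mut try_connect: impl FnMut(&H) -> Result<T, anyhow::Error>,
    ) -> Result<T, anyhow::Error> {
        let mut last_err = None;
        for host in hosts {
            match try_connect(host) {
                // The first host that accepts the connection wins.
                Ok(conn) => return Ok(conn),
                // Remember the latest failure for the final error message.
                Err(err) => last_err = Some(err),
            }
        }
        Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no hosts to connect to")))
    }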
info!(
                "initiating new ssh tunnel ({}:{} via {}@{}:{})",
-                remote_host, remote_port, config.user, config.host, config.port,
+                remote_host,
+                remote_port,
+                config.user,
+                config.host.join(","),
+                config.port,
             );

             let config = config.clone();
diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs
index ddeeac30c2497..f221c6d486811 100644
--- a/src/storage-types/src/connections.rs
+++ b/src/storage-types/src/connections.rs
@@ -24,7 +24,7 @@ use mz_kafka_util::client::{
 };
 use mz_ore::error::ErrorExt;
 use mz_ore::future::{InTask, OreFutureExt};
-use mz_ore::netio::resolve_external_address;
+use mz_ore::netio::resolve_address;
 use mz_proto::tokio_postgres::any_ssl_mode;
 use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
 use mz_repr::url::any_url;
@@ -693,13 +693,13 @@ impl KafkaConnection {
                     let key_pair = SshKeyPair::from_bytes(&secret)?;

                     // Ensure any ssh-bastion address we connect to is resolved to an external address.
-                    let resolved = resolve_external_address(
+                    let resolved = resolve_address(
                         &ssh_tunnel.connection.host,
                         ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                     )
                     .await?;
                     context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
-                        host: resolved.to_string(),
+                        host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
                         port: ssh_tunnel.connection.port,
                         user: ssh_tunnel.connection.user.clone(),
                         key_pair,
@@ -728,7 +728,7 @@ impl KafkaConnection {
                 // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path
                 //   in the `TunnelingClientContext`.
                 //
-                // NOTE that we do not need to use the `resolve_external_address` method to
+                // NOTE that we do not need to use the `resolve_address` method to
                 // validate the broker address here since it will be validated when the
                 // connection is established in `src/kafka-util/src/client.rs`, and we do not
                 // want to specify any BrokerRewrite that would override any default-tunnel
@@ -750,7 +750,7 @@ impl KafkaConnection {
                 }
                 Tunnel::Ssh(ssh_tunnel) => {
                     // Ensure any SSH bastion address we connect to is resolved to an external address.
-                    let ssh_host_resolved = resolve_external_address(
+                    let ssh_host_resolved = resolve_address(
                         &ssh_tunnel.connection.host,
                         ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                     )
@@ -759,7 +759,10 @@ impl KafkaConnection {
                         .add_ssh_tunnel(
                             addr,
                             SshTunnelConfig {
-                                host: ssh_host_resolved.to_string(),
+                                host: ssh_host_resolved
+                                    .iter()
+                                    .map(|a| a.to_string())
+                                    .collect::<Vec<_>>(),
                                 port: ssh_tunnel.connection.port,
                                 user: ssh_tunnel.connection.user.clone(),
                                 key_pair: SshKeyPair::from_bytes(
@@ -1062,13 +1065,18 @@ impl CsrConnection {
         match &self.tunnel {
             Tunnel::Direct => {
                 // Ensure any host we connect to is resolved to an external address.
-                let resolved = resolve_external_address(
+                let resolved = resolve_address(
                     host,
                     ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                 )
                 .await?;
-                client_config =
-                    client_config.resolve_to_addrs(host, &[SocketAddr::new(resolved, DUMMY_PORT)])
+                client_config = client_config.resolve_to_addrs(
+                    host,
+                    &resolved
+                        .iter()
+                        .map(|addr| SocketAddr::new(*addr, DUMMY_PORT))
+                        .collect::<Vec<_>>(),
+                )
             }
             Tunnel::Ssh(ssh_tunnel) => {
                 let ssh_tunnel = ssh_tunnel
@@ -1357,13 +1365,13 @@ impl PostgresConnection {
         let tunnel = match &self.tunnel {
             Tunnel::Direct => {
                 // Ensure any host we connect to is resolved to an external address.
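The `resolved.iter().map(|a| a.to_string()).collect()` pattern recurs for every SSH tunnel below; conceptually it is one step, resolve-and-vet, then stringify for `SshTunnelConfig::host`. A hypothetical consolidation, sketch only and not part of the patch:

    use std::io;

    use mz_ore::netio::resolve_address;

    async fn vetted_ssh_hosts(host: &str, enforce_global: bool) -> Result<Vec<String>, io::Error> {
        // Every address must pass the global-address check before it may be
        // used as an SSH bastion endpoint.
        let ips = resolve_address(host, enforce_global).await?;
        Ok(ips.iter().map(|ip| ip.to_string()).collect())
    }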
-                let resolved = resolve_external_address(
+                let resolved = resolve_address(
                     &self.host,
                     ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                 )
                 .await?;
                 mz_postgres_util::TunnelConfig::Direct {
-                    tcp_host_override: Some(resolved.to_string()),
+                    resolved_ips: Some(resolved),
                 }
             }
             Tunnel::Ssh(SshTunnel {
@@ -1375,14 +1383,14 @@ impl PostgresConnection {
                     .await?;
                 let key_pair = SshKeyPair::from_bytes(&secret)?;
                 // Ensure any ssh-bastion host we connect to is resolved to an external address.
-                let resolved = resolve_external_address(
+                let resolved = resolve_address(
                     &connection.host,
                     ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                 )
                 .await?;
                 mz_postgres_util::TunnelConfig::Ssh {
                     config: SshTunnelConfig {
-                        host: resolved.to_string(),
+                        host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
                         port: connection.port,
                         user: connection.user.clone(),
                         key_pair,
@@ -1737,13 +1745,13 @@ impl MySqlConnection {
         let tunnel = match &self.tunnel {
             Tunnel::Direct => {
                 // Ensure any host we connect to is resolved to an external address.
-                let resolved = resolve_external_address(
+                let resolved = resolve_address(
                     &self.host,
                     ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                 )
                 .await?;
                 mz_mysql_util::TunnelConfig::Direct {
-                    tcp_host_override: Some(resolved.to_string()),
+                    resolved_ips: Some(resolved),
                 }
             }
             Tunnel::Ssh(SshTunnel {
@@ -1755,14 +1763,14 @@ impl MySqlConnection {
                     .await?;
                 let key_pair = SshKeyPair::from_bytes(&secret)?;
                 // Ensure any ssh-bastion host we connect to is resolved to an external address.
-                let resolved = resolve_external_address(
+                let resolved = resolve_address(
                     &connection.host,
                     ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
                 )
                 .await?;
                 mz_mysql_util::TunnelConfig::Ssh {
                     config: SshTunnelConfig {
-                        host: resolved.to_string(),
+                        host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
                         port: connection.port,
                         user: connection.user.clone(),
                         key_pair,
@@ -2043,7 +2051,7 @@ impl SshTunnel {
         in_task: InTask,
     ) -> Result {
         // Ensure any ssh-bastion host we connect to is resolved to an external address.
-        let resolved = resolve_external_address(
+        let resolved = resolve_address(
             &self.connection.host,
             ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
         )
@@ -2053,7 +2061,7 @@ impl SshTunnel {
             .ssh_tunnel_manager
             .connect(
                 SshTunnelConfig {
-                    host: resolved.to_string(),
+                    host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
                     port: self.connection.port,
                     user: self.connection.user.clone(),
                     key_pair: SshKeyPair::from_bytes(
@@ -2123,14 +2131,14 @@ impl SshConnection {
         let key_pair = SshKeyPair::from_bytes(&secret)?;
         // Ensure any ssh-bastion host we connect to is resolved to an external address.
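Threading `resolved_ips` through `TunnelConfig::Direct` pins the addresses that passed the validation-time check, so the hostname cannot re-resolve to a private address between the check and the connect (a DNS-rebinding defense). The handoff, sketched with the types above (hypothetical wrapper, not in the patch):

    use std::io;

    use mz_ore::netio::resolve_address;

    async fn direct_tunnel(
        host: &str,
        enforce_global: bool,
    ) -> Result<mz_postgres_util::TunnelConfig, io::Error> {
        // The vetted set of IPs is the only set the connection may dial.
        let resolved = resolve_address(host, enforce_global).await?;
        Ok(mz_postgres_util::TunnelConfig::Direct {
            resolved_ips: Some(resolved),
        })
    }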
-        let resolved = resolve_external_address(
+        let resolved = resolve_address(
             &self.host,
             ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
         )
         .await?;

         let config = SshTunnelConfig {
-            host: resolved.to_string(),
+            host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
             port: self.port,
             user: self.user.clone(),
             key_pair,
diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml
index 18445f6e60382..f5b6e2dce21b1 100644
--- a/src/workspace-hack/Cargo.toml
+++ b/src/workspace-hack/Cargo.toml
@@ -68,7 +68,7 @@ log = { version = "0.4.17", default-features = false, features = ["std"] }
 memchr = { version = "2.5.0", features = ["use_std"] }
 mime_guess = { version = "2.0.3" }
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
-mysql_async = { version = "0.34.1", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] }
+mysql_async = { git = "https://github.com/MaterializeInc/mysql_async", rev = "6afd2136181f2ec93b7ca7de524f6d02b6f10c1d", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] }
 mysql_common = { version = "0.32.1", default-features = false, features = ["binlog", "chrono"] }
 native-tls = { version = "0.2.11", default-features = false, features = ["alpn"] }
 nix = { version = "0.26.1" }
@@ -188,7 +188,7 @@ log = { version = "0.4.17", default-features = false, features = ["std"] }
 memchr = { version = "2.5.0", features = ["use_std"] }
 mime_guess = { version = "2.0.3" }
 mio = { version = "0.8.11", features = ["net", "os-ext"] }
-mysql_async = { version = "0.34.1", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] }
+mysql_async = { git = "https://github.com/MaterializeInc/mysql_async", rev = "6afd2136181f2ec93b7ca7de524f6d02b6f10c1d", default-features = false, features = ["binlog", "minimal", "native-tls-tls", "tracing"] }
 mysql_common = { version = "0.32.1", default-features = false, features = ["binlog", "chrono"] }
 native-tls = { version = "0.2.11", default-features = false, features = ["alpn"] }
 nix = { version = "0.26.1" }
diff --git a/test/source-sink-errors/mzcompose.py b/test/source-sink-errors/mzcompose.py
index 5e6ea2fc8fd69..5fb962c7adfc2 100644
--- a/test/source-sink-errors/mzcompose.py
+++ b/test/source-sink-errors/mzcompose.py
@@ -45,7 +45,8 @@ def schema() -> str:
 class Disruption(Protocol):
     name: str

-    def run_test(self, c: Composition) -> None: ...
+    def run_test(self, c: Composition) -> None:
+        ...
@dataclass From 789c8d120a9d226cdb8f8585102d5b4ba9c94980 Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Mon, 1 Apr 2024 15:57:03 -0400 Subject: [PATCH 4/6] Cargo vet certifications --- misc/cargo-vet/audits.toml | 25 +++++++++++++++++++++++++ misc/cargo-vet/config.toml | 20 -------------------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/misc/cargo-vet/audits.toml b/misc/cargo-vet/audits.toml index 44dd334f029ab..c17bcf3c36298 100644 --- a/misc/cargo-vet/audits.toml +++ b/misc/cargo-vet/audits.toml @@ -187,6 +187,26 @@ who = "Gus Wynn " criteria = "maintained-and-necessary" version = "4.2.0" +[[audits.postgres]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.19.5@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + +[[audits.postgres-openssl]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.5.0@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + +[[audits.postgres-protocol]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.6.5@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + +[[audits.postgres-types]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.2.5@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + [[audits.quanta]] who = "Roshan Jobanputra " criteria = "safe-to-deploy" @@ -292,6 +312,11 @@ who = "Moritz Hoffmann " criteria = "safe-to-deploy" version = "0.12.0@git:46b28dc48bf5ed26dc0a0d5dbbd53c7964526f27" +[[audits.tokio-postgres]] +who = "Roshan Jobanputra " +criteria = "safe-to-deploy" +version = "0.7.8@git:91522e47643ebb6d6a5e392957b2319e5bb522ad" + [[audits.tracing-capture]] who = "Matt Jibson " criteria = "safe-to-deploy" diff --git a/misc/cargo-vet/config.toml b/misc/cargo-vet/config.toml index ca9b63c37d1ce..5d8b05f349f09 100644 --- a/misc/cargo-vet/config.toml +++ b/misc/cargo-vet/config.toml @@ -1010,22 +1010,6 @@ criteria = "safe-to-deploy" version = "0.3.15" criteria = "safe-to-deploy" -[[exemptions.postgres]] -version = "0.19.5@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - -[[exemptions.postgres-openssl]] -version = "0.5.0@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - -[[exemptions.postgres-protocol]] -version = "0.6.5@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - -[[exemptions.postgres-types]] -version = "0.2.5@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - [[exemptions.postgres_array]] version = "0.11.0@git:f58d0101e5198e04e8692629018d9b58f8543534" criteria = "safe-to-deploy" @@ -1514,10 +1498,6 @@ criteria = "safe-to-deploy" version = "0.2.12" criteria = "safe-to-deploy" -[[exemptions.tokio-postgres]] -version = "0.7.8@git:b759caa33610403aa74b1cfdd37f45eb3100c9af" -criteria = "safe-to-deploy" - [[exemptions.tokio-tungstenite]] version = "0.20.0" criteria = "safe-to-deploy" From a6e44492574529bd37bd495a92fa7acaaef2f909 Mon Sep 17 00:00:00 2001 From: Roshan Jobanputra Date: Fri, 12 Apr 2024 11:12:27 -0400 Subject: [PATCH 5/6] Revert kafka client changes until rdkafka can be updated --- src/kafka-util/src/client.rs | 112 ++----------------- src/storage-types/src/connections.rs | 12 +- src/storage-types/src/errors.rs | 2 +- src/storage/src/source/kafka.rs | 37 ++---- test/testdrive/connection-create-external.td | 3 - test/testdrive/kafka-source-errors.td | 2 +- 6 files changed, 25 insertions(+), 143 deletions(-) diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs index f44ae6c04ea6d..621bddebffd76 100644 --- 
a/src/kafka-util/src/client.rs
+++ b/src/kafka-util/src/client.rs
@@ -23,7 +23,6 @@ use crossbeam::channel::{unbounded, Receiver, Sender};
 use mz_ore::collections::CollectionExt;
 use mz_ore::error::ErrorExt;
 use mz_ore::future::InTask;
-use mz_ore::netio::resolve_address;
 use mz_ssh_util::tunnel::{SshTimeoutConfig, SshTunnelConfig, SshTunnelStatus};
 use mz_ssh_util::tunnel_manager::{ManagedSshTunnelHandle, SshTunnelManager};
 use rdkafka::client::{BrokerAddr, Client, NativeClient, OAuthToken};
@@ -296,9 +295,6 @@ enum BrokerRewriteHandle {
     /// For _default_ ssh tunnels, we store an error if _creation_
     /// of the tunnel failed, so that `tunnel_status` can return it.
     FailedDefaultSshTunnel(String),
-    /// We store an error if DNS resolution fails when resolving
-    /// a new broker host.
-    FailedDnsResolution(String),
 }

 /// Tunneling clients
@@ -313,24 +309,6 @@ pub enum TunnelConfig {
     None,
 }

-/// Status of all active ssh tunnels and direct broker connections for a `TunnelingClientContext`.
-#[derive(Clone)]
-pub struct TunnelingClientStatus {
-    /// Status of all active ssh tunnels.
-    pub ssh_status: SshTunnelStatus,
-    /// Status of direct broker connections.
-    pub broker_status: BrokerStatus,
-}
-
-/// Status of direct broker connections for a `TunnelingClientContext`.
-#[derive(Clone)]
-pub enum BrokerStatus {
-    /// The broker connections are nominal.
-    Nominal,
-    /// At least one broker connection has failed.
-    Failed(String),
-}
-
 /// A client context that supports rewriting broker addresses.
 #[derive(Clone)]
 pub struct TunnelingClientContext<C> {
@@ -341,7 +319,6 @@ pub struct TunnelingClientContext<C> {
     ssh_tunnel_manager: SshTunnelManager,
     ssh_timeout_config: SshTimeoutConfig,
     runtime: Handle,
-    enforce_external_addresses: bool,
 }

 impl<C> TunnelingClientContext<C> {
@@ -352,7 +329,6 @@ impl<C> TunnelingClientContext<C> {
         ssh_tunnel_manager: SshTunnelManager,
         ssh_timeout_config: SshTimeoutConfig,
         in_task: InTask,
-        enforce_external_addresses: bool,
     ) -> TunnelingClientContext<C> {
         TunnelingClientContext {
             inner,
@@ -362,7 +338,6 @@ impl<C> TunnelingClientContext<C> {
             ssh_tunnel_manager,
             ssh_timeout_config,
             runtime,
-            enforce_external_addresses,
         }
     }

@@ -417,22 +392,19 @@ impl<C> TunnelingClientContext<C> {
         &self.inner
     }

-    /// Returns a `TunnelingClientStatus` that contains a _consolidated_ `SshTunnelStatus` to
-    /// communicates the status of all active ssh tunnels `self` knows about, and a `BrokerStatus`
-    /// that contains a _consolidated_ status of all direct broker connections.
-    pub fn tunnel_status(&self) -> TunnelingClientStatus {
-        let rewrites = self.rewrites.lock().expect("poisoned");
-
-        let ssh_status = rewrites
+    /// Returns a _consolidated_ `SshTunnelStatus` that communicates the status
+    /// of all active ssh tunnels `self` knows about.
+ pub fn tunnel_status(&self) -> SshTunnelStatus { + self.rewrites + .lock() + .expect("poisoned") .values() .map(|handle| match handle { BrokerRewriteHandle::SshTunnel(s) => s.check_status(), BrokerRewriteHandle::FailedDefaultSshTunnel(e) => { SshTunnelStatus::Errored(e.clone()) } - BrokerRewriteHandle::Simple(_) | BrokerRewriteHandle::FailedDnsResolution(_) => { - SshTunnelStatus::Running - } + BrokerRewriteHandle::Simple(_) => SshTunnelStatus::Running, }) .fold(SshTunnelStatus::Running, |acc, status| { match (acc, status) { @@ -447,27 +419,7 @@ impl TunnelingClientContext { SshTunnelStatus::Running } } - }); - - let broker_status = rewrites - .values() - .map(|handle| match handle { - BrokerRewriteHandle::FailedDnsResolution(e) => BrokerStatus::Failed(e.clone()), - _ => BrokerStatus::Nominal, }) - .fold(BrokerStatus::Nominal, |acc, status| match (acc, status) { - (BrokerStatus::Nominal, BrokerStatus::Failed(e)) - | (BrokerStatus::Failed(e), BrokerStatus::Nominal) => BrokerStatus::Failed(e), - (BrokerStatus::Failed(err), BrokerStatus::Failed(e)) => { - BrokerStatus::Failed(format!("{}, {}", err, e)) - } - (BrokerStatus::Nominal, BrokerStatus::Nominal) => BrokerStatus::Nominal, - }); - - TunnelingClientStatus { - ssh_status, - broker_status, - } } } @@ -490,8 +442,7 @@ where port: Some(addr.port()), } } - BrokerRewriteHandle::FailedDefaultSshTunnel(_) - | BrokerRewriteHandle::FailedDnsResolution(_) => { + BrokerRewriteHandle::FailedDefaultSshTunnel(_) => { unreachable!() } }; @@ -513,9 +464,7 @@ where let rewrite = self.rewrites.lock().expect("poisoned").get(&addr).cloned(); match rewrite { - None - | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) - | Some(BrokerRewriteHandle::FailedDnsResolution(_)) => { + None | Some(BrokerRewriteHandle::FailedDefaultSshTunnel(_)) => { match &self.default_tunnel { TunnelConfig::Ssh(default_tunnel) => { // Multiple users could all run `connect` at the same time; only one ssh @@ -524,8 +473,6 @@ where let ssh_tunnel = self.runtime.block_on(async { self.ssh_tunnel_manager .connect( - // We know the default_tunnel has already been validated by the `resolve_address` - // method when it was provided to the client, so we don't need to check it again. default_tunnel.clone(), &addr.host, addr.port.parse().unwrap(), @@ -542,7 +489,6 @@ where if matches!( o.get(), BrokerRewriteHandle::FailedDefaultSshTunnel(_) - | BrokerRewriteHandle::FailedDnsResolution(_) ) => { o.insert(BrokerRewriteHandle::SshTunnel( @@ -586,45 +532,7 @@ where host: host.to_owned(), port: addr.port, }, - TunnelConfig::None => { - // If no rewrite is specified, we still should check that this potentially - // new broker address is a global address. - self.runtime.block_on(async { - match resolve_address(&addr.host, self.enforce_external_addresses).await - { - Ok(resolved) => { - let rewrite = BrokerRewriteHandle::Simple(BrokerRewrite { - // `resolve_address` will always return at least one address. - // TODO: Once we have a way to provide multiple hosts to rdkafka, we should - // return all resolved addresses here. - host: resolved.first().unwrap().to_string(), - port: addr.port.parse().ok(), - }); - return_rewrite(&rewrite) - } - Err(e) => { - warn!( - "failed to resolve external address for {:?}: {}", - addr, - e.display_with_causes() - ); - // Write an error if no one else has already written one. 
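The fold above collapses per-tunnel statuses into one health signal: any `Errored` wins, and error messages are concatenated. The merge step in isolation (same logic as the closure; sketch only):

    use mz_ssh_util::tunnel::SshTunnelStatus;

    fn merge_status(acc: SshTunnelStatus, next: SshTunnelStatus) -> SshTunnelStatus {
        match (acc, next) {
            (SshTunnelStatus::Running, SshTunnelStatus::Errored(e))
            | (SshTunnelStatus::Errored(e), SshTunnelStatus::Running) => {
                SshTunnelStatus::Errored(e)
            }
            (SshTunnelStatus::Errored(e1), SshTunnelStatus::Errored(e2)) => {
                // Surface every tunnel's error, comma-separated.
                SshTunnelStatus::Errored(format!("{}, {}", e1, e2))
            }
            (SshTunnelStatus::Running, SshTunnelStatus::Running) => SshTunnelStatus::Running,
        }
    }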
- let mut rewrites = self.rewrites.lock().expect("poisoned"); - rewrites.entry(addr.clone()).or_insert_with(|| { - BrokerRewriteHandle::FailedDnsResolution( - e.to_string_with_causes(), - ) - }); - // We have to give rdkafka an address, as this callback can't fail. - BrokerAddr { - host: "failed-dns-resolution.dev.materialize.com" - .to_string(), - port: 1337.to_string(), - } - } - } - }) - } + TunnelConfig::None => addr, } } Some(rewrite) => return_rewrite(&rewrite), diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs index f221c6d486811..e9dcf5cdbd973 100644 --- a/src/storage-types/src/connections.rs +++ b/src/storage-types/src/connections.rs @@ -52,7 +52,7 @@ use url::Url; use crate::configuration::StorageConfiguration; use crate::connections::aws::{AwsConnection, AwsConnectionValidationError}; use crate::controller::AlterError; -use crate::dyncfgs::{KAFKA_CLIENT_ID_ENRICHMENT_RULES, ENFORCE_EXTERNAL_ADDRESSES}; +use crate::dyncfgs::{ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES}; use crate::errors::{ContextCreationError, CsrConnectError}; use crate::AlterCompatible; @@ -662,6 +662,9 @@ impl KafkaConnection { config.set(*k, v); } + // TODO(roshan): Implement enforcement of external address validation once + // rdkafka client has been updated to support providing multiple resolved + // addresses for brokers let mut context = TunnelingClientContext::new( context, Handle::current(), @@ -671,7 +674,6 @@ impl KafkaConnection { .clone(), storage_configuration.parameters.ssh_timeout_config, in_task, - ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()), ); match &self.default_tunnel { @@ -727,12 +729,6 @@ impl KafkaConnection { // - Its not necessary. // - Not doing so makes it easier to test the `FailedDefaultSshTunnel` path // in the `TunnelingClientContext`. - // - // NOTE that we do not need to use the `resolve_address` method to - // validate the broker address here since it will be validated when the - // connection is established in `src/kafka-util/src/client.rs`, and we do not - // want to specify any BrokerRewrite that would override any default-tunnel - // settings. 
} Tunnel::AwsPrivatelink(aws_privatelink) => { let host = mz_cloud_resources::vpc_endpoint_host( diff --git a/src/storage-types/src/errors.rs b/src/storage-types/src/errors.rs index a4270d6eac799..d21932a623258 100644 --- a/src/storage-types/src/errors.rs +++ b/src/storage-types/src/errors.rs @@ -1026,7 +1026,7 @@ where cx: &TunnelingClientContext, ) -> Result { self.map_err(|e| { - if let SshTunnelStatus::Errored(e) = cx.tunnel_status().ssh_status { + if let SshTunnelStatus::Errored(e) = cx.tunnel_status() { ContextCreationError::Ssh(anyhow!(e)) } else { ContextCreationError::from(e) diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 9afe8dd60221d..db33fd741f4bb 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -20,9 +20,7 @@ use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Collection}; use futures::StreamExt; use maplit::btreemap; -use mz_kafka_util::client::{ - get_partitions, BrokerStatus, MzClientContext, PartitionId, TunnelingClientContext, -}; +use mz_kafka_util::client::{get_partitions, MzClientContext, PartitionId, TunnelingClientContext}; use mz_ore::error::ErrorExt; use mz_ore::future::InTask; use mz_ore::thread::{JoinHandleExt, UnparkOnDropHandle}; @@ -392,29 +390,13 @@ impl SourceRender for KafkaSourceConnection { "kafka metadata thread: updated partition metadata info", ); - // Check to see if any broker errors have been hit - match consumer.client().context().tunnel_status().broker_status - { - BrokerStatus::Failed(err) => { - let status = HealthStatusUpdate::stalled( - format!("broker error: {}", err), - None, - ); - *status_report.lock().unwrap() = HealthStatus { - kafka: Some(status), - ssh: None, - }; - } - BrokerStatus::Nominal => { - // Clear all the health namespaces we know about. - // Note that many kafka sources's don't have an ssh tunnel, but - // the `health_operator` handles this fine. - *status_report.lock().unwrap() = HealthStatus { - kafka: Some(HealthStatusUpdate::running()), - ssh: Some(HealthStatusUpdate::running()), - }; - } - } + // Clear all the health namespaces we know about. + // Note that many kafka sources's don't have an ssh tunnel, but + // the `health_operator` handles this fine. + *status_report.lock().unwrap() = HealthStatus { + kafka: Some(HealthStatusUpdate::running()), + ssh: Some(HealthStatusUpdate::running()), + }; } Err(e) => { let kafka_status = Some(HealthStatusUpdate::stalled( @@ -422,8 +404,7 @@ impl SourceRender for KafkaSourceConnection { None, )); - let ssh_status = - consumer.client().context().tunnel_status().ssh_status; + let ssh_status = consumer.client().context().tunnel_status(); let ssh_status = match ssh_status { SshTunnelStatus::Running => { Some(HealthStatusUpdate::running()) diff --git a/test/testdrive/connection-create-external.td b/test/testdrive/connection-create-external.td index e1b98520cdbed..7b1cebfbe61d3 100644 --- a/test/testdrive/connection-create-external.td +++ b/test/testdrive/connection-create-external.td @@ -17,9 +17,6 @@ $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.mater ALTER SYSTEM SET enable_connection_validation_syntax = true ALTER SYSTEM SET storage_enforce_external_addresses = true -! 
CREATE CONNECTION testconn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) WITH (VALIDATE = true)
-contains:Failed to resolve hostname
-
 # Setup kafka topic with schema
 # must be a subset of the keys in the rows
 $ set keyschema={
diff --git a/test/testdrive/kafka-source-errors.td b/test/testdrive/kafka-source-errors.td
index fd89a44104efa..9a848f89d44fd 100644
--- a/test/testdrive/kafka-source-errors.td
+++ b/test/testdrive/kafka-source-errors.td
@@ -34,7 +34,7 @@ contains:Failed to resolve hostname
 ! CREATE CONNECTION IF NOT EXISTS fawlty_csr_conn TO CONFLUENT SCHEMA REGISTRY (
     URL 'http://non-existent-csr:8081'
 );
-contains:failed to lookup address information: Name or service not known
+contains:failed to lookup address information

 # Check that for all tables clause is rejected
 ! CREATE SOURCE bad_definition1
From 0bc46ea6c8f31dc729502bb0f72b44129b143016 Mon Sep 17 00:00:00 2001
From: Roshan Jobanputra
Date: Mon, 15 Apr 2024 15:10:49 -0400
Subject: [PATCH 6/6] Use a BTreeSet to ensure consistent matching for ssh tunnel connections

---
 Cargo.lock                           |  1 +
 src/mysql-util/src/tunnel.rs         | 14 +++++++++++---
 src/ore/src/netio/dns.rs             |  7 ++++---
 src/postgres-util/src/tunnel.rs      | 15 +++++++++++---
 src/ssh-util/Cargo.toml              |  1 +
 src/ssh-util/src/tunnel.rs           | 25 +++++++++++++++---------
 src/ssh-util/src/tunnel_manager.rs   | 29 +++++++++++-----------------
 src/storage-types/src/connections.rs | 29 +++++++++++++++++++++-------
 8 files changed, 78 insertions(+), 43 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8828d67aff8c1..a6b1da6169b3b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5811,6 +5811,7 @@ version = "0.0.0"
 dependencies = [
  "anyhow",
  "futures",
+ "itertools",
  "mz-ore",
  "openssh",
  "openssh-mux-client",
diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs
index 91274fa72bd18..09a91bd89a747 100644
--- a/src/mysql-util/src/tunnel.rs
+++ b/src/mysql-util/src/tunnel.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

+use std::collections::BTreeSet;
 use std::net::IpAddr;
 use std::ops::{Deref, DerefMut};
 use std::time::Duration;
@@ -28,7 +29,11 @@ use crate::MySqlError;
 #[derive(Debug, PartialEq, Clone)]
 pub enum TunnelConfig {
     /// Establish a direct TCP connection to the database host.
-    Direct { resolved_ips: Option<Vec<IpAddr>> },
+    /// If `resolved_ips` is not None, the provided IPs will be used
+    /// rather than resolving the hostname.
+    Direct {
+        resolved_ips: Option<BTreeSet<IpAddr>>,
+    },
     /// Establish a TCP connection to the database via an SSH tunnel.
     /// This means first establishing an SSH connection to a bastion host,
     /// and then opening a separate connection from that host to the database.
@@ -239,8 +244,11 @@ impl Config {
     ) -> Result<MySqlConn, MySqlError> {
         match &self.tunnel {
             TunnelConfig::Direct { resolved_ips } => {
-                let opts_builder =
-                    OptsBuilder::from_opts(self.inner.clone()).resolved_ips(resolved_ips.clone());
+                let opts_builder = OptsBuilder::from_opts(self.inner.clone()).resolved_ips(
+                    resolved_ips
+                        .clone()
+                        .map(|ips| ips.into_iter().collect::<Vec<_>>()),
+                );

                 Ok(MySqlConn {
                     conn: Conn::new(opts_builder).await.map_err(MySqlError::from)?,
diff --git a/src/ore/src/netio/dns.rs b/src/ore/src/netio/dns.rs
index 7ca1b4b243cfd..588018431a5cf 100644
--- a/src/ore/src/netio/dns.rs
+++ b/src/ore/src/netio/dns.rs
@@ -13,6 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
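The forked `mysql_async` (as pinned above) takes resolved IPs as a `Vec`, while the tunnel config now stores a `BTreeSet` for order-insensitive equality; the `.map(|ips| ips.into_iter().collect())` above is exactly that conversion. In isolation (sketch, assuming those shapes):

    use std::collections::BTreeSet;
    use std::net::IpAddr;

    fn ips_for_mysql(resolved: &Option<BTreeSet<IpAddr>>) -> Option<Vec<IpAddr>> {
        // A BTreeSet iterates in sorted order, so the resulting Vec is
        // deterministic for workers that resolved the same addresses.
        resolved.clone().map(|ips| ips.into_iter().collect())
    }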
+use std::collections::BTreeSet;
 use std::io;
 use std::net::IpAddr;

 use tokio::net::lookup_host;
@@ -25,7 +26,7 @@ const DUMMY_PORT: u16 = 11111;
 pub async fn resolve_address(
     mut host: &str,
     enforce_global: bool,
-) -> Result<Vec<IpAddr>, io::Error> {
+) -> Result<BTreeSet<IpAddr>, io::Error> {
     // `net::lookup_host` requires a port to be specified, but we don't care about the port.
     let mut port = DUMMY_PORT;
     // If a port is already specified, use it and remove it from the host.
@@ -37,7 +38,7 @@ pub async fn resolve_address(
     }

     let mut addrs = lookup_host((host, port)).await?;
-    let mut ips = Vec::new();
+    let mut ips = BTreeSet::new();
     while let Some(addr) = addrs.next() {
         let ip = addr.ip();
         if enforce_global && !is_global(ip) {
@@ -46,7 +47,7 @@ pub async fn resolve_address(
                 "address is not global",
             ))?
         } else {
-            ips.push(ip);
+            ips.insert(ip);
         }
     }
diff --git a/src/postgres-util/src/tunnel.rs b/src/postgres-util/src/tunnel.rs
index 3ab41c62231f5..42f1aa81ab51d 100644
--- a/src/postgres-util/src/tunnel.rs
+++ b/src/postgres-util/src/tunnel.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

+use std::collections::BTreeSet;
 use std::net::IpAddr;
 use std::time::Duration;

@@ -39,7 +40,11 @@ macro_rules! bail_generic {
 #[derive(Debug, PartialEq, Clone)]
 pub enum TunnelConfig {
     /// Establish a direct TCP connection to the database host.
-    Direct { resolved_ips: Option<Vec<IpAddr>> },
+    /// If `resolved_ips` is not None, the provided IPs will be used
+    /// rather than resolving the hostname.
+    Direct {
+        resolved_ips: Option<BTreeSet<IpAddr>>,
+    },
     /// Establish a TCP connection to the database via an SSH tunnel.
     /// This means first establishing an SSH connection to a bastion host,
     /// and then opening a separate connection from that host to the database.
@@ -236,7 +241,9 @@ impl Config {
                         ),
                     }
                     .to_owned();
-                    // The number of 'host' and 'hostaddr' values must be the same.
+                    // Associate each resolved IP with the exact same, singular host, for TLS
+                    // verification. We are required to do this dance because `tokio-postgres`
+                    // enforces that the number of 'host' and 'hostaddr' values must be the same.
                     for (idx, ip) in ips.iter().enumerate() {
                         if idx != 0 {
                             postgres_config.host(&host);
@@ -307,7 +314,9 @@ impl Config {
                     ),
                 }
                 .to_owned();
-                // The number of 'host' and 'hostaddr' values must be the same.
+                // Associate each resolved IP with the exact same, singular host, for TLS
+                // verification. We are required to do this dance because `tokio-postgres`
+                // enforces that the number of 'host' and 'hostaddr' values must be the same.
                 for (idx, addr) in privatelink_addrs.enumerate() {
                     if idx != 0 {
                         postgres_config.host(&host);
diff --git a/src/ssh-util/Cargo.toml b/src/ssh-util/Cargo.toml
index b9fa732528a56..234899bf46f4e 100644
--- a/src/ssh-util/Cargo.toml
+++ b/src/ssh-util/Cargo.toml
@@ -17,6 +17,7 @@ openssh-mux-client = "0.15.5"
 openssl = { version = "0.10.48", features = ["vendored"] }
 rand = "0.8.5"
 futures = "0.3.25"
+itertools = "0.10.5"
 scopeguard = "1.1.0"
 serde = { version = "1.0.152", features = ["derive"] }
 serde_json = { version = "1.0.89" }
diff --git a/src/ssh-util/src/tunnel.rs b/src/ssh-util/src/tunnel.rs
index 19a3f36a5401f..ea9f237d36b3c 100644
--- a/src/ssh-util/src/tunnel.rs
+++ b/src/ssh-util/src/tunnel.rs
@@ -7,6 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
+use std::collections::BTreeSet;
 use std::fmt;
 use std::fs::{self, File};
 use std::io::Write;
@@ -17,6 +18,7 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;

 use anyhow::bail;
+use itertools::Itertools;
 use mz_ore::error::ErrorExt;
 use mz_ore::task::{self, AbortOnDropHandle};
 use openssh::{ForwardType, Session};
@@ -66,7 +68,7 @@ impl Default for SshTimeoutConfig {
 pub struct SshTunnelConfig {
     /// The hostname/IP of the SSH bastion server.
     /// If multiple hosts are specified, they are tried in order.
-    pub host: Vec<String>,
+    pub host: BTreeSet<String>,
     /// The port to connect to.
     pub port: u16,
     /// The name of the user to connect as.
@@ -86,6 +88,18 @@ impl fmt::Debug for SshTunnelConfig {
     }
 }

+impl fmt::Display for SshTunnelConfig {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "{}@{}:{}",
+            self.user,
+            self.host.iter().join(","),
+            self.port
+        )
+    }
+}
+
 /// The status of a running SSH tunnel.
 #[derive(Clone, Debug)]
 pub enum SshTunnelStatus {
@@ -107,14 +121,7 @@ impl SshTunnelConfig {
         remote_port: u16,
         timeout_config: SshTimeoutConfig,
     ) -> Result {
-        let tunnel_id = format!(
-            "{}:{} via {}@{}:{}",
-            remote_host,
-            remote_port,
-            self.user,
-            self.host.join(","),
-            self.port,
-        );
+        let tunnel_id = format!("{}:{} via {}", remote_host, remote_port, self);

         // N.B.
         //
diff --git a/src/ssh-util/src/tunnel_manager.rs b/src/ssh-util/src/tunnel_manager.rs
index b61f02656e58d..74a73abc24565 100644
--- a/src/ssh-util/src/tunnel_manager.rs
+++ b/src/ssh-util/src/tunnel_manager.rs
@@ -109,12 +109,8 @@ impl SshTunnelManager {
                 if let SshTunnelStatus::Errored(e) = handle.check_status() {
                     error!(
                         "not using existing ssh tunnel \
-                         ({}:{} via {}@{}:{}) because it's broken: {e}",
-                        remote_host,
-                        remote_port,
-                        config.user,
-                        config.host.join(","),
-                        config.port,
+                         ({}:{} via {}) because it's broken: {e}",
+                        remote_host, remote_port, config
                     );

                     // This is bit unfortunate, as this method returns an
@@ -124,12 +123,8 @@ impl SshTunnelManager {
                 }

                 info!(
-                    "reusing existing ssh tunnel ({}:{} via {}@{}:{})",
-                    remote_host,
-                    remote_port,
-                    config.user,
-                    config.host.join(","),
-                    config.port,
+                    "reusing existing ssh tunnel ({}:{} via {})",
+                    remote_host, remote_port, config
                 );
                 return Ok(handle);
             }
@@ -154,12 +146,8 @@ impl SshTunnelManager {
             // Try to connect.
             info!(
-                "initiating new ssh tunnel ({}:{} via {}@{}:{})",
-                remote_host,
-                remote_port,
-                config.user,
-                config.host.join(","),
-                config.port,
+                "initiating new ssh tunnel ({}:{} via {})",
+                remote_host, remote_port, config
             );

             let config = config.clone();
@@ -193,6 +181,11 @@ impl SshTunnelManager {
 }

 /// Identifies a connection to a remote host via an SSH tunnel.
+/// There are a couple of edge cases where this key format may result
+/// in extra connections being created:
+/// 1. If a host resolves to a different number of IPs on different workers.
+/// 2. Different workers connect to different upstream resolved IPs if they
+///    appear connectable at different times.
 #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
 struct SshTunnelKey {
     config: SshTunnelConfig,
diff --git a/src/storage-types/src/connections.rs b/src/storage-types/src/connections.rs
index e9dcf5cdbd973..4b4f5745f42b6 100644
--- a/src/storage-types/src/connections.rs
+++ b/src/storage-types/src/connections.rs
@@ -10,7 +10,7 @@

 //! Connection types.
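The tunnel manager keys its cache on `SshTunnelConfig`, so two workers that resolve the same bastion name to the same IPs in different orders must build an identical key; a `BTreeSet` sorts and dedups, making the key order-insensitive. A quick check of that property (sketch):

    use std::collections::BTreeSet;

    fn host_set(ips: &[&str]) -> BTreeSet<String> {
        ips.iter().map(|ip| ip.to_string()).collect()
    }

    #[test]
    fn host_sets_are_order_insensitive() {
        // Resolution order no longer affects cache lookups.
        assert_eq!(
            host_set(&["10.1.0.2", "10.1.0.1"]),
            host_set(&["10.1.0.1", "10.1.0.2"]),
        );
    }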
use std::borrow::Cow;
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, BTreeSet};
 use std::net::SocketAddr;
 use std::sync::Arc;

@@ -701,7 +701,10 @@ impl KafkaConnection {
                     )
                     .await?;
                     context.set_default_tunnel(TunnelConfig::Ssh(SshTunnelConfig {
-                        host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
+                        host: resolved
+                            .iter()
+                            .map(|a| a.to_string())
+                            .collect::<BTreeSet<_>>(),
                         port: ssh_tunnel.connection.port,
                         user: ssh_tunnel.connection.user.clone(),
                         key_pair,
@@ -758,7 +761,7 @@ impl KafkaConnection {
                                 host: ssh_host_resolved
                                     .iter()
                                     .map(|a| a.to_string())
-                                    .collect::<Vec<_>>(),
+                                    .collect::<BTreeSet<_>>(),
                                 port: ssh_tunnel.connection.port,
                                 user: ssh_tunnel.connection.user.clone(),
                                 key_pair: SshKeyPair::from_bytes(
@@ -1386,7 +1389,10 @@ impl PostgresConnection {
                     .await?;
                 mz_postgres_util::TunnelConfig::Ssh {
                     config: SshTunnelConfig {
-                        host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
+                        host: resolved
+                            .iter()
+                            .map(|a| a.to_string())
+                            .collect::<BTreeSet<_>>(),
                         port: connection.port,
                         user: connection.user.clone(),
                         key_pair,
@@ -1766,7 +1772,10 @@ impl MySqlConnection {
                     .await?;
                 mz_mysql_util::TunnelConfig::Ssh {
                     config: SshTunnelConfig {
-                        host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
+                        host: resolved
+                            .iter()
+                            .map(|a| a.to_string())
+                            .collect::<BTreeSet<_>>(),
                         port: connection.port,
                         user: connection.user.clone(),
                         key_pair,
@@ -2057,7 +2066,10 @@ impl SshTunnel {
             .ssh_tunnel_manager
             .connect(
                 SshTunnelConfig {
-                    host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
+                    host: resolved
+                        .iter()
+                        .map(|a| a.to_string())
+                        .collect::<BTreeSet<_>>(),
                     port: self.connection.port,
                     user: self.connection.user.clone(),
                     key_pair: SshKeyPair::from_bytes(
@@ -2134,7 +2146,10 @@ impl SshConnection {
         let config = SshTunnelConfig {
-            host: resolved.iter().map(|a| a.to_string()).collect::<Vec<_>>(),
+            host: resolved
+                .iter()
+                .map(|a| a.to_string())
+                .collect::<BTreeSet<_>>(),
             port: self.port,
             user: self.user.clone(),
             key_pair,
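With `host` as a `BTreeSet<String>`, every log site can lean on the `Display` impl added in this patch instead of hand-joining fields. Roughly what those log labels reduce to (hypothetical values; mirrors the `user@host1,host2:port` format above):

    use std::collections::BTreeSet;

    use itertools::Itertools;

    fn tunnel_label(user: &str, hosts: &BTreeSet<String>, port: u16) -> String {
        // BTreeSet iteration is sorted, so the label is stable across workers.
        format!("{}@{}:{}", user, hosts.iter().join(","), port)
    }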