diff --git a/src/avro/src/schema.rs b/src/avro/src/schema.rs
index 997c24956473e..198ca9422aa5e 100644
--- a/src/avro/src/schema.rs
+++ b/src/avro/src/schema.rs
@@ -813,6 +813,11 @@ impl UnionSchema {
         &self.schemas
     }
 
+    /// Returns a mutable slice of all variants of this schema.
+    pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
+        &mut self.schemas
+    }
+
     /// Returns true if the first variant of this `UnionSchema` is `Null`.
     pub fn is_nullable(&self) -> bool {
         !self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
@@ -1377,6 +1382,110 @@ impl Schema {
         p.parse(value)
     }
 
+    /// Parse a schema with named types from referenced schemas pre-populated.
+    ///
+    /// This is used when parsing a schema that references types defined in other
+    /// schemas (Confluent Schema Registry schema references). Referenced schemas
+    /// must be provided in dependency order (dependencies first).
+    ///
+    /// # Arguments
+    ///
+    /// * `primary` - The primary schema JSON value to parse
+    /// * `reference_schemas` - Schemas whose named types should be available during parsing
+    pub fn parse_with_references(
+        primary: &Value,
+        reference_schemas: &[Schema],
+    ) -> Result<Schema, AvroError> {
+        // Collect and remap named types from all referenced schemas.
+        let (named, indices) = Self::collect_named_types(reference_schemas);
+
+        // Create a parser with the named types pre-populated.
+        let p = SchemaParser {
+            named: named.into_iter().map(Some).collect(),
+            indices,
+        };
+        p.parse(primary)
+    }
+
+    /// Collect all named types from a list of schemas, remapping indices as needed.
+    ///
+    /// When combining named types from multiple schemas, each schema's internal
+    /// indices need to be remapped to account for duplicates and new positions.
+    fn collect_named_types(
+        schemas: &[Schema],
+    ) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
+        let mut combined_named: Vec<NamedSchemaPiece> = Vec::new();
+        let mut combined_indices: BTreeMap<FullName, usize> = BTreeMap::new();
+
+        for schema in schemas {
+            // First pass: build the index map from this schema's indices to combined indices.
+            // Types that already exist map to their existing combined index;
+            // new types map to their future position in `combined_named`.
+            let mut index_map: Vec<usize> = Vec::with_capacity(schema.named.len());
+            let mut new_type_offset = combined_named.len();
+
+            for named_piece in &schema.named {
+                if let Some(&existing_idx) = combined_indices.get(&named_piece.name) {
+                    // Type already exists; use the existing index.
+                    index_map.push(existing_idx);
+                } else {
+                    // New type; assign the next available index.
+                    index_map.push(new_type_offset);
+                    new_type_offset += 1;
+                }
+            }
+
+            // Second pass: add new types with proper remapping.
+            for named_piece in &schema.named {
+                if combined_indices.contains_key(&named_piece.name) {
+                    continue;
+                }
+
+                let mut remapped = named_piece.clone();
+                Self::remap_indices_in_piece_with_map(&mut remapped.piece, &index_map);
+
+                let new_idx = combined_named.len();
+                combined_indices.insert(remapped.name.clone(), new_idx);
+                combined_named.push(remapped);
+            }
+        }
+
+        (combined_named, combined_indices)
+    }
+
+    /// Recursively remap `Named` indices in a `SchemaPiece` using an index map.
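+    ///
+    /// For example, with `index_map = [1, 2]`, a `SchemaPieceOrNamed::Named(0)`
+    /// reference inside the piece is rewritten to `Named(1)`, and `Named(1)`
+    /// becomes `Named(2)`.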
+ fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) { + match piece { + SchemaPiece::Array(inner) => Self::remap_indices_with_map(inner, index_map), + SchemaPiece::Map(inner) => Self::remap_indices_with_map(inner, index_map), + SchemaPiece::Union(union) => { + for variant in union.variants_mut() { + Self::remap_indices_with_map(variant, index_map); + } + } + SchemaPiece::Record { fields, .. } => { + for field in fields { + Self::remap_indices_with_map(&mut field.schema, index_map); + } + } + _ => {} + } + } + + /// Remap a single SchemaPieceOrNamed using an index map. + fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) { + match item { + SchemaPieceOrNamed::Named(idx) => { + if let Some(&new_idx) = index_map.get(*idx) { + *idx = new_idx; + } + } + SchemaPieceOrNamed::Piece(piece) => { + Self::remap_indices_in_piece_with_map(piece, index_map) + } + } + } + /// Converts `self` into its [Parsing Canonical Form]. /// /// [Parsing Canonical Form]: @@ -3034,4 +3143,254 @@ mod tests { assert_eq!(expected, actual, "Name::make_valid({input})") } } + + #[mz_ore::test] + fn test_parse_with_simple_reference() { + // Schema A defines a User record + let ref_schema_json = r#"{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [{"name": "id", "type": "int"}] + }"#; + + // Schema B references User + let primary_json = r#"{ + "type": "record", + "name": "Event", + "namespace": "com.example", + "fields": [{"name": "user", "type": "com.example.User"}] + }"#; + + let ref_schema = Schema::from_str(ref_schema_json).unwrap(); + let primary_value: Value = serde_json::from_str(primary_json).unwrap(); + + let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap(); + + // Verify both Event and User types are in the schema + let user_name = FullName { + name: "User".to_string(), + namespace: "com.example".to_string(), + }; + let event_name = FullName { + name: "Event".to_string(), + namespace: "com.example".to_string(), + }; + + assert!( + schema.indices.contains_key(&user_name), + "User type should be in schema indices" + ); + assert!( + schema.indices.contains_key(&event_name), + "Event type should be in schema indices" + ); + + // Verify Event's user field references User + if let SchemaPieceOrNamed::Named(event_idx) = &schema.top { + let event_piece = &schema.named[*event_idx].piece; + if let SchemaPiece::Record { fields, .. 
} = event_piece { + assert_eq!(fields.len(), 1); + assert_eq!(fields[0].name, "user"); + // The user field should reference the User type (Named) + assert!(matches!(fields[0].schema, SchemaPieceOrNamed::Named(_))); + } else { + panic!("Expected Event to be a record"); + } + } else { + panic!("Expected top to be Named"); + } + } + + #[mz_ore::test] + fn test_parse_with_nested_references() { + // Schema A defines Address + let schema_a = r#"{ + "type": "record", + "name": "Address", + "namespace": "com.example", + "fields": [ + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"} + ] + }"#; + + // Schema B defines User with Address field (references A) + let schema_b = r#"{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "address", "type": "com.example.Address"} + ] + }"#; + + // Schema C defines Event with User field (references B, transitively A) + let schema_c = r#"{ + "type": "record", + "name": "Event", + "namespace": "com.example", + "fields": [ + {"name": "user", "type": "com.example.User"}, + {"name": "timestamp", "type": "long"} + ] + }"#; + + // Parse A first + let ref_schema_a = Schema::from_str(schema_a).unwrap(); + + // Parse B with reference to A + let schema_b_value: Value = serde_json::from_str(schema_b).unwrap(); + let ref_schema_b = + Schema::parse_with_references(&schema_b_value, std::slice::from_ref(&ref_schema_a)) + .unwrap(); + + // Parse C with references to A and B (in dependency order) + let schema_c_value: Value = serde_json::from_str(schema_c).unwrap(); + let final_schema = + Schema::parse_with_references(&schema_c_value, &[ref_schema_a, ref_schema_b]).unwrap(); + + // Verify all three types are in the schema + for name in ["Address", "User", "Event"] { + let full_name = FullName { + name: name.to_string(), + namespace: "com.example".to_string(), + }; + assert!( + final_schema.indices.contains_key(&full_name), + "{} type should be in schema indices", + name + ); + } + } + + #[mz_ore::test] + fn test_parse_with_multiple_types_in_reference() { + // Schema A defines both Address and PhoneNumber + let ref_schema_json = r#"{ + "type": "record", + "name": "ContactInfo", + "namespace": "com.example", + "fields": [ + { + "name": "address", + "type": { + "type": "record", + "name": "Address", + "fields": [{"name": "street", "type": "string"}] + } + }, + { + "name": "phone", + "type": { + "type": "record", + "name": "PhoneNumber", + "fields": [{"name": "number", "type": "string"}] + } + } + ] + }"#; + + // Schema B references both Address and PhoneNumber + let primary_json = r#"{ + "type": "record", + "name": "User", + "namespace": "com.example", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "home", "type": "com.example.Address"}, + {"name": "mobile", "type": "com.example.PhoneNumber"} + ] + }"#; + + let ref_schema = Schema::from_str(ref_schema_json).unwrap(); + let primary_value: Value = serde_json::from_str(primary_json).unwrap(); + + let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap(); + + // Verify all types are in the schema + for name in ["Address", "PhoneNumber", "ContactInfo", "User"] { + let full_name = FullName { + name: name.to_string(), + namespace: "com.example".to_string(), + }; + assert!( + schema.indices.contains_key(&full_name), + "{} type should be in schema indices", + name + ); + } + } + + #[mz_ore::test] + fn test_parse_with_no_references() { + // When no references are provided, it should behave like regular 
parse + let schema_json = r#"{ + "type": "record", + "name": "Simple", + "fields": [{"name": "id", "type": "int"}] + }"#; + + let value: Value = serde_json::from_str(schema_json).unwrap(); + + let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap(); + let schema_normal = Schema::parse(&value).unwrap(); + + // Both should produce equivalent schemas + assert_eq!(schema_with_refs.named.len(), schema_normal.named.len()); + assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len()); + } + + #[mz_ore::test] + fn test_parse_with_reference_in_array() { + // Schema A defines an Item record + let ref_schema_json = r#"{ + "type": "record", + "name": "Item", + "namespace": "com.example", + "fields": [{"name": "name", "type": "string"}] + }"#; + + // Schema B has an array of Items + let primary_json = r#"{ + "type": "record", + "name": "Order", + "namespace": "com.example", + "fields": [ + {"name": "items", "type": {"type": "array", "items": "com.example.Item"}} + ] + }"#; + + let ref_schema = Schema::from_str(ref_schema_json).unwrap(); + let primary_value: Value = serde_json::from_str(primary_json).unwrap(); + + let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap(); + + // Verify both types exist + let item_name = FullName { + name: "Item".to_string(), + namespace: "com.example".to_string(), + }; + assert!(schema.indices.contains_key(&item_name)); + + // Verify the array items type is a Named reference + if let SchemaPieceOrNamed::Named(order_idx) = &schema.top { + let order_piece = &schema.named[*order_idx].piece; + if let SchemaPiece::Record { fields, .. } = order_piece { + if let SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) = &fields[0].schema { + assert!( + matches!(inner.as_ref(), SchemaPieceOrNamed::Named(_)), + "Array items should be a Named reference to Item" + ); + } else { + panic!("Expected items field to be an array"); + } + } else { + panic!("Expected Order to be a record"); + } + } else { + panic!("Expected top to be Named"); + } + } } diff --git a/src/ccsr/src/client.rs b/src/ccsr/src/client.rs index fc959d9e3da19..b5980c4fa1474 100644 --- a/src/ccsr/src/client.rs +++ b/src/ccsr/src/client.rs @@ -109,6 +109,29 @@ impl Client { }) } + /// Gets the latest version of the specified subject along with its direct references. + /// Returns the subject and a list of subject names that this subject directly references. 
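+    ///
+    /// For example (subject names are illustrative): if the latest version of
+    /// subject `event-value` declares a schema reference to a type registered
+    /// under subject `user-value`, this returns the `event-value` subject
+    /// together with `vec!["user-value".to_string()]`.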
+    pub async fn get_subject_with_references(
+        &self,
+        subject: &str,
+    ) -> Result<(Subject, Vec<String>), GetBySubjectError> {
+        let req = self.make_request(Method::GET, &["subjects", subject, "versions", "latest"]);
+        let res: GetBySubjectResponse = send_request(req).await?;
+        let referenced_subjects: Vec<String> =
+            res.references.iter().map(|r| r.subject.clone()).collect();
+        Ok((
+            Subject {
+                schema: Schema {
+                    id: res.id,
+                    raw: res.schema,
+                },
+                version: res.version,
+                name: res.subject,
+            },
+            referenced_subjects,
+        ))
+    }
+
     /// Gets the config set for the specified subject
     pub async fn get_subject_config(
         &self,
diff --git a/src/interchange/benches/avro.rs b/src/interchange/benches/avro.rs
index b8e08826e3f10..99dcde5fb6365 100644
--- a/src/interchange/benches/avro.rs
+++ b/src/interchange/benches/avro.rs
@@ -244,7 +244,7 @@ pub fn bench_avro(c: &mut Criterion) {
     "connect.name": "tpch.tpch.lineitem.Envelope"
 }
 "#;
-    let schema = parse_schema(schema_str).unwrap();
+    let schema = parse_schema(schema_str, &[]).unwrap();
 
     fn since_epoch(days: i64) -> i32 {
         Date::from_unix_epoch(days.try_into().unwrap())
@@ -394,7 +394,7 @@ pub fn bench_avro(c: &mut Criterion) {
     buf.extend(mz_avro::to_avro_datum(&schema, record).unwrap());
     let len = u64::cast_from(buf.len());
 
-    let mut decoder = Decoder::new(schema_str, None, "avro_bench".to_string(), false).unwrap();
+    let mut decoder = Decoder::new(schema_str, &[], None, "avro_bench".to_string(), false).unwrap();
 
     let mut bg = c.benchmark_group("avro");
     bg.throughput(Throughput::Bytes(len));
diff --git a/src/interchange/src/avro.rs b/src/interchange/src/avro.rs
index 29dc56304667a..7ebe26be1607a 100644
--- a/src/interchange/src/avro.rs
+++ b/src/interchange/src/avro.rs
@@ -44,7 +44,7 @@ mod tests {
             "fields": []
         }"#;
 
-        let desc = schema_to_relationdesc(parse_schema(schema)?)?;
+        let desc = schema_to_relationdesc(parse_schema(schema, &[])?)?;
         assert_eq!(desc.arity(), 0, "empty record produced rows");
 
         Ok(())
@@ -61,7 +61,7 @@ mod tests {
             ]
         }"#;
 
-        let desc = schema_to_relationdesc(parse_schema(schema)?)?;
+        let desc = schema_to_relationdesc(parse_schema(schema, &[])?)?;
         let expected_desc = RelationDesc::builder()
             .with_column("f1", SqlScalarType::Int32.nullable(false))
             .with_column("f2", SqlScalarType::String.nullable(false))
diff --git a/src/interchange/src/avro/decode.rs b/src/interchange/src/avro/decode.rs
index 6d1c14d0d2280..3812794cc3a34 100644
--- a/src/interchange/src/avro/decode.rs
+++ b/src/interchange/src/avro/decode.rs
@@ -51,7 +51,7 @@ mod tests {
             "name": "test",
             "fields": [{"name": "f1", "type": "int"}, {"name": "f2", "type": "int"}]
         }"#;
-        let mut decoder = Decoder::new(schema, None, "Test".to_string(), false).unwrap();
+        let mut decoder = Decoder::new(schema, &[], None, "Test".to_string(), false).unwrap();
         // This is not a valid Avro blob for the given schema
         let mut bad_bytes: &[u8] = &[0];
         assert_err!(decoder.decode(&mut bad_bytes).await.unwrap());
@@ -71,14 +71,23 @@ impl Decoder {
     /// The provided schema is called the "reader schema", which is the schema
     /// that we are expecting to use to decode records. The records may indicate
    /// that they are encoded with a different schema; as long as those schemas
    /// can be resolved against the reader schema, decoding still succeeds.
+    ///
+    /// The `reader_reference_schemas` parameter provides schemas for types that
+    /// are referenced by the reader schema but defined in separate schemas.
+    /// These should be provided in dependency order (dependencies first).
     pub fn new(
         reader_schema: &str,
+        reader_reference_schemas: &[String],
         ccsr_client: Option<mz_ccsr::Client>,
         debug_name: String,
         confluent_wire_format: bool,
     ) -> anyhow::Result<Decoder> {
-        let csr_avro =
-            ConfluentAvroResolver::new(reader_schema, ccsr_client, confluent_wire_format)?;
+        let csr_avro = ConfluentAvroResolver::new(
+            reader_schema,
+            reader_reference_schemas,
+            ccsr_client,
+            confluent_wire_format,
+        )?;
 
         Ok(Decoder {
             csr_avro,
diff --git a/src/interchange/src/avro/schema.rs b/src/interchange/src/avro/schema.rs
index be4617365379d..4b973c9464ae1 100644
--- a/src/interchange/src/avro/schema.rs
+++ b/src/interchange/src/avro/schema.rs
@@ -38,12 +38,13 @@ use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, BTreeSet};
 use std::fmt;
-use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::{Context, anyhow, bail};
 use mz_avro::error::Error as AvroError;
-use mz_avro::schema::{Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas};
+use mz_avro::schema::{
+    ParseSchemaError, Schema, SchemaNode, SchemaPiece, SchemaPieceOrNamed, resolve_schemas,
+};
 use mz_ore::cast::CastFrom;
 use mz_ore::collections::CollectionExt;
 use mz_ore::future::OreFutureExt;
@@ -55,9 +56,17 @@ use tracing::warn;
 
 use crate::avro::is_null;
 
-pub fn parse_schema(schema: &str) -> anyhow::Result<Schema> {
-    let schema = serde_json::from_str(schema)?;
-    Ok(Schema::parse(&schema)?)
+pub fn parse_schema(schema: &str, references: &[String]) -> anyhow::Result<Schema> {
+    let schema: serde_json::Value = serde_json::from_str(schema)?;
+    // Parse reference schemas incrementally: each reference may depend on previous ones.
+    // References must be provided in dependency order (dependencies first).
+    let mut parsed_refs: Vec<Schema> = Vec::with_capacity(references.len());
+    for reference in references {
+        let ref_json: serde_json::Value = serde_json::from_str(reference)?;
+        let parsed = Schema::parse_with_references(&ref_json, &parsed_refs)?;
+        parsed_refs.push(parsed);
+    }
+    Ok(Schema::parse_with_references(&schema, &parsed_refs)?)
 }
 
 /// Converts an Apache Avro schema into a list of column names and types.
@@ -304,10 +313,12 @@ pub struct ConfluentAvroResolver {
 impl ConfluentAvroResolver {
     pub fn new(
         reader_schema: &str,
+        reader_reference_schemas: &[String],
         ccsr_client: Option<mz_ccsr::Client>,
         confluent_wire_format: bool,
     ) -> anyhow::Result<Self> {
-        let reader_schema = parse_schema(reader_schema)?;
+        // parse_schema handles incremental parsing of references (dependencies first)
+        let reader_schema = parse_schema(reader_schema, reader_reference_schemas)?;
         let writer_schemas = ccsr_client.map(SchemaCache::new).transpose()?;
         Ok(Self {
             reader_schema,
@@ -400,6 +411,10 @@ impl SchemaCache {
     /// that this schema cache was initialized with, returns the schema directly.
     /// If not, performs schema resolution on the reader and writer and
     /// returns the result.
+    ///
+    /// This method also handles schema references: if the schema references types
+    /// defined in other schemas, those schemas are fetched and their types are made
+    /// available during parsing.
     async fn get(
         &mut self,
         id: i32,
@@ -412,14 +427,16 @@ impl SchemaCache {
                // immediately, and not cached, since it might get better on the
                // next retry.
                let ccsr_client = Arc::clone(&self.ccsr_client);
-                let response = Retry::default()
+
+                // Fetch schema with its references (if any)
+                let (primary_subject, reference_subjects) = Retry::default()
                     // Twice the timeout of the ccsr client so we can attempt 2 requests.
                     .max_duration(ccsr_client.timeout() * 2)
                     // Canceling because ultimately it's just non-mutating HTTP requests.
                     .retry_async_canceling(move |state| {
                         let ccsr_client = Arc::clone(&ccsr_client);
                         async move {
-                            let res = ccsr_client.get_schema_by_id(id).await;
+                            let res = ccsr_client.get_subject_and_references_by_id(id).await;
                             match res {
                                 Err(e) => {
                                     if let Some(timeout) = state.next_backoff {
@@ -437,21 +454,49 @@ impl SchemaCache {
                     })
                     .run_in_task(|| format!("fetch_avro_schema:{}", id))
                     .await?;
+
                 // Now, we've gotten some json back, so we want to cache it (regardless of whether it's a valid
                 // avro schema, it won't change).
                 //
                 // However, we can't just cache it directly, since resolving schemas takes significant CPU work,
                 // which we don't want to repeat for every record. So, parse and resolve it, and cache the
                 // result (whether schema or error).
-                let result = Schema::from_str(&response.raw).and_then(|schema| {
-                    // Schema fingerprints don't actually capture whether two schemas are meaningfully
-                    // different, because they strip out logical types. Thus, resolve in all cases.
-                    let resolved = resolve_schemas(&schema, reader_schema)?;
-                    Ok(resolved)
-                });
+                let result = Self::parse_with_references(
+                    &primary_subject,
+                    &reference_subjects,
+                    reader_schema,
+                );
                 v.insert(result)
             }
         };
         Ok(entry.as_ref().map_err(|e| anyhow::Error::new(e.clone())))
     }
+
+    /// Parse a schema along with its references and resolve against the reader schema.
+    fn parse_with_references(
+        primary_subject: &mz_ccsr::Subject,
+        reference_subjects: &[mz_ccsr::Subject],
+        reader_schema: &Schema,
+    ) -> Result<Schema, AvroError> {
+        // Parse referenced schemas incrementally: each reference may depend on previous ones.
+        // References are returned in dependency order (dependencies first).
+        let mut reference_schemas: Vec<Schema> = Vec::with_capacity(reference_subjects.len());
+        for subject in reference_subjects {
+            let ref_json: serde_json::Value = serde_json::from_str(&subject.schema.raw)
+                .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
+            let parsed = Schema::parse_with_references(&ref_json, &reference_schemas)?;
+            reference_schemas.push(parsed);
+        }
+
+        // Parse primary schema, using references if present
+        let primary_value: serde_json::Value = serde_json::from_str(&primary_subject.schema.raw)
+            .map_err(|e| ParseSchemaError::new(format!("Error parsing JSON: {}", e)))?;
+        let schema = Schema::parse_with_references(&primary_value, &reference_schemas)?;
+
+        // Schema fingerprints don't actually capture whether two schemas are meaningfully
+        // different, because they strip out logical types. Thus, resolve in all cases.
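+        // (For instance, Avro's Parsing Canonical Form drops attributes such as
+        // `logicalType`, so a writer schema using `timestamp-millis` and one using
+        // a plain `long` can share a fingerprint yet decode to different types.)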
+        let resolved = resolve_schemas(&schema, reader_schema)?;
+        Ok(resolved)
+    }
 }
diff --git a/src/interchange/src/bin/avro-decode.rs b/src/interchange/src/bin/avro-decode.rs
index a1cf9ce448033..8ce3cbcddc476 100644
--- a/src/interchange/src/bin/avro-decode.rs
+++ b/src/interchange/src/bin/avro-decode.rs
@@ -54,7 +54,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
     let ccsr_client: Option<mz_ccsr::Client> = None;
     let debug_name = String::from("avro-decode");
     let confluent_wire_format = false;
-    let mut decoder = Decoder::new(&schema, ccsr_client, debug_name, confluent_wire_format)
+    let mut decoder = Decoder::new(&schema, &[], ccsr_client, debug_name, confluent_wire_format)
         .context("creating decoder")?;
     let row = decoder.decode(&mut data).await.context("decoding data")?;
     println!("row: {row:?}");
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs
index 028314502ce74..a43f41bd1e98b 100644
--- a/src/sql-parser/src/ast/defs/ddl.rs
+++ b/src/sql-parser/src/ast/defs/ddl.rs
@@ -386,6 +386,12 @@ impl_display_t!(CsrConnectionProtobuf);
 pub struct CsrSeedAvro {
     pub key_schema: Option<String>,
     pub value_schema: String,
+    /// Reference schemas for the key schema, in dependency order.
+    /// Populated during purification by fetching from the schema registry.
+    pub key_reference_schemas: Vec<String>,
+    /// Reference schemas for the value schema, in dependency order.
+    /// Populated during purification by fetching from the schema registry.
+    pub value_reference_schemas: Vec<String>,
 }
 
 impl AstDisplay for CsrSeedAvro {
@@ -395,10 +401,34 @@ impl AstDisplay for CsrSeedAvro {
             f.write_str(" KEY SCHEMA '");
             f.write_node(&display::escape_single_quote_string(key_schema));
             f.write_str("'");
+            if !self.key_reference_schemas.is_empty() {
+                f.write_str(" KEY REFERENCES (");
+                for (i, schema) in self.key_reference_schemas.iter().enumerate() {
+                    if i > 0 {
+                        f.write_str(", ");
+                    }
+                    f.write_str("'");
+                    f.write_node(&display::escape_single_quote_string(schema));
+                    f.write_str("'");
+                }
+                f.write_str(")");
+            }
         }
         f.write_str(" VALUE SCHEMA '");
         f.write_node(&display::escape_single_quote_string(&self.value_schema));
         f.write_str("'");
+        if !self.value_reference_schemas.is_empty() {
+            f.write_str(" VALUE REFERENCES (");
+            for (i, schema) in self.value_reference_schemas.iter().enumerate() {
+                if i > 0 {
+                    f.write_str(", ");
+                }
+                f.write_str("'");
+                f.write_node(&display::escape_single_quote_string(schema));
+                f.write_str("'");
+            }
+            f.write_str(")");
+        }
     }
 }
 impl_display!(CsrSeedAvro);
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index b920b72902168..db6a81e235721 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -2245,11 +2245,36 @@ impl<'a> Parser<'a> {
             } else {
                 None
             };
+
+            // Parse KEY REFERENCES if present (only valid if we have a key schema)
+            let key_reference_schemas =
+                if key_schema.is_some() && self.parse_keywords(&[KEY, REFERENCES]) {
+                    self.expect_token(&Token::LParen)?;
+                    let refs = self.parse_comma_separated(|p| p.parse_literal_string())?;
+                    self.expect_token(&Token::RParen)?;
+                    refs
+                } else {
+                    vec![]
+                };
+
             self.expect_keywords(&[VALUE, SCHEMA])?;
             let value_schema = self.parse_literal_string()?;
+
+            // Parse VALUE REFERENCES if present
+            let value_reference_schemas = if self.parse_keywords(&[VALUE, REFERENCES]) {
+                self.expect_token(&Token::LParen)?;
+                let refs = self.parse_comma_separated(|p| p.parse_literal_string())?;
+                self.expect_token(&Token::RParen)?;
+                refs
+            } else {
+                vec![]
+            };
+
             Some(CsrSeedAvro {
                 key_schema,
                 value_schema,
+
key_reference_schemas, + value_reference_schemas, }) } else { None diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 61d8f1d97f318..088b8061b5a86 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -2649,21 +2649,21 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT AVRO USING C ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED VALUE SCHEMA '{"some": "schema"}' ENVELOPE NONE => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: None, value_schema: "{\"some\": \"schema\"}" }) } }))), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: None, value_schema: "{\"some\": \"schema\"}", key_reference_schemas: [], value_reference_schemas: [] }) } }))), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "schema"}' VALUE SCHEMA '123' ENVELOPE NONE ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "schema"}' VALUE SCHEMA '123' ENVELOPE NONE => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: Some("{\"some\": \"schema\"}"), value_schema: "123" }) } }))), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, 
include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: Some("{\"some\": \"schema\"}"), value_schema: "123", key_reference_schemas: [], value_reference_schemas: [] }) } }))), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "schema"}' VALUE SCHEMA '123' ENVELOPE NONE ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "schema"}' VALUE SCHEMA '123' ENVELOPE NONE => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(KeyValue { key: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }), value: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: Some("{\"some\": \"schema\"}"), value_schema: "123" }) } }) }), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(KeyValue { key: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }), value: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: Some("{\"some\": \"schema\"}"), value_schema: "123", key_reference_schemas: [], value_reference_schemas: [] }) } }) }), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) parse-statement @@ -2671,7 +2671,21 @@ CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') KEY FORMAT AVRO USI ---- CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"some": "schema"}' VALUE SCHEMA '123' INCLUDE KEY, TIMESTAMP, PARTITION AS "PART2", OFFSET, HEADERS ENVELOPE 
NONE => -CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [Key { alias: None }, Timestamp { alias: None }, Partition { alias: Some(Ident("PART2")) }, Offset { alias: None }, Headers { alias: None }], format: Some(KeyValue { key: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }), value: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: Some("{\"some\": \"schema\"}"), value_schema: "123" }) } }) }), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [Key { alias: None }, Timestamp { alias: None }, Partition { alias: Some(Ident("PART2")) }, Offset { alias: None }, Headers { alias: None }], format: Some(KeyValue { key: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }), value: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: Some("{\"some\": \"schema\"}"), value_schema: "123", key_reference_schemas: [], value_reference_schemas: [] }) } }) }), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None }) + +parse-statement +CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED VALUE SCHEMA '{"some": "schema"}' VALUE REFERENCES ('{"type":"record","name":"User"}') ENVELOPE NONE +---- +CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED VALUE SCHEMA '{"some": "schema"}' VALUE REFERENCES ('{"type":"record","name":"User"}') ENVELOPE NONE +=> +CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: None, value_schema: "{\"some\": \"schema\"}", key_reference_schemas: [], value_reference_schemas: ["{\"type\":\"record\",\"name\":\"User\"}"] }) } }))), envelope: Some(None), if_not_exists: false, key_constraint: None, 
with_options: [], external_references: None, progress_subsource: None })
+
+parse-statement
+CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"key": "schema"}' KEY REFERENCES ('{"type":"record","name":"KeyRef"}') VALUE SCHEMA '{"value": "schema"}' VALUE REFERENCES ('{"type":"record","name":"Ref1"}', '{"type":"record","name":"Ref2"}') ENVELOPE NONE
+----
+CREATE SOURCE src1 FROM KAFKA CONNECTION conn1 (TOPIC = 'baz') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn SEED KEY SCHEMA '{"key": "schema"}' KEY REFERENCES ('{"type":"record","name":"KeyRef"}') VALUE SCHEMA '{"value": "schema"}' VALUE REFERENCES ('{"type":"record","name":"Ref1"}', '{"type":"record","name":"Ref2"}') ENVELOPE NONE
+=>
+CreateSource(CreateSourceStatement { name: UnresolvedItemName([Ident("src1")]), in_cluster: None, col_names: [], connection: Kafka { connection: Name(UnresolvedItemName([Ident("conn1")])), options: [KafkaSourceConfigOption { name: Topic, value: Some(Value(String("baz"))) }] }, include_metadata: [], format: Some(Bare(Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("csr_conn")])), options: [] }, key_strategy: None, value_strategy: None, seed: Some(CsrSeedAvro { key_schema: Some("{\"key\": \"schema\"}"), value_schema: "{\"value\": \"schema\"}", key_reference_schemas: ["{\"type\":\"record\",\"name\":\"KeyRef\"}"], value_reference_schemas: ["{\"type\":\"record\",\"name\":\"Ref1\"}", "{\"type\":\"record\",\"name\":\"Ref2\"}"] }) } }))), envelope: Some(None), if_not_exists: false, key_constraint: None, with_options: [], external_references: None, progress_subsource: None })
 
 parse-statement
 CREATE SOURCE golbat FROM KAFKA BROKER 'zubat' TOPIC 'hoothoot' KEY FORMAT TEXT VALUE FORMAT TEXT INCLUDE KEY ENVELOPE NONE
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index 51a6b7a1ef0f7..69a45617ede3c 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -2281,6 +2281,10 @@ generate_extracted_config!(AvroSchemaOption, (ConfluentWireFormat, bool, Default
 pub struct Schema {
     pub key_schema: Option<String>,
     pub value_schema: String,
+    /// Reference schemas for the key schema, in dependency order.
+    pub key_reference_schemas: Vec<String>,
+    /// Reference schemas for the value schema, in dependency order.
+    pub value_reference_schemas: Vec<String>,
     pub csr_connection: Option<<ReferencedConnection as ConnectionAccess>::Csr>,
     pub confluent_wire_format: bool,
 }
@@ -2295,6 +2299,8 @@ fn get_encoding_inner(
     let Schema {
         key_schema,
         value_schema,
+        key_reference_schemas,
+        value_reference_schemas,
         csr_connection,
         confluent_wire_format,
     } = match schema {
@@ -2312,6 +2318,8 @@ fn get_encoding_inner(
             Schema {
                 key_schema: None,
                 value_schema: schema.clone(),
+                key_reference_schemas: vec![],
+                value_reference_schemas: vec![],
                 csr_connection: None,
                 confluent_wire_format,
             }
@@ -2343,6 +2351,8 @@ fn get_encoding_inner(
                 Schema {
                     key_schema: seed.key_schema.clone(),
                     value_schema: seed.value_schema.clone(),
+                    key_reference_schemas: seed.key_reference_schemas.clone(),
+                    value_reference_schemas: seed.value_reference_schemas.clone(),
                     csr_connection: Some(csr_connection),
                     confluent_wire_format: true,
                 }
@@ -2356,11 +2366,13 @@ fn get_encoding_inner(
         return Ok(SourceDataEncoding {
             key: Some(DataEncoding::Avro(AvroEncoding {
                 schema: key_schema,
+                reference_schemas: key_reference_schemas,
                 csr_connection: csr_connection.clone(),
                 confluent_wire_format,
             })),
             value: DataEncoding::Avro(AvroEncoding {
                 schema: value_schema,
+                reference_schemas: value_reference_schemas,
                 csr_connection,
                 confluent_wire_format,
             }),
@@ -2368,6 +2380,7 @@ fn get_encoding_inner(
     } else {
         DataEncoding::Avro(AvroEncoding {
             schema: value_schema,
+            reference_schemas: value_reference_schemas,
             csr_connection,
             confluent_wire_format,
         })
diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs
index 1d2da568d2da3..6afc4030f6b14 100644
--- a/src/sql/src/pure.rs
+++ b/src/sql/src/pure.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
 use anyhow::anyhow;
 use itertools::Itertools;
 use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
-use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
+use mz_ccsr::{Client, GetBySubjectError};
 use mz_cloud_provider::CloudProvider;
 use mz_controller_types::ClusterId;
 use mz_kafka_util::client::MzClientContext;
@@ -2456,6 +2456,8 @@ async fn purify_csr_connection_avro(
     let Schema {
         key_schema,
         value_schema,
+        key_reference_schemas,
+        value_reference_schemas,
     } = get_remote_csr_schema(
         &ccsr_client,
         key_strategy.clone().unwrap_or_default(),
@@ -2470,6 +2472,8 @@ async fn purify_csr_connection_avro(
     *seed = Some(CsrSeedAvro {
         key_schema,
         value_schema,
+        key_reference_schemas,
+        value_reference_schemas,
     })
 }
@@ -2480,17 +2484,53 @@ async fn purify_csr_connection_avro(
 pub struct Schema {
     pub key_schema: Option<String>,
     pub value_schema: String,
+    /// Reference schemas for the key schema, in dependency order.
+    pub key_reference_schemas: Vec<String>,
+    /// Reference schemas for the value schema, in dependency order.
+    pub value_reference_schemas: Vec<String>,
+}
+
+/// Result of fetching a schema, including any referenced schemas.
+struct SchemaWithReferences {
+    /// The primary schema.
+    schema: String,
+    /// Reference schemas in dependency order.
+    references: Vec<String>,
+}
 
 async fn get_schema_with_strategy(
     client: &Client,
     strategy: ReaderSchemaSelectionStrategy,
     subject: &str,
-) -> Result<Option<String>, PlanError> {
+) -> Result<Option<SchemaWithReferences>, PlanError> {
     match strategy {
         ReaderSchemaSelectionStrategy::Latest => {
-            match client.get_schema_by_subject(subject).await {
-                Ok(CcsrSchema { raw, ..
 }) => Ok(Some(raw)),
+            // Use get_subject_and_references to also fetch referenced schemas
+            match client.get_subject_and_references(subject).await {
+                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
+                    schema: primary.schema.raw,
+                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
+                })),
+                Err(GetBySubjectError::SubjectNotFound)
+                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
+                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
+                    schema_lookup: format!("subject {}", subject.quoted()),
+                    cause: Arc::new(e),
+                }),
+            }
+        }
+        // TODO (maz): inline strategy can still have reached out to registry and collected
+        // references, so not clear that we should have empty references here.
+        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
+            schema: raw,
+            references: vec![],
+        })),
+        ReaderSchemaSelectionStrategy::ById(id) => {
+            match client.get_subject_and_references_by_id(id).await {
+                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
+                    schema: primary.schema.raw,
+                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
+                })),
                 Err(GetBySubjectError::SubjectNotFound)
                 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
@@ -2499,15 +2539,6 @@ async fn get_schema_with_strategy(
                 }),
             }
         }
-        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
-        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
-            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
-            Err(GetByIdError::SchemaNotFound) => Ok(None),
-            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
-                schema_lookup: format!("ID {}", id),
-                cause: Arc::new(e),
-            }),
-        },
     }
 }
 
@@ -2518,14 +2549,18 @@ async fn get_remote_csr_schema(
     topic: &str,
 ) -> Result<Schema, PlanError> {
     let value_schema_name = format!("{}-value", topic);
-    let value_schema =
+    let value_result =
         get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
-    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
-    let subject = format!("{}-key", topic);
-    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
+    let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
+
+    let key_subject = format!("{}-key", topic);
+    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
+
     Ok(Schema {
-        key_schema,
-        value_schema,
+        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
+        value_schema: value_result.schema,
+        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
+        value_reference_schemas: value_result.references,
     })
 }
diff --git a/src/storage-types/src/sources/encoding.rs b/src/storage-types/src/sources/encoding.rs
index 189ce0d4f92f5..b4149972f2db1 100644
--- a/src/storage-types/src/sources/encoding.rs
+++ b/src/storage-types/src/sources/encoding.rs
@@ -148,8 +148,13 @@ impl DataEncoding {
             Self::Json => RelationDesc::builder()
                 .with_column("data", SqlScalarType::Jsonb.nullable(false))
                 .finish(),
-            Self::Avro(AvroEncoding { schema, .. }) => {
-                let parsed_schema = avro::parse_schema(schema).context("validating avro schema")?;
+            Self::Avro(AvroEncoding {
+                schema,
+                reference_schemas,
+                ..
+            }) => {
+                let parsed_schema = avro::parse_schema(schema, reference_schemas)
+                    .context("validating avro schema")?;
                 avro::schema_to_relationdesc(parsed_schema).context("validating avro schema")?
            }
            Self::Protobuf(ProtobufEncoding {
@@ -247,6 +252,10 @@
 #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
 pub struct AvroEncoding<C: ConnectionAccess = InlinedConnection> {
     pub schema: String,
+    /// Schemas for types referenced by the main schema, in dependency order.
+    /// These are fetched from the schema registry when the source is created.
+    #[serde(default)]
+    pub reference_schemas: Vec<String>,
     pub csr_connection: Option<C::Csr>,
     pub confluent_wire_format: bool,
 }
@@ -257,11 +266,13 @@ impl<R: ConnectionResolver> IntoInlineConnection<AvroEncoding, R>
     fn into_inline_connection(self, r: R) -> AvroEncoding {
         let AvroEncoding {
             schema,
+            reference_schemas,
             csr_connection,
             confluent_wire_format,
         } = self;
         AvroEncoding {
             schema,
+            reference_schemas,
             csr_connection: csr_connection.map(|csr| r.resolve_connection(csr).unwrap_csr()),
             confluent_wire_format,
         }
@@ -276,12 +287,17 @@ impl<C: ConnectionAccess> AlterCompatible for AvroEncoding<C> {
 
         let AvroEncoding {
             schema,
+            reference_schemas,
             csr_connection,
             confluent_wire_format,
         } = self;
 
         let compatibility_checks = [
             (schema == &other.schema, "schema"),
+            (
+                reference_schemas == &other.reference_schemas,
+                "reference_schemas",
+            ),
             (
                 match (csr_connection, &other.csr_connection) {
                     (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
diff --git a/src/storage/src/decode.rs b/src/storage/src/decode.rs
index f286d5348530b..a940c829cf185 100644
--- a/src/storage/src/decode.rs
+++ b/src/storage/src/decode.rs
@@ -346,6 +346,7 @@ async fn get_decoder(
     let decoder = match encoding {
         DataEncoding::Avro(AvroEncoding {
             schema,
+            reference_schemas,
             csr_connection,
             confluent_wire_format,
         }) => {
@@ -360,6 +361,7 @@ async fn get_decoder(
             };
             let state = avro::AvroDecoderState::new(
                 &schema,
+                &reference_schemas,
                 csr_client,
                 debug_name.to_string(),
                 confluent_wire_format,
diff --git a/src/storage/src/decode/avro.rs b/src/storage/src/decode/avro.rs
index 21fb2e04dd44c..93762bc78bb5c 100644
--- a/src/storage/src/decode/avro.rs
+++ b/src/storage/src/decode/avro.rs
@@ -21,12 +21,19 @@ pub struct AvroDecoderState {
 impl AvroDecoderState {
     pub fn new(
         value_schema: &str,
+        reference_schemas: &[String],
         ccsr_client: Option<mz_ccsr::Client>,
         debug_name: String,
         confluent_wire_format: bool,
     ) -> Result<AvroDecoderState, anyhow::Error> {
         Ok(AvroDecoderState {
-            decoder: Decoder::new(value_schema, ccsr_client, debug_name, confluent_wire_format)?,
+            decoder: Decoder::new(
+                value_schema,
+                reference_schemas,
+                ccsr_client,
+                debug_name,
+                confluent_wire_format,
+            )?,
             events_success: 0,
         })
     }
diff --git a/src/testdrive/src/action.rs b/src/testdrive/src/action.rs
index 5128ac15cd5dd..edda438837f57 100644
--- a/src/testdrive/src/action.rs
+++ b/src/testdrive/src/action.rs
@@ -706,12 +706,98 @@ impl State {
         let testdrive_subjects: Vec<_> = subjects
             .iter()
             .filter(|s| s.starts_with("testdrive-"))
+            .cloned()
             .collect();
 
-        for subject in testdrive_subjects {
-            match self.ccsr_client.delete_subject(subject).await {
-                Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
-                Err(e) => errors.push(e.into()),
+        // Build a dependency graph to delete subjects in the correct order.
+        // If subject A references subject B, we must delete A before B.
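+        // For example (hypothetical subjects): if `testdrive-order-value` references
+        // `testdrive-user-value`, which in turn references `testdrive-address-value`,
+        // the deletion order is: order, then user, then address.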
+        // We build a map of: subject -> set of subjects that reference it (dependents)
+        #[allow(clippy::disallowed_types)]
+        let mut dependents: std::collections::HashMap<
+            String,
+            std::collections::HashSet<String>,
+        > = std::collections::HashMap::new();
+        #[allow(clippy::disallowed_types)]
+        let mut references: std::collections::HashMap<String, Vec<String>> =
+            std::collections::HashMap::new();
+
+        // Initialize all subjects
+        for subject in &testdrive_subjects {
+            dependents.entry(subject.clone()).or_default();
+        }
+
+        // Fetch references for each subject and build the dependency graph
+        for subject in &testdrive_subjects {
+            match self.ccsr_client.get_subject_with_references(subject).await {
+                Ok((_subj, refs)) => {
+                    // Filter to only testdrive subjects
+                    let refs: Vec<_> = refs
+                        .into_iter()
+                        .filter(|r| r.starts_with("testdrive-"))
+                        .collect();
+                    for referenced in &refs {
+                        dependents
+                            .entry(referenced.clone())
+                            .or_default()
+                            .insert(subject.clone());
+                    }
+                    references.insert(subject.clone(), refs);
+                }
+                Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
+                    // Subject was already deleted, skip it
+                }
+                Err(e) => {
+                    errors.push(anyhow::anyhow!(
+                        "failed to get references for subject {}: {}",
+                        subject,
+                        e
+                    ));
+                }
+            }
+        }
+
+        // Delete subjects in topological order:
+        // First delete subjects that have no dependents (nothing references them),
+        // then remove them from the dependent sets of other subjects, repeat.
+        #[allow(clippy::disallowed_types)]
+        let mut remaining: std::collections::HashSet<String> =
+            testdrive_subjects.into_iter().collect();
+
+        while !remaining.is_empty() {
+            // Find subjects with no dependents (safe to delete)
+            let deletable: Vec<_> = remaining
+                .iter()
+                .filter(|s| dependents.get(*s).map(|d| d.is_empty()).unwrap_or(true))
+                .cloned()
+                .collect();
+
+            if deletable.is_empty() {
+                // Circular dependency or all remaining have dependents - try deleting anyway.
+                // This handles edge cases where references point to non-testdrive subjects.
+                for subject in &remaining {
+                    match self.ccsr_client.delete_subject(subject).await {
+                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
+                        Err(e) => errors.push(e.into()),
+                    }
+                }
+                break;
+            }
+
+            for subject in deletable {
+                match self.ccsr_client.delete_subject(&subject).await {
+                    Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
+                    Err(e) => errors.push(e.into()),
+                }
+                remaining.remove(&subject);
+
+                // Remove this subject from the dependent sets of subjects it referenced
+                if let Some(refs) = references.get(&subject) {
+                    for referenced in refs {
+                        if let Some(deps) = dependents.get_mut(referenced) {
+                            deps.remove(&subject);
+                        }
+                    }
+                }
            }
        }
    }
diff --git a/src/testdrive/src/action/kafka/ingest.rs b/src/testdrive/src/action/kafka/ingest.rs
index 44da0c8fe1132..125cf08727f38 100644
--- a/src/testdrive/src/action/kafka/ingest.rs
+++ b/src/testdrive/src/action/kafka/ingest.rs
@@ -29,11 +29,139 @@ use crate::parser::BuiltinCommand;
 
 const INGEST_BATCH_SIZE: isize = 10000;
 
+/// Extracts ALL type names defined in an Avro schema (including nested types).
+/// Returns a set of fully qualified type names.
+#[allow(clippy::disallowed_types)]
+fn extract_all_defined_types(
+    schema_json: &str,
+) -> anyhow::Result<std::collections::HashSet<String>> {
+    let value: serde_json::Value = serde_json::from_str(schema_json)
+        .context("parsing schema JSON to extract defined types")?;
+
+    let mut types = std::collections::HashSet::new();
+    collect_defined_types(&value, None, &mut types);
+    Ok(types)
+}
+
+/// Recursively collects all named type definitions from an Avro schema.
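+///
+/// For example, a record `com.example.ContactInfo` with inline nested records
+/// `Address` and `PhoneNumber` yields `{"com.example.ContactInfo",
+/// "com.example.Address", "com.example.PhoneNumber"}`: nested types inherit
+/// their parent's namespace unless they declare their own.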
+#[allow(clippy::disallowed_types)]
+fn collect_defined_types(
+    value: &serde_json::Value,
+    parent_namespace: Option<&str>,
+    types: &mut std::collections::HashSet<String>,
+) {
+    match value {
+        serde_json::Value::Object(map) => {
+            // Get this schema's namespace (falls back to parent's namespace)
+            let namespace = map
+                .get("namespace")
+                .and_then(|v| v.as_str())
+                .or(parent_namespace);
+
+            // Check if this is a named type definition (record, enum, or fixed)
+            if let Some(type_val) = map.get("type")
+                && type_val
+                    .as_str()
+                    .is_some_and(|typ| ["record", "enum", "fixed"].contains(&typ))
+            {
+                if let Some(name) = map.get("name").and_then(|v| v.as_str()) {
+                    // Construct fully qualified name
+                    let fullname = if name.contains('.') {
+                        name.to_string()
+                    } else if let Some(ns) = namespace {
+                        format!("{}.{}", ns, name)
+                    } else {
+                        name.to_string()
+                    };
+                    types.insert(fullname);
+                }
+            }
+
+            // The following keys may contain nested type definitions:
+            // the type field, items (array types), values (map types), and fields (e.g. unions)
+            for entity_type in &["type", "items", "values", "fields"] {
+                if let Some(val) = map.get(*entity_type) {
+                    collect_defined_types(val, namespace, types);
+                }
+            }
+        }
+        serde_json::Value::Array(arr) => {
+            for item in arr {
+                collect_defined_types(item, parent_namespace, types);
+            }
+        }
+        _ => {}
+    }
+}
+
+/// Extracts all type references from an Avro schema JSON string.
+/// This finds all fully qualified type names that are referenced but not defined in the schema.
+#[allow(clippy::disallowed_types)]
+fn extract_type_references(
+    schema_json: &str,
+) -> anyhow::Result<std::collections::HashSet<String>> {
+    let value: serde_json::Value = serde_json::from_str(schema_json)
+        .context("parsing schema JSON to extract type references")?;
+
+    let mut references = std::collections::HashSet::new();
+    collect_type_references(&value, &mut references);
+    Ok(references)
+}
+
+/// Recursively collects type references from an Avro schema JSON value.
+#[allow(clippy::disallowed_types)]
+fn collect_type_references(
+    value: &serde_json::Value,
+    references: &mut std::collections::HashSet<String>,
+) {
+    match value {
+        serde_json::Value::String(s) => {
+            // A string type that contains a dot is likely a fully qualified type reference
+            if s.contains('.')
+                && ![
+                    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
+                ]
+                .contains(&s.as_str())
+            {
+                references.insert(s.clone());
+            }
+        }
+        serde_json::Value::Object(map) => {
+            // For named types, we want to recurse into the fields, but the named type doesn't
+            // get added to references.
+            if let Some(type_val) = map.get("type")
+                && type_val
+                    .as_str()
+                    .is_some_and(|typ| ["record", "enum", "fixed"].contains(&typ))
+            {
+                if let Some(fields) = map.get("fields") {
+                    collect_type_references(fields, references);
+                }
+                return;
+            }
+
+            // The following keys may contain type references:
+            // the type field, items (array types), values (map types), and fields (e.g.
 unions)
+            for entity_type in &["type", "items", "values", "fields"] {
+                if let Some(val) = map.get(*entity_type) {
+                    collect_type_references(val, references);
+                }
+            }
+        }
+        serde_json::Value::Array(arr) => {
+            for item in arr {
+                collect_type_references(item, references);
+            }
+        }
+        _ => {}
+    }
+}
+
 #[derive(Clone)]
 enum Format {
     Avro {
         schema: String,
         confluent_wire_format: bool,
+        /// Schema references (subject names) for Confluent Schema Registry
+        references: Vec<String>,
     },
     Protobuf {
         descriptor_file: String,
@@ -178,6 +306,12 @@ pub async fn run_ingest(
         "avro" => Format::Avro {
             schema: cmd.args.string("schema")?,
             confluent_wire_format: cmd.args.opt_bool("confluent-wire-format")?.unwrap_or(true),
+            // TODO (maz): update README!
+            references: cmd
+                .args
+                .opt_string("references")
+                .map(|s| s.split(',').map(|s| s.to_string()).collect())
+                .unwrap_or_default(),
         },
         "protobuf" => {
             let descriptor_file = cmd.args.string("descriptor-file")?;
@@ -202,6 +336,11 @@ pub async fn run_ingest(
                 anyhow!("key-schema parameter required when key-format is present")
             })?,
             confluent_wire_format: cmd.args.opt_bool("confluent-wire-format")?.unwrap_or(true),
+            references: cmd
+                .args
+                .opt_string("key-references")
+                .map(|s| s.split(',').map(|s| s.to_string()).collect())
+                .unwrap_or_default(),
         }),
         Some("protobuf") => {
            let descriptor_file = cmd.args.string("key-descriptor-file")?;
@@ -411,18 +550,95 @@ async fn make_transcoder(
         Format::Avro {
             schema,
             confluent_wire_format,
+            references,
         } => {
             if confluent_wire_format {
+                // Build references list by fetching each subject from the registry.
+                // We need ALL references for local parsing, but only DIRECT references for the registry.
+                let mut reference_subjects = vec![];
+                for reference in &references {
+                    let subject = state
+                        .ccsr_client
+                        .get_subject_latest(reference)
+                        .await
+                        .with_context(|| format!("fetching reference {}", reference))?;
+                    // Extract ALL type names defined in this schema (including nested types)
+                    let defined_types = extract_all_defined_types(&subject.schema.raw)
+                        .with_context(|| {
+                            format!("extracting type names from reference schema {}", reference)
+                        })?;
+                    reference_subjects.push((
+                        reference.to_string(),
+                        subject.version,
+                        subject.schema.raw,
+                        defined_types,
+                    ));
+                }
+
+                // Extract types directly referenced by the primary schema
+                let direct_refs = extract_type_references(&schema)
+                    .context("extracting type references from schema")?;
+
+                // For the registry, create a reference for each type in direct_refs
+                // that is defined in one of the reference subjects
+                let mut schema_references = vec![];
+                for type_name in &direct_refs {
+                    for (subject_name, version, _, defined_types) in &reference_subjects {
+                        if defined_types.contains(type_name) {
+                            schema_references.push(mz_ccsr::SchemaReference {
+                                name: type_name.clone(),
+                                subject: subject_name.clone(),
+                                version: *version,
+                            });
+                            break;
+                        }
+                    }
+                }
+
+                // For local parsing, we need all reference schemas
+                let reference_raw_schemas: Vec<_> = reference_subjects
+                    .into_iter()
+                    .map(|(_, _, raw, _)| raw)
+                    .collect();
+
                 let schema_id = state
                     .ccsr_client
-                    .publish_schema(&ccsr_subject, &schema, mz_ccsr::SchemaType::Avro, &[])
+                    .publish_schema(
+                        &ccsr_subject,
+                        &schema,
+                        mz_ccsr::SchemaType::Avro,
+                        &schema_references,
+                    )
                     .await
                     .context("publishing to schema registry")?;
-                let schema = avro::parse_schema(&schema)
-                    .with_context(|| format!("parsing avro schema: {}", schema))?;
+
+                // Parse schema, handling references if any
+                let schema = if reference_raw_schemas.is_empty() {
+
+
+                // Parse schema, handling references if any
+                let schema = if reference_raw_schemas.is_empty() {
+                    avro::parse_schema(&schema, &[])
+                        .with_context(|| format!("parsing avro schema: {}", schema))?
+                } else {
+                    // Parse reference schemas incrementally (each may depend on previous ones).
+                    // References must be specified in dependency order (dependencies first).
+                    let mut parsed_refs: Vec<Schema> = vec![];
+                    for raw in &reference_raw_schemas {
+                        let schema_value: serde_json::Value = serde_json::from_str(raw)
+                            .with_context(|| format!("parsing reference schema JSON: {}", raw))?;
+                        let parsed = Schema::parse_with_references(&schema_value, &parsed_refs)
+                            .with_context(|| format!("parsing reference avro schema: {}", raw))?;
+                        parsed_refs.push(parsed);
+                    }
+
+                    // Parse primary schema with all reference types available
+                    let schema_value: serde_json::Value = serde_json::from_str(&schema)
+                        .with_context(|| format!("parsing schema JSON: {}", schema))?;
+                    Schema::parse_with_references(&schema_value, &parsed_refs).with_context(
+                        || format!("parsing avro schema with references: {}", schema),
+                    )?
+                };
+
                 Ok::<_, anyhow::Error>(Transcoder::ConfluentAvro { schema, schema_id })
             } else {
-                let schema = avro::parse_schema(&schema)
+                let schema = avro::parse_schema(&schema, &[])
                     .with_context(|| format!("parsing avro schema: {}", schema))?;
                 Ok(Transcoder::PlainAvro { schema })
             }
diff --git a/src/testdrive/src/action/kafka/verify_data.rs b/src/testdrive/src/action/kafka/verify_data.rs
index abab7109829e1..6c8d2c24be817 100644
--- a/src/testdrive/src/action/kafka/verify_data.rs
+++ b/src/testdrive/src/action/kafka/verify_data.rs
@@ -248,7 +248,9 @@ pub async fn run_verify_data(
         .get_schema_by_subject(&format!("{}-key", topic))
         .await
         .ok()
-        .map(|key_schema| avro::parse_schema(&key_schema.raw).context("parsing avro schema"))
+        .map(|key_schema| {
+            avro::parse_schema(&key_schema.raw, &[]).context("parsing avro schema")
+        })
         .transpose()?;
     // for avro, we can determine if a key is required based on the presence of the key schema
     // rather than requiring the user to specify the key=true flag
@@ -266,7 +268,7 @@ pub async fn run_verify_data(
             .await
             .context("fetching schema")?
             .raw;
-        Some(avro::parse_schema(&val_schema).context("parsing avro schema")?)
+        Some(avro::parse_schema(&val_schema, &[]).context("parsing avro schema")?)
     } else {
         None
     };
diff --git a/src/testdrive/src/action/schema_registry.rs b/src/testdrive/src/action/schema_registry.rs
index 6c36fd259c234..204dde163f729 100644
--- a/src/testdrive/src/action/schema_registry.rs
+++ b/src/testdrive/src/action/schema_registry.rs
@@ -13,11 +13,35 @@
 use anyhow::{Context, bail};
 use mz_ccsr::{SchemaReference, SchemaType};
 use mz_ore::retry::Retry;
 use mz_ore::str::StrExt;
+use serde_json::Value as JsonValue;

 use crate::action::{ControlFlow, State};
 use crate::format::avro;
 use crate::parser::BuiltinCommand;

+/// Extracts the fully qualified name from an Avro schema JSON string.
+/// For record types, this combines namespace and name (e.g., "com.example.User").
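+///
+/// A minimal sketch of the expected behavior (inputs illustrative):
+/// `{"type": "record", "name": "User", "namespace": "com.example", ...}`
+/// yields `"com.example.User"`; a `name` that already contains dots is
+/// returned as-is, and a bare `name` without a namespace is returned alone.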
+fn extract_avro_fullname(schema_json: &str) -> anyhow::Result<String> {
+    let value: JsonValue =
+        serde_json::from_str(schema_json).context("parsing schema JSON to extract fullname")?;
+
+    let name = value
+        .get("name")
+        .and_then(|v| v.as_str())
+        .ok_or_else(|| anyhow::anyhow!("schema missing 'name' field"))?;
+
+    let namespace = value.get("namespace").and_then(|v| v.as_str());
+
+    // If name contains dots, it's already fully qualified
+    if name.contains('.') {
+        Ok(name.to_string())
+    } else if let Some(ns) = namespace {
+        Ok(format!("{}.{}", ns, name))
+    } else {
+        Ok(name.to_string())
+    }
+}
+
 pub async fn run_publish(
     mut cmd: BuiltinCommand,
     state: &State,
@@ -49,8 +73,18 @@ pub async fn run_publish(
             .get_subject_latest(&reference)
             .await
             .with_context(|| format!("fetching reference {}", reference))?;
+        let type_name = match schema_type {
+            // Extract the fully qualified Avro type name from the schema.
+            // The Schema Registry reference `name` field should be the type name
+            // (e.g., "com.example.Address"), not the subject name.
+            SchemaType::Avro => extract_avro_fullname(&subject.schema.raw).with_context(|| {
+                format!("extracting type name from reference schema {}", reference)
+            })?,
+            SchemaType::Protobuf | SchemaType::Json => subject.name,
+        };
         references.push(SchemaReference {
-            name: subject.name,
+            name: type_name,
             subject: reference.to_string(),
             version: subject.version,
         })
@@ -77,7 +111,7 @@ pub async fn run_verify(
     cmd.args.done()?;
     let expected_schema = match &cmd.input[..] {
         [expected_schema] => {
-            avro::parse_schema(expected_schema).context("parsing expected avro schema")?
+            avro::parse_schema(expected_schema, &[]).context("parsing expected avro schema")?
         }
         _ => bail!("unable to read expected schema input"),
     };
@@ -105,7 +139,8 @@ pub async fn run_verify(
         .await
         .context("fetching schema")?;

-    let actual_schema = avro::parse_schema(&actual_schema).context("parsing actual avro schema")?;
+    let actual_schema =
+        avro::parse_schema(&actual_schema, &[]).context("parsing actual avro schema")?;
     if expected_schema != actual_schema {
         bail!(
             "schema did not match\nexpected:\n{:?}\n\nactual:\n{:?}",
diff --git a/test/testdrive/avro-schema-references.td b/test/testdrive/avro-schema-references.td
new file mode 100644
index 0000000000000..3eacea9e6411f
--- /dev/null
+++ b/test/testdrive/avro-schema-references.td
@@ -0,0 +1,294 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+# Tests for Avro schema references in Confluent Schema Registry.
+# Schema references allow one schema to reference named types defined in
+# another schema.
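+#
+# The `references` argument to kafka-ingest (used throughout this file) takes
+# a comma-separated list of schema registry subjects. Subjects must be listed
+# in dependency order (dependencies first): each reference schema is parsed
+# with only the previously listed schemas available.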
+
+$ set-arg-default single-replica-cluster=quickstart
+
+> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
+    URL '${testdrive.schema-registry-url}'
+  );
+
+> CREATE CONNECTION kafka_conn
+  TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
+
+# =============================================================================
+# Test 1: Simple schema reference
+# A schema references a type defined in another schema
+# =============================================================================
+
+# First, publish the base schema that defines the User record type
+$ set user-schema={
+  "type": "record",
+  "name": "User",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "id", "type": "long"},
+    {"name": "name", "type": "string"}
+  ]
+  }
+
+$ schema-registry-publish subject=testdrive-user-${testdrive.seed}-value schema-type=avro
+\${user-schema}
+
+# Now create a topic for events that reference the User type
+$ kafka-create-topic topic=simple-ref
+
+# Publish the Event schema that references User, and ingest data
+$ set event-schema={
+  "type": "record",
+  "name": "Event",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "event_id", "type": "long"},
+    {"name": "user", "type": "com.example.User"}
+  ]
+  }
+
+$ kafka-ingest format=avro topic=simple-ref schema=${event-schema} references=testdrive-user-${testdrive.seed}-value timestamp=1
+{"event_id": 1, "user": {"id": 100, "name": "Alice"}}
+{"event_id": 2, "user": {"id": 101, "name": "Bob"}}
+
+> BEGIN
+> CREATE SOURCE simple_ref_src
+  IN CLUSTER ${arg.single-replica-cluster}
+  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-simple-ref-${testdrive.seed}')
+
+> CREATE TABLE simple_ref FROM SOURCE simple_ref_src (REFERENCE "testdrive-simple-ref-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE NONE
+> COMMIT
+
+> SELECT event_id, (user).id, (user).name FROM simple_ref ORDER BY event_id
+event_id id name
+-----------------------
+1 100 Alice
+2 101 Bob
+
+# =============================================================================
+# Test 2: Chained schema references (A -> B -> C)
+# Schema C references B, which references A
+# =============================================================================
+
+# Schema A: Address
+$ set address-schema={
+  "type": "record",
+  "name": "Address",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "street", "type": "string"},
+    {"name": "city", "type": "string"},
+    {"name": "zip", "type": "string"}
+  ]
+  }
+
+$ schema-registry-publish subject=testdrive-address-${testdrive.seed}-value schema-type=avro
+\${address-schema}
+
+# Schema B: Person (references Address)
+$ set person-schema={
+  "type": "record",
+  "name": "Person",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "home_address", "type": "com.example.Address"}
+  ]
+  }
+
+$ schema-registry-publish subject=testdrive-person-${testdrive.seed}-value schema-type=avro references=testdrive-address-${testdrive.seed}-value
+\${person-schema}
+
+# Schema C: Order (references Person, which transitively references Address)
+$ kafka-create-topic topic=chained-ref
+
+$ set order-schema={
+  "type": "record",
+  "name": "Order",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "order_id", "type": "long"},
+    {"name": "customer", "type": "com.example.Person"}
+  ]
+  }
+
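+# The ingest below lists both the direct reference (Person) and its transitive
+# dependency (Address), dependencies first, so Address is already known when
+# Person is parsed.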
+$ kafka-ingest format=avro topic=chained-ref schema=${order-schema} references=testdrive-address-${testdrive.seed}-value,testdrive-person-${testdrive.seed}-value timestamp=1
+{"order_id": 1001, "customer": {"name": "Charlie", "home_address": {"street": "123 Main St", "city": "Springfield", "zip": "12345"}}}
+
+> BEGIN
+> CREATE SOURCE chained_ref_src
+  IN CLUSTER ${arg.single-replica-cluster}
+  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-chained-ref-${testdrive.seed}')
+
+> CREATE TABLE chained_ref FROM SOURCE chained_ref_src (REFERENCE "testdrive-chained-ref-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE NONE
+> COMMIT
+
+> SELECT order_id, (customer).name, ((customer).home_address).city FROM chained_ref
+order_id name city
+----------------------------
+1001 Charlie Springfield
+
+# =============================================================================
+# Test 3: Schema with multiple referenced types
+# A schema references multiple types from separate schemas
+# =============================================================================
+
+# Coordinates schema
+$ set coordinates-schema={
+  "type": "record",
+  "name": "Coordinates",
+  "namespace": "com.geo",
+  "fields": [
+    {"name": "lat", "type": "double"},
+    {"name": "lng", "type": "double"}
+  ]
+  }
+
+$ schema-registry-publish subject=testdrive-coordinates-${testdrive.seed}-value schema-type=avro
+\${coordinates-schema}
+
+# Metadata schema
+$ set metadata-schema={
+  "type": "record",
+  "name": "Metadata",
+  "namespace": "com.geo",
+  "fields": [
+    {"name": "source", "type": "string"},
+    {"name": "accuracy", "type": "float"}
+  ]
+  }
+
+$ schema-registry-publish subject=testdrive-metadata-${testdrive.seed}-value schema-type=avro
+\${metadata-schema}
+
+# Location schema that uses both Coordinates and Metadata
+$ kafka-create-topic topic=multi-ref
+
+$ set location-schema={
+  "type": "record",
+  "name": "Location",
+  "namespace": "com.geo",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "position", "type": "com.geo.Coordinates"},
+    {"name": "info", "type": "com.geo.Metadata"}
+  ]
+  }
+
+$ kafka-ingest format=avro topic=multi-ref schema=${location-schema} references=testdrive-coordinates-${testdrive.seed}-value,testdrive-metadata-${testdrive.seed}-value timestamp=1
+{"name": "Coffee Shop", "position": {"lat": 40.7128, "lng": -74.006}, "info": {"source": "GPS", "accuracy": 0.95}}
+
+> BEGIN
+> CREATE SOURCE multi_ref_src
+  IN CLUSTER ${arg.single-replica-cluster}
+  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-multi-ref-${testdrive.seed}')
+
+> CREATE TABLE multi_ref FROM SOURCE multi_ref_src (REFERENCE "testdrive-multi-ref-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE NONE
+> COMMIT
+
+> SELECT name, (position).lat, (position).lng, (info).source FROM multi_ref
+name lat lng source
+--------------------------------------
+"Coffee Shop" 40.7128 -74.006 GPS
+
+# =============================================================================
+# Test 4: Reference with nullable field
+# A schema references a type used in a nullable (union) field
+# =============================================================================
+
+# Reuse the User schema from Test 1
+
+$ kafka-create-topic topic=nullable-ref
+
+$ set nullable-event-schema={
+  "type": "record",
+  "name": "NullableEvent",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "event_id", "type": "long"},
+    {"name": "user", "type": ["null", "com.example.User"], "default": null}
+  ]
+  }
+
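+# In Avro's JSON encoding, a non-null value for a union field is wrapped in a
+# single-key object naming the branch type, e.g. {"com.example.User": {...}}.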
+$ kafka-ingest format=avro topic=nullable-ref schema=${nullable-event-schema} references=testdrive-user-${testdrive.seed}-value timestamp=1
+{"event_id": 1, "user": {"com.example.User": {"id": 200, "name": "Diana"}}}
+{"event_id": 2, "user": null}
+{"event_id": 3, "user": {"com.example.User": {"id": 201, "name": "Eve"}}}
+
+> BEGIN
+> CREATE SOURCE nullable_ref_src
+  IN CLUSTER ${arg.single-replica-cluster}
+  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-nullable-ref-${testdrive.seed}')
+
+> CREATE TABLE nullable_ref FROM SOURCE nullable_ref_src (REFERENCE "testdrive-nullable-ref-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE NONE
+> COMMIT
+
+> SELECT event_id, (user).id, (user).name FROM nullable_ref ORDER BY event_id
+event_id id name
+---------------------
+1 200 Diana
+2 <null> <null>
+3 201 Eve
+
+# =============================================================================
+# Test 5: Reference in array element type
+# A schema references a type used as an array element
+# =============================================================================
+
+$ kafka-create-topic topic=array-ref
+
+$ set user-list-schema={
+  "type": "record",
+  "name": "UserList",
+  "namespace": "com.example",
+  "fields": [
+    {"name": "group_name", "type": "string"},
+    {"name": "members", "type": {"type": "array", "items": "com.example.User"}}
+  ]
+  }
+
+$ kafka-ingest format=avro topic=array-ref schema=${user-list-schema} references=testdrive-user-${testdrive.seed}-value timestamp=1
+{"group_name": "Team A", "members": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}
+
+> BEGIN
+> CREATE SOURCE array_ref_src
+  IN CLUSTER ${arg.single-replica-cluster}
+  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-array-ref-${testdrive.seed}')
+
+> CREATE TABLE array_ref FROM SOURCE array_ref_src (REFERENCE "testdrive-array-ref-${testdrive.seed}")
+  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+  ENVELOPE NONE
+> COMMIT
+
+> SELECT group_name, list_length(members) as member_count FROM array_ref
+group_name member_count
+-------------------------
+"Team A" 2
+
+# =============================================================================
+# Cleanup
+# =============================================================================
+
+> DROP TABLE IF EXISTS simple_ref;
+> DROP TABLE IF EXISTS chained_ref;
+> DROP TABLE IF EXISTS multi_ref;
+> DROP TABLE IF EXISTS nullable_ref;
+> DROP TABLE IF EXISTS array_ref;
+> DROP SOURCE IF EXISTS simple_ref_src CASCADE;
+> DROP SOURCE IF EXISTS chained_ref_src CASCADE;
+> DROP SOURCE IF EXISTS multi_ref_src CASCADE;
+> DROP SOURCE IF EXISTS nullable_ref_src CASCADE;
+> DROP SOURCE IF EXISTS array_ref_src CASCADE;