Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
359 changes: 359 additions & 0 deletions src/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,11 @@ impl UnionSchema {
&self.schemas
}

/// Returns a mutable slice over every variant of this union, in stored order.
///
/// Individual variants can be rewritten in place through the returned slice; because it
/// is a slice, variants can be neither added nor removed.
///
/// NOTE(review): if this type caches anything derived from its variants (not visible
/// from this method), edits made through the slice will not refresh it -- confirm.
pub fn variants_mut(&mut self) -> &mut [SchemaPieceOrNamed] {
    &mut self.schemas[..]
}

/// Returns true if the first variant of this `UnionSchema` is `Null`.
pub fn is_nullable(&self) -> bool {
!self.schemas.is_empty() && self.schemas[0] == SchemaPieceOrNamed::Piece(SchemaPiece::Null)
Expand Down Expand Up @@ -1377,6 +1382,110 @@ impl Schema {
p.parse(value)
}

/// Parses `primary` with the named types of already-parsed schemas pre-registered.
///
/// Intended for a schema that refers to types defined in other schemas (Confluent
/// Schema Registry schema references). The referenced schemas should be supplied in
/// dependency order (dependencies first).
///
/// # Arguments
///
/// * `primary` - The JSON value of the schema to parse
/// * `reference_schemas` - Schemas whose named types should be resolvable while parsing
///   `primary`
///
/// # Errors
///
/// Returns whatever error the parser reports for `primary`.
pub fn parse_with_references(
    primary: &Value,
    reference_schemas: &[Schema],
) -> Result<Self, AvroError> {
    // Merge the named types of every reference into one table with consistent indices.
    let (named, indices) = Self::collect_named_types(reference_schemas);

    // The parser keeps each named slot as an `Option`; every seeded slot is already filled.
    let named = named.into_iter().map(Some).collect();

    SchemaParser { named, indices }.parse(primary)
}

/// Merges the named types of `schemas` into one table, rewriting `Named` indices to match.
///
/// Schemas are processed in the order given. A type whose full name has already been
/// collected is not added again (the later definition is dropped without being compared
/// to the first), and references to it are redirected to the entry collected first.
///
/// Returns the merged list of named pieces together with the map from full name to the
/// position of that type in the list.
fn collect_named_types(
    schemas: &[Schema],
) -> (Vec<NamedSchemaPiece>, BTreeMap<FullName, usize>) {
    let mut merged: Vec<NamedSchemaPiece> = Vec::new();
    let mut positions: BTreeMap<FullName, usize> = BTreeMap::new();

    for schema in schemas {
        // Pass 1: translate every index local to this schema into an index of `merged`.
        // Known names resolve to their existing slot; unknown names are handed the slots
        // they will occupy once pass 2 appends them, in the same order.
        let mut next_free = merged.len();
        let translation: Vec<usize> = schema
            .named
            .iter()
            .map(|entry| match positions.get(&entry.name) {
                Some(&known) => known,
                None => {
                    let slot = next_free;
                    next_free += 1;
                    slot
                }
            })
            .collect();

        // Pass 2: append the types that are not yet known. This must run after the
        // whole translation table exists, so that a reference can be rewritten no matter
        // where its target sits in this schema's list.
        for entry in &schema.named {
            if !positions.contains_key(&entry.name) {
                let mut copy = entry.clone();
                Self::remap_indices_in_piece_with_map(&mut copy.piece, &translation);

                positions.insert(copy.name.clone(), merged.len());
                merged.push(copy);
            }
        }
    }

    (merged, positions)
}

/// Rewrites, in place, every `Named` index reachable from `piece` using `index_map`.
///
/// Descends into the inner schema of `Array` and `Map`, into each variant of a `Union`
/// and into each field of a `Record`. Every other kind of piece is left untouched.
///
/// NOTE(review): union variants are rewritten through `variants_mut()`; if `UnionSchema`
/// keeps lookup state keyed by named index (not visible here), that state is not
/// updated by this function -- confirm.
fn remap_indices_in_piece_with_map(piece: &mut SchemaPiece, index_map: &[usize]) {
    match piece {
        SchemaPiece::Array(inner) | SchemaPiece::Map(inner) => {
            Self::remap_indices_with_map(inner, index_map)
        }
        SchemaPiece::Union(union) => union
            .variants_mut()
            .iter_mut()
            .for_each(|variant| Self::remap_indices_with_map(variant, index_map)),
        SchemaPiece::Record { fields, .. } => fields
            .iter_mut()
            .for_each(|field| Self::remap_indices_with_map(&mut field.schema, index_map)),
        _ => {}
    }
}

/// Applies `index_map` to one `SchemaPieceOrNamed`, in place.
///
/// An inline piece is traversed recursively. A `Named` index is replaced by
/// `index_map[index]`; an index that lies outside `index_map` is left as it is.
fn remap_indices_with_map(item: &mut SchemaPieceOrNamed, index_map: &[usize]) {
    match item {
        SchemaPieceOrNamed::Piece(nested) => {
            Self::remap_indices_in_piece_with_map(nested, index_map)
        }
        SchemaPieceOrNamed::Named(slot) => {
            // Checked lookup: out-of-range indices are deliberately kept unchanged.
            if let Some(target) = index_map.get(*slot) {
                *slot = *target;
            }
        }
    }
}

/// Converts `self` into its [Parsing Canonical Form].
///
/// [Parsing Canonical Form]:
Expand Down Expand Up @@ -3034,4 +3143,254 @@ mod tests {
assert_eq!(expected, actual, "Name::make_valid({input})")
}
}

/// A primary schema can refer, by full name only, to a record defined in a single
/// referenced schema, and the reference resolves to that exact type.
#[mz_ore::test]
fn test_parse_with_simple_reference() {
    // Schema A defines a User record
    let ref_schema_json = r#"{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [{"name": "id", "type": "int"}]
}"#;

    // Schema B references User
    let primary_json = r#"{
"type": "record",
"name": "Event",
"namespace": "com.example",
"fields": [{"name": "user", "type": "com.example.User"}]
}"#;

    let ref_schema = Schema::from_str(ref_schema_json).unwrap();
    let primary_value: Value = serde_json::from_str(primary_json).unwrap();

    let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();

    let user_name = FullName {
        name: "User".to_string(),
        namespace: "com.example".to_string(),
    };
    let event_name = FullName {
        name: "Event".to_string(),
        namespace: "com.example".to_string(),
    };

    // Both Event and User must be registered; keep User's index for the check below.
    let user_idx = *schema
        .indices
        .get(&user_name)
        .expect("User type should be in schema indices");
    assert!(
        schema.indices.contains_key(&event_name),
        "Event type should be in schema indices"
    );

    // Walk from the top-level schema down to Event's fields.
    let event_idx = match &schema.top {
        SchemaPieceOrNamed::Named(idx) => *idx,
        _ => panic!("Expected top to be Named"),
    };
    let fields = match &schema.named[event_idx].piece {
        SchemaPiece::Record { fields, .. } => fields,
        _ => panic!("Expected Event to be a record"),
    };

    assert_eq!(fields.len(), 1);
    assert_eq!(fields[0].name, "user");

    // Being `Named` is not enough: the index has to be the one registered for User.
    assert!(
        matches!(&fields[0].schema, SchemaPieceOrNamed::Named(idx) if *idx == user_idx),
        "user field should be a Named reference to the User type"
    );
}

/// References can be chained: C refers to B, which itself was parsed with a reference
/// to A. All three named types must end up registered in the final schema.
#[mz_ore::test]
fn test_parse_with_nested_references() {
    // A: Address, no dependencies.
    let schema_a = r#"{
"type": "record",
"name": "Address",
"namespace": "com.example",
"fields": [
{"name": "street", "type": "string"},
{"name": "city", "type": "string"}
]
}"#;

    // B: User, has an Address field (depends on A).
    let schema_b = r#"{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "int"},
{"name": "address", "type": "com.example.Address"}
]
}"#;

    // C: Event, has a User field (depends on B and, transitively, on A).
    let schema_c = r#"{
"type": "record",
"name": "Event",
"namespace": "com.example",
"fields": [
{"name": "user", "type": "com.example.User"},
{"name": "timestamp", "type": "long"}
]
}"#;

    // A stands alone.
    let parsed_a = Schema::from_str(schema_a).unwrap();

    // B is parsed with A available as a reference.
    let json_b: Value = serde_json::from_str(schema_b).unwrap();
    let parsed_b =
        Schema::parse_with_references(&json_b, std::slice::from_ref(&parsed_a)).unwrap();

    // C is parsed with A and B as references, dependencies first.
    let json_c: Value = serde_json::from_str(schema_c).unwrap();
    let final_schema = Schema::parse_with_references(&json_c, &[parsed_a, parsed_b]).unwrap();

    // Builds the fully qualified name of a type in the shared test namespace.
    let in_example_ns = |name: &str| FullName {
        name: name.to_string(),
        namespace: "com.example".to_string(),
    };

    // Every type along the chain must be registered.
    for name in ["Address", "User", "Event"] {
        assert!(
            final_schema.indices.contains_key(&in_example_ns(name)),
            "{} type should be in schema indices",
            name
        );
    }
}

/// A single referenced schema may define several named types (here nested inside a
/// wrapper record); each of them must be usable from the primary schema.
#[mz_ore::test]
fn test_parse_with_multiple_types_in_reference() {
    // Reference schema: ContactInfo, which defines Address and PhoneNumber inline.
    let ref_schema_json = r#"{
"type": "record",
"name": "ContactInfo",
"namespace": "com.example",
"fields": [
{
"name": "address",
"type": {
"type": "record",
"name": "Address",
"fields": [{"name": "street", "type": "string"}]
}
},
{
"name": "phone",
"type": {
"type": "record",
"name": "PhoneNumber",
"fields": [{"name": "number", "type": "string"}]
}
}
]
}"#;

    // Primary schema: User refers to both nested types by full name.
    let primary_json = r#"{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "id", "type": "int"},
{"name": "home", "type": "com.example.Address"},
{"name": "mobile", "type": "com.example.PhoneNumber"}
]
}"#;

    let reference = Schema::from_str(ref_schema_json).unwrap();
    let primary: Value = serde_json::from_str(primary_json).unwrap();

    let schema = Schema::parse_with_references(&primary, &[reference]).unwrap();

    // Builds the fully qualified name of a type in the shared test namespace.
    let in_example_ns = |name: &str| FullName {
        name: name.to_string(),
        namespace: "com.example".to_string(),
    };

    // The wrapper, both nested types and the primary record must all be registered.
    for name in ["Address", "PhoneNumber", "ContactInfo", "User"] {
        assert!(
            schema.indices.contains_key(&in_example_ns(name)),
            "{} type should be in schema indices",
            name
        );
    }
}

/// With an empty reference list, `parse_with_references` must produce the same result
/// as the plain `parse` entry point.
#[mz_ore::test]
fn test_parse_with_no_references() {
    // When no references are provided, it should behave like regular parse
    let schema_json = r#"{
"type": "record",
"name": "Simple",
"fields": [{"name": "id", "type": "int"}]
}"#;

    let value: Value = serde_json::from_str(schema_json).unwrap();

    let schema_with_refs = Schema::parse_with_references(&value, &[]).unwrap();
    let schema_normal = Schema::parse(&value).unwrap();

    // Same number of named types...
    assert_eq!(schema_with_refs.named.len(), schema_normal.named.len());
    assert_eq!(schema_with_refs.indices.len(), schema_normal.indices.len());

    // ...and, beyond the counts, the same names at the same positions and the same
    // top-level node. Plain `assert!` is used so no `Debug` impl is required.
    assert!(
        schema_with_refs.indices == schema_normal.indices,
        "name-to-index maps should be identical"
    );
    assert!(
        schema_with_refs.top == schema_normal.top,
        "top-level schema nodes should be identical"
    );
}

/// A referenced type can be used as the item type of an array in the primary schema,
/// and the item resolves to that exact type.
#[mz_ore::test]
fn test_parse_with_reference_in_array() {
    // Schema A defines an Item record
    let ref_schema_json = r#"{
"type": "record",
"name": "Item",
"namespace": "com.example",
"fields": [{"name": "name", "type": "string"}]
}"#;

    // Schema B has an array of Items
    let primary_json = r#"{
"type": "record",
"name": "Order",
"namespace": "com.example",
"fields": [
{"name": "items", "type": {"type": "array", "items": "com.example.Item"}}
]
}"#;

    let ref_schema = Schema::from_str(ref_schema_json).unwrap();
    let primary_value: Value = serde_json::from_str(primary_json).unwrap();

    let schema = Schema::parse_with_references(&primary_value, &[ref_schema]).unwrap();

    // Verify both types exist; keep Item's index for the check below.
    let item_name = FullName {
        name: "Item".to_string(),
        namespace: "com.example".to_string(),
    };
    let order_name = FullName {
        name: "Order".to_string(),
        namespace: "com.example".to_string(),
    };
    let item_idx = *schema
        .indices
        .get(&item_name)
        .expect("Item type should be in schema indices");
    assert!(
        schema.indices.contains_key(&order_name),
        "Order type should be in schema indices"
    );

    // Walk from the top-level schema down to the `items` field of Order.
    let order_idx = match &schema.top {
        SchemaPieceOrNamed::Named(idx) => *idx,
        _ => panic!("Expected top to be Named"),
    };
    let fields = match &schema.named[order_idx].piece {
        SchemaPiece::Record { fields, .. } => fields,
        _ => panic!("Expected Order to be a record"),
    };
    let inner = match &fields[0].schema {
        SchemaPieceOrNamed::Piece(SchemaPiece::Array(inner)) => inner,
        _ => panic!("Expected items field to be an array"),
    };

    // Being `Named` is not enough: the index has to be the one registered for Item.
    assert!(
        matches!(inner.as_ref(), SchemaPieceOrNamed::Named(idx) if *idx == item_idx),
        "Array items should be a Named reference to Item"
    );
}
}
Loading