diff --git a/SCHEMA_EVOLUTION_WHITEPAPER.md b/SCHEMA_EVOLUTION_WHITEPAPER.md
new file mode 100644
index 000000000000..f22cd8910fb1
--- /dev/null
+++ b/SCHEMA_EVOLUTION_WHITEPAPER.md
@@ -0,0 +1,1240 @@
+# OpenMetadata Schema Evolution 深度技术白皮书
+## 从零构建元数据版本管理系统的完整指南
+
+> **作者身份**: OpenMetadata 项目首席架构师
+> **目标读者**: 基于 Java/Spring Boot + MySQL 技术栈从零构建元数据版本管理系统的架构师
+> **技术深度**: 生产级实战经验总结,包含详细的数据库设计、核心算法实现与架构决策
+
+---
+
+## 目录
+
+1. [模块一:数据库设计与版本存储 (Data Persistence & Versioning Model)](#模块一数据库设计与版本存储)
+2. [模块二:核心变更检测 (Diff) 逻辑 (Change Detection Logic)](#模块二核心变更检测-diff-逻辑)
+3. [模块三:事件/通知架构 (Event & Notification Architecture)](#模块三事件通知架构)
+4. [核心设计哲学与架构建议](#核心设计哲学与架构建议)
+
+---
+
+## 模块一:数据库设计与版本存储
+
+### 1.1 实体与历史的关系:统一存储与扩展表
+
+OpenMetadata 采用了一种**优雅的架构模式**,将当前活跃实体和历史版本分离存储,但保持关联。核心设计如下:
+
+#### 1.1.1 主实体表设计(以 `table_entity` 为例)
+
+```sql
+CREATE TABLE `table_entity` (
+ `id` varchar(36) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.id'))) STORED NOT NULL,
+ `json` json NOT NULL, -- 核心:整个实体序列化为 JSON
+ `updatedAt` bigint unsigned GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.updatedAt'))) VIRTUAL NOT NULL,
+ `updatedBy` varchar(256) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.updatedBy'))) VIRTUAL NOT NULL,
+ `deleted` tinyint(1) GENERATED ALWAYS AS (json_extract(`json`,'$.deleted')) VIRTUAL,
+ `fqnHash` varchar(768) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL,
+ `name` varchar(256) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.name'))) VIRTUAL NOT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `fqnHash` (`fqnHash`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+```
+
+**架构亮点**:
+- **JSON 存储核心实体**:`json` 列存储完整的实体对象(JSON 格式),包含所有字段、嵌套对象等。
+- **虚拟列(Virtual Columns)用于索引**:通过 MySQL 的 `GENERATED ALWAYS AS` 从 JSON 中提取常用字段(如 `id`、`updatedAt`、`name`),创建虚拟列作为索引,兼顾灵活性与查询性能。
+- **Soft Delete 支持**:`deleted` 字段标记逻辑删除,支持实体恢复。
+
+#### 1.1.2 历史版本存储表:`entity_extension`
+
+```sql
+CREATE TABLE `entity_extension` (
+ `id` varchar(36) NOT NULL, -- 指向主实体的 ID
+ `extension` varchar(256) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, -- 扩展类型/版本标识
+ `jsonSchema` varchar(256) NOT NULL, -- Schema 名称(如 "table")
+ `json` json NOT NULL, -- 存储历史版本的完整快照(JSON)
+ PRIMARY KEY (`id`,`extension`) -- 复合主键:实体 ID + 版本标识
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+```
+
+**工作机制**:
+- **版本标识命名规范**:`extension` 字段使用格式 `{entityType}.{version}`,例如:
+ - `table.0.1` 表示表实体的 v0.1 版本
+ - `table.1.0` 表示表实体的 v1.0 版本
+- **快照存储策略**:每次版本更新时,将**旧版本的完整实体**存储到 `entity_extension` 表,新版本覆盖主表的 `json` 字段。
+- **时间序列版本链**:通过 `extension` 字段的版本号,可以构建完整的版本演进链。
+
+**Java 代码示例**(版本存储核心逻辑):
+
+```java
+// 来自 EntityRepository.EntityUpdater 内部类
+private void storeEntityHistory() {
+ // 计算版本扩展名称
+ String extensionName = EntityUtil.getVersionExtension(entityType, original.getVersion());
+ // 示例:entityType="table", original.getVersion()=1.0 => extensionName="table.1.0"
+
+ // 将旧版本(original)存储到 entity_extension 表
+ daoCollection
+ .entityExtensionDAO()
+ .insert(original.getId(), extensionName, entityType, JsonUtils.pojoToJson(original));
+}
+```
+
+**版本号管理规则**(`EntityUtil` 中的策略):
+```java
+public static String getVersionExtension(String entityType, Double version) {
+ return String.format("%s.%s", entityType, version);
+}
+
+// 版本递增策略
+public static Double nextVersion(Double currentVersion) {
+ return Math.round((currentVersion + 0.1) * 10.0) / 10.0; // 0.1 -> 0.2 -> 0.3 ...
+}
+
+public static Double nextMajorVersion(Double currentVersion) {
+ return Math.floor(currentVersion) + 1.0; // 0.5 -> 1.0, 1.3 -> 2.0
+}
+```
+
+---
+
+### 1.2 存储策略:完整快照 (Full Snapshot) vs 差异存储 (Delta)
+
+**OpenMetadata 的选择:完整快照 (Full Snapshot)**
+
+当一个表从 v1.0 演变到 v1.1 时(例如增加了一列),系统的存储策略如下:
+
+1. **v1.0 版本**被完整地序列化为 JSON,存储到 `entity_extension` 表:
+ ```json
+ {
+ "id": "uuid-1234",
+ "name": "customer",
+ "version": 1.0,
+ "columns": [
+ {"name": "id", "dataType": "INT"},
+ {"name": "name", "dataType": "VARCHAR"}
+ ],
+ "updatedAt": 1700000000000,
+ "changeDescription": {
+ "fieldsAdded": [...],
+ "fieldsUpdated": [...],
+ "fieldsDeleted": [...]
+ }
+ }
+ ```
+
+2. **v1.1 版本**的完整快照直接覆盖主表 `table_entity` 的 `json` 列。
+
+3. **变更描述(ChangeDescription)同时存储**在 v1.1 的 JSON 中,记录从 v1.0 到 v1.1 的 delta 信息。
+
+**为什么选择完整快照而非差异存储?**
+
+| 策略 | 优势 | 劣势 | OpenMetadata 的决策 |
+|--------------|----------------------------------------------------------------------|------------------------------------------------|----------------------------------------------|
+| **完整快照** | - 读取任意版本速度极快(单次查询)
- 实现简单,无需重构历史版本 | - 存储空间较大 | ✅ **适合元数据场景**(数据量有限,读多写少) |
+| **差异存储** | - 存储空间小 | - 重构历史版本需要应用多个 Patch
- 读取慢 | ❌ 不适合需要频繁访问历史版本的场景 |
+
+**架构决策理由**:
+- 元数据体积远小于业务数据,存储成本可接受。
+- 频繁查询历史版本(如审计、回滚、版本对比),完整快照性能优势显著。
+- JSON Schema 使得序列化/反序列化成本极低。
+
+---
+
+### 1.3 JSON Schema 在版本管理中的巨大优势
+
+OpenMetadata 的核心决策之一是**将所有元数据实体定义为 JSON Schema**,这一决策对版本管理带来了革命性的影响:
+
+#### 1.3.1 JSON Schema 示例(Table 实体)
+
+```json
+{
+ "$id": "https://open-metadata.org/schema/entity/data/table.json",
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Table",
+ "description": "A Table entity",
+ "type": "object",
+ "properties": {
+ "id": { "type": "string", "format": "uuid" },
+ "name": { "type": "string" },
+ "fullyQualifiedName": { "type": "string" },
+ "columns": {
+ "type": "array",
+ "items": { "$ref": "../../type/column.json#/definitions/column" }
+ },
+ "tags": {
+ "type": "array",
+ "items": { "$ref": "../../type/tagLabel.json" }
+ },
+ "version": { "type": "number" },
+ "updatedAt": { "type": "integer" },
+ "updatedBy": { "type": "string" },
+ "changeDescription": { "$ref": "../../type/entityHistory.json#/definitions/changeDescription" }
+ },
+ "required": ["id", "name", "columns"]
+}
+```
+
+#### 1.3.2 JSON Schema 带来的版本管理优势
+
+1. **Schema 演进的天然支持**:
+ - 新增字段:只需在 Schema 中添加 `optional` 属性,旧数据自动兼容。
+ - 字段重命名:通过 `previousNames` 元数据实现向后兼容。
+ - 类型变更:通过 Schema 验证器在写入时检查,防止破坏性变更。
+
+2. **统一的序列化/反序列化**:
+ ```java
+ // 写入:实体 -> JSON
+ String json = JsonUtils.pojoToJson(entity);
+ dao.insert(id, json);
+
+ // 读取:JSON -> 实体
+ Table entity = JsonUtils.readValue(json, Table.class);
+ ```
+
+3. **自描述的版本快照**:
+ - 每个版本的 JSON 快照包含其 `version` 字段和 `changeDescription`,无需额外元数据表。
+ - 支持跨版本查询和比较(如 "显示 v1.0 和 v2.0 的差异")。
+
+4. **松耦合的存储与业务逻辑**:
+ - 数据库仅存储 JSON 字符串,业务逻辑通过 Pydantic(Python)或 Jackson(Java)动态解析。
+ - 支持在不修改数据库 Schema 的前提下快速迭代实体模型。
+
+---
+
+### 1.4 关键 SQL Schema 详解:`change_event` 表
+
+除了存储实体版本,OpenMetadata 还维护一个**变更事件流**用于审计、通知和下游消费:
+
+```sql
+CREATE TABLE `change_event` (
+ `eventType` varchar(36) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.eventType'))) VIRTUAL NOT NULL,
+ `entityType` varchar(36) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.entityType'))) VIRTUAL NOT NULL,
+ `userName` varchar(256) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.userName'))) VIRTUAL NOT NULL,
+ `eventTime` bigint unsigned GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.timestamp'))) VIRTUAL NOT NULL,
+ `json` json NOT NULL, -- 完整的 ChangeEvent JSON
+ KEY `event_type_index` (`eventType`),
+ KEY `entity_type_index` (`entityType`),
+ KEY `event_time_index` (`eventTime`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+```
+
+**ChangeEvent 的 JSON 结构**:
+
+```json
+{
+ "id": "uuid-5678",
+ "eventType": "ENTITY_UPDATED", // CREATED/UPDATED/DELETED/SOFT_DELETED/RESTORED
+ "entityType": "table",
+ "entityId": "uuid-1234",
+ "entityFullyQualifiedName": "db.schema.customer",
+ "userName": "john.doe",
+ "timestamp": 1700000000000,
+ "currentVersion": 1.1,
+ "previousVersion": 1.0,
+ "changeDescription": {
+ "fieldsAdded": [
+ {
+ "name": "columns",
+ "newValue": "[{\"name\": \"email\", \"dataType\": \"VARCHAR\"}]"
+ }
+ ],
+ "fieldsUpdated": [],
+ "fieldsDeleted": []
+ },
+ "entity": { /* 完整的实体 JSON(可选,用于下游消费)*/ }
+}
+```
+
+**关键字段说明**:
+- `eventType`:事件类型,支持实体生命周期管理。
+- `changeDescription`:记录**精确的字段级变更**,用于生成 Diff 报告和通知内容。
+- `entity`:可选的完整实体快照,避免下游服务再次查询数据库。
+
+---
+
+### 1.5 版本号管理策略:语义化版本
+
+OpenMetadata 使用**简化的语义化版本**策略:
+
+- **Minor 版本(0.1, 0.2, ...)**:向后兼容的变更(如新增可选字段、更新描述)。
+- **Major 版本(1.0, 2.0, ...)**:破坏性变更(如删除列、修改数据类型)。
+
+**Java 实现**:
+
+```java
+public class EntityUpdater {
+ private boolean majorVersionChange = false; // 标记是否为破坏性变更
+
+ protected void updateColumns(List origColumns, List updatedColumns) {
+ List deletedColumns = diffLists(origColumns, updatedColumns, ...);
+ if (!deletedColumns.isEmpty()) {
+ majorVersionChange = true; // 删除列 => Major 版本
+ }
+ }
+
+ private boolean updateVersion(Double oldVersion) {
+ Double newVersion = majorVersionChange
+ ? EntityUtil.nextMajorVersion(oldVersion) // 1.0 -> 2.0
+ : EntityUtil.nextVersion(oldVersion); // 1.0 -> 1.1
+
+ updated.setVersion(newVersion);
+ changeDescription.setPreviousVersion(oldVersion);
+ return !newVersion.equals(oldVersion);
+ }
+}
+```
+
+---
+
+## 模块二:核心变更检测 (Diff) 逻辑
+
+### 2.1 Diff 算法概览:逐字段比较 + ChangeDescription 聚合
+
+OpenMetadata 的变更检测采用**逐字段递归比较**策略,而非单纯依赖 JSON Patch。核心流程如下:
+
+```java
+public class EntityUpdater {
+ protected ChangeDescription changeDescription; // 聚合所有字段的变更
+
+ @Transaction
+ public final void update() {
+ changeDescription = new ChangeDescription(); // 初始化变更描述
+ updateInternal(); // 执行字段级比较
+ storeUpdate(); // 存储新版本
+ postUpdate(original, updated); // 触发事件
+ }
+
+ private void updateInternal() {
+ // 1. 比较通用字段
+ updateDescription(original, updated);
+ updateDisplayName(original, updated);
+ updateOwners(original, updated);
+ updateTags(original, updated);
+
+ // 2. 调用子类实现的实体特定逻辑
+ entitySpecificUpdate(); // 如 TableRepository.updateColumns()
+ }
+}
+```
+
+### 2.2 字段比较的核心方法:`recordChange()`
+
+```java
+protected boolean recordChange(String fieldName, Object oldValue, Object newValue) {
+ if (operation.isPut() && oldValue != null && newValue == null) {
+ // PUT 操作不允许删除已有值
+ return false;
+ }
+
+ if (!objectMatch(oldValue, newValue)) {
+ // 使用深度比较(支持嵌套对象、集合)
+ fieldUpdated(changeDescription, fieldName, oldValue, newValue);
+ return true;
+ }
+ return false;
+}
+
+// 工具方法:深度对象比较
+public static boolean objectMatch(Object obj1, Object obj2) {
+ if (obj1 == obj2) return true;
+ if (obj1 == null || obj2 == null) return false;
+
+ // 对于复杂对象,转为 JSON 字符串比较(处理字段顺序差异)
+ return JsonUtils.pojoToJson(obj1).equals(JsonUtils.pojoToJson(obj2));
+}
+```
+
+### 2.3 ChangeDescription 的结构
+
+```java
+public class ChangeDescription {
+ private Double previousVersion; // 上一个版本号
+ private List fieldsAdded; // 新增字段列表
+ private List fieldsUpdated; // 更新字段列表
+ private List fieldsDeleted; // 删除字段列表
+}
+
+public class FieldChange {
+ private String name; // 字段全限定名(如 "columns[0].dataType")
+ private Object oldValue; // 旧值(JSON 可序列化)
+ private Object newValue; // 新值(JSON 可序列化)
+}
+```
+
+**示例:表结构变更的 ChangeDescription**
+
+```json
+{
+ "previousVersion": 1.0,
+ "fieldsAdded": [
+ {
+ "name": "columns[2]",
+ "oldValue": null,
+ "newValue": {
+ "name": "email",
+ "dataType": "VARCHAR(255)",
+ "description": "User email address"
+ }
+ }
+ ],
+ "fieldsUpdated": [
+ {
+ "name": "columns[0].dataLength",
+ "oldValue": 50,
+ "newValue": 100
+ }
+ ],
+ "fieldsDeleted": [
+ {
+ "name": "columns[3]",
+ "oldValue": {
+ "name": "deprecated_field",
+ "dataType": "INT"
+ },
+ "newValue": null
+ }
+ ]
+}
+```
+
+---
+
+### 2.4 复杂场景处理:嵌套列变更检测
+
+对于表的 `columns` 字段(嵌套列表),OpenMetadata 使用递归比较:
+
+```java
+// TableRepository.TableUpdater 中的实现
+public void updateColumns(
+ String fieldName,
+ List origColumns,
+ List updatedColumns,
+ BiPredicate columnMatch) { // 列匹配策略
+
+ List deletedColumns = new ArrayList<>();
+ List addedColumns = new ArrayList<>();
+
+ // 1. 记录列表级别的新增/删除
+ recordListChange(fieldName, origColumns, updatedColumns,
+ addedColumns, deletedColumns, columnMatch);
+
+ // 2. 对于保留的列,逐个比较属性
+ for (Column updated : updatedColumns) {
+ Column stored = origColumns.stream()
+ .filter(c -> columnMatch.test(c, updated))
+ .findAny().orElse(null);
+
+ if (stored != null) {
+ // 递归比较列属性
+ updateColumnDescription(fieldName, stored, updated);
+ updateColumnDataLength(stored, updated);
+ updateColumnTags(stored, updated);
+
+ // 递归处理嵌套子列
+ if (updated.getChildren() != null && stored.getChildren() != null) {
+ updateColumns(
+ EntityUtil.getFieldName(fieldName, updated.getName()),
+ stored.getChildren(),
+ updated.getChildren(),
+ columnMatch
+ );
+ }
+ }
+ }
+
+ // 3. 删除列触发 Major 版本变更
+ majorVersionChange = majorVersionChange || !deletedColumns.isEmpty();
+}
+```
+
+**列匹配策略**(`columnMatch` 函数):
+
+```java
+BiPredicate columnMatch = (c1, c2) -> {
+ // 通过名称 + 序号匹配(处理列重命名场景)
+ return c1.getName().equals(c2.getName())
+ && c1.getOrdinalPosition().equals(c2.getOrdinalPosition());
+};
+```
+
+---
+
+### 2.5 标签变更的特殊处理
+
+标签(Tags)作为元数据管理的核心功能,OpenMetadata 对其变更检测进行了精细化处理:
+
+```java
+protected void updateTags(String fqn, String fieldName,
+ List origTags, List updatedTags) {
+ // 1. 标签比较(忽略派生标签 derived=true)
+ List addedTags = listOf(updatedTags).stream()
+ .filter(t -> !t.getDerived() && !listOf(origTags).contains(t))
+ .collect(Collectors.toList());
+
+ List deletedTags = listOf(origTags).stream()
+ .filter(t -> !t.getDerived() && !listOf(updatedTags).contains(t))
+ .collect(Collectors.toList());
+
+ // 2. 记录变更
+ if (!addedTags.isEmpty()) {
+ fieldAdded(changeDescription, fieldName, JsonUtils.pojoToJson(addedTags));
+ }
+ if (!deletedTags.isEmpty()) {
+ fieldDeleted(changeDescription, fieldName, JsonUtils.pojoToJson(deletedTags));
+ }
+
+ // 3. 更新数据库中的标签关系(tag_usage 表)
+ applyTags(updatedTags, fqn); // 批量插入新标签
+ deleteTags(deletedTags, fqn); // 批量删除旧标签
+}
+```
+
+**标签存储表 `tag_usage`**:
+
+```sql
+CREATE TABLE `tag_usage` (
+ `source` tinyint NOT NULL, -- 标签来源(手动/自动)
+ `tagFQN` varchar(256) NOT NULL, -- 标签全限定名
+ `labelType` tinyint NOT NULL, -- 标签类型
+ `state` tinyint NOT NULL, -- 状态
+ `tagFQNHash` varchar(768) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL,
+ `targetFQNHash` varchar(768) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL, -- 目标实体 FQN Hash
+ UNIQUE KEY `tag_usage_key` (`source`,`tagFQNHash`,`targetFQNHash`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+```
+
+---
+
+### 2.6 变更合并(Change Consolidation):会话内合并
+
+OpenMetadata 实现了**用户会话内的变更合并**机制,避免频繁的小版本号:
+
+**场景**:用户在 10 分钟内对同一实体进行了 3 次修改:
+1. v1.0 -> v1.1:添加描述
+2. v1.1 -> v1.2:修改 Owner
+3. v1.2 -> v1.3:添加标签
+
+**合并后的结果**:
+- 最终版本号:v1.1(而非 v1.3)
+- 变更描述合并为单个 ChangeDescription:
+ ```json
+ {
+ "previousVersion": 1.0,
+ "fieldsAdded": ["description", "tags"],
+ "fieldsUpdated": ["owner"]
+ }
+ ```
+
+**实现逻辑**:
+
+```java
+protected boolean consolidateChanges(T original, T updated, Operation operation) {
+ // 满足以下条件时进行合并:
+ return original.getVersion() > 0.1 // 非首次更新
+ && operation == Operation.PATCH // PATCH 操作
+ && !original.getDeleted() // 未删除
+ && original.getUpdatedBy().equals(updated.getUpdatedBy()) // 同一用户
+ && updated.getUpdatedAt() - original.getUpdatedAt() <= 600000; // 10 分钟内
+}
+
+private void revert() {
+ // 1. 从 entity_extension 获取上一版本
+ previous = getPreviousVersion(original);
+
+ // 2. 将 original 回滚到 previous(不记录变更)
+ changeDescription = null;
+ updateInternal();
+
+ // 3. 重新应用从 previous 到 updated 的变更(记录合并后的变更)
+ changeDescription = new ChangeDescription();
+ updateInternal();
+}
+```
+
+---
+
+### 2.7 JSON Patch 的辅助作用
+
+虽然 OpenMetadata 主要使用字段级比较,但在 **PATCH API** 中支持标准的 **JSON Patch (RFC 6902)** 格式:
+
+**客户端 PATCH 请求示例**:
+
+```http
+PATCH /api/v1/tables/uuid-1234
+Content-Type: application/json-patch+json
+
+[
+ {
+ "op": "add",
+ "path": "/tags/-",
+ "value": {
+ "tagFQN": "PII.Sensitive",
+ "source": "Manual"
+ }
+ },
+ {
+ "op": "replace",
+ "path": "/description",
+ "value": "Updated description"
+ }
+]
+```
+
+**服务端处理**(`EntityResource.patch()` 方法):
+
+```java
+public Response patch(
+ @PathParam("id") String id,
+ @HeaderParam("If-Match") String ifMatch, // 乐观锁版本号
+ JsonPatch patch) {
+
+ // 1. 获取当前实体
+ T original = repository.get(uriInfo, UUID.fromString(id), repository.getFields("*"));
+
+ // 2. 应用 JSON Patch
+ String originalJson = JsonUtils.pojoToJson(original);
+ JsonNode patched = JsonPatchUtils.apply(originalJson, patch); // 使用 Jackson 的 JsonPatch
+ T updated = JsonUtils.treeToValue(patched, entityClass);
+
+ // 3. 调用 EntityUpdater 进行变更检测和版本管理
+ PutResponse response = repository.createOrUpdate(uriInfo, updated);
+
+ return Response.ok(response.getEntity()).build();
+}
+```
+
+**JsonPatchUtils 的核心实现**:
+
+```java
+public static JsonNode apply(String originalJson, JsonPatch patch) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode targetJson = mapper.readTree(originalJson);
+
+ // 使用 jackson-json-patch 库应用补丁
+ com.github.fge.jsonpatch.JsonPatch jacksonPatch =
+ com.github.fge.jsonpatch.JsonPatch.fromJson(patch);
+
+ return jacksonPatch.apply(targetJson);
+}
+```
+
+---
+
+## 模块三:事件/通知架构
+
+### 3.1 事件模型:基于 LMAX Disruptor 的高性能发布-订阅
+
+OpenMetadata 使用**异步事件总线**解耦变更生产者和消费者,核心组件是 `EventPubSub`:
+
+```java
+@Slf4j
+public class EventPubSub {
+ private static Disruptor disruptor;
+ private static RingBuffer ringBuffer;
+
+ public static void start() {
+ // 初始化 Disruptor(1024 大小的环形缓冲区)
+ disruptor = new Disruptor<>(
+ ChangeEventHolder::new,
+ 1024,
+ DaemonThreadFactory.INSTANCE
+ );
+ ringBuffer = disruptor.start();
+ }
+
+ // 发布变更事件(异步非阻塞)
+ public static void publish(ChangeEvent event) {
+ long sequence = ringBuffer.next(); // 获取序列号
+ ringBuffer.get(sequence).setEvent(event); // 填充数据
+ ringBuffer.publish(sequence); // 发布事件
+ }
+
+ // 注册事件处理器
+ public static BatchEventProcessor addEventHandler(
+ EventHandler handler) {
+
+ BatchEventProcessor processor =
+ new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler);
+
+ ringBuffer.addGatingSequences(processor.getSequence());
+ executor.execute(processor); // 在独立线程池中执行
+ return processor;
+ }
+}
+
+// 事件持有者(用于 Disruptor)
+public static class ChangeEventHolder {
+ @Getter @Setter
+ private ChangeEvent event;
+}
+```
+
+**架构优势**:
+- **无锁并发**:Disruptor 的环形缓冲区实现无锁队列,性能远超传统 `BlockingQueue`。
+- **背压控制**:当消费者处理速度慢时,生产者会等待,防止内存溢出。
+- **多消费者支持**:可注册多个 `EventHandler`,实现不同的下游处理逻辑。
+
+---
+
+### 3.2 ChangeEvent 的完整结构
+
+```java
+public class ChangeEvent {
+ private UUID id; // 事件唯一标识
+ private EventType eventType; // CREATED/UPDATED/DELETED/SOFT_DELETED/RESTORED/NO_CHANGE
+ private String entityType; // 实体类型(如 "table", "dashboard")
+ private UUID entityId; // 实体 ID
+ private String entityFullyQualifiedName; // 实体 FQN
+ private String userName; // 操作用户
+ private Long timestamp; // 事件时间戳
+ private Double currentVersion; // 当前版本号
+ private Double previousVersion; // 上一版本号
+ private ChangeDescription changeDescription; // 详细的字段级变更
+ private Object entity; // 完整实体(可选,用于下游直接消费)
+}
+```
+
+**事件生成时机**(`EntityResource` 响应拦截器):
+
+```java
+@Slf4j
+public class ChangeEventHandler implements EventHandler {
+ @SneakyThrows
+ public Void process(
+ ContainerRequestContext requestContext,
+ ContainerResponseContext responseContext) {
+
+ // GET 操作不产生变更事件
+ if (requestContext.getMethod().equals("GET")) {
+ return null;
+ }
+
+ // 从响应中提取 ChangeEvent
+ Optional optionalChangeEvent =
+ getChangeEventFromResponseContext(responseContext, loggedInUserName);
+
+ if (optionalChangeEvent.isPresent()) {
+ ChangeEvent changeEvent = optionalChangeEvent.get();
+
+ // 1. 持久化到 change_event 表(用于审计)
+ if (!changeEvent.getEventType().equals(EventType.ENTITY_NO_CHANGE)) {
+ Entity.getCollectionDAO()
+ .changeEventDAO()
+ .insert(JsonUtils.pojoToJson(changeEvent));
+ }
+
+ // 2. 发布到事件总线(用于实时通知)
+ EventPubSub.publish(changeEvent);
+ }
+
+ return null;
+ }
+}
+```
+
+---
+
+### 3.3 通知分发架构:基于 EventSubscription 的动态路由
+
+OpenMetadata 支持灵活的**通知订阅机制**,用户可通过 UI 配置订阅规则。
+
+#### 3.3.1 订阅配置表:`event_subscription_entity`
+
+```sql
+CREATE TABLE `event_subscription_entity` (
+ `id` varchar(36) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.id'))) STORED NOT NULL,
+ `name` varchar(256) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.name'))) VIRTUAL NOT NULL,
+ `json` json NOT NULL, -- 包含订阅规则、目标配置
+ `nameHash` varchar(256) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL,
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `nameHash` (`nameHash`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
+```
+
+**EventSubscription JSON 示例**:
+
+```json
+{
+ "id": "uuid-9876",
+ "name": "Table Schema Changes Notification",
+ "enabled": true,
+ "resources": ["table"], // 监听的实体类型
+ "eventFilters": [
+ {
+ "eventType": "ENTITY_UPDATED",
+ "filters": [
+ {
+ "field": "changeDescription.fieldsAdded.name",
+ "condition": "matchAny",
+ "values": ["columns"] // 仅当添加列时触发
+ }
+ ]
+ }
+ ],
+ "destinations": [
+ {
+ "type": "Slack",
+ "config": {
+ "webhook": "https://hooks.slack.com/services/xxx"
+ }
+ },
+ {
+ "type": "Email",
+ "config": {
+ "recipients": ["admin@company.com"]
+ }
+ }
+ ]
+}
+```
+
+#### 3.3.2 事件过滤与路由逻辑
+
+```java
+public class EventSubscriptionManager implements EventHandler {
+ private List subscriptions; // 从数据库加载
+
+ @Override
+ public void onEvent(ChangeEventHolder holder, long sequence, boolean endOfBatch) {
+ ChangeEvent event = holder.getEvent();
+
+ // 遍历所有订阅,匹配过滤规则
+ for (EventSubscription subscription : subscriptions) {
+ if (subscription.isEnabled() && matchesFilters(event, subscription)) {
+ // 分发到所有配置的目标
+ for (Destination destination : subscription.getDestinations()) {
+ sendNotification(event, destination);
+ }
+ }
+ }
+ }
+
+ private boolean matchesFilters(ChangeEvent event, EventSubscription subscription) {
+ // 1. 检查实体类型
+ if (!subscription.getResources().contains(event.getEntityType())) {
+ return false;
+ }
+
+ // 2. 检查事件类型
+ if (!subscription.getEventFilters().stream()
+ .anyMatch(f -> f.getEventType().equals(event.getEventType()))) {
+ return false;
+ }
+
+ // 3. 检查字段级过滤器(如 "columns 被修改")
+ return subscription.getEventFilters().stream()
+ .anyMatch(filter -> evaluateConditions(event, filter.getFilters()));
+ }
+
+ private boolean evaluateConditions(ChangeEvent event, List filters) {
+ for (FilterRule rule : filters) {
+ // 支持 JSONPath 表达式解析
+ Object value = JsonPath.read(event.getChangeDescription(), rule.getField());
+ if (!rule.evaluate(value)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
+```
+
+---
+
+### 3.4 通知端点实现:策略模式
+
+不同的通知渠道通过**策略模式**实现:
+
+```java
+public interface Destination {
+ void sendMessage(T event) throws EventPublisherException;
+ void close();
+}
+
+// Slack 通知实现
+public class SlackEventPublisher implements Destination {
+ private String webhookUrl;
+ private RestTemplate restTemplate;
+
+ @Override
+ public void sendMessage(ChangeEvent event) {
+ // 构造 Slack 消息格式
+ SlackMessage message = buildSlackMessage(event);
+
+ // 发送 HTTP POST 请求
+ try {
+ restTemplate.postForEntity(webhookUrl, message, String.class);
+ } catch (RestClientException e) {
+ throw new RetriableException("Failed to send Slack notification", e);
+ }
+ }
+
+ private SlackMessage buildSlackMessage(ChangeEvent event) {
+ return SlackMessage.builder()
+ .text(String.format(
+ "🔔 *%s* %s by %s",
+ event.getEntityFullyQualifiedName(),
+ event.getEventType().value(),
+ event.getUserName()
+ ))
+ .attachments(List.of(
+ Attachment.builder()
+ .color("#36a64f")
+ .fields(buildChangeFields(event.getChangeDescription()))
+ .build()
+ ))
+ .build();
+ }
+}
+
+// Email 通知实现
+public class EmailPublisher implements Destination {
+ private JavaMailSender mailSender;
+
+ @Override
+ public void sendMessage(ChangeEvent event) {
+ MimeMessage message = mailSender.createMimeMessage();
+ MimeMessageHelper helper = new MimeMessageHelper(message, true);
+
+ helper.setTo(config.getRecipients());
+ helper.setSubject("Metadata Change: " + event.getEntityFullyQualifiedName());
+ helper.setText(renderEmailTemplate(event), true); // HTML 模板
+
+ mailSender.send(message);
+ }
+}
+
+// Webhook 通知实现
+public class WebhookPublisher implements Destination {
+ @Override
+ public void sendMessage(ChangeEvent event) {
+ HttpPost request = new HttpPost(config.getEndpoint());
+ request.setHeader("Content-Type", "application/json");
+ request.setEntity(new StringEntity(JsonUtils.pojoToJson(event)));
+
+ // 支持重试机制
+ httpClient.execute(request, new RetryHandler(3, 1000));
+ }
+}
+```
+
+---
+
+### 3.5 事件持久化与审计
+
+所有变更事件在发布到内存总线的同时,会持久化到 `change_event` 表,用于:
+
+1. **审计追踪**:满足合规要求,记录所有元数据变更历史。
+2. **离线分析**:通过 SQL 查询分析元数据演进趋势(如 "过去 30 天新增了多少表")。
+3. **事件重放**:支持从数据库重新发布历史事件到新的订阅者。
+
+**查询示例**:
+
+```sql
+-- 查询某用户在某时间段内的所有变更
+SELECT
+ entityType,
+ entityFullyQualifiedName,
+ eventType,
+ JSON_EXTRACT(json, '$.changeDescription') AS changes,
+ eventTime
+FROM change_event
+WHERE userName = 'john.doe'
+ AND eventTime BETWEEN 1700000000000 AND 1700086400000
+ORDER BY eventTime DESC;
+```
+
+---
+
+## 核心设计哲学与架构建议
+
+### 4.1 OpenMetadata 的核心设计哲学
+
+经过深入分析 OpenMetadata 的代码和架构,其 Schema Evolution 系统的核心设计哲学可归纳为:
+
+#### 1. **Schema-Driven Everything**
+- **理念**:所有实体通过 JSON Schema 定义,数据库仅存储 JSON 文档。
+- **优势**:
+ - 模型演进无需修改数据库 Schema(DDL 变更极少)。
+ - 支持灵活的嵌套结构(如表的列、仪表板的图表)。
+ - 便于跨语言(Java、Python、TypeScript)共享模型定义。
+
+#### 2. **Full Snapshot over Delta**
+- **理念**:存储每个版本的完整快照,而非增量 Patch。
+- **权衡**:牺牲一定的存储空间,换取极致的查询性能和实现简洁性。
+- **适用场景**:元数据体积有限(通常 < 100GB),且读操作远多于写操作。
+
+#### 3. **Event-Driven Notification**
+- **理念**:变更事件作为一等公民,支持实时通知和下游消费。
+- **优势**:
+ - 解耦元数据管理与外部系统(如数据质量工具、数据血缘引擎)。
+ - 支持灵活的订阅规则,满足不同团队的需求。
+
+#### 4. **Optimistic Concurrency Control(可选)**
+- **理念**:通过版本号实现乐观锁,避免并发更新冲突。
+- **实现**:
+ ```java
+ @Transaction
+ public void storeEntityWithVersion(T entity, boolean update, Double expectedVersion) {
+ if (update) {
+ int rowsAffected = dao.update(
+ entity.getId(),
+ JsonUtils.pojoToJson(entity),
+ expectedVersion // WHERE version = ?
+ );
+ if (rowsAffected == 0) {
+ throw new OptimisticLockException("Entity was modified by another process");
+ }
+ }
+ }
+ ```
+
+#### 5. **Session-Based Change Consolidation**
+- **理念**:合并用户短时间内的多次小修改,避免版本号膨胀。
+- **权衡**:增加一定的实现复杂度,但显著提升用户体验(如 "保存草稿" 场景)。
+
+---
+
+### 4.2 给您的新项目的架构建议
+
+基于以上分析,对于您的 **Java/Spring Boot + MySQL 元数据版本管理系统**,以下是两个**最值得借鉴的关键架构决策**:
+
+#### **建议 1:采用 JSON Schema + 完整快照存储策略**
+
+**实施步骤**:
+
+1. **定义实体表结构**(以 `metadata_entity` 为例):
+ ```sql
+ CREATE TABLE metadata_entity (
+ id VARCHAR(36) PRIMARY KEY,
+ entity_type VARCHAR(64) NOT NULL,
+ name VARCHAR(256) NOT NULL,
+ version DECIMAL(10, 2) NOT NULL DEFAULT 0.1,
+ json_content JSON NOT NULL, -- 完整实体 JSON
+ updated_at BIGINT NOT NULL,
+ updated_by VARCHAR(256) NOT NULL,
+ deleted BOOLEAN DEFAULT FALSE,
+ INDEX idx_entity_type (entity_type),
+ INDEX idx_name (name),
+ INDEX idx_updated_at (updated_at)
+ );
+ ```
+
+2. **创建版本历史表**:
+ ```sql
+ CREATE TABLE entity_version_history (
+ entity_id VARCHAR(36) NOT NULL,
+ version DECIMAL(10, 2) NOT NULL,
+ json_content JSON NOT NULL, -- 该版本的完整快照
+ created_at BIGINT NOT NULL,
+ created_by VARCHAR(256) NOT NULL,
+ change_description JSON, -- ChangeDescription 结构
+ PRIMARY KEY (entity_id, version),
+ FOREIGN KEY (entity_id) REFERENCES metadata_entity(id)
+ );
+ ```
+
+3. **使用 Jackson 进行 JSON 序列化**(Spring Boot 内置支持):
+ ```java
+ @Entity
+ @Table(name = "metadata_entity")
+ public class MetadataEntity {
+ @Id
+ private String id;
+
+ private String entityType;
+
+ @Column(columnDefinition = "json")
+ @Convert(converter = JsonContentConverter.class)
+ private JsonNode jsonContent; // 使用 Jackson 的 JsonNode
+
+ private Double version;
+ }
+
+ @Converter
+ public class JsonContentConverter implements AttributeConverter {
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ @Override
+ public String convertToDatabaseColumn(JsonNode attribute) {
+ return attribute.toString();
+ }
+
+ @Override
+ public JsonNode convertToEntityAttribute(String dbData) {
+ return objectMapper.readTree(dbData);
+ }
+ }
+ ```
+
+**预期效果**:
+- 添加新实体类型时,无需修改数据库 Schema。
+- 查询任意版本速度极快(单次查询,无需重构)。
+- 支持灵活的查询需求(如 "查找所有包含 PII 标签的表")。
+
+---
+
+#### **建议 2:构建基于 Spring Events 的事件驱动架构**
+
+**实施步骤**:
+
+1. **定义 ChangeEvent 模型**:
+ ```java
+ public class MetadataChangeEvent extends ApplicationEvent {
+ private final String entityId;
+ private final String entityType;
+ private final EventType eventType; // CREATED/UPDATED/DELETED
+ private final ChangeDescription changeDescription;
+
+ // 构造函数、getter/setter
+ }
+ ```
+
+2. **在 EntityService 中发布事件**:
+ ```java
+ @Service
+ public class MetadataEntityService {
+ @Autowired
+ private ApplicationEventPublisher eventPublisher;
+
+ @Transactional
+ public MetadataEntity updateEntity(String id, JsonNode updates) {
+ MetadataEntity original = repository.findById(id);
+ MetadataEntity updated = applyUpdates(original, updates);
+
+ // 计算变更
+ ChangeDescription changeDescription = diffEntities(original, updated);
+
+ // 存储新版本
+ storeVersion(original); // 保存到 entity_version_history
+ repository.save(updated);
+
+ // 发布事件(异步)
+ eventPublisher.publishEvent(new MetadataChangeEvent(
+ id,
+ updated.getEntityType(),
+ EventType.UPDATED,
+ changeDescription
+ ));
+
+ return updated;
+ }
+ }
+ ```
+
+3. **实现事件监听器**(支持多个独立的处理器):
+ ```java
+ @Component
+ public class SlackNotificationListener {
+ @EventListener
+ @Async // 异步处理
+ public void handleChangeEvent(MetadataChangeEvent event) {
+ // 根据订阅规则过滤
+ if (shouldNotify(event)) {
+ sendSlackNotification(event);
+ }
+ }
+ }
+
+ @Component
+ public class AuditLogListener {
+ @EventListener
+ @Async
+ public void handleChangeEvent(MetadataChangeEvent event) {
+ // 持久化到审计日志
+ auditLogRepository.save(new AuditLog(event));
+ }
+ }
+
+ @Component
+ public class ElasticsearchSyncListener {
+ @EventListener
+ @Async
+ public void handleChangeEvent(MetadataChangeEvent event) {
+ // 同步到搜索引擎
+ elasticsearchService.indexEntity(event.getEntityId());
+ }
+ }
+ ```
+
+4. **配置异步线程池**(避免阻塞主线程):
+ ```java
+ @Configuration
+ @EnableAsync
+ public class AsyncConfig {
+ @Bean(name = "eventExecutor")
+ public Executor taskExecutor() {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(4);
+ executor.setMaxPoolSize(16);
+ executor.setQueueCapacity(500);
+ executor.setThreadNamePrefix("event-handler-");
+ executor.initialize();
+ return executor;
+ }
+ }
+ ```
+
+**预期效果**:
+- 元数据变更与通知逻辑完全解耦,易于扩展新的订阅者。
+- 通过 `@Async` 避免阻塞主业务流程,提升响应速度。
+- Spring Events 提供可靠的事件传播机制(支持事务边界)。
+
+---
+
+### 4.3 可选的增强功能
+
+1. **引入 Flyway 管理 JSON Schema 版本**:
+ - 将 JSON Schema 文件纳入版本控制。
+ - 使用 Flyway Migration 在 Schema 演进时自动迁移旧数据。
+
+2. **实现 GraphQL API 查询历史版本**:
+ ```graphql
+ query {
+ metadataEntity(id: "uuid-1234") {
+ name
+ currentVersion
+ versionHistory {
+ version
+ changedAt
+ changedBy
+ changes {
+ fieldsAdded
+ fieldsUpdated
+ fieldsDeleted
+ }
+ }
+ }
+ }
+ ```
+
+3. **支持乐观锁(Optimistic Locking)**:
+ ```java
+ @Entity
+ public class MetadataEntity {
+ @Version
+ private Long lockVersion; // JPA 自动管理
+ }
+ ```
+
+---
+
+## 总结:OpenMetadata 的启示
+
+OpenMetadata 的 Schema Evolution 系统之所以强大,在于其**将复杂性封装在简洁的抽象之下**:
+
+- **数据库层**:使用 JSON + 完整快照,最大化灵活性与查询性能。
+- **业务逻辑层**:通过 `EntityUpdater` 抽象,统一处理各类实体的版本管理。
+- **事件通知层**:基于 Disruptor 的高性能异步架构,支持灵活的订阅规则。
+
+对于您的新项目,建议**优先采用 JSON Schema + 完整快照**的存储策略,配合 **Spring Events** 构建事件驱动架构。这将为您带来:
+- **快速迭代能力**:无需频繁修改数据库 Schema。
+- **强大的扩展性**:轻松添加新的实体类型和通知渠道。
+- **优秀的查询性能**:直接查询 JSON 字段,支持复杂的元数据搜索。
+
+如需进一步的技术细节或实现指导,请参考 OpenMetadata 的源代码(尤其是 `EntityRepository.java` 和 `EventPubSub.java`),它们是理解整个系统运作的关键入口。
+
+---
+
+**文档版本**: v1.0
+**最后更新**: 2025-10-23
+**联系方式**: 如有疑问,请参考 OpenMetadata 官方文档或社区论坛。