diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 2da7e035a0..3ecd0d0208 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -234,7 +234,9 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon defer dstClose(ctx) syncState.Store(shared.Ptr("updating schema")) - if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas); err != nil { + if err := dstConn.ReplayTableSchemaDeltas( + ctx, config.Env, flowName, options.TableMappings, recordBatchSync.SchemaDeltas, config.Version, + ); err != nil { return nil, fmt.Errorf("failed to sync schema: %w", err) } diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index c36cded384..a37e61706b 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -239,6 +239,7 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas( flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + _ uint32, ) error { for _, schemaDelta := range schemaDeltas { if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 { diff --git a/flow/connectors/bigquery/qrep.go b/flow/connectors/bigquery/qrep.go index 71a360e1da..22412a88a0 100644 --- a/flow/connectors/bigquery/qrep.go +++ b/flow/connectors/bigquery/qrep.go @@ -87,7 +87,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep( } if err := c.ReplayTableSchemaDeltas( - ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta}, + ctx, config.Env, config.FlowJobName, nil, []*protos.TableSchemaDelta{tableSchemaDelta}, config.Version, ); err != nil { return nil, fmt.Errorf("failed to add columns to destination table: %w", err) } diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index e2f7ff7ca5..c20c78ab84 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -98,7 +98,9 @@ func (s *QRepAvroSyncMethod) SyncRecords( slog.String(string(shared.FlowNameKey), req.FlowJobName), slog.String("dstTableName", rawTableName)) - if err := s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil { + if err := s.connector.ReplayTableSchemaDeltas( + ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version, + ); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go index 9fcd42ee30..0e03fbb822 100644 --- a/flow/connectors/clickhouse/cdc.go +++ b/flow/connectors/clickhouse/cdc.go @@ -131,7 +131,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro( } warnings := numericTruncator.Warnings() - if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil { + if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -165,6 +165,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas( flowJobName string, tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + internalVersion uint32, ) error { if len(schemaDeltas) == 0 { return nil @@ -188,7 +189,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas( for 
_, addedColumn := range schemaDelta.AddedColumns { qvKind := types.QValueKind(addedColumn.Type) clickHouseColType, err := qvalue.ToDWHColumnType( - ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, + ctx, qvKind, env, protos.DBType_CLICKHOUSE, c.chVersion, addedColumn, schemaDelta.NullableEnabled, internalVersion, ) if err != nil { return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index fe67c1a869..d07b9ab44d 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -422,6 +422,8 @@ func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType qkind = types.QValueKindUUID case "DateTime64(6)", "Nullable(DateTime64(6))", "DateTime64(9)", "Nullable(DateTime64(9))": qkind = types.QValueKindTimestamp + case "Time64(3)", "Nullable(Time64(3))": + qkind = types.QValueKindTime case "Date32", "Nullable(Date32)": qkind = types.QValueKindDate case "Float32", "Nullable(Float32)": diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index 59df573aa9..e9c9128e19 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -66,6 +66,7 @@ func (c *ClickHouseConnector) SetupNormalizedTable( destinationTableIdentifier, sourceTableSchema, c.chVersion, + config.Version, ) if err != nil { return false, fmt.Errorf("error while generating create table sql for destination ClickHouse table: %w", err) @@ -85,6 +86,7 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( tableIdentifier string, tableSchema *protos.TableSchema, chVersion *chproto.Version, + internalVersion uint32, ) ([]string, error) { var engine string tmEngine := protos.TableEngine_CH_ENGINE_REPLACING_MERGE_TREE @@ -203,7 +205,8 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable( if clickHouseType == "" { var err error clickHouseType, err = qvalue.ToDWHColumnType( - ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, tableSchema.NullableEnabled || columnNullableEnabled, + ctx, colType, config.Env, protos.DBType_CLICKHOUSE, chVersion, column, + tableSchema.NullableEnabled || columnNullableEnabled, internalVersion, ) if err != nil { return nil, fmt.Errorf("error while converting column type to ClickHouse type: %w", err) diff --git a/flow/connectors/clickhouse/normalize_query.go b/flow/connectors/clickhouse/normalize_query.go index 44f95dafec..a0cdfec3d8 100644 --- a/flow/connectors/clickhouse/normalize_query.go +++ b/flow/connectors/clickhouse/normalize_query.go @@ -123,7 +123,7 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error if clickHouseType == "" { var err error clickHouseType, err = qvalue.ToDWHColumnType( - ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled, + ctx, colType, t.env, protos.DBType_CLICKHOUSE, t.chVersion, column, schema.NullableEnabled || columnNullableEnabled, t.version, ) if err != nil { return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err) @@ -131,6 +131,21 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error } switch clickHouseType { + case "Time64(3)", "Nullable(Time64(3))": + // Time64 is a time-of-day type, parse from JSON string + // toTime64 converts string to 
Time64(3), returns NULL if string is NULL or invalid + fmt.Fprintf(&projection, + "toTime64(JSONExtractString(_peerdb_data, %s),3) AS %s,", + peerdb_clickhouse.QuoteLiteral(colName), + peerdb_clickhouse.QuoteIdentifier(dstColName), + ) + if t.enablePrimaryUpdate { + fmt.Fprintf(&projectionUpdate, + "toTime64(JSONExtractString(_peerdb_match_data, %s),3) AS %s,", + peerdb_clickhouse.QuoteLiteral(colName), + peerdb_clickhouse.QuoteIdentifier(dstColName), + ) + } case "Date32", "Nullable(Date32)": fmt.Fprintf(&projection, "toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC')) AS %s,", diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 7bf7f01ef8..2cf4531790 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -185,7 +185,7 @@ type CDCSyncConnectorCore interface { // This could involve adding multiple columns. // Connectors which are non-normalizing should implement this as a nop. ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, - tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta) error + tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, internalVersion uint32) error } type CDCSyncConnector interface { diff --git a/flow/connectors/elasticsearch/elasticsearch.go b/flow/connectors/elasticsearch/elasticsearch.go index 4c08d91a08..89edbb6f36 100644 --- a/flow/connectors/elasticsearch/elasticsearch.go +++ b/flow/connectors/elasticsearch/elasticsearch.go @@ -94,7 +94,7 @@ func (esc *ElasticsearchConnector) CreateRawTable(ctx context.Context, // we handle schema changes by not handling them since no mapping is being enforced right now func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index eefbd22e23..d5d9908059 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -371,7 +371,7 @@ func (c *EventHubConnector) CreateRawTable(ctx context.Context, req *protos.Crea } func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index a33de879a3..266b8e959e 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -162,7 +162,7 @@ func (c *KafkaConnector) CreateRawTable(ctx context.Context, req *protos.CreateR } func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/mysql/cdc.go b/flow/connectors/mysql/cdc.go index 2172e1a017..9fa5705b47 100644 --- a/flow/connectors/mysql/cdc.go +++ b/flow/connectors/mysql/cdc.go @@ -40,7 +40,7 @@ func (c *MySqlConnector) GetTableSchema( ) (map[string]*protos.TableSchema, error) { res := 
make(map[string]*protos.TableSchema, len(tableMappings)) for _, tm := range tableMappings { - tableSchema, err := c.getTableSchemaForTable(ctx, env, tm, system) + tableSchema, err := c.getTableSchemaForTable(ctx, env, tm, system, version) if err != nil { c.logger.Info("error fetching schema", slog.String("table", tm.SourceTableIdentifier), slog.Any("error", err)) return nil, err @@ -57,6 +57,7 @@ func (c *MySqlConnector) getTableSchemaForTable( env map[string]string, tm *protos.TableMapping, system protos.TypeSystem, + version uint32, ) (*protos.TableSchema, error) { schemaTable, err := utils.ParseSchemaTable(tm.SourceTableIdentifier) if err != nil { @@ -106,7 +107,7 @@ func (c *MySqlConnector) getTableSchemaForTable( if err != nil { return nil, err } - qkind, err := QkindFromMysqlColumnType(dataType) + qkind, err := QkindFromMysqlColumnType(dataType, version) if err != nil { return nil, err } @@ -699,7 +700,7 @@ func (c *MySqlConnector) processAlterTableQuery(ctx context.Context, catalogPool slog.String("tableName", sourceTableName)) continue } - qkind, err := QkindFromMysqlColumnType(col.Tp.InfoSchemaStr()) + qkind, err := QkindFromMysqlColumnType(col.Tp.InfoSchemaStr(), req.InternalVersion) if err != nil { return err } diff --git a/flow/connectors/mysql/qrep.go b/flow/connectors/mysql/qrep.go index 992c26a6ed..905f0c08ec 100644 --- a/flow/connectors/mysql/qrep.go +++ b/flow/connectors/mysql/qrep.go @@ -182,7 +182,7 @@ func (c *MySqlConnector) PullQRepRecords( stream *model.QRecordStream, ) (int64, int64, error) { tableSchema, err := c.getTableSchemaForTable(ctx, config.Env, - &protos.TableMapping{SourceTableIdentifier: config.WatermarkTable}, protos.TypeSystem_Q) + &protos.TableMapping{SourceTableIdentifier: config.WatermarkTable}, protos.TypeSystem_Q, config.Version) if err != nil { return 0, 0, fmt.Errorf("failed to get schema for watermark table %s: %w", config.WatermarkTable, err) } diff --git a/flow/connectors/mysql/schema.go b/flow/connectors/mysql/schema.go index 64c041532b..f045e2740d 100644 --- a/flow/connectors/mysql/schema.go +++ b/flow/connectors/mysql/schema.go @@ -98,7 +98,7 @@ func (c *MySqlConnector) GetColumns(ctx context.Context, version uint32, schema if err != nil { return nil, err } - qkind, err := QkindFromMysqlColumnType(columnType) + qkind, err := QkindFromMysqlColumnType(columnType, version) if err != nil { return nil, err } diff --git a/flow/connectors/mysql/type_conversion.go b/flow/connectors/mysql/type_conversion.go index b12038ac08..204d70aab9 100644 --- a/flow/connectors/mysql/type_conversion.go +++ b/flow/connectors/mysql/type_conversion.go @@ -4,10 +4,11 @@ import ( "fmt" "strings" + "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/types" ) -func QkindFromMysqlColumnType(ct string) (types.QValueKind, error) { +func QkindFromMysqlColumnType(ct string, version uint32) (types.QValueKind, error) { // https://mariadb.com/docs/server/reference/data-types/date-and-time-data-types/timestamp#tab-current-1 ct, _ = strings.CutSuffix(ct, " /* mariadb-5.3 */") ct, _ = strings.CutSuffix(ct, " zerofill") @@ -24,7 +25,13 @@ func QkindFromMysqlColumnType(ct string) (types.QValueKind, error) { return types.QValueKindBytes, nil case "date": return types.QValueKindDate, nil - case "datetime", "timestamp", "time": + case "datetime", "timestamp": + return types.QValueKindTimestamp, nil + case "time": + // For new versions, map TIME to QValueKindTime instead of QValueKindTimestamp + if version >= shared.InternalVersion_ClickHouseTime64 { + 
return types.QValueKindTime, nil + } return types.QValueKindTimestamp, nil case "decimal", "numeric": return types.QValueKindNumeric, nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index e71ef66baf..48c060e397 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -650,7 +650,7 @@ func syncRecordsCore[Items model.Items]( return nil, err } - if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil { + if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -1120,6 +1120,7 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas( flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + _ uint32, ) error { if len(schemaDeltas) == 0 { return nil diff --git a/flow/connectors/postgres/postgres_schema_delta_test.go b/flow/connectors/postgres/postgres_schema_delta_test.go index 4fc4333a1d..2448965e80 100644 --- a/flow/connectors/postgres/postgres_schema_delta_test.go +++ b/flow/connectors/postgres/postgres_schema_delta_test.go @@ -67,7 +67,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() { Nullable: true, }, }, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -115,7 +115,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -146,7 +146,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -177,7 +177,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, shared.InternalVersion_Latest, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 8c69d6d7b8..0d61ebe49e 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -75,7 +75,7 @@ func (c *PubSubConnector) CreateRawTable(ctx context.Context, req *protos.Create } func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/s3/s3.go b/flow/connectors/s3/s3.go index abc857cbcf..e507ab5d56 100644 --- a/flow/connectors/s3/s3.go +++ b/flow/connectors/s3/s3.go @@ -117,7 +117,7 @@ func (c *S3Connector) 
SyncRecords(ctx context.Context, req *model.SyncRecordsReq } func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string, - flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, _ uint32, ) error { return nil } diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 486e9a3e45..eaec6f282a 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -36,7 +36,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(ctx context.Context, env map[stri for _, column := range columns { genericColumnType := column.Type qvKind := types.QValueKind(genericColumnType) - sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled) + sfType, err := qvalue.ToDWHColumnType(ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, column, normalizedTableSchema.NullableEnabled, 0) if err != nil { return "", fmt.Errorf("failed to convert column type %s to snowflake type: %w", genericColumnType, err) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index f7ebf975df..f74a03f0c5 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -340,6 +340,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, + _ uint32, ) error { if len(schemaDeltas) == 0 { return nil @@ -365,7 +366,7 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas( for _, addedColumn := range schemaDelta.AddedColumns { qvKind := types.QValueKind(addedColumn.Type) sfColtype, err := qvalue.ToDWHColumnType( - ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled, + ctx, qvKind, env, protos.DBType_SNOWFLAKE, nil, addedColumn, schemaDelta.NullableEnabled, 0, ) if err != nil { return fmt.Errorf("failed to convert column type %s to snowflake type: %w", @@ -450,7 +451,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro( return nil, err } - if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas); err != nil { + if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.TableMappings, req.Records.SchemaDeltas, req.Version); err != nil { return nil, fmt.Errorf("failed to sync schema changes: %w", err) } @@ -663,7 +664,7 @@ func generateCreateTableSQLForNormalizedTable( normalizedColName := SnowflakeIdentifierNormalize(column.Name) qvKind := types.QValueKind(genericColumnType) sfColType, err := qvalue.ToDWHColumnType( - ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled, + ctx, qvKind, config.Env, protos.DBType_SNOWFLAKE, nil, column, tableSchema.NullableEnabled, 0, ) if err != nil { slog.WarnContext(ctx, fmt.Sprintf("failed to convert column type %s to snowflake type", genericColumnType), diff --git a/flow/e2e/clickhouse.go b/flow/e2e/clickhouse.go index 278a869ba1..8640d4251c 100644 --- a/flow/e2e/clickhouse.go +++ b/flow/e2e/clickhouse.go @@ -289,7 +289,11 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch qrow = append(qrow, types.QValueUInt256{Val: *v}) } case *time.Time: - qrow = append(qrow, types.QValueTimestamp{Val: *v}) + if batch.Schema.Fields[idx].Type == 
types.QValueKindTime { + qrow = append(qrow, types.QValueTime{Val: v.Sub(v.Truncate(time.Hour * 24))}) + } else { + qrow = append(qrow, types.QValueTimestamp{Val: *v}) + } case *[]time.Time: qrow = append(qrow, types.QValueArrayTimestamp{Val: *v}) case **decimal.Decimal: diff --git a/flow/e2e/clickhouse_mysql_test.go b/flow/e2e/clickhouse_mysql_test.go index bd8ff1f37f..2fd531e80d 100644 --- a/flow/e2e/clickhouse_mysql_test.go +++ b/flow/e2e/clickhouse_mysql_test.go @@ -3,6 +3,7 @@ package e2e import ( "fmt" "math" + "strconv" "strings" "github.com/stretchr/testify/require" @@ -113,6 +114,97 @@ func (s ClickHouseSuite) Test_MySQL_Time() { EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",d,dt,tm,t") + // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 (assumes latest internal version) + // Backward compatibility with older internal versions is tested in Test_MySQL_Time_BackwardCompatibility + ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + defer ch.Close() + + var columnType string + err = ch.QueryRow(s.t.Context(), fmt.Sprintf( + "SELECT type FROM system.columns WHERE database = currentDatabase() AND table = %s AND name = 't'", + clickhouse.QuoteLiteral(dstTableName), + )).Scan(&columnType) + require.NoError(s.t, err) + + chVersion, err := s.connector.GetVersion(s.t.Context()) + require.NoError(s.t, err) + + // Check if ClickHouse version >= 25.6 + versionParts := strings.Split(chVersion, ".") + if len(versionParts) >= 2 { + major, _ := strconv.Atoi(versionParts[0]) + minor, _ := strconv.Atoi(versionParts[1]) + + // If ClickHouse >= 25.6 and using latest internal version, should use Time64(3) + if major > 25 || (major == 25 && minor >= 6) { + require.Contains(s.t, columnType, "Time64(3)", + "Expected Time64(3) for TIME column when ClickHouse >= 25.6 and using latest internal version, got %s", columnType) + } else { + // Older ClickHouse versions should use DateTime64(6) + require.Contains(s.t, columnType, "DateTime64(6)", + "Expected DateTime64(6) for TIME column when ClickHouse < 25.6, got %s", columnType) + } + } + + env.Cancel(s.t.Context()) + RequireEnvCanceled(s.t, env) +} + +func (s ClickHouseSuite) Test_MySQL_Time_BackwardCompatibility() { + if _, ok := s.source.(*MySqlSource); !ok { + s.t.Skip("only applies to mysql") + } + + srcTableName := "test_datetime_backward_compat" + srcFullName := s.attachSchemaSuffix(srcTableName) + quotedSrcFullName := "\"" + strings.ReplaceAll(srcFullName, ".", "\".\"") + "\"" + dstTableName := "test_datetime_backward_compat_dst" + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id SERIAL PRIMARY KEY, + "key" TEXT NOT NULL, + t TIME NOT NULL + ) + `, quotedSrcFullName))) + + require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s ("key",t) VALUES + ('init','14:21.654321')`, + quotedSrcFullName))) + + connectionGen := FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix(srcTableName), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s) + flowConnConfig.DoInitialSnapshot = true + // Explicitly set to old internal version to test backward compatibility + flowConnConfig.Version = shared.InternalVersion_First + + tc := NewTemporalClient(s.t) + env := ExecutePeerflow(s.t, tc, flowConnConfig) + SetupCDCFlowStatusQuery(s.t, env, 
flowConnConfig) + + EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\",t") + + // Verify that TIME column uses DateTime64(6) even with ClickHouse >= 25.6 when using old internal version + ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig()) + require.NoError(s.t, err) + defer ch.Close() + + var columnType string + err = ch.QueryRow(s.t.Context(), fmt.Sprintf( + "SELECT type FROM system.columns WHERE database = currentDatabase() AND table = %s AND name = 't'", + clickhouse.QuoteLiteral(dstTableName), + )).Scan(&columnType) + require.NoError(s.t, err) + + // With old internal version, should always use DateTime64(6) regardless of ClickHouse version + require.Contains(s.t, columnType, "DateTime64(6)", + "Expected DateTime64(6) for TIME column with old internal version (InternalVersion_First), got %s", columnType) + env.Cancel(s.t.Context()) RequireEnvCanceled(s.t, env) } diff --git a/flow/e2e/snowflake_schema_delta_test.go b/flow/e2e/snowflake_schema_delta_test.go index 333df110c6..ce92ff1e68 100644 --- a/flow/e2e/snowflake_schema_delta_test.go +++ b/flow/e2e/snowflake_schema_delta_test.go @@ -10,6 +10,7 @@ import ( connsnowflake "github.com/PeerDB-io/peerdb/flow/connectors/snowflake" "github.com/PeerDB-io/peerdb/flow/e2eshared" "github.com/PeerDB-io/peerdb/flow/generated/protos" + "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -63,7 +64,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestSimpleAddColumn() { TypeModifier: -1, }, }, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -171,7 +172,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddAllColumnTypes() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -250,7 +251,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddTrickyColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) @@ -305,7 +306,7 @@ func (s SnowflakeSchemaDeltaTestSuite) TestAddWhitespaceColumnNames() { SrcTableName: tableName, DstTableName: tableName, AddedColumns: addedColumns, - }})) + }}, shared.InternalVersion_Latest)) output, err := s.connector.GetTableSchema(s.t.Context(), nil, 0, protos.TypeSystem_Q, []*protos.TableMapping{{SourceTableIdentifier: tableName}}) diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 814694de7f..009c41d2db 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -8,6 +8,7 @@ import ( "github.com/PeerDB-io/peerdb/flow/generated/protos" "github.com/PeerDB-io/peerdb/flow/internal" + "github.com/PeerDB-io/peerdb/flow/shared" "github.com/PeerDB-io/peerdb/flow/shared/datatypes" "github.com/PeerDB-io/peerdb/flow/shared/types" ) @@ -57,6 +58,7 @@ func ToDWHColumnType( dwhVersion *chproto.Version, column *protos.FieldDescription, nullableEnabled bool, + internalVersion uint32, ) (string, error) { var colType string switch dwhType { @@ -88,6 +90,14 @@ func ToDWHColumnType( colType = 
fmt.Sprintf("Array(%s)", colType) } else if (kind == types.QValueKindJSON || kind == types.QValueKindJSONB) && ShouldUseNativeJSONType(ctx, env, dwhVersion) { colType = "JSON" + } else if kind == types.QValueKindTime && internalVersion >= shared.InternalVersion_ClickHouseTime64 && dwhVersion != nil { + // Time64 was introduced in ClickHouse 25.6 + if chproto.CheckMinVersion(chproto.Version{Major: 25, Minor: 6, Patch: 0}, *dwhVersion) { + colType = "Time64(3)" + } else { + // Fall back to DateTime64(6) for older ClickHouse versions + colType = types.QValueKindToClickHouseTypeMap[kind] + } } else if val, ok := types.QValueKindToClickHouseTypeMap[kind]; ok { colType = val } else { diff --git a/flow/shared/constants.go b/flow/shared/constants.go index bca7c21093..ed72171531 100644 --- a/flow/shared/constants.go +++ b/flow/shared/constants.go @@ -40,6 +40,9 @@ const ( IntervalVersion_MongoDBFullDocumentColumnToDoc // All: setting json_type_escape_dots_in_keys = true when inserting JSON column to ClickHouse (only impacts MongoDB today) InternalVersion_JsonEscapeDotsInKeys + // ClickHouse: use Time64(3) data type for QValueKindTime when ClickHouse version >= 25.6 + // MySQL: map TIME type to QValueKindTime instead of QValueKindTimestamp + InternalVersion_ClickHouseTime64 TotalNumberOfInternalVersions InternalVersion_Latest = TotalNumberOfInternalVersions - 1 diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index cd83e1fdf4..4e080f7774 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -160,6 +160,7 @@ func (q *QRepFlowExecution) setupWatermarkTableOnDestination(ctx workflow.Contex FlowName: q.config.FlowJobName, Env: q.config.Env, IsResync: q.config.DstTableFullResync, + Version: q.config.Version, } if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go index f4c79eb67c..cae1ffd1c0 100644 --- a/flow/workflows/setup_flow.go +++ b/flow/workflows/setup_flow.go @@ -213,6 +213,7 @@ func (s *SetupFlowExecution) setupNormalizedTables( FlowName: flowConnectionConfigs.FlowJobName, Env: flowConnectionConfigs.Env, IsResync: flowConnectionConfigs.Resync, + Version: flowConnectionConfigs.Version, } if err := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig).Get(ctx, nil); err != nil { diff --git a/protos/flow.proto b/protos/flow.proto index ee94edfadb..b7b1382e44 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -262,6 +262,7 @@ message SetupNormalizedTableBatchInput { string flow_name = 6; string peer_name = 7; bool is_resync = 8; + uint32 version = 9; } message SetupNormalizedTableOutput {