From a1c83b8fa7a67aee731fd061b3f42823a6e8b912 Mon Sep 17 00:00:00 2001
From: Adesh Atole
Date: Fri, 15 Mar 2024 19:02:18 +0530
Subject: [PATCH 1/3] Add DynamoDB source connector.

---
 .../Dockerfile | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
 create mode 100644 build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/Dockerfile

diff --git a/build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/Dockerfile b/build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/Dockerfile
new file mode 100644
index 00000000..ae0584ee
--- /dev/null
+++ b/build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/Dockerfile
@@ -0,0 +1,15 @@
+FROM quay.io/strimzi/kafka:0.27.0-kafka-3.0.0
+USER root:root
+RUN mkdir -p /opt/kafka/plugins/dynamodb
+
+COPY ./kafka-connect-dynamodb-0.11.0.jar /opt/kafka/plugins/dynamodb/
+
+COPY ./kafka-connect-avro-converter-6.0.2.jar /opt/kafka/libs/
+COPY ./kafka-connect-avro-data-6.0.2.jar /opt/kafka/libs/
+COPY ./kafka-avro-serializer-6.0.2.jar /opt/kafka/libs/
+COPY ./kafka-schema-serializer-6.0.2.jar /opt/kafka/libs/
+COPY ./kafka-schema-registry-client-6.0.2.jar /opt/kafka/libs/
+COPY ./avro-1.9.2.jar /opt/kafka/libs/
+COPY ./common-config-6.0.2.jar /opt/kafka/libs/
+COPY ./common-utils-6.0.2.jar /opt/kafka/libs/
+USER 1001

From 8febab67b99cb4aec8ef00e4de799eea6897af27 Mon Sep 17 00:00:00 2001
From: Adesh Atole
Date: Fri, 15 Mar 2024 19:07:28 +0530
Subject: [PATCH 2/3] Add README

---
 .../README.md | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/README.md

diff --git a/build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/README.md b/build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/README.md
new file mode 100644
index 00000000..147d888f
--- /dev/null
+++ b/build/producer/connect-dynamodb/0.27.0-kakfa-3.0.0-dynamodbconnector-0.11.0-avro-6.0.2/README.md
@@ -0,0 +1,3 @@
+DynamoDB Kafka Source Connector
+
+https://github.com/trustpilot/kafka-connect-dynamodb/tree/0.11.0
\ No newline at end of file
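For context before the final patch: patches 1 and 2 build and document a Strimzi Kafka 0.27.0 image that bundles the trustpilot kafka-connect-dynamodb 0.11.0 plugin together with the Confluent 6.0.2 Avro converter jars. Below is a rough sketch of how such a connector could then be registered through the Kafka Connect REST API (port 8083 by default). This is not part of the patch series: the connector class name is the one the trustpilot project documents, and the connector name, Connect URL, and Schema Registry address are illustrative placeholders to verify against the 0.11.0 README and the actual cluster endpoints.

```go
// Sketch: register the DynamoDB source connector with Avro value conversion
// via the Connect REST API. All names and URLs are assumptions.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	payload := map[string]interface{}{
		"name": "dynamodb-source", // hypothetical connector name
		"config": map[string]string{
			// Class shipped in kafka-connect-dynamodb-0.11.0.jar (per the
			// trustpilot README; verify before use).
			"connector.class": "com.trustpilot.connector.dynamodb.DynamoDBSourceConnector",
			"tasks.max":       "1",
			// Standard Confluent Avro converter settings, served by the jars
			// the Dockerfile copies into /opt/kafka/libs/.
			"value.converter":                     "io.confluent.connect.avro.AvroConverter",
			"value.converter.schema.registry.url": "http://schema-registry:8081",
		},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		log.Fatal(err)
	}
	// Kafka Connect workers listen on 8083 by default.
	resp, err := http.Post("http://localhost:8083/connectors", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```

Note the split in the Dockerfile above: the connector jar goes under /opt/kafka/plugins/ where Connect's plugin scanner finds it, while the converter and Schema Registry client jars go on the worker classpath in /opt/kafka/libs/.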
From 6b20349f7c3f8e2459e9561ead7deaafa3239d74 Mon Sep 17 00:00:00 2001
From: Adesh Atole
Date: Tue, 26 Mar 2024 14:35:14 +0530
Subject: [PATCH 3/3] Add comments.

---
 pkg/redshift/redshift.go               |  3 +++
 pkg/redshiftbatcher/batch_processor.go |  4 ++++
 pkg/redshiftloader/load_processor.go   | 16 ++++++++++------
 pkg/transformer/debezium/message.go    | 14 ++++++++++++++
 pkg/transformer/debezium/schema.go     |  7 +++++--
 5 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/pkg/redshift/redshift.go b/pkg/redshift/redshift.go
index d419acb6..fb95fd0b 100644
--- a/pkg/redshift/redshift.go
+++ b/pkg/redshift/redshift.go
@@ -429,6 +429,9 @@ func (r *Redshift) CreateTable(
 // 3. Strategy3: table-migration using UNLOAD and COPY and a temp table
 //    Supports: all the other migration scenarios
 //    Exectued by ReplaceTable(), triggered by this function
+
+// Loader checks whether a migration is needed here
+
 func (r *Redshift) UpdateTable(ctx context.Context, inputTable, targetTable Table) (bool, error) {
 	klog.V(4).Infof("inputt Table: \n%+v\n", inputTable)
 	klog.V(4).Infof("target Table: \n%+v\n", targetTable)

diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go
index f5882d06..7dc92ca0 100644
--- a/pkg/redshiftbatcher/batch_processor.go
+++ b/pkg/redshiftbatcher/batch_processor.go
@@ -363,6 +363,10 @@ func (b *batchProcessor) processMessage(
 		)
 	}

+	// I think this transforms the message into the Redshift format
+	// checked https://s3.console.aws.amazon.com/s3/buckets/prod-tipoca-stream?region=ap-south-1&bucketType=general&prefix=k8sqredshiftbatcher/tipoca-stream-redshiftsink-q-latest-abha-2-abha_addresses-951581-batcher/ts.abha.abha_addresses/9515811a1a100949a9939cfaaf746f36dbbd266c/&showversions=false
+	// there are 2 additional props there, debeziumop and kafkaoffset; I guess those are added by this function, need to check
+
 	err := b.messageTransformer.Transform(message, resp.batchSchemaTable)
 	if err != nil {
 		return bytesProcessed, fmt.Errorf(

diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go
index f419c033..d421a434 100644
--- a/pkg/redshiftloader/load_processor.go
+++ b/pkg/redshiftloader/load_processor.go
@@ -385,12 +385,13 @@ func (b *loadProcessor) dropTable(ctx context.Context, schema string, table stri
 // merge:
 // begin transaction
-// 1. deDupe
-// 2. delete all rows in target table by pk which are present in
-// in staging table
-// 3. delete all the DELETE rows in staging table
-// 4. insert all the rows from staging table to target table
-// 5. drop the staging table
+//  1. deDupe
+//  2. delete all rows in target table by pk which are present
+//     in the staging table
+//  3. delete all the DELETE rows in staging table
+//  4. insert all the rows from staging table to target table
+//  5. drop the staging table
+//
 // end transaction
 func (b *loadProcessor) merge(ctx context.Context) error {
 	start := time.Now()

@@ -615,6 +616,9 @@ func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTa
 	if err != nil {
 		return fmt.Errorf("Error querying table exist, err: %v\n", err)
 	}
+
+	// Creates the table here if it does not exist
+
 	if !tableExist {
 		tx, err := b.redshifter.Begin(ctx)
 		if err != nil {
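The merge comment reformatted above describes a five-step staging-to-target transaction. A minimal Go sketch of that ordering follows, using database/sql and simplified names (target, staging, pk, payload, plus the kafkaoffset and debeziumop columns the other commits mention); the real loadProcessor builds these statements dynamically from the table schema, so treat this purely as an illustration of the steps, not the project's implementation.

```go
// Package loader sketches the five merge steps as one transaction.
package loader

import (
	"context"
	"database/sql"

	_ "github.com/lib/pq" // Redshift speaks the Postgres wire protocol
)

func merge(ctx context.Context, db *sql.DB) error {
	tx, err := db.BeginTx(ctx, nil) // begin transaction
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op if Commit already succeeded

	stmts := []string{
		// 1. deDupe: keep only the row with the highest kafkaoffset per pk
		`DELETE FROM staging USING staging s2
		  WHERE staging.pk = s2.pk AND staging.kafkaoffset < s2.kafkaoffset`,
		// 2. delete rows in target whose pk is present in staging
		`DELETE FROM target USING staging WHERE target.pk = staging.pk`,
		// 3. delete the debezium DELETE ('d') rows from staging
		`DELETE FROM staging WHERE debeziumop = 'd'`,
		// 4. insert the remaining staged rows into target
		`INSERT INTO target SELECT pk, payload FROM staging`,
		// 5. drop the staging table
		`DROP TABLE staging`,
	}
	for _, s := range stmts {
		if _, err := tx.ExecContext(ctx, s); err != nil {
			return err
		}
	}
	return tx.Commit() // end transaction
}
```

Deduplicating on kafkaoffset before touching the target table is what makes the load idempotent: replaying a batch re-stages the same offsets and the same final row wins.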
diff --git a/pkg/transformer/debezium/message.go b/pkg/transformer/debezium/message.go
index 220c2cdb..2f8c7f9a 100644
--- a/pkg/transformer/debezium/message.go
+++ b/pkg/transformer/debezium/message.go
@@ -265,14 +265,24 @@ func convertDebeziumFormattedTime(
 	}
 }

+// The comment below confirms the guess made in batch_processor.go:
+// the extra props are annotated here
+
 // Transform debezium event into a s3 message annotating extra information
 func (c *messageTransformer) Transform(

+	// TODO: need to find the format of this Message
 	message *serializer.Message,
 	table redshift.Table) error {
+	// Q: How is this parsed? Where is the definition?
+	// A: It's not really parsing; `before()` and `after()` do the casting
+
 	d := &messageParser{
 		message: message.Value,
 	}
+	// https://debezium.io/documentation/reference/stable/integrations/serdes.html
+	// Assuming we are using `without schema`
+
 	before := d.before()
 	after := d.after()

@@ -294,6 +304,7 @@ func (c *messageTransformer) Transform(
 		return fmt.Errorf("Unknown operation: %s\n", operation)
 	}

+	// Where is this table object coming from?
 	for _, column := range table.Columns {
 		if column.Type == "record" && column.SourceType.ColumnType == "polygon" {
 			empty := ""

@@ -326,6 +337,9 @@ func (c *messageTransformer) Transform(
 	}
 	// redshift only has all columns as lower cases
 	kafkaOffset := fmt.Sprintf("%v", message.Offset)
+
+	// This is the place where those 2 props are added
+
 	value[transformer.TempTablePrimary] = &kafkaOffset
 	value[transformer.TempTableOp] = &operation
 	message.Operation = operation

diff --git a/pkg/transformer/debezium/schema.go b/pkg/transformer/debezium/schema.go
index 35892c5b..a7d9b63b 100644
--- a/pkg/transformer/debezium/schema.go
+++ b/pkg/transformer/debezium/schema.go
@@ -316,7 +316,7 @@ func (c *schemaTransformer) TransformValue(
 	interface{},
 	error,
 ) {
-	s, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaId, 10)
+	schema, err := schemaregistry.GetSchemaWithRetry(c.registry, schemaId, 10)
 	if err != nil {
 		return nil, err
 	}

@@ -332,7 +332,7 @@ func (c *schemaTransformer) TransformValue(
 	}

 	return c.transformSchemaValue(
-		s.Schema(),
+		schema.Schema(),
 		primaryKeys,
 		maskSchema,
 		extraMaskSchema,

@@ -373,6 +373,9 @@ func (c *schemaTransformer) transformSchemaValue(jobSchema string,
 	// TODO: this might be required, better if not
 	// schema := strings.ReplaceAll(jobSchema, `"null",`, "")
 	schema := jobSchema
+
+	// The Debezium schema is declared here; most probably it holds the
+	// format in which Debezium pushes data to Kafka
+
 	var debeziumSchema Schema
 	err := json.Unmarshal([]byte(schema), &debeziumSchema)
 	if err != nil {
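The open questions in the message.go comments are answered by the tail of Transform itself: after the columns are converted, the Kafka offset and the Debezium operation are written into the value map, which is exactly why the objects in S3 carry the two extra props, kafkaoffset and debeziumop. The following tiny self-contained sketch mirrors that tail end; the constant names are local stand-ins for transformer.TempTablePrimary and transformer.TempTableOp, which live in pkg/transformer.

```go
package main

import "fmt"

// Stand-ins for transformer.TempTablePrimary and transformer.TempTableOp;
// the real constants are defined in pkg/transformer.
const (
	tempTablePrimary = "kafkaoffset"
	tempTableOp      = "debeziumop"
)

// annotate mirrors the end of Transform: the kafka offset and the debezium
// operation are added to the value map next to the table's own columns.
func annotate(value map[string]*string, offset int64, operation string) {
	kafkaOffset := fmt.Sprintf("%v", offset)
	value[tempTablePrimary] = &kafkaOffset
	value[tempTableOp] = &operation
}

func main() {
	v := map[string]*string{"id": nil}
	annotate(v, 951581, "c") // debezium op codes: c=create, u=update, d=delete, r=read
	fmt.Println(*v[tempTablePrimary], *v[tempTableOp]) // prints: 951581 c
}
```

The kafkaoffset column is what the loader's deDupe step keys on, and debeziumop is what lets it separate DELETE rows from upserts, tying these annotations back to the merge transaction sketched earlier.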