From 780df5a08068a48b352d31e4805047feaf1a23d3 Mon Sep 17 00:00:00 2001 From: Ardian Saidi Date: Tue, 28 Jul 2020 13:50:25 +0200 Subject: [PATCH] Added: Support writing to a different project Added: Support integer range partitioning --- docs/BigQueryMultiTable-batchsink.md | 4 + docs/BigQueryTable-batchsink.md | 29 +- docs/GCS-batchsink.md | 6 + docs/GCSArgumentSetter-action.md | 58 ++++ ...th-range-partition-cdap-data-pipeline.json | 102 +++++++ pom.xml | 4 +- .../bigquery/sink/AbstractBigQuerySink.java | 58 +--- .../sink/AbstractBigQuerySinkConfig.java | 19 ++ .../bigquery/sink/BigQueryOutputFormat.java | 160 ++++++++--- .../gcp/bigquery/sink/BigQuerySink.java | 94 ++++++- .../gcp/bigquery/sink/BigQuerySinkConfig.java | 194 ++++++++++++-- .../gcp/bigquery/sink/PartitionType.java | 26 ++ .../gcp/bigquery/util/BigQueryConstants.java | 5 +- .../gcp/bigtable/sink/BigtableSink.java | 17 +- .../gcp/bigtable/source/BigtableSource.java | 24 +- .../io/cdap/plugin/gcp/common/GCPUtils.java | 6 +- .../io/cdap/plugin/gcp/gcs/StorageClient.java | 82 +++++- .../gcp/gcs/actions/GCSArgumentSetter.java | 246 +++++++++++++++++ .../gcs/actions/GCSArgumentSetterConfig.java | 125 +++++++++ .../gcp/gcs/actions/GCSBucketCreate.java | 15 +- .../gcp/gcs/actions/GCSBucketDelete.java | 21 +- .../plugin/gcp/gcs/sink/GCSBatchSink.java | 181 ++++++++++++- .../gcp/gcs/sink/GCSMultiBatchSink.java | 14 +- .../gcp/gcs/sink/GCSOutputFormatProvider.java | 250 ++++++++++++++++++ .../cdap/plugin/gcp/gcs/source/GCSSource.java | 5 +- .../gcp/bigquery/sink/BigQuerySinkTest.java | 12 +- .../plugin/gcp/gcs/sink/GCSBatchSinkTest.java | 63 +++++ .../gcs/sink/GCSOutputformatProviderTest.java | 100 +++++++ widgets/BigQueryMultiTable-batchsink.json | 8 + widgets/BigQueryTable-batchsink.json | 134 ++++++++-- widgets/GCS-batchsink.json | 15 ++ widgets/GCSArgumentSetter-action.json | 92 +++++++ widgets/GCSMultiFiles-batchsink.json | 10 + 33 files changed, 1999 insertions(+), 180 deletions(-) create mode 100644 docs/GCSArgumentSetter-action.md create mode 100644 examples/example-csv-file-to-bigquery-with-range-partition-cdap-data-pipeline.json create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sink/PartitionType.java create mode 100644 src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java create mode 100644 src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java create mode 100644 src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java create mode 100644 src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java create mode 100644 src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java create mode 100644 widgets/GCSArgumentSetter-action.json diff --git a/docs/BigQueryMultiTable-batchsink.md b/docs/BigQueryMultiTable-batchsink.md index b3635b8365..d736090ca6 100644 --- a/docs/BigQueryMultiTable-batchsink.md +++ b/docs/BigQueryMultiTable-batchsink.md @@ -27,6 +27,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is t that the BigQuery job will run in. If a temporary bucket needs to be created, the service account must have permission in this project to create buckets. +**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not +in the same project that the BigQuery job will run in. If no value is given, it will default to the +configured Project ID. + **Dataset:** Dataset the tables belongs to. A dataset is contained within a specific project. 
Datasets are top-level containers that are used to organize and control access to tables and views.
 If dataset does not exist, it will be created.
 
diff --git a/docs/BigQueryTable-batchsink.md b/docs/BigQueryTable-batchsink.md
index b9f5f319d2..67a934f530 100644
--- a/docs/BigQueryTable-batchsink.md
+++ b/docs/BigQueryTable-batchsink.md
@@ -28,6 +28,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is t
 that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
 must have permission in this project to create buckets.
 
+**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
+in the same project that the BigQuery job will run in. If no value is given, it will default to the
+configured Project ID.
+
 **Dataset**: Dataset the table belongs to. A dataset is contained within a specific project.
 Datasets are top-level containers that are used to organize and control access to tables and views.
 
@@ -66,11 +70,30 @@ the update operation will be performed only in the partitions meeting the criter
 **Location:** The location where the big query dataset will get created. This value is ignored if the dataset
 or temporary bucket already exist.
 
-**Create Partitioned Table**: Whether to create the BigQuery table with time partitioning. This value
+**Create Partitioned Table [DEPRECATED]**: Whether to create the BigQuery table with time partitioning. This value
 is ignored if the table already exists.
 * When this is set to true, table will be created with time partitioning.
-* When this is set to false, table will be created without time partitioning.
-
+* When this is set to false, the value of the Partitioning Type property will be used.
+* [DEPRECATED] Use the Partitioning Type property instead.
+
+**Partitioning Type**: Specifies the partitioning type. Can be either Integer, Time, or None. Defaults to Time.
+This value is ignored if the table already exists.
+* When this is set to Time, the table will be created with time partitioning.
+* When this is set to Integer, the table will be created with integer range partitioning.
+* When this is set to None, the table will be created without partitioning.
+
+**Range Start**: For integer partitioning, specifies the start of the range. Only used when the table
+does not already exist and the partitioning type is set to Integer.
+* The start value is inclusive.
+
+**Range End**: For integer partitioning, specifies the end of the range. Only used when the table
+does not already exist and the partitioning type is set to Integer.
+* The end value is exclusive.
+
+**Range Interval**: For integer partitioning, specifies the partition interval. Only used when the table
+does not already exist and the partitioning type is set to Integer.
+* The interval value must be a positive integer.
+
 **Partition Field**: Partitioning column for the BigQuery table. This should be left empty if the
 BigQuery table is an ingestion-time partitioned table.
 
diff --git a/docs/GCS-batchsink.md b/docs/GCS-batchsink.md
index f50c7f7285..02d8dca940 100644
--- a/docs/GCS-batchsink.md
+++ b/docs/GCS-batchsink.md
@@ -47,5 +47,11 @@ The delimiter will be ignored if the format is anything other than 'delimited'.
 authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
 When running on other clusters, the file must be present on every node in the cluster.
 
+**Output File Prefix:** Prefix for the output file name.
+If none is given, it will default to 'part', which means all data files written by the sink will look like
+'part-r-00000', 'part-r-00001', etc.
+
+**File System Properties:** Additional properties to use with the OutputFormat.
+
 **Schema:** Schema of the data to write.
 The 'avro' and 'parquet' formats require a schema but other formats do not.
diff --git a/docs/GCSArgumentSetter-action.md b/docs/GCSArgumentSetter-action.md
new file mode 100644
index 0000000000..daf36335d5
--- /dev/null
+++ b/docs/GCSArgumentSetter-action.md
@@ -0,0 +1,58 @@
+# GCS Argument Setter
+
+Description
+-----------
+
+Fetches a JSON file from GCS and uses its contents to set arguments in the pipeline.
+
+The plugin maps JSON properties to pipeline argument names and their values to pipeline argument
+values, using the JSON configuration described below.
+
+This is most commonly used when the structure of a pipeline is static,
+and its configuration needs to be managed outside the pipeline itself.
+
+The JSON file must contain the arguments in a list:
+
+    {
+        "arguments" : [
+            { "name" : "argument name", "type" : "type", "value" : "argument value" },
+            { "name" : "argument1 name", "type" : "type", "value" : "argument1 value" }
+        ]
+    }
+
+where type can be Schema, Int, Float, Double, Short, String, Char, Array, or Map. A concrete file is
+shown in the Example section below.
+
+Credentials
+-----------
+If the plugin is run on a Google Cloud Dataproc cluster, the service account key does not need to be
+provided and can be set to 'auto-detect'.
+Credentials will be automatically read from the cluster environment.
+
+If the plugin is not run on a Dataproc cluster, the path to a service account key must be provided.
+The service account key can be found on the Dashboard in the Cloud Platform Console.
+Make sure the account key has permission to access BigQuery and Google Cloud Storage.
+The service account key file needs to be available on every node in your cluster and
+must be readable by all users running the job.
+
+Properties
+----------
+**Reference Name:** Name used to uniquely identify this plugin for lineage, annotating metadata, etc.
+
+**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
+It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
+that will be used to access Google Cloud Storage.
+
+**Path**: GCS path to the file containing the arguments.
+
+**Provide Service Account as JSON Content**: Whether to provide the service account as JSON content.
+When it is set to 'Yes', the content of the service account key needs to be provided in the Service
+Account JSON property, whereas when it is set to 'No' the service account file path needs to be specified.
+The default value is 'No'.
+
+**Service Account File Path**: Path on the local file system of the service account key used for
+authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
+When running on other clusters, the file must be present on every node in the cluster.
+
+**Service Account JSON**: The content of the service account key JSON.
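+
+Example
+-------
+A minimal, illustrative arguments file is shown below. The argument names and values used here
+(input_path and num_partitions) are hypothetical; use whatever names your pipeline expects, with
+types drawn from the list in the Description section. Arguments set this way can then be referenced
+in the pipeline as macros, for example ${input_path} and ${num_partitions}.
+
+    {
+        "arguments" : [
+            { "name" : "input_path", "type" : "String", "value" : "gs://my-bucket/input/" },
+            { "name" : "num_partitions", "type" : "Int", "value" : "8" }
+        ]
+    }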
+ + diff --git a/examples/example-csv-file-to-bigquery-with-range-partition-cdap-data-pipeline.json b/examples/example-csv-file-to-bigquery-with-range-partition-cdap-data-pipeline.json new file mode 100644 index 0000000000..e857f0f2aa --- /dev/null +++ b/examples/example-csv-file-to-bigquery-with-range-partition-cdap-data-pipeline.json @@ -0,0 +1,102 @@ +{ + "artifact": { + "name": "cdap-data-pipeline", + "version": "6.3.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "description": "Data Pipeline Application", + "name": "example-csv-file-to-bigquery-with-range-partition", + "config": { + "resources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "driverResources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "connections": [ + { + "from": "File", + "to": "BigQuery" + } + ], + "comments": [], + "postActions": [], + "properties": {}, + "processTimingEnabled": true, + "stageLoggingEnabled": false, + "stages": [ + { + "name": "File", + "plugin": { + "name": "File", + "type": "batchsource", + "label": "File", + "artifact": { + "name": "core-plugins", + "version": "2.5.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "properties": { + "format": "delimited", + "skipHeader": "true", + "filenameOnly": "false", + "recursive": "false", + "ignoreNonExistingFolders": "false", + "schema": "${out_schema}", + "referenceName": "file", + "delimiter": ";", + "path": "${path_to_csv_example}" + } + }, + "outputSchema": "${out_schema}" + }, + { + "name": "BigQuery", + "plugin": { + "name": "BigQueryTable", + "type": "batchsink", + "label": "BigQuery", + "artifact": { + "name": "google-cloud", + "version": "0.16.0-SNAPSHOT", + "scope": "USER" + }, + "properties": { + "serviceFilePath": "/Users/ardian/Workspace/CDAP/google-cloud-plugins/BigQueryArgumentSetter/Data/adaptivescale-178418-a5edd1eb37f9.json", + "operation": "insert", + "truncateTable": "false", + "allowSchemaRelaxation": "false", + "location": "US", + "createPartitionedTable": "false", + "partitioningType": "INTEGER", + "rangeStart": "${range_start}", + "partitionFilterRequired": "false", + "dataset": "${bqsink_dataset}", + "table": "${bqsink_table}", + "referenceName": "BigQuerySink", + "partitionByField": "${partitionfield}", + "rangeEnd": "${range_end}", + "rangeInterval": "${range_interval}", + "schema": "${out_schema}", + "project": "adaptivescale-178418", + "bucket": "adaptive_scale_test_bucket" + } + }, + "outputSchema": "${out_schema}", + "inputSchema": [ + { + "name": "File", + "schema": "${out_schema}" + } + ] + } + ], + "schedule": "0 * * * *", + "engine": "spark", + "numOfRecordsPreview": 100, + "description": "Data Pipeline Application", + "maxConcurrentRuns": 1 + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index e3ad9f0eb7..d30bbae3f4 100644 --- a/pom.xml +++ b/pom.xml @@ -72,12 +72,12 @@ hadoop2-1.0.0 1.4 6.3.0-SNAPSHOT - 2.4.0-SNAPSHOT + 2.5.0-SNAPSHOT 3.2.6 0.3.1 hadoop2-2.0.0 1.11.0 - 1.92.0 + 1.100.0 1.92.0 1.37.0 1.20.0 diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java index bac0b1f143..2e12d167c0 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java @@ -22,10 +22,6 @@ import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobConfiguration; -import 
com.google.cloud.bigquery.JobId; -import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; @@ -86,8 +82,7 @@ public abstract class AbstractBigQuerySink extends BatchSink cap) { - LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly"); - } - count = count < cap ? count : cap; - for (int i = 0; i <= count && totalRows > 0; i++) { - int rowCount = totalRows < Integer.MAX_VALUE ? (int) totalRows : Integer.MAX_VALUE; - context.getMetrics().count(RECORDS_UPDATED_METRIC, rowCount); - totalRows -= rowCount; - } - } - - private JobId getJobId() { - String location = bigQuery.getDataset(getConfig().getDataset()).getLocation(); - return JobId.newBuilder().setLocation(location).setJob(jobId).build(); - } - - private long getTotalRows(Job queryJob) { - JobConfiguration.Type type = queryJob.getConfiguration().getType(); - if (type == JobConfiguration.Type.LOAD) { - return ((JobStatistics.LoadStatistics) queryJob.getStatistics()).getOutputRows(); - } else if (type == JobConfiguration.Type.QUERY) { - return ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getNumDmlAffectedRows(); - } - LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected rows."); - return 0; - } - @Override public void initialize(BatchRuntimeContext context) throws Exception { avroTransformer = new StructuredToAvroTransformer(null); @@ -483,7 +435,7 @@ private Configuration getOutputConfiguration(String bucket, BigQueryOutputConfiguration.configure( configuration, - String.format("%s.%s", getConfig().getDataset(), tableName), + String.format("%s:%s.%s", getConfig().getDatasetProject(), getConfig().getDataset(), tableName), outputTableSchema, temporaryGcsPath, BigQueryFileFormat.AVRO, diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java index 5e35e76edd..4e2ae96bcb 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java @@ -15,7 +15,9 @@ */ package io.cdap.plugin.gcp.bigquery.sink; +import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.JobInfo; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; @@ -23,6 +25,7 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; +import io.cdap.plugin.gcp.common.GCPConfig; import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig; import java.util.Set; @@ -37,6 +40,7 @@ public abstract class AbstractBigQuerySinkConfig extends GCPReferenceSinkConfig ImmutableSet.of(Schema.Type.INT, Schema.Type.LONG, Schema.Type.STRING, Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.BOOLEAN, Schema.Type.BYTES, Schema.Type.ARRAY, Schema.Type.RECORD); + public static final String DATASET_PROJECT_ID = "datasetProject"; public static final String NAME_DATASET = "dataset"; public static final String NAME_BUCKET = "bucket"; public static final String NAME_TRUNCATE_TABLE = "truncateTable"; @@ -44,6 +48,13 @@ public abstract class AbstractBigQuerySinkConfig extends GCPReferenceSinkConfig private static final String 
NAME_GCS_CHUNK_SIZE = "gcsChunkSize"; private static final String NAME_UPDATE_SCHEMA = "allowSchemaRelaxation"; + @Name(DATASET_PROJECT_ID) + @Macro + @Nullable + @Description("The project in which the dataset is located/should be created." + + " Defaults to the project specified in the Project Id property.") + private String datasetProject; + @Name(NAME_DATASET) @Macro @Description("The dataset to write to. A dataset is contained within a specific project. " @@ -100,6 +111,14 @@ public String getDataset() { return dataset; } + @Nullable + public String getDatasetProject() { + if (GCPConfig.AUTO_DETECT.equalsIgnoreCase(datasetProject)) { + return ServiceOptions.getDefaultProjectId(); + } + return Strings.isNullOrEmpty(datasetProject) ? getProject() : datasetProject; + } + @Nullable public String getGcsChunkSize() { return gcsChunkSize; diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java index f92dee41c9..221c413e81 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java @@ -30,12 +30,15 @@ import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.RangePartitioning; +import com.google.api.services.bigquery.model.RangePartitioning.Range; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; @@ -71,7 +74,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.math.BigInteger; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.Arrays; @@ -150,8 +152,9 @@ public void commitJob(JobContext jobContext) throws IOException { allowSchemaRelaxation = conf.getBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION, false); LOG.debug("Allow schema relaxation: '{}'", allowSchemaRelaxation); - boolean createPartitionedTable = conf.getBoolean(BigQueryConstants.CONFIG_CREATE_PARTITIONED_TABLE, false); - LOG.debug("Create Partitioned Table: '{}'", createPartitionedTable); + PartitionType partitionType = conf.getEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, PartitionType.NONE); + LOG.debug("Create Partitioned Table type: '{}'", partitionType); + Range range = partitionType == PartitionType.INTEGER ? 
createRangeForIntegerPartitioning(conf) : null; String partitionByField = conf.get(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, null); LOG.debug("Partition Field: '{}'", partitionByField); boolean requirePartitionFilter = conf.getBoolean(BigQueryConstants.CONFIG_REQUIRE_PARTITION_FILTER, false); @@ -174,22 +177,43 @@ public void commitJob(JobContext jobContext) throws IOException { LOG.debug("Partition filter: '{}'", partitionFilter); boolean tableExists = conf.getBoolean(BigQueryConstants.CONFIG_DESTINATION_TABLE_EXISTS, false); - String jobIdString = conf.get(BigQueryConstants.CONFIG_JOB_ID); - JobId jobId = JobId.of(jobIdString); try { importFromGcs(destProjectId, destTable, destSchema.orElse(null), kmsKeyName, outputFileFormat, - writeDisposition, sourceUris, createPartitionedTable, partitionByField, - requirePartitionFilter, clusteringOrderList, tableExists, jobIdString); + writeDisposition, sourceUris, partitionType, range, partitionByField, + requirePartitionFilter, clusteringOrderList, tableExists, getJobIdForImportGCS(conf)); if (temporaryTableReference != null) { - operationAction(destTable, kmsKeyName, jobId); + operationAction(destTable, kmsKeyName, getJobIdForUpdateUpsert(conf)); } - } catch (InterruptedException e) { - throw new IOException("Failed to import GCS into BigQuery", e); + } catch (Exception e) { + throw new IOException("Failed to import GCS into BigQuery. ", e); } cleanup(jobContext); } + private String getJobIdForImportGCS(Configuration conf) { + //If the operation is not INSERT then this is a write to a temporary table. No need to use saved JobId here. + // Return a random UUID + if (!Operation.INSERT.equals(operation)) { + return UUID.randomUUID().toString(); + } + //See if plugin specified a Job ID to be used. + String savedJobId = conf.get(BigQueryConstants.CONFIG_JOB_ID); + if (savedJobId == null || savedJobId.isEmpty()) { + return UUID.randomUUID().toString(); + } + return savedJobId; + } + + private JobId getJobIdForUpdateUpsert(Configuration conf) { + //See if plugin specified a Job ID to be used. + String savedJobId = conf.get(BigQueryConstants.CONFIG_JOB_ID); + if (savedJobId == null || savedJobId.isEmpty()) { + return JobId.of(UUID.randomUUID().toString()); + } + return JobId.of(savedJobId); + } + @Override public void abortJob(JobContext context, JobStatus.State state) throws IOException { //This method is copied from IndirectBigQueryOutputCommitter#abortJob. 
@@ -202,7 +226,7 @@ public void abortJob(JobContext context, JobStatus.State state) throws IOExcepti */ private void importFromGcs(String projectId, TableReference tableRef, @Nullable TableSchema schema, @Nullable String kmsKeyName, BigQueryFileFormat sourceFormat, String writeDisposition, - List gcsPaths, boolean createPartitionedTable, + List gcsPaths, PartitionType partitionType, @Nullable Range range, @Nullable String partitionByField, boolean requirePartitionFilter, List clusteringOrderList, boolean tableExists, String jobId) throws IOException, InterruptedException { @@ -228,15 +252,24 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable loadConfig.setSourceUris(gcsPaths); loadConfig.setWriteDisposition(writeDisposition); loadConfig.setUseAvroLogicalTypes(true); - if (!tableExists && createPartitionedTable) { - TimePartitioning timePartitioning = new TimePartitioning(); - timePartitioning.setType("DAY"); - if (partitionByField != null) { - timePartitioning.setField(partitionByField); + if (!tableExists) { + switch (partitionType) { + case TIME: + TimePartitioning timePartitioning = createTimePartitioning(partitionByField, requirePartitionFilter); + loadConfig.setTimePartitioning(timePartitioning); + break; + case INTEGER: + RangePartitioning rangePartitioning = createRangePartitioning(partitionByField, range); + if (requirePartitionFilter) { + createTableWithRangePartitionAndRequirePartitionFilter(tableRef, schema, rangePartitioning); + } else { + loadConfig.setRangePartitioning(rangePartitioning); + } + break; + case NONE: + break; } - timePartitioning.setRequirePartitionFilter(requirePartitionFilter); - loadConfig.setTimePartitioning(timePartitioning); - if (!clusteringOrderList.isEmpty()) { + if (PartitionType.NONE != partitionType && !clusteringOrderList.isEmpty()) { Clustering clustering = new Clustering(); clustering.setFields(clusteringOrderList); loadConfig.setClustering(clustering); @@ -244,7 +277,7 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable } temporaryTableReference = null; - if (tableExists && !isTableEmpty(tableRef) && !Operation.INSERT.equals(operation)) { + if (!Operation.INSERT.equals(operation)) { String temporaryTableName = tableRef.getTableId() + "_" + UUID.randomUUID().toString().replaceAll("-", "_"); temporaryTableReference = new TableReference() @@ -252,6 +285,17 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable .setProjectId(tableRef.getProjectId()) .setTableId(temporaryTableName); loadConfig.setDestinationTable(temporaryTableReference); + + if (!tableExists) { + if (Operation.UPSERT.equals(operation)) { + // For upsert operation, if the destination table does not exist, create it + Table table = new Table(); + table.setTableReference(tableRef); + table.setSchema(schema); + bigQueryHelper.getRawBigquery().tables().insert(tableRef.getProjectId(), tableRef.getDatasetId(), table) + .execute(); + } + } } else { loadConfig.setDestinationTable(tableRef); @@ -287,7 +331,7 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable JobReference jobReference = new JobReference().setProjectId(projectId) - .setJobId(temporaryTableReference == null ? 
jobId : UUID.randomUUID().toString()) + .setJobId(jobId) .setLocation(dataset.getLocation()); Job job = new Job(); job.setConfiguration(config); @@ -412,8 +456,7 @@ private static Optional getTableSchema(Configuration conf) throws I return Optional.empty(); } - private void operationAction(TableReference tableRef, @Nullable String cmekKey, JobId jobId) - throws InterruptedException { + private void operationAction(TableReference tableRef, @Nullable String cmekKey, JobId jobId) throws Exception { if (allowSchemaRelaxation) { updateTableSchema(tableRef); } @@ -428,11 +471,21 @@ private void operationAction(TableReference tableRef, @Nullable String cmekKey, com.google.cloud.bigquery.EncryptionConfiguration.newBuilder().setKmsKeyName(cmekKey).build()) .build(); - - com.google.cloud.bigquery.Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); - - // Wait for the query to complete. - queryJob.waitFor(); + try { + com.google.cloud.bigquery.Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig) + .setJobId(jobId).build()); + // Wait for the query to complete. + queryJob.waitFor(); + } catch (BigQueryException e) { + if (Operation.UPDATE.equals(operation) && !bigQueryHelper.tableExists(tableRef)) { + // ignore the exception. This is because we do not want to fail the pipeline as per below discussion + // https://github.com/data-integrations/google-cloud/pull/290#discussion_r472405882 + LOG.warn("BigQuery Table {} does not exist. The operation update will not write any records to the table." + , String.format("%s.%s.%s", tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId())); + return; + } + throw e; + } } private void updateTableSchema(TableReference tableRef) { @@ -498,12 +551,6 @@ private static TableSchema createTableSchemaFromFields(String fieldsJson) throws return new TableSchema().setFields(fields); } - private boolean isTableEmpty(TableReference tableRef) throws IOException { - Table table = bigQueryHelper.getTable(tableRef); - return table.getNumRows().compareTo(BigInteger.ZERO) <= 0 && table.getNumBytes() == 0 - && table.getNumLongTermBytes() == 0; - } - @Override protected void cleanup(JobContext context) throws IOException { super.cleanup(context); @@ -528,5 +575,50 @@ private static String formatPartitionFilter(String partitionFilter) { } return String.join(" ", queryWords); } + + private Range createRangeForIntegerPartitioning(Configuration conf) { + long rangeStart = conf.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, 0); + long rangeEnd = conf.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_END, 0); + long rangeInterval = conf.getLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_INTERVAL, 0); + Range range = new Range(); + range.setStart(rangeStart); + range.setEnd(rangeEnd); + range.setInterval(rangeInterval); + return range; + } + + private TimePartitioning createTimePartitioning( + @Nullable String partitionByField, boolean requirePartitionFilter) { + TimePartitioning timePartitioning = new TimePartitioning(); + timePartitioning.setType("DAY"); + if (partitionByField != null) { + timePartitioning.setField(partitionByField); + } + timePartitioning.setRequirePartitionFilter(requirePartitionFilter); + return timePartitioning; + } + + private void createTableWithRangePartitionAndRequirePartitionFilter(TableReference tableRef, + @Nullable TableSchema schema, + RangePartitioning rangePartitioning) + throws IOException { + Table table = new Table(); + table.setSchema(schema); + 
table.setTableReference(tableRef); + table.setRequirePartitionFilter(true); + table.setRangePartitioning(rangePartitioning); + bigQueryHelper.getRawBigquery().tables() + .insert(tableRef.getProjectId(), tableRef.getDatasetId(), table) + .execute(); + } + + private RangePartitioning createRangePartitioning(@Nullable String partitionByField, @Nullable Range range) { + RangePartitioning rangePartitioning = new RangePartitioning(); + rangePartitioning.setRange(range); + if (partitionByField != null) { + rangePartitioning.setField(partitionByField); + } + return rangePartitioning; + } } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java index c6afd3f766..38c690644a 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java @@ -17,6 +17,10 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobConfiguration; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.Table; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -37,11 +41,14 @@ import org.apache.avro.mapred.AvroKey; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -61,6 +68,10 @@ public final class BigQuerySink extends AbstractBigQuerySink { private final BigQuerySinkConfig config; + private final String jobId = UUID.randomUUID().toString(); + + private static final Logger LOG = LoggerFactory.getLogger(BigQuerySink.class); + public BigQuerySink(BigQuerySinkConfig config) { this.config = config; } @@ -98,12 +109,65 @@ protected void prepareRunValidation(BatchSinkContext context) { protected void prepareRunInternal(BatchSinkContext context, BigQuery bigQuery, String bucket) throws IOException { FailureCollector collector = context.getFailureCollector(); Schema configSchema = config.getSchema(collector); - configureTable(); - configureBigQuerySink(); Schema schema = configSchema == null ? context.getInputSchema() : configSchema; + configureTable(schema); + configureBigQuerySink(); initOutput(context, bigQuery, config.getReferenceName(), config.getTable(), schema, bucket, collector); } + @Override + public void onRunFinish(boolean succeeded, BatchSinkContext context) { + super.onRunFinish(succeeded, context); + + try { + recordMetric(succeeded, context); + } catch (Exception exception) { + LOG.warn("Exception while trying to emit metric. No metric will be emitted for the number of affected rows.", + exception); + } + } + + private void recordMetric(boolean succeeded, BatchSinkContext context) { + if (!succeeded) { + return; + } + Job queryJob = bigQuery.getJob(getJobId()); + if (queryJob == null) { + LOG.warn("Unable to find BigQuery job. 
No metric will be emitted for the number of affected rows."); + return; + } + long totalRows = getTotalRows(queryJob); + LOG.info("Job {} affected {} rows", queryJob.getJobId(), totalRows); + //work around since StageMetrics count() only takes int as of now + int cap = 10000; // so the loop will not cause significant delays + long count = totalRows / Integer.MAX_VALUE; + if (count > cap) { + LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly"); + } + count = count < cap ? count : cap; + for (int i = 0; i <= count && totalRows > 0; i++) { + int rowCount = totalRows < Integer.MAX_VALUE ? (int) totalRows : Integer.MAX_VALUE; + context.getMetrics().count(RECORDS_UPDATED_METRIC, rowCount); + totalRows -= rowCount; + } + } + + private JobId getJobId() { + String location = bigQuery.getDataset(getConfig().getDataset()).getLocation(); + return JobId.newBuilder().setLocation(location).setJob(jobId).build(); + } + + private long getTotalRows(Job queryJob) { + JobConfiguration.Type type = queryJob.getConfiguration().getType(); + if (type == JobConfiguration.Type.LOAD) { + return ((JobStatistics.LoadStatistics) queryJob.getStatistics()).getOutputRows(); + } else if (type == JobConfiguration.Type.QUERY) { + return ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getNumDmlAffectedRows(); + } + LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected rows."); + return 0; + } + @Override protected OutputFormatProvider getOutputFormatProvider(Configuration configuration, String tableName, @@ -131,8 +195,7 @@ public void transform(StructuredRecord input, * Sets addition configuration for the AbstractBigQuerySink's Hadoop configuration */ private void configureBigQuerySink() { - baseConfiguration.setBoolean(BigQueryConstants.CONFIG_CREATE_PARTITIONED_TABLE, - getConfig().shouldCreatePartitionedTable()); + baseConfiguration.set(BigQueryConstants.CONFIG_JOB_ID, jobId); if (config.getPartitionByField() != null) { baseConfiguration.set(BigQueryConstants.CONFIG_PARTITION_BY_FIELD, getConfig().getPartitionByField()); } @@ -151,22 +214,39 @@ private void configureBigQuerySink() { if (config.getPartitionFilter() != null) { baseConfiguration.set(BigQueryConstants.CONFIG_PARTITION_FILTER, getConfig().getPartitionFilter()); } + + PartitionType partitioningType = getConfig().getPartitioningType(); + baseConfiguration.setEnum(BigQueryConstants.CONFIG_PARTITION_TYPE, partitioningType); + + if (config.getRangeStart() != null) { + baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_START, config.getRangeStart()); + } + if (config.getRangeEnd() != null) { + baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_END, config.getRangeEnd()); + } + if (config.getRangeInterval() != null) { + baseConfiguration.setLong(BigQueryConstants.CONFIG_PARTITION_INTEGER_RANGE_INTERVAL, config.getRangeInterval()); + } } /** * Sets the output table for the AbstractBigQuerySink's Hadoop configuration */ - private void configureTable() { + private void configureTable(Schema schema) { AbstractBigQuerySinkConfig config = getConfig(); Table table = BigQueryUtil.getBigQueryTable(config.getProject(), config.getDataset(), config.getTable(), config.getServiceAccountFilePath()); baseConfiguration.setBoolean(BigQueryConstants.CONFIG_DESTINATION_TABLE_EXISTS, table != null); + List tableFieldsNames; if (table != null) { - List tableFieldsNames = 
Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream() + tableFieldsNames = Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream() .map(Field::getName).collect(Collectors.toList()); - baseConfiguration.set(BigQueryConstants.CONFIG_TABLE_FIELDS, String.join(",", tableFieldsNames)); + } else { + tableFieldsNames = schema.getFields().stream() + .map(Schema.Field::getName).collect(Collectors.toList()); } + baseConfiguration.set(BigQueryConstants.CONFIG_TABLE_FIELDS, String.join(",", tableFieldsNames)); } private void validateConfiguredSchema(Schema schema, FailureCollector collector) { diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java index 002cb01cdf..bbb32956ac 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java @@ -17,6 +17,7 @@ package io.cdap.plugin.gcp.bigquery.sink; import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.RangePartitioning; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TimePartitioning; @@ -26,6 +27,8 @@ import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.data.schema.Schema.LogicalType; +import io.cdap.cdap.api.data.schema.Schema.Type; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil; import org.slf4j.Logger; @@ -37,7 +40,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -61,6 +63,10 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig { public static final String NAME_CLUSTERING_ORDER = "clusteringOrder"; public static final String NAME_OPERATION = "operation"; public static final String PARTITION_FILTER = "partitionFilter"; + public static final String NAME_PARTITIONING_TYPE = "partitioningType"; + public static final String NAME_RANGE_START = "rangeStart"; + public static final String NAME_RANGE_END = "rangeEnd"; + public static final String NAME_RANGE_INTERVAL = "rangeInterval"; public static final int MAX_NUMBER_OF_COLUMNS = 4; @@ -79,10 +85,38 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig { @Macro @Nullable - @Description("Whether to create the BigQuery table with time partitioning. This value is ignored if the table " + - "already exists.") + @Description("DEPRECATED!. Whether to create the BigQuery table with time partitioning. " + + "This value is ignored if the table already exists." + + " When this is set to false, value of Partitioning type will be used. Use 'Partitioning type' property") protected Boolean createPartitionedTable; + @Name(NAME_PARTITIONING_TYPE) + @Macro + @Nullable + @Description("Specifies the partitioning type. Can either be Integer or Time or None. " + + "Ignored when table already exists") + protected String partitioningType; + + @Name(NAME_RANGE_START) + @Macro + @Nullable + @Description("Start value for range partitioning. The start value is inclusive. Ignored when table already exists") + protected Long rangeStart; + + @Name(NAME_RANGE_END) + @Macro + @Nullable + @Description("End value for range partitioning. 
The end value is exclusive. Ignored when table already exists") + protected Long rangeEnd; + + @Name(NAME_RANGE_INTERVAL) + @Macro + @Nullable + @Description( + "Interval value for range partitioning. The interval value must be a positive integer." + + "Ignored when table already exists") + protected Long rangeInterval; + @Name(NAME_PARTITION_BY_FIELD) @Macro @Nullable @@ -132,12 +166,17 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig { protected String partitionFilter; public BigQuerySinkConfig(String referenceName, String dataset, String table, - @Nullable String bucket, @Nullable String schema) { + @Nullable String bucket, @Nullable String schema, @Nullable String partitioningType, + @Nullable Long rangeStart, @Nullable Long rangeEnd, @Nullable Long rangeInterval) { this.referenceName = referenceName; this.dataset = dataset; this.table = table; this.bucket = bucket; this.schema = schema; + this.partitioningType = partitioningType; + this.rangeStart = rangeStart; + this.rangeEnd = rangeEnd; + this.rangeInterval = rangeInterval; } public String getTable() { @@ -145,7 +184,7 @@ public String getTable() { } public boolean shouldCreatePartitionedTable() { - return createPartitionedTable == null ? false : createPartitionedTable; + return getPartitioningType() != PartitionType.NONE; } @Nullable @@ -189,6 +228,29 @@ public String getPartitionFilter() { return partitionFilter; } + @Nullable + public Long getRangeStart() { + return rangeStart; + } + + @Nullable + public Long getRangeEnd() { + return rangeEnd; + } + + @Nullable + public Long getRangeInterval() { + return rangeInterval; + } + + public PartitionType getPartitioningType() { + if (createPartitionedTable != null && createPartitionedTable) { + return PartitionType.TIME; + } + return Strings.isNullOrEmpty(partitioningType) ? PartitionType.TIME + : PartitionType.valueOf(partitioningType.toUpperCase()); + } + /** * @return the schema of the dataset */ @@ -265,7 +327,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect if (tryGetProject() == null) { return; } - String project = getProject(); + String project = getDatasetProject(); String dataset = getDataset(); String tableName = getTable(); String serviceAccountPath = getServiceAccountFilePath(); @@ -283,21 +345,60 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect "exists without partitioning. Please verify the partitioning configuration.", table.getTableId().getTable())); } - if (timePartitioning != null && timePartitioning.getField() != null - && !timePartitioning.getField().equals(partitionByField)) { - collector.addFailure(String.format("Destination table '%s' is partitioned by column '%s'.", - table.getTableId().getTable(), - timePartitioning.getField()), - String.format("Set the partition field to '%s'.", timePartitioning.getField())) - .withConfigProperty(NAME_PARTITION_BY_FIELD); + RangePartitioning rangePartitioning = tableDefinition.getRangePartitioning(); + if (timePartitioning == null && rangePartitioning == null && shouldCreatePartitionedTable()) { + LOG.warn(String.format( + "The plugin is configured to auto-create a partitioned table, but table '%s' already " + + "exists without partitioning. 
Please verify the partitioning configuration.", + table.getTableId().getTable())); + } else if (timePartitioning != null) { + validateTimePartitionTableWithInputConfiguration(table, timePartitioning, collector); + } else if (rangePartitioning != null) { + validateRangePartitionTableWithInputConfiguration(table, rangePartitioning, collector); } validateColumnForPartition(partitionByField, schema, collector); return; } - if (createPartitionedTable == null || !createPartitionedTable) { - return; + if (!shouldCreatePartitionedTable()) { + validateColumnForPartition(partitionByField, schema, collector); + } + } + + private void validateTimePartitionTableWithInputConfiguration(Table table, TimePartitioning timePartitioning, + FailureCollector collector) { + PartitionType partitioningType = getPartitioningType(); + if (partitioningType == PartitionType.TIME && timePartitioning.getField() != null && !timePartitioning.getField() + .equals(partitionByField)) { + collector.addFailure(String.format("Destination table '%s' is partitioned by column '%s'.", + table.getTableId().getTable(), + timePartitioning.getField()), + String.format("Set the partition field to '%s'.", timePartitioning.getField())) + .withConfigProperty(NAME_PARTITION_BY_FIELD); + } else if (partitioningType != PartitionType.TIME) { + LOG.warn(String.format("The plugin is configured to %s, but table '%s' already " + + "exists with Time partitioning. Please verify the partitioning configuration.", + partitioningType == PartitionType.INTEGER ? "auto-create a Integer partitioned table" + : "auto-create table without partition", + table.getTableId().getTable())); + } + } + + private void validateRangePartitionTableWithInputConfiguration(Table table, RangePartitioning rangePartitioning, + FailureCollector collector) { + PartitionType partitioningType = getPartitioningType(); + if (partitioningType != PartitionType.INTEGER) { + LOG.warn(String.format("The plugin is configured to %s, but table '%s' already " + + "exists with Integer partitioning. Please verify the partitioning configuration.", + partitioningType == PartitionType.TIME ? "auto-create a Time partitioned table" + : "auto-create table without partition", + table.getTableId().getTable())); + } else if (rangePartitioning.getField() != null && !rangePartitioning.getField().equals(partitionByField)) { + collector.addFailure(String.format("Destination table '%s' is partitioned by column '%s'.", + table.getTableId().getTable(), + rangePartitioning.getField()), + String.format("Set the partition field to '%s'.", rangePartitioning.getField())) + .withConfigProperty(NAME_PARTITION_BY_FIELD); } - validateColumnForPartition(partitionByField, schema, collector); } private void validateColumnForPartition(@Nullable String columnName, @Nullable Schema schema, @@ -314,16 +415,64 @@ private void validateColumnForPartition(@Nullable String columnName, @Nullable S } Schema fieldSchema = field.getSchema(); fieldSchema = fieldSchema.isNullable() ? 
fieldSchema.getNonNullable() : fieldSchema; + PartitionType partitioningType = getPartitioningType(); + if (partitioningType == PartitionType.TIME) { + validateTimePartitioningColumn(columnName, collector, fieldSchema); + } else if (partitioningType == PartitionType.INTEGER) { + validateIntegerPartitioningColumn(columnName, collector, fieldSchema); + validateIntegerPartitioningRange(getRangeStart(), getRangeEnd(), getRangeInterval(), collector); + } + } + + private void validateIntegerPartitioningColumn(String columnName, FailureCollector collector, Schema fieldSchema) { + if (fieldSchema.getType() != Type.INT && fieldSchema.getType() != Type.LONG) { + collector.addFailure( + String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()), + "Partition column must be a int or long.").withConfigProperty(NAME_PARTITION_BY_FIELD) + .withOutputSchemaField(columnName).withInputSchemaField(columnName); + } + } + + private void validateTimePartitioningColumn(String columnName, FailureCollector collector, Schema fieldSchema) { Schema.LogicalType logicalType = fieldSchema.getLogicalType(); - if (logicalType != Schema.LogicalType.DATE && logicalType != Schema.LogicalType.TIMESTAMP_MICROS - && logicalType != Schema.LogicalType.TIMESTAMP_MILLIS) { + if (logicalType != LogicalType.DATE && logicalType != LogicalType.TIMESTAMP_MICROS + && logicalType != LogicalType.TIMESTAMP_MILLIS) { collector.addFailure( String.format("Partition column '%s' is of invalid type '%s'.", columnName, fieldSchema.getDisplayName()), - "Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD) + "Partition column must be a date or timestamp.") + .withConfigProperty(NAME_PARTITION_BY_FIELD) .withOutputSchemaField(columnName).withInputSchemaField(columnName); } } + private void validateIntegerPartitioningRange(Long rangeStart, Long rangeEnd, Long rangeInterval, + FailureCollector collector) { + if (!containsMacro(NAME_RANGE_START) && rangeStart == null) { + collector.addFailure("Range Start is not defined.", + "For Integer Partitioning, Range Start must be defined.") + .withConfigProperty(NAME_RANGE_START); + } + if (!containsMacro(NAME_RANGE_END) && rangeEnd == null) { + collector.addFailure("Range End is not defined.", + "For Integer Partitioning, Range End must be defined.") + .withConfigProperty(NAME_RANGE_END); + } + + if (!containsMacro(NAME_RANGE_INTERVAL)) { + if (rangeInterval == null) { + collector.addFailure( + "Range Interval is not defined.", + "For Integer Partitioning, Range Interval must be defined.") + .withConfigProperty(NAME_RANGE_INTERVAL); + } else if (rangeInterval <= 0) { + collector.addFailure( + "Range Interval is not a positive number.", + "Range interval must be a valid positive integer.") + .withConfigProperty(NAME_RANGE_INTERVAL); + } + } + } + private void validateClusteringOrder(@Nullable Schema schema, FailureCollector collector) { if (!shouldCreatePartitionedTable() || Strings.isNullOrEmpty(clusteringOrder) || schema == null) { return; @@ -451,8 +600,11 @@ private boolean isSupportedLogicalType(Schema.LogicalType logicalType) { * Returns true if bigquery table can be connected to or schema is not a macro. 
*/ boolean shouldConnect() { - return !containsMacro(BigQuerySinkConfig.NAME_DATASET) && !containsMacro(BigQuerySinkConfig.NAME_TABLE) && + return !containsMacro(BigQuerySinkConfig.NAME_DATASET) && + !containsMacro(BigQuerySinkConfig.NAME_TABLE) && !containsMacro(BigQuerySinkConfig.NAME_SERVICE_ACCOUNT_FILE_PATH) && - !containsMacro(BigQuerySinkConfig.NAME_PROJECT) && !containsMacro(BigQuerySinkConfig.NAME_SCHEMA); + !containsMacro(BigQuerySinkConfig.NAME_PROJECT) && + !containsMacro(BigQuerySinkConfig.DATASET_PROJECT_ID) && + !containsMacro(BigQuerySinkConfig.NAME_SCHEMA); } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/PartitionType.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/PartitionType.java new file mode 100644 index 0000000000..701517bd2b --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/PartitionType.java @@ -0,0 +1,26 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigquery.sink; + +/** + * The type of partition + */ +public enum PartitionType { + TIME, + INTEGER, + NONE +} diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java index aa79ff4437..73115a0f34 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryConstants.java @@ -22,7 +22,6 @@ public interface BigQueryConstants { String CONFIG_ALLOW_SCHEMA_RELAXATION = "cdap.bq.sink.allow.schema.relaxation"; String CONFIG_DESTINATION_TABLE_EXISTS = "cdap.bq.sink.destination.table.exists"; - String CONFIG_CREATE_PARTITIONED_TABLE = "cdap.bq.sink.create.partitioned.table"; String CONFIG_PARTITION_BY_FIELD = "cdap.bq.sink.partition.by.field"; String CONFIG_REQUIRE_PARTITION_FILTER = "cdap.bq.sink.require.partition.filter"; String CONFIG_PARTITION_FROM_DATE = "cdap.bq.source.partition.from.date"; @@ -36,4 +35,8 @@ public interface BigQueryConstants { String CONFIG_FILTER = "cdap.bq.source.filter"; String CONFIG_PARTITION_FILTER = "cdap.bq.sink.partition.filter"; String CONFIG_JOB_ID = "cdap.bq.sink.job.id"; + String CONFIG_PARTITION_TYPE = "cdap.bq.sink.partition.type"; + String CONFIG_PARTITION_INTEGER_RANGE_START = "cdap.bq.sink.partition.integer.range.start"; + String CONFIG_PARTITION_INTEGER_RANGE_END = "cdap.bq.sink.partition.integer.range.end"; + String CONFIG_PARTITION_INTEGER_RANGE_INTERVAL = "cdap.bq.sink.partition.integer.range.interval"; } diff --git a/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java b/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java index 38fcb6e98d..702b7d5667 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigtable/sink/BigtableSink.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019 Cask Data, Inc. + * Copyright © 2019-2020 Cask Data, Inc. 
* * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; @@ -65,6 +67,7 @@ @Description("This sink writes data to Google Cloud Bigtable. " + "Cloud Bigtable is Google's NoSQL Big Data database service.") public final class BigtableSink extends BatchSink { + private static final Logger LOG = LoggerFactory.getLogger(BigtableSink.class); public static final String NAME = "Bigtable"; private static final Set SUPPORTED_FIELD_TYPES = ImmutableSet.of( @@ -103,10 +106,8 @@ public void configurePipeline(PipelineConfigurer configurer) { validateExistingTable(connection, tableName, collector); } } catch (IOException e) { - collector.addFailure( - String.format("Failed to connect to BigTable : %s", e.getMessage()), null) - .withConfigProperty(BigtableSinkConfig.BIGTABLE_OPTIONS) - .withStacktrace(e.getStackTrace()); + // Don't fail deployments due to connect failures + LOG.warn("Failed to connect to BigTable.", e); } } } @@ -210,7 +211,8 @@ private void createTable(Connection connection, TableName tableName, FailureColl } } - private void validateExistingTable(Connection connection, TableName tableName, FailureCollector collector) { + private void validateExistingTable(Connection connection, TableName tableName, FailureCollector collector) + throws IOException { try (Table table = connection.getTable(tableName)) { Set existingFamilies = table.getTableDescriptor() .getFamiliesKeys() @@ -228,9 +230,6 @@ private void validateExistingTable(Connection connection, TableName tableName, F ConfigUtil.getKVPair(entry.getKey(), entry.getValue().getQualifiedName(), "=")); } } - } catch (IOException e) { - collector.addFailure(String.format("Failed to connect to Bigtable : %s", e.getMessage()), null) - .withStacktrace(e.getStackTrace()); } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java b/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java index 3d75d4104d..4cbced07e2 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java +++ b/src/main/java/io/cdap/plugin/gcp/bigtable/source/BigtableSource.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019 Cask Data, Inc. + * Copyright © 2019-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. 
You may obtain a copy of @@ -85,10 +85,8 @@ public void configurePipeline(PipelineConfigurer configurer) { Configuration conf = getConfiguration(collector); validateOutputSchema(conf, collector); } catch (IOException e) { - collector.addFailure( - String.format("Failed to prepare configuration for job : %s", e.getMessage()), null) - .withConfigProperty(BigtableSourceConfig.BIGTABLE_OPTIONS) - .withStacktrace(e.getStackTrace()); + // Don't fail deployment on connection failure + LOG.warn("Failed to validate output schema", e); } } @@ -103,14 +101,21 @@ public void prepareRun(BatchSourceContext context) { Configuration conf = null; try { conf = getConfiguration(collector); - validateOutputSchema(conf, collector); + } catch (IOException e) { collector.addFailure( String.format("Failed to prepare configuration for job : %s", e.getMessage()), null) .withConfigProperty(BigtableSourceConfig.BIGTABLE_OPTIONS) .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); + } + try { + validateOutputSchema(conf, collector); + } catch (IOException e) { + collector.addFailure(String.format("Failed to connect to Bigtable : %s", e.getMessage()), null) + .withStacktrace(e.getStackTrace()); + collector.getOrThrowException(); } - collector.getOrThrowException(); // Both emitLineage and setOutputFormat internally try to create an external dataset if it does not already exists. // We call emitLineage before since it creates the dataset with schema. @@ -163,7 +168,7 @@ private Configuration getConfiguration(FailureCollector collector) throws IOExce return conf; } - private void validateOutputSchema(Configuration configuration, FailureCollector collector) { + private void validateOutputSchema(Configuration configuration, FailureCollector collector) throws IOException { TableName tableName = TableName.valueOf(config.table); try (Connection connection = BigtableConfiguration.connect(configuration); Table table = connection.getTable(tableName)) { @@ -182,9 +187,6 @@ private void validateOutputSchema(Configuration configuration, FailureCollector ConfigUtil.getKVPair(key, columnMappings.get(key), "=")); } } - } catch (IOException e) { - collector.addFailure(String.format("Failed to connect to Bigtable : %s", e.getMessage()), null) - .withStacktrace(e.getStackTrace()); } } diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java index 45e802ea48..c6060d2077 100644 --- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java @@ -39,6 +39,8 @@ */ public class GCPUtils { public static final String CMEK_KEY = "gcp.cmek.key.name"; + public static final String FS_GS_PROJECT_ID = "fs.gs.project.id"; + public static final String CLOUD_JSON_KEYFILE = "google.cloud.auth.service.account.json.keyfile"; public static ServiceAccountCredentials loadServiceAccountCredentials(String path) throws IOException { File credentialsPath = new File(path); @@ -51,12 +53,12 @@ public static Map getFileSystemProperties(GCPConfig config, Stri Map properties) { String serviceAccountFilePath = config.getServiceAccountFilePath(); if (serviceAccountFilePath != null) { - properties.put("google.cloud.auth.service.account.json.keyfile", serviceAccountFilePath); + properties.put(CLOUD_JSON_KEYFILE, serviceAccountFilePath); } properties.put("fs.gs.impl", GoogleHadoopFileSystem.class.getName()); properties.put("fs.AbstractFileSystem.gs.impl", GoogleHadoopFS.class.getName()); String projectId = config.getProject(); - 
properties.put("fs.gs.project.id", projectId); + properties.put(FS_GS_PROJECT_ID, projectId); properties.put("fs.gs.system.bucket", GCSPath.from(path).getBucket()); properties.put("fs.gs.path.encoding", "uri-path"); properties.put("fs.gs.working.dir", GCSPath.ROOT_DIR); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java b/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java index 3d9d23c728..58020be127 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java @@ -1,5 +1,5 @@ /* - * Copyright © 2019 Cask Data, Inc. + * Copyright © 2019-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -19,9 +19,11 @@ import com.google.api.gax.paging.Page; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.CopyWriter; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.google.common.annotations.VisibleForTesting; import io.cdap.plugin.gcp.common.GCPUtils; @@ -30,7 +32,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.function.Consumer; import javax.annotation.Nullable; @@ -45,6 +49,62 @@ private StorageClient(Storage storage) { this.storage = storage; } + /** + * Picks one blob that has the path prefix and is not ending with '/' + * @param path + * @return + */ + public Blob pickABlob(String path) { + if (path == null || path.isEmpty()) { + return null; + } + GCSPath gcsPath = GCSPath.from(path); + Page blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName())); + Iterator iterator = blobPage.getValues().iterator(); + while (iterator.hasNext()) { + Blob blob = iterator.next(); + if (blob.getName().endsWith("/")) { + continue; + } + return blob; + } + return null; + } + + /** + * Updates the metadata for the blob + * @param blob + * @param metaData + */ + public void setMetaData(Blob blob, Map metaData) { + if (blob == null || metaData == null || metaData.isEmpty()) { + return; + } + storage.update(BlobInfo.newBuilder(blob.getBlobId()).setMetadata(metaData).build()); + } + + /** + * Applies the given function with metadata of each blobs in the path + * @param path + * @param function + */ + public void mapMetaDataForAllBlobs(String path, Consumer> function) { + if (path == null || path.isEmpty() || function == null) { + return; + } + GCSPath gcsPath = GCSPath.from(path); + Page blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName())); + Iterator blobIterator = blobPage.iterateAll().iterator(); + while (blobIterator.hasNext()) { + Blob blob = blobIterator.next(); + Map metadata = blob.getMetadata(); + if (metadata == null) { + continue; + } + function.accept(metadata); + } + } + /** * Copy objects from the source path to the destination path. If the source path is a single object, that object * will be copied to the destination. 
If the source path represents a directory, objects within the directory @@ -82,12 +142,28 @@ public void move(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolea private void pairTraverse(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolean overwrite, Consumer consumer) { - Bucket sourceBucket = storage.get(sourcePath.getBucket()); + Bucket sourceBucket = null; + try { + sourceBucket = storage.get(sourcePath.getBucket()); + } catch (StorageException e) { + // Add more descriptive error message + throw new RuntimeException( + String.format("Unable to access source bucket %s. ", sourcePath.getBucket()) + + "Ensure you entered the correct bucket path.", e); + } if (sourceBucket == null) { throw new IllegalArgumentException( String.format("Source bucket '%s' does not exist.", sourcePath.getBucket())); } - Bucket destBucket = storage.get(destPath.getBucket()); + Bucket destBucket = null; + try { + destBucket = storage.get(destPath.getBucket()); + } catch (StorageException e) { + // Add more descriptive error message + throw new RuntimeException( + String.format("Unable to access destination bucket %s. ", destPath.getBucket()) + + "Ensure you entered the correct bucket path.", e); + } if (destBucket == null) { throw new IllegalArgumentException( String.format("Destination bucket '%s' does not exist. Please create it first.", destPath.getBucket())); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java new file mode 100644 index 0000000000..0f8f0bf500 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java @@ -0,0 +1,246 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.gcs.actions; + +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.common.base.Joiner; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSyntaxException; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageConfigurer; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.cdap.etl.api.action.ActionContext; +import io.cdap.plugin.gcp.common.GCPUtils; +import io.cdap.plugin.gcp.gcs.GCSPath; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * This class GCSArgumentSetter get json file configuration from GCS. + * + *

The plugin provides the ability to map json properties as pipeline arguments name and columns + * values as pipeline arguments + * { + * "arguments" : [ + * { "name" : "input.path", "type" : "string", "value" : "/data/sunny_feeds/master"}, + * { "name" : "parse.schema, "type" : "schema", "value" : [ + * { "name" : "fname", "type" : "string", "nullable" : true }, + * { "name" : "age", "type" : "int", "nullable" : true}, + * { "name" : "salary", "type" : "float", "nullable" : false} + * ]}, + * { "name" : "directives", "type" : "array", "value" : [ + * "parse-as-json body", + * "columns-replace s/body_//g", + * "keep f1,f2" + * ]} + * ] + * } + * + * In case when `nullable` is not specified inside field json, then it defaults to true. + * + */ +@Plugin(type = Action.PLUGIN_TYPE) +@Name(GCSArgumentSetter.NAME) +@Description("Argument setter for dynamically configuring pipeline from GCS") +public final class GCSArgumentSetter extends Action { + + public static final String NAME = "GCSArgumentSetter"; + private GCSArgumentSetterConfig config; + + @Override + public void configurePipeline(PipelineConfigurer configurer) { + super.configurePipeline(configurer); + StageConfigurer stageConfigurer = configurer.getStageConfigurer(); + FailureCollector collector = stageConfigurer.getFailureCollector(); + config.validate(collector); + } + + @Override + public void run(ActionContext context) throws Exception { + config.validateProperties(context.getFailureCollector()); + String fileContent = GCSArgumentSetter.getContent(config); + + try { + Configuration configuration = + new GsonBuilder().create().fromJson(fileContent, Configuration.class); + for (Argument argument : configuration.getArguments()) { + String name = argument.getName(); + String value = argument.getValue(); + if (value != null) { + context.getArguments().set(name, value); + } else { + throw new RuntimeException( + "Configuration '" + name + "' is null. 
Cannot set argument to null."); + } + } + } catch (JsonSyntaxException e) { + throw new RuntimeException( + String.format( + "Could not parse response from '%s': %s", config.getPath(), e.getMessage())); + } + } + + private static Storage getStorage(GCSArgumentSetterConfig config) throws IOException { + return StorageOptions.newBuilder() + .setProjectId(config.getProject()) + .setCredentials(getCredentials(config)) + .build() + .getService(); + } + + public static ServiceAccountCredentials getCredentials(GCSArgumentSetterConfig config) + throws IOException { + ServiceAccountCredentials credentials = null; + switch (config.getServiceAccountType()) { + case FILE_PATH: + credentials = GCPUtils.loadServiceAccountCredentials(config.getServiceAccountFilePath()); + break; + case JSON: + InputStream jsonInputStream = + new ByteArrayInputStream(config.getServiceAccountJSON().getBytes()); + credentials = ServiceAccountCredentials.fromStream(jsonInputStream); + break; + } + return credentials; + } + + public static String getContent(GCSArgumentSetterConfig config) throws IOException { + Storage storage = getStorage(config); + GCSPath path = config.getPath(); + Blob blob = storage.get(path.getBucket(), path.getName()); + return new String(blob.getContent()); + } + + private static final class Configuration { + + private List arguments; + + public List getArguments() { + return arguments; + } + } + + private static final class Argument { + + private String name; + private String type; + private JsonElement value; + + Argument() { + type = "string"; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getValue() { + if (value == null) { + throw new IllegalArgumentException("Null Argument value for name '" + name + "'"); + } + if (type.equalsIgnoreCase("schema")) { + return createSchema(value).toString(); + } else if (type.equalsIgnoreCase("int")) { + return Integer.toString(value.getAsInt()); + } else if (type.equalsIgnoreCase("float")) { + return Float.toString(value.getAsFloat()); + } else if (type.equalsIgnoreCase("double")) { + return Double.toString(value.getAsDouble()); + } else if (type.equalsIgnoreCase("short")) { + return Short.toString(value.getAsShort()); + } else if (type.equalsIgnoreCase("string")) { + return value.getAsString(); + } else if (type.equalsIgnoreCase("char")) { + return Character.toString(value.getAsCharacter()); + } else if (type.equalsIgnoreCase("array")) { + List values = new ArrayList<>(); + for (JsonElement v : value.getAsJsonArray()) { + values.add(v.getAsString()); + } + return Joiner.on("\n").join(values); + } else if (type.equalsIgnoreCase("map")) { + List values = new ArrayList<>(); + for (Map.Entry entry : value.getAsJsonObject().entrySet()) { + values.add(String.format("%s=%s", entry.getKey(), entry.getValue().getAsString())); + } + return Joiner.on(";").join(values); + } + + throw new IllegalArgumentException("Invalid argument value '" + value + "'"); + } + + private Schema createSchema(JsonElement array) { + List fields = new ArrayList<>(); + for (JsonElement field : array.getAsJsonArray()) { + Schema.Type type = Schema.Type.STRING; + + JsonObject object = field.getAsJsonObject(); + String fieldType = object.get("type").getAsString().toLowerCase(); + + boolean isNullable = true; + if (object.get("nullable") != null) { + isNullable = object.get("nullable").getAsBoolean(); + } + + String name = object.get("name").getAsString(); + + if (fieldType.equals("double")) { + type = 
Schema.Type.DOUBLE; + } else if (fieldType.equals("float")) { + type = Schema.Type.FLOAT; + } else if (fieldType.equals("long")) { + type = Schema.Type.LONG; + } else if (fieldType.equals("int")) { + type = Schema.Type.INT; + } else if (fieldType.equals("short")) { + type = Schema.Type.INT; + } else if (fieldType.equals("string")) { + type = Schema.Type.STRING; + } + + Schema schema; + if (isNullable) { + schema = Schema.nullableOf(Schema.of(type)); + } else { + schema = Schema.of(type); + } + Schema.Field fld = Schema.Field.of(name, schema); + fields.add(fld); + } + + return Schema.recordOf("record", fields); + } + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java new file mode 100644 index 0000000000..573b9c75bf --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java @@ -0,0 +1,125 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.gcs.actions; + +import com.google.common.base.Strings; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Macro; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.plugin.gcp.common.GCPConfig; +import io.cdap.plugin.gcp.gcs.GCSPath; +import javax.annotation.Nullable; + +/** Holds configuration required for configuring {@link GCSArgumentSetter}. */ +public final class GCSArgumentSetterConfig extends GCPConfig { + + public static final String NAME_PATH = "path"; + public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType"; + public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceAccountJSON"; + + @Name(NAME_PATH) + @Macro + @Description("GCS Path to the file containing the arguments") + private String path; + + @Name(NAME_SERVICE_ACCOUNT_TYPE) + @Macro + @Nullable + @Description( + "Provide service account as JSON. When it is set to 'Yes', " + + "the content of service account key needs to be copied, whereas when it is set to 'No' " + + "the service Account file path needs to be specified. 
The default value is 'No'") + private String serviceAccountType; + + @Name(NAME_SERVICE_ACCOUNT_JSON) + @Macro + @Nullable + @Description("The content of the service account.") + private String serviceAccountJSON; + + public void validate(FailureCollector collector) { + validateProperties(collector); + + if (canConnect()) { + try { + GCSArgumentSetter.getContent(this); + } catch (Exception e) { + collector.addFailure("Can not get content from GCP!", null); + } + } + collector.getOrThrowException(); + } + + public void validateProperties(FailureCollector collector) { + if (!containsMacro(NAME_PATH)) { + try { + getPath(); + } catch (IllegalArgumentException e) { + collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_PATH); + } + } + + if (getServiceAccountType() == ServiceAccountType.JSON + && !containsMacro(NAME_SERVICE_ACCOUNT_JSON) + && Strings.isNullOrEmpty(getServiceAccountJSON())) { + collector + .addFailure("Required property 'Service Account JSON' has no value.", "") + .withConfigProperty(NAME_SERVICE_ACCOUNT_JSON); + } + } + + private boolean canConnect() { + boolean canConnect = + !containsMacro(NAME_PATH) + && !(containsMacro(NAME_PROJECT) || AUTO_DETECT.equals(project)) + && !(containsMacro(NAME_SERVICE_ACCOUNT_TYPE)); + + if (!canConnect) { + return false; + } + + ServiceAccountType serviceAccountType = getServiceAccountType(); + + if (serviceAccountType == ServiceAccountType.FILE_PATH) { + return !containsMacro(NAME_SERVICE_ACCOUNT_FILE_PATH) + && !Strings.isNullOrEmpty(getServiceAccountFilePath()); + } + return !containsMacro(NAME_SERVICE_ACCOUNT_JSON) + && !Strings.isNullOrEmpty(getServiceAccountJSON()); + } + + public GCSPath getPath() { + return GCSPath.from(path); + } + + public ServiceAccountType getServiceAccountType() { + return "JSON".equalsIgnoreCase(serviceAccountType) + ? ServiceAccountType.JSON + : ServiceAccountType.FILE_PATH; + } + + public String getServiceAccountJSON() { + return serviceAccountJSON; + } + + /** The type of service account. */ + public enum ServiceAccountType { + FILE_PATH, + JSON; + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java index b4894db306..d645544400 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java @@ -1,5 +1,5 @@ /* - * Copyright © 2015 Cask Data, Inc. + * Copyright © 2015-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -17,7 +17,9 @@ package io.cdap.plugin.gcp.gcs.actions; import com.google.auth.Credentials; +import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; @@ -100,7 +102,16 @@ public void run(ActionContext context) throws Exception { } // create the gcs buckets if not exist - if (storage.get(gcsPath.getBucket()) == null) { + Bucket bucket = null; + try { + bucket = storage.get(gcsPath.getBucket()); + } catch (StorageException e) { + // Add more descriptive error message + throw new RuntimeException( + String.format("Unable to access or create bucket %s. 
", gcsPath.getBucket()) + + "Ensure you entered the correct bucket path and have permissions for it.", e); + } + if (bucket == null) { GCPUtils.createBucket(storage, gcsPath.getBucket(), config.location, context.getArguments().get(GCPUtils.CMEK_KEY)); undoBucket.add(bucketPath); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java index 412b983b19..72d51f0421 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java @@ -1,5 +1,5 @@ /* - * Copyright © 2015 Cask Data, Inc. + * Copyright © 2015-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -16,6 +16,9 @@ package io.cdap.plugin.gcp.gcs.actions; +import com.google.auth.Credentials; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; @@ -25,6 +28,7 @@ import io.cdap.cdap.etl.api.action.Action; import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.plugin.gcp.common.GCPConfig; +import io.cdap.plugin.gcp.common.GCPUtils; import io.cdap.plugin.gcp.gcs.GCSPath; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -61,6 +65,8 @@ public void run(ActionContext context) throws Exception { Configuration configuration = new Configuration(); String serviceAccountFilePath = config.getServiceAccountFilePath(); + Credentials credentials = serviceAccountFilePath == null ? + null : GCPUtils.loadServiceAccountCredentials(serviceAccountFilePath); if (serviceAccountFilePath != null) { configuration.set("google.cloud.auth.service.account.json.keyfile", serviceAccountFilePath); } @@ -74,8 +80,19 @@ public void run(ActionContext context) throws Exception { configuration.setBoolean("fs.gs.impl.disable.cache", true); List gcsPaths = new ArrayList<>(); + Storage storage = GCPUtils.getStorage(config.getProject(), credentials); for (String path : config.getPaths()) { - gcsPaths.add(new Path(GCSPath.from(path).getUri())); + GCSPath gcsPath = GCSPath.from(path); + // Check if the bucket is accessible + try { + storage.get(gcsPath.getBucket()); + } catch (StorageException e) { + // Add more descriptive error message + throw new RuntimeException( + String.format("Unable to access or create bucket %s. ", gcsPath.getBucket()) + + "Ensure you entered the correct bucket path and have permissions for it.", e); + } + gcsPaths.add(new Path(gcsPath.getUri())); } FileSystem fs; diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java index b194ef272b..f11b3ddec7 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java @@ -1,5 +1,5 @@ /* - * Copyright © 2015-2016 Cask Data, Inc. + * Copyright © 2015-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. 
You may obtain a copy of @@ -17,8 +17,13 @@ package io.cdap.plugin.gcp.gcs.sink; import com.google.auth.Credentials; +import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import com.google.gson.reflect.TypeToken; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; @@ -26,8 +31,10 @@ import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageMetrics; import io.cdap.cdap.etl.api.batch.BatchSink; import io.cdap.cdap.etl.api.batch.BatchSinkContext; +import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; import io.cdap.plugin.common.LineageRecorder; import io.cdap.plugin.format.FileFormat; import io.cdap.plugin.format.plugin.AbstractFileSink; @@ -35,9 +42,13 @@ import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig; import io.cdap.plugin.gcp.common.GCPUtils; import io.cdap.plugin.gcp.gcs.GCSPath; +import io.cdap.plugin.gcp.gcs.StorageClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,7 +61,16 @@ @Name("GCS") @Description("Writes records to one or more files in a directory on Google Cloud Storage.") public class GCSBatchSink extends AbstractFileSink { + + private static final Logger LOG = LoggerFactory.getLogger(GCSBatchSink.class); + public static final String RECORD_COUNT = "recordcount"; + private static final String RECORDS_UPDATED_METRIC = "records.updated"; + public static final String AVRO_NAMED_OUTPUT = "avro.mo.config.namedOutput"; + public static final String COMMON_NAMED_OUTPUT = "mapreduce.output.basename"; + private final GCSBatchSinkConfig config; + private String outputPath; + public GCSBatchSink(GCSBatchSinkConfig config) { super(config); @@ -62,6 +82,18 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { super.configurePipeline(pipelineConfigurer); } + @Override + public ValidatingOutputFormat getValidatingOutputFormat(PipelineConfigurer pipelineConfigurer) { + ValidatingOutputFormat delegate = super.getValidatingOutputFormat(pipelineConfigurer); + return new GCSOutputFormatProvider(delegate); + } + + @Override + public ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException { + ValidatingOutputFormat outputFormatForRun = super.getOutputFormatForRun(context); + return new GCSOutputFormatProvider(outputFormatForRun); + } + @Override public void prepareRun(BatchSinkContext context) throws Exception { super.prepareRun(context); @@ -69,14 +101,37 @@ public void prepareRun(BatchSinkContext context) throws Exception { Credentials credentials = config.getServiceAccountFilePath() == null ? null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccountFilePath()); Storage storage = GCPUtils.getStorage(config.getProject(), credentials); - if (storage.get(config.getBucket()) == null) { + Bucket bucket; + try { + bucket = storage.get(config.getBucket()); + } catch (StorageException e) { + throw new RuntimeException( + String.format("Unable to access or create bucket %s. 
", config.getBucket()) + + "Ensure you entered the correct bucket path and have permissions for it.", e); + } + if (bucket == null) { GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKey); } } @Override protected Map getFileSystemProperties(BatchSinkContext context) { - return GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>()); + Map properties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>()); + properties.putAll(config.getFileSystemProperties()); + String outputFileBaseName = config.getOutputFileNameBase(); + if (outputFileBaseName == null || outputFileBaseName.isEmpty()) { + return properties; + } + + properties.put(AVRO_NAMED_OUTPUT, outputFileBaseName); + properties.put(COMMON_NAMED_OUTPUT, outputFileBaseName); + return properties; + } + + @Override + protected String getOutputDir(long logicalStartTime) { + this.outputPath = super.getOutputDir(logicalStartTime); + return this.outputPath; } @Override @@ -84,6 +139,88 @@ protected void recordLineage(LineageRecorder lineageRecorder, List outpu lineageRecorder.recordWrite("Write", "Wrote to Google Cloud Storage.", outputFields); } + @Override + public void onRunFinish(boolean succeeded, BatchSinkContext context) { + super.onRunFinish(succeeded, context); + emitMetrics(succeeded, context); + } + + private void emitMetrics(boolean succeeded, BatchSinkContext context) { + if (!succeeded) { + return; + } + + try { + StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccountFilePath()); + storageClient.mapMetaDataForAllBlobs(getPrefixPath(), new MetricsEmitter(context.getMetrics())::emitMetrics); + } catch (Exception e) { + LOG.warn("Metrics for the number of affected rows in GCS Sink maybe incorrect.", e); + } + } + + private String getPrefixPath() { + String filenameBase = getFilenameBase(); + if (filenameBase == null) { + return outputPath; + } + //Following code is for handling a special case for saving files in same output directory. + //The specified file prefix from Filesystem Properties/Output file base name can be used to make filename unique. + //The code is based on assumptions from the internal implementation of + //org.apache.avro.mapreduce.AvroOutputFormatBase and org.apache.hadoop.mapreduce.lib.output.FileOutputFormat + String outputPathPrefix = outputPath.endsWith("/") ? 
outputPath.substring(0, outputPath.length() - 1) : outputPath; + return String.format("%s/%s-", outputPathPrefix, filenameBase); + } + + @Nullable + private String getFilenameBase() { + String outputFileBaseName = config.getOutputFileNameBase(); + if (outputFileBaseName != null && !outputFileBaseName.isEmpty()) { + return outputFileBaseName; + } + + Map fileSystemProperties = config.getFileSystemProperties(); + if (fileSystemProperties.containsKey(AVRO_NAMED_OUTPUT) && config.getFormat() == FileFormat.AVRO) { + return fileSystemProperties.get(AVRO_NAMED_OUTPUT); + } + if (fileSystemProperties.containsKey(COMMON_NAMED_OUTPUT)) { + return fileSystemProperties.get(COMMON_NAMED_OUTPUT); + } + return null; + } + + private static class MetricsEmitter { + private StageMetrics stageMetrics; + + private MetricsEmitter(StageMetrics stageMetrics) { + this.stageMetrics = stageMetrics; + } + + public void emitMetrics(Map metaData) { + long totalRows = extractRecordCount(metaData); + if (totalRows == 0) { + return; + } + + //work around since StageMetrics count() only takes int as of now + int cap = 10000; // so the loop will not cause significant delays + long count = totalRows / Integer.MAX_VALUE; + if (count > cap) { + LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly"); + } + count = count < cap ? count : cap; + for (int i = 0; i <= count && totalRows > 0; i++) { + int rowCount = totalRows < Integer.MAX_VALUE ? (int) totalRows : Integer.MAX_VALUE; + stageMetrics.count(RECORDS_UPDATED_METRIC, rowCount); + totalRows -= rowCount; + } + } + + private long extractRecordCount(Map metadata) { + String value = metadata.get(RECORD_COUNT); + return value == null ? 0L : Long.parseLong(value); + } + } + /** * Sink configuration. 
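+   *
+   * For illustration, the advanced "File System Properties" value is a JSON map of string keys to
+   * string values that is merged into the properties used for the run, for example reusing the
+   * AVRO_NAMED_OUTPUT and COMMON_NAMED_OUTPUT keys defined above (any other Hadoop or format
+   * property may be supplied; the value "out" here is only a placeholder):
+   *
+   *   {"avro.mo.config.namedOutput": "out", "mapreduce.output.basename": "out"}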
*/ @@ -94,6 +231,8 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements private static final String NAME_FORMAT = "format"; private static final String NAME_SCHEMA = "schema"; private static final String NAME_LOCATION = "location"; + private static final String NAME_FS_PROPERTIES = "fileSystemProperties"; + private static final String NAME_FILE_NAME_BASE = "outputFileNameBase"; private static final String SCHEME = "gs://"; @Name(NAME_PATH) @@ -132,6 +271,18 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements "This value is ignored if the bucket already exists") protected String location; + @Name(NAME_FS_PROPERTIES) + @Macro + @Nullable + @Description("Advanced feature to specify any additional properties that should be used with the sink.") + private String fileSystemProperties; + + @Name(NAME_FILE_NAME_BASE) + @Macro + @Nullable + @Description("Advanced feature to specify file output name prefix.") + private String outputFileNameBase; + @Override public void validate() { // no-op @@ -171,6 +322,13 @@ public void validate(FailureCollector collector) { } catch (IllegalArgumentException e) { collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace()); } + + try { + getFileSystemProperties(); + } catch (IllegalArgumentException e) { + collector.addFailure("File system properties must be a valid json.", null) + .withConfigProperty(NAME_FS_PROPERTIES).withStacktrace(e.getStackTrace()); + } } public String getBucket() { @@ -215,5 +373,22 @@ public String getDelimiter() { public String getLocation() { return location; } + + public Map getFileSystemProperties() { + if (fileSystemProperties == null || fileSystemProperties.isEmpty()) { + return Collections.emptyMap(); + } + try { + return new Gson().fromJson(fileSystemProperties, new TypeToken>() { + }.getType()); + } catch (JsonSyntaxException e) { + throw new IllegalArgumentException("Unable to parse filesystem properties: " + e.getMessage(), e); + } + } + + @Nullable + public String getOutputFileNameBase() { + return outputFileNameBase; + } } } diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java index 4421ed8604..966a846617 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java @@ -1,5 +1,5 @@ /* - * Copyright © 2015-2016 Cask Data, Inc. + * Copyright © 2015-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -18,6 +18,7 @@ import com.google.auth.Credentials; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; @@ -100,8 +101,15 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati Credentials credentials = config.getServiceAccountFilePath() == null ? 
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccountFilePath()); Storage storage = GCPUtils.getStorage(config.getProject(), credentials); - if (storage.get(config.getBucket()) == null) { - GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKey); + try { + if (storage.get(config.getBucket()) == null) { + GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKey); + } + } catch (StorageException e) { + // Add more descriptive error message + throw new RuntimeException( + String.format("Unable to access or create bucket %s. ", config.getBucket()) + + "Ensure you entered the correct bucket path and have permissions for it.", e); } for (Map.Entry argument : argumentCopy.entrySet()) { diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java new file mode 100644 index 0000000000..0bced920c2 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputFormatProvider.java @@ -0,0 +1,250 @@ +package io.cdap.plugin.gcp.gcs.sink; + +import com.google.cloud.storage.Blob; +import com.google.common.annotations.VisibleForTesting; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.etl.api.validation.FormatContext; +import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat; +import io.cdap.plugin.gcp.common.GCPUtils; +import io.cdap.plugin.gcp.gcs.StorageClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * OutputFormatProvider for GCSSink + */ +public class GCSOutputFormatProvider implements ValidatingOutputFormat { + + private static final Logger LOG = LoggerFactory.getLogger(GCSOutputFormatProvider.class); + private static final String DELEGATE_OUTPUTFORMAT_CLASSNAME = "gcssink.delegate.outputformat.classname"; + private static final String OUTPUT_FOLDER = "gcssink.metric.output.folder"; + public static final String RECORD_COUNT_FORMAT = "recordcount.%s"; + private final ValidatingOutputFormat delegate; + + public GCSOutputFormatProvider(ValidatingOutputFormat delegate) { + this.delegate = delegate; + } + + @Override + public void validate(FormatContext context) { + delegate.validate(context); + } + + @Override + public String getOutputFormatClassName() { + return GCSOutputFormat.class.getName(); + } + + @Override + public Map getOutputFormatConfiguration() { + Map outputFormatConfiguration = new HashMap<>(delegate.getOutputFormatConfiguration()); + outputFormatConfiguration.put(DELEGATE_OUTPUTFORMAT_CLASSNAME, delegate.getOutputFormatClassName()); + return outputFormatConfiguration; + } + + /** + * OutputFormat for GCS Sink + */ + public static class GCSOutputFormat extends OutputFormat { + private OutputFormat delegateFormat; + + private OutputFormat getDelegateFormatInstance(Configuration configuration) throws IOException { + if (delegateFormat != null) { + 
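+        // The delegate output format was already instantiated for this task; reuse the cached instance.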
return delegateFormat; + } + + String delegateClassName = configuration.get(DELEGATE_OUTPUTFORMAT_CLASSNAME); + try { + delegateFormat = (OutputFormat) ReflectionUtils + .newInstance(configuration.getClassByName(delegateClassName), configuration); + return delegateFormat; + } catch (ClassNotFoundException e) { + throw new IOException( + String.format("Unable to instantiate output format for class %s", delegateClassName), + e); + } + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws + IOException, InterruptedException { + RecordWriter originalWriter = getDelegateFormatInstance(taskAttemptContext.getConfiguration()) + .getRecordWriter(taskAttemptContext); + return new GCSRecordWriter(originalWriter); + } + + @Override + public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { + getDelegateFormatInstance(jobContext.getConfiguration()).checkOutputSpecs(jobContext); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, + InterruptedException { + OutputCommitter delegateCommitter = getDelegateFormatInstance(taskAttemptContext.getConfiguration()) + .getOutputCommitter(taskAttemptContext); + return new GCSOutputCommitter(delegateCommitter); + } + } + + /** + * OutputCommitter for GCS + */ + public static class GCSOutputCommitter extends OutputCommitter { + + private final OutputCommitter delegate; + + public GCSOutputCommitter(OutputCommitter delegate) { + this.delegate = delegate; + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + delegate.setupJob(jobContext); + } + + @Override + public void cleanupJob(JobContext jobContext) throws IOException { + delegate.cleanupJob(jobContext); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + delegate.commitJob(jobContext); + } + + @Override + public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { + delegate.abortJob(jobContext, state); + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { + delegate.setupTask(taskAttemptContext); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { + return delegate.needsTaskCommit(taskAttemptContext); + } + + @Override + public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { + /*On commit task, there seems to be some inconsistency across different hadoop implementations regarding the path + where output file is stored. For some implementations it appears in the path returned by FileOutputCommitter + getCommittedTaskPath and for some it does not.Before commit, the files appear to be consistently present in path + returned by FileOutputCommitter getTaskAttemptPath. Hence, find the output file from taskAttemptPath and add + metadata before commit happens. After commit, file would have been moved out of the taskAttemptPath. */ + try { + updateMetricMetaData(taskAttemptContext); + } catch (Exception exception) { + LOG.warn("Unable to record metric for task. 
Metric emitted for the number of affected rows may be incorrect.", + exception); + } + + delegate.commitTask(taskAttemptContext); + } + + private void updateMetricMetaData(TaskAttemptContext taskAttemptContext) throws IOException { + if (!(delegate instanceof FileOutputCommitter)) { + return; + } + + FileOutputCommitter fileOutputCommitter = (FileOutputCommitter) delegate; + Configuration configuration = taskAttemptContext.getConfiguration(); + //Task is not yet committed, so should be available in attempt path + Path taskAttemptPath = fileOutputCommitter.getTaskAttemptPath(taskAttemptContext); + if (configuration == null || taskAttemptPath == null) { + return; + } + + //read the count from configuration + String keyInConfig = String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID()); + Map metaData = new HashMap<>(); + metaData.put(GCSBatchSink.RECORD_COUNT, String.valueOf(configuration.getLong(keyInConfig, 0L))); + StorageClient storageClient = getStorageClient(configuration); + //update metadata on the output file present in the directory for this task + Blob blob = storageClient.pickABlob(taskAttemptPath.toString()); + if (blob == null) { + LOG.info("Could not find a file in path %s to apply count metadata.", taskAttemptPath.toString()); + return; + } + storageClient.setMetaData(blob, metaData); + } + + @VisibleForTesting + StorageClient getStorageClient(Configuration configuration) throws IOException { + String project = configuration.get(GCPUtils.FS_GS_PROJECT_ID); + String serviceAccountPath = configuration.get(GCPUtils.CLOUD_JSON_KEYFILE); + return StorageClient.create(project, serviceAccountPath); + } + + @Override + public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { + delegate.abortTask(taskAttemptContext); + } + + @Override + public boolean isCommitJobRepeatable(JobContext jobContext) throws IOException { + return delegate.isCommitJobRepeatable(jobContext); + } + + @Override + public boolean isRecoverySupported(JobContext jobContext) throws IOException { + return delegate.isRecoverySupported(jobContext); + } + + @Override + public boolean isRecoverySupported() { + return delegate.isRecoverySupported(); + } + + @Override + public void recoverTask(TaskAttemptContext taskContext) throws IOException { + delegate.recoverTask(taskContext); + } + } + + /** + * RecordWriter for GCSSink + */ + public static class GCSRecordWriter extends RecordWriter { + + private final RecordWriter originalWriter; + private long recordCount; + + public GCSRecordWriter(RecordWriter originalWriter) { + this.originalWriter = originalWriter; + } + + @Override + public void write(NullWritable nullWritable, StructuredRecord structuredRecord) throws IOException, + InterruptedException { + originalWriter.write(nullWritable, structuredRecord); + recordCount++; + } + + @Override + public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + originalWriter.close(taskAttemptContext); + //Since the file details are not available here, pass the value on in configuration + taskAttemptContext.getConfiguration() + .setLong(String.format(RECORD_COUNT_FORMAT, taskAttemptContext.getTaskAttemptID()), recordCount); + } + } +} diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java index aec773792b..ee2f432956 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java @@ -1,5 +1,5 @@ /* - 
* Copyright © 2015 Cask Data, Inc. + * Copyright © 2015-2020 Cask Data, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -199,7 +199,8 @@ public void validate(FailureCollector collector) { try { GCSPath.from(path); } catch (IllegalArgumentException e) { - collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_PATH).withStacktrace(e.getStackTrace()); + collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_PATH) + .withStacktrace(e.getStackTrace()); } } if (!containsMacro(NAME_FILE_SYSTEM_PROPERTIES)) { diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java index 83db7e88d1..5666b56cb4 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java @@ -64,7 +64,8 @@ public void testBigQuerySinkConfig() { Schema.Field.of("timestamp", Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)))); - BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString()); + BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(), + "INTEGER", 0L, 100L, 10L); MockFailureCollector collector = new MockFailureCollector("bqsink"); config.validate(collector); Assert.assertEquals(0, collector.getValidationFailures().size()); @@ -75,7 +76,8 @@ public void testBigQuerySinkInvalidConfig() { Schema invalidSchema = Schema.recordOf("record", Schema.Field.of("id", Schema.of(Schema.Type.LONG))); - BigQuerySinkConfig config = new BigQuerySinkConfig("reference!!", "ds", "tb", "buck3t$$", invalidSchema.toString()); + BigQuerySinkConfig config = new BigQuerySinkConfig("reference!!", "ds", "tb", "buck3t$$", invalidSchema.toString(), + "INTEGER", 0L, 100L, 10L); MockFailureCollector collector = new MockFailureCollector("bqsink"); config.validate(collector); List failures = collector.getValidationFailures(); @@ -129,7 +131,8 @@ private static BigQuerySink getSinkToTest(Job mockJob) throws NoSuchFieldExcepti Schema.Field.of("id", Schema.of(Schema.Type.LONG)), Schema.Field.of("name", Schema.of(Schema.Type.STRING))); BigQuerySinkConfig config = - new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", schema.toString()); + new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", schema.toString(), + null, null, null, null); BigQuery bigQueryMock = mock(BigQuery.class); BigQuerySink sink = new BigQuerySink(config); setBigQuery(sink, bigQueryMock); @@ -204,7 +207,8 @@ private BigQuerySink getValidationTestSink(boolean truncateTable) throws NoSuchF Schema.Field.of("id", Schema.of(Schema.Type.LONG)), Schema.Field.of("name", Schema.of(Schema.Type.STRING))); BigQuerySinkConfig config = - new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", schema.toString()); + new BigQuerySinkConfig("testmetric", "ds", "tb", "bkt", schema.toString(), + "INTEGER", 0L, 100L, 10L); FieldSetter.setField(config, AbstractBigQuerySinkConfig.class.getDeclaredField("truncateTable"), truncateTable); BigQuery bigQueryMock = mock(BigQuery.class); diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java new file mode 100644 index 0000000000..8c051ad551 --- /dev/null +++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSinkTest.java @@ -0,0 +1,63 @@ +/* + * 
Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.gcs.sink; + +import io.cdap.cdap.etl.mock.validation.MockFailureCollector; +import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.internal.util.reflection.FieldSetter; + +public class GCSBatchSinkTest { + + @Test + public void testValidFSProperties() throws NoSuchFieldException { + GCSBatchSink.GCSBatchSinkConfig config = getConfig(null); + MockFailureCollector collector = new MockFailureCollector("gcssink"); + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + private GCSBatchSink.GCSBatchSinkConfig getConfig(String fileSystemProperties) throws NoSuchFieldException { + GCSBatchSink.GCSBatchSinkConfig gcsBatchSinkConfig = new GCSBatchSink.GCSBatchSinkConfig(); + FieldSetter + .setField(gcsBatchSinkConfig, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("path"), "gs://test"); + FieldSetter.setField(gcsBatchSinkConfig, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("format"), "avro"); + FieldSetter + .setField(gcsBatchSinkConfig, GCPReferenceSinkConfig.class.getDeclaredField("referenceName"), "testref"); + FieldSetter + .setField(gcsBatchSinkConfig, GCSBatchSink.GCSBatchSinkConfig.class.getDeclaredField("fileSystemProperties"), + fileSystemProperties); + return gcsBatchSinkConfig; + } + + @Test + public void testValidFSProperties1() throws NoSuchFieldException { + GCSBatchSink.GCSBatchSinkConfig config = getConfig("{\"key\":\"val\"}"); + MockFailureCollector collector = new MockFailureCollector("gcssink"); + config.validate(collector); + Assert.assertEquals(0, collector.getValidationFailures().size()); + } + + @Test + public void testInvalidFSProperties() throws NoSuchFieldException { + GCSBatchSink.GCSBatchSinkConfig config = getConfig("{\"key\":}"); + MockFailureCollector collector = new MockFailureCollector("gcssink"); + config.validate(collector); + Assert.assertEquals(1, collector.getValidationFailures().size()); + } +} diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java new file mode 100644 index 0000000000..63405858a0 --- /dev/null +++ b/src/test/java/io/cdap/plugin/gcp/gcs/sink/GCSOutputformatProviderTest.java @@ -0,0 +1,100 @@ +package io.cdap.plugin.gcp.gcs.sink; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.plugin.gcp.gcs.StorageClient; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.junit.Assert; +import org.junit.Test; +import 
org.mockito.Mockito; + +import java.io.IOException; +import java.net.URISyntaxException; + +/** + * Tests for GCSOutputformatProvider + */ +public class GCSOutputformatProviderTest { + + @Test + public void testRecordWriter() throws IOException, InterruptedException { + RecordWriter mockWriter = Mockito.mock(RecordWriter.class); + GCSOutputFormatProvider.GCSRecordWriter recordWriterToTest = new GCSOutputFormatProvider.GCSRecordWriter( + mockWriter); + NullWritable mockWritable = Mockito.mock(NullWritable.class); + StructuredRecord mockRecord = Mockito.mock(StructuredRecord.class); + for (int i = 0; i < 5; i++) { + recordWriterToTest.write(mockWritable, mockRecord); + } + TaskAttemptContext mockContext = Mockito.mock(TaskAttemptContext.class); + Configuration configuration = new Configuration(); + Mockito.when(mockContext.getConfiguration()).thenReturn(configuration); + recordWriterToTest.close(mockContext); + //Verify that the delegate calls are being done as expected + Mockito.verify(mockWriter, Mockito.times(5)).write(mockWritable, mockRecord); + //verify that count is being recorded as expected + Assert.assertTrue(configuration.getLong( + String.format(GCSOutputFormatProvider.RECORD_COUNT_FORMAT, mockContext.getTaskAttemptID()), 0) == 5); + } + + @Test + public void testGCSOutputCommitter() throws IOException, URISyntaxException { + FileOutputCommitter fileOutputCommitter = Mockito.mock(FileOutputCommitter.class); + GCSOutputFormatProvider.GCSOutputCommitter committer = new GCSOutputFormatProvider.GCSOutputCommitter( + fileOutputCommitter); + GCSOutputFormatProvider.GCSOutputCommitter committerToTest = Mockito.spy(committer); + JobContext mockJobContext = Mockito.mock(JobContext.class); + JobStatus.State mockJobState = JobStatus.State.SUCCEEDED; + TaskAttemptContext mockContext = Mockito.mock(TaskAttemptContext.class); + //test all the delegation + committerToTest.abortJob(mockJobContext, mockJobState); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).abortJob(mockJobContext, mockJobState); + + committerToTest.abortTask(mockContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).abortTask(mockContext); + + committerToTest.cleanupJob(mockJobContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).cleanupJob(mockJobContext); + + committerToTest.commitJob(mockJobContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).commitJob(mockJobContext); + + committerToTest.isCommitJobRepeatable(mockJobContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).isCommitJobRepeatable(mockJobContext); + + committerToTest.isRecoverySupported(); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).isRecoverySupported(); + + committerToTest.isRecoverySupported(mockJobContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).isRecoverySupported(mockJobContext); + + committerToTest.needsTaskCommit(mockContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).needsTaskCommit(mockContext); + + committerToTest.recoverTask(mockContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).recoverTask(mockContext); + + committerToTest.setupJob(mockJobContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).setupJob(mockJobContext); + + committerToTest.setupTask(mockContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).setupTask(mockContext); + + Configuration configuration = new Configuration(); + Mockito.when(mockContext.getConfiguration()).thenReturn(configuration); + Path committedTaskPathMock = Mockito.mock(Path.class); + 
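+    // Stub the committed task path and the storage client so commitTask runs against mocks rather than a real GCS bucket.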
Mockito.when(fileOutputCommitter.getCommittedTaskPath(mockContext)).thenReturn(committedTaskPathMock); + Mockito.when(committedTaskPathMock.toString()).thenReturn("gs://test"); + StorageClient mockStorage = Mockito.mock(StorageClient.class); + Mockito.when(committerToTest.getStorageClient(configuration)).thenReturn(mockStorage); + + committerToTest.commitTask(mockContext); + Mockito.verify(fileOutputCommitter, Mockito.times(1)).commitTask(mockContext); + } +} diff --git a/widgets/BigQueryMultiTable-batchsink.json b/widgets/BigQueryMultiTable-batchsink.json index 0137b31f7e..00fcff08d5 100644 --- a/widgets/BigQueryMultiTable-batchsink.json +++ b/widgets/BigQueryMultiTable-batchsink.json @@ -23,6 +23,14 @@ "default": "auto-detect" } }, + { + "widget-type": "textbox", + "label": "Dataset Project Id", + "name": "datasetProject", + "widget-attributes": { + "placeholder": "The project in which the dataset is located/should be created. Defaults to the project specified in the Project Id property." + } + }, { "widget-type": "textbox", "label": "Dataset", diff --git a/widgets/BigQueryTable-batchsink.json b/widgets/BigQueryTable-batchsink.json index af48f9cbab..cfc52651af 100644 --- a/widgets/BigQueryTable-batchsink.json +++ b/widgets/BigQueryTable-batchsink.json @@ -2,7 +2,7 @@ "metadata": { "spec-version": "1.5" }, - "display-name" : "BigQuery", + "display-name": "BigQuery", "configuration-groups": [ { "label": "Basic", @@ -11,7 +11,7 @@ "widget-type": "textbox", "label": "Reference Name", "name": "referenceName", - "widget-attributes" : { + "widget-attributes": { "placeholder": "Name used to identify this sink for lineage" } }, @@ -19,15 +19,23 @@ "widget-type": "textbox", "label": "Project ID", "name": "project", - "widget-attributes" : { + "widget-attributes": { "default": "auto-detect" } }, + { + "widget-type": "textbox", + "label": "Dataset Project Id", + "name": "datasetProject", + "widget-attributes": { + "placeholder": "The project in which the dataset is located/should be created. Defaults to the project specified in the Project Id property." 
+ } + }, { "widget-type": "textbox", "label": "Dataset", "name": "dataset", - "widget-attributes" : { + "widget-attributes": { "placeholder": "Dataset the table belongs to" } }, @@ -35,7 +43,7 @@ "widget-type": "textbox", "label": "Table", "name": "table", - "widget-attributes" : { + "widget-attributes": { "placeholder": "Table to write to" } }, @@ -43,7 +51,7 @@ "widget-type": "textbox", "label": "Temporary Bucket Name", "name": "bucket", - "widget-attributes" : { + "widget-attributes": { "placeholder": "Google Cloud Storage bucket for temporary data" } }, @@ -51,20 +59,20 @@ "widget-type": "textbox", "label": "GCS Upload Request Chunk Size", "name": "gcsChunkSize", - "widget-attributes" : { + "widget-attributes": { "placeholder": "GCS upload request chunk size in bytes" } } ] }, { - "label" : "Credentials", - "properties" : [ + "label": "Credentials", + "properties": [ { "widget-type": "textbox", "label": "Service Account File Path", "name": "serviceFilePath", - "widget-attributes" : { + "widget-attributes": { "default": "auto-detect" } } @@ -75,8 +83,8 @@ "properties": [ { "widget-type": "radio-group", - "name" : "operation", - "label" : "Operation", + "name": "operation", + "label": "Operation", "widget-attributes": { "layout": "inline", "default": "insert", @@ -100,7 +108,7 @@ "name": "relationTableKey", "widget-type": "csv", "label": "Table Key", - "widget-attributes" : {} + "widget-attributes": {} }, { "name": "dedupeBy", @@ -109,14 +117,17 @@ "widget-attributes": { "delimiter": ",", "kv-delimiter": " ", - "dropdownOptions": [ "ASC", "DESC"] + "dropdownOptions": [ + "ASC", + "DESC" + ] } }, { "widget-type": "textbox", "label": "Partition Filter", "name": "partitionFilter", - "widget-attributes" : { + "widget-attributes": { "placeholder": "Filter that can be used for partition elimination" } }, @@ -161,7 +172,7 @@ "widget-type": "textbox", "label": "Location", "name": "location", - "widget-attributes" : { + "widget-attributes": { "default": "US" } }, @@ -181,11 +192,52 @@ "default": "false" } }, + { + "widget-type": "radio-group", + "label": "Partitioning type", + "name": "partitioningType", + "widget-attributes": { + "layout": "inline", + "default": "TIME", + "options": [ + { + "id": "TIME", + "label": "Time" + }, + { + "id": "INTEGER", + "label": "Integer" + }, + { + "id": "NONE", + "label": "None" + } + ] + } + }, + { + "widget-type": "Number", + "label": "Range Start (inclusive)", + "name": "rangeStart", + "widget-attributes": { + "default": "0" + } + }, + { + "widget-type": "Number", + "label": "Range End (exclusive)", + "name": "rangeEnd" + }, + { + "widget-type": "Number", + "label": "Range Interval", + "name": "rangeInterval" + }, { "widget-type": "textbox", "label": "Partition Field", "name": "partitionByField", - "widget-attributes" : { + "widget-attributes": { "placeholder": "Table field for partitioning" } }, @@ -209,7 +261,7 @@ "name": "clusteringOrder", "widget-type": "csv", "label": "Clustering Order", - "widget-attributes" : {} + "widget-attributes": {} } ] } @@ -233,6 +285,52 @@ } } ], + "filters": [ + { + "name": "PartitioningIntegerFieldsFilter", + "condition": { + "expression": "partitioningType == 'INTEGER'" + }, + "show": [ + { + "type": "property", + "name": "rangeStart" + }, + { + "type": "property", + "name": "rangeEnd" + }, + { + "type": "property", + "name": "rangeInterval" + }, + { + "type": "property", + "name": "clientAccessToken" + } + ] + }, + { + "name": "PartitionFieldFilter", + "condition": { + "expression": "createPartitionedTable == true || 
partitioningType == 'INTEGER' || partitioningType == 'TIME'" + }, + "show": [ + { + "type": "property", + "name": "partitionByField" + }, + { + "type": "property", + "name": "partitionFilterRequired" + }, + { + "type": "property", + "name": "clusteringOrder" + } + ] + } + ], "jump-config": { "datasets": [ { diff --git a/widgets/GCS-batchsink.json b/widgets/GCS-batchsink.json index 53180affc6..06062993b7 100644 --- a/widgets/GCS-batchsink.json +++ b/widgets/GCS-batchsink.json @@ -92,6 +92,21 @@ } } ] + }, + { + "label": "Advanced", + "properties": [ + { + "widget-type": "textbox", + "label": "Output File Prefix", + "name": "outputFileNameBase" + }, + { + "widget-type": "json-editor", + "label": "File System Properties", + "name": "fileSystemProperties" + } + ] } ], "outputs": [ diff --git a/widgets/GCSArgumentSetter-action.json b/widgets/GCSArgumentSetter-action.json new file mode 100644 index 0000000000..938f171619 --- /dev/null +++ b/widgets/GCSArgumentSetter-action.json @@ -0,0 +1,92 @@ +{ + "metadata": { + "spec-version": "1.5" + }, + "display-name": "GCS Argument Setter", + "configuration-groups": [ + { + "label": "Basic", + "properties": [ + { + "widget-type": "textbox", + "label": "Reference Name", + "name": "referenceName", + "widget-attributes": { + "placeholder": "Name used to identify this source for lineage" + } + }, + { + "widget-type": "textbox", + "label": "Project ID", + "name": "project", + "widget-attributes": { + "default": "auto-detect" + } + }, + { + "widget-type": "textbox", + "label": "Path", + "name": "path", + "widget-attributes": { + "placeholder": "gs:///path/to/configuration.json" + } + }, + { + "label": "Provide Service Account as JSON Content", + "name": "serviceAccountType", + "widget-type": "toggle", + "widget-attributes": { + "on": { + "value": "JSON", + "label": "Yes" + }, + "off": { + "value": "filePath", + "label": "No" + }, + "default": "filePath" + } + }, + { + "widget-type": "textbox", + "label": "Service Account File Path", + "name": "serviceFilePath", + "widget-attributes": { + "default": "auto-detect" + } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" + } + ] + } + ], + "filters": [ + { + "name": "ServiceAuthenticationTypeFilePath", + "condition": { + "expression": "serviceAccountType == 'filePath'" + }, + "show": [ + { + "type": "property", + "name": "serviceFilePath" + } + ] + }, + { + "name": "ServiceAuthenticationTypeJSON", + "condition": { + "expression": "serviceAccountType == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountJSON" + } + ] + } + ] +} diff --git a/widgets/GCSMultiFiles-batchsink.json b/widgets/GCSMultiFiles-batchsink.json index e14420c2eb..5ac4a2765f 100644 --- a/widgets/GCSMultiFiles-batchsink.json +++ b/widgets/GCSMultiFiles-batchsink.json @@ -103,6 +103,16 @@ "widget-attributes": { "default": "tablename" } + }, + { + "widget-type": "hidden", + "label": "Output File Prefix", + "name": "outputFileNameBase" + }, + { + "widget-type": "hidden", + "label": "File System Properties", + "name": "fileSystemProperties" } ] },