consumer) {
- Bucket sourceBucket = storage.get(sourcePath.getBucket());
+ Bucket sourceBucket = null;
+ try {
+ sourceBucket = storage.get(sourcePath.getBucket());
+ } catch (StorageException e) {
+ // Add more descriptive error message
+ throw new RuntimeException(
+ String.format("Unable to access source bucket %s. ", sourcePath.getBucket())
+ + "Ensure you entered the correct bucket path.", e);
+ }
if (sourceBucket == null) {
throw new IllegalArgumentException(
String.format("Source bucket '%s' does not exist.", sourcePath.getBucket()));
}
- Bucket destBucket = storage.get(destPath.getBucket());
+ Bucket destBucket = null;
+ try {
+ destBucket = storage.get(destPath.getBucket());
+ } catch (StorageException e) {
+ // Add more descriptive error message
+ throw new RuntimeException(
+ String.format("Unable to access destination bucket %s. ", destPath.getBucket())
+ + "Ensure you entered the correct bucket path.", e);
+ }
if (destBucket == null) {
throw new IllegalArgumentException(
String.format("Destination bucket '%s' does not exist. Please create it first.", destPath.getBucket()));
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java
new file mode 100644
index 0000000000..0f8f0bf500
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.actions;
+
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.common.base.Joiner;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
+import io.cdap.cdap.etl.api.action.Action;
+import io.cdap.cdap.etl.api.action.ActionContext;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import io.cdap.plugin.gcp.gcs.GCSPath;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class GCSArgumentSetter get json file configuration from GCS.
+ *
+ * The plugin provides the ability to map json properties as pipeline arguments name and columns
+ * values as pipeline arguments
+ * {
+ * "arguments" : [
+ * { "name" : "input.path", "type" : "string", "value" : "/data/sunny_feeds/master"},
+ * { "name" : "parse.schema, "type" : "schema", "value" : [
+ * { "name" : "fname", "type" : "string", "nullable" : true },
+ * { "name" : "age", "type" : "int", "nullable" : true},
+ * { "name" : "salary", "type" : "float", "nullable" : false}
+ * ]},
+ * { "name" : "directives", "type" : "array", "value" : [
+ * "parse-as-json body",
+ * "columns-replace s/body_//g",
+ * "keep f1,f2"
+ * ]}
+ * ]
+ * }
+ *
+ * In case when `nullable` is not specified inside field json, then it defaults to true.
+ *
+ */
+@Plugin(type = Action.PLUGIN_TYPE)
+@Name(GCSArgumentSetter.NAME)
+@Description("Argument setter for dynamically configuring pipeline from GCS")
+public final class GCSArgumentSetter extends Action {
+
+ public static final String NAME = "GCSArgumentSetter";
+ private GCSArgumentSetterConfig config;
+
+ @Override
+ public void configurePipeline(PipelineConfigurer configurer) {
+ super.configurePipeline(configurer);
+ StageConfigurer stageConfigurer = configurer.getStageConfigurer();
+ FailureCollector collector = stageConfigurer.getFailureCollector();
+ config.validate(collector);
+ }
+
+ @Override
+ public void run(ActionContext context) throws Exception {
+ config.validateProperties(context.getFailureCollector());
+ String fileContent = GCSArgumentSetter.getContent(config);
+
+ try {
+ Configuration configuration =
+ new GsonBuilder().create().fromJson(fileContent, Configuration.class);
+ for (Argument argument : configuration.getArguments()) {
+ String name = argument.getName();
+ String value = argument.getValue();
+ if (value != null) {
+ context.getArguments().set(name, value);
+ } else {
+ throw new RuntimeException(
+ "Configuration '" + name + "' is null. Cannot set argument to null.");
+ }
+ }
+ } catch (JsonSyntaxException e) {
+ throw new RuntimeException(
+ String.format(
+ "Could not parse response from '%s': %s", config.getPath(), e.getMessage()));
+ }
+ }
+
+ private static Storage getStorage(GCSArgumentSetterConfig config) throws IOException {
+ return StorageOptions.newBuilder()
+ .setProjectId(config.getProject())
+ .setCredentials(getCredentials(config))
+ .build()
+ .getService();
+ }
+
+ public static ServiceAccountCredentials getCredentials(GCSArgumentSetterConfig config)
+ throws IOException {
+ ServiceAccountCredentials credentials = null;
+ switch (config.getServiceAccountType()) {
+ case FILE_PATH:
+ credentials = GCPUtils.loadServiceAccountCredentials(config.getServiceAccountFilePath());
+ break;
+ case JSON:
+ InputStream jsonInputStream =
+ new ByteArrayInputStream(config.getServiceAccountJSON().getBytes());
+ credentials = ServiceAccountCredentials.fromStream(jsonInputStream);
+ break;
+ }
+ return credentials;
+ }
+
+ public static String getContent(GCSArgumentSetterConfig config) throws IOException {
+ Storage storage = getStorage(config);
+ GCSPath path = config.getPath();
+ Blob blob = storage.get(path.getBucket(), path.getName());
+ return new String(blob.getContent());
+ }
+
+ private static final class Configuration {
+
+ private List arguments;
+
+ public List getArguments() {
+ return arguments;
+ }
+ }
+
+ private static final class Argument {
+
+ private String name;
+ private String type;
+ private JsonElement value;
+
+ Argument() {
+ type = "string";
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getValue() {
+ if (value == null) {
+ throw new IllegalArgumentException("Null Argument value for name '" + name + "'");
+ }
+ if (type.equalsIgnoreCase("schema")) {
+ return createSchema(value).toString();
+ } else if (type.equalsIgnoreCase("int")) {
+ return Integer.toString(value.getAsInt());
+ } else if (type.equalsIgnoreCase("float")) {
+ return Float.toString(value.getAsFloat());
+ } else if (type.equalsIgnoreCase("double")) {
+ return Double.toString(value.getAsDouble());
+ } else if (type.equalsIgnoreCase("short")) {
+ return Short.toString(value.getAsShort());
+ } else if (type.equalsIgnoreCase("string")) {
+ return value.getAsString();
+ } else if (type.equalsIgnoreCase("char")) {
+ return Character.toString(value.getAsCharacter());
+ } else if (type.equalsIgnoreCase("array")) {
+ List values = new ArrayList<>();
+ for (JsonElement v : value.getAsJsonArray()) {
+ values.add(v.getAsString());
+ }
+ return Joiner.on("\n").join(values);
+ } else if (type.equalsIgnoreCase("map")) {
+ List values = new ArrayList<>();
+ for (Map.Entry entry : value.getAsJsonObject().entrySet()) {
+ values.add(String.format("%s=%s", entry.getKey(), entry.getValue().getAsString()));
+ }
+ return Joiner.on(";").join(values);
+ }
+
+ throw new IllegalArgumentException("Invalid argument value '" + value + "'");
+ }
+
+ private Schema createSchema(JsonElement array) {
+ List fields = new ArrayList<>();
+ for (JsonElement field : array.getAsJsonArray()) {
+ Schema.Type type = Schema.Type.STRING;
+
+ JsonObject object = field.getAsJsonObject();
+ String fieldType = object.get("type").getAsString().toLowerCase();
+
+ boolean isNullable = true;
+ if (object.get("nullable") != null) {
+ isNullable = object.get("nullable").getAsBoolean();
+ }
+
+ String name = object.get("name").getAsString();
+
+ if (fieldType.equals("double")) {
+ type = Schema.Type.DOUBLE;
+ } else if (fieldType.equals("float")) {
+ type = Schema.Type.FLOAT;
+ } else if (fieldType.equals("long")) {
+ type = Schema.Type.LONG;
+ } else if (fieldType.equals("int")) {
+ type = Schema.Type.INT;
+ } else if (fieldType.equals("short")) {
+ type = Schema.Type.INT;
+ } else if (fieldType.equals("string")) {
+ type = Schema.Type.STRING;
+ }
+
+ Schema schema;
+ if (isNullable) {
+ schema = Schema.nullableOf(Schema.of(type));
+ } else {
+ schema = Schema.of(type);
+ }
+ Schema.Field fld = Schema.Field.of(name, schema);
+ fields.add(fld);
+ }
+
+ return Schema.recordOf("record", fields);
+ }
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java
new file mode 100644
index 0000000000..573b9c75bf
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright © 2020 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.gcs.actions;
+
+import com.google.common.base.Strings;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.gcp.common.GCPConfig;
+import io.cdap.plugin.gcp.gcs.GCSPath;
+import javax.annotation.Nullable;
+
+/** Holds configuration required for configuring {@link GCSArgumentSetter}. */
+public final class GCSArgumentSetterConfig extends GCPConfig {
+
+ public static final String NAME_PATH = "path";
+ public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType";
+ public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceAccountJSON";
+
+ @Name(NAME_PATH)
+ @Macro
+ @Description("GCS Path to the file containing the arguments")
+ private String path;
+
+ @Name(NAME_SERVICE_ACCOUNT_TYPE)
+ @Macro
+ @Nullable
+ @Description(
+ "Provide service account as JSON. When it is set to 'Yes', "
+ + "the content of service account key needs to be copied, whereas when it is set to 'No' "
+ + "the service Account file path needs to be specified. The default value is 'No'")
+ private String serviceAccountType;
+
+ @Name(NAME_SERVICE_ACCOUNT_JSON)
+ @Macro
+ @Nullable
+ @Description("The content of the service account.")
+ private String serviceAccountJSON;
+
+ public void validate(FailureCollector collector) {
+ validateProperties(collector);
+
+ if (canConnect()) {
+ try {
+ GCSArgumentSetter.getContent(this);
+ } catch (Exception e) {
+ collector.addFailure("Can not get content from GCP!", null);
+ }
+ }
+ collector.getOrThrowException();
+ }
+
+ public void validateProperties(FailureCollector collector) {
+ if (!containsMacro(NAME_PATH)) {
+ try {
+ getPath();
+ } catch (IllegalArgumentException e) {
+ collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_PATH);
+ }
+ }
+
+ if (getServiceAccountType() == ServiceAccountType.JSON
+ && !containsMacro(NAME_SERVICE_ACCOUNT_JSON)
+ && Strings.isNullOrEmpty(getServiceAccountJSON())) {
+ collector
+ .addFailure("Required property 'Service Account JSON' has no value.", "")
+ .withConfigProperty(NAME_SERVICE_ACCOUNT_JSON);
+ }
+ }
+
+ private boolean canConnect() {
+ boolean canConnect =
+ !containsMacro(NAME_PATH)
+ && !(containsMacro(NAME_PROJECT) || AUTO_DETECT.equals(project))
+ && !(containsMacro(NAME_SERVICE_ACCOUNT_TYPE));
+
+ if (!canConnect) {
+ return false;
+ }
+
+ ServiceAccountType serviceAccountType = getServiceAccountType();
+
+ if (serviceAccountType == ServiceAccountType.FILE_PATH) {
+ return !containsMacro(NAME_SERVICE_ACCOUNT_FILE_PATH)
+ && !Strings.isNullOrEmpty(getServiceAccountFilePath());
+ }
+ return !containsMacro(NAME_SERVICE_ACCOUNT_JSON)
+ && !Strings.isNullOrEmpty(getServiceAccountJSON());
+ }
+
+ public GCSPath getPath() {
+ return GCSPath.from(path);
+ }
+
+ public ServiceAccountType getServiceAccountType() {
+ return "JSON".equalsIgnoreCase(serviceAccountType)
+ ? ServiceAccountType.JSON
+ : ServiceAccountType.FILE_PATH;
+ }
+
+ public String getServiceAccountJSON() {
+ return serviceAccountJSON;
+ }
+
+ /** The type of service account. */
+ public enum ServiceAccountType {
+ FILE_PATH,
+ JSON;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java
index b4894db306..d645544400 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java
@@ -1,5 +1,5 @@
/*
- * Copyright © 2015 Cask Data, Inc.
+ * Copyright © 2015-2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -17,7 +17,9 @@
package io.cdap.plugin.gcp.gcs.actions;
import com.google.auth.Credentials;
+import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -100,7 +102,16 @@ public void run(ActionContext context) throws Exception {
}
// create the gcs buckets if not exist
- if (storage.get(gcsPath.getBucket()) == null) {
+ Bucket bucket = null;
+ try {
+ bucket = storage.get(gcsPath.getBucket());
+ } catch (StorageException e) {
+ // Add more descriptive error message
+ throw new RuntimeException(
+ String.format("Unable to access or create bucket %s. ", gcsPath.getBucket())
+ + "Ensure you entered the correct bucket path and have permissions for it.", e);
+ }
+ if (bucket == null) {
GCPUtils.createBucket(storage, gcsPath.getBucket(), config.location,
context.getArguments().get(GCPUtils.CMEK_KEY));
undoBucket.add(bucketPath);
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java
index 412b983b19..72d51f0421 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java
@@ -1,5 +1,5 @@
/*
- * Copyright © 2015 Cask Data, Inc.
+ * Copyright © 2015-2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -16,6 +16,9 @@
package io.cdap.plugin.gcp.gcs.actions;
+import com.google.auth.Credentials;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -25,6 +28,7 @@
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.plugin.gcp.common.GCPConfig;
+import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSPath;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -61,6 +65,8 @@ public void run(ActionContext context) throws Exception {
Configuration configuration = new Configuration();
String serviceAccountFilePath = config.getServiceAccountFilePath();
+ Credentials credentials = serviceAccountFilePath == null ?
+ null : GCPUtils.loadServiceAccountCredentials(serviceAccountFilePath);
if (serviceAccountFilePath != null) {
configuration.set("google.cloud.auth.service.account.json.keyfile", serviceAccountFilePath);
}
@@ -74,8 +80,19 @@ public void run(ActionContext context) throws Exception {
configuration.setBoolean("fs.gs.impl.disable.cache", true);
List gcsPaths = new ArrayList<>();
+ Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
for (String path : config.getPaths()) {
- gcsPaths.add(new Path(GCSPath.from(path).getUri()));
+ GCSPath gcsPath = GCSPath.from(path);
+ // Check if the bucket is accessible
+ try {
+ storage.get(gcsPath.getBucket());
+ } catch (StorageException e) {
+ // Add more descriptive error message
+ throw new RuntimeException(
+ String.format("Unable to access or create bucket %s. ", gcsPath.getBucket())
+ + "Ensure you entered the correct bucket path and have permissions for it.", e);
+ }
+ gcsPaths.add(new Path(gcsPath.getUri()));
}
FileSystem fs;
diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
index b194ef272b..f11b3ddec7 100644
--- a/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
+++ b/src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java
@@ -1,5 +1,5 @@
/*
- * Copyright © 2015-2016 Cask Data, Inc.
+ * Copyright © 2015-2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -17,8 +17,13 @@
package io.cdap.plugin.gcp.gcs.sink;
import com.google.auth.Credentials;
+import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageException;
import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.reflect.TypeToken;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -26,8 +31,10 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageMetrics;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.format.FileFormat;
import io.cdap.plugin.format.plugin.AbstractFileSink;
@@ -35,9 +42,13 @@
import io.cdap.plugin.gcp.common.GCPReferenceSinkConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSPath;
+import io.cdap.plugin.gcp.gcs.StorageClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,7 +61,16 @@
@Name("GCS")
@Description("Writes records to one or more files in a directory on Google Cloud Storage.")
public class GCSBatchSink extends AbstractFileSink {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GCSBatchSink.class);
+ public static final String RECORD_COUNT = "recordcount";
+ private static final String RECORDS_UPDATED_METRIC = "records.updated";
+ public static final String AVRO_NAMED_OUTPUT = "avro.mo.config.namedOutput";
+ public static final String COMMON_NAMED_OUTPUT = "mapreduce.output.basename";
+
private final GCSBatchSinkConfig config;
+ private String outputPath;
+
public GCSBatchSink(GCSBatchSinkConfig config) {
super(config);
@@ -62,6 +82,18 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
}
+ @Override
+ public ValidatingOutputFormat getValidatingOutputFormat(PipelineConfigurer pipelineConfigurer) {
+ ValidatingOutputFormat delegate = super.getValidatingOutputFormat(pipelineConfigurer);
+ return new GCSOutputFormatProvider(delegate);
+ }
+
+ @Override
+ public ValidatingOutputFormat getOutputFormatForRun(BatchSinkContext context) throws InstantiationException {
+ ValidatingOutputFormat outputFormatForRun = super.getOutputFormatForRun(context);
+ return new GCSOutputFormatProvider(outputFormatForRun);
+ }
+
@Override
public void prepareRun(BatchSinkContext context) throws Exception {
super.prepareRun(context);
@@ -69,14 +101,37 @@ public void prepareRun(BatchSinkContext context) throws Exception {
Credentials credentials = config.getServiceAccountFilePath() == null ?
null : GCPUtils.loadServiceAccountCredentials(config.getServiceAccountFilePath());
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
- if (storage.get(config.getBucket()) == null) {
+ Bucket bucket;
+ try {
+ bucket = storage.get(config.getBucket());
+ } catch (StorageException e) {
+ throw new RuntimeException(
+ String.format("Unable to access or create bucket %s. ", config.getBucket())
+ + "Ensure you entered the correct bucket path and have permissions for it.", e);
+ }
+ if (bucket == null) {
GCPUtils.createBucket(storage, config.getBucket(), config.getLocation(), cmekKey);
}
}
@Override
protected Map getFileSystemProperties(BatchSinkContext context) {
- return GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
+ Map properties = GCPUtils.getFileSystemProperties(config, config.getPath(), new HashMap<>());
+ properties.putAll(config.getFileSystemProperties());
+ String outputFileBaseName = config.getOutputFileNameBase();
+ if (outputFileBaseName == null || outputFileBaseName.isEmpty()) {
+ return properties;
+ }
+
+ properties.put(AVRO_NAMED_OUTPUT, outputFileBaseName);
+ properties.put(COMMON_NAMED_OUTPUT, outputFileBaseName);
+ return properties;
+ }
+
+ @Override
+ protected String getOutputDir(long logicalStartTime) {
+ this.outputPath = super.getOutputDir(logicalStartTime);
+ return this.outputPath;
}
@Override
@@ -84,6 +139,88 @@ protected void recordLineage(LineageRecorder lineageRecorder, List outpu
lineageRecorder.recordWrite("Write", "Wrote to Google Cloud Storage.", outputFields);
}
+ @Override
+ public void onRunFinish(boolean succeeded, BatchSinkContext context) {
+ super.onRunFinish(succeeded, context);
+ emitMetrics(succeeded, context);
+ }
+
+ private void emitMetrics(boolean succeeded, BatchSinkContext context) {
+ if (!succeeded) {
+ return;
+ }
+
+ try {
+ StorageClient storageClient = StorageClient.create(config.getProject(), config.getServiceAccountFilePath());
+ storageClient.mapMetaDataForAllBlobs(getPrefixPath(), new MetricsEmitter(context.getMetrics())::emitMetrics);
+ } catch (Exception e) {
+ LOG.warn("Metrics for the number of affected rows in GCS Sink maybe incorrect.", e);
+ }
+ }
+
+ private String getPrefixPath() {
+ String filenameBase = getFilenameBase();
+ if (filenameBase == null) {
+ return outputPath;
+ }
+ //Following code is for handling a special case for saving files in same output directory.
+ //The specified file prefix from Filesystem Properties/Output file base name can be used to make filename unique.
+ //The code is based on assumptions from the internal implementation of
+ //org.apache.avro.mapreduce.AvroOutputFormatBase and org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+ String outputPathPrefix = outputPath.endsWith("/") ? outputPath.substring(0, outputPath.length() - 1) : outputPath;
+ return String.format("%s/%s-", outputPathPrefix, filenameBase);
+ }
+
+ @Nullable
+ private String getFilenameBase() {
+ String outputFileBaseName = config.getOutputFileNameBase();
+ if (outputFileBaseName != null && !outputFileBaseName.isEmpty()) {
+ return outputFileBaseName;
+ }
+
+ Map fileSystemProperties = config.getFileSystemProperties();
+ if (fileSystemProperties.containsKey(AVRO_NAMED_OUTPUT) && config.getFormat() == FileFormat.AVRO) {
+ return fileSystemProperties.get(AVRO_NAMED_OUTPUT);
+ }
+ if (fileSystemProperties.containsKey(COMMON_NAMED_OUTPUT)) {
+ return fileSystemProperties.get(COMMON_NAMED_OUTPUT);
+ }
+ return null;
+ }
+
+ private static class MetricsEmitter {
+ private StageMetrics stageMetrics;
+
+ private MetricsEmitter(StageMetrics stageMetrics) {
+ this.stageMetrics = stageMetrics;
+ }
+
+ public void emitMetrics(Map metaData) {
+ long totalRows = extractRecordCount(metaData);
+ if (totalRows == 0) {
+ return;
+ }
+
+ //work around since StageMetrics count() only takes int as of now
+ int cap = 10000; // so the loop will not cause significant delays
+ long count = totalRows / Integer.MAX_VALUE;
+ if (count > cap) {
+ LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly");
+ }
+ count = count < cap ? count : cap;
+ for (int i = 0; i <= count && totalRows > 0; i++) {
+ int rowCount = totalRows < Integer.MAX_VALUE ? (int) totalRows : Integer.MAX_VALUE;
+ stageMetrics.count(RECORDS_UPDATED_METRIC, rowCount);
+ totalRows -= rowCount;
+ }
+ }
+
+ private long extractRecordCount(Map metadata) {
+ String value = metadata.get(RECORD_COUNT);
+ return value == null ? 0L : Long.parseLong(value);
+ }
+ }
+
/**
* Sink configuration.
*/
@@ -94,6 +231,8 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
private static final String NAME_FORMAT = "format";
private static final String NAME_SCHEMA = "schema";
private static final String NAME_LOCATION = "location";
+ private static final String NAME_FS_PROPERTIES = "fileSystemProperties";
+ private static final String NAME_FILE_NAME_BASE = "outputFileNameBase";
private static final String SCHEME = "gs://";
@Name(NAME_PATH)
@@ -132,6 +271,18 @@ public static class GCSBatchSinkConfig extends GCPReferenceSinkConfig implements
"This value is ignored if the bucket already exists")
protected String location;
+ @Name(NAME_FS_PROPERTIES)
+ @Macro
+ @Nullable
+ @Description("Advanced feature to specify any additional properties that should be used with the sink.")
+ private String fileSystemProperties;
+
+ @Name(NAME_FILE_NAME_BASE)
+ @Macro
+ @Nullable
+ @Description("Advanced feature to specify file output name prefix.")
+ private String outputFileNameBase;
+
@Override
public void validate() {
// no-op
@@ -171,6 +322,13 @@ public void validate(FailureCollector collector) {
} catch (IllegalArgumentException e) {
collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_SCHEMA).withStacktrace(e.getStackTrace());
}
+
+ try {
+ getFileSystemProperties();
+ } catch (IllegalArgumentException e) {
+ collector.addFailure("File system properties must be a valid json.", null)
+ .withConfigProperty(NAME_FS_PROPERTIES).withStacktrace(e.getStackTrace());
+ }
}
public String getBucket() {
@@ -215,5 +373,22 @@ public String getDelimiter() {
public String getLocation() {
return location;
}
+
+ public Map getFileSystemProperties() {
+ if (fileSystemProperties == null || fileSystemProperties.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ try {
+ return new Gson().fromJson(fileSystemProperties, new TypeToken