diff --git a/docs/GCSFile-batchsource.md b/docs/GCSFile-batchsource.md index 84faae03c8..3417de9c4e 100644 --- a/docs/GCSFile-batchsource.md +++ b/docs/GCSFile-batchsource.md @@ -41,10 +41,14 @@ If the format is 'text', the schema must contain a field named 'body' of type 's **Skip Header** Whether to skip the first line of each file. Supported formats are 'text', 'csv', 'tsv', 'delimited'. -**Service Account File Path**: Path on the local file system of the service account key used for +**Service Account** - service account key used for authorization + +* **File Path**: Path on the local file system of the service account key used for authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. When running on other clusters, the file must be present on every node in the cluster. +* **JSON**: Contents of the service account JSON file. + **Maximum Split Size:** Maximum size in bytes for each input partition. Smaller partitions will increase the level of parallelism, but will require more resources and overhead. The default value is 128MB. diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPConfig.java b/src/main/java/io/cdap/plugin/gcp/common/GCPConfig.java index 4bef405a1f..83d9a2c11b 100644 --- a/src/main/java/io/cdap/plugin/gcp/common/GCPConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPConfig.java @@ -16,8 +16,12 @@ */ public class GCPConfig extends PluginConfig { public static final String NAME_PROJECT = "project"; + public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType"; public static final String NAME_SERVICE_ACCOUNT_FILE_PATH = "serviceFilePath"; + public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceJson"; public static final String AUTO_DETECT = "auto-detect"; + public static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath"; + public static final String SERVICE_ACCOUNT_TYPE_JSON = "json"; @Name(NAME_PROJECT) @Description("Google Cloud Project ID, which uniquely identifies a project. " @@ -26,6 +30,12 @@ public class GCPConfig extends PluginConfig { @Nullable protected String project; + @Name(NAME_SERVICE_ACCOUNT_TYPE) + @Description("Service account which can be either file path or JSON content.") + @Macro + @Nullable + protected String serviceAccountType; + @Name(NAME_SERVICE_ACCOUNT_FILE_PATH) @Description("Path on the local file system of the service account key used " + "for authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. " @@ -34,6 +44,12 @@ public class GCPConfig extends PluginConfig { @Nullable protected String serviceFilePath; + @Name(NAME_SERVICE_ACCOUNT_JSON) + @Description("Content of the service account file.") + @Macro + @Nullable + protected String serviceJson; + public String getProject() { String projectId = tryGetProject(); if (projectId == null) { @@ -64,6 +80,40 @@ public String getServiceAccountFilePath() { return serviceFilePath; } + @Nullable + public String getServiceAccountJson() { + if (containsMacro(NAME_SERVICE_ACCOUNT_JSON) || Strings.isNullOrEmpty(serviceJson)) { + return null; + } + return serviceJson; + } + + public String getServiceAccountType() { + if (containsMacro(NAME_SERVICE_ACCOUNT_TYPE) || Strings.isNullOrEmpty(serviceAccountType)) { + return null; + } + return serviceAccountType; + } + + public Boolean isServiceAccountJson() { + return getServiceAccountType() == null ? + null : getServiceAccountType().equals(SERVICE_ACCOUNT_TYPE_JSON); + } + + public Boolean isServiceAccountFilePath() { + return getServiceAccountType() == null ? + null : getServiceAccountType().equals(SERVICE_ACCOUNT_TYPE_FILE_PATH); + } + + @Nullable + public String getServiceAccount() { + Boolean serviceAccountJson = isServiceAccountJson(); + if (serviceAccountJson == null) { + return null; + } + return serviceAccountJson ? getServiceAccountJson() : getServiceAccountFilePath(); + } + /** * Return true if the service account is set to auto-detect but it can't be fetched from the environment. * This shouldn't result in a deployment failure, as the credential could be detected at runtime if the pipeline @@ -72,7 +122,7 @@ public String getServiceAccountFilePath() { * @return true if the service account is set to auto-detect but it can't be fetched from the environment. */ public boolean autoServiceAccountUnavailable() { - if (getServiceAccountFilePath() == null) { + if (getServiceAccountFilePath() == null && SERVICE_ACCOUNT_TYPE_FILE_PATH.equals(getServiceAccountType())) { try { ServiceAccountCredentials.getApplicationDefault(); } catch (IOException e) { diff --git a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java index c6060d2077..0cada1a509 100644 --- a/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java @@ -28,9 +28,13 @@ import com.google.cloud.storage.StorageOptions; import io.cdap.plugin.gcp.gcs.GCSPath; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.util.Base64; +import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -40,7 +44,18 @@ public class GCPUtils { public static final String CMEK_KEY = "gcp.cmek.key.name"; public static final String FS_GS_PROJECT_ID = "fs.gs.project.id"; - public static final String CLOUD_JSON_KEYFILE = "google.cloud.auth.service.account.json.keyfile"; + public static final String CLOUD_JSON_KEYFILE_SUFFIX = "auth.service.account.json.keyfile"; + public static final String CLOUD_JSON_KEYFILE_PREFIX = "google.cloud"; + public static final String CLOUD_JSON_KEYFILE = String.format("%s.%s", CLOUD_JSON_KEYFILE_PREFIX, + CLOUD_JSON_KEYFILE_SUFFIX); + public static final String CLOUD_ACCOUNT_EMAIL_SUFFIX = "auth.service.account.email"; + public static final String CLOUD_ACCOUNT_PRIVATE_KEY_ID_SUFFIX = "auth.service.account.private.key.id"; + public static final String CLOUD_ACCOUNT_KEY_SUFFIX = "auth.service.account.private.key"; + public static final String CLOUD_ACCOUNT_JSON_SUFFIX = "auth.service.account.json"; + public static final String PRIVATE_KEY_WRAP = "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n"; + public static final String SERVICE_ACCOUNT_TYPE = "cdap.gcs.auth.service.account.type"; + public static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath"; + public static final String SERVICE_ACCOUNT_TYPE_JSON = "json"; public static ServiceAccountCredentials loadServiceAccountCredentials(String path) throws IOException { File credentialsPath = new File(path); @@ -49,11 +64,55 @@ public static ServiceAccountCredentials loadServiceAccountCredentials(String pat } } + public static ServiceAccountCredentials loadServiceAccountCredentials(String content, + boolean isServiceAccountFilePath) + throws IOException { + if (isServiceAccountFilePath) { + return loadServiceAccountCredentials(content); + } + InputStream jsonInputStream = new ByteArrayInputStream(content.getBytes()); + return ServiceAccountCredentials.fromStream(jsonInputStream); + } + + public static String extractPrivateKey(ServiceAccountCredentials credentials) { + return String.format(PRIVATE_KEY_WRAP, + Base64.getEncoder().encodeToString(credentials.getPrivateKey().getEncoded())); + } + + public static Map generateAuthProperties(String serviceAccount, + boolean isServiceAccountFilePath, + String... keyPrefix) throws IOException { + Map properties = new HashMap<>(); + String privateKeyData = null; + properties.put(SERVICE_ACCOUNT_TYPE, isServiceAccountFilePath ? SERVICE_ACCOUNT_TYPE_FILE_PATH + : SERVICE_ACCOUNT_TYPE_JSON); + + for (String prefix : keyPrefix) { + if (isServiceAccountFilePath) { + properties.put(String.format("%s.%s", prefix, CLOUD_JSON_KEYFILE_SUFFIX), serviceAccount); + continue; + } + ServiceAccountCredentials credentials = loadServiceAccountCredentials(serviceAccount, false); + + properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_EMAIL_SUFFIX), credentials.getClientEmail()); + properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_PRIVATE_KEY_ID_SUFFIX), + credentials.getPrivateKeyId()); + if (privateKeyData == null) { + privateKeyData = extractPrivateKey(credentials); + } + properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_KEY_SUFFIX), privateKeyData); + properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_JSON_SUFFIX), serviceAccount); + } + return properties; + } + public static Map getFileSystemProperties(GCPConfig config, String path, Map properties) { - String serviceAccountFilePath = config.getServiceAccountFilePath(); - if (serviceAccountFilePath != null) { - properties.put(CLOUD_JSON_KEYFILE, serviceAccountFilePath); + try { + properties.putAll(generateAuthProperties(config.getServiceAccount(), config.isServiceAccountFilePath(), + CLOUD_JSON_KEYFILE_PREFIX)); + } catch (Exception ignored) { + } properties.put("fs.gs.impl", GoogleHadoopFileSystem.class.getName()); properties.put("fs.AbstractFileSystem.gs.impl", GoogleHadoopFS.class.getName()); diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java index 0f8f0bf500..9e1408c224 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java @@ -16,7 +16,6 @@ package io.cdap.plugin.gcp.gcs.actions; -import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.storage.Blob; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; @@ -36,9 +35,7 @@ import io.cdap.cdap.etl.api.action.ActionContext; import io.cdap.plugin.gcp.common.GCPUtils; import io.cdap.plugin.gcp.gcs.GCSPath; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -110,26 +107,11 @@ public void run(ActionContext context) throws Exception { private static Storage getStorage(GCSArgumentSetterConfig config) throws IOException { return StorageOptions.newBuilder() - .setProjectId(config.getProject()) - .setCredentials(getCredentials(config)) - .build() - .getService(); - } - - public static ServiceAccountCredentials getCredentials(GCSArgumentSetterConfig config) - throws IOException { - ServiceAccountCredentials credentials = null; - switch (config.getServiceAccountType()) { - case FILE_PATH: - credentials = GCPUtils.loadServiceAccountCredentials(config.getServiceAccountFilePath()); - break; - case JSON: - InputStream jsonInputStream = - new ByteArrayInputStream(config.getServiceAccountJSON().getBytes()); - credentials = ServiceAccountCredentials.fromStream(jsonInputStream); - break; - } - return credentials; + .setProjectId(config.getProject()) + .setCredentials(GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(), + config.isServiceAccountJson())) + .build() + .getService(); } public static String getContent(GCSArgumentSetterConfig config) throws IOException { diff --git a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java index 573b9c75bf..4591420e4f 100644 --- a/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetterConfig.java @@ -23,35 +23,17 @@ import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.plugin.gcp.common.GCPConfig; import io.cdap.plugin.gcp.gcs.GCSPath; -import javax.annotation.Nullable; /** Holds configuration required for configuring {@link GCSArgumentSetter}. */ public final class GCSArgumentSetterConfig extends GCPConfig { public static final String NAME_PATH = "path"; - public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType"; - public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceAccountJSON"; @Name(NAME_PATH) @Macro @Description("GCS Path to the file containing the arguments") private String path; - @Name(NAME_SERVICE_ACCOUNT_TYPE) - @Macro - @Nullable - @Description( - "Provide service account as JSON. When it is set to 'Yes', " - + "the content of service account key needs to be copied, whereas when it is set to 'No' " - + "the service Account file path needs to be specified. The default value is 'No'") - private String serviceAccountType; - - @Name(NAME_SERVICE_ACCOUNT_JSON) - @Macro - @Nullable - @Description("The content of the service account.") - private String serviceAccountJSON; - public void validate(FailureCollector collector) { validateProperties(collector); @@ -74,12 +56,12 @@ public void validateProperties(FailureCollector collector) { } } - if (getServiceAccountType() == ServiceAccountType.JSON - && !containsMacro(NAME_SERVICE_ACCOUNT_JSON) - && Strings.isNullOrEmpty(getServiceAccountJSON())) { + if (isServiceAccountJson() + && !containsMacro(NAME_SERVICE_ACCOUNT_JSON) + && Strings.isNullOrEmpty(getServiceAccountJson())) { collector - .addFailure("Required property 'Service Account JSON' has no value.", "") - .withConfigProperty(NAME_SERVICE_ACCOUNT_JSON); + .addFailure("Required property 'Service Account JSON' has no value.", "") + .withConfigProperty(NAME_SERVICE_ACCOUNT_JSON); } } @@ -93,33 +75,15 @@ private boolean canConnect() { return false; } - ServiceAccountType serviceAccountType = getServiceAccountType(); - - if (serviceAccountType == ServiceAccountType.FILE_PATH) { + if (!isServiceAccountJson()) { return !containsMacro(NAME_SERVICE_ACCOUNT_FILE_PATH) && !Strings.isNullOrEmpty(getServiceAccountFilePath()); } return !containsMacro(NAME_SERVICE_ACCOUNT_JSON) - && !Strings.isNullOrEmpty(getServiceAccountJSON()); + && !Strings.isNullOrEmpty(getServiceAccountJson()); } public GCSPath getPath() { return GCSPath.from(path); } - - public ServiceAccountType getServiceAccountType() { - return "JSON".equalsIgnoreCase(serviceAccountType) - ? ServiceAccountType.JSON - : ServiceAccountType.FILE_PATH; - } - - public String getServiceAccountJSON() { - return serviceAccountJSON; - } - - /** The type of service account. */ - public enum ServiceAccountType { - FILE_PATH, - JSON; - } } diff --git a/widgets/GCSArgumentSetter-action.json b/widgets/GCSArgumentSetter-action.json index 938f171619..715730b522 100644 --- a/widgets/GCSArgumentSetter-action.json +++ b/widgets/GCSArgumentSetter-action.json @@ -32,24 +32,27 @@ } }, { - "label": "Provide Service Account as JSON Content", "name": "serviceAccountType", - "widget-type": "toggle", + "label": "Service Account Type", + "widget-type": "radio-group", "widget-attributes": { - "on": { - "value": "JSON", - "label": "Yes" - }, - "off": { - "value": "filePath", - "label": "No" - }, - "default": "filePath" + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "json", + "label": "JSON" + } + ] } }, { "widget-type": "textbox", - "label": "Service Account File Path", + "label": "File Path", "name": "serviceFilePath", "widget-attributes": { "default": "auto-detect" @@ -57,8 +60,8 @@ }, { "widget-type": "textbox", - "label": "Service Account JSON", - "name": "serviceAccountJSON" + "label": "JSON", + "name": "serviceJson" } ] } @@ -79,12 +82,12 @@ { "name": "ServiceAuthenticationTypeJSON", "condition": { - "expression": "serviceAccountType == 'JSON'" + "expression": "serviceAccountType == 'json'" }, "show": [ { "type": "property", - "name": "serviceAccountJSON" + "name": "serviceJson" } ] } diff --git a/widgets/GCSFile-batchsource.json b/widgets/GCSFile-batchsource.json index 28275ff999..4285b58757 100644 --- a/widgets/GCSFile-batchsource.json +++ b/widgets/GCSFile-batchsource.json @@ -86,13 +86,37 @@ { "label" : "Credentials", "properties" : [ + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "json", + "label": "JSON" + } + ] + } + }, { "widget-type": "textbox", - "label": "Service Account File Path", + "label": "File Path", "name": "serviceFilePath", "widget-attributes": { "default": "auto-detect" } + }, + { + "widget-type": "textbox", + "label": "JSON", + "name": "serviceJson" } ] }, @@ -200,6 +224,32 @@ ] } ], + "filters": [ + { + "name": "ServiceAuthenticationTypeFilePath", + "condition": { + "expression": "serviceAccountType == 'filePath'" + }, + "show": [ + { + "type": "property", + "name": "serviceFilePath" + } + ] + }, + { + "name": "ServiceAuthenticationTypeJSON", + "condition": { + "expression": "serviceAccountType == 'json'" + }, + "show": [ + { + "type": "property", + "name": "serviceJson" + } + ] + } + ], "outputs": [ { "name": "schema",