Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/GCSFile-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,14 @@ If the format is 'text', the schema must contain a field named 'body' of type 's

**Skip Header** Whether to skip the first line of each file. Supported formats are 'text', 'csv', 'tsv', 'delimited'.

**Service Account File Path**: Path on the local file system of the service account key used for
**Service Account** - service account key used for authorization

* **File Path**: Path on the local file system of the service account key used for
authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
When running on other clusters, the file must be present on every node in the cluster.

* **JSON**: Contents of the service account JSON file.

**Maximum Split Size:** Maximum size in bytes for each input partition.
Smaller partitions will increase the level of parallelism, but will require more resources and overhead.
The default value is 128MB.
Expand Down
52 changes: 51 additions & 1 deletion src/main/java/io/cdap/plugin/gcp/common/GCPConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
*/
public class GCPConfig extends PluginConfig {
public static final String NAME_PROJECT = "project";
public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType";
public static final String NAME_SERVICE_ACCOUNT_FILE_PATH = "serviceFilePath";
public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceJson";
public static final String AUTO_DETECT = "auto-detect";
public static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath";
public static final String SERVICE_ACCOUNT_TYPE_JSON = "json";

@Name(NAME_PROJECT)
@Description("Google Cloud Project ID, which uniquely identifies a project. "
Expand All @@ -26,6 +30,12 @@ public class GCPConfig extends PluginConfig {
@Nullable
protected String project;

@Name(NAME_SERVICE_ACCOUNT_TYPE)
@Description("Service account which can be either file path or JSON content.")
@Macro
@Nullable
protected String serviceAccountType;

@Name(NAME_SERVICE_ACCOUNT_FILE_PATH)
@Description("Path on the local file system of the service account key used "
+ "for authorization. Can be set to 'auto-detect' when running on a Dataproc cluster. "
Expand All @@ -34,6 +44,12 @@ public class GCPConfig extends PluginConfig {
@Nullable
protected String serviceFilePath;

@Name(NAME_SERVICE_ACCOUNT_JSON)
@Description("Content of the service account file.")
@Macro
@Nullable
protected String serviceJson;

public String getProject() {
String projectId = tryGetProject();
if (projectId == null) {
Expand Down Expand Up @@ -64,6 +80,40 @@ public String getServiceAccountFilePath() {
return serviceFilePath;
}

@Nullable
public String getServiceAccountJson() {
if (containsMacro(NAME_SERVICE_ACCOUNT_JSON) || Strings.isNullOrEmpty(serviceJson)) {
return null;
}
return serviceJson;
}

public String getServiceAccountType() {
if (containsMacro(NAME_SERVICE_ACCOUNT_TYPE) || Strings.isNullOrEmpty(serviceAccountType)) {
return null;
}
return serviceAccountType;
}

public Boolean isServiceAccountJson() {
return getServiceAccountType() == null ?
null : getServiceAccountType().equals(SERVICE_ACCOUNT_TYPE_JSON);
}

public Boolean isServiceAccountFilePath() {
return getServiceAccountType() == null ?
null : getServiceAccountType().equals(SERVICE_ACCOUNT_TYPE_FILE_PATH);
}

@Nullable
public String getServiceAccount() {
Boolean serviceAccountJson = isServiceAccountJson();
if (serviceAccountJson == null) {
return null;
}
return serviceAccountJson ? getServiceAccountJson() : getServiceAccountFilePath();
}

/**
* Return true if the service account is set to auto-detect but it can't be fetched from the environment.
* This shouldn't result in a deployment failure, as the credential could be detected at runtime if the pipeline
Expand All @@ -72,7 +122,7 @@ public String getServiceAccountFilePath() {
* @return true if the service account is set to auto-detect but it can't be fetched from the environment.
*/
public boolean autoServiceAccountUnavailable() {
if (getServiceAccountFilePath() == null) {
if (getServiceAccountFilePath() == null && SERVICE_ACCOUNT_TYPE_FILE_PATH.equals(getServiceAccountType())) {
try {
ServiceAccountCredentials.getApplicationDefault();
} catch (IOException e) {
Expand Down
67 changes: 63 additions & 4 deletions src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import com.google.cloud.storage.StorageOptions;
import io.cdap.plugin.gcp.gcs.GCSPath;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

Expand All @@ -40,7 +44,18 @@
public class GCPUtils {
public static final String CMEK_KEY = "gcp.cmek.key.name";
public static final String FS_GS_PROJECT_ID = "fs.gs.project.id";
public static final String CLOUD_JSON_KEYFILE = "google.cloud.auth.service.account.json.keyfile";
public static final String CLOUD_JSON_KEYFILE_SUFFIX = "auth.service.account.json.keyfile";
public static final String CLOUD_JSON_KEYFILE_PREFIX = "google.cloud";
public static final String CLOUD_JSON_KEYFILE = String.format("%s.%s", CLOUD_JSON_KEYFILE_PREFIX,
CLOUD_JSON_KEYFILE_SUFFIX);
public static final String CLOUD_ACCOUNT_EMAIL_SUFFIX = "auth.service.account.email";
public static final String CLOUD_ACCOUNT_PRIVATE_KEY_ID_SUFFIX = "auth.service.account.private.key.id";
public static final String CLOUD_ACCOUNT_KEY_SUFFIX = "auth.service.account.private.key";
public static final String CLOUD_ACCOUNT_JSON_SUFFIX = "auth.service.account.json";
public static final String PRIVATE_KEY_WRAP = "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n";
public static final String SERVICE_ACCOUNT_TYPE = "cdap.gcs.auth.service.account.type";
public static final String SERVICE_ACCOUNT_TYPE_FILE_PATH = "filePath";
public static final String SERVICE_ACCOUNT_TYPE_JSON = "json";

public static ServiceAccountCredentials loadServiceAccountCredentials(String path) throws IOException {
File credentialsPath = new File(path);
Expand All @@ -49,11 +64,55 @@ public static ServiceAccountCredentials loadServiceAccountCredentials(String pat
}
}

public static ServiceAccountCredentials loadServiceAccountCredentials(String content,
boolean isServiceAccountFilePath)
throws IOException {
if (isServiceAccountFilePath) {
return loadServiceAccountCredentials(content);
}
InputStream jsonInputStream = new ByteArrayInputStream(content.getBytes());
return ServiceAccountCredentials.fromStream(jsonInputStream);
}

public static String extractPrivateKey(ServiceAccountCredentials credentials) {
return String.format(PRIVATE_KEY_WRAP,
Base64.getEncoder().encodeToString(credentials.getPrivateKey().getEncoded()));
}

public static Map<String, String> generateAuthProperties(String serviceAccount,
boolean isServiceAccountFilePath,
String... keyPrefix) throws IOException {
Map<String, String> properties = new HashMap<>();
String privateKeyData = null;
properties.put(SERVICE_ACCOUNT_TYPE, isServiceAccountFilePath ? SERVICE_ACCOUNT_TYPE_FILE_PATH
: SERVICE_ACCOUNT_TYPE_JSON);

for (String prefix : keyPrefix) {
if (isServiceAccountFilePath) {
properties.put(String.format("%s.%s", prefix, CLOUD_JSON_KEYFILE_SUFFIX), serviceAccount);
continue;
}
ServiceAccountCredentials credentials = loadServiceAccountCredentials(serviceAccount, false);

properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_EMAIL_SUFFIX), credentials.getClientEmail());
properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_PRIVATE_KEY_ID_SUFFIX),
credentials.getPrivateKeyId());
if (privateKeyData == null) {
privateKeyData = extractPrivateKey(credentials);
}
properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_KEY_SUFFIX), privateKeyData);
properties.put(String.format("%s.%s", prefix, CLOUD_ACCOUNT_JSON_SUFFIX), serviceAccount);
}
return properties;
}

public static Map<String, String> getFileSystemProperties(GCPConfig config, String path,
Map<String, String> properties) {
String serviceAccountFilePath = config.getServiceAccountFilePath();
if (serviceAccountFilePath != null) {
properties.put(CLOUD_JSON_KEYFILE, serviceAccountFilePath);
try {
properties.putAll(generateAuthProperties(config.getServiceAccount(), config.isServiceAccountFilePath(),
CLOUD_JSON_KEYFILE_PREFIX));
} catch (Exception ignored) {

}
properties.put("fs.gs.impl", GoogleHadoopFileSystem.class.getName());
properties.put("fs.AbstractFileSystem.gs.impl", GoogleHadoopFS.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.plugin.gcp.gcs.actions;

import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
Expand All @@ -36,9 +35,7 @@
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSPath;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -110,26 +107,11 @@ public void run(ActionContext context) throws Exception {

private static Storage getStorage(GCSArgumentSetterConfig config) throws IOException {
return StorageOptions.newBuilder()
.setProjectId(config.getProject())
.setCredentials(getCredentials(config))
.build()
.getService();
}

public static ServiceAccountCredentials getCredentials(GCSArgumentSetterConfig config)
throws IOException {
ServiceAccountCredentials credentials = null;
switch (config.getServiceAccountType()) {
case FILE_PATH:
credentials = GCPUtils.loadServiceAccountCredentials(config.getServiceAccountFilePath());
break;
case JSON:
InputStream jsonInputStream =
new ByteArrayInputStream(config.getServiceAccountJSON().getBytes());
credentials = ServiceAccountCredentials.fromStream(jsonInputStream);
break;
}
return credentials;
.setProjectId(config.getProject())
.setCredentials(GCPUtils.loadServiceAccountCredentials(config.getServiceAccount(),
config.isServiceAccountJson()))
.build()
.getService();
}

public static String getContent(GCSArgumentSetterConfig config) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,17 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.plugin.gcp.common.GCPConfig;
import io.cdap.plugin.gcp.gcs.GCSPath;
import javax.annotation.Nullable;

/** Holds configuration required for configuring {@link GCSArgumentSetter}. */
public final class GCSArgumentSetterConfig extends GCPConfig {

public static final String NAME_PATH = "path";
public static final String NAME_SERVICE_ACCOUNT_TYPE = "serviceAccountType";
public static final String NAME_SERVICE_ACCOUNT_JSON = "serviceAccountJSON";

@Name(NAME_PATH)
@Macro
@Description("GCS Path to the file containing the arguments")
private String path;

@Name(NAME_SERVICE_ACCOUNT_TYPE)
@Macro
@Nullable
@Description(
"Provide service account as JSON. When it is set to 'Yes', "
+ "the content of service account key needs to be copied, whereas when it is set to 'No' "
+ "the service Account file path needs to be specified. The default value is 'No'")
private String serviceAccountType;

@Name(NAME_SERVICE_ACCOUNT_JSON)
@Macro
@Nullable
@Description("The content of the service account.")
private String serviceAccountJSON;

public void validate(FailureCollector collector) {
validateProperties(collector);

Expand All @@ -74,12 +56,12 @@ public void validateProperties(FailureCollector collector) {
}
}

if (getServiceAccountType() == ServiceAccountType.JSON
&& !containsMacro(NAME_SERVICE_ACCOUNT_JSON)
&& Strings.isNullOrEmpty(getServiceAccountJSON())) {
if (isServiceAccountJson()
&& !containsMacro(NAME_SERVICE_ACCOUNT_JSON)
&& Strings.isNullOrEmpty(getServiceAccountJson())) {
collector
.addFailure("Required property 'Service Account JSON' has no value.", "")
.withConfigProperty(NAME_SERVICE_ACCOUNT_JSON);
.addFailure("Required property 'Service Account JSON' has no value.", "")
.withConfigProperty(NAME_SERVICE_ACCOUNT_JSON);
}
}

Expand All @@ -93,33 +75,15 @@ private boolean canConnect() {
return false;
}

ServiceAccountType serviceAccountType = getServiceAccountType();

if (serviceAccountType == ServiceAccountType.FILE_PATH) {
if (!isServiceAccountJson()) {
return !containsMacro(NAME_SERVICE_ACCOUNT_FILE_PATH)
&& !Strings.isNullOrEmpty(getServiceAccountFilePath());
}
return !containsMacro(NAME_SERVICE_ACCOUNT_JSON)
&& !Strings.isNullOrEmpty(getServiceAccountJSON());
&& !Strings.isNullOrEmpty(getServiceAccountJson());
}

public GCSPath getPath() {
return GCSPath.from(path);
}

public ServiceAccountType getServiceAccountType() {
return "JSON".equalsIgnoreCase(serviceAccountType)
? ServiceAccountType.JSON
: ServiceAccountType.FILE_PATH;
}

public String getServiceAccountJSON() {
return serviceAccountJSON;
}

/** The type of service account. */
public enum ServiceAccountType {
FILE_PATH,
JSON;
}
}
Loading