4 changes: 4 additions & 0 deletions docs/BigQueryMultiTable-batchsink.md
@@ -27,6 +27,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is t
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
must have permission in this project to create buckets.

**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given, it will default to the
configured Project ID.

**Dataset:** Dataset the tables belong to. A dataset is contained within a specific project.
Datasets are top-level containers that are used to organize and control access to tables and views.
If the dataset does not exist, it will be created.
29 changes: 26 additions & 3 deletions docs/BigQueryTable-batchsink.md
@@ -28,6 +28,10 @@ It can be found on the Dashboard in the Google Cloud Platform Console. This is t
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
must have permission in this project to create buckets.

**Dataset Project**: Project the dataset belongs to. This is only required if the dataset is not
in the same project that the BigQuery job will run in. If no value is given, it will default to the
configured Project ID.
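
For illustration, here is a minimal sketch of the defaulting behaviour described above. The
method and argument names are illustrative, not necessarily the plugin's actual config fields:

    public class DatasetProjectExample {

      // Fall back to the job's Project ID when no dataset project is configured.
      static String resolveDatasetProject(String datasetProject, String projectId) {
        return datasetProject == null || datasetProject.isEmpty() ? projectId : datasetProject;
      }

      public static void main(String[] args) {
        System.out.println(resolveDatasetProject(null, "my-job-project"));                   // my-job-project
        System.out.println(resolveDatasetProject("shared-data-project", "my-job-project"));  // shared-data-project
      }
    }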

**Dataset**: Dataset the table belongs to. A dataset is contained within a specific project.
Datasets are top-level containers that are used to organize and control access to tables and views.

@@ -66,11 +70,30 @@ the update operation will be performed only in the partitions meeting the criter
**Location:** The location where the BigQuery dataset will be created. This value is ignored
if the dataset or temporary bucket already exists.

**Create Partitioned Table**: Whether to create the BigQuery table with time partitioning. This value
**Create Partitioned Table [DEPRECATED]**: Whether to create the BigQuery table with time partitioning. This value
is ignored if the table already exists.
* When this is set to true, the table will be created with time partitioning.
* When this is set to false, the table will be created without time partitioning.

* When this is set to false, the value of Partitioning Type will be used.
* [DEPRECATED] Use Partitioning Type instead.

**Partitioning Type**: Specifies the partitioning type. Can be Time, Integer, or None. Defaults to Time.
This value is ignored if the table already exists.
* When this is set to Time, the table will be created with time partitioning.
* When this is set to Integer, the table will be created with integer range partitioning.
* When this is set to None, the table will be created without any partitioning.

**Range Start**: For integer range partitioning, specifies the start of the range. Only used when the table does not
already exist and the partitioning type is set to Integer.
* The start value is inclusive.

**Range End**: For integer range partitioning, specifies the end of the range. Only used when the table does not
already exist and the partitioning type is set to Integer.
* The end value is exclusive.

**Range Interval**: For integer range partitioning, specifies the width of each range partition. Only used when the
table does not already exist and the partitioning type is set to Integer.
* The interval value must be a positive integer.
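
To make the three range values concrete, here is a sketch of what they correspond to in the
google-cloud-bigquery client this PR builds against (1.100.0). It assumes the client's
RangePartitioning API; the dataset, table, and field names are illustrative:

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.Field;
    import com.google.cloud.bigquery.LegacySQLTypeName;
    import com.google.cloud.bigquery.RangePartitioning;
    import com.google.cloud.bigquery.Schema;
    import com.google.cloud.bigquery.StandardTableDefinition;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.TableInfo;

    public class RangePartitionExample {
      public static void main(String[] args) {
        // Partitions cover [0, 100) in steps of 10: [0, 10), [10, 20), ..., [90, 100).
        // Range Start is inclusive, Range End is exclusive, Range Interval is the partition width.
        RangePartitioning.Range range = RangePartitioning.Range.newBuilder()
          .setStart(0L)      // Range Start
          .setEnd(100L)      // Range End
          .setInterval(10L)  // Range Interval
          .build();
        RangePartitioning partitioning = RangePartitioning.newBuilder()
          .setField("customer_id")  // corresponds to Partition Field
          .setRange(range)
          .build();
        StandardTableDefinition definition = StandardTableDefinition.newBuilder()
          .setSchema(Schema.of(Field.of("customer_id", LegacySQLTypeName.INTEGER)))
          .setRangePartitioning(partitioning)
          .build();
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        bigquery.create(TableInfo.of(TableId.of("my_dataset", "my_table"), definition));
      }
    }

Rows whose partitioning column falls outside the configured range are placed in BigQuery's
__UNPARTITIONED__ partition rather than rejected.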

**Partition Field**: Partitioning column for the BigQuery table. This should be left empty if the
BigQuery table is an ingestion-time partitioned table.

6 changes: 6 additions & 0 deletions docs/GCS-batchsink.md
@@ -47,5 +47,11 @@ The delimiter will be ignored if the format is anything other than 'delimited'.
authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
When running on other clusters, the file must be present on every node in the cluster.

**Output File Prefix:** Prefix for the output file name.
If none is given, it will default to 'part', which means all data files written by the sink will look like
'part-r-00000', 'part-r-00001', etc.

**File System Properties:** Additional properties to use with the OutputFormat.

**Schema:** Schema of the data to write.
The 'avro' and 'parquet' formats require a schema but other formats do not.
58 changes: 58 additions & 0 deletions docs/GCSArgumentSetter-action.md
@@ -0,0 +1,58 @@
# GCS Argument Setter

Description
-----------

Fetches a JSON file from GCS and uses it to set arguments in the pipeline.

The plugin maps each entry in the JSON file to a pipeline argument: the entry's name becomes the
argument name and its value becomes the argument value. The expected JSON structure is shown below.

This is most commonly used when the structure of a pipeline is static,
and its configuration needs to be managed outside the pipeline itself.

The JSON file must contain the arguments in a list:

    {
      "arguments" : [
        { "name" : "argument name", "type" : "type", "value" : "argument value" },
        { "name" : "argument1 name", "type" : "type", "value" : "argument1 value" }
      ]
    }

where the type can be Schema, Int, Float, Double, Short, String, Char, Array, or Map.
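
As a rough illustration (not the plugin's actual implementation), a file with the structure above
can be parsed with Gson, assuming it is on the classpath; the real action would then hand each
name/value pair to the pipeline's runtime arguments:

    import com.google.gson.Gson;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ArgumentFileParser {

      // Mirrors the JSON structure shown above.
      static class Argument {
        String name;
        String type;
        String value;
      }

      static class ArgumentFile {
        List<Argument> arguments;
      }

      // Parses the JSON content into a name -> value map of pipeline arguments.
      static Map<String, String> parse(String jsonContent) {
        ArgumentFile file = new Gson().fromJson(jsonContent, ArgumentFile.class);
        Map<String, String> result = new LinkedHashMap<>();
        for (Argument argument : file.arguments) {
          result.put(argument.name, argument.value);
        }
        return result;
      }
    }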


Credentials
-----------
If the plugin is run on a Google Cloud Dataproc cluster, the service account key does not need to be
provided and can be set to 'auto-detect'.
Credentials will be automatically read from the cluster environment.

If the plugin is not run on a Dataproc cluster, the path to a service account key must be provided.
The service account key can be found on the Dashboard in the Cloud Platform Console.
Make sure the account key has permission to access BigQuery and Google Cloud Storage.
The service account key file needs to be available on every node in your cluster and
must be readable by all users running the job.

Properties
----------
**Reference Name:** Name used to uniquely identify this plugin for lineage, annotating metadata, etc.

**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
used to access the GCS bucket that contains the arguments file.

**Path**: GCS Path to the file containing the arguments.

**Provide Service Account as JSON Content**: Whether to provide the service account as JSON content. When set to 'Yes',
the content of the service account key must be provided, whereas when set to 'No', the service account
file path must be specified. The default value is 'No'.

**Service Account File Path**: Path on the local file system of the service account key used for
authorization. Can be set to 'auto-detect' when running on a Dataproc cluster.
When running on other clusters, the file must be present on every node in the cluster.

**Service Account JSON**: The content of the service account key.


@@ -0,0 +1,102 @@
{
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.3.0-SNAPSHOT",
"scope": "SYSTEM"
},
"description": "Data Pipeline Application",
"name": "example-csv-file-to-bigquery-with-range-partition",
"config": {
"resources": {
"memoryMB": 2048,
"virtualCores": 1
},
"driverResources": {
"memoryMB": 2048,
"virtualCores": 1
},
"connections": [
{
"from": "File",
"to": "BigQuery"
}
],
"comments": [],
"postActions": [],
"properties": {},
"processTimingEnabled": true,
"stageLoggingEnabled": false,
"stages": [
{
"name": "File",
"plugin": {
"name": "File",
"type": "batchsource",
"label": "File",
"artifact": {
"name": "core-plugins",
"version": "2.5.0-SNAPSHOT",
"scope": "SYSTEM"
},
"properties": {
"format": "delimited",
"skipHeader": "true",
"filenameOnly": "false",
"recursive": "false",
"ignoreNonExistingFolders": "false",
"schema": "${out_schema}",
"referenceName": "file",
"delimiter": ";",
"path": "${path_to_csv_example}"
}
},
"outputSchema": "${out_schema}"
},
{
"name": "BigQuery",
"plugin": {
"name": "BigQueryTable",
"type": "batchsink",
"label": "BigQuery",
"artifact": {
"name": "google-cloud",
"version": "0.16.0-SNAPSHOT",
"scope": "USER"
},
"properties": {
"serviceFilePath": "/Users/ardian/Workspace/CDAP/google-cloud-plugins/BigQueryArgumentSetter/Data/adaptivescale-178418-a5edd1eb37f9.json",
"operation": "insert",
"truncateTable": "false",
"allowSchemaRelaxation": "false",
"location": "US",
"createPartitionedTable": "false",
"partitioningType": "INTEGER",
"rangeStart": "${range_start}",
"partitionFilterRequired": "false",
"dataset": "${bqsink_dataset}",
"table": "${bqsink_table}",
"referenceName": "BigQuerySink",
"partitionByField": "${partitionfield}",
"rangeEnd": "${range_end}",
"rangeInterval": "${range_interval}",
"schema": "${out_schema}",
"project": "adaptivescale-178418",
"bucket": "adaptive_scale_test_bucket"
}
},
"outputSchema": "${out_schema}",
"inputSchema": [
{
"name": "File",
"schema": "${out_schema}"
}
]
}
],
"schedule": "0 * * * *",
"engine": "spark",
"numOfRecordsPreview": 100,
"description": "Data Pipeline Application",
"maxConcurrentRuns": 1
}
}
4 changes: 2 additions & 2 deletions pom.xml
@@ -72,12 +72,12 @@
<bigquery.connector.hadoop2.version>hadoop2-1.0.0</bigquery.connector.hadoop2.version>
<commons.codec.version>1.4</commons.codec.version>
<cdap.version>6.3.0-SNAPSHOT</cdap.version>
<cdap.plugin.version>2.4.0-SNAPSHOT</cdap.plugin.version>
<cdap.plugin.version>2.5.0-SNAPSHOT</cdap.plugin.version>
<dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
<flogger.system.backend.version>0.3.1</flogger.system.backend.version>
<gcs.connector.version>hadoop2-2.0.0</gcs.connector.version>
<google.cloud.bigtable.version>1.11.0</google.cloud.bigtable.version>
<google.cloud.bigquery.version>1.92.0</google.cloud.bigquery.version>
<google.cloud.bigquery.version>1.100.0</google.cloud.bigquery.version>
<google.cloud.pubsub.version>1.92.0</google.cloud.pubsub.version>
<google.cloud.spanner.version>1.37.0</google.cloud.spanner.version>
<google.cloud.speech.version>1.20.0</google.cloud.speech.version>
@@ -22,10 +22,6 @@
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
@@ -86,8 +82,7 @@ public abstract class AbstractBigQuerySink extends BatchSink<StructuredRecord, A
private final UUID uuid = UUID.randomUUID();
protected Configuration baseConfiguration;
private StructuredToAvroTransformer avroTransformer;
private final String jobId = UUID.randomUUID().toString();
private BigQuery bigQuery;
protected BigQuery bigQuery;

/**
* Executes main prepare run logic. Child classes cannot override this method,
@@ -105,11 +100,11 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
Credentials credentials = serviceAccountFilePath == null ?
null : GCPUtils.loadServiceAccountCredentials(serviceAccountFilePath);
String project = config.getProject();
bigQuery = GCPUtils.getBigQuery(project, credentials);
String datasetProjectId = config.getDatasetProject();
bigQuery = GCPUtils.getBigQuery(datasetProjectId, credentials);
String cmekKey = context.getArguments().get(GCPUtils.CMEK_KEY);
baseConfiguration = getBaseConfiguration(cmekKey);
String bucket = configureBucket();
baseConfiguration.set(BigQueryConstants.CONFIG_JOB_ID, jobId);
if (!context.isPreviewEnabled()) {
createResources(bigQuery, GCPUtils.getStorage(project, credentials), config.getDataset(), bucket,
config.getLocation(), cmekKey);
@@ -119,9 +114,7 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
}

@Override
public final void onRunFinish(boolean succeeded, BatchSinkContext context) {
recordMetric(succeeded, context);

public void onRunFinish(boolean succeeded, BatchSinkContext context) {
if (getConfig().getBucket() == null) {
Path gcsPath = new Path(String.format(gcsPathFormat, uuid.toString()));
try {
@@ -136,47 +129,6 @@ public final void onRunFinish(boolean succeeded, BatchSinkContext context) {
}
}

private void recordMetric(boolean succeeded, BatchSinkContext context) {
if (!succeeded) {
return;
}
Job queryJob = bigQuery.getJob(getJobId());
if (queryJob == null) {
LOG.warn("Unable to find BigQuery job. No metric will be emitted for the number of affected rows.");
return;
}
long totalRows = getTotalRows(queryJob);
LOG.info("Job {} affected {} rows", queryJob.getJobId(), totalRows);
//work around since StageMetrics count() only takes int as of now
int cap = 10000; // so the loop will not cause significant delays
long count = totalRows / Integer.MAX_VALUE;
if (count > cap) {
LOG.warn("Total record count is too high! Metric for the number of affected rows may not be updated correctly");
}
count = count < cap ? count : cap;
for (int i = 0; i <= count && totalRows > 0; i++) {
int rowCount = totalRows < Integer.MAX_VALUE ? (int) totalRows : Integer.MAX_VALUE;
context.getMetrics().count(RECORDS_UPDATED_METRIC, rowCount);
totalRows -= rowCount;
}
}

private JobId getJobId() {
String location = bigQuery.getDataset(getConfig().getDataset()).getLocation();
return JobId.newBuilder().setLocation(location).setJob(jobId).build();
}

private long getTotalRows(Job queryJob) {
JobConfiguration.Type type = queryJob.getConfiguration().getType();
if (type == JobConfiguration.Type.LOAD) {
return ((JobStatistics.LoadStatistics) queryJob.getStatistics()).getOutputRows();
} else if (type == JobConfiguration.Type.QUERY) {
return ((JobStatistics.QueryStatistics) queryJob.getStatistics()).getNumDmlAffectedRows();
}
LOG.warn("Unable to identify BigQuery job type. No metric will be emitted for the number of affected rows.");
return 0;
}

@Override
public void initialize(BatchRuntimeContext context) throws Exception {
avroTransformer = new StructuredToAvroTransformer(null);
@@ -483,7 +435,7 @@ private Configuration getOutputConfiguration(String bucket,

BigQueryOutputConfiguration.configure(
configuration,
String.format("%s.%s", getConfig().getDataset(), tableName),
String.format("%s:%s.%s", getConfig().getDatasetProject(), getConfig().getDataset(), tableName),
outputTableSchema,
temporaryGcsPath,
BigQueryFileFormat.AVRO,