From c038ca417d0f190b45f6b3228b0889e65547428f Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Mon, 7 Mar 2016 18:28:02 -0800 Subject: [PATCH 1/8] Export Parquet graph to CSV --- .../exporthdfs/ExportHdfsCsvPlugin.scala | 37 +------ .../src/main/resources/atk-plugin.conf | 1 + .../export/csv/ExportGraphCsvArgs.scala | 49 +++++++++ .../export/csv/ExportGraphCsvPlugin.scala | 95 ++++++++++++++++ .../export/csv/GraphExportMetadata.scala | 9 ++ .../org/apache/spark/frame/FrameRdd.scala | 104 ++++++++++++++---- 6 files changed, 237 insertions(+), 58 deletions(-) create mode 100644 engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala create mode 100644 engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala create mode 100644 engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala diff --git a/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala b/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala index 7f74d17148..fcd4554f5b 100644 --- a/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala +++ b/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala @@ -69,47 +69,12 @@ class ExportHdfsCsvPlugin extends SparkCommandPlugin[ExportHdfsCsvArgs, ExportMe require(!fileStorage.exists(new Path(arguments.folderName)), "File or Directory already exists") val frame: SparkFrame = arguments.frame // load frame as RDD - val sample = exportToHdfsCsv(frame.rdd, arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset) + val sample = frame.rdd.exportToHdfsCsv(arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset) val artifactPath = new Path(s"${fileStorage.hdfs.getHomeDirectory()}/${arguments.folderName}") ExportMetadata(artifactPath.toString, "all", "csv", frame.rowCount, sample, fileStorage.size(artifactPath.toString), Some(arguments.folderName)) } - /** - * Export to a file in CSV format - * - * @param frameRdd input rdd containing all columns - * @param filename file path where to store the file - */ - private def exportToHdfsCsv( - frameRdd: FrameRdd, - filename: String, - separator: Char, - count: Int, - offset: Int) = { - val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(frameRdd, offset, count, -1) else frameRdd - val headers = frameRdd.frameSchema.columnNames.mkString(separator.toString) - val csvFormat = CSVFormat.RFC4180.withDelimiter(separator) - - val csvRdd = filterRdd.map(row => { - val stringBuilder = new java.lang.StringBuilder - val printer = new CSVPrinter(stringBuilder, csvFormat) - val array = row.toSeq.map(col => - col match { - case null => "" - case arr: ArrayBuffer[_] => arr.mkString(",") - case seq: Seq[_] => seq.mkString(",") - case x => x.toString - }) - for (i <- array) printer.print(i) - stringBuilder.toString - }) - - val dataSample = if (csvRdd.isEmpty()) StringUtils.EMPTY else csvRdd.first() - val addHeaders = frameRdd.sparkContext.parallelize(List(headers)) ++ csvRdd - addHeaders.saveAsTextFile(filename) - dataSample - } } diff --git a/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf 
b/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf index cdad64f112..be93db94e9 100644 --- a/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf +++ b/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf @@ -24,6 +24,7 @@ atk.plugin { "org.trustedanalytics.atk.plugins.connectedcomponents.ConnectedComponentsPlugin" "org.trustedanalytics.atk.plugins.trianglecount.TriangleCountPlugin" "org.trustedanalytics.atk.plugins.RenameGraphPlugin" + "org.trustedanalytics.atk.plugins.export.csv.ExportGraphCsvPlugin" ] } \ No newline at end of file diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala new file mode 100644 index 0000000000..848af3ab1b --- /dev/null +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.trustedanalytics.atk.plugins.export.csv + +import org.apache.commons.lang3.StringUtils +import org.trustedanalytics.atk.domain.frame.FrameReference +import org.trustedanalytics.atk.domain.graph.GraphReference +import org.trustedanalytics.atk.engine.plugin.ArgDoc + +/** + * Input arguments class for export to CSV + */ +//TODO: update doc + +case class ExportGraphCsvArgs(graph: GraphReference, + @ArgDoc("""The HDFS folder path where the files +will be created.""") folderName: String, + @ArgDoc("""The separator for separating the values. +Default is comma (,).""") separator: String = ",", + @ArgDoc("""The number of records you want. +Default, or a non-positive value, is the whole frame.""") count: Int = -1, + @ArgDoc("""The number of rows to skip before exporting to the file. +Default is zero (0).""") offset: Int = 0) { + require(graph != null, "graph is required") + require(folderName != null, "folder name is required") + require(StringUtils.isNotBlank(separator) && separator.length == 1, "A single character separator is required") +} + + + + + + + + diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala new file mode 100644 index 0000000000..a262f22992 --- /dev/null +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala @@ -0,0 +1,95 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.trustedanalytics.atk.plugins.export.csv + +import org.apache.commons.csv.{CSVFormat, CSVPrinter} +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.fs.Path +import org.apache.spark.frame.FrameRdd +import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata +import org.trustedanalytics.atk.domain.frame.{FrameEntity, ExportHdfsCsvArgs} +import org.trustedanalytics.atk.domain.graph.SeamlessGraphMeta +import org.trustedanalytics.atk.engine.FileStorage +import org.trustedanalytics.atk.engine.frame.{MiscFrameFunctions, SparkFrame} +import org.trustedanalytics.atk.engine.graph.SparkGraph +import org.trustedanalytics.atk.engine.plugin.{Invocation, PluginDoc, SparkCommandPlugin} + +import scala.collection.mutable.ArrayBuffer + +// Implicits needed for JSON conversion + +/** + * Export a frame to csv file + */ +// TODO:update the doc +@PluginDoc(oneLine = "Write current frame to HDFS in csv format.", + extended = "Export the frame to a file in csv format as a Hadoop file.", + returns = "dict with .....") +class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphExportMetadata] { + + /** + * The name of the command + */ + override def name: String = "graph:/export_to_csv" + + + /** + * Calculate covariance for the specified columns + * + * @param invocation information about the user and the circumstances at the time of the call, as well as a function + * that can be called to produce a SparkContext that can be used during this invocation + * @param arguments input specification for covariance + * @return value of type declared as the Return type + */ + override def execute(arguments: ExportGraphCsvArgs)(implicit invocation: Invocation): GraphExportMetadata = { + + val fileStorage = new FileStorage + require(!fileStorage.exists(new Path(arguments.folderName)), "File or Directory already exists") + + //get the graph meta data + val graph = arguments.graph + + // TODO: get the list of frame (of the graph ) from the meta data + val graphMeta = engine.graphs.expectSeamless(graph) + val vertexFrames = graphMeta.vertexFrames.map(_.toReference) + + + //TODO: export each frame to CSV file in HDFS + val vertexFolderName = arguments.folderName+"/vertices" + val edgeFolderName = arguments.folderName+"/edges" + val vertexMetadata = exportFramesToCsv(vertexFolderName,graphMeta.vertexFrames,arguments,fileStorage) + val edgeMetadata = exportFramesToCsv(edgeFolderName,graphMeta.edgeFrames,arguments,fileStorage) + GraphExportMetadata(vertexMetadata ++ edgeMetadata) + } + // TODO: add scala doc + + def exportFramesToCsv(folderName: String,frameEntities: List[FrameEntity],arguments: ExportGraphCsvArgs,fileStorage: FileStorage)(implicit invocation: Invocation):List[ExportMetadata]= { + val frames = frameEntities.map(_.toReference) + val metadata = frames.map(frame => { + // load frame as RDD + val sparkFrame: SparkFrame = frame + val sample = sparkFrame.rdd.exportToHdfsCsv(arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset) + + val artifactPath = new 
Path(s"${fileStorage.hdfs.getHomeDirectory()}/${folderName}/${sparkFrame.label.getOrElse(sparkFrame.frameId)}") + ExportMetadata(artifactPath.toString, "all", "csv", sparkFrame.rowCount, sample, + fileStorage.size(artifactPath.toString), Some(folderName)) + + }) + metadata + } + +} diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala new file mode 100644 index 0000000000..ee2d2c6d9b --- /dev/null +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala @@ -0,0 +1,9 @@ +package org.trustedanalytics.atk.plugins.export.csv + +import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata +//TODO: add scala doc +/** + * + * @param metadata + */ +case class GraphExportMetadata(metadata: List[ExportMetadata]) diff --git a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala index 0727911c7d..7cb7794d92 100644 --- a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala +++ b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala @@ -17,7 +17,9 @@ package org.apache.spark.frame import breeze.linalg.DenseVector -import org.apache.spark.mllib.stat.{ MultivariateStatisticalSummary, Statistics } +import org.apache.commons.csv.{CSVPrinter, CSVFormat} +import org.apache.commons.lang3.StringUtils +import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics } import org.apache.spark.atk.graph.{ EdgeWrapper, VertexWrapper } import org.apache.spark.frame.ordering.FrameOrderingUtils import org.apache.spark.mllib.linalg.distributed.IndexedRow @@ -33,7 +35,7 @@ import org.trustedanalytics.atk.engine.frame.plugins.ScoreAndLabel import org.trustedanalytics.atk.engine.frame.{ MiscFrameFunctions, RowWrapper } import org.trustedanalytics.atk.engine.graph.plugins.{ VertexSchemaAggregator, EdgeSchemaAggregator, EdgeHolder } import org.trustedanalytics.atk.graphbuilder.elements.{ GBEdge, GBVertex } -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.reflect.ClassTag /** @@ -80,7 +82,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert a FrameRdd to a Spark Dataframe - * @return Dataframe representing the FrameRdd + * + * @return Dataframe representing the FrameRdd */ def toDataFrame: DataFrame = { new SQLContext(this.sparkContext).createDataFrame(this, sparkSchema) @@ -103,7 +106,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd into RDD[Vector] format required by MLLib - * @param featureColumnNames Names of the frame's column(s) to be used + * + * @param featureColumnNames Names of the frame's column(s) to be used * @return RDD of (org.apache.spark.mllib)Vector */ def toDenseVectorRDD(featureColumnNames: List[String]): RDD[Vector] = { @@ -116,7 +120,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Compute MLLib's MultivariateStatisticalSummary from FrameRdd - * @param columnNames Names of the frame's column(s) whose column statistics are to be computed + * + * @param columnNames Names of the frame's column(s) whose column statistics are to be computed * @return MLLib's MultivariateStatisticalSummary */ def columnStatistics(columnNames: List[String]): 
MultivariateStatisticalSummary = { @@ -126,7 +131,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[Vector] by mean centering the specified columns - * @param featureColumnNames Names of the frame's column(s) to be used + * + * @param featureColumnNames Names of the frame's column(s) to be used * @return RDD of (org.apache.spark.mllib)Vector */ def toMeanCenteredDenseVectorRDD(featureColumnNames: List[String]): RDD[Vector] = { @@ -139,7 +145,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[Vector] - * @param featureColumnNames Names of the frame's column(s) to be used + * + * @param featureColumnNames Names of the frame's column(s) to be used * @param columnWeights The weights of the columns * @return RDD of (org.apache.spark.mllib)Vector */ @@ -156,7 +163,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[(Long, Long, Double)] - * @param sourceColumnName Name of the frame's column storing the source id of the edge + * + * @param sourceColumnName Name of the frame's column storing the source id of the edge * @param destinationColumnName Name of the frame's column storing the destination id of the edge * @param edgeSimilarityColumnName Name of the frame's column storing the similarity between the source and destination * @return RDD[(Long, Long, Double)] @@ -219,7 +227,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Create a new FrameRdd that is only a subset of the columns of this FrameRdd - * @param columnNames names to include + * + * @param columnNames names to include * @return the FrameRdd with only some columns */ def selectColumns(columnNames: List[String]): FrameRdd = { @@ -231,7 +240,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Select a subset of columns while renaming them - * @param columnNamesWithRename map of old names to new names + * + * @param columnNamesWithRename map of old names to new names * @return the new FrameRdd */ def selectColumnsWithRename(columnNamesWithRename: Map[String, String]): FrameRdd = { @@ -336,7 +346,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Sort by one or more columns - * @param columnNamesAndAscending column names to sort by, true for ascending, false for descending + * + * @param columnNamesAndAscending column names to sort by, true for ascending, false for descending * @return the sorted Frame */ def sortByColumns(columnNamesAndAscending: List[(String, Boolean)]): FrameRdd = { @@ -390,7 +401,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Save this RDD to disk or other store - * @param absolutePath location to store + * + * @param absolutePath location to store * @param storageFormat "file/parquet", "file/sequence", etc. 
*/ def save(absolutePath: String, storageFormat: String = "file/parquet"): Unit = { @@ -453,6 +465,44 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) new FrameRdd(frameSchema.addColumn(column), rows) } + //TODO: update doc + /** + * Export to a file in CSV format + * @param filename file path where to store the file + * @param separator + * @param count + * @param offset + * @return + */ + def exportToHdfsCsv(filename: String, + separator: Char, + count: Int, + offset: Int) = { + + val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(this, offset, count, -1) else this + val headers = this.frameSchema.columnNames.mkString(separator.toString) + val csvFormat = CSVFormat.RFC4180.withDelimiter(separator) + + val csvRdd = filterRdd.map(row => { + val stringBuilder = new java.lang.StringBuilder + val printer = new CSVPrinter(stringBuilder, csvFormat) + val array = row.toSeq.map(col => + col match { + case null => "" + case arr: ArrayBuffer[_] => arr.mkString(",") + case seq: Seq[_] => seq.mkString(",") + case x => x.toString + }) + for (i <- array) printer.print(i) + stringBuilder.toString + }) + + val dataSample = if (csvRdd.isEmpty()) StringUtils.EMPTY else csvRdd.first() + val addHeaders = this.sparkContext.parallelize(List(headers)) ++ csvRdd + addHeaders.saveAsTextFile(filename) + dataSample + } + } /** @@ -466,7 +516,8 @@ object FrameRdd { /** * converts a data frame to frame rdd - * @param rdd a data frame + * + * @param rdd a data frame * @return a frame rdd */ def toFrameRdd(rdd: DataFrame): FrameRdd = { @@ -513,7 +564,8 @@ object FrameRdd { /** * Convert an RDD of mixed vertex types into a map where the keys are labels and values are FrameRdd's - * @param gbVertexRDD Graphbuilder Vertex RDD + * + * @param gbVertexRDD Graphbuilder Vertex RDD * @return keys are labels and values are FrameRdd's */ def toFrameRddMap(gbVertexRDD: RDD[GBVertex]): Map[String, FrameRdd] = { @@ -543,7 +595,8 @@ object FrameRdd { /** * Convert an RDD of mixed vertex types into a map where the keys are labels and values are FrameRdd's - * @param gbEdgeRDD Graphbuilder Edge RDD + * + * @param gbEdgeRDD Graphbuilder Edge RDD * @param gbVertexRDD Graphbuilder Vertex RDD * @return keys are labels and values are FrameRdd's */ @@ -564,7 +617,8 @@ object FrameRdd { /** * Converts row object from an RDD[Array[Any]] to an RDD[Product] so that it can be used to create a SchemaRDD - * @return RDD[org.apache.spark.sql.Row] with values equal to row object + * + * @return RDD[org.apache.spark.sql.Row] with values equal to row object */ def toRowRDD(schema: Schema, rows: RDD[Array[Any]]): RDD[org.apache.spark.sql.Row] = { val rowRDD: RDD[org.apache.spark.sql.Row] = rows.map(row => { @@ -586,7 +640,8 @@ object FrameRdd { /** * Converts row object to RDD[IndexedRow] needed to create an IndexedRowMatrix - * @param indexedRows Rows of the frame as RDD[Row] + * + * @param indexedRows Rows of the frame as RDD[Row] * @param frameSchema Schema of the frame * @param featureColumnNames List of the frame's column(s) to be used * @return RDD[IndexedRow] @@ -603,7 +658,8 @@ object FrameRdd { /** * Converts row object to RDD[IndexedRow] needed to create an IndexedRowMatrix - * @param indexedRows Rows of the frame as RDD[Row] + * + * @param indexedRows Rows of the frame as RDD[Row] * @param frameSchema Schema of the frame * @param featureColumnNames List of the frame's column(s) to be used * @param meanVector Vector storing the means of the columns @@ -627,7 +683,8 @@ object FrameRdd { /** * Converts the schema object 
to a StructType for use in creating a SchemaRDD - * @return StructType with StructFields corresponding to the columns of the schema object + * + * @return StructType with StructFields corresponding to the columns of the schema object */ def schemaToStructType(schema: Schema): StructType = { val fields: Seq[StructField] = schema.columns.map { @@ -648,7 +705,8 @@ object FrameRdd { /** * Converts the spark DataTypes to our schema Datatypes - * @return our schema DataType + * + * @return our schema DataType */ def sparkDataTypeToSchemaDataType(dataType: org.apache.spark.sql.types.DataType): org.trustedanalytics.atk.domain.schema.DataTypes.DataType = { val intType = IntegerType.getClass @@ -682,7 +740,8 @@ object FrameRdd { /** * Converts a spark dataType (as string)to our schema Datatype - * @param sparkDataType spark data type + * + * @param sparkDataType spark data type * @return a DataType */ def sparkDataTypeToSchemaDataType(sparkDataType: String): DataType = { @@ -702,7 +761,8 @@ object FrameRdd { /** * Converts the schema object to a StructType for use in creating a SchemaRDD - * @return StructType with StructFields corresponding to the columns of the schema object + * + * @return StructType with StructFields corresponding to the columns of the schema object */ def schemaToAvroType(schema: Schema): List[(String, String)] = { val fields = schema.columns.map { From 57213fa02b6992668004d37cdd92b45d7c58ee8d Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Wed, 16 Mar 2016 14:28:38 -0700 Subject: [PATCH 2/8] Export graph to CSV --- .../resources/python/graph/ExportGraphCsv.rst | 46 ++++++++ .../exporthdfs/ExportHdfsCsvPlugin.scala | 1 - .../export/csv/ExportGraphCsvArgs.scala | 7 -- .../export/csv/ExportGraphCsvPlugin.scala | 20 ++-- .../export/csv/GraphExportMetadata.scala | 6 +- .../org/apache/spark/frame/FrameRdd.scala | 108 +++++++++--------- 6 files changed, 112 insertions(+), 76 deletions(-) create mode 100644 doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst diff --git a/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst new file mode 100644 index 0000000000..b1ef4c17ee --- /dev/null +++ b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst @@ -0,0 +1,46 @@ + +>>> import trustedanalytics as ta + +>>> ta.connect() +-etc- + +>>> vertex_schema = [('source', ta.int32), ('label', ta.float32)] +>>> edge_schema = [('source', ta.int32), ('dest', ta.int32), ('weight', ta.int32)] + +>>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ] +>>> edge_rows = [ [1, 2, 1], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ] +>>> vertex_frame = ta.Frame(ta.UploadRows (vertex_rows, vertex_schema)) + +>>> edge_frame = ta.Frame(ta.UploadRows (edge_rows, edge_schema)) +-etc- + + +>>> graph = ta.Graph() + +>>> graph.define_vertex_type('source') + +>>> graph.vertices['source'].add_vertices(vertex_frame, 'source', 'label') + +>>> graph.define_edge_type('edges','source', 'source', directed=False) + +>>> graph.edges['edges'].add_edges(edge_frame, 'source', 'dest', ['weight']) + +>>> result = graph.ExportGraphCsvPlugin() + +>>> result['source'].inspect() + [#] _vid _label source label connectedComponentId + ====================================================== + [0] 5 source 5 5.0 1 + [1] 1 source 1 1.0 1 + [2] 2 source 2 1.0 1 + [3] 3 source 3 5.0 1 + [4] 4 source 4 5.0 1 + +>>> graph.edges['edges'].inspect() + [#] _eid _src_vid _dest_vid _label weight + 
============================================== + [0] 6 1 2 edges 1 + [1] 7 1 3 edges 1 + [2] 9 1 4 edges 1 + [3] 8 2 3 edges 1 + [4] 10 4 5 edges 1 diff --git a/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala b/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala index fcd4554f5b..86b758f603 100644 --- a/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala +++ b/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala @@ -76,5 +76,4 @@ class ExportHdfsCsvPlugin extends SparkCommandPlugin[ExportHdfsCsvArgs, ExportMe fileStorage.size(artifactPath.toString), Some(arguments.folderName)) } - } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala index 848af3ab1b..f0ca6db6db 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala @@ -40,10 +40,3 @@ Default is zero (0).""") offset: Int = 0) { require(StringUtils.isNotBlank(separator) && separator.length == 1, "A single character separator is required") } - - - - - - - diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala index a262f22992..276f5e0479 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala @@ -16,17 +16,17 @@ package org.trustedanalytics.atk.plugins.export.csv -import org.apache.commons.csv.{CSVFormat, CSVPrinter} +import org.apache.commons.csv.{ CSVFormat, CSVPrinter } import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.spark.frame.FrameRdd import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata -import org.trustedanalytics.atk.domain.frame.{FrameEntity, ExportHdfsCsvArgs} +import org.trustedanalytics.atk.domain.frame.{ FrameEntity, ExportHdfsCsvArgs } import org.trustedanalytics.atk.domain.graph.SeamlessGraphMeta import org.trustedanalytics.atk.engine.FileStorage -import org.trustedanalytics.atk.engine.frame.{MiscFrameFunctions, SparkFrame} +import org.trustedanalytics.atk.engine.frame.{ MiscFrameFunctions, SparkFrame } import org.trustedanalytics.atk.engine.graph.SparkGraph -import org.trustedanalytics.atk.engine.plugin.{Invocation, PluginDoc, SparkCommandPlugin} +import org.trustedanalytics.atk.engine.plugin.{ Invocation, PluginDoc, SparkCommandPlugin } import scala.collection.mutable.ArrayBuffer @@ -46,7 +46,6 @@ class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphE */ override def name: String = "graph:/export_to_csv" - /** * Calculate covariance for the specified columns * @@ -67,17 +66,16 @@ class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphE val graphMeta = 
engine.graphs.expectSeamless(graph) val vertexFrames = graphMeta.vertexFrames.map(_.toReference) - //TODO: export each frame to CSV file in HDFS - val vertexFolderName = arguments.folderName+"/vertices" - val edgeFolderName = arguments.folderName+"/edges" - val vertexMetadata = exportFramesToCsv(vertexFolderName,graphMeta.vertexFrames,arguments,fileStorage) - val edgeMetadata = exportFramesToCsv(edgeFolderName,graphMeta.edgeFrames,arguments,fileStorage) + val vertexFolderName = arguments.folderName + "/vertices" + val edgeFolderName = arguments.folderName + "/edges" + val vertexMetadata = exportFramesToCsv(vertexFolderName, graphMeta.vertexFrames, arguments, fileStorage) + val edgeMetadata = exportFramesToCsv(edgeFolderName, graphMeta.edgeFrames, arguments, fileStorage) GraphExportMetadata(vertexMetadata ++ edgeMetadata) } // TODO: add scala doc - def exportFramesToCsv(folderName: String,frameEntities: List[FrameEntity],arguments: ExportGraphCsvArgs,fileStorage: FileStorage)(implicit invocation: Invocation):List[ExportMetadata]= { + def exportFramesToCsv(folderName: String, frameEntities: List[FrameEntity], arguments: ExportGraphCsvArgs, fileStorage: FileStorage)(implicit invocation: Invocation): List[ExportMetadata] = { val frames = frameEntities.map(_.toReference) val metadata = frames.map(frame => { // load frame as RDD diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala index ee2d2c6d9b..ef5c02c3ff 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala @@ -3,7 +3,7 @@ package org.trustedanalytics.atk.plugins.export.csv import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata //TODO: add scala doc /** - * - * @param metadata - */ + * + * @param metadata + */ case class GraphExportMetadata(metadata: List[ExportMetadata]) diff --git a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala index 7cb7794d92..676c56b33f 100644 --- a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala +++ b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala @@ -17,9 +17,9 @@ package org.apache.spark.frame import breeze.linalg.DenseVector -import org.apache.commons.csv.{CSVPrinter, CSVFormat} +import org.apache.commons.csv.{ CSVPrinter, CSVFormat } import org.apache.commons.lang3.StringUtils -import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics } +import org.apache.spark.mllib.stat.{ MultivariateStatisticalSummary, Statistics } import org.apache.spark.atk.graph.{ EdgeWrapper, VertexWrapper } import org.apache.spark.frame.ordering.FrameOrderingUtils import org.apache.spark.mllib.linalg.distributed.IndexedRow @@ -35,7 +35,7 @@ import org.trustedanalytics.atk.engine.frame.plugins.ScoreAndLabel import org.trustedanalytics.atk.engine.frame.{ MiscFrameFunctions, RowWrapper } import org.trustedanalytics.atk.engine.graph.plugins.{ VertexSchemaAggregator, EdgeSchemaAggregator, EdgeHolder } import org.trustedanalytics.atk.graphbuilder.elements.{ GBEdge, GBVertex } -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.collection.mutable.{ ArrayBuffer, ListBuffer 
} import scala.reflect.ClassTag /** @@ -82,8 +82,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert a FrameRdd to a Spark Dataframe - * - * @return Dataframe representing the FrameRdd + * + * @return Dataframe representing the FrameRdd */ def toDataFrame: DataFrame = { new SQLContext(this.sparkContext).createDataFrame(this, sparkSchema) @@ -106,8 +106,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd into RDD[Vector] format required by MLLib - * - * @param featureColumnNames Names of the frame's column(s) to be used + * + * @param featureColumnNames Names of the frame's column(s) to be used * @return RDD of (org.apache.spark.mllib)Vector */ def toDenseVectorRDD(featureColumnNames: List[String]): RDD[Vector] = { @@ -120,8 +120,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Compute MLLib's MultivariateStatisticalSummary from FrameRdd - * - * @param columnNames Names of the frame's column(s) whose column statistics are to be computed + * + * @param columnNames Names of the frame's column(s) whose column statistics are to be computed * @return MLLib's MultivariateStatisticalSummary */ def columnStatistics(columnNames: List[String]): MultivariateStatisticalSummary = { @@ -131,8 +131,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[Vector] by mean centering the specified columns - * - * @param featureColumnNames Names of the frame's column(s) to be used + * + * @param featureColumnNames Names of the frame's column(s) to be used * @return RDD of (org.apache.spark.mllib)Vector */ def toMeanCenteredDenseVectorRDD(featureColumnNames: List[String]): RDD[Vector] = { @@ -145,8 +145,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[Vector] - * - * @param featureColumnNames Names of the frame's column(s) to be used + * + * @param featureColumnNames Names of the frame's column(s) to be used * @param columnWeights The weights of the columns * @return RDD of (org.apache.spark.mllib)Vector */ @@ -163,8 +163,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[(Long, Long, Double)] - * - * @param sourceColumnName Name of the frame's column storing the source id of the edge + * + * @param sourceColumnName Name of the frame's column storing the source id of the edge * @param destinationColumnName Name of the frame's column storing the destination id of the edge * @param edgeSimilarityColumnName Name of the frame's column storing the similarity between the source and destination * @return RDD[(Long, Long, Double)] @@ -227,8 +227,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Create a new FrameRdd that is only a subset of the columns of this FrameRdd - * - * @param columnNames names to include + * + * @param columnNames names to include * @return the FrameRdd with only some columns */ def selectColumns(columnNames: List[String]): FrameRdd = { @@ -240,8 +240,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Select a subset of columns while renaming them - * - * @param columnNamesWithRename map of old names to new names + * + * @param columnNamesWithRename map of old names to new names * @return the new FrameRdd */ def selectColumnsWithRename(columnNamesWithRename: Map[String, String]): FrameRdd = { @@ -346,8 +346,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Sort by one or more columns - * - * @param columnNamesAndAscending 
column names to sort by, true for ascending, false for descending + * + * @param columnNamesAndAscending column names to sort by, true for ascending, false for descending * @return the sorted Frame */ def sortByColumns(columnNamesAndAscending: List[(String, Boolean)]): FrameRdd = { @@ -401,8 +401,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Save this RDD to disk or other store - * - * @param absolutePath location to store + * + * @param absolutePath location to store * @param storageFormat "file/parquet", "file/sequence", etc. */ def save(absolutePath: String, storageFormat: String = "file/parquet"): Unit = { @@ -467,17 +467,17 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) //TODO: update doc /** - * Export to a file in CSV format - * @param filename file path where to store the file - * @param separator - * @param count - * @param offset - * @return - */ - def exportToHdfsCsv(filename: String, - separator: Char, - count: Int, - offset: Int) = { + * Export to a file in CSV format + * @param filename file path where to store the file + * @param separator + * @param count + * @param offset + * @return + */ + def exportToHdfsCsv(filename: String, + separator: Char, + count: Int, + offset: Int) = { val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(this, offset, count, -1) else this val headers = this.frameSchema.columnNames.mkString(separator.toString) @@ -516,8 +516,8 @@ object FrameRdd { /** * converts a data frame to frame rdd - * - * @param rdd a data frame + * + * @param rdd a data frame * @return a frame rdd */ def toFrameRdd(rdd: DataFrame): FrameRdd = { @@ -564,8 +564,8 @@ object FrameRdd { /** * Convert an RDD of mixed vertex types into a map where the keys are labels and values are FrameRdd's - * - * @param gbVertexRDD Graphbuilder Vertex RDD + * + * @param gbVertexRDD Graphbuilder Vertex RDD * @return keys are labels and values are FrameRdd's */ def toFrameRddMap(gbVertexRDD: RDD[GBVertex]): Map[String, FrameRdd] = { @@ -595,8 +595,8 @@ object FrameRdd { /** * Convert an RDD of mixed vertex types into a map where the keys are labels and values are FrameRdd's - * - * @param gbEdgeRDD Graphbuilder Edge RDD + * + * @param gbEdgeRDD Graphbuilder Edge RDD * @param gbVertexRDD Graphbuilder Vertex RDD * @return keys are labels and values are FrameRdd's */ @@ -617,8 +617,8 @@ object FrameRdd { /** * Converts row object from an RDD[Array[Any]] to an RDD[Product] so that it can be used to create a SchemaRDD - * - * @return RDD[org.apache.spark.sql.Row] with values equal to row object + * + * @return RDD[org.apache.spark.sql.Row] with values equal to row object */ def toRowRDD(schema: Schema, rows: RDD[Array[Any]]): RDD[org.apache.spark.sql.Row] = { val rowRDD: RDD[org.apache.spark.sql.Row] = rows.map(row => { @@ -640,8 +640,8 @@ object FrameRdd { /** * Converts row object to RDD[IndexedRow] needed to create an IndexedRowMatrix - * - * @param indexedRows Rows of the frame as RDD[Row] + * + * @param indexedRows Rows of the frame as RDD[Row] * @param frameSchema Schema of the frame * @param featureColumnNames List of the frame's column(s) to be used * @return RDD[IndexedRow] @@ -658,8 +658,8 @@ object FrameRdd { /** * Converts row object to RDD[IndexedRow] needed to create an IndexedRowMatrix - * - * @param indexedRows Rows of the frame as RDD[Row] + * + * @param indexedRows Rows of the frame as RDD[Row] * @param frameSchema Schema of the frame * @param featureColumnNames List of the frame's column(s) to be used * @param meanVector Vector 
storing the means of the columns @@ -683,8 +683,8 @@ object FrameRdd { /** * Converts the schema object to a StructType for use in creating a SchemaRDD - * - * @return StructType with StructFields corresponding to the columns of the schema object + * + * @return StructType with StructFields corresponding to the columns of the schema object */ def schemaToStructType(schema: Schema): StructType = { val fields: Seq[StructField] = schema.columns.map { @@ -705,8 +705,8 @@ object FrameRdd { /** * Converts the spark DataTypes to our schema Datatypes - * - * @return our schema DataType + * + * @return our schema DataType */ def sparkDataTypeToSchemaDataType(dataType: org.apache.spark.sql.types.DataType): org.trustedanalytics.atk.domain.schema.DataTypes.DataType = { val intType = IntegerType.getClass @@ -740,8 +740,8 @@ object FrameRdd { /** * Converts a spark dataType (as string)to our schema Datatype - * - * @param sparkDataType spark data type + * + * @param sparkDataType spark data type * @return a DataType */ def sparkDataTypeToSchemaDataType(sparkDataType: String): DataType = { @@ -761,8 +761,8 @@ object FrameRdd { /** * Converts the schema object to a StructType for use in creating a SchemaRDD - * - * @return StructType with StructFields corresponding to the columns of the schema object + * + * @return StructType with StructFields corresponding to the columns of the schema object */ def schemaToAvroType(schema: Schema): List[(String, String)] = { val fields = schema.columns.map { From 7ebaec1b3896164b5c26a3c601395f63add1a4e9 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Tue, 22 Mar 2016 16:47:48 -0700 Subject: [PATCH 3/8] Export Parquet graph to HDFS in CSV format --- .../export/csv/ExportGraphCsvPlugin.scala | 68 ++++++++++--------- 1 file changed, 35 insertions(+), 33 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala index 276f5e0479..2a07f53b6a 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala @@ -1,44 +1,46 @@ /** - * Copyright (c) 2015 Intel Corporation  + * Copyright (c) 2015 Intel Corporation  * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * *       http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. */ package org.trustedanalytics.atk.plugins.export.csv -import org.apache.commons.csv.{ CSVFormat, CSVPrinter } -import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path -import org.apache.spark.frame.FrameRdd import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata -import org.trustedanalytics.atk.domain.frame.{ FrameEntity, ExportHdfsCsvArgs } -import org.trustedanalytics.atk.domain.graph.SeamlessGraphMeta +import org.trustedanalytics.atk.domain.frame.FrameEntity import org.trustedanalytics.atk.engine.FileStorage -import org.trustedanalytics.atk.engine.frame.{ MiscFrameFunctions, SparkFrame } +import org.trustedanalytics.atk.engine.frame.SparkFrame import org.trustedanalytics.atk.engine.graph.SparkGraph import org.trustedanalytics.atk.engine.plugin.{ Invocation, PluginDoc, SparkCommandPlugin } -import scala.collection.mutable.ArrayBuffer +//Implicits needed for JSON conversion +//import org.trustedanalytics.atk.domain.datacatalog.DataCatalogRestResponseJsonProtocol._ +import org.trustedanalytics.atk.domain.DomainJsonProtocol._ -// Implicits needed for JSON conversion +object ExportGraphCsvJsonFormat { + implicit val exportGraphCsvArgsFormat = jsonFormat5(ExportGraphCsvArgs) + implicit val graphExportMetadataFormat = jsonFormat1(GraphExportMetadata) +} + +import org.trustedanalytics.atk.plugins.export.csv.ExportGraphCsvJsonFormat._ /** - * Export a frame to csv file + * Export a graph to csv format */ -// TODO:update the doc -@PluginDoc(oneLine = "Write current frame to HDFS in csv format.", - extended = "Export the frame to a file in csv format as a Hadoop file.", - returns = "dict with .....") +@PluginDoc(oneLine = "Write current Parquet graph to HDFS in csv format.", + extended = "Export the graph to some Hadoop files for the vertices and the edges in csv format.", + returns = "A dictionary with the graph vertices and edges csv files.") class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphExportMetadata] { /** @@ -51,7 +53,7 @@ class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphE * * @param invocation information about the user and the circumstances at the time of the call, as well as a function * that can be called to produce a SparkContext that can be used during this invocation - * @param arguments input specification for covariance + * @param arguments input specification for covariance * @return value of type declared as the Return type */ override def execute(arguments: ExportGraphCsvArgs)(implicit invocation: Invocation): GraphExportMetadata = { @@ -59,33 +61,33 @@ class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphE val fileStorage = new FileStorage require(!fileStorage.exists(new Path(arguments.folderName)), "File or Directory already exists") - //get the graph meta data - val graph = arguments.graph - - // TODO: get the list of frame (of the graph ) from the meta data + //Get the graph meta data + val graph: SparkGraph = arguments.graph + // Get the list of the graph from the meta data val graphMeta = engine.graphs.expectSeamless(graph) val vertexFrames = graphMeta.vertexFrames.map(_.toReference) - //TODO: export each frame to CSV file in HDFS + //Export the graph vertices and edges to CSV file in HDFS, in two subdirectories inside the given directory name val vertexFolderName = arguments.folderName + "/vertices" val edgeFolderName = arguments.folderName + 
"/edges" val vertexMetadata = exportFramesToCsv(vertexFolderName, graphMeta.vertexFrames, arguments, fileStorage) val edgeMetadata = exportFramesToCsv(edgeFolderName, graphMeta.edgeFrames, arguments, fileStorage) - GraphExportMetadata(vertexMetadata ++ edgeMetadata) + GraphExportMetadata(vertexMetadata.map(_.targetUri) ++ edgeMetadata.map(_.targetUri)) } - // TODO: add scala doc + // Method for exporting the graph vertices/edges to CSV files in HDFS. def exportFramesToCsv(folderName: String, frameEntities: List[FrameEntity], arguments: ExportGraphCsvArgs, fileStorage: FileStorage)(implicit invocation: Invocation): List[ExportMetadata] = { val frames = frameEntities.map(_.toReference) val metadata = frames.map(frame => { // load frame as RDD val sparkFrame: SparkFrame = frame - val sample = sparkFrame.rdd.exportToHdfsCsv(arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset) + val subFolderName = s"${folderName}/${sparkFrame.label.getOrElse(sparkFrame.frameId)}" + val sample = sparkFrame.rdd.exportToHdfsCsv(subFolderName, arguments.separator.charAt(0), arguments.count, arguments.offset) - val artifactPath = new Path(s"${fileStorage.hdfs.getHomeDirectory()}/${folderName}/${sparkFrame.label.getOrElse(sparkFrame.frameId)}") + // Create the vertices and edges folders paths in HDFS + val artifactPath = new Path(s"${fileStorage.hdfs.getHomeDirectory()}/${subFolderName}") ExportMetadata(artifactPath.toString, "all", "csv", sparkFrame.rowCount, sample, fileStorage.size(artifactPath.toString), Some(folderName)) - }) metadata } From abb80f4970b8fde275be164b3d93499980a7e20e Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Tue, 22 Mar 2016 16:55:54 -0700 Subject: [PATCH 4/8] Export Parquet graph to HDFS in CSV format --- .../src/main/resources/python/graph/ExportGraphCsv.rst | 2 +- .../atk/plugins/export/csv/ExportGraphCsvArgs.scala | 5 +---- .../atk/plugins/export/csv/GraphExportMetadata.scala | 8 ++++++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst index b1ef4c17ee..e4034159c1 100644 --- a/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst +++ b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst @@ -25,7 +25,7 @@ >>> graph.edges['edges'].add_edges(edge_frame, 'source', 'dest', ['weight']) ->>> result = graph.ExportGraphCsvPlugin() +>>> result = graph.export_to_csv("graphCsv") >>> result['source'].inspect() [#] _vid _label source label connectedComponentId diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala index f0ca6db6db..7c64cccd36 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala @@ -17,17 +17,14 @@ package org.trustedanalytics.atk.plugins.export.csv import org.apache.commons.lang3.StringUtils -import org.trustedanalytics.atk.domain.frame.FrameReference import org.trustedanalytics.atk.domain.graph.GraphReference import org.trustedanalytics.atk.engine.plugin.ArgDoc /** * Input arguments class for export to CSV */ -//TODO: update doc - case class ExportGraphCsvArgs(graph: GraphReference, - @ArgDoc("""The 
HDFS folder path where the files + @ArgDoc("""The HDFS folder path where the graph vertices and edges subfolders will be created.""") folderName: String, @ArgDoc("""The separator for separating the values. Default is comma (,).""") separator: String = ",", diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala index ef5c02c3ff..877ed8c5dd 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala @@ -1,9 +1,13 @@ package org.trustedanalytics.atk.plugins.export.csv import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata -//TODO: add scala doc + +/** + * + * Metadata class being returned by ExportGraphCsvPlugin. + */ /** * * @param metadata */ -case class GraphExportMetadata(metadata: List[ExportMetadata]) +case class GraphExportMetadata(metadata: List[String]) From e1468bf603bc4820ea9e8e6d2bc71a98e3b67933 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Tue, 22 Mar 2016 17:08:06 -0700 Subject: [PATCH 5/8] Export Parquet graph to HDFS in CSV format --- .../src/main/scala/org/apache/spark/frame/FrameRdd.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala index 482c2d6654..6b07ff7fe5 100644 --- a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala +++ b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala @@ -16,8 +16,7 @@ package org.apache.spark.frame - -import org.apache.commons.csv.{CSVFormat, CSVPrinter} +import org.apache.commons.csv.{ CSVFormat, CSVPrinter } import org.apache.commons.lang.StringUtils import org.apache.spark.mllib.stat.{ MultivariateStatisticalSummary, Statistics } import org.apache.spark.atk.graph.{ EdgeWrapper, VertexWrapper } @@ -472,8 +471,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) //TODO: update doc /** * Export to a file in CSV format - * - * @param filename file path where to store the file + * + * @param filename file path where to store the file * @param separator * @param count * @param offset From 3b9d25fd806dd30d3dbbb98b0feac1958476f790 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Thu, 24 Mar 2016 13:49:20 -0700 Subject: [PATCH 6/8] Export graph to CSV (reviewer comments addressed). 
--- .../resources/python/graph/ExportGraphCsv.rst | 2 +- .../export/csv/ExportGraphCsvPlugin.scala | 7 ++- .../export/csv/GraphExportMetadata.scala | 5 +-- .../org/apache/spark/frame/FrameRdd.scala | 19 +++----- .../atk/engine/frame/Row.scala | 44 ++++++++++++++++++- .../atk/engine/frame/RowWrapperTest.scala | 35 +++++++++++++++ 6 files changed, 88 insertions(+), 24 deletions(-) create mode 100644 engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala diff --git a/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst index e4034159c1..dd9cc37c87 100644 --- a/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst +++ b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst @@ -25,7 +25,7 @@ >>> graph.edges['edges'].add_edges(edge_frame, 'source', 'dest', ['weight']) ->>> result = graph.export_to_csv("graphCsv") +>>> result = graph.export_to_csv("csvFolderName") >>> result['source'].inspect() [#] _vid _label source label connectedComponentId diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala index 2a07f53b6a..8d28680bc5 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala @@ -39,8 +39,8 @@ import org.trustedanalytics.atk.plugins.export.csv.ExportGraphCsvJsonFormat._ * Export a graph to csv format */ @PluginDoc(oneLine = "Write current Parquet graph to HDFS in csv format.", - extended = "Export the graph to some Hadoop files for the vertices and the edges in csv format.", - returns = "A dictionary with the graph vertices and edges csv files.") + extended = "Export the graph to multiple Hadoop files for the vertices and the edges in csv format.", + returns = "A dictionary with the location of the graph vertices and edges csv files.") class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphExportMetadata] { /** @@ -63,9 +63,9 @@ class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphE //Get the graph meta data val graph: SparkGraph = arguments.graph + // Get the list of the graph from the meta data val graphMeta = engine.graphs.expectSeamless(graph) - val vertexFrames = graphMeta.vertexFrames.map(_.toReference) //Export the graph vertices and edges to CSV file in HDFS, in two subdirectories inside the given directory name val vertexFolderName = arguments.folderName + "/vertices" @@ -91,5 +91,4 @@ class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphE }) metadata } - } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala index 877ed8c5dd..38886f0904 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala @@ -2,12 +2,9 @@ package org.trustedanalytics.atk.plugins.export.csv import 
org.trustedanalytics.atk.domain.datacatalog.ExportMetadata +//TODO: the JSON serialization issue needs to be fixed such that we can return List[ExportMetadata] instead of List[String] /** - * * Metadata class being returned by ExportGraphCsvPlugin. - */ -/** - * * @param metadata */ case class GraphExportMetadata(metadata: List[String]) diff --git a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala index 6b07ff7fe5..d28bc907f4 100644 --- a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala +++ b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala @@ -16,6 +16,8 @@ package org.apache.spark.frame +import java.lang.StringBuilder + import org.apache.commons.csv.{ CSVFormat, CSVPrinter } import org.apache.commons.lang.StringUtils import org.apache.spark.mllib.stat.{ MultivariateStatisticalSummary, Statistics } @@ -468,9 +470,8 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) new FrameRdd(frameSchema.addColumn(column), rows) } - //TODO: update doc /** - * Export to a file in CSV format + * Export frame to multiple files in CSV format * * @param filename file path where to store the file * @param separator * @param count * @param offset @@ -486,19 +487,9 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(this, offset, count, -1) else this val headers = this.frameSchema.columnNames.mkString(separator.toString) val csvFormat = CSVFormat.RFC4180.withDelimiter(separator) - + val rowWrapper = new RowWrapper(frameSchema) val csvRdd = filterRdd.map(row => { - val stringBuilder = new java.lang.StringBuilder - val printer = new CSVPrinter(stringBuilder, csvFormat) - val array = row.toSeq.map(col => - col match { - case null => "" - case arr: ArrayBuffer[_] => arr.mkString(",") - case seq: Seq[_] => seq.mkString(",") - case x => x.toString - }) - for (i <- array) printer.print(i) - stringBuilder.toString + rowWrapper(row).exportRowToCsv(csvFormat) }) val dataSample = if (csvRdd.isEmpty()) StringUtils.EMPTY else csvRdd.first() diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala index 8b1bec7cf1..fe4c025e23 100644 --- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala +++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala @@ -16,6 +16,7 @@ package org.trustedanalytics.atk.engine.frame +import org.apache.commons.csv.{ CSVPrinter, CSVFormat } import org.apache.spark.frame.FrameRdd import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.GenericArrayData @@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.trustedanalytics.atk.graphbuilder.elements.GBVertex import scala.collection.mutable.ArrayBuffer - +import java.lang.StringBuilder /** * This class wraps raw row data adding schema information - this allows for a richer easier to use API. 
diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala index 8b1bec7cf1..fe4c025e23 100644 --- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala +++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala @@ -16,6 +16,7 @@ package org.trustedanalytics.atk.engine.frame +import org.apache.commons.csv.{ CSVPrinter, CSVFormat } import org.apache.spark.frame.FrameRdd import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.GenericArrayData @@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.trustedanalytics.atk.graphbuilder.elements.GBVertex import scala.collection.mutable.ArrayBuffer - +import java.lang.StringBuilder /** * This class wraps raw row data adding schema information - this allows for a richer easier to use API. * @@ -44,6 +45,7 @@ class RowWrapper(override val schema: Schema) extends AbstractRow with Serializa /** * Set the data in this wrapper + * * @param row the data to set inside this Wrapper * @return this instance */ @@ -68,6 +70,7 @@ trait AbstractRow { /** * Determine whether the property exists + * * @param name name of the property * @return boolean value indicating whether the property exists */ @@ -83,6 +86,7 @@ trait AbstractRow { /** * Get property + * * @param columnName name of the property * @return property value */ @@ -90,6 +94,7 @@ trait AbstractRow { /** * Get more than one value as a List + * * @param columnNames the columns to get values for * @return the values for the columns */ @@ -99,6 +104,7 @@ trait AbstractRow { /** * Get property of boolean data type + * * @param columnName name of the property * @return property value */ @@ -106,6 +112,7 @@ trait AbstractRow { /** * Get property of integer data type + * * @param columnName name of the property * @return property value */ @@ -113,6 +120,7 @@ trait AbstractRow { /** * Get property of long data type + * * @param columnName name of the property * @return property value */ @@ -120,6 +128,7 @@ trait AbstractRow { /** * Get property of float data type + * * @param columnName name of the property * @return property value */ @@ -127,6 +136,7 @@ trait AbstractRow { /** * Get property of double data type + * * @param columnName name of the property * @return property value */ @@ -134,6 +144,7 @@ trait AbstractRow { /** * Get property of string data type + * * @param columnName name of the property * @return property value */ @@ -141,6 +152,7 @@ trait AbstractRow { /** * Get property of string data type + * * @param columnName name of the property * @return property value */ @@ -160,6 +172,7 @@ trait AbstractRow { /** * Set a value in a column - validates the supplied value is the correct type + * * @param name the name of the column to set * @param value the value of the column */ @@ -170,6 +183,7 @@ trait AbstractRow { /** * Set all of the values for an entire row with validation + * * @param values the values to set * @return the row */ @@ -180,6 +194,7 @@ trait AbstractRow { /** * Validate the supplied value matches the schema for the supplied columnName.
+ * * @param name column name * @param value the value to check */ @@ -197,6 +212,7 @@ trait AbstractRow { /** * Set the value in a column - don't validate the type + * * @param name the name of the column to set * @param value the value of the column */ @@ -211,6 +227,7 @@ trait AbstractRow { /** * Set all of the values for a row - don't validate type + * * @param values all of the values * @return the row */ @@ -266,6 +283,7 @@ trait AbstractRow { /** * Get underlying data for this row + * * @return the actual row */ def data: Row = row @@ -280,6 +298,7 @@ trait AbstractRow { /** * Select several property values from their names + * * @param names the names of the properties to put into an array * @param flattenInputs If true, flatten vector data types * @return values for the supplied properties @@ -299,6 +318,7 @@ trait AbstractRow { /** * Select several property values from their names as an array of doubles + * * @param names the names of the properties to put into an array * @param flattenInputs If true, flatten vector data types * @return array of doubles with values for the supplied properties @@ -359,6 +379,7 @@ trait AbstractRow { /** * Create a row with values + * * @param content the values * @return the row */ @@ -372,4 +393,25 @@ trait AbstractRow { vertex.properties.foreach(prop => setValue(prop.key, prop.value)) row } + + /** + * Export row to CSV format + * + * @param csvFormat the CSV format (delimiter, quoting rules) used to render the record + * @return the row rendered as a single CSV record + */ + def exportRowToCsv(csvFormat: CSVFormat): String = { + val stringBuilder = new StringBuilder + val printer = new CSVPrinter(stringBuilder, csvFormat) + val array = row.toSeq.map(col => + col match { + case null => "" + case arr: ArrayBuffer[_] => arr.mkString(",") + case seq: Seq[_] => seq.mkString(",") + case x => x.toString + }) + for (i <- array) printer.print(i) + stringBuilder.toString + } + }
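One behavior of exportRowToCsv worth spelling out: Seq- and ArrayBuffer-valued columns are flattened with mkString(",") before printing, so the embedded commas force RFC4180 quoting of that field. A small self-contained illustration using only commons-csv; the values are hypothetical:

  import org.apache.commons.csv.{ CSVFormat, CSVPrinter }

  object CsvQuotingSketch extends App {
    val csvFormat = CSVFormat.RFC4180.withDelimiter(',')
    val stringBuilder = new java.lang.StringBuilder
    val printer = new CSVPrinter(stringBuilder, csvFormat)
    // a vector column rendered as "1.0,2.0" becomes a single quoted CSV field
    Seq("a", Seq(1.0, 2.0).mkString(","), "z").foreach(value => printer.print(value))
    println(stringBuilder.toString) // prints: a,"1.0,2.0",z
  }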
diff --git a/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala b/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala new file mode 100644 index 0000000000..bc6367dd99 --- /dev/null +++ b/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala @@ -0,0 +1,35 @@ +package org.trustedanalytics.atk.engine.frame + +import org.apache.commons.csv.CSVFormat +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.scalatest.{ Matchers, WordSpec } +import org.trustedanalytics.atk.domain.schema.{ DataTypes, Column, FrameSchema } +/** + * Created by wtaie on 3/23/16. + */ +class RowWrapperTest extends WordSpec with Matchers { + + val inputRows: Array[Row] = Array( + new GenericRow(Array[Any]("a", 1, 1d, "w")), + new GenericRow(Array[Any]("c", 5, 1d, "5"))) + + val inputSchema = FrameSchema(List( + Column("col_0", DataTypes.string), + Column("col_1", DataTypes.int32), + Column("col_2", DataTypes.float64), + Column("col_3", DataTypes.string) + )) + "export row to CSV" should { + "convert input rows to CSV strings" in { + val csvFormat = CSVFormat.RFC4180.withDelimiter(',') + val rowWrapper1 = new RowWrapper(inputSchema) + val csvRecords = rowWrapper1(inputRows(1)).exportRowToCsv(csvFormat) + csvRecords shouldEqual "c,5,1.0,5" + } + "have non-empty input rows" in { + inputRows should not be empty + } + } + +} From 82e2d0df0f9b522c78afc27310ba3e7cd1560500 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Wed, 6 Apr 2016 16:52:08 -0700 Subject: [PATCH 7/8] skipped the document integration test --- integration-tests/tests/gendoct.py | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/tests/gendoct.py b/integration-tests/tests/gendoct.py index c3e30a8b9b..8a34f3e0b3 100644 --- a/integration-tests/tests/gendoct.py +++ b/integration-tests/tests/gendoct.py @@ -96,6 +96,7 @@ def _trim_test_path(path): graph/graphx_connected_components.rst graph/graphx_triangle_count.rst graph/graphx_pagerank.rst +graph/ExportGraphCsv.rst frame/column_median.rst frame/column_mode.rst frame/column_summary_statistics.rst From d36d79b3ffd4727b8c88918334405c80f1ede7ef Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Sat, 23 Apr 2016 01:18:58 -0700 Subject: [PATCH 8/8] Export to CSV, license check passed --- .../export/csv/ExportGraphCsvPlugin.scala | 19 +++++++++---------- .../export/csv/GraphExportMetadata.scala | 15 +++++++++++++++ .../atk/engine/frame/RowWrapperTest.scala | 15 +++++++++++++++ 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala index 8d28680bc5..b251f4b650 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala @@ -1,19 +1,18 @@ /** - * Copyright (c) 2015 Intel Corporation  + * Copyright (c) 2015 Intel Corporation  * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * *       http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. */ - package org.trustedanalytics.atk.plugins.export.csv import org.apache.hadoop.fs.Path diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala index 38886f0904..5ef7d8c290 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala @@ -1,3 +1,18 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.trustedanalytics.atk.plugins.export.csv import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata diff --git a/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala b/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala index bc6367dd99..4b3ecd37ed 100644 --- a/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala +++ b/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala @@ -1,3 +1,18 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.trustedanalytics.atk.engine.frame import org.apache.commons.csv.CSVFormat