diff --git a/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst new file mode 100644 index 0000000000..dd9cc37c87 --- /dev/null +++ b/doc-api-examples/src/main/resources/python/graph/ExportGraphCsv.rst @@ -0,0 +1,46 @@ + +>>> import trustedanalytics as ta + +>>> ta.connect() +-etc- + +>>> vertex_schema = [('source', ta.int32), ('label', ta.float32)] +>>> edge_schema = [('source', ta.int32), ('dest', ta.int32), ('weight', ta.int32)] + +>>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ] +>>> edge_rows = [ [1, 2, 1], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ] +>>> vertex_frame = ta.Frame(ta.UploadRows (vertex_rows, vertex_schema)) + +>>> edge_frame = ta.Frame(ta.UploadRows (edge_rows, edge_schema)) +-etc- + + +>>> graph = ta.Graph() + +>>> graph.define_vertex_type('source') + +>>> graph.vertices['source'].add_vertices(vertex_frame, 'source', 'label') + +>>> graph.define_edge_type('edges','source', 'source', directed=False) + +>>> graph.edges['edges'].add_edges(edge_frame, 'source', 'dest', ['weight']) + +>>> result = graph.export_to_csv("csvFolderName") + +>>> result['source'].inspect() + [#] _vid _label source label connectedComponentId + ====================================================== + [0] 5 source 5 5.0 1 + [1] 1 source 1 1.0 1 + [2] 2 source 2 1.0 1 + [3] 3 source 3 5.0 1 + [4] 4 source 4 5.0 1 + +>>> graph.edges['edges'].inspect() + [#] _eid _src_vid _dest_vid _label weight + ============================================== + [0] 6 1 2 edges 1 + [1] 7 1 3 edges 1 + [2] 9 1 4 edges 1 + [3] 8 2 3 edges 1 + [4] 10 4 5 edges 1 diff --git a/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala b/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala index 7f74d17148..86b758f603 100644 --- a/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala +++ b/engine-plugins/frame-plugins/src/main/scala/org/trustedanalytics/atk/engine/frame/plugins/exporthdfs/ExportHdfsCsvPlugin.scala @@ -69,47 +69,11 @@ class ExportHdfsCsvPlugin extends SparkCommandPlugin[ExportHdfsCsvArgs, ExportMe require(!fileStorage.exists(new Path(arguments.folderName)), "File or Directory already exists") val frame: SparkFrame = arguments.frame // load frame as RDD - val sample = exportToHdfsCsv(frame.rdd, arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset) + val sample = frame.rdd.exportToHdfsCsv(arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset) val artifactPath = new Path(s"${fileStorage.hdfs.getHomeDirectory()}/${arguments.folderName}") ExportMetadata(artifactPath.toString, "all", "csv", frame.rowCount, sample, fileStorage.size(artifactPath.toString), Some(arguments.folderName)) } - /** - * Export to a file in CSV format - * - * @param frameRdd input rdd containing all columns - * @param filename file path where to store the file - */ - private def exportToHdfsCsv( - frameRdd: FrameRdd, - filename: String, - separator: Char, - count: Int, - offset: Int) = { - - val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(frameRdd, offset, count, -1) else frameRdd - val headers = frameRdd.frameSchema.columnNames.mkString(separator.toString) - val csvFormat = CSVFormat.RFC4180.withDelimiter(separator) - - val csvRdd = filterRdd.map(row => 
{ - val stringBuilder = new java.lang.StringBuilder - val printer = new CSVPrinter(stringBuilder, csvFormat) - val array = row.toSeq.map(col => - col match { - case null => "" - case arr: ArrayBuffer[_] => arr.mkString(",") - case seq: Seq[_] => seq.mkString(",") - case x => x.toString - }) - for (i <- array) printer.print(i) - stringBuilder.toString - }) - - val dataSample = if (csvRdd.isEmpty()) StringUtils.EMPTY else csvRdd.first() - val addHeaders = frameRdd.sparkContext.parallelize(List(headers)) ++ csvRdd - addHeaders.saveAsTextFile(filename) - dataSample - } } diff --git a/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf b/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf index cdad64f112..be93db94e9 100644 --- a/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf +++ b/engine-plugins/graph-plugins/src/main/resources/atk-plugin.conf @@ -24,6 +24,7 @@ atk.plugin { "org.trustedanalytics.atk.plugins.connectedcomponents.ConnectedComponentsPlugin" "org.trustedanalytics.atk.plugins.trianglecount.TriangleCountPlugin" "org.trustedanalytics.atk.plugins.RenameGraphPlugin" + "org.trustedanalytics.atk.plugins.export.csv.ExportGraphCsvPlugin" ] } \ No newline at end of file diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala new file mode 100644 index 0000000000..7c64cccd36 --- /dev/null +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvArgs.scala @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.trustedanalytics.atk.plugins.export.csv + +import org.apache.commons.lang3.StringUtils +import org.trustedanalytics.atk.domain.graph.GraphReference +import org.trustedanalytics.atk.engine.plugin.ArgDoc + +/** + * Input arguments class for export to CSV + */ +case class ExportGraphCsvArgs(graph: GraphReference, + @ArgDoc("""The HDFS folder path where the graph vertices and edges subfolders +will be created.""") folderName: String, + @ArgDoc("""The separator for separating the values. +Default is comma (,).""") separator: String = ",", + @ArgDoc("""The number of records you want. +Default, or a non-positive value, is the whole frame.""") count: Int = -1, + @ArgDoc("""The number of rows to skip before exporting to the file. 
+Default is zero (0).""") offset: Int = 0) {
+  require(graph != null, "graph is required")
+  require(folderName != null, "folder name is required")
+  require(StringUtils.isNotBlank(separator) && separator.length == 1, "A single character separator is required")
+}
+
diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala
new file mode 100644
index 0000000000..b251f4b650
--- /dev/null
+++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/ExportGraphCsvPlugin.scala
@@ -0,0 +1,93 @@
+/**
+ *  Copyright (c) 2015 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.trustedanalytics.atk.plugins.export.csv
+
+import org.apache.hadoop.fs.Path
+import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata
+import org.trustedanalytics.atk.domain.frame.FrameEntity
+import org.trustedanalytics.atk.engine.FileStorage
+import org.trustedanalytics.atk.engine.frame.SparkFrame
+import org.trustedanalytics.atk.engine.graph.SparkGraph
+import org.trustedanalytics.atk.engine.plugin.{ Invocation, PluginDoc, SparkCommandPlugin }
+
+//Implicits needed for JSON conversion
+//import org.trustedanalytics.atk.domain.datacatalog.DataCatalogRestResponseJsonProtocol._
+import org.trustedanalytics.atk.domain.DomainJsonProtocol._
+
+object ExportGraphCsvJsonFormat {
+  implicit val exportGraphCsvArgsFormat = jsonFormat5(ExportGraphCsvArgs)
+  implicit val graphExportMetadataFormat = jsonFormat1(GraphExportMetadata)
+}
+
+import org.trustedanalytics.atk.plugins.export.csv.ExportGraphCsvJsonFormat._
+
+/**
+ * Export a graph to CSV format
+ */
+@PluginDoc(oneLine = "Write current Parquet graph to HDFS in csv format.",
+  extended = "Export the graph to multiple Hadoop files for the vertices and the edges in csv format.",
+  returns = "A dictionary with the location of the graph vertices and edges csv files.")
+class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphExportMetadata] {
+
+  /**
+   * The name of the command
+   */
+  override def name: String = "graph:/export_to_csv"
+
+  /**
+   * Export the graph's vertex and edge frames to CSV files in HDFS
+   *
+   * @param invocation information about the user and the circumstances at the time of the call, as well as a function
+   *                   that can be called to produce a SparkContext that can be used during this invocation
+   * @param arguments input specification for the graph CSV export
+   * @return value of type declared as the Return type
+   */
+  override def execute(arguments: ExportGraphCsvArgs)(implicit invocation: Invocation): GraphExportMetadata = {
+
+    val fileStorage = new FileStorage
+    require(!fileStorage.exists(new Path(arguments.folderName)), "File or Directory already exists")
+
+    // Get the graph metadata
+    val graph: SparkGraph = arguments.graph
+
+    // Get the vertex and edge frames from the seamless graph metadata
+    val graphMeta = engine.graphs.expectSeamless(graph)
+
+    // Export the graph vertices and edges to CSV files in HDFS, in two subdirectories inside the given directory name
+    val vertexFolderName = arguments.folderName + "/vertices"
+    val edgeFolderName = arguments.folderName + "/edges"
+    val vertexMetadata = exportFramesToCsv(vertexFolderName, graphMeta.vertexFrames, arguments, fileStorage)
+    val edgeMetadata = exportFramesToCsv(edgeFolderName, graphMeta.edgeFrames, arguments, fileStorage)
+    GraphExportMetadata(vertexMetadata.map(_.targetUri) ++ edgeMetadata.map(_.targetUri))
+  }
+
+  // Exports the graph's vertex or edge frames to CSV files in HDFS, one subfolder per frame label.
+  def exportFramesToCsv(folderName: String, frameEntities: List[FrameEntity], arguments: ExportGraphCsvArgs, fileStorage: FileStorage)(implicit invocation: Invocation): List[ExportMetadata] = {
+    val frames = frameEntities.map(_.toReference)
+    val metadata = frames.map(frame => {
+      // load frame as RDD
+      val sparkFrame: SparkFrame = frame
+      val subFolderName = s"${folderName}/${sparkFrame.label.getOrElse(sparkFrame.frameId)}"
+      val sample = sparkFrame.rdd.exportToHdfsCsv(subFolderName, arguments.separator.charAt(0), arguments.count, arguments.offset)
+
+      // Build the full HDFS path of the exported frame's folder
+      val artifactPath = new Path(s"${fileStorage.hdfs.getHomeDirectory()}/${subFolderName}")
+      ExportMetadata(artifactPath.toString, "all", "csv", sparkFrame.rowCount, sample,
+        fileStorage.size(artifactPath.toString), Some(folderName))
+    })
+    metadata
+  }
+}
diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala
new file mode 100644
index 0000000000..5ef7d8c290
--- /dev/null
+++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/export/csv/GraphExportMetadata.scala
@@ -0,0 +1,25 @@
+/**
+ *  Copyright (c) 2015 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.trustedanalytics.atk.plugins.export.csv
+
+import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata
+
+//TODO: the JSON serialization issue needs to be fixed such that we can return List[ExportMetadata] instead of List[String]
+/**
+ * Metadata class being returned by ExportGraphCsvPlugin.
+ * @param metadata + */ +case class GraphExportMetadata(metadata: List[String]) diff --git a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala index 61ee0047d1..d28bc907f4 100644 --- a/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala +++ b/engine/engine-core/src/main/scala/org/apache/spark/frame/FrameRdd.scala @@ -16,9 +16,10 @@ package org.apache.spark.frame -import breeze.linalg.DenseVector -import breeze.linalg.DenseVector -import org.apache.spark.mllib.regression.LabeledPoint +import java.lang.StringBuilder + +import org.apache.commons.csv.{ CSVFormat, CSVPrinter } +import org.apache.commons.lang.StringUtils import org.apache.spark.mllib.stat.{ MultivariateStatisticalSummary, Statistics } import org.apache.spark.atk.graph.{ EdgeWrapper, VertexWrapper } import org.apache.spark.frame.ordering.FrameOrderingUtils @@ -35,7 +36,7 @@ import org.trustedanalytics.atk.engine.frame.plugins.ScoreAndLabel import org.trustedanalytics.atk.engine.frame.{ MiscFrameFunctions, RowWrapper } import org.trustedanalytics.atk.engine.graph.plugins.{ VertexSchemaAggregator, EdgeSchemaAggregator, EdgeHolder } import org.trustedanalytics.atk.graphbuilder.elements.{ GBEdge, GBVertex } -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.{ ArrayBuffer, ListBuffer } import scala.reflect.ClassTag /** @@ -82,6 +83,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert a FrameRdd to a Spark Dataframe + * * @return Dataframe representing the FrameRdd */ def toDataFrame: DataFrame = { @@ -109,6 +111,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd into RDD[Vector] format required by MLLib + * * @param featureColumnNames Names of the frame's column(s) to be used * @return RDD of (org.apache.spark.mllib)Vector */ @@ -122,6 +125,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Compute MLLib's MultivariateStatisticalSummary from FrameRdd + * * @param columnNames Names of the frame's column(s) whose column statistics are to be computed * @return MLLib's MultivariateStatisticalSummary */ @@ -132,6 +136,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[Vector] by mean centering the specified columns + * * @param featureColumnNames Names of the frame's column(s) to be used * @return RDD of (org.apache.spark.mllib)Vector */ @@ -145,6 +150,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[Vector] + * * @param featureColumnNames Names of the frame's column(s) to be used * @param columnWeights The weights of the columns * @return RDD of (org.apache.spark.mllib)Vector @@ -162,6 +168,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Convert FrameRdd to RDD[(Long, Long, Double)] + * * @param sourceColumnName Name of the frame's column storing the source id of the edge * @param destinationColumnName Name of the frame's column storing the destination id of the edge * @param edgeSimilarityColumnName Name of the frame's column storing the similarity between the source and destination @@ -225,6 +232,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Create a new FrameRdd that is only a subset of the columns of this FrameRdd + * * @param columnNames names to include * @return the FrameRdd with only some columns */ @@ -237,6 +245,7 @@ class FrameRdd(val frameSchema: Schema, val prev: 
RDD[Row]) /** * Select a subset of columns while renaming them + * * @param columnNamesWithRename map of old names to new names * @return the new FrameRdd */ @@ -342,6 +351,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Sort by one or more columns + * * @param columnNamesAndAscending column names to sort by, true for ascending, false for descending * @return the sorted Frame */ @@ -396,6 +406,7 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) /** * Save this RDD to disk or other store + * * @param absolutePath location to store * @param storageFormat "file/parquet", "file/sequence", etc. */ @@ -459,6 +470,34 @@ class FrameRdd(val frameSchema: Schema, val prev: RDD[Row]) new FrameRdd(frameSchema.addColumn(column), rows) } + /** + * Export graph to multiple files in CSV format + * + * @param filename file path where to store the file + * @param separator + * @param count + * @param offset + * @return + */ + def exportToHdfsCsv(filename: String, + separator: Char, + count: Int, + offset: Int) = { + + val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(this, offset, count, -1) else this + val headers = this.frameSchema.columnNames.mkString(separator.toString) + val csvFormat = CSVFormat.RFC4180.withDelimiter(separator) + val rowWrapper = new RowWrapper(frameSchema) + val csvRdd = filterRdd.map(row => { + rowWrapper(row).exportRowToCsv(csvFormat) + }) + + val dataSample = if (csvRdd.isEmpty()) StringUtils.EMPTY else csvRdd.first() + val addHeaders = this.sparkContext.parallelize(List(headers)) ++ csvRdd + addHeaders.saveAsTextFile(filename) + dataSample + } + } /** @@ -472,6 +511,7 @@ object FrameRdd { /** * converts a data frame to frame rdd + * * @param rdd a data frame * @return a frame rdd */ @@ -519,6 +559,7 @@ object FrameRdd { /** * Convert an RDD of mixed vertex types into a map where the keys are labels and values are FrameRdd's + * * @param gbVertexRDD Graphbuilder Vertex RDD * @return keys are labels and values are FrameRdd's */ @@ -549,6 +590,7 @@ object FrameRdd { /** * Convert an RDD of mixed vertex types into a map where the keys are labels and values are FrameRdd's + * * @param gbEdgeRDD Graphbuilder Edge RDD * @param gbVertexRDD Graphbuilder Vertex RDD * @return keys are labels and values are FrameRdd's @@ -570,6 +612,7 @@ object FrameRdd { /** * Converts row object from an RDD[Array[Any]] to an RDD[Product] so that it can be used to create a SchemaRDD + * * @return RDD[org.apache.spark.sql.Row] with values equal to row object */ def toRowRDD(schema: Schema, rows: RDD[Array[Any]]): RDD[org.apache.spark.sql.Row] = { @@ -592,6 +635,7 @@ object FrameRdd { /** * Converts row object to RDD[IndexedRow] needed to create an IndexedRowMatrix + * * @param indexedRows Rows of the frame as RDD[Row] * @param frameSchema Schema of the frame * @param featureColumnNames List of the frame's column(s) to be used @@ -609,6 +653,7 @@ object FrameRdd { /** * Converts row object to RDD[IndexedRow] needed to create an IndexedRowMatrix + * * @param indexedRows Rows of the frame as RDD[Row] * @param frameSchema Schema of the frame * @param featureColumnNames List of the frame's column(s) to be used @@ -633,6 +678,7 @@ object FrameRdd { /** * Converts the schema object to a StructType for use in creating a SchemaRDD + * * @return StructType with StructFields corresponding to the columns of the schema object */ def schemaToStructType(schema: Schema): StructType = { @@ -654,6 +700,7 @@ object FrameRdd { /** * Converts the spark DataTypes to our schema 
Datatypes + * * @return our schema DataType */ def sparkDataTypeToSchemaDataType(dataType: org.apache.spark.sql.types.DataType): org.trustedanalytics.atk.domain.schema.DataTypes.DataType = { @@ -688,6 +735,7 @@ object FrameRdd { /** * Converts a spark dataType (as string)to our schema Datatype + * * @param sparkDataType spark data type * @return a DataType */ @@ -708,6 +756,7 @@ object FrameRdd { /** * Converts the schema object to a StructType for use in creating a SchemaRDD + * * @return StructType with StructFields corresponding to the columns of the schema object */ def schemaToAvroType(schema: Schema): List[(String, String)] = { diff --git a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala index 8b1bec7cf1..fe4c025e23 100644 --- a/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala +++ b/engine/engine-core/src/main/scala/org/trustedanalytics/atk/engine/frame/Row.scala @@ -16,6 +16,7 @@ package org.trustedanalytics.atk.engine.frame +import org.apache.commons.csv.{ CSVPrinter, CSVFormat } import org.apache.spark.frame.FrameRdd import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.GenericArrayData @@ -26,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.trustedanalytics.atk.graphbuilder.elements.GBVertex import scala.collection.mutable.ArrayBuffer - +import java.lang.StringBuilder /** * This class wraps raw row data adding schema information - this allows for a richer easier to use API. * @@ -44,6 +45,7 @@ class RowWrapper(override val schema: Schema) extends AbstractRow with Serializa /** * Set the data in this wrapper + * * @param row the data to set inside this Wrapper * @return this instance */ @@ -68,6 +70,7 @@ trait AbstractRow { /** * Determine whether the property exists + * * @param name name of the property * @return boolean value indicating whether the property exists */ @@ -83,6 +86,7 @@ trait AbstractRow { /** * Get property + * * @param columnName name of the property * @return property value */ @@ -90,6 +94,7 @@ trait AbstractRow { /** * Get more than one value as a List + * * @param columnNames the columns to get values for * @return the values for the columns */ @@ -99,6 +104,7 @@ trait AbstractRow { /** * Get property of boolean data type + * * @param columnName name of the property * @return property value */ @@ -106,6 +112,7 @@ trait AbstractRow { /** * Get property of integer data type + * * @param columnName name of the property * @return property value */ @@ -113,6 +120,7 @@ trait AbstractRow { /** * Get property of long data type + * * @param columnName name of the property * @return property value */ @@ -120,6 +128,7 @@ trait AbstractRow { /** * Get property of float data type + * * @param columnName name of the property * @return property value */ @@ -127,6 +136,7 @@ trait AbstractRow { /** * Get property of double data type + * * @param columnName name of the property * @return property value */ @@ -134,6 +144,7 @@ trait AbstractRow { /** * Get property of string data type + * * @param columnName name of the property * @return property value */ @@ -141,6 +152,7 @@ trait AbstractRow { /** * Get property of string data type + * * @param columnName name of the property * @return property value */ @@ -160,6 +172,7 @@ trait AbstractRow { /** * Set a value in a column - validates the supplied value is the correct type + * * @param name the name of the column to 
set * @param value the value of the column */ @@ -170,6 +183,7 @@ trait AbstractRow { /** * Set all of the values for an entire row with validation + * * @param values the values to set * @return the row */ @@ -180,6 +194,7 @@ trait AbstractRow { /** * Validate the supplied value matches the schema for the supplied columnName. + * * @param name column name * @param value the value to check */ @@ -197,6 +212,7 @@ trait AbstractRow { /** * Set the value in a column - don't validate the type + * * @param name the name of the column to set * @param value the value of the column */ @@ -211,6 +227,7 @@ trait AbstractRow { /** * Set all of the values for a row - don't validate type + * * @param values all of the values * @return the row */ @@ -266,6 +283,7 @@ trait AbstractRow { /** * Get underlying data for this row + * * @return the actual row */ def data: Row = row @@ -280,6 +298,7 @@ trait AbstractRow { /** * Select several property values from their names + * * @param names the names of the properties to put into an array * @param flattenInputs If true, flatten vector data types * @return values for the supplied properties @@ -299,6 +318,7 @@ trait AbstractRow { /** * Select several property values from their names as an array of doubles + * * @param names the names of the properties to put into an array * @param flattenInputs If true, flatten vector data types * @return array of doubles with values for the supplied properties @@ -359,6 +379,7 @@ trait AbstractRow { /** * Create a row with values + * * @param content the values * @return the row */ @@ -372,4 +393,25 @@ trait AbstractRow { vertex.properties.foreach(prop => setValue(prop.key, prop.value)) row } + + /** + * Export row to CSV format + * + * @param csvFormat + * @return CSV record/row + */ + def exportRowToCsv(csvFormat: CSVFormat): String = { + val stringBuilder = new StringBuilder + val printer = new CSVPrinter(stringBuilder, csvFormat) + val array = row.toSeq.map(col => + col match { + case null => "" + case arr: ArrayBuffer[_] => arr.mkString(",") + case seq: Seq[_] => seq.mkString(",") + case x => x.toString + }) + for (i <- array) printer.print(i) + stringBuilder.toString + } + } diff --git a/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala b/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala new file mode 100644 index 0000000000..4b3ecd37ed --- /dev/null +++ b/engine/engine-core/src/test/scala/org/trustedanalytics/atk/engine/frame/RowWrapperTest.scala @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2015 Intel Corporation  + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *       http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.trustedanalytics.atk.engine.frame + +import org.apache.commons.csv.CSVFormat +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.scalatest.{ Matchers, WordSpec } +import org.trustedanalytics.atk.domain.schema.{ DataTypes, Column, FrameSchema } +/** + * Created by wtaie on 3/23/16. 
+ */
+class RowWrapperTest extends WordSpec with Matchers {
+
+  val inputRows: Array[Row] = Array(
+    new GenericRow(Array[Any]("a", 1, 1d, "w")),
+    new GenericRow(Array[Any]("c", 5, 1d, "5")))
+
+  val inputSchema = FrameSchema(List(
+    Column("col_0", DataTypes.string),
+    Column("col_1", DataTypes.int32),
+    Column("col_2", DataTypes.float64),
+    Column("col_3", DataTypes.string)
+  ))
+  "export row to CSV" should {
+    "convert an input row to a CSV string" in {
+      val csvFormat = CSVFormat.RFC4180.withDelimiter(',')
+      val rowWrapper = new RowWrapper(inputSchema)
+      val csvRecord = rowWrapper(inputRows(1)).exportRowToCsv(csvFormat)
+      csvRecord shouldEqual "c,5,1.0,5"
+    }
+    "have non-empty input rows" in {
+      inputRows should not be empty
+    }
+  }
+
+}
diff --git a/integration-tests/tests/gendoct.py b/integration-tests/tests/gendoct.py
index c3e30a8b9b..8a34f3e0b3 100644
--- a/integration-tests/tests/gendoct.py
+++ b/integration-tests/tests/gendoct.py
@@ -96,6 +96,7 @@ def _trim_test_path(path):
 graph/graphx_connected_components.rst
 graph/graphx_triangle_count.rst
 graph/graphx_pagerank.rst
+graph/ExportGraphCsv.rst
 frame/column_median.rst
 frame/column_mode.rst
 frame/column_summary_statistics.rst
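
The net effect of the FrameRdd/Row changes above is that CSV export is now a reusable FrameRdd method rather than a private helper inside ExportHdfsCsvPlugin, which is what lets ExportGraphCsvPlugin export each vertex and edge frame through the same code path. A minimal sketch of that reuse, for orientation only (a fragment, not part of this change: it assumes it sits inside a Spark plugin's execute method where a SparkFrame named `frame` is already resolved, and the folder name is invented):

    // Sketch only: reusing the relocated FrameRdd.exportToHdfsCsv from another plugin.
    // Assumes `frame: SparkFrame` is in scope, as in ExportHdfsCsvPlugin above; "myCsvFolder" is a made-up path.
    val sample: String = frame.rdd.exportToHdfsCsv(
      filename = "myCsvFolder", // HDFS folder for the header + part files; plugins require it not to exist yet
      separator = ',',          // single-character delimiter, validated by the export args classes
      count = -1,               // a non-positive count exports the whole frame
      offset = 0)               // number of rows to skip before exporting
    // `sample` is the first exported CSV row (empty string for an empty frame); both plugins
    // report it back through ExportMetadata as a preview of the written data.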