@@ -0,0 +1,46 @@
<hide>
>>> import trustedanalytics as ta

>>> ta.connect()
-etc-

>>> vertex_schema = [('source', ta.int32), ('label', ta.float32)]
>>> edge_schema = [('source', ta.int32), ('dest', ta.int32), ('weight', ta.int32)]

>>> vertex_rows = [ [1, 1], [2, 1], [3, 5], [4, 5], [5, 5] ]
>>> edge_rows = [ [1, 2, 1], [1, 3, 1], [2, 3, 1], [1, 4, 1], [4, 5, 1] ]
>>> vertex_frame = ta.Frame(ta.UploadRows(vertex_rows, vertex_schema))
<progress>
>>> edge_frame = ta.Frame(ta.UploadRows(edge_rows, edge_schema))
-etc-

</hide>
>>> graph = ta.Graph()

>>> graph.define_vertex_type('source')
<progress>
>>> graph.vertices['source'].add_vertices(vertex_frame, 'source', 'label')
<progress>
>>> graph.define_edge_type('edges','source', 'source', directed=False)
<progress>
>>> graph.edges['edges'].add_edges(edge_frame, 'source', 'dest', ['weight'])
<progress>
>>> result = graph.export_to_csv("csvFolderName")
<progress>
>>> graph.vertices['source'].inspect()
[#]  _vid  _label  source  label
================================
[0]     5  source       5    5.0
[1]     1  source       1    1.0
[2]     2  source       2    1.0
[3]     3  source       3    5.0
[4]     4  source       4    5.0

>>> graph.edges['edges'].inspect()
[#]  _eid  _src_vid  _dest_vid  _label  weight
==============================================
[0]     6         1          2  edges        1
[1]     7         1          3  edges        1
[2]     9         1          4  edges        1
[3]     8         2          3  edges        1
[4]    10         4          5  edges        1
@@ -69,47 +69,11 @@ class ExportHdfsCsvPlugin extends SparkCommandPlugin[ExportHdfsCsvArgs, ExportMe
     require(!fileStorage.exists(new Path(arguments.folderName)), "File or Directory already exists")
     val frame: SparkFrame = arguments.frame
     // load frame as RDD
-    val sample = exportToHdfsCsv(frame.rdd, arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset)
+    val sample = frame.rdd.exportToHdfsCsv(arguments.folderName, arguments.separator.charAt(0), arguments.count, arguments.offset)

     val artifactPath = new Path(s"${fileStorage.hdfs.getHomeDirectory()}/${arguments.folderName}")
     ExportMetadata(artifactPath.toString, "all", "csv", frame.rowCount, sample,
       fileStorage.size(artifactPath.toString), Some(arguments.folderName))
   }

-  /**
-   * Export to a file in CSV format
-   *
-   * @param frameRdd input rdd containing all columns
-   * @param filename file path where to store the file
-   */
-  private def exportToHdfsCsv(
-    frameRdd: FrameRdd,
-    filename: String,
-    separator: Char,
-    count: Int,
-    offset: Int) = {
-
-    val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(frameRdd, offset, count, -1) else frameRdd
-    val headers = frameRdd.frameSchema.columnNames.mkString(separator.toString)
-    val csvFormat = CSVFormat.RFC4180.withDelimiter(separator)
-
-    val csvRdd = filterRdd.map(row => {
-      val stringBuilder = new java.lang.StringBuilder
-      val printer = new CSVPrinter(stringBuilder, csvFormat)
-      val array = row.toSeq.map(col =>
-        col match {
-          case null => ""
-          case arr: ArrayBuffer[_] => arr.mkString(",")
-          case seq: Seq[_] => seq.mkString(",")
-          case x => x.toString
-        })
-      for (i <- array) printer.print(i)
-      stringBuilder.toString
-    })
-
-    val dataSample = if (csvRdd.isEmpty()) StringUtils.EMPTY else csvRdd.first()
-    val addHeaders = frameRdd.sparkContext.parallelize(List(headers)) ++ csvRdd
-    addHeaders.saveAsTextFile(filename)
-    dataSample
-  }
 }
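For readers following the refactor: the removed private helper is replaced by a call spelled frame.rdd.exportToHdfsCsv(...), and this diff does not show where that method now lives. Below is a minimal sketch (not the actual ATK code) of one way to expose it, as an implicit enrichment reusing the removed logic; the FrameRdd and MiscFrameFunctions types and their imports are assumed to be available as in the original file.

    import org.apache.commons.csv.{ CSVFormat, CSVPrinter }
    import org.apache.commons.lang3.StringUtils
    import scala.collection.mutable.ArrayBuffer

    object FrameRddCsvExport {
      // Enrichment so call sites can write frame.rdd.exportToHdfsCsv(...).
      // The real code base may instead define this directly on FrameRdd.
      implicit class CsvExportable(val frameRdd: FrameRdd) {
        /** Write the frame to HDFS as CSV and return the first data row as a sample. */
        def exportToHdfsCsv(filename: String, separator: Char, count: Int, offset: Int): String = {
          // Optionally page the RDD, then render each row with commons-csv
          val filterRdd = if (count > 0) MiscFrameFunctions.getPagedRdd(frameRdd, offset, count, -1) else frameRdd
          val headers = frameRdd.frameSchema.columnNames.mkString(separator.toString)
          val csvFormat = CSVFormat.RFC4180.withDelimiter(separator)

          val csvRdd = filterRdd.map(row => {
            val stringBuilder = new java.lang.StringBuilder
            val printer = new CSVPrinter(stringBuilder, csvFormat)
            row.toSeq.map {
              case null => ""
              case arr: ArrayBuffer[_] => arr.mkString(",")
              case seq: Seq[_] => seq.mkString(",")
              case x => x.toString
            }.foreach(printer.print)
            stringBuilder.toString
          })

          // Prepend the header line, write out, and return the first data row as a sample
          val dataSample = if (csvRdd.isEmpty()) StringUtils.EMPTY else csvRdd.first()
          (frameRdd.sparkContext.parallelize(List(headers)) ++ csvRdd).saveAsTextFile(filename)
          dataSample
        }
      }
    }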
@@ -24,6 +24,7 @@ atk.plugin {
"org.trustedanalytics.atk.plugins.connectedcomponents.ConnectedComponentsPlugin"
"org.trustedanalytics.atk.plugins.trianglecount.TriangleCountPlugin"
"org.trustedanalytics.atk.plugins.RenameGraphPlugin"
"org.trustedanalytics.atk.plugins.export.csv.ExportGraphCsvPlugin"
]

}
@@ -0,0 +1,39 @@
/**
* Copyright (c) 2015 Intel Corporation 
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*       http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.trustedanalytics.atk.plugins.export.csv

import org.apache.commons.lang3.StringUtils
import org.trustedanalytics.atk.domain.graph.GraphReference
import org.trustedanalytics.atk.engine.plugin.ArgDoc

/**
* Input arguments class for export to CSV
*/
case class ExportGraphCsvArgs(graph: GraphReference,
    @ArgDoc("""The HDFS folder path where the graph vertices and edges subfolders
will be created.""") folderName: String,
    @ArgDoc("""The separator used to delimit the values.
Default is comma (,).""") separator: String = ",",
    @ArgDoc("""The number of records to export.
A non-positive value (the default) exports the whole frame.""") count: Int = -1,
    @ArgDoc("""The number of rows to skip before exporting to the file.
Default is zero (0).""") offset: Int = 0) {
  require(graph != null, "graph is required")
  require(folderName != null, "folder name is required")
  require(StringUtils.isNotBlank(separator) && separator.length == 1, "A single character separator is required")
}
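
The require clauses fail fast with IllegalArgumentException before any Spark work starts. A quick illustration (hypothetical REPL session; the GraphReference constructor shown here is an assumption, not part of this diff):

    val g = GraphReference(1)  // assumed constructor for illustration
    ExportGraphCsvArgs(g, "csvFolderName")                    // OK: separator ",", count -1, offset 0
    ExportGraphCsvArgs(g, "csvFolderName", separator = "||")  // throws IllegalArgumentException:
                                                              //   "requirement failed: A single character separator is required"
    ExportGraphCsvArgs(g, folderName = null)                  // throws IllegalArgumentException:
                                                              //   "requirement failed: folder name is required"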

@@ -0,0 +1,93 @@
/**
* Copyright (c) 2015 Intel Corporation 
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*       http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.trustedanalytics.atk.plugins.export.csv

import org.apache.hadoop.fs.Path
import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata
import org.trustedanalytics.atk.domain.frame.FrameEntity
import org.trustedanalytics.atk.engine.FileStorage
import org.trustedanalytics.atk.engine.frame.SparkFrame
import org.trustedanalytics.atk.engine.graph.SparkGraph
import org.trustedanalytics.atk.engine.plugin.{ Invocation, PluginDoc, SparkCommandPlugin }

//Implicits needed for JSON conversion
//import org.trustedanalytics.atk.domain.datacatalog.DataCatalogRestResponseJsonProtocol._
import org.trustedanalytics.atk.domain.DomainJsonProtocol._

object ExportGraphCsvJsonFormat {
  implicit val exportGraphCsvArgsFormat = jsonFormat5(ExportGraphCsvArgs)
  implicit val graphExportMetadataFormat = jsonFormat1(GraphExportMetadata)
}

import org.trustedanalytics.atk.plugins.export.csv.ExportGraphCsvJsonFormat._

/**
 * Export a graph to CSV format
 */
@PluginDoc(oneLine = "Write current Parquet graph to HDFS in CSV format.",
  extended = "Export the graph vertices and edges to multiple Hadoop files in CSV format.",
  returns = "A dictionary with the locations of the graph vertices and edges CSV files.")
class ExportGraphCsvPlugin extends SparkCommandPlugin[ExportGraphCsvArgs, GraphExportMetadata] {

  /**
   * The name of the command
   */
  override def name: String = "graph:/export_to_csv"

  /**
   * Export the graph vertices and edges to CSV files in HDFS
   *
   * @param invocation information about the user and the circumstances at the time of the call, as well as a function
   *                   that can be called to produce a SparkContext that can be used during this invocation
   * @param arguments input specification for the CSV export
   * @return value of type declared as the Return type
   */
  override def execute(arguments: ExportGraphCsvArgs)(implicit invocation: Invocation): GraphExportMetadata = {

    val fileStorage = new FileStorage
    require(!fileStorage.exists(new Path(arguments.folderName)), "File or Directory already exists")

    // Get the graph metadata
    val graph: SparkGraph = arguments.graph

    // Get the vertex and edge frames from the graph metadata
    val graphMeta = engine.graphs.expectSeamless(graph)

    // Export the graph vertices and edges to CSV files in HDFS, in two subdirectories inside the given directory name
    val vertexFolderName = arguments.folderName + "/vertices"
    val edgeFolderName = arguments.folderName + "/edges"
    val vertexMetadata = exportFramesToCsv(vertexFolderName, graphMeta.vertexFrames, arguments, fileStorage)
    val edgeMetadata = exportFramesToCsv(edgeFolderName, graphMeta.edgeFrames, arguments, fileStorage)
    GraphExportMetadata(vertexMetadata.map(_.targetUri) ++ edgeMetadata.map(_.targetUri))
  }

  // Exports the graph vertex/edge frames to CSV files in HDFS, one subfolder per frame.
  def exportFramesToCsv(folderName: String, frameEntities: List[FrameEntity], arguments: ExportGraphCsvArgs, fileStorage: FileStorage)(implicit invocation: Invocation): List[ExportMetadata] = {
    val frames = frameEntities.map(_.toReference)
    frames.map(frame => {
      // load frame as RDD
      val sparkFrame: SparkFrame = frame
      val subFolderName = s"${folderName}/${sparkFrame.label.getOrElse(sparkFrame.frameId)}"
      val sample = sparkFrame.rdd.exportToHdfsCsv(subFolderName, arguments.separator.charAt(0), arguments.count, arguments.offset)

      // Build the frame's folder path in HDFS
      val artifactPath = new Path(s"${fileStorage.hdfs.getHomeDirectory()}/${subFolderName}")
      ExportMetadata(artifactPath.toString, "all", "csv", sparkFrame.rowCount, sample,
        fileStorage.size(artifactPath.toString), Some(folderName))
    })
  }
}
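
To make the subfolder-per-frame scheme concrete: for the Python session at the top of this diff (graph.export_to_csv("csvFolderName"), one vertex type 'source', one edge type 'edges'), the plugin would produce roughly the following HDFS layout. This is illustrative only; the actual part-file names come from Spark's saveAsTextFile.

    // csvFolderName/
    //   vertices/
    //     source/          // one subfolder per vertex frame: header line + CSV rows
    //       part-00000 ...
    //   edges/
    //     edges/           // one subfolder per edge frame
    //       part-00000 ...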
@@ -0,0 +1,25 @@
/**
* Copyright (c) 2015 Intel Corporation 
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*       http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.trustedanalytics.atk.plugins.export.csv

import org.trustedanalytics.atk.domain.datacatalog.ExportMetadata

//TODO: the JSON serialization issue needs to be fixed such that we can return List[ExportMetadata] instead of List[String]
/**
 * Metadata class returned by ExportGraphCsvPlugin.
 * @param metadata the HDFS target URIs of the exported vertex and edge CSV folders
 */
case class GraphExportMetadata(metadata: List[String])
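
Given the jsonFormat1 wiring in ExportGraphCsvJsonFormat above, this return value serializes to a single-field JSON object. The URIs below are illustrative placeholders, not real output:

    // { "metadata": [ "hdfs://<namenode>/user/<user>/csvFolderName/vertices/source",
    //                 "hdfs://<namenode>/user/<user>/csvFolderName/edges/edges" ] }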