From 997dc6b4840c2f6ce6dacd6d2abcec0cb613e38d Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Thu, 26 May 2016 13:34:49 -0700 Subject: [PATCH 01/14] Export to OrientDB database name duplication bug fixed --- .../atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala index 1569adf6e0..240689da37 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala @@ -15,10 +15,12 @@ */ package org.trustedanalytics.atk.plugins.orientdb +import com.orientechnologies.orient.client.remote.OServerAdmin import org.trustedanalytics.atk.domain.DomainJsonProtocol import org.trustedanalytics.atk.domain.graph.SeamlessGraphMeta import org.trustedanalytics.atk.engine.graph.{ SparkEdgeFrame, SparkVertexFrame, SparkGraph } import org.trustedanalytics.atk.engine.plugin.{ ApiMaturityTag, Invocation, PluginDoc, SparkCommandPlugin } +import org.trustedanalytics.atk.event.EventLogging import scala.collection.immutable.Map import spray.json._ @@ -49,7 +51,6 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr override def name: String = "graph:/export_to_orientdb" - override def apiMaturityTag = Some(ApiMaturityTag.Alpha) /** * Method to export graph to OrientDB * @@ -63,6 +64,11 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr // Get OrientDB configurations val dbConfig = DbConfigReader.extractConfigurations(arguments.graphName) + // Check if the given graph name/database name already exists + val rootUserName = "root" + if (new OServerAdmin(dbConfig.dbUri).connect(rootUserName, dbConfig.rootPassword).existsDatabase()) { + require(arguments.graphName != s"${arguments.graphName}", s"the database name ${arguments.graphName} already exists, a new database name is required") + } //Get the graph meta data val graph: SparkGraph = arguments.graph From 22c98928f7c80a7e2c6a5648d8bbb027b901f3f1 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Thu, 26 May 2016 13:58:38 -0700 Subject: [PATCH 02/14] cleaned up the imports --- .../atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala index 240689da37..393568993f 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala @@ -19,8 +19,7 @@ import com.orientechnologies.orient.client.remote.OServerAdmin import org.trustedanalytics.atk.domain.DomainJsonProtocol import org.trustedanalytics.atk.domain.graph.SeamlessGraphMeta import org.trustedanalytics.atk.engine.graph.{ SparkEdgeFrame, SparkVertexFrame, SparkGraph } -import org.trustedanalytics.atk.engine.plugin.{ ApiMaturityTag, Invocation, PluginDoc, SparkCommandPlugin } -import org.trustedanalytics.atk.event.EventLogging +import org.trustedanalytics.atk.engine.plugin.{Invocation, PluginDoc, SparkCommandPlugin } import scala.collection.immutable.Map import spray.json._ From 04f985248c70c85308f3e69bb9db0ab9b207c3e8 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Thu, 26 May 2016 14:21:43 -0700 Subject: [PATCH 03/14] cleaned up the imports --- .../atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala index 393568993f..8437d4db05 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala @@ -19,7 +19,7 @@ import com.orientechnologies.orient.client.remote.OServerAdmin import org.trustedanalytics.atk.domain.DomainJsonProtocol import org.trustedanalytics.atk.domain.graph.SeamlessGraphMeta import org.trustedanalytics.atk.engine.graph.{ SparkEdgeFrame, SparkVertexFrame, SparkGraph } -import org.trustedanalytics.atk.engine.plugin.{Invocation, PluginDoc, SparkCommandPlugin } +import org.trustedanalytics.atk.engine.plugin.{ Invocation, PluginDoc, SparkCommandPlugin } import scala.collection.immutable.Map import spray.json._ From 10a4b51d2beb105da763d413fd57fe2a393a43e2 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Fri, 27 May 2016 12:16:42 -0700 Subject: [PATCH 04/14] code review comments addressed --- .../atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala index 8437d4db05..6b7640c792 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala @@ -64,10 +64,10 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr val dbConfig = DbConfigReader.extractConfigurations(arguments.graphName) // Check if the given graph name/database name already exists - val rootUserName = "root" - if (new OServerAdmin(dbConfig.dbUri).connect(rootUserName, dbConfig.rootPassword).existsDatabase()) { + if (new OServerAdmin(dbConfig.dbUri).connect(GraphDbFactory.rootUserName, dbConfig.rootPassword).existsDatabase()) { require(arguments.graphName != s"${arguments.graphName}", s"the database name ${arguments.graphName} already exists, a new database name is required") } + //Get the graph meta data val graph: SparkGraph = arguments.graph From 108e7a16322f928078e590c1bee7f3c387795d01 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Mon, 6 Jun 2016 11:11:41 -0700 Subject: [PATCH 05/14] update existing graph for export to OrientBD plugin --- .../src/main/resources/reference.conf | 1 + .../atk/plugins/orientdb/DbConfigReader.scala | 3 +- .../plugins/orientdb/DbConfiguration.scala | 3 +- .../plugins/orientdb/EdgeFrameWriter.scala | 11 ++- .../atk/plugins/orientdb/EdgeWriter.scala | 56 +++++++++++++- .../orientdb/ExportOrientDbGraphPlugin.scala | 2 +- .../plugins/orientdb/VertexFrameWriter.scala | 9 ++- .../atk/plugins/orientdb/VertexWriter.scala | 75 +++++++++++++++---- .../orientdb/EdgeFrameWriterTest.scala | 2 +- .../atk/plugins/orientdb/EdgeWriterTest.scala | 6 +- .../plugins/orientdb/GraphDbFactoryTest.scala | 4 +- .../orientdb/VertexFrameWriterTest.scala | 2 +- .../plugins/orientdb/VertexWriterTest.scala | 10 +-- .../orientdbimport/EdgeReaderTest.scala | 6 +- .../orientdbimport/SchemaReaderTest.scala | 6 +- .../orientdbimport/VertexReaderTest.scala | 2 +- 16 files changed, 151 insertions(+), 47 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/resources/reference.conf b/engine-plugins/graph-plugins/src/main/resources/reference.conf index 7e3214a213..b78426f58a 100644 --- a/engine-plugins/graph-plugins/src/main/resources/reference.conf +++ b/engine-plugins/graph-plugins/src/main/resources/reference.conf @@ -8,5 +8,6 @@ trustedanalytics.atk { connection-orientdb.password = "invalid-orientdb-password" connection-orientdb.url = "invalid-orientdb-url" connection-orientdb.rootpassword ="invalid-orientdb-rootpassword" + connection-orientdb.append ="invalid-orientdb-append" } } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala index baa180a6ba..c0700ed24e 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala @@ -38,6 +38,7 @@ object DbConfigReader extends Serializable { val portNumber = ConfigFactory.load().getString("trustedanalytics.atk.datastore.connection-orientdb.port") val dbDir = ConfigFactory.load().getString("trustedanalytics.atk.datastore.connection-orientdb.url") val dbUri = dbDir + "/" + dbName - DbConfiguration(dbUri, userName, password, portNumber, dbHost, rootPassword) + val append = ConfigFactory.load().getString("trustedanalytics.atk.datastore.connection-orientdb.append").toBoolean + DbConfiguration(dbUri, userName, password, portNumber, dbHost, rootPassword, append) } } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala index f02e49f695..d67930b613 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala @@ -26,7 +26,8 @@ case class DbConfiguration(@ArgDoc("""OrientDB database URI.""") dbUri: String, @ArgDoc("""The database password.""") dbPassword: String, @ArgDoc("""Port number.""") portNumber: String, @ArgDoc("""The database host.""") dbHost: String, - @ArgDoc("""The root password.""") rootPassword: String) extends Serializable { + @ArgDoc("""The root password.""") rootPassword: String, + @ArgDoc("""Enables/Disables updating an existing OrientDB graph""") append: Boolean) extends Serializable { require(dbUri != null, "database URI is required") require(dbUserName != null, "the user name is required") diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala index 4312a2f03c..052e78760c 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala @@ -43,10 +43,15 @@ class EdgeFrameWriter(edgeFrameRdd: EdgeFrameRdd, dbConfigurations: DbConfigurat val edge = edgeWrapper.toEdge // lookup the source and destination vertices val findOrientVertex = new VertexWriter(orientGraph) - val srcVertex = findOrientVertex.findOrCreateVertex(edge.srcVertexId()) - val destVertex = findOrientVertex.findOrCreateVertex(edge.destVertexId()) + val srcVertex = findOrientVertex.findOrCreate(edge.srcVertexId()) + val destVertex = findOrientVertex.findOrCreate(edge.destVertexId()) val edgeWriter = new EdgeWriter(orientGraph, edge) - val orientEdge = edgeWriter.addEdge(srcVertex, destVertex) + val orientEdge = if (dbConfigurations.append) { + edgeWriter.updateOrCreate(edge, srcVertex, destVertex) + } + else { + edgeWriter.create(srcVertex, destVertex) + } batchCounter += 1 if (batchCounter % batchSize == 0 && batchCounter != 0) { orientGraph.commit() diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala index a1ef7c5f1b..1f49f070dc 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala @@ -15,8 +15,8 @@ */ package org.trustedanalytics.atk.plugins.orientdb -import com.tinkerpop.blueprints.{ Vertex => BlueprintsVertex } -import com.tinkerpop.blueprints.impls.orient.{ OrientGraphNoTx, OrientEdge, OrientGraph } +import com.tinkerpop.blueprints.{ Vertex => BlueprintsVertex, Edge => BlueprintsEdge } +import com.tinkerpop.blueprints.impls.orient.{ OrientGraphNoTx, OrientEdge } import org.apache.spark.atk.graph.Edge import org.trustedanalytics.atk.domain.schema.GraphSchema import org.trustedanalytics.atk.engine.frame.RowWrapper @@ -29,7 +29,7 @@ import org.trustedanalytics.atk.engine.frame.RowWrapper */ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { - require(orientGraph != null, "The Orient graph database instance must not equal null") + require(orientGraph != null, "The OrientDB graph database instance must not equal null") /** * Method for exporting an edge @@ -38,7 +38,7 @@ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { * @param destVertex is a blueprintsVertex as a destination * @return OrientDB edge */ - def addEdge(srcVertex: BlueprintsVertex, destVertex: BlueprintsVertex): OrientEdge = { + def create(srcVertex: BlueprintsVertex, destVertex: BlueprintsVertex): OrientEdge = { val className = edge.schema.label val orientEdge = orientGraph.addEdge("class:" + className, srcVertex, destVertex, className) @@ -51,4 +51,52 @@ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { orientEdge } + /** + * a method that finds OrientDB edge + * @param edge ATK edge + * @return OrientDB edge + */ + def find(edge: Edge): Option[BlueprintsEdge] = { + val edgeIterator = orientGraph.getEdges(GraphSchema.srcVidProperty, edge.srcVertexId()).iterator() + if (edgeIterator.hasNext) { + val existingEdge = edgeIterator.next() + return Some(existingEdge) + } + None + } + + /** + * a method that updates OrientDB edge + * @param edge ATK edge + * @param orientDbEdge OrientDB edge + * @return updated OrientDB edge + */ + def update(edge: Edge, orientDbEdge: BlueprintsEdge): BlueprintsEdge = { + val rowWrapper = new RowWrapper(edge.schema) + edge.schema.columns.foreach(col => { + if (col.name != GraphSchema.labelProperty) { + orientDbEdge.setProperty(col.name, rowWrapper(edge.row).value(col.name)) + } + }) + orientDbEdge + } + + /** + * a method that updates OrientDB edge if exists or creates a new edge if not found + * @param edge ATK edge + * @param srcVertex OrientDB vertex as a source + * @param destVertex OrientDB vertex as a destination + * @return OrientDB edge + */ + def updateOrCreate(edge: Edge, srcVertex: BlueprintsVertex, destVertex: BlueprintsVertex): BlueprintsEdge = { + val orientEdge = find(edge) + val newEdge = if (orientEdge.isEmpty) { + create(srcVertex, destVertex) + } + else { + update(edge, orientEdge.get) + } + newEdge + } + } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala index 6b7640c792..d3a7e9709d 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala @@ -64,7 +64,7 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr val dbConfig = DbConfigReader.extractConfigurations(arguments.graphName) // Check if the given graph name/database name already exists - if (new OServerAdmin(dbConfig.dbUri).connect(GraphDbFactory.rootUserName, dbConfig.rootPassword).existsDatabase()) { + if (new OServerAdmin(dbConfig.dbUri).connect(GraphDbFactory.rootUserName, dbConfig.rootPassword).existsDatabase() && !dbConfig.append) { require(arguments.graphName != s"${arguments.graphName}", s"the database name ${arguments.graphName} already exists, a new database name is required") } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala index cd934f3757..df6d0687c2 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala @@ -42,8 +42,13 @@ class VertexFrameWriter(vertexFrameRdd: VertexFrameRdd, dbConfigurations: DbConf while (iter.hasNext) { val vertexWrapper = iter.next() val vertex = vertexWrapper.toVertex - val addOrientVertex = new VertexWriter(orientGraph) - val orientVertex = addOrientVertex.addVertex(vertex) + val vertexWriter = new VertexWriter(orientGraph) + if (dbConfigurations.append) { + vertexWriter.updateOrCreate(vertex) + } + else { + vertexWriter.create(vertex) + } batchCounter += 1 if (batchCounter % batchSize == 0 && batchCounter != 0) { orientGraph.commit() diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala index c82ce67c00..e813d4bb20 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala @@ -28,16 +28,15 @@ import org.trustedanalytics.atk.engine.frame.RowWrapper */ class VertexWriter(orientGraph: OrientGraphNoTx) { - require(orientGraph != null, "The Orient graph database instance must not equal null") + require(orientGraph != null, "The OrientDB graph database instance must not equal null") /** - * Method for exporting a vertex + * Method for creates a vertex * - * @param vertex atk vertex to be converted to Orient BlueprintsVertex - * @return Orient BlueprintsVertex + * @param vertex atk vertex to be converted to OrientDB BlueprintsVertex + * @return OrientDB BlueprintsVertex */ - - def addVertex(vertex: Vertex): BlueprintsVertex = { + def create(vertex: Vertex): BlueprintsVertex = { val className: String = vertex.schema.label val orientVertexType: BlueprintsVertex = orientGraph.addVertex(className, null) @@ -47,29 +46,73 @@ class VertexWriter(orientGraph: OrientGraphNoTx) { orientVertexType.setProperty(col.name, rowWrapper(vertex.row).value(col.name)) } }) - orientVertexType - } /** - * a method for checking an existing vertex and creates a new vertex if not found + * a method for looking up a vertex in OrientDB graph and creates a new vertex if not found * * @param vertexId the vertex ID - * @return vertex + * @return OrientDB vertex */ - def findOrCreateVertex(vertexId: Long): BlueprintsVertex = { + def findOrCreate(vertexId: Long): BlueprintsVertex = { + val vertex = find(vertexId) + if (vertex.isEmpty) { + val newVertex = orientGraph.addVertex(GraphSchema.labelProperty, null) + newVertex.setProperty(GraphSchema.vidProperty, vertexId) + newVertex + } + else { + vertex.get + } + } + /** + * a method that finds a vertex + * + * @param vertexId vertex ID + * @return OrientDB vertex if exists or null if not found + */ + def find(vertexId: Long): Option[BlueprintsVertex] = { val vertexIterator = orientGraph.getVertices(GraphSchema.vidProperty, vertexId).iterator() if (vertexIterator.hasNext) { val existingVertex = vertexIterator.next() - existingVertex + return Some(existingVertex) + } + None + } + + /** + * updates an existing OrientDB vertex + * + * @param vertex ATK vertex + * @param orientDbVertex OrientDB vertex + * @return updated OrientDB vertex + */ + def update(vertex: Vertex, orientDbVertex: BlueprintsVertex): BlueprintsVertex = { + val rowWrapper = new RowWrapper(vertex.schema) + vertex.schema.columns.foreach(col => { + if (col.name != GraphSchema.labelProperty) { + orientDbVertex.setProperty(col.name, rowWrapper(vertex.row).value(col.name)) + } + }) + orientDbVertex + } + + /** + * a method that updates OrientDB vertex if exists or creates a new vertex if not found + * + * @param vertex ATK vertex + * @return OrientDB vertex + */ + def updateOrCreate(vertex: Vertex): BlueprintsVertex = { + val orientVertex = find(vertex.vid) + val newVertex = if (orientVertex.isEmpty) { + create(vertex) } else { - val newVertex = orientGraph.addVertex(GraphSchema.labelProperty, null) - newVertex.setProperty(GraphSchema.vidProperty, vertexId) - newVertex + update(vertex, orientVertex.get) } - + newVertex } } diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala index 2cfb8388a4..41827ab52e 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala @@ -35,7 +35,7 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with "Edge frame writer" should { "Export edge frame" in { // exporting a vertex frame: - val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword) + val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword, false) val vColumns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32)) val vSchema = new VertexSchema(vColumns, GraphSchema.labelProperty, null) diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala index 1ac188e135..1fc18c8ae4 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala @@ -39,8 +39,8 @@ class EdgeWriterTest extends WordSpec with Matchers with TestingSparkContextWord val vertexSrc = Vertex(schema, rowSrc) val vertexDest = Vertex(schema, rowDest) val vertexWriter = new VertexWriter(orientMemoryGraph) - val orientVertexSrc = vertexWriter.addVertex(vertexSrc) - val orientVertexDest = vertexWriter.addVertex(vertexDest) + val orientVertexSrc = vertexWriter.create(vertexSrc) + val orientVertexDest = vertexWriter.create(vertexDest) // create the edge val edgeColumns = List(Column(GraphSchema.edgeProperty, DataTypes.int64), Column(GraphSchema.srcVidProperty, DataTypes.int64), Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32)) val edgeSchema = new EdgeSchema(edgeColumns, "label", "srclabel", "destlabel") @@ -48,7 +48,7 @@ class EdgeWriterTest extends WordSpec with Matchers with TestingSparkContextWord val edge = Edge(edgeSchema, edgeRow) val edgeWriter = new EdgeWriter(orientMemoryGraph, edge) // call method under test - val orientEdge = edgeWriter.addEdge(orientVertexSrc, orientVertexDest) + val orientEdge = edgeWriter.create(orientVertexSrc, orientVertexDest) //validate results val srcVidProp: Any = orientEdge.getProperty(GraphSchema.srcVidProperty) val destVidProp: Any = orientEdge.getProperty(GraphSchema.destVidProperty) diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala index 897598dd3e..9702ce797f 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala @@ -31,7 +31,7 @@ class GraphDbFactoryTest extends WordSpec with Matchers { "creates a graph database and takes input arguments" in { val dbUri: String = "memory:OrientTestDb" - val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword) + val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword, false) //Tested method val graph: OrientGraphNoTx = GraphDbFactory.createGraphDb(dbConfig) //Results validation @@ -45,7 +45,7 @@ class GraphDbFactoryTest extends WordSpec with Matchers { val dbName = "OrientDbTest" tmpDir = DirectoryUtils.createTempDirectory("orient-graph-for-unit-testing") val dbUri = "plocal:" + tmpDir.getAbsolutePath + "/" + dbName - val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword) + val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword, false) //Tested method val orientDb = GraphDbFactory.graphDbConnector(dbConfig) //Results validation diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala index 7c9b62d505..e5bdbd8dc5 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala @@ -33,7 +33,7 @@ class VertexFrameWriterTest extends WordSpec with TestingSparkContextWordSpec wi "vertex frame writer" should { "export vertex frame to OrientDB" in { - val dbConfig = new DbConfiguration(dbUri, dbUserName, dbPassword, "port", "host", rootPassword) + val dbConfig = new DbConfiguration(dbUri, dbUserName, dbPassword, "port", "host", rootPassword, false) val columns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32)) val schema = new VertexSchema(columns, GraphSchema.labelProperty, null) val vertices: List[Row] = List( diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala index 172ad14ba4..05b1eb76d3 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala @@ -43,7 +43,7 @@ class VertexWriterTest extends WordSpec with Matchers with TestingSparkContextWo "export vertex to OrientDb vertex " in { val addOrientVertex = new VertexWriter(orientMemoryGraph) //Tested method - val orientVertex = addOrientVertex.addVertex(vertex) + val orientVertex = addOrientVertex.create(vertex) //Results validation val vidProp: Any = orientVertex.getProperty(GraphSchema.vidProperty) val propName: Any = orientVertex.getProperty("name") @@ -54,10 +54,10 @@ class VertexWriterTest extends WordSpec with Matchers with TestingSparkContextWo "findOrCreate gets a vertex" in { val addOrientVertex = new VertexWriter(orientMemoryGraph) - val orientVertex = addOrientVertex.addVertex(vertex) + val orientVertex = addOrientVertex.create(vertex) val vertexId = 1L //Tested method - val newVertex = addOrientVertex.findOrCreateVertex(vertexId) + val newVertex = addOrientVertex.findOrCreate(vertexId) //Results validation val vidProp: Any = newVertex.getProperty(GraphSchema.vidProperty) val propName: Any = newVertex.getProperty("from") @@ -67,10 +67,10 @@ class VertexWriterTest extends WordSpec with Matchers with TestingSparkContextWo "findOrCreate creates a vertex if not found" in { val addOrientVertex = new VertexWriter(orientMemoryGraph) - val orientVertex = addOrientVertex.addVertex(vertex) + val orientVertex = addOrientVertex.create(vertex) val vertexId = 2L //Tested Method - val newVertex = addOrientVertex.findOrCreateVertex(vertexId) + val newVertex = addOrientVertex.findOrCreate(vertexId) //Results validation val vidProp: Any = newVertex.getProperty(GraphSchema.vidProperty) assert(vidProp == 2) diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala index 998743aa16..eee034389f 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala @@ -38,10 +38,10 @@ class EdgeReaderTest extends WordSpec with TestingOrientDb with Matchers with Be Edge(edgeSchema, edgeRow) } val addOrientVertex = new VertexWriter(orientMemoryGraph) - val srcVertex = addOrientVertex.findOrCreateVertex(1L) - val destVertex = addOrientVertex.findOrCreateVertex(2L) + val srcVertex = addOrientVertex.findOrCreate(1L) + val destVertex = addOrientVertex.findOrCreate(2L) val edgeWriter = new EdgeWriter(orientMemoryGraph, edge) - edgeWriter.addEdge(srcVertex, destVertex) + edgeWriter.create(srcVertex, destVertex) } override def afterEach() { cleanupOrientDbInMemory() diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala index 6762020876..5f65abf63b 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala @@ -49,11 +49,11 @@ class SchemaReaderTest extends WordSpec with TestingOrientDb with Matchers with val addOrientVertex = new VertexWriter(orientMemoryGraph) val schemaWriter = new SchemaWriter(orientMemoryGraph) schemaWriter.createVertexSchema(vertex.schema) - val srcVertex = addOrientVertex.addVertex(vertex) - val destVertex = addOrientVertex.findOrCreateVertex(2L) + val srcVertex = addOrientVertex.create(vertex) + val destVertex = addOrientVertex.findOrCreate(2L) schemaWriter.createEdgeSchema(edge.schema) val edgeWriter = new EdgeWriter(orientMemoryGraph, edge) - edgeWriter.addEdge(srcVertex, destVertex) + edgeWriter.create(srcVertex, destVertex) } override def afterEach() { diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala index 10275d54db..8614b5a0b7 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala @@ -37,7 +37,7 @@ class VertexReaderTest extends WordSpec with TestingOrientDb with Matchers with Vertex(schema, row) } val addOrientVertex = new VertexWriter(orientMemoryGraph) - val orientVertex = addOrientVertex.addVertex(vertex) + val orientVertex = addOrientVertex.create(vertex) } override def afterEach() { From 9d918419f1d2237719901a774fbf05cf86d2bdf4 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Mon, 6 Jun 2016 16:14:38 -0700 Subject: [PATCH 06/14] fixed find or create vertex class name, considering the plocal mode in checking the database existance --- .../plugins/orientdb/EdgeFrameWriter.scala | 4 ++-- .../orientdb/ExportOrientDbGraphPlugin.scala | 24 +++++++++++++++---- .../atk/plugins/orientdb/VertexWriter.scala | 19 +++++++++------ .../plugins/orientdb/VertexWriterTest.scala | 4 ++-- .../orientdbimport/EdgeReaderTest.scala | 4 ++-- .../orientdbimport/SchemaReaderTest.scala | 2 +- 6 files changed, 38 insertions(+), 19 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala index 052e78760c..21daccb5c7 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala @@ -43,8 +43,8 @@ class EdgeFrameWriter(edgeFrameRdd: EdgeFrameRdd, dbConfigurations: DbConfigurat val edge = edgeWrapper.toEdge // lookup the source and destination vertices val findOrientVertex = new VertexWriter(orientGraph) - val srcVertex = findOrientVertex.findOrCreate(edge.srcVertexId()) - val destVertex = findOrientVertex.findOrCreate(edge.destVertexId()) + val srcVertex = findOrientVertex.findOrCreate(edge.srcVertexId(), edge.schema.srcVertexLabel) + val destVertex = findOrientVertex.findOrCreate(edge.destVertexId(), edge.schema.destVertexLabel) val edgeWriter = new EdgeWriter(orientGraph, edge) val orientEdge = if (dbConfigurations.append) { edgeWriter.updateOrCreate(edge, srcVertex, destVertex) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala index d3a7e9709d..ba7743fa5a 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala @@ -16,6 +16,7 @@ package org.trustedanalytics.atk.plugins.orientdb import com.orientechnologies.orient.client.remote.OServerAdmin +import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx import org.trustedanalytics.atk.domain.DomainJsonProtocol import org.trustedanalytics.atk.domain.graph.SeamlessGraphMeta import org.trustedanalytics.atk.engine.graph.{ SparkEdgeFrame, SparkVertexFrame, SparkGraph } @@ -63,9 +64,19 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr // Get OrientDB configurations val dbConfig = DbConfigReader.extractConfigurations(arguments.graphName) - // Check if the given graph name/database name already exists - if (new OServerAdmin(dbConfig.dbUri).connect(GraphDbFactory.rootUserName, dbConfig.rootPassword).existsDatabase() && !dbConfig.append) { - require(arguments.graphName != s"${arguments.graphName}", s"the database name ${arguments.graphName} already exists, a new database name is required") + // Check the existence of the given graph name/database name + if (!dbConfig.append) { + if (dbConfig.dbUri.startsWith("remote:")) { + if (new OServerAdmin(dbConfig.dbUri).connect(GraphDbFactory.rootUserName, dbConfig.rootPassword).existsDatabase()) { + require(arguments.graphName != s"${arguments.graphName}", s"the database name ${arguments.graphName} already exists, a new database name is required or set 'append' to 'true'") + } + } + else { + val orientDb: ODatabaseDocumentTx = new ODatabaseDocumentTx(dbConfig.dbUri) + if (orientDb.exists()) { + require(arguments.graphName != s"${arguments.graphName}", s"the database name ${arguments.graphName} already exists, a new database name is required or set 'append' to 'true'") + } + } } //Get the graph meta data @@ -86,7 +97,8 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr * @return a value of type declared as the return type. */ - def exportVertexFramesToOrient(arguments: ExportOrientDbGraphArgs, dbConfigurations: DbConfiguration, graphMeta: SeamlessGraphMeta)(implicit invocation: Invocation): Map[String, Statistics] = { + def exportVertexFramesToOrient(arguments: ExportOrientDbGraphArgs, + dbConfigurations: DbConfiguration, graphMeta: SeamlessGraphMeta)(implicit invocation: Invocation): Map[String, Statistics] = { val orientDatabase = GraphDbFactory.graphDbConnector(dbConfigurations) val vertexFrames = graphMeta.vertexFrames.map(_.toReference) @@ -116,7 +128,9 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr * @return a value of type declared as the return type. */ - def exportEdgeFramesToOrient(arguments: ExportOrientDbGraphArgs, dbConfigurations: DbConfiguration, graphMeta: SeamlessGraphMeta)(implicit invocation: Invocation): Map[String, Statistics] = { + def exportEdgeFramesToOrient(arguments: ExportOrientDbGraphArgs, + dbConfigurations: DbConfiguration, + graphMeta: SeamlessGraphMeta)(implicit invocation: Invocation): Map[String, Statistics] = { val orientDatabase = GraphDbFactory.graphDbConnector(dbConfigurations) val edgeFrames = graphMeta.edgeFrames.map(_.toReference) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala index e813d4bb20..24df5af31b 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala @@ -15,7 +15,8 @@ */ package org.trustedanalytics.atk.plugins.orientdb -import com.tinkerpop.blueprints.impls.orient.{ OrientGraphNoTx } +import com.orientechnologies.orient.core.sql.OCommandSQL +import com.tinkerpop.blueprints.impls.orient.{ OrientDynaElementIterable, OrientGraphNoTx } import com.tinkerpop.blueprints.{ Vertex => BlueprintsVertex } import org.apache.spark.atk.graph.Vertex import org.trustedanalytics.atk.domain.schema.GraphSchema @@ -55,10 +56,10 @@ class VertexWriter(orientGraph: OrientGraphNoTx) { * @param vertexId the vertex ID * @return OrientDB vertex */ - def findOrCreate(vertexId: Long): BlueprintsVertex = { - val vertex = find(vertexId) + def findOrCreate(vertexId: Long, className: String): BlueprintsVertex = { + val vertex = find(vertexId, className) if (vertex.isEmpty) { - val newVertex = orientGraph.addVertex(GraphSchema.labelProperty, null) + val newVertex = orientGraph.addVertex(className, null) newVertex.setProperty(GraphSchema.vidProperty, vertexId) newVertex } @@ -73,8 +74,12 @@ class VertexWriter(orientGraph: OrientGraphNoTx) { * @param vertexId vertex ID * @return OrientDB vertex if exists or null if not found */ - def find(vertexId: Long): Option[BlueprintsVertex] = { - val vertexIterator = orientGraph.getVertices(GraphSchema.vidProperty, vertexId).iterator() + def find(vertexId: Long, className: String): Option[BlueprintsVertex] = { + //val vertexIterator = orientGraph.getVertices(GraphSchema.vidProperty, vertexId).iterator() + val vertices: OrientDynaElementIterable = orientGraph.command( + new OCommandSQL(s"select from ${className} where ${GraphSchema.vidProperty}= ${vertexId}") + ).execute() + val vertexIterator = vertices.iterator().asInstanceOf[java.util.Iterator[BlueprintsVertex]] if (vertexIterator.hasNext) { val existingVertex = vertexIterator.next() return Some(existingVertex) @@ -106,7 +111,7 @@ class VertexWriter(orientGraph: OrientGraphNoTx) { * @return OrientDB vertex */ def updateOrCreate(vertex: Vertex): BlueprintsVertex = { - val orientVertex = find(vertex.vid) + val orientVertex = find(vertex.vid, vertex.label) val newVertex = if (orientVertex.isEmpty) { create(vertex) } diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala index 82dd2694f4..2a88d5c9a3 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala @@ -63,7 +63,7 @@ class VertexWriterTest extends WordSpec with Matchers with TestingSparkContextWo val orientVertex = addOrientVertex.create(vertex) val vertexId = 1L //Tested method - val newVertex = addOrientVertex.findOrCreate(vertexId) + val newVertex = addOrientVertex.findOrCreate(vertexId, vertex.schema.label) //Results validation val vidProp: Any = newVertex.getProperty(GraphSchema.vidProperty) val propName: Any = newVertex.getProperty("from") @@ -76,7 +76,7 @@ class VertexWriterTest extends WordSpec with Matchers with TestingSparkContextWo val orientVertex = addOrientVertex.create(vertex) val vertexId = 2L //Tested Method - val newVertex = addOrientVertex.findOrCreate(vertexId) + val newVertex = addOrientVertex.findOrCreate(vertexId, vertex.schema.label) //Results validation val vidProp: Any = newVertex.getProperty(GraphSchema.vidProperty) assert(vidProp == 2) diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala index eee034389f..ec931bc408 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala @@ -38,8 +38,8 @@ class EdgeReaderTest extends WordSpec with TestingOrientDb with Matchers with Be Edge(edgeSchema, edgeRow) } val addOrientVertex = new VertexWriter(orientMemoryGraph) - val srcVertex = addOrientVertex.findOrCreate(1L) - val destVertex = addOrientVertex.findOrCreate(2L) + val srcVertex = addOrientVertex.findOrCreate(1L, edge.schema.srcVertexLabel) + val destVertex = addOrientVertex.findOrCreate(2L, edge.schema.destVertexLabel) val edgeWriter = new EdgeWriter(orientMemoryGraph, edge) edgeWriter.create(srcVertex, destVertex) } diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala index 5f65abf63b..9b0c3674f8 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala @@ -50,7 +50,7 @@ class SchemaReaderTest extends WordSpec with TestingOrientDb with Matchers with val schemaWriter = new SchemaWriter(orientMemoryGraph) schemaWriter.createVertexSchema(vertex.schema) val srcVertex = addOrientVertex.create(vertex) - val destVertex = addOrientVertex.findOrCreate(2L) + val destVertex = addOrientVertex.findOrCreate(2L, edge.schema.destVertexLabel) schemaWriter.createEdgeSchema(edge.schema) val edgeWriter = new EdgeWriter(orientMemoryGraph, edge) edgeWriter.create(srcVertex, destVertex) From 3e9f1ccbf29e31ab19601c1158db4fcc8a0a24d3 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Fri, 17 Jun 2016 17:50:28 -0700 Subject: [PATCH 07/14] addressed code review comments --- .../graph-plugins/src/main/resources/reference.conf | 1 - .../atk/plugins/orientdb/DbConfigReader.scala | 3 +-- .../atk/plugins/orientdb/DbConfiguration.scala | 3 +-- .../atk/plugins/orientdb/EdgeFrameWriter.scala | 4 ++-- .../atk/plugins/orientdb/EdgeWriter.scala | 2 +- .../plugins/orientdb/ExportOrientDbGraphArgs.scala | 3 ++- .../orientdb/ExportOrientDbGraphPlugin.scala | 8 ++++---- .../atk/plugins/orientdb/VertexFrameWriter.scala | 4 ++-- .../atk/plugins/orientdb/VertexWriter.scala | 13 ++++++------- .../atk/plugins/orientdb/EdgeFrameWriterTest.scala | 6 +++--- .../atk/plugins/orientdb/GraphDbFactoryTest.scala | 4 ++-- .../plugins/orientdb/VertexFrameWriterTest.scala | 4 ++-- 12 files changed, 26 insertions(+), 29 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/resources/reference.conf b/engine-plugins/graph-plugins/src/main/resources/reference.conf index b78426f58a..7e3214a213 100644 --- a/engine-plugins/graph-plugins/src/main/resources/reference.conf +++ b/engine-plugins/graph-plugins/src/main/resources/reference.conf @@ -8,6 +8,5 @@ trustedanalytics.atk { connection-orientdb.password = "invalid-orientdb-password" connection-orientdb.url = "invalid-orientdb-url" connection-orientdb.rootpassword ="invalid-orientdb-rootpassword" - connection-orientdb.append ="invalid-orientdb-append" } } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala index c0700ed24e..baa180a6ba 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfigReader.scala @@ -38,7 +38,6 @@ object DbConfigReader extends Serializable { val portNumber = ConfigFactory.load().getString("trustedanalytics.atk.datastore.connection-orientdb.port") val dbDir = ConfigFactory.load().getString("trustedanalytics.atk.datastore.connection-orientdb.url") val dbUri = dbDir + "/" + dbName - val append = ConfigFactory.load().getString("trustedanalytics.atk.datastore.connection-orientdb.append").toBoolean - DbConfiguration(dbUri, userName, password, portNumber, dbHost, rootPassword, append) + DbConfiguration(dbUri, userName, password, portNumber, dbHost, rootPassword) } } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala index d67930b613..f02e49f695 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala @@ -26,8 +26,7 @@ case class DbConfiguration(@ArgDoc("""OrientDB database URI.""") dbUri: String, @ArgDoc("""The database password.""") dbPassword: String, @ArgDoc("""Port number.""") portNumber: String, @ArgDoc("""The database host.""") dbHost: String, - @ArgDoc("""The root password.""") rootPassword: String, - @ArgDoc("""Enables/Disables updating an existing OrientDB graph""") append: Boolean) extends Serializable { + @ArgDoc("""The root password.""") rootPassword: String) extends Serializable { require(dbUri != null, "database URI is required") require(dbUserName != null, "the user name is required") diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala index 21daccb5c7..debe5f0eec 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriter.scala @@ -32,7 +32,7 @@ class EdgeFrameWriter(edgeFrameRdd: EdgeFrameRdd, dbConfigurations: DbConfigurat * @param batchSize the number of edges to be committed * @return the number of exported edges */ - def exportEdgeFrame(batchSize: Int): Long = { + def exportEdgeFrame(batchSize: Int, append: Boolean): Long = { val edgesCountRdd = edgeFrameRdd.mapPartitionEdges(iter => { val orientGraph = GraphDbFactory.graphDbConnector(dbConfigurations) @@ -46,7 +46,7 @@ class EdgeFrameWriter(edgeFrameRdd: EdgeFrameRdd, dbConfigurations: DbConfigurat val srcVertex = findOrientVertex.findOrCreate(edge.srcVertexId(), edge.schema.srcVertexLabel) val destVertex = findOrientVertex.findOrCreate(edge.destVertexId(), edge.schema.destVertexLabel) val edgeWriter = new EdgeWriter(orientGraph, edge) - val orientEdge = if (dbConfigurations.append) { + val orientEdge = if (append) { edgeWriter.updateOrCreate(edge, srcVertex, destVertex) } else { diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala index 1f49f070dc..57788cb5be 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala @@ -57,7 +57,7 @@ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { * @return OrientDB edge */ def find(edge: Edge): Option[BlueprintsEdge] = { - val edgeIterator = orientGraph.getEdges(GraphSchema.srcVidProperty, edge.srcVertexId()).iterator() + val edgeIterator = orientGraph.getEdges(GraphSchema.srcVidProperty == edge.srcVertexId() && GraphSchema.destVidProperty == edge.destVertexId()).iterator() if (edgeIterator.hasNext) { val existingEdge = edgeIterator.next() return Some(existingEdge) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphArgs.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphArgs.scala index 32deb419ec..12f275d3d3 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphArgs.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphArgs.scala @@ -23,10 +23,11 @@ import org.trustedanalytics.atk.engine.plugin.ArgDoc */ case class ExportOrientDbGraphArgs(graph: GraphReference, @ArgDoc("""OrientDB database name.""") graphName: String, + @ArgDoc("""if true, append data to an existing OrientDB graph""") append: Boolean, @ArgDoc("""batch size for commiting transactions.""") batchSize: Int = 1000) { require(graph != null, "graph is required") require(graphName != null, "database name is required") require(batchSize > 0, "batch size should be a positive value") - + require(append || !append, "set append to a value of type boolean") } diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala index ba7743fa5a..1fdde1d12e 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/ExportOrientDbGraphPlugin.scala @@ -27,7 +27,7 @@ import spray.json._ /** Json conversion for arguments and return value case classes */ import org.trustedanalytics.atk.domain.DomainJsonProtocol._ object ExportOrientDbGraphJsonFormat { - implicit val exportOrientDbGraphArgsFormat = jsonFormat3(ExportOrientDbGraphArgs) + implicit val exportOrientDbGraphArgsFormat = jsonFormat4(ExportOrientDbGraphArgs) implicit val statisticsFormat = jsonFormat2(Statistics) implicit val exportOrientDbGraphReturnFormat = jsonFormat3(ExportOrientDbGraphReturn) } @@ -65,7 +65,7 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr val dbConfig = DbConfigReader.extractConfigurations(arguments.graphName) // Check the existence of the given graph name/database name - if (!dbConfig.append) { + if (!arguments.append) { if (dbConfig.dbUri.startsWith("remote:")) { if (new OServerAdmin(dbConfig.dbUri).connect(GraphDbFactory.rootUserName, dbConfig.rootPassword).existsDatabase()) { require(arguments.graphName != s"${arguments.graphName}", s"the database name ${arguments.graphName} already exists, a new database name is required or set 'append' to 'true'") @@ -111,7 +111,7 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr val oVertexType = schemaWriter.createVertexSchema(vertexSchema) } val exportVertexFrame = new VertexFrameWriter(vertexFrameRdd, dbConfigurations) - val verticesCount = exportVertexFrame.exportVertexFrame(arguments.batchSize) + val verticesCount = exportVertexFrame.exportVertexFrame(arguments.batchSize, arguments.append) val exportedVerticesCount = orientDatabase.countVertices(vertexSchema.label) val failedVerticesCount = verticesCount - exportedVerticesCount (vertexSchema.label, Statistics(exportedVerticesCount, failedVerticesCount)) @@ -143,7 +143,7 @@ class ExportOrientDbGraphPlugin extends SparkCommandPlugin[ExportOrientDbGraphAr val edgeType = schemaWriter.createEdgeSchema(edgeSchema) } val exportEdgeFrame = new EdgeFrameWriter(edgeFrameRdd, dbConfigurations) - val edgesCount = exportEdgeFrame.exportEdgeFrame(arguments.batchSize) + val edgesCount = exportEdgeFrame.exportEdgeFrame(arguments.batchSize, arguments.append) val exportedEdgesCount = orientDatabase.countEdges(edgeSchema.label) val failedEdgesCount = edgesCount - exportedEdgesCount (edgeSchema.label, Statistics(exportedEdgesCount, failedEdgesCount)) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala index df6d0687c2..afb6cdf461 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriter.scala @@ -33,7 +33,7 @@ class VertexFrameWriter(vertexFrameRdd: VertexFrameRdd, dbConfigurations: DbConf * @param batchSize the number of vertices to be committed * @return the number of exported vertices */ - def exportVertexFrame(batchSize: Int): Long = { + def exportVertexFrame(batchSize: Int, append: Boolean): Long = { val verticesCountRdd = vertexFrameRdd.mapPartitionVertices(iter => { var batchCounter = 0L @@ -43,7 +43,7 @@ class VertexFrameWriter(vertexFrameRdd: VertexFrameRdd, dbConfigurations: DbConf val vertexWrapper = iter.next() val vertex = vertexWrapper.toVertex val vertexWriter = new VertexWriter(orientGraph) - if (dbConfigurations.append) { + if (append) { vertexWriter.updateOrCreate(vertex) } else { diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala index 24df5af31b..f757548544 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala @@ -58,13 +58,12 @@ class VertexWriter(orientGraph: OrientGraphNoTx) { */ def findOrCreate(vertexId: Long, className: String): BlueprintsVertex = { val vertex = find(vertexId, className) - if (vertex.isEmpty) { - val newVertex = orientGraph.addVertex(className, null) - newVertex.setProperty(GraphSchema.vidProperty, vertexId) - newVertex - } - else { - vertex.get + vertex match { + case Some(vertex) => vertex + case _ => + val newVertex = orientGraph.addVertex(className, null) + newVertex.setProperty(GraphSchema.vidProperty, vertexId) + newVertex } } diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala index 41827ab52e..ed6963b3ae 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala @@ -35,7 +35,7 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with "Edge frame writer" should { "Export edge frame" in { // exporting a vertex frame: - val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword, false) + val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword) val vColumns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32)) val vSchema = new VertexSchema(vColumns, GraphSchema.labelProperty, null) @@ -52,7 +52,7 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with val oVertexType = schemaWriter.createVertexSchema(vSchema) } val vertexFrameWriter = new VertexFrameWriter(vertexFrameRdd, dbConfig) - val verticesCountRdd = vertexFrameWriter.exportVertexFrame(vBatchSize) + val verticesCountRdd = vertexFrameWriter.exportVertexFrame(vBatchSize, false) //exporting the edge frame: val eColumns = List(Column(GraphSchema.edgeProperty, DataTypes.int64), Column(GraphSchema.srcVidProperty, DataTypes.int64), Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32)) @@ -70,7 +70,7 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with } val edgeFrameWriter = new EdgeFrameWriter(edgeFrameRdd, dbConfig) // call method under test - val edgesCount = edgeFrameWriter.exportEdgeFrame(batchSize) + val edgesCount = edgeFrameWriter.exportEdgeFrame(batchSize, false) //validate results val exportedEdges = orientFileGraph.countEdges() edgesCount shouldEqual exportedEdges diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala index 9702ce797f..897598dd3e 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactoryTest.scala @@ -31,7 +31,7 @@ class GraphDbFactoryTest extends WordSpec with Matchers { "creates a graph database and takes input arguments" in { val dbUri: String = "memory:OrientTestDb" - val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword, false) + val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword) //Tested method val graph: OrientGraphNoTx = GraphDbFactory.createGraphDb(dbConfig) //Results validation @@ -45,7 +45,7 @@ class GraphDbFactoryTest extends WordSpec with Matchers { val dbName = "OrientDbTest" tmpDir = DirectoryUtils.createTempDirectory("orient-graph-for-unit-testing") val dbUri = "plocal:" + tmpDir.getAbsolutePath + "/" + dbName - val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword, false) + val dbConfig = new DbConfiguration(dbUri, userName, password, port, host, rootPassword) //Tested method val orientDb = GraphDbFactory.graphDbConnector(dbConfig) //Results validation diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala index e5bdbd8dc5..ec777c5cb6 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala @@ -33,7 +33,7 @@ class VertexFrameWriterTest extends WordSpec with TestingSparkContextWordSpec wi "vertex frame writer" should { "export vertex frame to OrientDB" in { - val dbConfig = new DbConfiguration(dbUri, dbUserName, dbPassword, "port", "host", rootPassword, false) + val dbConfig = new DbConfiguration(dbUri, dbUserName, dbPassword, "port", "host", rootPassword) val columns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32)) val schema = new VertexSchema(columns, GraphSchema.labelProperty, null) val vertices: List[Row] = List( @@ -50,7 +50,7 @@ class VertexFrameWriterTest extends WordSpec with TestingSparkContextWordSpec wi val vertexFrameRdd = new VertexFrameRdd(schema, rowRdd) val vertexFrameWriter = new VertexFrameWriter(vertexFrameRdd, dbConfig) //Tested method - val verticesCount = vertexFrameWriter.exportVertexFrame(batchSize) + val verticesCount = vertexFrameWriter.exportVertexFrame(batchSize, false) //Results validation val exportedVertices = orientFileGraph.countVertices() verticesCount shouldEqual exportedVertices From 2db1d7b4c57c42fb518088896fa1e8d82f246e77 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Mon, 20 Jun 2016 11:40:43 -0700 Subject: [PATCH 08/14] fixed the scala test error --- .../atk/plugins/orientdb/EdgeWriter.scala | 12 +++- .../atk/plugins/orientdb/VertexWriter.scala | 3 +- .../orientdb/EdgeFrameWriterTest.scala | 2 +- .../atk/plugins/orientdb/EdgeWriterTest.scala | 67 ++++++++++++------- 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala index 57788cb5be..7619d72c1a 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala @@ -15,8 +15,9 @@ */ package org.trustedanalytics.atk.plugins.orientdb +import com.orientechnologies.orient.core.sql.OCommandSQL import com.tinkerpop.blueprints.{ Vertex => BlueprintsVertex, Edge => BlueprintsEdge } -import com.tinkerpop.blueprints.impls.orient.{ OrientGraphNoTx, OrientEdge } +import com.tinkerpop.blueprints.impls.orient.{ OrientDynaElementIterable, OrientGraphNoTx, OrientEdge } import org.apache.spark.atk.graph.Edge import org.trustedanalytics.atk.domain.schema.GraphSchema import org.trustedanalytics.atk.engine.frame.RowWrapper @@ -53,11 +54,16 @@ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { /** * a method that finds OrientDB edge + * * @param edge ATK edge * @return OrientDB edge */ def find(edge: Edge): Option[BlueprintsEdge] = { - val edgeIterator = orientGraph.getEdges(GraphSchema.srcVidProperty == edge.srcVertexId() && GraphSchema.destVidProperty == edge.destVertexId()).iterator() + val edges: OrientDynaElementIterable = orientGraph.command( + new OCommandSQL(s"select from ${edge.schema.label} where ${GraphSchema.srcVidProperty}== ${edge.srcVertexId()} and ${GraphSchema.destVidProperty}== ${edge.destVertexId()}") + ).execute() + val edgeIterator = edges.iterator().asInstanceOf[java.util.Iterator[BlueprintsEdge]] + // val edgeIterator = orientGraph.getEdges(GraphSchema.srcVidProperty,edge.srcVertexId()).iterator() if (edgeIterator.hasNext) { val existingEdge = edgeIterator.next() return Some(existingEdge) @@ -67,6 +73,7 @@ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { /** * a method that updates OrientDB edge + * * @param edge ATK edge * @param orientDbEdge OrientDB edge * @return updated OrientDB edge @@ -83,6 +90,7 @@ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { /** * a method that updates OrientDB edge if exists or creates a new edge if not found + * * @param edge ATK edge * @param srcVertex OrientDB vertex as a source * @param destVertex OrientDB vertex as a destination diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala index f757548544..9c53786093 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala @@ -74,9 +74,8 @@ class VertexWriter(orientGraph: OrientGraphNoTx) { * @return OrientDB vertex if exists or null if not found */ def find(vertexId: Long, className: String): Option[BlueprintsVertex] = { - //val vertexIterator = orientGraph.getVertices(GraphSchema.vidProperty, vertexId).iterator() val vertices: OrientDynaElementIterable = orientGraph.command( - new OCommandSQL(s"select from ${className} where ${GraphSchema.vidProperty}= ${vertexId}") + new OCommandSQL(s"select from ${className} where ${GraphSchema.vidProperty}== ${vertexId}") ).execute() val vertexIterator = vertices.iterator().asInstanceOf[java.util.Iterator[BlueprintsVertex]] if (vertexIterator.hasNext) { diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala index ed6963b3ae..df39c8276c 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala @@ -56,7 +56,7 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with //exporting the edge frame: val eColumns = List(Column(GraphSchema.edgeProperty, DataTypes.int64), Column(GraphSchema.srcVidProperty, DataTypes.int64), Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32)) - val eSchema = new EdgeSchema(eColumns, "label", "srclabel", "destlabel") + val eSchema = new EdgeSchema(eColumns, "label", GraphSchema.labelProperty, GraphSchema.labelProperty) val edges: List[Row] = List( new GenericRow(Array(1L, 1L, 2L, "distance1", 100)), new GenericRow(Array(2L, 2L, 3L, "distance2", 200)), diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala index 2d34549ce7..995ae8eb05 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala @@ -30,33 +30,36 @@ class EdgeWriterTest extends WordSpec with Matchers with TestingSparkContextWord cleanupOrientDbInMemory() } "Edge writer" should { + //create the source and destination vertices + val columns = List( + Column(GraphSchema.vidProperty, DataTypes.int64), + Column(GraphSchema.labelProperty, DataTypes.string), + Column("name", DataTypes.string), + Column("from", DataTypes.string), + Column("to", DataTypes.string), + Column("fair", DataTypes.int32)) + val schema = new VertexSchema(columns, GraphSchema.labelProperty, null) + val rowSrc = new GenericRow(Array(1L, "l1", "Bob", "PDX", "LAX", 350)) + val rowDest = new GenericRow(Array(2L, "l1", "Alice", "SFO", "SEA", 465)) + val vertexSrc = Vertex(schema, rowSrc) + val vertexDest = Vertex(schema, rowDest) + + // create the edge + val edgeColumns = List( + Column(GraphSchema.edgeProperty, DataTypes.int64), + Column(GraphSchema.srcVidProperty, DataTypes.int64), + Column(GraphSchema.destVidProperty, DataTypes.int64), + Column(GraphSchema.labelProperty, DataTypes.string), + Column("distance", DataTypes.int32)) + val edgeSchema = new EdgeSchema(edgeColumns, "label", "srclabel", "destlabel") + val edgeRow = new GenericRow(Array(1L, 2L, 3L, "distance", 500)) + val edge = Edge(edgeSchema, edgeRow) + "export edge to Orient edge" in { - //create the source and destination vertices - val columns = List( - Column(GraphSchema.vidProperty, DataTypes.int64), - Column(GraphSchema.labelProperty, DataTypes.string), - Column("name", DataTypes.string), - Column("from", DataTypes.string), - Column("to", DataTypes.string), - Column("fair", DataTypes.int32)) - val schema = new VertexSchema(columns, GraphSchema.labelProperty, null) - val rowSrc = new GenericRow(Array(1L, "l1", "Bob", "PDX", "LAX", 350)) - val rowDest = new GenericRow(Array(2L, "l1", "Alice", "SFO", "SEA", 465)) - val vertexSrc = Vertex(schema, rowSrc) - val vertexDest = Vertex(schema, rowDest) + // export vertices to OrientDB graph val vertexWriter = new VertexWriter(orientMemoryGraph) val orientVertexSrc = vertexWriter.create(vertexSrc) val orientVertexDest = vertexWriter.create(vertexDest) - // create the edge - val edgeColumns = List( - Column(GraphSchema.edgeProperty, DataTypes.int64), - Column(GraphSchema.srcVidProperty, DataTypes.int64), - Column(GraphSchema.destVidProperty, DataTypes.int64), - Column(GraphSchema.labelProperty, DataTypes.string), - Column("distance", DataTypes.int32)) - val edgeSchema = new EdgeSchema(edgeColumns, "label", "srclabel", "destlabel") - val edgeRow = new GenericRow(Array(1L, 2L, 3L, "distance", 500)) - val edge = Edge(edgeSchema, edgeRow) val edgeWriter = new EdgeWriter(orientMemoryGraph, edge) // call method under test val orientEdge = edgeWriter.create(orientVertexSrc, orientVertexDest) @@ -68,5 +71,23 @@ class EdgeWriterTest extends WordSpec with Matchers with TestingSparkContextWord assert(destVidProp == 3) assert(edgeProp == 500) } + "finds an edge" in { + // export vertices and the edge to OrientDB graph + val vertexWriter = new VertexWriter(orientMemoryGraph) + val orientVertexSrc = vertexWriter.create(vertexSrc) + val orientVertexDest = vertexWriter.create(vertexDest) + val edgeWriter = new EdgeWriter(orientMemoryGraph, edge) + edgeWriter.create(orientVertexSrc, orientVertexDest) + // call method under test + val orientEdge = edgeWriter.find(edge).get + //validating results + val srcVidProp: Any = orientEdge.getProperty(GraphSchema.srcVidProperty) + val destVidProp: Any = orientEdge.getProperty(GraphSchema.destVidProperty) + val edgeProp: Any = orientEdge.getProperty("distance") + assert(srcVidProp == 2) + assert(destVidProp == 3) + assert(edgeProp == 500) + + } } } From a9ac96f977d8d81af16aca776073e945f1cd8ce1 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Mon, 20 Jun 2016 12:59:05 -0700 Subject: [PATCH 09/14] scala test error fixed --- .../atk/plugins/orientdb/EdgeWriter.scala | 1 - .../atk/plugins/orientdb/VertexWriter.scala | 2 +- .../atk/plugins/orientdbimport/EdgeReaderTest.scala | 12 ++++++++---- .../plugins/orientdbimport/SchemaReaderTest.scala | 2 +- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala index 7619d72c1a..a0e3a98376 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriter.scala @@ -63,7 +63,6 @@ class EdgeWriter(orientGraph: OrientGraphNoTx, edge: Edge) { new OCommandSQL(s"select from ${edge.schema.label} where ${GraphSchema.srcVidProperty}== ${edge.srcVertexId()} and ${GraphSchema.destVidProperty}== ${edge.destVertexId()}") ).execute() val edgeIterator = edges.iterator().asInstanceOf[java.util.Iterator[BlueprintsEdge]] - // val edgeIterator = orientGraph.getEdges(GraphSchema.srcVidProperty,edge.srcVertexId()).iterator() if (edgeIterator.hasNext) { val existingEdge = edgeIterator.next() return Some(existingEdge) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala index 9c53786093..b6a33f49c8 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriter.scala @@ -75,7 +75,7 @@ class VertexWriter(orientGraph: OrientGraphNoTx) { */ def find(vertexId: Long, className: String): Option[BlueprintsVertex] = { val vertices: OrientDynaElementIterable = orientGraph.command( - new OCommandSQL(s"select from ${className} where ${GraphSchema.vidProperty}== ${vertexId}") + new OCommandSQL(s"select from ${className} where ${GraphSchema.vidProperty} = ${vertexId}") ).execute() val vertexIterator = vertices.iterator().asInstanceOf[java.util.Iterator[BlueprintsVertex]] if (vertexIterator.hasNext) { diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala index ec931bc408..8dd868835e 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala @@ -18,9 +18,9 @@ package org.trustedanalytics.atk.plugins.orientdbimport import org.apache.spark.atk.graph.Edge import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ Matchers, BeforeAndAfterEach, WordSpec } -import org.trustedanalytics.atk.domain.schema.{ EdgeSchema, DataTypes, GraphSchema, Column } +import org.trustedanalytics.atk.domain.schema._ import org.trustedanalytics.atk.engine.frame.RowWrapper -import org.trustedanalytics.atk.plugins.orientdb.{ VertexWriter, EdgeWriter } +import org.trustedanalytics.atk.plugins.orientdb.{ SchemaWriter, VertexWriter, EdgeWriter } import org.trustedanalytics.atk.testutils.TestingOrientDb class EdgeReaderTest extends WordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach { @@ -33,10 +33,14 @@ class EdgeReaderTest extends WordSpec with TestingOrientDb with Matchers with Be Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32)) - val edgeSchema = new EdgeSchema(edgeColumns, "label", "srclabel", "destlabel") + val edgeSchema = new EdgeSchema(edgeColumns, "label", GraphSchema.labelProperty, GraphSchema.labelProperty) val edgeRow = new GenericRow(Array(1L, 1L, 2L, "distance", 500)) Edge(edgeSchema, edgeRow) } + val columns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string)) + val schema = new VertexSchema(columns, GraphSchema.labelProperty, null) + val schemaWriter = new SchemaWriter(orientMemoryGraph) + schemaWriter.createVertexSchema(schema) val addOrientVertex = new VertexWriter(orientMemoryGraph) val srcVertex = addOrientVertex.findOrCreate(1L, edge.schema.srcVertexLabel) val destVertex = addOrientVertex.findOrCreate(2L, edge.schema.destVertexLabel) @@ -53,7 +57,7 @@ class EdgeReaderTest extends WordSpec with TestingOrientDb with Matchers with Be Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32)) - val schema = new EdgeSchema(edgeColumns, "label", "srclabel", "destlabel") + val schema = new EdgeSchema(edgeColumns, "label", GraphSchema.labelProperty, GraphSchema.labelProperty) "import edge" in { val srcVertexId = 1L diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala index 9b0c3674f8..9302b6677b 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala @@ -42,7 +42,7 @@ class SchemaReaderTest extends WordSpec with TestingOrientDb with Matchers with Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32)) - val edgeSchema = new EdgeSchema(edgeColumns, "label", "srclabel", "destlabel") + val edgeSchema = new EdgeSchema(edgeColumns, "label", GraphSchema.labelProperty, GraphSchema.labelProperty) val edgeRow = new GenericRow(Array(1L, 1L, 2L, "distance", 500)) Edge(edgeSchema, edgeRow) } From 36f3bd45bf4e1a2b14bf2c4553b8777ba7df6957 Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Tue, 9 Aug 2016 12:20:57 -0700 Subject: [PATCH 10/14] Convert OType.DOUBLE to Data Type float64 and OType.FLOAT to Data Type float32 --- .../atk/plugins/orientdb/OrientDbTypeConverter.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/OrientDbTypeConverter.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/OrientDbTypeConverter.scala index ec70af3459..dc0c99a22a 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/OrientDbTypeConverter.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/OrientDbTypeConverter.scala @@ -34,7 +34,7 @@ object OrientDbTypeConverter { case int64 if dataType.equalsDataType(DataTypes.int64) => OType.LONG case int32 if dataType.equalsDataType(DataTypes.int32) => OType.INTEGER case float32 if dataType.equalsDataType(DataTypes.float32) => OType.FLOAT - case float64 if dataType.equalsDataType(DataTypes.float64) => OType.FLOAT + case float64 if dataType.equalsDataType(DataTypes.float64) => OType.DOUBLE case string if dataType.equalsDataType(DataTypes.string) => OType.STRING case _ => throw new IllegalArgumentException(s"Unable to convert $dataType to OrientDB data type") } @@ -47,8 +47,9 @@ object OrientDbTypeConverter { def convertOrientDbtoDataType(orientDbType: OType): DataType = orientDbType match { case OType.LONG => DataTypes.toDataType("int64") case OType.INTEGER => DataTypes.toDataType("int32") - case OType.FLOAT => DataTypes.toDataType("float64") + case OType.FLOAT => DataTypes.toDataType("float32") case OType.STRING => DataTypes.toDataType("string") + case OType.DOUBLE => DataTypes.toDataType("float64") case _ => throw new IllegalArgumentException(s"Unable to convert $orientDbType to DataType") } } From 22be07e0daad49fa037bbc055e214c5268307acc Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Wed, 17 Aug 2016 13:51:41 -0700 Subject: [PATCH 11/14] upgrade OrientDB version to 2.2.6 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 07fec12fae..e126af4318 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ export MAVEN_OPTS="-Xmx512m -XX:PermSize=256m" 1.1.0-${dep.cdh.version} 1.6.0-${dep.cdh.version} 2016.2.181 - 2.1.16 + 2.2.6 From df62d1c5a29ab9112d8686abb4604ac62cd5748b Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Wed, 17 Aug 2016 14:56:59 -0700 Subject: [PATCH 12/14] upgraded the compiler memory Please enter the commit message for your changes. Lines starting --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index cfefa4ff74..8fb0caa9e9 100644 --- a/pom.xml +++ b/pom.xml @@ -307,7 +307,7 @@ export MAVEN_OPTS="-Xmx512m -XX:PermSize=256m" WDF TestSuite.txt false FTD - -Xmx512m -XX:PermSize=256m + -Xmx1000m -XX:PermSize=256m From 955da7f9ffbc1da9ec33ee631c9dc7a06409eacf Mon Sep 17 00:00:00 2001 From: wafaa Taie Date: Thu, 18 Aug 2016 18:08:48 -0700 Subject: [PATCH 13/14] fix the memory error --- .../main/resources/python/graph/export_to_orientdb.rst | 2 +- .../atk/plugins/orientdb/DbConfiguration.scala | 3 ++- .../atk/plugins/orientdb/GraphDbFactory.scala | 1 + .../atk/plugins/orientdbimport/OrientDbEdgeRdd.scala | 3 ++- .../atk/plugins/orientdb/EdgeFrameWriterTest.scala | 6 +++--- .../model/plugins/clustering/lda/LdaPluginArgs.scala | 3 +-- pom.xml | 2 +- .../atk/testutils/TestingOrientDb.scala | 10 ++++++---- 8 files changed, 17 insertions(+), 13 deletions(-) diff --git a/doc-api-examples/src/main/resources/python/graph/export_to_orientdb.rst b/doc-api-examples/src/main/resources/python/graph/export_to_orientdb.rst index 904d7c3e14..7db4fd1d70 100644 --- a/doc-api-examples/src/main/resources/python/graph/export_to_orientdb.rst +++ b/doc-api-examples/src/main/resources/python/graph/export_to_orientdb.rst @@ -28,7 +28,7 @@ >>> result = graph.export_to_orientdb("OrientDbDocTest",5) >>> result - {u'db_uri': u'remote:hostname:2424/OrientDbTest', + {u'db_uri': u'remote:hostname:2424/OrientDbDocTest', u'exported_edges': {u'edges': {u'exported_count': 5, u'failure_count': 0}}, u'exported_vertices': {u'source': {u'exported_count': 5,u'failure_count': 0}}} diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala index f02e49f695..51d7166908 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala @@ -26,7 +26,8 @@ case class DbConfiguration(@ArgDoc("""OrientDB database URI.""") dbUri: String, @ArgDoc("""The database password.""") dbPassword: String, @ArgDoc("""Port number.""") portNumber: String, @ArgDoc("""The database host.""") dbHost: String, - @ArgDoc("""The root password.""") rootPassword: String) extends Serializable { + @ArgDoc("""The root password.""") rootPassword: String, + @ArgDoc("""Additional database properties.""") dbProperties: Option[Map[String,Any]]=None) extends Serializable { require(dbUri != null, "database URI is required") require(dbUserName != null, "the user name is required") diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala index 5cc02e606b..b0127336d5 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala @@ -38,6 +38,7 @@ object GraphDbFactory extends EventLogging { */ def graphDbConnector(dbConfigurations: DbConfiguration): OrientGraphNoTx = { val orientDb: ODatabaseDocumentTx = new ODatabaseDocumentTx(dbConfigurations.dbUri) + orientDb.setProperty("storage.diskCache.bufferSize", 256) val orientGraphDb = if (dbConfigurations.dbUri.startsWith("remote:")) { if (!new OServerAdmin(dbConfigurations.dbUri).connect(rootUserName, dbConfigurations.rootPassword).existsDatabase()) { new OServerAdmin(dbConfigurations.dbUri).connect(rootUserName, dbConfigurations.rootPassword).createDatabase("graph", "plocal") diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdbimport/OrientDbEdgeRdd.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdbimport/OrientDbEdgeRdd.scala index 85cc94298a..a89d3bbec6 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdbimport/OrientDbEdgeRdd.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdbimport/OrientDbEdgeRdd.scala @@ -37,7 +37,8 @@ class OrientDbEdgeRdd(sc: SparkContext, dbConfigurations: DbConfiguration) exten val edgeBuffer = new ArrayBuffer[Edge]() val schemaReader = new SchemaReader(graph) val edgeSchema = schemaReader.importEdgeSchema(partition.className) - val edges: OrientDynaElementIterable = graph.command(new OCommandSQL(s"select from ${partition.className}")).execute() + val edges: OrientDynaElementIterable = graph.command( + new OCommandSQL(s"select from cluster:${partition.clusterId} where @class='${partition.className}'")).execute() val edgeIterator = edges.iterator().asInstanceOf[java.util.Iterator[BlueprintsEdge]] while (edgeIterator.hasNext) { val edgeReader = new EdgeReader(graph, edgeSchema) diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala index df39c8276c..5bb2dfecd9 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala @@ -35,7 +35,7 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with "Edge frame writer" should { "Export edge frame" in { // exporting a vertex frame: - val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword) + val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword,) val vColumns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32)) val vSchema = new VertexSchema(vColumns, GraphSchema.labelProperty, null) @@ -56,14 +56,14 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with //exporting the edge frame: val eColumns = List(Column(GraphSchema.edgeProperty, DataTypes.int64), Column(GraphSchema.srcVidProperty, DataTypes.int64), Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32)) - val eSchema = new EdgeSchema(eColumns, "label", GraphSchema.labelProperty, GraphSchema.labelProperty) + val eSchema = new EdgeSchema(eColumns, "edge_label", GraphSchema.labelProperty, GraphSchema.labelProperty) val edges: List[Row] = List( new GenericRow(Array(1L, 1L, 2L, "distance1", 100)), new GenericRow(Array(2L, 2L, 3L, "distance2", 200)), new GenericRow(Array(3L, 3L, 4L, "distance3", 400))) val eRowRdd = sparkContext.parallelize(edges) val edgeFrameRdd = new EdgeFrameRdd(eSchema, eRowRdd) - val batchSize = 3 + val batchSize = 4 if (orientFileGraph.getEdgeType(eSchema.label) == null) { val schemaWriter = new SchemaWriter(orientFileGraph) val oEdgeType = schemaWriter.createEdgeSchema(eSchema) diff --git a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/lda/LdaPluginArgs.scala b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/lda/LdaPluginArgs.scala index e53a80906a..b07d450082 100644 --- a/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/lda/LdaPluginArgs.scala +++ b/engine-plugins/model-plugins/src/main/scala/org/trustedanalytics/atk/engine/model/plugins/clustering/lda/LdaPluginArgs.scala @@ -65,8 +65,7 @@ time the model is trained, allows LDA to generate the same topic distribution if the corpus and LDA parameters are unchanged.""") randomSeed: Option[Long] = None, @ArgDoc("""Period (in iterations) between checkpoints (default = 10). Checkpointing helps with recovery * (when nodes fail). It also helps with eliminating temporary shuffle files on disk, which can be -* important when LDA is run for many iterations. If the checkpoint directory is not set, this setting is ignored.""") - checkPointInterval: Int = 10) { +* important when LDA is run for many iterations. If the checkpoint directory is not set, this setting is ignored.""") checkPointInterval: Int = 10) { require(model != null, "model is required") require(frame != null, "frame is required") diff --git a/pom.xml b/pom.xml index 8fb0caa9e9..dc10005bce 100644 --- a/pom.xml +++ b/pom.xml @@ -307,7 +307,7 @@ export MAVEN_OPTS="-Xmx512m -XX:PermSize=256m" WDF TestSuite.txt false FTD - -Xmx1000m -XX:PermSize=256m + -Xmx1024m -XX:PermSize=256m diff --git a/testutils/src/main/scala/org/trustedanalytics/atk/testutils/TestingOrientDb.scala b/testutils/src/main/scala/org/trustedanalytics/atk/testutils/TestingOrientDb.scala index 2be7e3f8b1..217eed2348 100644 --- a/testutils/src/main/scala/org/trustedanalytics/atk/testutils/TestingOrientDb.scala +++ b/testutils/src/main/scala/org/trustedanalytics/atk/testutils/TestingOrientDb.scala @@ -16,6 +16,7 @@ package org.trustedanalytics.atk.testutils import java.io.File +import com.orientechnologies.orient.core.intent.OIntentMassiveInsert import com.tinkerpop.blueprints.impls.orient.{ OrientGraphNoTx, OrientGraphFactory } /** @@ -25,7 +26,7 @@ trait TestingOrientDb { var tmpDir: File = null var dbUri: String = null - var dbName: String = "OrientDbTest" + var dbName: String = "OrientDbTest1" var dbUserName = "admin" var dbPassword = "admin" var rootPassword = "root" @@ -44,11 +45,12 @@ trait TestingOrientDb { * create plocal Orient graph database */ def setupOrientDb(): Unit = { - - tmpDir = DirectoryUtils.createTempDirectory("orient-graph-for-unit-testing") - dbUri = "plocal:/" + tmpDir.getAbsolutePath + "/" + dbName + val uuid = java.util.UUID.randomUUID.toString + tmpDir = DirectoryUtils.createTempDirectory("orientgraphtests") + dbUri = "plocal:" + tmpDir.getAbsolutePath + "/" + dbName + uuid val factory = new OrientGraphFactory(dbUri, dbUserName, dbPassword) orientFileGraph = factory.getNoTx + orientFileGraph.declareIntent(new OIntentMassiveInsert()) } /** From fa016502e8aeb7e12849725e79168e5d8c17debb Mon Sep 17 00:00:00 2001 From: spkavuly Date: Thu, 18 Aug 2016 18:56:33 -0700 Subject: [PATCH 14/14] Update OrientDB configuration to support optional database properties --- .../atk/plugins/orientdb/DbConfiguration.scala | 2 +- .../atk/plugins/orientdb/GraphDbFactory.scala | 5 ++++- .../atk/plugins}/TestingOrientDb.scala | 10 ++++++++-- .../atk/plugins/orientdb/EdgeFrameWriterTest.scala | 4 ++-- .../atk/plugins/orientdb/EdgeWriterTest.scala | 3 ++- .../atk/plugins/orientdb/VertexFrameWriterTest.scala | 4 ++-- .../atk/plugins/orientdb/VertexWriterTest.scala | 3 ++- .../atk/plugins/orientdbimport/EdgeReaderTest.scala | 2 +- .../atk/plugins/orientdbimport/SchemaReaderTest.scala | 2 +- .../atk/plugins/orientdbimport/VertexReaderTest.scala | 2 +- 10 files changed, 24 insertions(+), 13 deletions(-) rename {testutils/src/main/scala/org/trustedanalytics/atk/testutils => engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins}/TestingOrientDb.scala (83%) diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala index 51d7166908..d5352ac70f 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/DbConfiguration.scala @@ -27,7 +27,7 @@ case class DbConfiguration(@ArgDoc("""OrientDB database URI.""") dbUri: String, @ArgDoc("""Port number.""") portNumber: String, @ArgDoc("""The database host.""") dbHost: String, @ArgDoc("""The root password.""") rootPassword: String, - @ArgDoc("""Additional database properties.""") dbProperties: Option[Map[String,Any]]=None) extends Serializable { + @ArgDoc("""Additional database properties.""") dbProperties: Option[Map[String, Any]] = None) extends Serializable { require(dbUri != null, "database URI is required") require(dbUserName != null, "the user name is required") diff --git a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala index b0127336d5..7830108afd 100644 --- a/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala +++ b/engine-plugins/graph-plugins/src/main/scala/org/trustedanalytics/atk/plugins/orientdb/GraphDbFactory.scala @@ -38,7 +38,10 @@ object GraphDbFactory extends EventLogging { */ def graphDbConnector(dbConfigurations: DbConfiguration): OrientGraphNoTx = { val orientDb: ODatabaseDocumentTx = new ODatabaseDocumentTx(dbConfigurations.dbUri) - orientDb.setProperty("storage.diskCache.bufferSize", 256) + dbConfigurations.dbProperties.foreach(propertyMap => { + propertyMap.foreach { case (key, value) => orientDb.setProperty(key, value) } + }) + val orientGraphDb = if (dbConfigurations.dbUri.startsWith("remote:")) { if (!new OServerAdmin(dbConfigurations.dbUri).connect(rootUserName, dbConfigurations.rootPassword).existsDatabase()) { new OServerAdmin(dbConfigurations.dbUri).connect(rootUserName, dbConfigurations.rootPassword).createDatabase("graph", "plocal") diff --git a/testutils/src/main/scala/org/trustedanalytics/atk/testutils/TestingOrientDb.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/TestingOrientDb.scala similarity index 83% rename from testutils/src/main/scala/org/trustedanalytics/atk/testutils/TestingOrientDb.scala rename to engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/TestingOrientDb.scala index 217eed2348..d3993a0434 100644 --- a/testutils/src/main/scala/org/trustedanalytics/atk/testutils/TestingOrientDb.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/TestingOrientDb.scala @@ -13,11 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.trustedanalytics.atk.testutils +package org.trustedanalytics.atk.plugins import java.io.File + import com.orientechnologies.orient.core.intent.OIntentMassiveInsert -import com.tinkerpop.blueprints.impls.orient.{ OrientGraphNoTx, OrientGraphFactory } +import com.tinkerpop.blueprints.impls.orient.{ OrientGraphFactory, OrientGraphNoTx } +import org.trustedanalytics.atk.plugins.orientdb.DbConfiguration +import org.trustedanalytics.atk.testutils.DirectoryUtils /** * setup for testing export to OrientDB plugin functions @@ -30,8 +33,10 @@ trait TestingOrientDb { var dbUserName = "admin" var dbPassword = "admin" var rootPassword = "root" + var dbConfig: DbConfiguration = null var orientMemoryGraph: OrientGraphNoTx = null var orientFileGraph: OrientGraphNoTx = null + val dbProperties: Map[String, Any] = Map(("storage.diskCache.bufferSize", 256)) /** * create in memory Orient graph database @@ -48,6 +53,7 @@ trait TestingOrientDb { val uuid = java.util.UUID.randomUUID.toString tmpDir = DirectoryUtils.createTempDirectory("orientgraphtests") dbUri = "plocal:" + tmpDir.getAbsolutePath + "/" + dbName + uuid + dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword, Some(dbProperties)) val factory = new OrientGraphFactory(dbUri, dbUserName, dbPassword) orientFileGraph = factory.getNoTx orientFileGraph.declareIntent(new OIntentMassiveInsert()) diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala index 5bb2dfecd9..acef70c58b 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeFrameWriterTest.scala @@ -20,7 +20,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec } import org.trustedanalytics.atk.domain.schema._ -import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec } +import org.trustedanalytics.atk.plugins.TestingOrientDb +import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach { @@ -35,7 +36,6 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with "Edge frame writer" should { "Export edge frame" in { // exporting a vertex frame: - val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword,) val vColumns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32)) val vSchema = new VertexSchema(vColumns, GraphSchema.labelProperty, null) diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala index 995ae8eb05..9a9367f045 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/EdgeWriterTest.scala @@ -19,7 +19,8 @@ import org.apache.spark.atk.graph.{ Edge, Vertex } import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec } import org.trustedanalytics.atk.domain.schema._ -import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec } +import org.trustedanalytics.atk.plugins.TestingOrientDb +import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec class EdgeWriterTest extends WordSpec with Matchers with TestingSparkContextWordSpec with TestingOrientDb with BeforeAndAfterEach { override def beforeEach() { diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala index ec777c5cb6..2ce47f0271 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexFrameWriterTest.scala @@ -20,7 +20,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec } import org.trustedanalytics.atk.domain.schema.{ VertexSchema, DataTypes, GraphSchema, Column } -import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec } +import org.trustedanalytics.atk.plugins.TestingOrientDb +import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec class VertexFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with Matchers with TestingOrientDb with BeforeAndAfterEach { override def beforeEach() { @@ -33,7 +34,6 @@ class VertexFrameWriterTest extends WordSpec with TestingSparkContextWordSpec wi "vertex frame writer" should { "export vertex frame to OrientDB" in { - val dbConfig = new DbConfiguration(dbUri, dbUserName, dbPassword, "port", "host", rootPassword) val columns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32)) val schema = new VertexSchema(columns, GraphSchema.labelProperty, null) val vertices: List[Row] = List( diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala index 2a88d5c9a3..cec9703408 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdb/VertexWriterTest.scala @@ -20,7 +20,8 @@ import org.apache.spark.atk.graph.Vertex import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec } import org.trustedanalytics.atk.domain.schema.{ VertexSchema, DataTypes, GraphSchema, Column } -import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec } +import org.trustedanalytics.atk.plugins.TestingOrientDb +import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec class VertexWriterTest extends WordSpec with Matchers with TestingSparkContextWordSpec with TestingOrientDb with BeforeAndAfterEach { diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala index 8dd868835e..e4768873de 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/EdgeReaderTest.scala @@ -20,8 +20,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ Matchers, BeforeAndAfterEach, WordSpec } import org.trustedanalytics.atk.domain.schema._ import org.trustedanalytics.atk.engine.frame.RowWrapper +import org.trustedanalytics.atk.plugins.TestingOrientDb import org.trustedanalytics.atk.plugins.orientdb.{ SchemaWriter, VertexWriter, EdgeWriter } -import org.trustedanalytics.atk.testutils.TestingOrientDb class EdgeReaderTest extends WordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach { diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala index 9302b6677b..3c8c3554b5 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/SchemaReaderTest.scala @@ -20,8 +20,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ Matchers, BeforeAndAfterEach, WordSpec } import org.trustedanalytics.atk.domain.schema.DataTypes.string import org.trustedanalytics.atk.domain.schema._ +import org.trustedanalytics.atk.plugins.TestingOrientDb import org.trustedanalytics.atk.plugins.orientdb.{ SchemaWriter, EdgeWriter, VertexWriter } -import org.trustedanalytics.atk.testutils.TestingOrientDb class SchemaReaderTest extends WordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach { diff --git a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala index 8614b5a0b7..782eba929b 100644 --- a/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala +++ b/engine-plugins/graph-plugins/src/test/scala/org/trustedanalytics/atk/plugins/orientdbimport/VertexReaderTest.scala @@ -20,8 +20,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.{ Matchers, BeforeAndAfterEach, WordSpec } import org.trustedanalytics.atk.domain.schema.{ VertexSchema, DataTypes, GraphSchema, Column } import org.trustedanalytics.atk.engine.frame.RowWrapper +import org.trustedanalytics.atk.plugins.TestingOrientDb import org.trustedanalytics.atk.plugins.orientdb.VertexWriter -import org.trustedanalytics.atk.testutils.TestingOrientDb class VertexReaderTest extends WordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach {