@@ -28,7 +28,7 @@
 >>> result = graph.export_to_orientdb("OrientDbDocTest",5)
 <progress>
 >>> result
-{u'db_uri': u'remote:hostname:2424/OrientDbTest',
+{u'db_uri': u'remote:hostname:2424/OrientDbDocTest',
 u'exported_edges': {u'edges': {u'exported_count': 5, u'failure_count': 0}},
 u'exported_vertices': {u'source': {u'exported_count': 5,u'failure_count': 0}}}
 
@@ -26,7 +26,8 @@ case class DbConfiguration(@ArgDoc("""OrientDB database URI.""") dbUri: String,
 @ArgDoc("""The database password.""") dbPassword: String,
 @ArgDoc("""Port number.""") portNumber: String,
 @ArgDoc("""The database host.""") dbHost: String,
-@ArgDoc("""The root password.""") rootPassword: String) extends Serializable {
+@ArgDoc("""The root password.""") rootPassword: String,
+@ArgDoc("""Additional database properties.""") dbProperties: Option[Map[String, Any]] = None) extends Serializable {
 
 require(dbUri != null, "database URI is required")
 require(dbUserName != null, "the user name is required")
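
Note: the new dbProperties parameter is optional and defaults to None, so existing call sites are unaffected. A minimal construction sketch (the URI, host, and credentials are placeholder values; the buffer-size key is the one exercised by the TestingOrientDb trait below):

    val config = DbConfiguration(
      dbUri = "remote:hostname:2424/OrientDbTest", // placeholder URI
      dbUserName = "admin",                        // placeholder credentials
      dbPassword = "admin",
      portNumber = "2424",
      dbHost = "hostname",
      rootPassword = "root",
      dbProperties = Some(Map("storage.diskCache.bufferSize" -> 256)))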
@@ -38,6 +38,10 @@ object GraphDbFactory extends EventLogging {
 */
 def graphDbConnector(dbConfigurations: DbConfiguration): OrientGraphNoTx = {
 val orientDb: ODatabaseDocumentTx = new ODatabaseDocumentTx(dbConfigurations.dbUri)
+dbConfigurations.dbProperties.foreach(propertyMap => {
+propertyMap.foreach { case (key, value) => orientDb.setProperty(key, value) }
+})
+
 val orientGraphDb = if (dbConfigurations.dbUri.startsWith("remote:")) {
 if (!new OServerAdmin(dbConfigurations.dbUri).connect(rootUserName, dbConfigurations.rootPassword).existsDatabase()) {
 new OServerAdmin(dbConfigurations.dbUri).connect(rootUserName, dbConfigurations.rootPassword).createDatabase("graph", "plocal")
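
Note: the properties are applied to the ODatabaseDocumentTx handle before the remote or plocal connection is opened, and the outer Option.foreach makes the block a no-op when dbProperties is None. The same idiom in isolation (a sketch, assuming a dbConfig built as in the example above):

    import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx

    val orientDb = new ODatabaseDocumentTx(dbConfig.dbUri)
    dbConfig.dbProperties.foreach(_.foreach {
      case (key, value) => orientDb.setProperty(key, value) // e.g. "storage.diskCache.bufferSize" -> 256
    })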
@@ -37,7 +37,8 @@ class OrientDbEdgeRdd(sc: SparkContext, dbConfigurations: DbConfiguration) exten
 val edgeBuffer = new ArrayBuffer[Edge]()
 val schemaReader = new SchemaReader(graph)
 val edgeSchema = schemaReader.importEdgeSchema(partition.className)
-val edges: OrientDynaElementIterable = graph.command(new OCommandSQL(s"select from ${partition.className}")).execute()
+val edges: OrientDynaElementIterable = graph.command(
+new OCommandSQL(s"select from cluster:${partition.clusterId} where @class='${partition.className}'")).execute()
 val edgeIterator = edges.iterator().asInstanceOf[java.util.Iterator[BlueprintsEdge]]
 while (edgeIterator.hasNext) {
 val edgeReader = new EdgeReader(graph, edgeSchema)
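
Note: the old query scanned the entire class from every partition; the new one scans only the partition's own cluster, so each Spark partition (apparently one per OrientDB cluster) reads a disjoint slice of the edges, and the @class filter guards against records of other classes sharing the cluster. For a hypothetical partition the generated SQL looks like:

    // Illustrative values for a single partition.
    val clusterId = 9
    val className = "edge_label"
    val sql = s"select from cluster:$clusterId where @class='$className'"
    // => select from cluster:9 where @class='edge_label'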
@@ -13,10 +13,14 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.trustedanalytics.atk.testutils
+package org.trustedanalytics.atk.plugins
 
 import java.io.File
-import com.tinkerpop.blueprints.impls.orient.{ OrientGraphNoTx, OrientGraphFactory }
+
+import com.orientechnologies.orient.core.intent.OIntentMassiveInsert
+import com.tinkerpop.blueprints.impls.orient.{ OrientGraphFactory, OrientGraphNoTx }
+import org.trustedanalytics.atk.plugins.orientdb.DbConfiguration
+import org.trustedanalytics.atk.testutils.DirectoryUtils
 
 /**
 * setup for testing export to OrientDB plugin functions
@@ -25,12 +29,14 @@ trait TestingOrientDb {
 
 var tmpDir: File = null
 var dbUri: String = null
-var dbName: String = "OrientDbTest"
+var dbName: String = "OrientDbTest1"
 var dbUserName = "admin"
 var dbPassword = "admin"
 var rootPassword = "root"
+var dbConfig: DbConfiguration = null
 var orientMemoryGraph: OrientGraphNoTx = null
 var orientFileGraph: OrientGraphNoTx = null
+val dbProperties: Map[String, Any] = Map(("storage.diskCache.bufferSize", 256))
 
 /**
 * create in memory Orient graph database
@@ -44,11 +50,13 @@
 * create plocal Orient graph database
 */
 def setupOrientDb(): Unit = {
-
-tmpDir = DirectoryUtils.createTempDirectory("orient-graph-for-unit-testing")
-dbUri = "plocal:/" + tmpDir.getAbsolutePath + "/" + dbName
+val uuid = java.util.UUID.randomUUID.toString
+tmpDir = DirectoryUtils.createTempDirectory("orientgraphtests")
+dbUri = "plocal:" + tmpDir.getAbsolutePath + "/" + dbName + uuid
+dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword, Some(dbProperties))
 val factory = new OrientGraphFactory(dbUri, dbUserName, dbPassword)
 orientFileGraph = factory.getNoTx
+orientFileGraph.declareIntent(new OIntentMassiveInsert())
 }
 
 /**
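
Note: several fixes land together here. The UUID suffix gives every run a fresh plocal database; dbConfig is now built once in the shared setup, which is why the per-test DbConfiguration constructions are deleted in the test diffs below (dbUserName is passed for both the user-name and password arguments, which works only because both are "admin"); and declareIntent(new OIntentMassiveInsert()) hints OrientDB to optimize for bulk writes. The switch from "plocal:/" to "plocal:" also matters: getAbsolutePath already begins with "/" on POSIX systems, so the old concatenation produced a doubled slash. A small sketch:

    // Illustrative path; shows why the leading slash was dropped from the prefix.
    val tmp = new java.io.File("/tmp/orientgraphtests")
    val oldUri = "plocal:/" + tmp.getAbsolutePath + "/db" // "plocal://tmp/...": doubled slash
    val fixedUri = "plocal:" + tmp.getAbsolutePath + "/db" // "plocal:/tmp/...": well-formed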
@@ -20,7 +20,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec }
 import org.trustedanalytics.atk.domain.schema._
-import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec }
+import org.trustedanalytics.atk.plugins.TestingOrientDb
+import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec
 
 class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach {
 
@@ -35,7 +36,6 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with
 "Edge frame writer" should {
 "Export edge frame" in {
 // exporting a vertex frame:
-val dbConfig = new DbConfiguration(dbUri, dbUserName, dbUserName, "port", "host", rootPassword)
 val vColumns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32))
 val vSchema = new VertexSchema(vColumns, GraphSchema.labelProperty, null)
 
@@ -56,14 +56,14 @@ class EdgeFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with
 
 //exporting the edge frame:
 val eColumns = List(Column(GraphSchema.edgeProperty, DataTypes.int64), Column(GraphSchema.srcVidProperty, DataTypes.int64), Column(GraphSchema.destVidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("distance", DataTypes.int32))
-val eSchema = new EdgeSchema(eColumns, "label", GraphSchema.labelProperty, GraphSchema.labelProperty)
+val eSchema = new EdgeSchema(eColumns, "edge_label", GraphSchema.labelProperty, GraphSchema.labelProperty)
 val edges: List[Row] = List(
 new GenericRow(Array(1L, 1L, 2L, "distance1", 100)),
 new GenericRow(Array(2L, 2L, 3L, "distance2", 200)),
 new GenericRow(Array(3L, 3L, 4L, "distance3", 400)))
 val eRowRdd = sparkContext.parallelize(edges)
 val edgeFrameRdd = new EdgeFrameRdd(eSchema, eRowRdd)
-val batchSize = 3
+val batchSize = 4
 if (orientFileGraph.getEdgeType(eSchema.label) == null) {
 val schemaWriter = new SchemaWriter(orientFileGraph)
 val oEdgeType = schemaWriter.createEdgeSchema(eSchema)
@@ -19,7 +19,8 @@ import org.apache.spark.atk.graph.{ Edge, Vertex }
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec }
 import org.trustedanalytics.atk.domain.schema._
-import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec }
+import org.trustedanalytics.atk.plugins.TestingOrientDb
+import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec
 
 class EdgeWriterTest extends WordSpec with Matchers with TestingSparkContextWordSpec with TestingOrientDb with BeforeAndAfterEach {
 override def beforeEach() {
@@ -20,7 +20,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec }
 import org.trustedanalytics.atk.domain.schema.{ VertexSchema, DataTypes, GraphSchema, Column }
-import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec }
+import org.trustedanalytics.atk.plugins.TestingOrientDb
+import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec
 
 class VertexFrameWriterTest extends WordSpec with TestingSparkContextWordSpec with Matchers with TestingOrientDb with BeforeAndAfterEach {
 override def beforeEach() {
@@ -33,7 +34,6 @@ class VertexFrameWriterTest extends WordSpec with TestingSparkContextWordSpec wi
 
 "vertex frame writer" should {
 "export vertex frame to OrientDB" in {
-val dbConfig = new DbConfiguration(dbUri, dbUserName, dbPassword, "port", "host", rootPassword)
 val columns = List(Column(GraphSchema.vidProperty, DataTypes.int64), Column(GraphSchema.labelProperty, DataTypes.string), Column("name", DataTypes.string), Column("from", DataTypes.string), Column("to", DataTypes.string), Column("fair", DataTypes.int32))
 val schema = new VertexSchema(columns, GraphSchema.labelProperty, null)
 val vertices: List[Row] = List(
@@ -20,7 +20,8 @@ import org.apache.spark.atk.graph.Vertex
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{ BeforeAndAfterEach, Matchers, WordSpec }
 import org.trustedanalytics.atk.domain.schema.{ VertexSchema, DataTypes, GraphSchema, Column }
-import org.trustedanalytics.atk.testutils.{ TestingOrientDb, TestingSparkContextWordSpec }
+import org.trustedanalytics.atk.plugins.TestingOrientDb
+import org.trustedanalytics.atk.testutils.TestingSparkContextWordSpec
 
 class VertexWriterTest extends WordSpec with Matchers with TestingSparkContextWordSpec with TestingOrientDb with BeforeAndAfterEach {
 
@@ -20,8 +20,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{ Matchers, BeforeAndAfterEach, WordSpec }
 import org.trustedanalytics.atk.domain.schema._
 import org.trustedanalytics.atk.engine.frame.RowWrapper
+import org.trustedanalytics.atk.plugins.TestingOrientDb
 import org.trustedanalytics.atk.plugins.orientdb.{ SchemaWriter, VertexWriter, EdgeWriter }
-import org.trustedanalytics.atk.testutils.TestingOrientDb
 
 class EdgeReaderTest extends WordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach {
 
@@ -20,8 +20,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{ Matchers, BeforeAndAfterEach, WordSpec }
 import org.trustedanalytics.atk.domain.schema.DataTypes.string
 import org.trustedanalytics.atk.domain.schema._
+import org.trustedanalytics.atk.plugins.TestingOrientDb
 import org.trustedanalytics.atk.plugins.orientdb.{ SchemaWriter, EdgeWriter, VertexWriter }
-import org.trustedanalytics.atk.testutils.TestingOrientDb
 
 class SchemaReaderTest extends WordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach {
 
@@ -20,8 +20,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{ Matchers, BeforeAndAfterEach, WordSpec }
 import org.trustedanalytics.atk.domain.schema.{ VertexSchema, DataTypes, GraphSchema, Column }
 import org.trustedanalytics.atk.engine.frame.RowWrapper
+import org.trustedanalytics.atk.plugins.TestingOrientDb
 import org.trustedanalytics.atk.plugins.orientdb.VertexWriter
-import org.trustedanalytics.atk.testutils.TestingOrientDb
 
 class VertexReaderTest extends WordSpec with TestingOrientDb with Matchers with BeforeAndAfterEach {
 
@@ -65,8 +65,7 @@ time the model is trained, allows LDA to generate the same topic distribution
 if the corpus and LDA parameters are unchanged.""") randomSeed: Option[Long] = None,
 @ArgDoc("""Period (in iterations) between checkpoints (default = 10). Checkpointing helps with recovery
 * (when nodes fail). It also helps with eliminating temporary shuffle files on disk, which can be
-* important when LDA is run for many iterations. If the checkpoint directory is not set, this setting is ignored.""")
-checkPointInterval: Int = 10) {
+* important when LDA is run for many iterations. If the checkpoint directory is not set, this setting is ignored.""") checkPointInterval: Int = 10) {
 
 require(model != null, "model is required")
 require(frame != null, "frame is required")
4 changes: 2 additions & 2 deletions pom.xml
@@ -140,7 +140,7 @@ export MAVEN_OPTS="-Xmx512m -XX:PermSize=256m"
 <dep.hive.version>1.1.0-${dep.cdh.version}</dep.hive.version>
 <dev.spark.hive.version>1.6.0-${dep.cdh.version}</dev.spark.hive.version>
 <dep.daal.version>2016.2.181</dep.daal.version>
-<dep.orientdb.version>2.1.16</dep.orientdb.version>
+<dep.orientdb.version>2.2.6</dep.orientdb.version>
 
 <!--START GAO MAVEN -->
 
@@ -307,7 +307,7 @@ export MAVEN_OPTS="-Xmx512m -XX:PermSize=256m"
 <filereports>WDF TestSuite.txt</filereports>
 <parallel>false</parallel>
 <stdout>FTD</stdout>
-<argLine>-Xmx512m -XX:PermSize=256m</argLine>
+<argLine>-Xmx1024m -XX:PermSize=256m</argLine>
 </configuration>
 <executions>
 <execution>