Changes from all commits (21 commits)
eafc85d
Initial version of spark-based document deduplication. It contains
axnow Mar 17, 2017
e95a8cc
Work on the algorithm which splits large clusters among the cluster
axnow Apr 4, 2017
efb3fc2
Complete version with tiled comparison task.
axnow Apr 7, 2017
fab0915
Work on tiled optimization.
axnow Apr 14, 2017
6180bba
Stable version, does proper job within 2.5h on full data set.
axnow Jun 23, 2017
28834d2
Added options parsing from command line to control app behaviour.
axnow Jun 26, 2017
3a749d4
Added dependency for the scopt.
axnow Jul 11, 2017
0cfdcbb
Task tiling class rewritten to scala, with tests.
axnow Jul 14, 2017
2e0c205
Scala version.
axnow Jul 23, 2017
1a2c5dd
Fixed oozie workflow building.
axnow Jul 24, 2017
ca4592b
Cleaning up project files
axnow Jul 24, 2017
ece39dc
Initial version of spark-based document deduplication. It contains
axnow Mar 17, 2017
c056a0b
Work on the algorithm which splits large clusters among the cluster
axnow Apr 4, 2017
e7ad7aa
Complete version with tiled comparison task.
axnow Apr 7, 2017
0cf6672
Work on tiled optimization.
axnow Apr 14, 2017
013b53c
Stable version, does proper job within 2.5h on full data set.
axnow Jun 23, 2017
ac56042
Added options parsing from command line to control app behaviour.
axnow Jun 26, 2017
81f6509
Added dependency for the scopt.
axnow Jul 11, 2017
cd1014c
Scala version.
axnow Jul 23, 2017
221cd52
Fixed oozie workflow building.
axnow Jul 24, 2017
20da6f8
Merge branch 'sparkdeduplication' of https://github.com/axnow/CoAnSys…
axnow Jul 25, 2017
2 changes: 2 additions & 0 deletions .gitignore
@@ -10,6 +10,7 @@

# Netbeans files #
nb-configuration.xml
nbaction.xml

# IntelliJ IDEA files #
.idea
@@ -43,3 +44,4 @@ dependency-reduced-pom.xml
/affiliation-organization-matching/affiliation-organization-matching-workflow/src/main/oozie/workflow.xml
/deduplication-organization/deduplication-organization-workflow/src/main/oozie/workflow.xml

/deduplication-document-spark/deduplication-document-spark-impl/nbproject/
73 changes: 73 additions & 0 deletions deduplication-document-spark/deduplication-document-spark-impl/nbactions.xml
@@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<actions>
    <action>
        <actionName>run</actionName>
        <packagings>
            <packaging>jar</packaging>
        </packagings>
        <goals>
            <goal>process-classes</goal>
            <goal>org.codehaus.mojo:exec-maven-plugin:1.2.1:exec</goal>
        </goals>
        <properties>
            <exec.args>-classpath %classpath pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments</exec.args>
            <exec.executable>java</exec.executable>
        </properties>
    </action>
    <action>
        <actionName>debug</actionName>
        <packagings>
            <packaging>jar</packaging>
        </packagings>
        <goals>
            <goal>process-classes</goal>
            <goal>org.codehaus.mojo:exec-maven-plugin:1.2.1:exec</goal>
        </goals>
        <properties>
            <exec.args>-Xdebug -Xrunjdwp:transport=dt_socket,server=n,address=${jpda.address} -classpath %classpath pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments</exec.args>
            <exec.executable>java</exec.executable>
            <jpda.listen>true</jpda.listen>
        </properties>
    </action>
    <action>
        <actionName>profile</actionName>
        <packagings>
            <packaging>jar</packaging>
        </packagings>
        <goals>
            <goal>process-classes</goal>
            <goal>org.codehaus.mojo:exec-maven-plugin:1.2.1:exec</goal>
        </goals>
        <properties>
            <exec.args>-classpath %classpath pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments</exec.args>
            <exec.executable>java</exec.executable>
        </properties>
    </action>
    <action>
        <actionName>CUSTOM-scala:run</actionName>
        <displayName>scala:run</displayName>
        <goals>
            <goal>scala:run</goal>
        </goals>
    </action>
    <action>
        <actionName>CUSTOM-RunSmall</actionName>
        <displayName>RunSmall</displayName>
        <goals>
            <goal>scala:run</goal>
        </goals>
        <properties>
            <addArgs>test|test2</addArgs>
        </properties>
    </action>
    <action>
        <actionName>CUSTOM-clean,build,upload</actionName>
        <displayName>clean,build,upload</displayName>
        <goals>
            <goal>clean</goal>
            <goal>install</goal>
            <goal>wagon:upload-single</goal>
        </goals>
    </action>
</actions>
198 changes: 198 additions & 0 deletions deduplication-document-spark/deduplication-document-spark-impl/pom.xml
@@ -0,0 +1,198 @@

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>pl.edu.icm.coansys</groupId>
        <artifactId>deduplication-document-spark</artifactId>
        <version>1.11-SNAPSHOT</version>
    </parent>

    <artifactId>deduplication-document-spark-impl</artifactId>
    <packaging>jar</packaging>
    <name>Deduplication - Document - SparkVersion - Implementation</name>
    <licenses>
        <license>
            <name>GNU AFFERO GENERAL PUBLIC LICENSE, Version 3 (AGPL-3.0)</name>
            <url>http://opensource.org/licenses/AGPL-3.0</url>
        </license>
    </licenses>
    <distributionManagement>
        <repository>
            <id>ssh-cypisek</id>
            <url>scpexe://cypisek/jobs</url>
        </repository>
    </distributionManagement>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>

        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <!--<version>3.1.6</version>-->
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <!--<arg>-make:transitive</arg>-->
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
                <configuration>
                    <launchers>
                        <launcher>
                            <id>base</id>
                            <!-- Sets the main class for the base launcher -->
                            <mainClass>pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments</mainClass>
                            <args>
                                <!--<arg>${basedir}</arg>-->
                            </args>
                        </launcher>
                    </launchers>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <artifactSet>
                        <excludes>
                            <exclude>junit:junit</exclude>
                            <exclude>log4j:log4j:jar:</exclude>
                            <exclude>org.scala-lang:scala-library:jar:</exclude>
                            <exclude>org.apache.spark:spark-core_2.10</exclude>
                            <exclude>org.apache.spark:spark-sql_2.10</exclude>
                            <exclude>org.apache.spark:spark-streaming_2.10</exclude>
                        </excludes>
                    </artifactSet>
                    <finalName>${project.artifactId}-${project.version}</finalName>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>wagon-maven-plugin</artifactId>
                <version>1.0-beta-3</version>
                <configuration>
                    <fromFile>${project.build.directory}/${project.build.finalName}.jar</fromFile>
                    <url>scp://cypisek-gw.ocean.icm.edu.pl/home/axnow/jobs/</url>
                    <toFile>dedupdocs.jar</toFile>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scalatest</groupId>
                <artifactId>scalatest-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                    <junitxml>.</junitxml>
                    <filereports>WDF TestSuite.txt</filereports>
                </configuration>
                <executions>
                    <execution>
                        <id>test</id>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <extensions>
            <!-- Enabling the use of SSH -->
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>
    </build>

    <dependencies>
        <dependency>
            <groupId>${project.groupId}</groupId>
            <artifactId>models</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>${project.groupId}</groupId>
            <artifactId>deduplication-document-impl</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.10</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>
        <dependency>
            <groupId>com.github.scopt</groupId>
            <artifactId>scopt_2.10</artifactId>
            <version>3.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.10</artifactId>
            <version>3.0.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.holdenkarau</groupId>
            <artifactId>spark-testing-base_2.11</artifactId>
            <version>1.6.0_0.7.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
99 changes: 99 additions & 0 deletions deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/CartesianTaskSplit.scala
@@ -0,0 +1,99 @@
/*
 * This file is part of CoAnSys project.
 * Copyright (c) 2012-2017 ICM-UW
 *
 * CoAnSys is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * CoAnSys is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with CoAnSys. If not, see <http://www.gnu.org/licenses/>.
 */
package pl.edu.icm.coansys.document.deduplication

import pl.edu.icm.coansys.models.DocumentProtos.DocumentWrapper

class CartesianTaskSplit(
    val clusterId: String,
    val taskId: String,
    val rows: Seq[DocumentWrapper],
    val columns: Seq[DocumentWrapper]
) {
  /**
   * Generates a list of clusters of documents for which the predicate holds,
   * i.e. the supplied function returned true. The predicate is assumed to be
   * symmetric, so it is executed only once per pair. Note that since all
   * tiles are expected to appear within the task, and the comparison
   * operator may be expensive, only pairs where the row key is less than
   * the column key are taken into account.
   *
   * @param equalityTest predicate defining whether or not two elements are
   *                     considered matching (typically equal)
   * @return list of lists of keys of equal documents (documents for which
   *         equalityTest returned true)
   */
Review comment (Contributor):
Wrong indentation in this and other files.

Warning: IntelliJ has a bad default way of formatting case classes (and classes with parametric fields, like CartesianTaskSplit), which should be changed before using code formatting in this IDE:
https://stackoverflow.com/a/26880974
Don't omit the useful comment on this answer: "If you want 4 spaces indent (according to Scala style guides) in 'File -> Settings... -> Code Style -> Scala -> Other' check 'Alternate indentation for constructor args and parameter declarations' with 4"

Personally I find it more readable when there is an empty line after the class/function header. This can be set in:
Editor > Code Style > Scala: Minimum Blank Lines: for {After class header, Before method body} change 0 to 1
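
For illustration, a small sketch of what those two settings produce (the Example class here is hypothetical, not part of this diff):

    // 'Alternate indentation for constructor args and parameter declarations' = 4,
    // 'Minimum Blank Lines: After class header' = 1:
    class Example(
        val id: String,
        val size: Int
    ) {

      def label: String = s"$id:$size"
    }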

  def processPairs(equalityTest: (DocumentWrapper, DocumentWrapper) => Boolean): Seq[Seq[String]] = {
Review comment (Contributor):
Function name could be more meaningful.

    return List.empty
Review comment (Contributor):
Is it ok to return empty list here? The rest of the method body is dead code.


    val clusters: Seq[Seq[String]] = rows.map(row => {
      val rkey = row.getDocumentMetadata.getKey
      val equalColumnKeys = columns.filter(rkey < _.getDocumentMetadata.getKey)
        .filter(equalityTest(row, _))
        .map(_.getDocumentMetadata.getKey)
      equalColumnKeys :+ rkey
    }).filter(_.size > 1)
    CartesianTaskSplit.coalesceClusters(clusters)
  }
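
Picking up the review remarks above, a minimal corrected sketch with the early return removed (the rest is the committed logic unchanged; the name clusterMatchingPairs is only an illustrative suggestion):

    // Sketch: without the early `return List.empty` the body below is live
    // again. For each row, collect the keys of all matching columns with a
    // larger key, keep only non-trivial clusters, and merge overlapping ones.
    def clusterMatchingPairs(equalityTest: (DocumentWrapper, DocumentWrapper) => Boolean): Seq[Seq[String]] = {
      val clusters = rows.map { row =>
        val rkey = row.getDocumentMetadata.getKey
        val equalColumnKeys = columns
          .filter(rkey < _.getDocumentMetadata.getKey)
          .filter(equalityTest(row, _))
          .map(_.getDocumentMetadata.getKey)
        equalColumnKeys :+ rkey
      }.filter(_.size > 1)
      CartesianTaskSplit.coalesceClusters(clusters)
    }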

}

object CartesianTaskSplit {
  val log = org.slf4j.LoggerFactory.getLogger(getClass().getName())
Review comment (Contributor):
Empty parentheses should be removed in method calls that do not have side effects (here and in other places in this code).
The 'log' variable can be private.

  /**
   * Combines clusters that have a non-empty intersection, so the result
   * contains only pairwise-disjoint clusters.
   *
   * @param clusters lists to combine
   * @return list of disjoint clusters, obtained by merging the input clusters
   */
  def coalesceClusters(clusters: Seq[Seq[String]]): Seq[Seq[String]] = {
Review comment (Contributor):
The IDE has a few good suggestions for this method (e.g. use nonEmpty instead of !sets.isEmpty, val instead of var where possible, change flatMap(x => x) to flatten).

    var sets = clusters.map(_.toSet[String])
    var res = List.empty[Set[String]]
    while (!sets.isEmpty) {
      var current = sets.head
      sets = sets.tail
      var ps: (Seq[Set[String]], Seq[Set[String]]) = null
      do {
        ps = sets.partition(_.exists(current.contains(_)))
        current +: ps._1.flatMap(x => x)
        sets = ps._2
      } while (!ps._1.isEmpty)
      res :+ current
    }
    res.map(_.toSeq)
  }
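
Along the lines of that comment, a corrected sketch. Note that in the committed version the results of `current +: ps._1.flatMap(x => x)` and `res :+ current` are discarded (immutable collections return new values), so the method as written always returns an empty list; an accumulating rewrite fixes both:

    // Sketch: merge transitively overlapping clusters, using nonEmpty,
    // flatten, and explicit reassignment instead of discarded expressions.
    def coalesceClustersSketch(clusters: Seq[Seq[String]]): Seq[Seq[String]] = {
      var sets = clusters.map(_.toSet)
      var res = List.empty[Set[String]]
      while (sets.nonEmpty) {
        var current = sets.head
        sets = sets.tail
        var overlapping = Seq.empty[Set[String]]
        do {
          val (hit, miss) = sets.partition(_.exists(current.contains))
          overlapping = hit
          current = current ++ hit.flatten
          sets = miss
        } while (overlapping.nonEmpty)
        res = current :: res
      }
      res.map(_.toSeq)
    }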

  /**
   * Splits one large cluster into parallel tasks of the given size.
   */
  def parallelizeCluster(clusterId: String, documents: Iterable[DocumentWrapper], tileSize: Int): Seq[CartesianTaskSplit] = {
    log.info(f"Document count: ${documents.size}, tile size $tileSize")
    val ntiles = documents.size / tileSize + (if (documents.size % tileSize > 0) 1 else 0)
    println(f"ntiles: $ntiles")

    val sdoc = documents.toVector.sorted(Ordering.by[DocumentWrapper, String](_.getDocumentMetadata.getKey))
Review comment (Contributor):
Unnecessary type annotation [DocumentWrapper, String] in Ordering.by. That doesn't make the code more readable.
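
One way to act on that (a sketch, not the reviewer's exact wording): swap sorted(Ordering.by(...)) for sortBy, which infers both the element and the key type:

    val sdoc = documents.toVector.sortBy(_.getDocumentMetadata.getKey)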

    val groupedDocs = sdoc.zipWithIndex.map(docidx => (docidx._2 % ntiles, docidx._1)).groupBy[Int](_._1).mapValues(_.map(_._2).toVector).toVector
Review comment (Contributor):
A groupedDocs type annotation would make the code more readable.

Review comment (Contributor):
Pattern matching should be used for the parameters of the map lambda function for code clarity. References to docidx._1 and docidx._2 can then be replaced with direct references to values from pattern matching: doc and idx respectively.

`.map(docidx => …)` -> `.map { case (doc, idx) => … }`

Review comment (Contributor):
Line too long, splitting it over subsequent transformations would be way more readable:

  val groupedDocs: Seq[(Int, Seq[DocumentWrapper])] =
    sdoc
      .zipWithIndex
      .map { case (doc, idx) => (idx % ntiles, doc) }
      .groupBy(_._1)
      .mapValues(_.map(_._2).toVector)
      .toVector

    val res = groupedDocs.flatMap(kv =>
      groupedDocs.map(kvin => new CartesianTaskSplit(
        clusterId, f"${clusterId}_${kv._1}:${kv._2}", kv._2, kvin._2
Review comment (Contributor):
Pattern matching should be used in map & flatMap just like described previously.

Review comment (Contributor):
Line too long, arguments should be on separate lines:

      groupedDocs.map(kvin =>
        new CartesianTaskSplit(
          clusterId,
          f"${clusterId}_${kv._1}:${kv._2}",
          kv._2,
          kvin._2
        )
      )

        )
      )
    )
    res
Review comment (Contributor):
There is no need to define val res here; the expression assigned to it could simply be the last expression in this function.

  }
}
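
For context, a hypothetical driver-side sketch of how these pieces compose; titleEquality, the documents collection, and the tile size of 500 are assumptions, not part of this diff:

    // Split an oversized cluster into tiles, run the pairwise comparison on
    // each tile, then merge the per-tile clusters. In the Spark job the tiles
    // would be distributed (e.g. sc.parallelize(tiles)); here, sequentially.
    def deduplicateLargeCluster(
        clusterId: String,
        documents: Iterable[DocumentWrapper],
        titleEquality: (DocumentWrapper, DocumentWrapper) => Boolean
    ): Seq[Seq[String]] = {
      val tiles = CartesianTaskSplit.parallelizeCluster(clusterId, documents, tileSize = 500)
      val perTileClusters = tiles.flatMap(_.processPairs(titleEquality))
      CartesianTaskSplit.coalesceClusters(perTileClusters)
    }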