diff --git a/.gitignore b/.gitignore index ad354819..7dc60f54 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ # Netbeans files # nb-configuration.xml +nbaction.xml # IntelliJ IDEA files # .idea @@ -43,3 +44,4 @@ dependency-reduced-pom.xml /affiliation-organization-matching/affiliation-organization-matching-workflow/src/main/oozie/workflow.xml /deduplication-organization/deduplication-organization-workflow/src/main/oozie/workflow.xml +/deduplication-document-spark/deduplication-document-spark-impl/nbproject/ diff --git a/deduplication-document-spark/deduplication-document-spark-impl/nbactions.xml b/deduplication-document-spark/deduplication-document-spark-impl/nbactions.xml new file mode 100644 index 00000000..b9137ba9 --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/nbactions.xml @@ -0,0 +1,73 @@ + + + + run + + jar + + + process-classes + org.codehaus.mojo:exec-maven-plugin:1.2.1:exec + + + -classpath %classpath pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments + java + + + + debug + + jar + + + process-classes + org.codehaus.mojo:exec-maven-plugin:1.2.1:exec + + + -Xdebug -Xrunjdwp:transport=dt_socket,server=n,address=${jpda.address} -classpath %classpath pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments + java + true + + + + profile + + jar + + + process-classes + org.codehaus.mojo:exec-maven-plugin:1.2.1:exec + + + -classpath %classpath pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments + java + + + + CUSTOM-scala:run + scala:run + + scala:run + + + + CUSTOM-RunSmall + RunSmall + + scala:run + + + test|test2 + + + + + CUSTOM-clean,build,upload + clean,build,upload + + clean + install + wagon:upload-single + + + diff --git a/deduplication-document-spark/deduplication-document-spark-impl/pom.xml b/deduplication-document-spark/deduplication-document-spark-impl/pom.xml new file mode 100644 index 00000000..4abc9031 --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/pom.xml @@ -0,0 +1,198 @@ + + + 4.0.0 + + pl.edu.icm.coansys + deduplication-document-spark + 1.11-SNAPSHOT + + + deduplication-document-spark-impl + jar + Deduplication - Document - SparkVersion - Implementation + + + GNU AFFERO GENERAL PUBLIC LICENSE, Version 3 (AGPL-3.0) + http://opensource.org/licenses/AGPL-3.0 + + + + + ssh-cypisek + scpexe://cypisek/jobs + + + + + src/main/scala + src/test/scala + + + + + + net.alchim31.maven + scala-maven-plugin + + + + + compile + testCompile + + + + + -dependencyfile + ${project.build.directory}/.scala_dependencies + + + + + + + + base + + pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-shade-plugin + 2.3 + + + package + + shade + + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + junit:junit + log4j:log4j:jar: + org.scala-lang:scala-library:jar: + org.apache.spark:spark-core_2.10 + org.apache.spark:spark-sql_2.10 + org.apache.spark:spark-streaming_2.10 + + + ${project.artifactId}-${project.version} + + + + org.codehaus.mojo + wagon-maven-plugin + 1.0-beta-3 + + ${project.build.directory}/${project.build.finalName}.jar + scp://cypisek-gw.ocean.icm.edu.pl/home/axnow/jobs/ + dedupdocs.jar + + + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . 
+                    <filereports>WDF TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <extensions>
+            <extension>
+                <groupId>org.apache.maven.wagon</groupId>
+                <artifactId>wagon-ssh</artifactId>
+                <version>2.8</version>
+            </extension>
+        </extensions>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>models</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>deduplication-document-impl</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.10</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>javax.servlet-api</artifactId>
+            <version>3.1.0</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>15.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_2.10</artifactId>
+            <version>3.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.10</artifactId>
+            <version>3.0.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.holdenkarau</groupId>
+            <artifactId>spark-testing-base_2.11</artifactId>
+            <version>1.6.0_0.7.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/CartesianTaskSplit.scala b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/CartesianTaskSplit.scala
new file mode 100644
index 00000000..5404d932
--- /dev/null
+++ b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/CartesianTaskSplit.scala
@@ -0,0 +1,99 @@
+/*
+ * This file is part of CoAnSys project.
+ * Copyright (c) 2012-2017 ICM-UW
+ *
+ * CoAnSys is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+
+ * CoAnSys is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with CoAnSys. If not, see <http://www.gnu.org/licenses/>.
+ */
+package pl.edu.icm.coansys.document.deduplication
+
+import pl.edu.icm.coansys.models.DocumentProtos.DocumentWrapper
+
+class CartesianTaskSplit(
+    val clusterId: String,
+    val taskId: String,
+    val rows: Seq[DocumentWrapper],
+    val columns: Seq[DocumentWrapper]
+) {
+  /**
+   * Generates clusters of documents for which the predicate holds, i.e. the
+   * passed function returned true. The predicate is assumed to be symmetric,
+   * so it is executed only once per pair. Since all tiles of a cluster appear
+   * within the task set and the comparison operator may be expensive, only
+   * pairs where the row key is less than the column key are evaluated.
+   *
+   * @param equalityTest predicate which defines whether or not two elements
+   *                     are considered matching (typically equal)
+   * @return list of lists of keys of equal documents (documents where
+   *         equalityTest returned true)
+   */
+  def processPairs(equalityTest: (DocumentWrapper, DocumentWrapper) => Boolean): Seq[Seq[String]] = {
+    val clusters: Seq[Seq[String]] = rows.map(row => {
+      val rkey = row.getDocumentMetadata.getKey
+      val equalColumnKeys = columns.filter(rkey < _.getDocumentMetadata.getKey)
+        .filter(equalityTest(row, _))
+        .map(_.getDocumentMetadata.getKey)
+      equalColumnKeys :+ rkey
+    }).filter(_.size > 1)
+    CartesianTaskSplit.coalesceClusters(clusters)
+  }
+
+}
+
+object CartesianTaskSplit {
+  val log = org.slf4j.LoggerFactory.getLogger(getClass().getName())
+
+  /**
+   * Combines clusters that have a non-empty intersection, so the result
+   * contains only disjoint clusters.
+   *
+   * @param clusters lists to combine
+   * @return list of disjoint clusters obtained by merging the input clusters
+   */
+  def coalesceClusters(clusters: Seq[Seq[String]]): Seq[Seq[String]] = {
+    var sets = clusters.map(_.toSet[String])
+    var res = List.empty[Set[String]]
+    while (!sets.isEmpty) {
+      var current = sets.head
+      sets = sets.tail
+      var ps: (Seq[Set[String]], Seq[Set[String]]) = null
+      do {
+        ps = sets.partition(_.exists(current.contains(_)))
+        current = current ++ ps._1.flatMap(x => x)
+        sets = ps._2
+      } while (!ps._1.isEmpty)
+      res = res :+ current
+    }
+    res.map(_.toSeq)
+  }
+
+  /** Split one large cluster into parallel tasks of the given size.
+    */
+  def parallelizeCluster(clusterId: String, documents: Iterable[DocumentWrapper], tileSize: Int): Seq[CartesianTaskSplit] = {
+    log.info(f"Document count: ${documents.size}, tile size $tileSize")
+    val ntiles = documents.size / tileSize + (if (documents.size % tileSize > 0) 1 else 0)
+    println(f"ntiles: $ntiles")
+
+    val sdoc = documents.toVector.sorted(Ordering.by[DocumentWrapper, String](_.getDocumentMetadata.getKey))
+    val groupedDocs = sdoc.zipWithIndex.map(docidx => (docidx._2 % ntiles, docidx._1)).groupBy[Int](_._1).mapValues(_.map(_._2).toVector).toVector
+    val res = groupedDocs.flatMap(kv =>
+      groupedDocs.map(kvin => new CartesianTaskSplit(
+        clusterId, f"${clusterId}_${kv._1}:${kvin._1}", kv._2, kvin._2
+      )
+      )
+    )
+    res
+  }
+}
diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/DeduplicateDocuments.scala b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/DeduplicateDocuments.scala
new file mode 100644
index 00000000..0e4c59e7
--- /dev/null
+++ b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/DeduplicateDocuments.scala
@@ -0,0 +1,420 @@
+/*
+ * This file is part of CoAnSys project.
+ * Copyright (c) 2012-2017 ICM-UW
+ *
+ * CoAnSys is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+
+ * CoAnSys is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with CoAnSys. If not, see <http://www.gnu.org/licenses/>.
+ */ + +package pl.edu.icm.coansys.document.deduplication +import scala.collection.JavaConversions._ +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import java.util.function.BiPredicate +import org.apache.hadoop.io.BytesWritable +import org.apache.spark.SparkConf +import pl.edu.icm.coansys.deduplication.document.voter.AuthorsVoter +import pl.edu.icm.coansys.deduplication.document.voter.DoiVoter +import pl.edu.icm.coansys.deduplication.document.voter.IssueVolumeVoter +import pl.edu.icm.coansys.deduplication.document.voter.JournalVoter +import pl.edu.icm.coansys.deduplication.document.voter.PagesVoter +import pl.edu.icm.coansys.deduplication.document.voter.SimilarityVoter +import pl.edu.icm.coansys.deduplication.document.voter.TitleVoter +import pl.edu.icm.coansys.deduplication.document.voter.YearVoter +import pl.edu.icm.coansys.document.deduplication.merge.AdvancedDuplicatesMerger +import pl.edu.icm.coansys.document.deduplication.merge.DuplicatesMerger +import pl.edu.icm.coansys.models.DocumentProtos +import pl.edu.icm.coansys.models.DocumentProtos._ +import org.apache.spark.rdd.RDD +import pl.edu.icm.coansys.deduplication.document.comparator.VotesProductComparator +import pl.edu.icm.coansys.deduplication.document.comparator.WorkComparator +import scala.collection.mutable.ListBuffer +import pl.edu.icm.coansys.document.deduplication._ +import scala.collection.JavaConverters._ + +/** Main application for the deduplication of the documents. + * + */ +object DeduplicateDocuments { + val log = org.slf4j.LoggerFactory.getLogger(getClass().getName()) + + implicit def toJavaBiPredicate[A, B](predicate: (A, B) => Boolean) = + new BiPredicate[A, B] { + def test(a: A, b: B) = predicate(a, b) + } + + def isValidDocument(doc: DocumentWrapper): Boolean = { //todo: fix based on if return value. + if (doc.hasDocumentMetadata()) { + val md = doc.getDocumentMetadata + if (md.hasBasicMetadata) { + val bmd = md.getBasicMetadata + (bmd.getTitleCount() > 0 || bmd.getAuthorCount > 0 || bmd.hasDoi || bmd.hasJournal) + } else { + false + } + } else { + false + } + } + + + def calculateKeys(doc: DocumentMetadata, initialClusteringKeySize: Int, maximumClusteringKeySize: Int): Seq[String] = { + val keySizes = initialClusteringKeySize to maximumClusteringKeySize + var res = MultiLengthTitleKeyGenerator.generateKeys(doc)(keySizes) + if (res.head.isEmpty) { + res = Array.fill[String](keySizes.length)(doc.getKey) + } + res + } + + /** + * Group items into large clusters, within which detailed analysis will be + * held. + * + * Items are grouped by keys generated from the normalised titles. + * If the cluster is too big, then longer keys are used, so smaller clusters are + * generated. Treshold is maximumClusterSize. 
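+     * Each document ends up under the shortest key whose cluster fits within maximumClusterSize;
+     * if no key length fits, the longest key is kept even though its cluster is oversized.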
+ * + */ + def prepareInitialClustering(inputDocs: RDD[(String, DocumentWrapper)], initialClusteringKeySize: Int, + maximumClusteringKeySize: Int, maximumClusterSize: Int): RDD[(String, Iterable[DocumentWrapper])] = { + log.info("Initializing cluster preparation.") + val keySizes = initialClusteringKeySize to maximumClusteringKeySize + log.info("Will use key sizes: " + keySizes.mkString(", ")) + + val idClusterKeys = inputDocs.mapValues(doc => calculateKeys( + doc.getDocumentMetadata(), initialClusteringKeySize, maximumClusteringKeySize)) //we loose documents here, ony ids are preseved + val clusterDoc = idClusterKeys.flatMap(kv => kv._2.map(idcluster => (idcluster, kv._1))) // (clusterId => docId) + val clusterSizes = idClusterKeys.flatMap(x => (x._2.map(y => (y, 1)))).reduceByKey(_ + _) //(clusterId => clusterSize) + + //build rdd (docId, (clusterId, clusterSize) ) + val docClustersWithSizes = clusterDoc.join(clusterSizes).map(p => (p._2._1, (p._1, p._2._2))) + //build rdd - (docId, clusterId) + val selectedClusters = docClustersWithSizes.reduceByKey((x, y) => { + if (x._2 <= maximumClusterSize) { + if (y._2 <= maximumClusterSize) { + if (x._1.length <= y._1.length) { x } else { y } + } else { + x + } + } else { + if (y._2 <= maximumClusterSize) { + y + } else { + if (x._1.length > y._1.length) { x } else { y } + } + } + }).mapValues(_._1) + inputDocs.join(selectedClusters).map(p => (p._2._2, p._2._1)).groupByKey + } + + def buildDocumentsMerger(): DuplicatesMerger = { + val res = new AdvancedDuplicatesMerger + res.setup("") + res + } + + /** + * Merge the documents using appropriate document merger. + */ + def mergeDocuments(docs: List[DocumentWrapper]): DocumentWrapper = { + val merger = buildDocumentsMerger() + val merged = merger.merge(docs); + merged + } + + /** + * Defines comparator according to the weights resulting from experiments. + * + * This is reimplementation of the original Spring XML bean definition, which + * was unnecessary complication at this moment. 
+ */ + def buildWorkComparator(): WorkComparator = { + val result = new VotesProductComparator; + result.setMinVotersWeightRequired(1.5f) + result.setProbabilityTreshold(0.5f) + result.setTresholdIncreasingVotersRequired(0.7f) + + val voters = new ListBuffer[SimilarityVoter]() + val dv = new DoiVoter() + dv.setWeight(1.0f) + voters += dv + val jv = new JournalVoter() + jv.setWeight(0.3f) + jv.setDisapproveLevel(0.5f) + jv.setApproveLevel(0.05f) + voters += jv + + val wivv = new IssueVolumeVoter + wivv.setWeight(0.3f) + wivv.setAbstainIfAbsent(true) + wivv.setSubsetResult(0.8f) + wivv.setPartiallyMatchResult(0.52f) + voters += wivv + + val wpv = new PagesVoter + wpv.setWeight(.3f) + wpv.setAbstainIfAbsent(true) + wpv.setAbsentResult(0.6f) + wpv.setSubsetResult(0.75f) + wpv.setPartiallyMatchResult(0.64f) + wpv.setRemoveRepeated(true) + voters += wpv + + val wyv = new YearVoter + wyv.setWeight(.3f) + wyv.setAbstainIfAbsent(true) + wyv.setAbsentResult(.52f) + wyv.setSubsetResult(.9f) + wyv.setPartiallyMatchResult(.75f) + wyv.setRemoveRepeated(true) + voters += wyv + + val wtv = new TitleVoter() + wtv.setWeight(0.8f) + wtv.setDisapproveLevel(0.11f) + wtv.setApproveLevel(0.001f) + wtv.setMaxNormalizedTitleLength(90) + voters += wtv + + val wav = new AuthorsVoter + wav.setWeight(0.8f) + wav.setDisapproveLevel(0.2f) + wav.setApproveLevel(0.03f) + voters += wav + + result.setSimilarityVoters(voters) + result; + } + + + case class Config( + inputFile: String = "", + outputFile: String = "", + dumpClusters: Boolean = false, + keySizeMin: Int = 5, + keySizeMax: Int = 15, + clusterSizeMax: Int = 500, + tileSize: Int = 25, + filterInvalidDocuments: Boolean = false, + removeDuplicateDocuments: Boolean = false + ) + + /** Load the documents from the given sequence file, do the optional + * cleanups. + * + */ + def loadDocuments( sc: SparkContext, file: String, + filterInvalid: Boolean, removeDoubles: Boolean):RDD[(String, DocumentWrapper)] = { + val rawbytes = sc.sequenceFile[String, BytesWritable](file).mapValues(_.copyBytes) + println("Loaded raw bytes.") + + val dirtyWrappers = rawbytes.mapValues(b => DocumentProtos.DocumentWrapper.parseFrom(b)) + + //fix invalid documents: + val fixedWrappers = if (filterInvalid) { + val x = dirtyWrappers.filter(w => isValidDocument(w._2)) + val afterSize = x.count; + val preSize = dirtyWrappers.count + log.info(f"Filtering invalid documents done, before filtering: $preSize and after filtering $afterSize documents left.") + x + } else { + dirtyWrappers + } + + if (removeDoubles) { + fixedWrappers.reduceByKey((x, y) => y) + } else { + fixedWrappers + } + } + + /** Debug method to printout top clusters. 
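+   * @param finalClusters deduplication clusters keyed by cluster id
+   * @param count number of largest clusters to print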
+   */
+  def printTopClusters(finalClusters: RDD[(String, Seq[String])], count: Int): Unit = {
+    val finclSizes = finalClusters.mapValues(_.size).takeOrdered(count)(Ordering[Int].on(-_._2))
+    println(s"Top $count cluster sizes:")
+    finclSizes.foreach(println(_))
+    println("-----\n\n")
+
+  }
+
+  def main(args: Array[String]): Unit = {
+    val fixInvalidDocuments = true;
+    val removeDoubles = true;
+
+    println("Starting document deduplication")
+
+    val parser = new scopt.OptionParser[Config]("CoAnSys Deduplicate Documents") {
+      head("Deduplicate documents", "0.1")
+
+      opt[Unit]('f', "filter-invalid").action((x, c) =>
+        c.copy(filterInvalidDocuments = true)).text("filter invalid (empty) documents before run.")
+
+      opt[Unit]('d', "remove-doubles").action((x, c) =>
+        c.copy(removeDuplicateDocuments = true)).text("filter out duplicates sharing the same key before processing.")
+
+      opt[Int]("cluster-key-min").abbr("kmn").action((x, c) => c.copy(keySizeMin = x)).
+        validate(x =>
+          if (x >= 2) success
+          else failure("Value must be >=2")).
+        text("shortest valid key for cluster, defines pre-clustering. Recommended value more than 4, minimum 2.")
+
+      opt[Int]("cluster-key-max").abbr("kmx").action((x, c) => c.copy(keySizeMax = x)).
+        validate(x =>
+          if (x >= 2 && x <= 20) success
+          else failure("Value must be >=2")).
+        text("longest valid key for cluster, during pre-clustering. Used to split large clusters. Recommended value more than min, minimum 2, max 20.")
+
+      opt[Int]("cluster-size-max").abbr("cs").action((x, c) => c.copy(clusterSizeMax = x)).
+        text("Largest acceptable cluster size during the preclustering phase. If a cluster exceeds it, the algorithm attempts to use a longer key if possible. Typically 400+")
+
+      opt[Int]("tile-size").abbr("ts").action((x, c) => c.copy(tileSize = x)).
+        validate(x =>
+          if (x >= 2) success
+          else failure("Value must be >=2")).
+        text("Size of the tile tasks used to split large clusters. Min 2, recommended approx 40")
+
+      arg[String]("").required.text("Input sequence file").action((f, c) => c.copy(inputFile = f))
+      arg[String]("").optional.text("Output sequence file.
If ommited, then no output is written but calculation is done.").action((f, c) => c.copy(outputFile = f)) + note("Blah") + } + + val cfg: Config = parser.parse(args, Config()) match { + case Some(config) => + println(f"Got config:\n${config}") + println(config); + config + case None => + // arguments are bad, error message will have been displayed + println("No config, aborting.") + return + } + + println("Creating context...") + + //required to operate protobuf correctly + val conf = new SparkConf() + .setAppName("Document deduplication") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "pl.edu.icm.coansys.document.deduplication.DocumentWrapperKryoRegistrator") + + val sc = new SparkContext(conf) + + println("Created context...") + // sc.getConf.getAll.foreach(x => println(x._1 + ": " + x._2)) + val inputDocuments = cfg.inputFile + // for pasting into console: + // val inputDocuments = "/user/kura/curr-res-navigator/hbase-sf-out/DOCUMENT" + // val inputDocuments = "/user/kura/curr-res-navigator-no-blogs/hbase-sf-out/DOCUMENT" + val outputDocuments = cfg.outputFile + + + //load documents + val wrappers = loadDocuments(sc, inputDocuments, cfg.filterInvalidDocuments, cfg.removeDuplicateDocuments) + val initialSize = wrappers.count + println(f"Starting processing with $initialSize documents.") + + val initialGroups = prepareInitialClustering(wrappers, cfg.keySizeMin, cfg.keySizeMax, cfg.clusterSizeMax) + initialGroups.persist + + val clustersToDeduplicate = initialGroups.filter(t => t._2.size > 1) + val initialClusterCount = clustersToDeduplicate.count + //TODO: some statistics here on cluster, would be useful. + + val tiledTasks = clustersToDeduplicate.flatMap(p => CartesianTaskSplit.parallelizeCluster(p._1, p._2, cfg.tileSize)) + + tiledTasks.persist + val tileCount = tiledTasks.count; + + println(f"Prepared $initialClusterCount clusters, and then split it to $tileCount tiles") + + //build (clusterId, Seq(docId)) rdd: + val partialEqualityClusters = tiledTasks.flatMap( + task => { + val t0 = java.lang.System.currentTimeMillis() + val comparator = buildWorkComparator + val res = task.processPairs((a: DocumentWrapper, b: DocumentWrapper) => + comparator.isDuplicate(a.getDocumentMetadata, b.getDocumentMetadata, null)) + val time = (java.lang.System.currentTimeMillis() - t0) / 1000.0 + //useful for identification of possible problem. + log.info(f"Finishing tile task ${task.taskId} in $time%.4f sec") + res.map((task.clusterId, _)) + } + ) + + val finalClusters = partialEqualityClusters.mapValues(List(_)). + reduceByKey(_ ++ _). //one long list of lists of ids for each cluster + map(pair => { + val t0 = java.lang.System.currentTimeMillis() + val res = CartesianTaskSplit.coalesceClusters(pair._2.asJava) + val tt = System.currentTimeMillis() - t0; + val clusterSize = pair._2.size + log.info(f"Finished tile coalesce task. (Cluster,time[s], size): ${pair._1}, ${tt / 1000.0}, ${pair._2.size}") + (pair._1, res) + }). + flatMap( //build proper ids for equality clusters. 
+ p => { + val cid = p._1 + val cl = p._2 + cl.zipWithIndex.map(q => (cid + f"_${q._2}%03d", q._1)) + } + ) + + //now we got all the items in place + finalClusters.persist + + printTopClusters(finalClusters, 100) + + //count clusters, documents in clusters and number of documents to be deduced: + val finalClusterCount = finalClusters.count + val documentInFinalClusterCount = finalClusters.map(_._2.size).fold(0)(_ + _) + val documentRemovedDuringClusteringCount = documentInFinalClusterCount - finalClusterCount + println(f"Finally created $finalClusterCount clusters, containing $documentInFinalClusterCount documents, $documentRemovedDuringClusteringCount documents will be removed.") + + // merge documents + val docIdWithClusterId = finalClusters.flatMapValues(x => x). + map(v => (v._2, v._1)) + val documentWrappersPrepared = wrappers.leftOuterJoin(docIdWithClusterId); + val mergedDocuments = documentWrappersPrepared.filter(_._2._2.isDefined). + map(x => (x._2._2, List(x._2._1))).foldByKey(List())(_ ++ _). //get lists of cluster documents by cluster id + map(kv => { + val doc = mergeDocuments(kv._2) + (doc.getDocumentMetadata.getKey, doc) + }) + + // documents not touched + val singularDocuments = documentWrappersPrepared.filter(_._2._2.isEmpty).map(x => (x._1, x._2._1)) + + //final result. + val finalResult = singularDocuments.union(mergedDocuments) + finalResult.persist + + val finalSize = finalResult.count + println(f"Final counts:\n-----------\n" + + f" input: $initialSize\n" + + f" output: $finalSize\n" + + f" removed: $documentRemovedDuringClusteringCount\n" + + f" difference: ${initialSize - finalSize - documentRemovedDuringClusteringCount}") + + if ("-" != outputDocuments && !outputDocuments.isEmpty) { + val bas = finalResult.mapValues(doc => doc.toByteArray()).saveAsSequenceFile(outputDocuments); + } else { + log.info("Simulating timing by counting.") + finalResult.count() + println("Finished counting.") + } + } +} diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/DocumentWrapperKryoRegistrator.java b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/DocumentWrapperKryoRegistrator.java new file mode 100644 index 00000000..1c096122 --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/DocumentWrapperKryoRegistrator.java @@ -0,0 +1,76 @@ +package pl.edu.icm.coansys.document.deduplication; + + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.spark.serializer.KryoRegistrator; +import pl.edu.icm.coansys.models.DocumentProtos.DocumentMetadata; +import pl.edu.icm.coansys.models.DocumentProtos.DocumentWrapper; + + +/** + * Simple class which registers custom serializers for the documents protocol + * buffer generated classes. 
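+ * The registered serializers write each message as a length-prefixed byte array obtained from
+ * toByteArray() and restore it with parseFrom(), avoiding Java serialization of the generated classes.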
+ * @author Aleksander Nowinski + */ +public class DocumentWrapperKryoRegistrator implements KryoRegistrator { + + @Override + public void registerClasses(Kryo kryo) { + kryo.register(DocumentWrapper.class, new DocumentWrapperSerializer()); + kryo.register(DocumentMetadata.class, new DocumentMetadataSerializer()); + } + + + public static class DocumentWrapperSerializer extends Serializer { + + @Override + public void write(Kryo kryo, Output output, DocumentWrapper object) { + byte[] bytes = object.toByteArray(); + + output.writeInt(bytes.length, true); + output.writeBytes(bytes); + } + + @Override + public DocumentWrapper read(Kryo kryo, Input input, Class type) { + int length = input.readInt(true); + byte[] bytes = input.readBytes(length); + + try { + return DocumentWrapper.parseFrom(bytes);//FIXME: is this exception handling ok? + } catch (InvalidProtocolBufferException ex) { + throw new RuntimeException(ex); + } + } + + } + + public static class DocumentMetadataSerializer extends Serializer { + + @Override + public void write(Kryo kryo, Output output, DocumentMetadata object) { + byte[] bytes = object.toByteArray(); + + output.writeInt(bytes.length, true); + output.writeBytes(bytes); + } + + @Override + public DocumentMetadata read(Kryo kryo, Input input, Class type) { + int length = input.readInt(true); + byte[] bytes = input.readBytes(length); + + try { + return DocumentMetadata.parseFrom(bytes);//FIXME: is this exception handling ok? + } catch (InvalidProtocolBufferException ex) { + throw new RuntimeException(ex); + } + } + + } + +} \ No newline at end of file diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/MultiLengthTitleKeyGenerator.scala b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/MultiLengthTitleKeyGenerator.scala new file mode 100644 index 00000000..c1ae7b8e --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/MultiLengthTitleKeyGenerator.scala @@ -0,0 +1,54 @@ +/* + * This file is part of CoAnSys project. + * Copyright (c) 2012-2017 ICM-UW + * + * CoAnSys is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + + * CoAnSys is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with CoAnSys. If not, see . + */ +package pl.edu.icm.coansys.document.deduplication +import pl.edu.icm.coansys.commons.java.DocumentWrapperUtils +import pl.edu.icm.coansys.commons.java.StringTools +import pl.edu.icm.coansys.models.DocumentProtos.DocumentMetadata + +/** + * Generator for the keys used in early stage of the document deduplication. 
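+ * A key of a given length is built from every other character of the normalized, whitespace-free
+ * title; titles shorter than the requested length simply yield shorter keys.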
+ */ +class MultiLengthTitleKeyGenerator(val keySizes: Seq[Int]) { + def cleanUpString(title: String): String = { + val normalized = StringTools.normalize(title); + //seems that normalize removes stopwords, which is wrong, and quite expensive + //val normalized = StringTools.removeStopWords(StringTools.normalize(title)); + val res = normalized.replaceAll("\\s+", "") + res + } + + def generateKeys(title: String): Seq[String] = { + val ctitle = cleanUpString(title) + val mlen = keySizes.max + val longestKey = ctitle.zipWithIndex.filter(_._2 % 2 == 0).map(_._1).take(mlen).mkString + keySizes.map(keyLength => longestKey.substring(0, Math.min(keyLength, longestKey.size))).distinct + } + + def generateKeys(document: DocumentMetadata): Seq[String] = { + val title: String = DocumentWrapperUtils.getMainTitle(document) + generateKeys(title) + } +} + + +object MultiLengthTitleKeyGenerator { + def generateKeys(document: DocumentMetadata)(keySizes: Seq[Int]): Seq[String] = { + val generator = new MultiLengthTitleKeyGenerator(keySizes) + generator.generateKeys(document) + } +} diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/AdvancedDuplicatesMerger.java b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/AdvancedDuplicatesMerger.java new file mode 100644 index 00000000..af45ee69 --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/AdvancedDuplicatesMerger.java @@ -0,0 +1,349 @@ +package pl.edu.icm.coansys.document.deduplication.merge; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.lang.StringUtils; + +import pl.edu.icm.coansys.commons.java.DiacriticsRemover; +import pl.edu.icm.coansys.commons.java.Pair; +import pl.edu.icm.coansys.models.DocumentProtos.Author; +import pl.edu.icm.coansys.models.DocumentProtos.BasicMetadata; +import pl.edu.icm.coansys.models.DocumentProtos.DocumentMetadata; +import pl.edu.icm.coansys.models.DocumentProtos.DocumentWrapper; +import pl.edu.icm.coansys.models.DocumentProtos.KeyValue; +import pl.edu.icm.coansys.models.DocumentProtos.KeywordsList; + +/** + * Class which merge list of DocumentWrapper to one single DocumentWrapper + * object + * + * @author acz + */ +public class AdvancedDuplicatesMerger implements DuplicatesMerger { + public static final String MERGED_ID_SEPARATOR = "+"; + private Map collectionPreferences; + + @Override + public void setup(String collectionPriorities) { + collectionPreferences = new HashMap(); + + for (String coll : collectionPriorities.split("," )) { + coll = coll.trim(); + + Pattern prefPattern = Pattern.compile("^(-?\\d+):(.+)"); + Matcher prefMatcher = prefPattern.matcher(coll); + if (prefMatcher.matches()) { + String priority = prefMatcher.group(1); + String collectionName = prefMatcher.group(2); + collectionPreferences.put(collectionName, Integer.parseInt(priority)); + + } + } + } + + /** + * Chooses the best DocumentWrapper, updates keys in DocumentWrapper, + * DocumentMetadata and authors, gets extIds, auxiliarInfos from all + * DocumentWrappers, matches authors, gets extIds 
from matched authors + * + * @param duplicates + * @return + */ + @Override + public DocumentWrapper merge(List duplicates) { + + if (duplicates == null || duplicates.isEmpty()) { + throw new RuntimeException("Nothing to merge"); + } else if (duplicates.size() == 1) { + return duplicates.get(0); + } + + int theBestDocumentWrapperIndex = chooseTheBestIndex(duplicates); + + // Collect information from all items + List identifiers = new ArrayList(duplicates.size()); + List allExtIds = new ArrayList(); + List allAuxiliarInfos = new ArrayList(); + SortedSet sortedCollections = new TreeSet(); + List allKeywords = new ArrayList(); + + for (DocumentWrapper dw : duplicates) { + DocumentMetadata dm = dw.getDocumentMetadata(); + identifiers.add(dw.getRowId()); + List collectionList = new ArrayList(dm.getCollectionList()); + if (collectionList.isEmpty()) { + collectionList.add("unknown"); //TODO move to constants + } + for (String collection : collectionList) { + sortedCollections.add(collection); + } + allExtIds.addAll(dm.getExtIdList()); + allAuxiliarInfos.addAll(dm.getAuxiliarInfoList()); + allKeywords.addAll(dm.getKeywordsList()); + } + Collections.sort(identifiers); + String joinedIds = StringUtils.join(identifiers, MERGED_ID_SEPARATOR); + String newIdentifier = UUID.nameUUIDFromBytes(joinedIds.getBytes()).toString(); + + // Create new DocumentWrapper.Builder + DocumentWrapper.Builder resultBuilder = DocumentWrapper + .newBuilder(duplicates.get(theBestDocumentWrapperIndex)); + + // Modify fields of DocumentWrapper.Builder + resultBuilder.setRowId(newIdentifier); + + DocumentMetadata.Builder documentMetadataBuilder = resultBuilder + .getDocumentMetadataBuilder(); + + BasicMetadata.Builder basicMetadataBuilder = documentMetadataBuilder + .getBasicMetadataBuilder(); + documentMetadataBuilder.setKey(newIdentifier); + documentMetadataBuilder.addAllOrigKey(identifiers); + documentMetadataBuilder.addAllCollection(sortedCollections); + + List finalAuthorBuilderList = basicMetadataBuilder + .getAuthorBuilderList(); + for (Author.Builder authorBuilder : finalAuthorBuilderList) { + String positionSuffix = authorBuilder.getKey().replaceAll(".*(#c\\d+)", "$1"); + authorBuilder.setDocId(newIdentifier); + authorBuilder.setKey(newIdentifier + positionSuffix); + } + + List> authorListsToMerge = new ArrayList>(); + + for (int i = 0; i < duplicates.size(); i++) { + if (i != theBestDocumentWrapperIndex) { + List unmatchedList = duplicates.get(i) + .getDocumentMetadata().getBasicMetadata() + .getAuthorList(); + List matchedList = matchAuthors(finalAuthorBuilderList, + unmatchedList); + if (matchedList != null) { + authorListsToMerge.add(matchedList); + } + } + } + + mergeAuthors(finalAuthorBuilderList, authorListsToMerge); + + documentMetadataBuilder.clearExtId(); + documentMetadataBuilder.addAllExtId(mergeKeyValues(allExtIds)); + documentMetadataBuilder + .addAllAuxiliarInfo(mergeKeyValues(allAuxiliarInfos)); + documentMetadataBuilder.addAllKeywords(mergeKeywords(allKeywords)); + + // Build and return DocumentWrapper + return resultBuilder.build(); + } + + /** + * Moves some informations from author lists in listsToMerge to base list + * + * @param base + * @param listsToMerge + */ + protected void mergeAuthors(List base, + List> listsToMerge) { + + for (int i = 0; i < base.size(); i++) { + Author.Builder baseBuilder = base.get(i); + List allExtIds = new ArrayList(); + allExtIds.addAll(baseBuilder.getExtIdList()); + for (List authorsToMerge : listsToMerge) { + Author author = authorsToMerge.get(i); + if (author != 
null) { + allExtIds.addAll(author.getExtIdList()); + } + } + + baseBuilder.clearExtId(); + baseBuilder.addAllExtId(mergeKeyValues(allExtIds)); + } + } + + /** + * Checks if tho author lists contain the same authors. Returns second list + * in order as in base list. + * + * @param base + * @param second + * @return + */ + protected List matchAuthors(List base, + List second) { + List result = new ArrayList(base.size()); + List secondCopy = new ArrayList(second); + + for (Author.Builder author : base) { + Author foundAuthor = null; + for (Author secondAuthor : secondCopy) { + + if (equalsIgnoreCaseIgnoreDiacritics( + author.getName(), secondAuthor.getName()) + || equalsIgnoreCaseIgnoreDiacritics( + author.getForenames(), secondAuthor.getForenames()) + && equalsIgnoreCaseIgnoreDiacritics( + author.getSurname(), secondAuthor.getSurname())) { + foundAuthor = secondAuthor; + break; + } + } + if (foundAuthor != null) { + result.add(foundAuthor); + secondCopy.remove(foundAuthor); + } else { + result.add(null); + } + } + + if (result.size() == base.size()) { + return result; + } else { + return null; + } + } + + private boolean equalsIgnoreCaseIgnoreDiacritics(String firstName, + String secondName) { + if (firstName.isEmpty() || secondName.isEmpty()) { + return false; + } + return DiacriticsRemover.removeDiacritics(firstName).equalsIgnoreCase( + DiacriticsRemover.removeDiacritics(secondName)); + } + + /** + * Merges KeyValue messages. Removes repetitions, concatenates comments. + * + * @param listWithRepetitions + * @return + */ + protected static List mergeKeyValues(List listWithRepetitions) { + + Map, String> map = new HashMap, String>(); + for (KeyValue extId : listWithRepetitions) { + Pair keyValue = new Pair( + extId.getKey(), extId.getValue()); + String comment = extId.getComment(); + if (!map.containsKey(keyValue)) { + map.put(keyValue, comment); + } else if (!comment.isEmpty()) { + String oldComment = map.get(keyValue); + if (oldComment.isEmpty()) { + map.put(keyValue, comment); + } else { + map.put(keyValue, oldComment + "\t" + comment); + } + } + } + + List result = new ArrayList(); + + for (Map.Entry, String> mapEntry : map.entrySet()) { + KeyValue.Builder kvBuilder = KeyValue.newBuilder(); + kvBuilder.setKey(mapEntry.getKey().getX()); + kvBuilder.setValue(mapEntry.getKey().getY()); + String comment = mapEntry.getValue(); + if (!comment.isEmpty()) { + kvBuilder.setComment(comment); + } + result.add(kvBuilder.build()); + } + + return result; + } + + /** + * Chooses index of item which will be the base for merged result. 
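+     * The duplicate whose collections carry the highest configured priority is chosen;
+     * when no collection priorities are configured, the first duplicate is used.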
+ * + * @param duplicates + * @return + */ + protected int chooseTheBestIndex(List duplicates) { + if (collectionPreferences == null || collectionPreferences.isEmpty()) { + return 0; + } + + int bestDuplicateIdx = 0; + int bestPref = Integer.MIN_VALUE; + + for (int i = 0; i < duplicates.size(); i++) { + DocumentWrapper dw = duplicates.get(i); + for (String collection : dw.getDocumentMetadata().getCollectionList()) { + + int pref = 0; + if (collectionPreferences.containsKey(collection)) { + pref = collectionPreferences.get(collection); + } + + if (pref > bestPref) { + bestPref = pref; + bestDuplicateIdx = i; + } + } + } + return bestDuplicateIdx; + } + + private List mergeKeywords(List allKeywords) { + Map, Pair, String>> keywordsMap = new HashMap, Pair, String>>(); + // type, lang, keywords, comment + + for (KeywordsList kwdList : allKeywords) { + Pair typeAndLang = new Pair(kwdList.getType(), + kwdList.getLanguage()); + Pair, String> keywordsAndComment; + String comment = kwdList.getComment(); + if (!keywordsMap.containsKey(typeAndLang)) { + keywordsAndComment = new Pair, String>( + new HashSet(), comment); + keywordsMap.put(typeAndLang, keywordsAndComment); + } else { + keywordsAndComment = keywordsMap.get(typeAndLang); + if (!comment.isEmpty()) { + String oldComment = keywordsAndComment.getY(); + if (oldComment.isEmpty()) { + keywordsAndComment.setY(comment); + } else { + keywordsAndComment.setY(oldComment + "\t" + comment); + } + } + } + keywordsAndComment.getX().addAll(kwdList.getKeywordsList()); + } + + List result = new ArrayList(); + for (Map.Entry, Pair, String>> entry : keywordsMap + .entrySet()) { + KeywordsList.Builder kwdlBuilder = KeywordsList.newBuilder(); + String type = entry.getKey().getX(); + String lang = entry.getKey().getY(); + Set keywords = entry.getValue().getX(); + String comment = entry.getValue().getY(); + + if (type != null && !type.isEmpty()) { + kwdlBuilder.setType(type); + } + if (lang != null && !lang.isEmpty()) { + kwdlBuilder.setLanguage(lang); + } + if (comment != null && !comment.isEmpty()) { + kwdlBuilder.setComment(comment); + } + kwdlBuilder.addAllKeywords(keywords); + + result.add(kwdlBuilder.build()); + } + return result; + } +} diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/DuplicatesMerger.java b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/DuplicatesMerger.java new file mode 100644 index 00000000..dd4327de --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/DuplicatesMerger.java @@ -0,0 +1,13 @@ +package pl.edu.icm.coansys.document.deduplication.merge; + +import java.util.List; +import pl.edu.icm.coansys.models.DocumentProtos.DocumentWrapper; + +/** + * + * @author acz + */ +public interface DuplicatesMerger { + public void setup(String mergerConfiguration); + public DocumentWrapper merge(List duplicates); +} diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/SimpleDuplicatesMerger.java b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/SimpleDuplicatesMerger.java new file mode 100644 index 00000000..2e01d455 --- /dev/null +++ 
b/deduplication-document-spark/deduplication-document-spark-impl/src/main/scala/pl/edu/icm/coansys/document/deduplication/merge/SimpleDuplicatesMerger.java @@ -0,0 +1,98 @@ +package pl.edu.icm.coansys.document.deduplication.merge; + +//import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import org.apache.commons.lang.StringUtils; +import pl.edu.icm.coansys.models.DocumentProtos.Author; +import pl.edu.icm.coansys.models.DocumentProtos.BasicMetadata; +import pl.edu.icm.coansys.models.DocumentProtos.DocumentMetadata; +import pl.edu.icm.coansys.models.DocumentProtos.DocumentWrapper; +import pl.edu.icm.coansys.models.DocumentProtos.KeyValue; +//import pl.edu.icm.coansys.1output.merge.MergeConstants; + +/** + * Chooses first DocumentWrapper, updates keys in DocumentWrapper, DocumentMetadata and authors, gets extIds from all + * DocumentWrappers + * + * @author acz + */ +public class SimpleDuplicatesMerger implements DuplicatesMerger { + + @Override + public DocumentWrapper merge(List duplicates) { + + if (duplicates == null || duplicates.isEmpty()) { + throw new RuntimeException("Nothing to merge"); + } else if (duplicates.size() == 1) { + return duplicates.get(0); + } + + // Collect information for final result + List identifiers = new ArrayList(duplicates.size()); + Map> extIds = new HashMap>(); + SortedSet sortedCollections = new TreeSet(); + + for (DocumentWrapper dw : duplicates) { + DocumentMetadata dm = dw.getDocumentMetadata(); + identifiers.add(dw.getRowId()); + for (String collection : dm.getCollectionList()) { + sortedCollections.add(collection); + } + for (KeyValue id : dm.getExtIdList()) { + String idSource = id.getKey(); + String idValue = id.getValue(); + if (!extIds.containsKey(idSource)) { + extIds.put(idSource, new HashSet()); + } + extIds.get(idSource).add(idValue); + } + } + Collections.sort(identifiers); + String joinedIds = StringUtils.join(identifiers, "???");//MergeConstants.MERGED_ID_SEPARATOR); + String newIdentifier = UUID.nameUUIDFromBytes(joinedIds.getBytes()).toString(); + + // Create new DocumentWrapper.Builder + DocumentWrapper.Builder resultBuilder = DocumentWrapper.newBuilder(duplicates.get(0)); + + // Modify fields of DocumentWrapper.Builder + resultBuilder.setRowId(newIdentifier); + + DocumentMetadata.Builder documentMetadataBuilder = resultBuilder.getDocumentMetadataBuilder(); + + BasicMetadata.Builder basicMetadataBuilder = documentMetadataBuilder.getBasicMetadataBuilder(); + documentMetadataBuilder.setKey(newIdentifier); + documentMetadataBuilder.addAllCollection(sortedCollections); + + for (Author.Builder authorBuilder : basicMetadataBuilder.getAuthorBuilderList()) { + String positionSuffix = authorBuilder.getKey().replaceAll(".*(#c\\d+)", "$1"); + authorBuilder.setDocId(newIdentifier); + authorBuilder.setKey(newIdentifier + positionSuffix); + } + + documentMetadataBuilder.clearExtId(); + for (String eKey : extIds.keySet()) { + for (String eValue : extIds.get(eKey)) { + KeyValue.Builder ei = KeyValue.newBuilder(); + ei.setKey(eKey); + ei.setValue(eValue); + documentMetadataBuilder.addExtId(ei); + } + } + + // Build and return DocumentWrapper + return resultBuilder.build(); + } + + @Override + public void setup(String mergerConfiguration) { + } +} diff --git 
a/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/CartesianTaskSplitSuite.scala b/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/CartesianTaskSplitSuite.scala new file mode 100644 index 00000000..c4e0f191 --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/CartesianTaskSplitSuite.scala @@ -0,0 +1,88 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package pl.edu.icm.coansys.document.deduplication + +import org.scalatest.FunSuite +import org.scalatest._ +import pl.edu.icm.coansys.models.DocumentProtos +import pl.edu.icm.coansys.models.DocumentProtos._ + +class CartesianTaskSplitSuite extends FunSuite with GivenWhenThen { + + + def createDocument(key:String , title:String):DocumentWrapper = { + DocumentProtos.DocumentWrapper.newBuilder().setDocumentMetadata( + DocumentProtos.DocumentMetadata.newBuilder().setKey(key).setBasicMetadata( + DocumentProtos.BasicMetadata.newBuilder().addTitle( + DocumentProtos.TextWithLanguage.newBuilder().setText(title))) + ).setRowId(key).build(); + } + + def createDocumentList(size:Int):Seq[DocumentWrapper] = { + (1 to size).map(idx => createDocument(f"key_$idx", f"title_$idx")).toSeq + } + + def crossProduct[T](l1:Seq[T]):Seq[(T,T)] = { + crossProduct(l1, l1) + } + + + def crossProduct[T](l1:Seq[T], l2:Seq[T]):Seq[(T,T)] = { + l1.flatMap(x1=>l2.map((x1,_))) + } + + + + test("Parallelize empty set") { + Given("Empty task list") + When("We parallelise") + val res = CartesianTaskSplit.parallelizeCluster("testCluster", Seq.empty[DocumentWrapper], 10) + Then("result is empty") + assert(res.isEmpty) + } + + test("Parallelize set") { + Given("Set of 5 documents") + val docs = createDocumentList(5) + val clusterId = "testCluster" + When("We parallelise with size equal to doc number") + val res = CartesianTaskSplit.parallelizeCluster(clusterId, docs, docs.size) + Then("result is single item") + assertResult(1)(res.size) + When("We parallelize with large tile size") + val r2 = CartesianTaskSplit.parallelizeCluster(clusterId, docs, docs.size+3) + Then("result is single item") + assertResult(1)(r2.size) + When("We parallelize with large 3") + val r3 = CartesianTaskSplit.parallelizeCluster(clusterId, docs, 3) + Then("result have 4 tasks") + assertResult(4)(r3.size) + And("Each task the same Given clusterId") + assert(r3.forall(_.clusterId==clusterId)) + } + + + +// +// test("All items present in result tasks") { +// Given("Set of 5 documents") +// val docs = createDocumentList(5) +// val clusterId = "testCluster" +// When("We parallelise to size 2") +// val res = CartesianTaskSplit.parallelizeCluster(clusterId, docs,2) +// Then("Expect 9 tasks") +// assertResult(9)(res.size) +// And("Each cartesian pair is present") +// val allPairs = crossProduct(docs.map(_.getDocumentMetadata.getKey)).toSet +// +// val taskPairs = res.flatMap(task=> {crossProduct(task.rows, task.columns)}).toSet +// assertResult(allPairs)(taskPairs) +// +// } + + +} diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/DeduplicateDocumentTest.scala 
b/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/DeduplicateDocumentTest.scala new file mode 100644 index 00000000..39b358c0 --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/DeduplicateDocumentTest.scala @@ -0,0 +1,124 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package pl.edu.icm.coansys.document.deduplication + +import com.holdenkarau.spark.testing.SharedSparkContext +import org.scalatest.FunSuite +import org.scalatest.GivenWhenThen +import pl.edu.icm.coansys.models.DocumentProtos +import pl.edu.icm.coansys.models.DocumentProtos._ + +class DeduplicateDocumentTest extends FunSuite with GivenWhenThen with SharedSparkContext { + + test("docaument validation") { + Given("Empty document") + val doc = DocumentWrapper.newBuilder().setRowId("test").build; + When("We validate") + Then("Document is invalid") + assert(!DeduplicateDocuments.isValidDocument(doc)) + Given("Doc with empty metadata") + val doc2 = DocumentWrapper.newBuilder(doc).setDocumentMetadata( + DocumentMetadata.newBuilder + .setBasicMetadata(BasicMetadata.newBuilder.build) + .setKey("Key") + .build + ).build + When("We test if it is valid") + + Then("It is not valid") + assert(!DeduplicateDocuments.isValidDocument(doc2)) + + Given("Doc with title ") + val doc3 = DocumentWrapper.newBuilder(doc2).setDocumentMetadata( + DocumentMetadata.newBuilder() + .setBasicMetadata(BasicMetadata.newBuilder().addTitle(TextWithLanguage.newBuilder.setText("Title")).build) + .setKey("key") + .build + ) + .build + When("We test if it is valid: ") + assert(DeduplicateDocuments.isValidDocument(doc3)) + Then("It is not valid") + } + + test("Sample with spark context") { + Given("RDD of sequence 1 to n (n=100)") + val n = 100 + val rdd = sc.parallelize(1 to n) + When("We sum") + val sum = rdd.sum + Then("result is n*(n+1)/2") + assertResult(n * (n + 1) / 2)(sum) + + } + + + def createDocument(key:String , title:String):DocumentWrapper = { + DocumentProtos.DocumentWrapper.newBuilder().setDocumentMetadata( + DocumentProtos.DocumentMetadata.newBuilder().setKey(key).setBasicMetadata( + DocumentProtos.BasicMetadata.newBuilder().addTitle( + DocumentProtos.TextWithLanguage.newBuilder().setText(title))) + ).setRowId(key).build(); + } + + + test("Initial clustering test:") { + Given("Data set has the same title begninnings") + + val d3 = (1 to 10).map(x=> createDocument("id_aaa"+x, "aaa")) + val d4 = (1 to 10).map(x=> createDocument("id_aaaa"+x, "aaaa")) + val d5 = (1 to 10).map(x=> createDocument("id_aaaaa"+x, "aaaaa")) + val d12 = (1 to 10).map(x=> createDocument("id_aaaaaaaaaaa"+x, "aaaaaaaaaa"+x)) + val docs = List()++d3++d4++d5++d12; + val input = sc.parallelize(docs).map(doc=> (doc.getRowId, doc)) + When("We build clustering with short key") + val r1 = DeduplicateDocuments.prepareInitialClustering(input, 2, + 2, 20) + Then("We get only one cluster, with all documents:") + val r1c = r1.collect + assert(r1c.size==1) + assert(r1c(0)._2.size==40) + And("Key is 1st and 3rd letter") + assert(r1c(0)._1=="aa") + When("We build clustering with variable key 2-3") + val r2 = DeduplicateDocuments.prepareInitialClustering(input, 2, + 3, 10) + Then("We get only two clusters:") + val r2c = r2.collect + assert(r2c.size==2) + val r2cm = r2c.toMap + 
assert(r2cm("aa").size==20) + assert(r2cm("aaa").size==20) +// r2c.flatMap(_._2).map(_.getKey()) +// + When("We build clustering with variable key 2-5") + val r3 = DeduplicateDocuments.prepareInitialClustering(input, 2, + 5, 10) + Then("We get 3 clusters:") + val r3c = r3.collect + assert(r3c.size==3) + val r3cm = r3c.toMap + assert(r3cm("aa").size==20) + assert(r3cm("aaa").size==10) + assert(r3cm("aaaa").size==10) + + When("We build clustering with variable key 2-6") + val r4 = DeduplicateDocuments.prepareInitialClustering(input, 2, + 6, 9) + Then("We get 11 clusters:") + val r4c = r4.collect + assert(r4c.size==11) + val r4cm = r4c.toMap + assert(r4cm("aa").size==20) + assert(r4cm("aaa").size==10) + assert(r4cm("aaaaa2").size==1) + assert(r4cm("aaaaa1").size==2) + + + } + +} diff --git a/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/MultiLengthTitleKeyGeneratorSuite.scala b/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/MultiLengthTitleKeyGeneratorSuite.scala new file mode 100644 index 00000000..13fc490c --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-impl/src/test/scala/pl/edu/icm/coansys/document/deduplication/MultiLengthTitleKeyGeneratorSuite.scala @@ -0,0 +1,67 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ + +package pl.edu.icm.coansys.document.deduplication + +import org.scalatest.FunSuite +import org.scalatest._ + +class MultiLengthTitleKeyGeneratorSuite extends FunSuite with GivenWhenThen { + test("cleaning the string") { + Given("an empty instance") + val instance = new MultiLengthTitleKeyGenerator(3 to 7) + When("empty string is given") + val empty = "" + Then("result should be empty") + assertResult("")(instance.cleanUpString(empty)) + + When("String has varied case") + val varcas = "SomeCaseS" + Then("result should be lowercase") + assertResult("somecases")(instance.cleanUpString(varcas)) + + When("String has spaces") + val spc = "Some spaces" + Then("result should be lowercase, no spaces") + assertResult("somespaces")(instance.cleanUpString(spc)) + + When("String has punctuation") + val pct = "String with \"so called\" - phy - punctuation!" + Then("result have no punctuation nor spaces") + assertResult("stringwithsocalledphypunctuation")(instance.cleanUpString(pct)) + + When("String has some stopwords") + val stopwords = "A the long! 
of short and tall" + Then("result should contain no stopwords") + assertResult("longshorttall")(instance.cleanUpString(stopwords)) + info("That's all folks!") + } + + test("Building the key set") { + Given("An empty instance with sequence keyset from 1 to 6") + val instance = new MultiLengthTitleKeyGenerator(1 to 6) + When("empty string is given") + val empty = "" + Then("result should be list with single, empty string element.") + assert(instance.generateKeys(empty).size==1) + assert(instance.generateKeys(empty)(0).isEmpty) + + When("Normal string is given") + val normal = "abcdefghijklmnopqr" + Then("result array has appropriate lengths") + val normalRes = instance.generateKeys(normal) + assert(normalRes.map(_.size).toList == (1 to 6).toList) + And("result arrray has proper contents.") + assertResult(List("a", "ac", "ace", "aceg", "acegi", "acegik"))(normalRes.toList) + + When("Short string is given") + val short = "abcdef" + Then("result array has appropriate lengths") + val shortRes = instance.generateKeys(short) + assert(shortRes.map(_.size).toList == (1 to 3).toList) + assertResult(List("a", "ac", "ace"))(shortRes.toList) + } +} diff --git a/deduplication-document-spark/deduplication-document-spark-workflow/pom.xml b/deduplication-document-spark/deduplication-document-spark-workflow/pom.xml new file mode 100644 index 00000000..1c5222a6 --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-workflow/pom.xml @@ -0,0 +1,41 @@ + + + deduplication-document-spark + pl.edu.icm.coansys + 1.11-SNAPSHOT + + 4.0.0 + deduplication-document-spark-workflow + oozie + Deduplication - Document - SparkVersion - Workflow + + UTF-8 + + + + GNU AFFERO GENERAL PUBLIC LICENSE, Version 3 (AGPL-3.0) + http://opensource.org/licenses/AGPL-3.0 + + + + + + pl.edu.icm.maven + oozie-maven-plugin + true + + + + + + pl.edu.icm.coansys + deduplication-document-spark-impl + ${project.version} + + + pl.edu.icm.oozie + oozie-runner + test + + + diff --git a/deduplication-document-spark/deduplication-document-spark-workflow/src/main/oozie/workflow.xml b/deduplication-document-spark/deduplication-document-spark-workflow/src/main/oozie/workflow.xml new file mode 100644 index 00000000..02d2053a --- /dev/null +++ b/deduplication-document-spark/deduplication-document-spark-workflow/src/main/oozie/workflow.xml @@ -0,0 +1,101 @@ + + + + + jobTracker + + + nameNode + + + queueName + default + + + input + ${inputSeqFile} + + + output + ${outputSeqFile} + + + + sparkExecutorMemory + 128G + memory for individual executor + + + sparkExecutorCores + 16 + number of cores used by single executor + + + sparkExecutorsNumber + 16 + total number of executors + + + sparkHistoryServer + http://spark-m2.vls.icm.edu.pl:18080 + Address of spark history server + + + sparkEventLogDir + hdfs:/user/spark/applicationHistory + Directory for spark events logging + + + + + + + + + + ${jobTracker} + ${nameNode} + + + + + + yarn-cluster + cluster + document-deduplication-spark + + pl.edu.icm.coansys.document.deduplication.DeduplicateDocuments + + ${comacWfPath}/lib/citation-matching-coansys-code-${project.version}.jar + + --conf spark.network.timeout=10000000 --conf spark.executor.heartbeatInterval=10000000 --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --num-executors ${sparkExecutorsNumber} --conf spark.yarn.historyServer.address=${sparkHistoryServer} --conf spark.eventLog.dir=${sparkEventLogDir} --conf spark.eventLog.enabled=true + f + d + -ts + 50 + ${input} + ${output} + + + + + + + + Workflow 
failed, error message [${wf:errorMessage(wf:lastErrorNode())}] + + + diff --git a/deduplication-document-spark/pom.xml b/deduplication-document-spark/pom.xml new file mode 100644 index 00000000..b46619a7 --- /dev/null +++ b/deduplication-document-spark/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + + pl.edu.icm.coansys + coansys + 1.11-SNAPSHOT + + + deduplication-document-spark + pom + Deduplication - Document SparkVersion + http://maven.apache.org + + + deduplication-document-spark-impl + deduplication-document-spark-workflow + + diff --git a/deduplication-document/deduplication-document-impl/pom.xml b/deduplication-document/deduplication-document-impl/pom.xml index b3b13ce5..4fb1514c 100644 --- a/deduplication-document/deduplication-document-impl/pom.xml +++ b/deduplication-document/deduplication-document-impl/pom.xml @@ -35,6 +35,23 @@ 1.8 + + org.apache.maven.plugins + maven-shade-plugin + 3.0.0 + + + + + + package + + shade + + + + + diff --git a/pom.xml b/pom.xml index bff9ad32..4734b1a6 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,7 @@ citation-matching commons deduplication-document + deduplication-document-spark deduplication-organization disambiguation disambiguation-author
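The heart of this patch is the tile-and-coalesce scheme in CartesianTaskSplit: an oversized candidate cluster is cut into row/column tiles so the expensive pairwise comparisons can run as independent tasks, and the per-tile match lists are afterwards merged into connected groups. The following sketch shows that scheme on plain strings; it is illustrative only and not part of the patch (the object name TilingSketch and its helpers are hypothetical), mirroring the tiling arithmetic of parallelizeCluster and the merging contract of coalesceClusters.

// Illustrative sketch only -- TilingSketch is a hypothetical name, not part of this change.
object TilingSketch {

  // Round-robin split of the sorted keys into ceil(n / tileSize) tiles, as parallelizeCluster does;
  // every row tile is then paired with every column tile, giving tiles^2 comparison tasks.
  def tiles(keys: Seq[String], tileSize: Int): Seq[Seq[String]] =
    if (keys.isEmpty) Seq.empty
    else {
      val n = keys.size / tileSize + (if (keys.size % tileSize > 0) 1 else 0)
      keys.sorted.zipWithIndex.groupBy(_._2 % n).values.map(_.map(_._1)).toSeq
    }

  // Merge partial clusters that share at least one element -- the contract of coalesceClusters.
  def coalesce(clusters: Seq[Set[String]]): Seq[Set[String]] =
    clusters.foldLeft(List.empty[Set[String]]) { (acc, c) =>
      val (touching, rest) = acc.partition(_.exists(c.contains))
      touching.foldLeft(c)(_ ++ _) :: rest
    }

  def main(args: Array[String]): Unit = {
    val t = tiles((1 to 5).map(i => s"doc$i"), 3)
    println(s"${t.size} tiles -> ${t.size * t.size} tile tasks") // 2 tiles -> 4 tasks, the case exercised in CartesianTaskSplitSuite
    // Matches found independently in different tiles are merged into connected groups:
    println(coalesce(Seq(Set("doc1", "doc2"), Set("doc2", "doc3"), Set("doc4", "doc5"))))
    // prints two groups: {doc1, doc2, doc3} and {doc4, doc5}
  }
}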