diff --git a/ehr/resources/pipeline/kinship/populateKinship.r b/ehr/resources/pipeline/kinship/populateKinship.r
index fc2cad6be..ab2f4032d 100644
--- a/ehr/resources/pipeline/kinship/populateKinship.r
+++ b/ehr/resources/pipeline/kinship/populateKinship.r
@@ -67,4 +67,6 @@ for (species in unique(allPed$Species)){
 }
 
 # write TSV to disk
+print(paste0('Total kinship records: ', nrow(newRecords)))
+newRecords <- dplyr::arrange(newRecords, Id, Id2)
 write.table(newRecords, file = "kinship.txt", append = FALSE, row.names = FALSE, quote = FALSE, sep = '\t')
\ No newline at end of file
diff --git a/ehr/src/org/labkey/ehr/pipeline/GeneticCalculationsImportTask.java b/ehr/src/org/labkey/ehr/pipeline/GeneticCalculationsImportTask.java
index 217ee28b3..4255211ea 100644
--- a/ehr/src/org/labkey/ehr/pipeline/GeneticCalculationsImportTask.java
+++ b/ehr/src/org/labkey/ehr/pipeline/GeneticCalculationsImportTask.java
@@ -38,6 +38,7 @@
 import org.labkey.api.exp.property.Domain;
 import org.labkey.api.pipeline.AbstractTaskFactory;
 import org.labkey.api.pipeline.AbstractTaskFactorySettings;
+import org.labkey.api.pipeline.CancelledException;
 import org.labkey.api.pipeline.PipelineJob;
 import org.labkey.api.pipeline.PipelineJobException;
 import org.labkey.api.pipeline.RecordedAction;
@@ -59,9 +60,9 @@
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.LineNumberReader;
 
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -151,7 +152,7 @@ public RecordedActionSet run() throws PipelineJobException
         FileAnalysisJobSupport support = (FileAnalysisJobSupport) job;
 
         processInbreeding(job.getContainer(), job.getUser(), support.getAnalysisDirectoryPath().toFile(), job.getLogger());
-        processKinship(job.getContainer(), job.getUser(), support.getAnalysisDirectoryPath().toFile(), job.getLogger());
+        processKinship(job.getContainer(), job.getUser(), support.getAnalysisDirectoryPath().toFile(), job.getLogger(), job);
 
         if (GeneticCalculationsJob.isKinshipValidation())
         {
@@ -165,10 +166,10 @@ public RecordedActionSet run() throws PipelineJobException
     public static void standaloneProcessKinshipAndInbreeding(Container c, User u, File pipelineDir, Logger log) throws PipelineJobException
     {
         processInbreeding(c, u, pipelineDir, log);
-        processKinship(c, u, pipelineDir, log);
+        processKinship(c, u, pipelineDir, log, null);
     }
 
-    private static void processKinship(Container c, User u, File pipelineDir, Logger log) throws PipelineJobException
+    private static void processKinship(Container c, User u, File pipelineDir, Logger log, @Nullable PipelineJob job) throws PipelineJobException
     {
         File output = new File(pipelineDir, KINSHIP_FILE);
         if (!output.exists())
@@ -240,13 +241,22 @@ else if (kinshipTable.getSqlDialect().isPostgreSQL())
         }
 
         try (DbScope.Transaction transaction = ExperimentService.get().ensureTransaction();
-            BufferedReader reader = Readers.getReader(output))
+            BufferedReader reader = Readers.getReader(output);
+            PreparedStatement stmt = transaction.getConnection().prepareStatement(
+                "INSERT INTO " + EHRSchema.EHR_SCHEMANAME + ".kinship\n" +
+                "\t(Id, Id2, coefficient, container, created, createdby, modified, modifiedby)\n" +
+                "\tVALUES (?, ?, ?, ?, ?, ?, ?, ?)"))
         {
             log.info("Inserting rows");
             String line = null;
             int lineNum = 0;
             while ((line = reader.readLine()) != null)
             {
+                if (job != null && job.isCancelled())
+                {
+                    throw new CancelledException();
+                }
+
                 String[] fields = line.split("\t");
                 if (fields.length < 3)
                     continue;
@@ -260,34 +270,47 @@ else if (kinshipTable.getSqlDialect().isPostgreSQL())
                 assert fields[0].length() < 80 : "Field Id value too long: [" + fields[0] + ']';
                 assert fields[1].length() < 80 : "Field Id2 value too long: [" + fields[1] + "]";
 
-                row.put("Id", fields[0]);
-                row.put("Id2", fields[1]);
+                stmt.setString(1, fields[0]); // Id
+                stmt.setString(2, fields[1]); // Id2
 
                 try
                 {
-                    row.put("coefficient", Double.parseDouble(fields[2]));
+                    stmt.setDouble(3, Double.parseDouble(fields[2])); // coefficient
                 }
                 catch (NumberFormatException e)
                 {
                     throw new PipelineJobException("Invalid kinship coefficient on line " + (lineNum + 1) + " for IDs " + fields[0] + " and " + fields[1] + ": " + fields[2], e);
                 }
 
-                row.put("container", c.getId());
-                row.put("created", new Date());
-                row.put("createdby", u.getUserId());
-                Table.insert(u, kinshipTable, row);
+                stmt.setString(4, c.getId()); // container
+                // Use java.sql.Timestamp, not java.sql.Date: SQL DATE carries no time-of-day,
+                // so setDate() would silently truncate created/modified to midnight.
+                java.sql.Timestamp now = new java.sql.Timestamp(System.currentTimeMillis());
+                stmt.setTimestamp(5, now); // created
+                stmt.setInt(6, u.getUserId()); // createdby
+                stmt.setTimestamp(7, now); // modified
+                stmt.setInt(8, u.getUserId()); // modifiedby
+
+                stmt.addBatch();
+
                 lineNum++;
 
                 if (lineNum % 100000 == 0)
                 {
-                    log.info("processed " + lineNum + " rows");
+                    stmt.executeBatch();
+                }
+
+                if (lineNum % 250000 == 0)
+                {
+                    log.info("imported " + lineNum + " rows");
                 }
             }
 
-            log.info("Inserted " + lineNum + " rows into ehr.kinship");
+            stmt.executeBatch();
             transaction.commit();
+            log.info("Inserted " + lineNum + " rows into ehr.kinship");
         }
-        catch (RuntimeSQLException | IOException e)
+        catch (RuntimeSQLException | SQLException | IOException e)
         {
             throw new PipelineJobException(e);
         }