Merge pull request #71 from NCATS-Gamma/prevent-redos
The previous approach -- splitting the input file into chunks -- made it difficult to identify which records had already been generated and which remained to be processed. This PR replaces that system with one based on specifying the rows to be processed on each run, while providing tools so that the operator doesn't normally need to specify them by hand.

Specifically, this PR:
- Modifies RoboCORD.scala so that, rather than specifying the "chunk" ID, you specify which rows should be processed in that run.
- Writes output to an intermediate file (named `result_from_${startIndex}_until_${endIndex}.in-progress.txt`) before renaming it to a final output file (named `result_from_${startIndex}_until_${endIndex}.tsv`). This doesn't mean much right now, since I think we store everything in memory before writing it all out to disk, but once we replace processing with FS2 streams, this will actually be important.
- Modifies robocord.job so that a job array can be submitted with a chosen number of chunks (using the syntax `--array=0-[number of chunks]`), with the script itself calculating which rows should be processed in each run (see the sketch after this list).
- Adds a new program called RoboCORDManager that goes through the list of output files produced and confirms that every row in the metadata.csv file has been processed. If not, it can batch the remaining rows into jobs and start them using the `robocord-sbatch.sh` script, which itself submits jobs via `sbatch`.
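
To make the new division of labor concrete, here is a minimal sketch of the row-range arithmetic (in Scala for illustration; `robocord.job` itself does this in Bash with `SLURM_ARRAY_TASK_ID` and `SLURM_ARRAY_TASK_MAX`, and the function name here is hypothetical):

```scala
// Illustrative sketch of the row-range arithmetic performed by robocord.job.
// Assumes metadataSize is the line count of metadata.csv and taskMax is the
// highest SLURM array index (e.g. 3999 for --array=0-3999).
def rowRange(taskId: Int, taskMax: Int, metadataSize: Int): (Int, Int) = {
  val chunkSize = metadataSize / taskMax // integer division, as in the script
  val fromRow   = taskId * chunkSize
  val untilRow  = fromRow + chunkSize    // exclusive, matching --until-row
  (fromRow, untilRow)
}

// Example: with ~141,109 metadata rows and --array=0-3999, task 5 is
// assigned rows 175 until 210.
```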

These changes are documented in the README file.
gaurav authored Sep 1, 2020
2 parents 2d29eb6 + 8dc7c5e commit d8fed92
Showing 8 changed files with 280 additions and 41 deletions.
10 changes: 5 additions & 5 deletions Makefile
@@ -9,7 +9,6 @@ PARALLEL = 4

# The date of CORD-19 data to download.
ROBOCORD_DATE="2020-08-30"

.PHONY: all
all: output

@@ -91,7 +90,8 @@ robocord-test: SciGraph
ln -s robocord-outputs/${ROBOCORD_DATE} robocord-output; \
fi

JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --current-chunk 4 --total-chunks 1000 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --current-chunk 5 --total-chunks 1000 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --current-chunk 6 --total-chunks 1000 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --current-chunk 437 --total-chunks 1000 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --from-row 512 --until-row 640 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --from-row 640 --until-row 768 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --from-row 768 --until-row 896 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --from-row 55936 --until-row 56064 robocord-data"
JAVA_OPTS="-Xmx$(MEMORY)" sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --from-row 140776 --until-row 141109 robocord-data"
16 changes: 15 additions & 1 deletion README.md
@@ -4,7 +4,21 @@

Extract ontology terms referenced from PubMed abstracts as per the [MEDLINE/PubMed Baseline Repository](https://mbr.nlm.nih.gov/) by using [SciGraph](https://github.com/SciGraph/SciGraph) against a set of ontologies.

## Ontologies
# OmniCORD

Extract ontology terms used in the [COVID-19 Open Research Dataset (CORD)](https://www.semanticscholar.org/cord19) as tab-delimited files for further processing in [COVID-KOP](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC7316095/).

In order to generate OmniCORD output files, you should:
1. Use `robocord.job` to attempt to run all the jobs on a SLURM cluster.
Any number of jobs can be specified, but values of around 4000 seem
to work well. Example: `sbatch --array=0-3999 robocord.job`.
2. Use RoboCORDManager to re-run any jobs that failed to complete. You can
use the `--dry-run` option to see which jobs will be executed before they
are run. Jobs are submitted using the `robocord-sbatch.sh` script, so
modify that if necessary; a sketch of the gap-finding idea follows this list.
Example: `srun sbt "runMain org.renci.robocord.RoboCORDManager --job-size 20"`
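
Below is a minimal sketch of the gap-finding idea, for illustration only (the function name and logic are assumptions, not the actual RoboCORDManager code). It scans completed output files named `result_from_<start>_until_<end>.tsv` and reports the row ranges that no file covers:

```scala
import java.io.File

// Illustrative sketch only -- not the actual RoboCORDManager implementation.
// Returns the row ranges in [0, totalRows) not covered by any completed
// output file named result_from_<start>_until_<end>.tsv.
def uncoveredRanges(outputDir: File, totalRows: Int): Seq[(Int, Int)] = {
  val Name = raw"result_from_(\d+)_until_(\d+)\.tsv".r
  val covered = Option(outputDir.listFiles).toSeq.flatten
    .flatMap { f =>
      f.getName match {
        case Name(from, until) => Some((from.toInt, until.toInt))
        case _                 => None
      }
    }
    .sortBy(_._1)
  // Walk the sorted ranges, collecting any gap that precedes each one.
  val (gaps, cursor) = covered.foldLeft((Vector.empty[(Int, Int)], 0)) {
    case ((acc, pos), (from, until)) =>
      val next = if (from > pos) acc :+ ((pos, from)) else acc
      (next, math.max(pos, until))
  }
  if (cursor < totalRows) gaps :+ ((cursor, totalRows)) else gaps
}
```

Each uncovered range can then be split into `--job-size` batches and handed to `robocord-sbatch.sh`.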

# Ontologies used
Currently, we look for terms from the following ontologies:
* [Uberon (base)](http://uberon.org) ([OWL](http://purl.obolibrary.org/obo/uberon/uberon-base.owl))
* [ChEBI](https://www.ebi.ac.uk/chebi/) ([OWL](http://purl.obolibrary.org/obo/chebi.owl))
11 changes: 8 additions & 3 deletions build.sbt
@@ -27,7 +27,7 @@ wartremoverWarnings ++= Warts.unsafe

javaOptions += "-Xmx20G"

fork in Test := true
fork := true

testFrameworks += new TestFramework("utest.runner.Framework")

@@ -43,12 +43,17 @@ libraryDependencies ++= {
"org.scala-lang.modules" %% "scala-xml" % "1.0.6",
"io.scigraph" % "scigraph-core" % "2.1-SNAPSHOT",
"io.scigraph" % "scigraph-entity" % "2.1-SNAPSHOT",
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.1",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.codehaus.groovy" % "groovy-all" % "2.4.6",
"org.apache.jena" % "apache-jena-libs" % "3.13.1",

// Testing
"com.lihaoyi" %% "utest" % "0.7.1" % "test",

// Logging
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.1",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.outr" %% "scribe" % "2.7.3",

// Command line argument parsing.
"org.rogach" %% "scallop" % "3.3.2",

29 changes: 29 additions & 0 deletions robocord-sbatch.sh
@@ -0,0 +1,29 @@
#!/bin/bash

# All arguments are passed on to the RoboCORD instance.

sbatch <<EOT
#!/bin/bash
#
#SBATCH --job-name=RoboCORD
#SBATCH --output=robocord-output/log-output-%A.txt
#SBATCH --error=robocord-output/log-error-%A.txt
#SBATCH --cpus-per-task 16
#SBATCH --mem=50000
#SBATCH --time=2:00:00
#SBATCH [email protected]
set -e # Exit immediately if a pipeline fails.
export JAVA_OPTS="-Xmx50G"
export MY_SCIGRAPH=omnicorp-scigraph-\$SLURM_JOB_ID
echo "Duplicating omnicorp-scigraph so we can use it on multiple clusters"
cp -R omnicorp-scigraph "scigraphs/\$MY_SCIGRAPH"
echo "Starting RoboCORD with arguments: --neo4j-location scigraphs/\$MY_SCIGRAPH $@"
sbt "runMain org.renci.robocord.RoboCORD --neo4j-location scigraphs/\$MY_SCIGRAPH $@"
rm -rf "scigraphs/\$MY_SCIGRAPH"
echo "Deleted duplicated omnicorp-scigraph"
EOT
29 changes: 22 additions & 7 deletions robocord.job
@@ -1,23 +1,38 @@
#!/bin/bash
#
# This script should be run as:
# sbatch --array=0-3999 robocord.job
# where the upper array index (3999 in the example above, giving 4000
# tasks numbered 0-3999) can be any number.
#
#SBATCH --job-name=RoboCORD
#SBATCH --output=robocord-output/log-output-%a.txt
#SBATCH --error=robocord-output/log-error-%a.txt
#SBATCH --cpus-per-task 16
#SBATCH --mem=50000
#SBATCH --time=12:00:00
#SBATCH --time=4:00:00
#SBATCH [email protected]

set -e # Exit immediately if a pipeline fails.

export JAVA_OPTS="-Xmx50G"
export MY_SCIGRAPH=omnicorp-scigraph-$SLURM_ARRAY_TASK_ID

echo "Duplicating omnicorp-scigraph so we can use it on multiple clusters"
cp -R omnicorp-scigraph "scigraphs/$MY_SCIGRAPH"
export METADATA_SIZE=$(wc -l < robocord-data/metadata.csv)
export CHUNK_SIZE=$(($METADATA_SIZE/$SLURM_ARRAY_TASK_MAX))
export FROM_ROW=$(($SLURM_ARRAY_TASK_ID * $CHUNK_SIZE))
export UNTIL_ROW=$(($FROM_ROW + $CHUNK_SIZE))
export OUTPUT_FILENAME=robocord-output/result_from_${FROM_ROW}_until_${UNTIL_ROW}.tsv

if [ -f "$OUTPUT_FILENAME" ]; then
echo "Output filename $OUTPUT_FILENAME already exists, skipping."
else
echo "Duplicating omnicorp-scigraph so we can use it on multiple clusters"
cp -R omnicorp-scigraph "scigraphs/$MY_SCIGRAPH"

echo "Starting RoboCORD"
sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --current-chunk $SLURM_ARRAY_TASK_ID --total-chunks $SLURM_ARRAY_TASK_MAX --output-prefix robocord-output/results --neo4j-location scigraphs/$MY_SCIGRAPH robocord-data"
echo "Starting RoboCORD from row $FROM_ROW until $UNTIL_ROW."
sbt "runMain org.renci.robocord.RoboCORD --metadata robocord-data/metadata.csv --from-row $FROM_ROW --until-row $UNTIL_ROW --output-prefix robocord-output/result --neo4j-location scigraphs/$MY_SCIGRAPH robocord-data"

rm -rf "scigraphs/$MY_SCIGRAPH"
echo "Deleted duplicated omnicorp-scigraph"
rm -rf "scigraphs/$MY_SCIGRAPH"
echo "Deleted duplicated omnicorp-scigraph"
fi
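
One property of the integer division above is worth noting: `CHUNK_SIZE * (SLURM_ARRAY_TASK_MAX + 1)` can fall short of `METADATA_SIZE`, leaving trailing rows that no array task covers; that is precisely the kind of gap RoboCORDManager is meant to catch. A quick check in Scala, using assumed figures:

```scala
// Assumed figures for illustration: ~141,109 metadata rows, --array=0-3999.
val metadataSize = 141109
val taskMax      = 3999                      // SLURM_ARRAY_TASK_MAX
val chunkSize    = metadataSize / taskMax    // 35
val covered      = chunkSize * (taskMax + 1) // 140000 rows covered by tasks 0-3999
val leftover     = metadataSize - covered    // 1109 trailing rows left to mop up
```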
55 changes: 38 additions & 17 deletions src/main/scala/org/renci/robocord/RoboCORD.scala
@@ -1,12 +1,13 @@
package org.renci.robocord

import java.io.{File, FileWriter, PrintWriter}
import java.nio.file.{CopyOption, Files, StandardCopyOption}
import java.time.Duration

import com.github.tototoshi.csv._
import com.typesafe.scalalogging.{LazyLogging, Logger}
import org.renci.robocord.annotator.Annotator
import org.renci.robocord.json.{CORDArticleWrapper, CORDJsonReader}
import org.renci.robocord.json.CORDJsonReader
import org.rogach.scallop._
import org.rogach.scallop.exceptions._

@@ -34,20 +35,20 @@ object RoboCORD extends App with LazyLogging {
default = Some(new File("robocord-data/all_sources_metadata_latest.csv"))
)
val outputPrefix: ScallopOption[String] = opt[String](
descr = "Prefix for the filename (we will add '_from_<start index>_until_<end index>.txt' to the filename)",
descr = "Prefix for the filename (we will add '_from_<start index>_until_<end index>.tsv' to the filename)",
default = Some("robocord-output/result")
)
val neo4jLocation: ScallopOption[File] = opt[File](
descr = "Location of the Neo4J database that SciGraph should use.",
default = Some(new File("omnicorp-scigraph"))
)
val currentChunk: ScallopOption[Int] = opt[Int](
descr = "The current chunks (from 0 to totalChunks-1)",
default = Some(0)
val fromRow: ScallopOption[Int] = opt[Int](
descr = "The row to start processing on",
required = true
)
val totalChunks: ScallopOption[Int] = opt[Int](
descr = "The total number of chunks to process",
default = Some(-1)
val untilRow: ScallopOption[Int] = opt[Int](
descr = "The row to end processing before (i.e. this row will not be processed)",
required = true
)
val context: ScallopOption[Int] = opt[Int](
descr = "How many characters before and after the matched term should be displayed.",
@@ -94,16 +95,30 @@ object RoboCORD extends App with LazyLogging {
))

// Which metadata entries do we actually need to process?
val currentChunk: Int = conf.currentChunk()
val totalChunks: Int = if (conf.totalChunks() == -1) allMetadata.size else conf.totalChunks()
val chunkLength: Int = allMetadata.size/totalChunks
val startIndex: Int = currentChunk * chunkLength
val endIndex: Int = startIndex + chunkLength
val startIndex = conf.fromRow.toOption.get
val endIndex = conf.untilRow.toOption.get

// Do we already have output from a previous run? Delete any stale in-progress file, or abort if we can't.
val inProgressFilename = conf.outputPrefix() + s"_from_${startIndex}_until_$endIndex.in-progress.txt"
if (new File(inProgressFilename).exists()) {
if (new File(inProgressFilename).delete())
logger.info(s"In-progress output file '${inProgressFilename}' already exists and has been deleted.")
else {
logger.error(s"In-progress output file '${inProgressFilename}' already exists but could not be deleted.")
System.exit(2)
}
}
}

val outputFilename = conf.outputPrefix() + s"_from_${startIndex}_until_$endIndex.tsv"
if (new File(outputFilename).length > 0) {
logger.info(s"Output file '${outputFilename}' already exists, skipping.")
System.exit(0)
}

// Select the slice of allMetadata specified by --from-row/--until-row.
val metadata: Seq[Map[String, String]] = allMetadata.slice(startIndex, endIndex)
val articlesTotal = metadata.size
logger.info(s"Selected $articlesTotal articles for processing (from $startIndex until $endIndex, chunk $currentChunk out of $totalChunks)")
logger.info(s"Selected $articlesTotal articles for processing (from $startIndex until $endIndex)")

// Run SciGraph in parallel over the chunk we need to process.
logger.info(s"Starting SciGraph in parallel on ${Runtime.getRuntime.availableProcessors} processors.")
Expand All @@ -130,7 +145,7 @@ object RoboCORD extends App with LazyLogging {
// Choose an "article ID", which is one of: (1) PubMed ID, (2) DOI, (3) PMCID or (4) CORD_UID.
val articleId = if (pmid.nonEmpty && pmid.get.nonEmpty) pmid.map("PMID:" + _).mkString("|")
else if(doi.nonEmpty && doi.get.nonEmpty) doi.map("DOI:" + _).mkString("|")
else if(pmcid.nonEmpty) pmcid.map("PMCID:" + _)
else if(pmcid.nonEmpty) pmcid.map("PMCID:" + _).mkString("|")
else s"CORD_UID:$id"

// Full-text articles are stored by path. We might have multiple PMC or PDF parses; we prioritize PMC over PDF.
@@ -181,12 +196,18 @@
})

// Write out all the results to the output file.
val outputFilename = conf.outputPrefix() + s"_from_${startIndex}_until_$endIndex.txt"
logger.info(s"Writing tab-delimited output to $outputFilename.")
val pw = new PrintWriter(new FileWriter(new File(outputFilename)))
val pw = new PrintWriter(new FileWriter(new File(inProgressFilename)))
results.foreach(pw.println(_))
pw.close

val duration = Duration.ofNanos(System.nanoTime - timeStarted)
logger.info(s"Completed generating ${results.size} results for $articlesTotal articles in ${duration.getSeconds} seconds ($duration)")

Files.move(
new File(inProgressFilename).toPath,
new File(outputFilename).toPath,
StandardCopyOption.REPLACE_EXISTING
)
logger.info(s"Renamed ${inProgressFilename} to ${outputFilename}; file ready for use.")
}
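
A side note on the rename-into-place step above: `StandardCopyOption.REPLACE_EXISTING` does not by itself guarantee atomicity. Where that matters, `java.nio` offers `StandardCopyOption.ATOMIC_MOVE`, which fails with `AtomicMoveNotSupportedException` on filesystems that cannot honor it. A sketch, with illustrative paths:

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

// Sketch: request an atomic rename where the filesystem supports it.
// On filesystems that cannot perform the move atomically this throws
// AtomicMoveNotSupportedException, so REPLACE_EXISTING (as used above)
// is the more portable default.
Files.move(
  Paths.get("robocord-output/result_from_0_until_128.in-progress.txt"),
  Paths.get("robocord-output/result_from_0_until_128.tsv"),
  StandardCopyOption.ATOMIC_MOVE
)
```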