diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala index 3cd5008cd1..c151053027 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/result/MongoDocument.scala @@ -9,6 +9,7 @@ import org.bson.Document import java.net.URI import java.util.Date +import scala.collection.mutable /** * MongoDocument provides an implementation of VirtualDocument for MongoDB. @@ -25,6 +26,12 @@ class MongoDocument[T >: Null <: AnyRef]( var fromDocument: Document => T ) extends VirtualDocument[T] { + var previousCount: mutable.Map[String, Int] = mutable.Map() + var previousNumStats: mutable.Map[String, Map[String, Double]] = mutable.Map() + var previousDateStats: mutable.Map[String, Map[String, Date]] = mutable.Map() + var previousCatStats: mutable.Map[String, Map[String, Int]] = mutable.Map() + var previousReachedLimit: Boolean = false + /** * The batch size for committing items to the MongoDB collection. */ @@ -143,11 +150,115 @@ class MongoDocument[T >: Null <: AnyRef]( collectionMgr.getCount } - def getNumericColStats: Map[String, Map[String, Double]] = - collectionMgr.calculateNumericStats() + def getNumericColStats: Map[String, Map[String, Double]] = { + val offset: Int = previousCount.getOrElse("numeric_offset", 0) + val currentResult: Map[String, Map[String, Double]] = + collectionMgr.calculateNumericStats(offset) + + currentResult.keys.foreach(field => { + val (prevMin, prevMax, prevMean) = + ( + previousNumStats.getOrElse(field, Map("min" -> Double.MaxValue))("min"), + previousNumStats.getOrElse(field, Map("max" -> Double.MinValue))("max"), + previousNumStats.getOrElse(field, Map("mean" -> 0.0))("mean") + ) + + val (minValue, maxValue, meanValue, count) = + ( + currentResult(field)("min"), + currentResult(field)("max"), + currentResult(field)("mean"), + currentResult(field)("count") + ) + + val newMin = Math.min(prevMin, minValue) + val newMax = Math.max(prevMax, maxValue) + val newMean = (prevMean * offset + meanValue * count) / (offset + count) + + previousNumStats.update(field, Map("min" -> newMin, "max" -> newMax, "mean" -> newMean)) + previousCount.update("numeric_offset", offset + count.toInt) + }) + + previousNumStats.toMap + } + + def getDateColStats: Map[String, Map[String, Date]] = { + val offset: Int = previousCount.getOrElse("date_offset", 0) + val currentResult: Map[String, Map[String, Any]] = collectionMgr.calculateDateStats(offset) + + currentResult.keys.foreach(field => { + val (prevMin, prevMax) = + ( + previousDateStats + .getOrElse(field, Map("min" -> new java.util.Date(Long.MaxValue)))("min"), + previousDateStats.getOrElse(field, Map("max" -> new java.util.Date(Long.MinValue)))("max") + ) + + val (minValue: java.util.Date, maxValue: java.util.Date, count: Int) = + (currentResult(field)("min"), currentResult(field)("max"), currentResult(field)("count")) + + val newMin: java.util.Date = if (minValue.before(prevMin)) minValue else prevMin + val newMax: java.util.Date = if (maxValue.after(prevMax)) maxValue else prevMax - def getDateColStats: Map[String, Map[String, Date]] = collectionMgr.calculateDateStats() + previousDateStats.update(field, Map("min" -> newMin, "max" -> newMax)) + previousCount.update("date_offset", offset + count) + }) - def getCategoricalStats: Map[String, Map[String, Map[String, Integer]]] = - collectionMgr.calculateCategoricalStats() + previousDateStats.toMap + } + + def getCategoricalStats: Map[String, Map[String, Any]] = { + val offset: Int = previousCount.getOrElse("category_offset", 0) + val (currentResult: Map[String, Map[String, Int]], ifReachedLimit: Boolean) = + collectionMgr.calculateCategoricalStats(offset) + val result: mutable.Map[String, Map[String, Any]] = mutable.Map() + + currentResult.keys.foreach(field => { + val fieldResult: mutable.Map[String, Any] = mutable.Map() + val count: Int = currentResult(field).values.sum + val newCatCounts: mutable.Map[String, Int] = mutable.Map() + + currentResult(field).keys.foreach(category => { + try { + val prevCatCounts: Int = previousCatStats(field)(category) + newCatCounts.put(category, prevCatCounts + currentResult(field)(category)) + } catch { + case e: NoSuchElementException => + newCatCounts.put(category, 0 + currentResult(field)(category)) + } + }) + + previousCatStats.update(field, newCatCounts.toMap) + previousCount.update("category_offset", offset + count) + previousReachedLimit = ifReachedLimit || previousReachedLimit + + val top2 = previousCatStats(field).toSeq.sortBy(-_._2).take(2).map(_._1) + top2.size match { + case 2 => + fieldResult("firstCat") = if (top2.head != null) top2.head else "NULL" + fieldResult("secondCat") = if (top2(1) != null) top2(1) else "NULL" + val first = + (previousCatStats(field)(top2.head).toDouble / (offset + count).toDouble) * 100 + val second = + (previousCatStats(field)(top2(1)).toDouble / (offset + count).toDouble) * 100 + fieldResult("firstPercent") = first + fieldResult("secondPercent") = second + fieldResult("other") = 100 - first - second + fieldResult("reachedLimit") = if (previousReachedLimit) 1 else 0 + case 1 => + fieldResult("firstCat") = if (top2.head != null) top2.head else "NULL" + fieldResult("secondCat") = "" + fieldResult("firstPercent") = + (previousCatStats(field)(top2.head).toDouble / (offset + count).toDouble) * 100 + fieldResult("secondPercent") = 0 + fieldResult("other") = 0 + fieldResult("reachedLimit") = if (previousReachedLimit) 1 else 0 + case _ => None + } + + if (fieldResult.nonEmpty) result(field) = fieldResult.toMap + }) + + result.toMap + } } diff --git a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/mongo/MongoCollectionManager.scala b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/mongo/MongoCollectionManager.scala index b3f650450f..2d1c925c1c 100644 --- a/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/mongo/MongoCollectionManager.scala +++ b/core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/util/mongo/MongoCollectionManager.scala @@ -37,7 +37,7 @@ class MongoCollectionManager(collection: MongoCollection[Document]) { /** * Calculate numeric statistics (min, max, mean) for numeric fields. */ - def calculateNumericStats(): Map[String, Map[String, Double]] = { + def calculateNumericStats(offset: Int): Map[String, Map[String, Double]] = { val numericFields = detectNumericFields() numericFields.flatMap { field => @@ -48,8 +48,10 @@ class MongoCollectionManager(collection: MongoCollection[Document]) { .append("minValue", new Document("$min", "$" + field)) .append("maxValue", new Document("$max", "$" + field)) .append("meanValue", new Document("$avg", "$" + field)) + .append("count", new Document("$sum", 1)) val pipeline = Seq( + new Document("$skip", offset), new Document("$project", projection), new Document("$group", groupDoc) ) @@ -60,7 +62,8 @@ class MongoCollectionManager(collection: MongoCollection[Document]) { val stats = Map( "min" -> doc.get("minValue").asInstanceOf[Number].doubleValue(), "max" -> doc.get("maxValue").asInstanceOf[Number].doubleValue(), - "mean" -> doc.get("meanValue").asInstanceOf[Number].doubleValue() + "mean" -> doc.get("meanValue").asInstanceOf[Number].doubleValue(), + "count" -> doc.get("count").asInstanceOf[Number].doubleValue() ) Some(field -> stats) } else { @@ -72,7 +75,7 @@ class MongoCollectionManager(collection: MongoCollection[Document]) { /** * Calculate date statistics (min, max) for date fields. */ - def calculateDateStats(): Map[String, Map[String, Date]] = { + def calculateDateStats(offset: Int): Map[String, Map[String, Any]] = { val dateFields = detectDateFields() dateFields.flatMap { field => @@ -82,8 +85,10 @@ class MongoCollectionManager(collection: MongoCollection[Document]) { val groupDoc = new Document("_id", null) .append("minValue", new Document("$min", "$" + field)) .append("maxValue", new Document("$max", "$" + field)) + .append("count", new Document("$sum", 1)) val pipeline = Seq( + new Document("$skip", offset), new Document("$project", projection), new Document("$group", groupDoc) ) @@ -93,7 +98,8 @@ class MongoCollectionManager(collection: MongoCollection[Document]) { val doc = result.head val stats = Map( "min" -> doc.get("minValue").asInstanceOf[Date], - "max" -> doc.get("maxValue").asInstanceOf[Date] + "max" -> doc.get("maxValue").asInstanceOf[Date], + "count" -> doc.get("count").asInstanceOf[Number].intValue() ) Some(field -> stats) } else { @@ -103,26 +109,33 @@ class MongoCollectionManager(collection: MongoCollection[Document]) { } /** - * Calculate categorical statistics (value counts) for categorical fields. + * Calculate categorical statistics (value percentages) for categorical fields. */ - def calculateCategoricalStats(): Map[String, Map[String, Map[String, Integer]]] = { + def calculateCategoricalStats(offset: Int): (Map[String, Map[String, Int]], Boolean) = { val categoricalFields = detectCategoricalFields() + var reachedLimit: Boolean = false + + ( + categoricalFields.flatMap { field => + val pipeline = Seq( + new Document("$skip", offset), + Aggregates.group("$" + field, com.mongodb.client.model.Accumulators.sum("count", 1)), + Aggregates.sort(Sorts.descending("count")), + Aggregates.limit(1000) + ) - categoricalFields.flatMap { field => - val pipeline = Seq( - Aggregates.group("$" + field, com.mongodb.client.model.Accumulators.sum("count", 1)), - Aggregates.sort(Sorts.descending("count")), - Aggregates.limit(1000) - ) - - val result = collection.aggregate(pipeline.asJava).iterator().asScala.toList - if (result.nonEmpty) { - val counts = result.map(doc => doc.getString("_id") -> doc.getInteger("count")).toMap - Some(field -> Map("counts" -> counts)) - } else { - None - } - }.toMap + val result = collection.aggregate(pipeline.asJava).iterator().asScala.toList + if (result.nonEmpty) { + val counts = + result.map(doc => doc.getString("_id") -> doc.getInteger("count").toInt).toMap + reachedLimit = result.size >= 1000 + Some(field -> counts) + } else { + None + } + }.toMap, + reachedLimit + ) } /**