Stream Processing Result Statistics Summary #3202

Closed · wants to merge 9 commits
@@ -9,6 +9,7 @@ import org.bson.Document

import java.net.URI
import java.util.Date
import scala.collection.mutable

/**
* MongoDocument provides an implementation of VirtualDocument for MongoDB.
@@ -25,6 +26,12 @@ class MongoDocument[T >: Null <: AnyRef](
var fromDocument: Document => T
) extends VirtualDocument[T] {

var previousCount: mutable.Map[String, Int] = mutable.Map()
var previousNumStats: mutable.Map[String, Map[String, Double]] = mutable.Map()
var previousDateStats: mutable.Map[String, Map[String, Date]] = mutable.Map()
var previousCatStats: mutable.Map[String, Map[String, Int]] = mutable.Map()
var previousReachedLimit: Boolean = false

/**
* The batch size for committing items to the MongoDB collection.
*/
@@ -143,11 +150,115 @@
collectionMgr.getCount
}

def getNumericColStats: Map[String, Map[String, Double]] =
collectionMgr.calculateNumericStats()
def getNumericColStats: Map[String, Map[String, Double]] = {
val offset: Int = previousCount.getOrElse("numeric_offset", 0)
val currentResult: Map[String, Map[String, Double]] =
collectionMgr.calculateNumericStats(offset)

currentResult.keys.foreach(field => {
      // Fall back to neutral values when this field has no previously accumulated stats.
      val prevStats = previousNumStats.getOrElse(
        field,
        Map("min" -> Double.MaxValue, "max" -> Double.MinValue, "mean" -> 0.0)
      )
      val (prevMin, prevMax, prevMean) = (prevStats("min"), prevStats("max"), prevStats("mean"))

val (minValue, maxValue, meanValue, count) =
(
currentResult(field)("min"),
currentResult(field)("max"),
currentResult(field)("mean"),
currentResult(field)("count")
)

      val newMin = Math.min(prevMin, minValue)
      val newMax = Math.max(prevMax, maxValue)
      // Count-weighted incremental mean: `offset` documents were already summarized,
      // `count` documents arrived in this batch.
      val newMean = (prevMean * offset + meanValue * count) / (offset + count)

      previousNumStats.update(field, Map("min" -> newMin, "max" -> newMax, "mean" -> newMean))
      // Remember how many documents have been folded into the running numeric stats.
      previousCount.update("numeric_offset", offset + count.toInt)
})

previousNumStats.toMap
}
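  // For context on the merge above: the running mean is a count-weighted combination of the
  // previously summarized documents and the newly skipped-to batch. The sketch below is
  // illustrative only (hypothetical numbers, not part of this PR); it checks that the
  // incremental formula agrees with recomputing the mean over all values.
  object IncrementalMeanSketch extends App {
    val previousBatch = Seq(2.0, 4.0, 6.0) // offset = 3, prevMean = 4.0
    val newBatch = Seq(10.0, 20.0)         // count = 2, batchMean = 15.0
    val (offset, count) = (previousBatch.size, newBatch.size)
    val prevMean = previousBatch.sum / offset
    val batchMean = newBatch.sum / count

    // Same weighted formula as in getNumericColStats.
    val incrementalMean = (prevMean * offset + batchMean * count) / (offset + count)
    val fullMean = (previousBatch ++ newBatch).sum / (offset + count)
    assert(math.abs(incrementalMean - fullMean) < 1e-9) // both are 8.4
  }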

  def getDateColStats: Map[String, Map[String, Date]] = collectionMgr.calculateDateStats()

  def getDateColStats: Map[String, Map[String, Date]] = {
    val offset: Int = previousCount.getOrElse("date_offset", 0)
    val currentResult: Map[String, Map[String, Any]] = collectionMgr.calculateDateStats(offset)

    currentResult.keys.foreach(field => {
      // Fall back to sentinel dates when this field has no previously accumulated stats.
      val prevMin: java.util.Date =
        previousDateStats.getOrElse(field, Map("min" -> new java.util.Date(Long.MaxValue)))("min")
      val prevMax: java.util.Date =
        previousDateStats.getOrElse(field, Map("max" -> new java.util.Date(Long.MinValue)))("max")

      val minValue = currentResult(field)("min").asInstanceOf[java.util.Date]
      val maxValue = currentResult(field)("max").asInstanceOf[java.util.Date]
      val count = currentResult(field)("count").asInstanceOf[Int]

      val newMin: java.util.Date = if (minValue.before(prevMin)) minValue else prevMin
      val newMax: java.util.Date = if (maxValue.after(prevMax)) maxValue else prevMax

      previousDateStats.update(field, Map("min" -> newMin, "max" -> newMax))
      previousCount.update("date_offset", offset + count)
    })

    previousDateStats.toMap
  }

  def getCategoricalStats: Map[String, Map[String, Map[String, Integer]]] =
    collectionMgr.calculateCategoricalStats()

def getCategoricalStats: Map[String, Map[String, Any]] = {
val offset: Int = previousCount.getOrElse("category_offset", 0)
val (currentResult: Map[String, Map[String, Int]], ifReachedLimit: Boolean) =
collectionMgr.calculateCategoricalStats(offset)
val result: mutable.Map[String, Map[String, Any]] = mutable.Map()

currentResult.keys.foreach(field => {
val fieldResult: mutable.Map[String, Any] = mutable.Map()
val count: Int = currentResult(field).values.sum
val newCatCounts: mutable.Map[String, Int] = mutable.Map()

      currentResult(field).keys.foreach(category => {
        // Add this batch's count on top of the previously accumulated count (0 if unseen before).
        val prevCatCount: Int =
          previousCatStats.getOrElse(field, Map.empty[String, Int]).getOrElse(category, 0)
        newCatCounts.put(category, prevCatCount + currentResult(field)(category))
      })

previousCatStats.update(field, newCatCounts.toMap)
previousCount.update("category_offset", offset + count)
      // Once the 1000-distinct-value cap has been hit in any batch, keep reporting it.
      previousReachedLimit = ifReachedLimit || previousReachedLimit

      // Report the two most frequent categories and their share of all documents seen so far.
      val top2 = previousCatStats(field).toSeq.sortBy(-_._2).take(2).map(_._1)
top2.size match {
case 2 =>
fieldResult("firstCat") = if (top2.head != null) top2.head else "NULL"
fieldResult("secondCat") = if (top2(1) != null) top2(1) else "NULL"
val first =
(previousCatStats(field)(top2.head).toDouble / (offset + count).toDouble) * 100
val second =
(previousCatStats(field)(top2(1)).toDouble / (offset + count).toDouble) * 100
fieldResult("firstPercent") = first
fieldResult("secondPercent") = second
fieldResult("other") = 100 - first - second
fieldResult("reachedLimit") = if (previousReachedLimit) 1 else 0
case 1 =>
fieldResult("firstCat") = if (top2.head != null) top2.head else "NULL"
fieldResult("secondCat") = ""
fieldResult("firstPercent") =
(previousCatStats(field)(top2.head).toDouble / (offset + count).toDouble) * 100
fieldResult("secondPercent") = 0
fieldResult("other") = 0
fieldResult("reachedLimit") = if (previousReachedLimit) 1 else 0
        case _ => () // defensive: top2 always has 1 or 2 elements here
}

if (fieldResult.nonEmpty) result(field) = fieldResult.toMap
})

result.toMap
}
}
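Taken together, the changes above keep per-column running statistics inside MongoDocument and only aggregate over documents that arrived since the previous call. A rough sketch of how a caller might poll these getters to render a streaming summary (the ResultStatsPrinter object and its formatting are hypothetical, not part of this PR):

object ResultStatsPrinter {
  import java.util.Date

  def printSummary[T >: Null <: AnyRef](doc: MongoDocument[T]): Unit = {
    val numeric: Map[String, Map[String, Double]] = doc.getNumericColStats
    val dates: Map[String, Map[String, Date]] = doc.getDateColStats
    val categorical: Map[String, Map[String, Any]] = doc.getCategoricalStats

    numeric.foreach { case (field, s) =>
      println(s"$field: min=${s("min")}, max=${s("max")}, mean=${s("mean")}")
    }
    dates.foreach { case (field, s) =>
      println(s"$field: earliest=${s("min")}, latest=${s("max")}")
    }
    categorical.foreach { case (field, s) =>
      println(
        s"$field: ${s("firstCat")} (${s("firstPercent")}%), " +
          s"${s("secondCat")} (${s("secondPercent")}%), other ${s("other")}%"
      )
    }
  }
}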
@@ -37,7 +37,7 @@ class MongoCollectionManager(collection: MongoCollection[Document]) {
/**
* Calculate numeric statistics (min, max, mean) for numeric fields.
*/
def calculateNumericStats(): Map[String, Map[String, Double]] = {
def calculateNumericStats(offset: Int): Map[String, Map[String, Double]] = {
val numericFields = detectNumericFields()

numericFields.flatMap { field =>
@@ -48,8 +48,10 @@ class MongoCollectionManager(collection: MongoCollection[Document]) {
.append("minValue", new Document("$min", "$" + field))
.append("maxValue", new Document("$max", "$" + field))
.append("meanValue", new Document("$avg", "$" + field))
.append("count", new Document("$sum", 1))

val pipeline = Seq(
new Document("$skip", offset),
new Document("$project", projection),
new Document("$group", groupDoc)
)
@@ -60,7 +62,8 @@ class MongoCollectionManager(collection: MongoCollection[Document]) {
val stats = Map(
"min" -> doc.get("minValue").asInstanceOf[Number].doubleValue(),
"max" -> doc.get("maxValue").asInstanceOf[Number].doubleValue(),
"mean" -> doc.get("meanValue").asInstanceOf[Number].doubleValue()
"mean" -> doc.get("meanValue").asInstanceOf[Number].doubleValue(),
"count" -> doc.get("count").asInstanceOf[Number].doubleValue()
)
Some(field -> stats)
} else {
@@ -72,7 +75,7 @@ class MongoCollectionManager(collection: MongoCollection[Document]) {
/**
* Calculate date statistics (min, max) for date fields.
*/
def calculateDateStats(): Map[String, Map[String, Date]] = {
def calculateDateStats(offset: Int): Map[String, Map[String, Any]] = {
val dateFields = detectDateFields()

dateFields.flatMap { field =>
@@ -82,8 +85,10 @@ class MongoCollectionManager(collection: MongoCollection[Document]) {
val groupDoc = new Document("_id", null)
.append("minValue", new Document("$min", "$" + field))
.append("maxValue", new Document("$max", "$" + field))
.append("count", new Document("$sum", 1))

val pipeline = Seq(
new Document("$skip", offset),
new Document("$project", projection),
new Document("$group", groupDoc)
)
@@ -93,7 +98,8 @@ class MongoCollectionManager(collection: MongoCollection[Document]) {
val doc = result.head
val stats = Map(
"min" -> doc.get("minValue").asInstanceOf[Date],
"max" -> doc.get("maxValue").asInstanceOf[Date]
"max" -> doc.get("maxValue").asInstanceOf[Date],
"count" -> doc.get("count").asInstanceOf[Number].intValue()
)
Some(field -> stats)
} else {
@@ -103,26 +109,33 @@ class MongoCollectionManager(collection: MongoCollection[Document]) {
}

/**
* Calculate categorical statistics (value counts) for categorical fields.
* Calculate categorical statistics (per-value counts, plus a flag indicating whether the
* 1000-distinct-value limit was reached) for categorical fields.
*/
def calculateCategoricalStats(): Map[String, Map[String, Map[String, Integer]]] = {
def calculateCategoricalStats(offset: Int): (Map[String, Map[String, Int]], Boolean) = {
val categoricalFields = detectCategoricalFields()
var reachedLimit: Boolean = false

    categoricalFields.flatMap { field =>
      val pipeline = Seq(
        Aggregates.group("$" + field, com.mongodb.client.model.Accumulators.sum("count", 1)),
        Aggregates.sort(Sorts.descending("count")),
        Aggregates.limit(1000)
      )

      val result = collection.aggregate(pipeline.asJava).iterator().asScala.toList
      if (result.nonEmpty) {
        val counts = result.map(doc => doc.getString("_id") -> doc.getInteger("count")).toMap
        Some(field -> Map("counts" -> counts))
      } else {
        None
      }
    }.toMap

    (
      categoricalFields.flatMap { field =>
        val pipeline = Seq(
          new Document("$skip", offset),
          Aggregates.group("$" + field, com.mongodb.client.model.Accumulators.sum("count", 1)),
          Aggregates.sort(Sorts.descending("count")),
          Aggregates.limit(1000)
        )

        val result = collection.aggregate(pipeline.asJava).iterator().asScala.toList
        if (result.nonEmpty) {
          val counts =
            result.map(doc => doc.getString("_id") -> doc.getInteger("count").toInt).toMap
          // If the 1000-value limit was filled, the counts for this field may be truncated.
          reachedLimit = result.size >= 1000
          Some(field -> counts)
        } else {
          None
        }
      }.toMap,
      reachedLimit
    )
}
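  // The offset-based $skip above only gives correct totals if previously counted documents are
  // neither modified nor reordered; under that assumption, counting just the newly appended
  // documents and merging with the previous counts equals counting everything. A MongoDB-free
  // sketch of that invariant (hypothetical values, not part of this PR):
  object IncrementalCountSketch extends App {
    val allValues = Seq("a", "b", "a", "c", "a", "b") // one categorical column, insertion order
    val offset = 4                                    // documents summarized by earlier calls

    val previousCounts = allValues.take(offset).groupBy(identity).map { case (k, v) => k -> v.size }
    val newBatchCounts = allValues.drop(offset).groupBy(identity).map { case (k, v) => k -> v.size }

    // Merge step, equivalent to the accumulation in MongoDocument.getCategoricalStats.
    val merged = (previousCounts.keySet ++ newBatchCounts.keySet).map { k =>
      k -> (previousCounts.getOrElse(k, 0) + newBatchCounts.getOrElse(k, 0))
    }.toMap

    assert(merged == allValues.groupBy(identity).map { case (k, v) => k -> v.size })
    println(merged) // Map(a -> 3, b -> 2, c -> 1)
  }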

/**