Skip to content

Commit

Permalink
Ppl count approximate support (#884)
Browse files Browse the repository at this point in the history
* add functional approximation support for:
- distinct count
- top
- rare

Signed-off-by: YANGDB <[email protected]>

* update license and scalafmt

Signed-off-by: YANGDB <[email protected]>

* update additional tests using APPROX_COUNT_DISTINCT

Signed-off-by: YANGDB <[email protected]>

* add visitFirstChild(node, context) method for the PlanVisitor for simplify node inner child access visibility

Signed-off-by: YANGDB <[email protected]>

* update inline documentation

Signed-off-by: YANGDB <[email protected]>

* update according to PR comments
- DISTINCT_COUNT_APPROX should be added to keywordsCanBeId

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB authored Nov 11, 2024
1 parent a0c246b commit b53a699
Show file tree
Hide file tree
Showing 20 changed files with 668 additions and 56 deletions.
5 changes: 5 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ source = table | where ispresent(a) |
- `source = table | stats max(c) by b`
- `source = table | stats count(c) by b | head 5`
- `source = table | stats distinct_count(c)`
- `source = table | stats distinct_count_approx(c)`
- `source = table | stats stddev_samp(c)`
- `source = table | stats stddev_pop(c)`
- `source = table | stats percentile(c, 90)`
Expand All @@ -202,6 +203,7 @@ source = table | where ispresent(a) |
- `source = table | where a < 50 | eventstats avg(c) `
- `source = table | eventstats max(c) by b`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats stddev_samp(c)`
- `source = table | eventstats stddev_pop(c)`
- `source = table | eventstats percentile(c, 90)`
Expand Down Expand Up @@ -246,12 +248,15 @@ source = table | where ispresent(a) |
- `source=accounts | rare gender`
- `source=accounts | rare age by gender`
- `source=accounts | rare 5 age by gender`
- `source=accounts | rare_approx age by gender`
#### **Top**
[See additional command details](ppl-top-command.md)
- `source=accounts | top gender`
- `source=accounts | top 1 gender`
- `source=accounts | top_approx 5 gender`
- `source=accounts | top 1 age by gender`
#### **Parse**
Expand Down
10 changes: 8 additions & 2 deletions docs/ppl-lang/ppl-rare-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ Using ``rare`` command to find the least common tuple of values of all fields in
**Note**: A maximum of 10 results is returned for each distinct tuple of values of the group-by fields.

**Syntax**
`rare <field-list> [by-clause]`
`rare [N] <field-list> [by-clause]`
`rare_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.
* rare_approx: approximate count of the rare (n) fields by using estimated [cardinality by HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html).


### Example 1: Find the least common values in a field
Expand All @@ -19,6 +22,8 @@ The example finds least common gender of all the accounts.
PPL query:

os> source=accounts | rare gender;
os> source=accounts | rare_approx 10 gender;
os> source=accounts | rare_approx gender;
fetched rows / total rows = 2/2
+----------+
| gender |
Expand All @@ -34,7 +39,8 @@ The example finds least common age of all the accounts group by gender.

PPL query:

os> source=accounts | rare age by gender;
os> source=accounts | rare 5 age by gender;
os> source=accounts | rare_approx 5 age by gender;
fetched rows / total rows = 4/4
+----------+-------+
| gender | age |
Expand Down
7 changes: 5 additions & 2 deletions docs/ppl-lang/ppl-top-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ Using ``top`` command to find the most common tuple of values of all fields in t

### Syntax
`top [N] <field-list> [by-clause]`
`top_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. comma-delimited list of field names.
* by-clause: optional. one or more fields to group the results by.

* top_approx: approximate count of the (n) top fields by using estimated [cardinality by HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html).

### Example 1: Find the most common values in a field

Expand All @@ -19,6 +20,7 @@ The example finds most common gender of all the accounts.
PPL query:

os> source=accounts | top gender;
os> source=accounts | top_approx gender;
fetched rows / total rows = 2/2
+----------+
| gender |
Expand All @@ -33,7 +35,7 @@ The example finds most common gender of all the accounts.

PPL query:

os> source=accounts | top 1 gender;
os> source=accounts | top_approx 1 gender;
fetched rows / total rows = 1/1
+----------+
| gender |
Expand All @@ -48,6 +50,7 @@ The example finds most common age of all the accounts group by gender.
PPL query:

os> source=accounts | top 1 age by gender;
os> source=accounts | top_approx 1 age by gender;
fetched rows / total rows = 2/2
+----------+-------+
| gender | age |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,4 +494,43 @@ class FlintSparkPPLAggregationWithSpanITSuite
// Compare the two plans
comparePlans(expectedPlan, logicalPlan, false)
}

test(
"create ppl simple distinct count age by span of interval of 10 years query with state filter test using approximation") {
val frame = sql(s"""
| source = $testTable | where state != 'Quebec' | stats distinct_count_approx(age) by span(age, 10) as age_span
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(Row(1, 70L), Row(1, 30L), Row(1, 20L))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Long](_.getAs[Long](1))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val ageField = UnresolvedAttribute("age")
val stateField = UnresolvedAttribute("state")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(ageField), isDistinct = true),
"distinct_count_approx(age)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val filterExpr = Not(EqualTo(stateField, Literal("Quebec")))
val filterPlan = Filter(filterExpr, table)
val aggregatePlan = Aggregate(Seq(span), Seq(aggregateExpressions, span), filterPlan)
val expectedPlan = Project(star, aggregatePlan)

// Compare the two plans
comparePlans(expectedPlan, logicalPlan, false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,43 @@ class FlintSparkPPLAggregationsITSuite
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl simple country distinct_count using approximation ") {
val frame = sql(s"""
| source = $testTable| stats distinct_count_approx(country)
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()

// Define the expected results
val expectedResults: Array[Row] = Array(Row(2L))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](1))
assert(
results.sorted.sameElements(expectedResults.sorted),
s"Expected: ${expectedResults.mkString(", ")}, but got: ${results.mkString(", ")}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(countryField), isDistinct = true),
"distinct_count_approx(country)")()

val aggregatePlan =
Aggregate(Seq.empty, Seq(aggregateExpressions), table)
val expectedPlan = Project(star, aggregatePlan)

// Compare the two plans
comparePlans(expectedPlan, logicalPlan, false)
}

test("create ppl simple age distinct_count group by country query test with sort") {
val frame = sql(s"""
| source = $testTable | stats distinct_count(age) by country | sort country
Expand Down Expand Up @@ -881,6 +918,53 @@ class FlintSparkPPLAggregationsITSuite
s"Expected plan: ${compareByString(expectedPlan)}, but got: ${compareByString(logicalPlan)}")
}

test(
"create ppl simple age distinct_count group by country query test with sort using approximation") {
val frame = sql(s"""
| source = $testTable | stats distinct_count_approx(age) by country | sort country
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(Row(2L, "Canada"), Row(2L, "USA"))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](1))
assert(
results.sorted.sameElements(expectedResults.sorted),
s"Expected: ${expectedResults.mkString(", ")}, but got: ${results.mkString(", ")}")

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(ageField), isDistinct = true),
"distinct_count_approx(age)")()
val productAlias = Alias(countryField, "country")()

val aggregatePlan =
Aggregate(groupByAttributes, Seq(aggregateExpressions, productAlias), table)
val sortedPlan: LogicalPlan =
Sort(
Seq(SortOrder(UnresolvedAttribute("country"), Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(star, sortedPlan)

// Compare the two plans
assert(
compareByString(expectedPlan) === compareByString(logicalPlan),
s"Expected plan: ${compareByString(expectedPlan)}, but got: ${compareByString(logicalPlan)}")
}

test("create ppl simple age distinct_count group by country with state filter query test") {
val frame = sql(s"""
| source = $testTable | where state != 'Ontario' | stats distinct_count(age) by country
Expand Down Expand Up @@ -920,6 +1004,46 @@ class FlintSparkPPLAggregationsITSuite
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test(
"create ppl simple age distinct_count group by country with state filter query test using approximation") {
val frame = sql(s"""
| source = $testTable | where state != 'Ontario' | stats distinct_count_approx(age) by country
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(Row(1L, "Canada"), Row(2L, "USA"))

// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](1))
assert(results.sorted.sameElements(expectedResults.sorted))

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val stateField = UnresolvedAttribute("state")
val countryField = UnresolvedAttribute("country")
val ageField = UnresolvedAttribute("age")
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))

val groupByAttributes = Seq(Alias(countryField, "country")())
val filterExpr = Not(EqualTo(stateField, Literal("Ontario")))
val filterPlan = Filter(filterExpr, table)
val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("APPROX_COUNT_DISTINCT"), Seq(ageField), isDistinct = true),
"distinct_count_approx(age)")()
val productAlias = Alias(countryField, "country")()
val aggregatePlan =
Aggregate(groupByAttributes, Seq(aggregateExpressions, productAlias), filterPlan)
val expectedPlan = Project(star, aggregatePlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("two-level stats") {
val frame = sql(s"""
| source = $testTable| stats avg(age) as avg_age by state, country | stats avg(avg_age) as avg_state_age by country
Expand Down
Loading

0 comments on commit b53a699

Please sign in to comment.