Proof of concept implementation of asof Join #7

Open · wants to merge 1 commit into base: master
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,7 +156,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
private var ordering = implicitly[Ordering[K]]

// An array of upper bounds for the first (partitions - 1) partitions
-  private var rangeBounds: Array[K] = {
+  var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
@@ -275,6 +275,19 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
}
}

case class AsofJoin(
left: LogicalPlan,
right: LogicalPlan,
leftOn: Expression,
rightOn: Expression,
leftBy: Seq[Expression],
rightBy: Seq[Expression],
tolerance: Long)
extends BinaryNode {

override def output: Seq[Attribute] = left.output ++ right.output.map(_.withNullability(true))
}
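
For orientation, a minimal sketch of how this node might be constructed. The child plans and column expressions below are hypothetical stand-ins; the diff itself does not add a Dataset-level entry point:

// Hypothetical wiring, not part of this diff: as-of join two plans on a
// long-typed event-time expression, per key group, within 100 time units.
val plan = AsofJoin(
  left = leftPlan,             // assumed: analyzed plan of the left table
  right = rightPlan,           // assumed: analyzed plan of the right table
  leftOn = leftTimeExpr,       // assumed: left "time" attribute
  rightOn = rightTimeExpr,     // assumed: right "time" attribute
  leftBy = Seq(leftKeyExpr),
  rightBy = Seq(rightKeyExpr),
  tolerance = 100L)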

case class Join(
left: LogicalPlan,
right: LogicalPlan,
@@ -17,9 +17,16 @@

package org.apache.spark.sql.catalyst.plans.physical

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import com.google.common.collect.{Range => GRange, RangeMap, TreeRangeMap}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{DataType, IntegerType}


/**
* Specifies how tuples that share common expressions will be distributed when a query is executed
* in parallel on many machines. Distribution can be used to refer to two distinct physical
@@ -148,6 +155,43 @@ case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
}
}

/**
 * An object that holds distribution ranges to be shared across different nodes.
 *
 * This is not thread safe.
 */
class DelayedRange extends Serializable {
var range: IndexedSeq[GRange[java.lang.Long]] = _

def realized(): Boolean = {
range != null
}

def setRange(range: IndexedSeq[GRange[java.lang.Long]]): Unit = {
this.range = range
}

def getRange(): IndexedSeq[GRange[java.lang.Long]] = {
if (range == null) {
throw new Exception("DelayedRange is not realized")
} else {
range
}
}
}
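
A brief usage sketch of the realize-then-read protocol (the call sites are assumed; GRange is the Guava Range imported above):

// Sketch: a single DelayedRange instance is shared by both sides of the
// join. One side realizes it with concrete bounds; the other only reads.
val shared = new DelayedRange()
assert(!shared.realized())
shared.setRange(IndexedSeq(GRange.closedOpen(Long.box(0L), Long.box(100L))))
val ranges = shared.getRange() // would throw if setRange had not been called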

case class DelayedOverlappedRangeDistribution(
key: Expression,
overlap: Long,
core: Boolean // Decides whether it is range-defining
) extends Distribution {
override def requiredNumPartitions: Option[Int] = None

override def createPartitioning(numPartitions: Int): Partitioning = {
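// The DelayedRange is deliberately left null here; EnsureRange (added later
// in this diff) links both sides of the join to one shared instance.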
DelayedOverlappedRangePartitioning(key, null, numPartitions, overlap, core)
}
}

/**
* Describes how an operator's output is split across partitions. It has 2 major properties:
* 1. number of partitions.
@@ -261,6 +305,95 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
}
}

case class DelayedOverlappedRangePartitioning(
key: Expression,
var delayedRange: DelayedRange,
numPartitions: Int,
overlap: Long,
core: Boolean
) extends Expression with Partitioning with Unevaluable {
override def children: Seq[Expression] = Seq(key)
override def nullable: Boolean = false
override def dataType: DataType = IntegerType

def setDelayedRange(delayedRange: DelayedRange): Unit = this.delayedRange = delayedRange

override def satisfies(required: Distribution): Boolean = {
super.satisfies(required) || {
required match {
case DelayedOverlappedRangeDistribution(requiredKey, requiredOverlap, _) =>
key.semanticEquals(requiredKey) && overlap == requiredOverlap
case OrderedDistribution(ordering) =>
(ordering.length == 1) &&
ordering.head.child.semanticEquals(key) &&
ordering.head.direction == Ascending &&
overlap == 0
case ClusteredDistribution(clustering, None) =>
(clustering.length == 1) && clustering.head.semanticEquals(key) && overlap == 0
case _ => false
}
}
}

def realizeDelayedRange(bounds: Array[Long]): Unit = {
if (delayedRange.realized()) {
throw new Exception("ranges are realized already")
} else {
val ranges: IndexedSeq[GRange[java.lang.Long]] = {
(Seq(Long.MinValue) ++ bounds).zip(bounds ++ Seq(Long.MaxValue)).map {
case (lower, upper) => GRange.closedOpen(Long.box(lower), Long.box(upper))
}.toIndexedSeq
}

delayedRange.setRange(ranges)
}
}
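
// Worked example (illustrative): realizeDelayedRange(Array(10L, 20L)) stores
// three half-open ranges, one per partition:
//   [Long.MinValue, 10), [10, 20), [20, Long.MaxValue)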

def withPartitionIds(
iter: Iterator[InternalRow],
expr: Expression,
output: Seq[Attribute]): Iterator[Product2[Int, InternalRow]] = {
val expandedRanges = delayedRange.getRange().map { range =>
  GRange.closedOpen(
    Long.box(
      if (range.lowerEndpoint() == Long.MinValue) Long.MinValue
      else range.lowerEndpoint() - overlap),
    range.upperEndpoint())
}

val keyProj = UnsafeProjection.create(Seq(expr), output)
val proj = UnsafeProjection.create(output, output)

var currentStartIndex = 0

iter.flatMap { row =>
val key = keyProj(row).getLong(0)

// Advance past expanded ranges that end before this key; this assumes
// the input rows arrive sorted by key in non-decreasing order.
while (currentStartIndex < expandedRanges.length &&
  !expandedRanges(currentStartIndex).contains(key)) {
currentStartIndex += 1
}

var i = currentStartIndex
val rows = new ArrayBuffer[Product2[Int, UnsafeRow]]()

while (i < expandedRanges.length && expandedRanges(i).contains(key)) {
rows.append((i, proj(row)))
i += 1
}

rows
}
}
}
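
To make the routing above concrete, a worked example under assumed values:

// Illustrative only: with ranges realized from bounds Array(10L, 20L) and
// overlap = 5, the expanded ranges are
//   [Long.MinValue, 10), [5, 20), [15, Long.MaxValue)
// A row whose key projects to 17 falls in the last two, so withPartitionIds
// emits it twice, as (1, row) and (2, row). Because currentStartIndex only
// moves forward, the input iterator is assumed sorted ascending by key.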

/**
* A collection of [[Partitioning]]s that can be used to describe the partitioning
* scheme of the output of a physical operator. It is usually used for an operator
@@ -18,8 +18,83 @@
package org.apache.spark.sql

import org.apache.spark.annotation.{Experimental, InterfaceStability}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{AsofJoin, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical.{DelayedOverlappedRangePartitioning, DelayedRange}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{AsofJoinExec, BroadcastAsofJoinExec, SparkPlan}
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
import org.apache.spark.sql.internal.SQLConf

// These rules can be registered via ExperimentalMethods. They don't need
// to live in this file; they are kept here for now for convenience.

object AsofJoinStrategy extends Strategy {

private def canBroadcastByHints(left: LogicalPlan, right: LogicalPlan): Boolean = {
left.stats.hints.broadcast || right.stats.hints.broadcast
}

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case AsofJoin(left, right, leftOn, rightOn, leftBy, rightBy, tolerance)
if canBroadcastByHints(left, right) =>
val buildSide = if (left.stats.hints.broadcast) {
BuildLeft
} else {
BuildRight
}
BroadcastAsofJoinExec(
buildSide,
leftOn,
rightOn,
leftBy, rightBy, tolerance, planLater(left), planLater(right)) :: Nil

case AsofJoin(left, right, leftOn, rightOn, leftBy, rightBy, tolerance) =>
AsofJoinExec(
leftOn,
rightOn,
leftBy, rightBy, tolerance, planLater(left), planLater(right)) :: Nil

case _ => Nil
}
}
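
With this strategy, whichever side carries a broadcast hint becomes the build side. A sketch of how a caller might steer planning toward the broadcast variant; the asofJoin entry point is hypothetical, and only functions.broadcast is stock Spark:

import org.apache.spark.sql.functions.broadcast

// Hypothetical Dataset-level call (no such API is added in this diff).
// Hinting the right side makes canBroadcastByHints true and, since the
// left side carries no hint, the strategy picks BuildRight.
val joined = trades.asofJoin(broadcast(quotes))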

/**
 * This must run after EnsureRequirements. This is not great, but I don't know another way
 * to do this unless we modify EnsureRequirements.
 *
 * Currently this mutates the state of the partitioning (by setting the delayed range
 * object), so it's not great. We might need to make partitioning immutable and copy nodes
 * with new partitioning.
 */
object EnsureRange extends Rule[SparkPlan] {

private def ensureChildrenRange(operator: SparkPlan): SparkPlan = operator match {
case asof: AsofJoinExec =>
// This code assumes EnsureRequirement will set the left and right partitioning
// properly
val leftPartitioning =
asof.left.outputPartitioning.asInstanceOf[DelayedOverlappedRangePartitioning]
val rightPartitioning =
asof.right.outputPartitioning.asInstanceOf[DelayedOverlappedRangePartitioning]

if (leftPartitioning.delayedRange == null) {
val delayedRange = new DelayedRange()
leftPartitioning.setDelayedRange(delayedRange)
rightPartitioning.setDelayedRange(delayedRange)
} else {
rightPartitioning.setDelayedRange(leftPartitioning.delayedRange)
}
asof
}

override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator: AsofJoinExec => ensureChildrenRange(operator)
}
}


/**
* :: Experimental ::
@@ -42,14 +117,17 @@ class ExperimentalMethods private[sql]() {
*
* @since 1.3.0
*/
-  @volatile var extraStrategies: Seq[Strategy] = Nil
+  @volatile var extraStrategies: Seq[Strategy] = Seq(AsofJoinStrategy)

@volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil

@volatile var extraPreparations: Seq[Rule[SparkPlan]] = Seq(EnsureRange)

override def clone(): ExperimentalMethods = {
val result = new ExperimentalMethods
result.extraStrategies = extraStrategies
result.extraOptimizations = extraOptimizations
result.extraPreparations = extraPreparations
result
}
}
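
The diff hardwires the defaults above. Equivalently, the rules could be registered per session; note that extraPreparations is itself introduced by this diff, so only the first assignment works against stock Spark:

// Per-session registration (sketch), equivalent to the defaults above.
spark.experimental.extraStrategies = Seq(AsofJoinStrategy)
spark.experimental.extraPreparations = Seq(EnsureRange) // field added by this diff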