WIP add spill for single page
Zand100 committed Nov 14, 2024
1 parent 35e5ca7 commit caca67f
Showing 1 changed file with 30 additions and 1 deletion.
@@ -1,9 +1,15 @@
package org.apache.spark.sql.execution

import org.apache.spark.SparkEnv
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.memory.{MemoryConsumer, MemoryMode, SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.unsafe.{Platform, UnsafeAlignedOffset}
import org.apache.spark.unsafe.memory.MemoryBlock
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter

import java.util

class UnsafeArray(taskMemoryManager: TaskMemoryManager) extends MemoryConsumer(taskMemoryManager, MemoryMode.OFF_HEAP) {

@@ -13,6 +19,7 @@ class UnsafeArray(taskMemoryManager: TaskMemoryManager) extends MemoryConsumer(
protected var pageCursor = 0
private var keyOffsets: Array[Long] = null
protected var numRows = 0
private val spillWriters: util.LinkedList[UnsafeSorterSpillWriter] = new util.LinkedList[UnsafeSorterSpillWriter]

def iterator(): Unit = {}

@@ -58,5 +65,27 @@ class UnsafeArray(taskMemoryManager: TaskMemoryManager) extends MemoryConsumer(
numRows += 1
}

  override def spill(numBytes: Long, memoryConsumer: MemoryConsumer): Long = {
    // Should we fail here, or write to disk? Probably the latter: write the
    // page's records out through an UnsafeSorterSpillWriter (see its other
    // usages, e.g. in UnsafeExternalSorter).
    // TODO handle multiple data pages.
    val spillBase = page.getBaseObject
    var spillOffset = page.getBaseOffset
    // Assumes the first UAO-sized word of the page stores its record count,
    // as in BytesToBytesMap's data pages (probably wrong here; verify against
    // the insert path).
    var numRecords: Int = UnsafeAlignedOffset.getSize(spillBase, spillOffset)
    val uaoSize: Int = UnsafeAlignedOffset.getUaoSize
    spillOffset += uaoSize // step past the record-count word
    val writeMetrics = new ShuffleWriteMetrics
    val blockManager = if (SparkEnv.get != null) SparkEnv.get.blockManager else null
    val spillWriter =
      new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords)
    while (numRecords > 0) {
      val length = UnsafeAlignedOffset.getSize(spillBase, spillOffset)
      spillWriter.write(spillBase, spillOffset + uaoSize, length, 0)
      // The + 8 mirrors BytesToBytesMap's layout, where an 8-byte pointer to
      // the next record follows each record; confirm UnsafeArray stores that
      // word too (otherwise this is an off-by-eight).
      spillOffset += uaoSize + length + 8
      numRecords -= 1
    }
    spillWriter.close()
    // Keep a handle to the spill file so the records can be read back later.
    spillWriters.add(spillWriter)

    // Release the memory that was just spilled and report how much we freed.
    // Right now there is only a single page; TODO free page-by-page (and
    // reset page/pageCursor/numRows) once multiple data pages are supported.
    val spilledBytes = page.size()
    freePage(page)
    spilledBytes
  }
}
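
For context on the contract being implemented above: TaskMemoryManager calls spill() on registered MemoryConsumers when some consumer cannot acquire execution memory, and expects the return value to be the number of bytes actually released. A minimal sketch of that contract (illustrative only, not part of this commit; the class and field names are made up):

import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
import org.apache.spark.unsafe.memory.MemoryBlock

class SpillableSketch(tmm: TaskMemoryManager)
    extends MemoryConsumer(tmm, MemoryMode.OFF_HEAP) {

  private var page: MemoryBlock = null

  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    if (page == null) return 0L // nothing to release
    val freed = page.size()
    // Persist the page's records first (as the diff above does with
    // UnsafeSorterSpillWriter), then release the memory.
    freePage(page)
    page = null
    freed // the memory manager credits this many bytes back to the task
  }
}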

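A likely follow-up (not in this commit) is reading the spilled records back, e.g. from the iterator() stub: each UnsafeSorterSpillWriter kept in spillWriters can produce a reader that replays its records in write order. A hedged sketch, assuming the stock UnsafeSorterSpillWriter.getReader API (replaySpill is a hypothetical helper):

import org.apache.spark.SparkEnv
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter

def replaySpill(spillWriter: UnsafeSorterSpillWriter): Unit = {
  // The SerializerManager undoes any compression/encryption that was applied
  // to the spill file's output stream when it was written.
  val reader = spillWriter.getReader(SparkEnv.get.serializerManager)
  while (reader.hasNext) {
    reader.loadNext() // position the reader on the next record
    val base = reader.getBaseObject
    val offset = reader.getBaseOffset
    val length = reader.getRecordLength
    // ... copy the record bytes back into a page, or consume them here ...
  }
}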