
[FEA] Support pre-split in GPU project exec #11916

Open
firestarman opened this issue Jan 6, 2025 · 2 comments

Labels: feature request

Comments

firestarman (Collaborator) commented Jan 6, 2025

Is your feature request related to a problem? Please describe.
We hit a CPU OOM caused by a quite large batch (~5.4 GB) with more than 250 columns.

24/12/22 09:34:16 ERROR Utils: Aborting task
com.nvidia.spark.rapids.jni.CpuRetryOOM: CPU OutOfMemory: Could not split the current attempt: {GpuSCB size:5475284352, handle:buffer handle TempSpillBufferId(109845,temp_local_fae8ba55-603b-4ddf-b906-079e0717c3cb) at 9223372036854774808, rows:2541469, types:List(LongType, LongType, ..., StringType, IntegerType, IntegerType), refCount:1}
(types list truncated here; the full message and call stack are under Additional context below)

After checking the query event log, we found a big projection after a symmetric join. This projection builds about 266 columns from a batch with only about 50 columns, so the projected batch (~5 GB) grew to about 5 times the size of the input batch (~1 GB).

Describe the solution you'd like
Add pre-split support to the GPU project exec, similar to what we have done in the GPU aggregate exec to avoid producing large batches after some aggregations.
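
For illustration, a rough sketch of the idea (the helpers estimateOutputRowSize and splitByRows are hypothetical, not the plugin's actual API): estimate the projected size from the output schema and the input row count, and split the input by rows before projecting whenever it would exceed the target batch size.

import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.vectorized.ColumnarBatch

def preSplitIfNeeded(
    input: ColumnarBatch,
    outputTypes: Seq[DataType],
    targetBatchSize: Long): Seq[ColumnarBatch] = {
  // Rough per-row estimate from the output types; variable-width and
  // nested types need a better heuristic (see the comments further down).
  val estimatedRowSize = outputTypes.map(estimateOutputRowSize).sum
  val estimatedTotal = estimatedRowSize * input.numRows()
  if (estimatedTotal <= targetBatchSize) {
    Seq(input)
  } else {
    // Split evenly by rows so each piece projects to roughly the target size.
    val numSplits = math.ceil(estimatedTotal.toDouble / targetBatchSize).toInt
    splitByRows(input, numSplits) // hypothetical helper splitting evenly by rows
  }
}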

Additional context
The exception call stack:

24/12/22 09:34:16 ERROR Utils: Aborting task
com.nvidia.spark.rapids.jni.CpuRetryOOM: CPU OutOfMemory: Could not split the current attempt: {GpuSCB size:5475284352, handle:buffer handle TempSpillBufferId(109845,temp_local_fae8ba55-603b-4ddf-b906-079e0717c3cb) at 9223372036854774808, rows:2541469, types:List(LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, LongType, StringType, IntegerType, IntegerType), refCount:1}
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.split(RmmRapidsRetryIterator.scala:494)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:624)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:553)
	at com.nvidia.spark.rapids.GpuOutOfCoreSortIterator.$anonfun$splitOneSortedBatch$1(GpuSortExec.scala:463)
	at com.nvidia.spark.rapids.GpuOutOfCoreSortIterator.$anonfun$splitOneSortedBatch$1$adapted(GpuSortExec.scala:455)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.GpuOutOfCoreSortIterator.splitOneSortedBatch(GpuSortExec.scala:455)
	at com.nvidia.spark.rapids.GpuOutOfCoreSortIterator.firstPassReadBatches(GpuSortExec.scala:474)
	at com.nvidia.spark.rapids.GpuOutOfCoreSortIterator.$anonfun$next$4(GpuSortExec.scala:622)
	at com.nvidia.spark.rapids.Arm$.closeOnExcept(Arm.scala:98)
	at com.nvidia.spark.rapids.GpuOutOfCoreSortIterator.next(GpuSortExec.scala:618)
	at com.nvidia.spark.rapids.GpuOutOfCoreSortIterator.next(GpuSortExec.scala:284)
	at org.apache.spark.sql.rapids.GpuFileFormatDataWriter.writeWithIterator(GpuFileFormatDataWriter.scala:179)
	at org.apache.spark.sql.rapids.GpuFileFormatWriter$.$anonfun$executeTask$1(GpuFileFormatWriter.scala:335)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1652)
	at org.apache.spark.sql.rapids.GpuFileFormatWriter$.executeTask(GpuFileFormatWriter.scala:342)
	at org.apache.spark.sql.rapids.GpuFileFormatWriter$.$anonfun$write$14(GpuFileFormatWriter.scala:260)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:134)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:538)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1618)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:541)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.nvidia.spark.rapids.jni.CpuRetryOOM: Could not complete allocation after 1000 retries
	at com.nvidia.spark.rapids.HostAlloc.alloc(HostAlloc.scala:246)
	at com.nvidia.spark.rapids.HostAlloc$.alloc(HostAlloc.scala:278)
	at com.nvidia.spark.rapids.RapidsDiskStore$RapidsDiskBuffer.getMemoryBuffer(RapidsDiskStore.scala:169)
	at com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.materializeMemoryBuffer(RapidsBufferStore.scala:438)
	at com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getDeviceMemoryBuffer(RapidsBufferStore.scala:512)
	at com.nvidia.spark.rapids.RapidsBufferStore$RapidsBufferBase.getColumnarBatch(RapidsBufferStore.scala:452)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$getColumnarBatch$1(SpillableColumnarBatch.scala:127)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.$anonfun$withRapidsBuffer$1(SpillableColumnarBatch.scala:110)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.withRapidsBuffer(SpillableColumnarBatch.scala:109)
	at com.nvidia.spark.rapids.SpillableColumnarBatchImpl.getColumnarBatch(SpillableColumnarBatch.scala:125)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.$anonfun$splitSpillableInHalfByRows$2(RmmRapidsRetryIterator.scala:714)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.$anonfun$splitSpillableInHalfByRows$1(RmmRapidsRetryIterator.scala:708)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.split(RmmRapidsRetryIterator.scala:478)
	... 24 more
@firestarman added the ? - Needs Triage and feature request labels Jan 6, 2025
@firestarman changed the title from [FEA] OOM due a big projection to [FEA] OOM due to a big projection Jan 6, 2025
@firestarman changed the title from [FEA] OOM due to a big projection to [FEA] Support pre-split in GPU project exec Jan 6, 2025
revans2 (Collaborator) commented Jan 6, 2025

For a project we just need to be careful with window operations, and also with the performance impact this can have.

For some window operations we need to have the entire window in a single batch to be able to process the data. That is achieved with the childrenCoalesceGoal. We can also tell other nodes in the plan what our output batching is like with the outputBatching method. For example:

override def childrenCoalesceGoal: Seq[CoalesceGoal] = Seq(outputBatching)

override def outputBatching: CoalesceGoal = if (gpuPartitionSpec.isEmpty) {
  RequireSingleBatch
} else {
  BatchedByKey(gpuPartitionOrdering)(cpuPartitionOrdering)
}

A ProjectExec can be inserted before the window and after the sort in some cases.

GpuProjectExec(pre.toList, childPlans.head.convertIfNeeded())

We mainly need to make sure that if we split the input batch into smaller batches, we do not mark Project as preserving the batching.

override def outputPartitioning: Partitioning = child.outputPartitioning

If we update the code to do a pre-project split, we could also update it to split on retry as well.

boundProjectList.projectAndCloseWithRetrySingleBatch(sb)
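
A sketch of what split-on-retry could look like with the existing retry framework (assuming withRetry and splitSpillableInHalfByRows from RmmRapidsRetryIterator, and that boundProjectList exposes a plain project method; exact signatures may differ across plugin versions):

import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRetry}

// sb is a SpillableColumnarBatch; on a retry OOM the framework splits it
// in half by rows and re-runs the projection on the smaller pieces.
val projected: Iterator[ColumnarBatch] =
  withRetry(sb, splitSpillableInHalfByRows) { attempt =>
    withResource(attempt.getColumnarBatch()) { cb =>
      boundProjectList.project(cb)
    }
  }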

When I first put in the pre-split code for hash aggregate I also implemented it for project, but I saw a large performance regression, so I reverted that part of the code. Please make sure that we measure for performance regressions here, especially around window operations that require all of the data for a window to be in a single batch.

@mattahrens removed the ? - Needs Triage label Jan 7, 2025
firestarman (Collaborator, Author) commented Jan 14, 2025

After enabling the pre-split for Project, we hit another case where some expressions of complex type in the project list had their sizes wrongly estimated, which produced a ~3 GB batch and led to OOM when performing the project.
A toy query to reproduce this issue follows.

scala> spark.range(100).selectExpr("cast(id as long)").createOrReplaceTempView("tt")

scala> sql("select map('sid', id, '1', id), struct('s1', id), array(1, id) from tt").collect
25/01/13 06:31:13 WARN GpuOverrides: 
*Exec <ProjectExec> will run on GPU
  *Expression <Alias> map(sid, id#0L, 1, id#0L) AS map(sid, id, 1, id)#4 will run on GPU
    *Expression <CreateMap> map(sid, id#0L, 1, id#0L) will run on GPU
  *Expression <Alias> struct(col1, s1, id, id#0L) AS struct(s1, id)#5 will run on GPU
    *Expression <CreateNamedStruct> struct(col1, s1, id, id#0L) will run on GPU
  *Expression <Alias> array(1, id#0L) AS array(1, id)#6 will run on GPU
    *Expression <CreateArray> array(1, id#0L) will run on GPU
  *Exec <RangeExec> will run on GPU

===>estimated size: 1204, actual size: 3316, splitUntilSize 1.395081216E9, numSplits 1

We need to rework the estimation code to cover at least these cases.
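
For example, the estimate could recurse over the output DataType so nested and variable-width types are at least accounted for; a rough sketch (the entries-per-row and average string length below are placeholder assumptions, not measured values):

import org.apache.spark.sql.types._

val assumedEntriesPerRow = 4.0   // assumption: average map/array entries per row
val assumedAvgStringBytes = 16.0 // assumption: average string payload per row

def estimatedBytesPerRow(dt: DataType): Double = dt match {
  case StructType(fields) =>
    fields.map(f => estimatedBytesPerRow(f.dataType)).sum
  case ArrayType(elem, _) =>
    4 + assumedEntriesPerRow * estimatedBytesPerRow(elem) // offsets + elements
  case MapType(k, v, _) =>
    4 + assumedEntriesPerRow * (estimatedBytesPerRow(k) + estimatedBytesPerRow(v))
  case StringType =>
    4 + assumedAvgStringBytes // offsets plus assumed payload
  case t =>
    t.defaultSize.toDouble // fixed-width primitives
}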
