Skip to content

Commit

Permalink
[SPARK-50578][PYTHON][SS][FOLLOWUP] Change to use Thread.interrupt
Browse files Browse the repository at this point in the history
…instead of `Thread.stop` to interrupt the execution of TransformWithStateInPandasPythonPreInitRunner#daemonThread

### What changes were proposed in this pull request?
This PR change to use `Thread.interrupt()` instead of `Thread.stop()` to attempt to interrupt the execution of `TransformWithStateInPandasPythonPreInitRunner#daemonThread`.

Additionally, logic has been added in `TransformWithStateInPandasStateServer#run` to respond to the interrupt by setting the `CLOSED` state and exiting.

### Why are the changes needed?
The `Thread.stop` method in Java 21 directly throws an `UnsupportedOperationException`, which led to the failure of the Java 21 daily tests:

- https://github.com/apache/spark/actions/runs/12511573912/job/34903859772
- https://github.com/apache/spark/actions/runs/12523542188/job/34933207012
- https://github.com/apache/spark/actions/runs/12592534465/job/35097321533

![image](https://github.com/user-attachments/assets/75cef6d7-d66a-4652-b01d-38412d6db3b0)

So the primary purpose of this change is to restore the daily tests for Java 21.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Pass Java 21 GitHub Action test: https://github.com/LuciferYang/spark/actions/runs/12606699142/job/35137180872

![image](https://github.com/user-attachments/assets/9e5e8b08-d167-4f7a-959c-8ebe6e22f9bc)

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49354 from LuciferYang/java21-test-2.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
LuciferYang authored and dongjoon-hyun committed Jan 4, 2025
1 parent 98dc763 commit 0d0fa86
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TransformWithStateInPandasPythonPreInitRunner(
override def stop(): Unit = {
super.stop()
closeServerSocketChannelSilently(stateServerSocket)
daemonThread.stop()
daemonThread.interrupt()
}

private def startStateServer(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ class TransformWithStateInPandasStateServer(

while (listeningSocket.isConnected &&
statefulProcessorHandle.getHandleState != StatefulProcessorHandleState.CLOSED) {
if (Thread.currentThread().isInterrupted) {
throw new InterruptedException("Thread was interrupted")
}
try {
val version = inputStream.readInt()
if (version != -1) {
Expand All @@ -159,6 +162,11 @@ class TransformWithStateInPandasStateServer(
logWarning(log"No more data to read from the socket")
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
return
case _: InterruptedException =>
logInfo(log"Thread interrupted, shutting down state server")
Thread.currentThread().interrupt()
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
return
case e: Exception =>
logError(log"Error reading message: ${MDC(LogKeys.ERROR, e.getMessage)}", e)
sendResponse(1, e.getMessage)
Expand Down

0 comments on commit 0d0fa86

Please sign in to comment.