Skip to content

Commit

Permalink
Fix referenced watcher hanging event loop if worker is not joined
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jan 7, 2024
1 parent 7b53bad commit 5aeaad2
Showing 1 changed file with 40 additions and 24 deletions.
64 changes: 40 additions & 24 deletions src/Context/Internal/AbstractContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Parallel\Context\Internal;

use Amp\Cancellation;
use Amp\CancelledException;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Future;
Expand All @@ -24,33 +25,13 @@ abstract class AbstractContext implements Context
use ForbidCloning;
use ForbidSerialization;

/** @var Future<ExitResult> */
private readonly Future $result;
/** @var Future<ExitResult<TResult>>|null */
private ?Future $result = null;

protected function __construct(
private readonly Channel $ipcChannel,
Channel $resultChannel,
private readonly Channel $resultChannel,
) {
$this->result = async(static function () use ($resultChannel): ExitResult {
try {
$data = $resultChannel->receive();
} catch (\Throwable $exception) {
throw new ContextException("Failed to receive result from context", previous: $exception);
} finally {
$resultChannel->close();
}

if (!$data instanceof ExitResult) {
throw new ContextException(\sprintf(
"The context sent data instead of exiting: %s",
flattenArgument($data),
));
}

return $data;
});

$this->result->ignore();
}

public function receive(?Cancellation $cancellation = null): mixed
Expand Down Expand Up @@ -116,6 +97,41 @@ public function onClose(\Closure $onClose): void

protected function receiveExitResult(?Cancellation $cancellation = null): ExitResult
{
return $this->result->await($cancellation);
while ($this->result) {
try {
$this->result->await($cancellation);
} catch (CancelledException) {
// Ignore cancellation from a prior join request, throw only if this request was cancelled.
$cancellation?->throwIfRequested();
}
}

$this->result = async(function () use ($cancellation): ExitResult {
try {
$data = $this->resultChannel->receive($cancellation);
$this->resultChannel->close();
} catch (CancelledException $exception) {
throw $exception;
} catch (\Throwable $exception) {
$this->resultChannel->close();
throw new ContextException("Failed to receive result from context", previous: $exception);
}

if (!$data instanceof ExitResult) {
throw new ContextException(\sprintf(
"The context sent data instead of exiting: %s",
flattenArgument($data),
));
}

return $data;
});

try {
return $this->result->await();
} catch (CancelledException $exception) {
$this->result = null;
throw $exception;
}
}
}

0 comments on commit 5aeaad2

Please sign in to comment.