Skip to content

Commit

Permalink
Added more process arguments (#749)
Browse files Browse the repository at this point in the history
Fixes #742.
  • Loading branch information
agronholm authored Sep 3, 2024
1 parent 5a1e419 commit 2532e43
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 100 deletions.
6 changes: 6 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Added support for the ``from_uri()``, ``full_match()``, ``parser`` methods/properties
in ``anyio.Path``, newly added in Python 3.13
(`#737 <https://github.com/agronholm/anyio/issues/737>`_)
- Added support for more keyword arguments for ``run_process()`` and ``open_process()``:
``startupinfo``, ``creationflags``, ``pass_fds``, ``user``, ``group``,
``extra_groups`` and ``umask``
(`#742 <https://github.com/agronholm/anyio/issues/742>`_)
- Improved the type annotations and support for ``PathLike`` in ``run_process()`` and
``open_process()`` to allow for path-like arguments, just like ``subprocess.Popen``
- Changed the ``ResourceWarning`` from an unclosed memory object stream to include its
address for easier identification
- Changed ``start_blocking_portal()`` to always use daemonic threads, to accommodate the
Expand Down
25 changes: 11 additions & 14 deletions src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import concurrent.futures
import math
import os
import socket
import sys
import threading
Expand Down Expand Up @@ -47,7 +48,6 @@
Collection,
ContextManager,
Coroutine,
Mapping,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -81,6 +81,7 @@
UDPPacketType,
UNIXDatagramPacketType,
)
from ..abc._eventloop import StrOrBytesPath
from ..lowlevel import RunVar
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

Expand Down Expand Up @@ -2239,36 +2240,32 @@ def create_blocking_portal(cls) -> abc.BlockingPortal:
@classmethod
async def open_process(
cls,
command: str | bytes | Sequence[str | bytes],
command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
cwd: str | bytes | PathLike | None = None,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
**kwargs: Any,
) -> Process:
await cls.checkpoint()
if shell:
if isinstance(command, PathLike):
command = os.fspath(command)

if isinstance(command, (str, bytes)):
process = await asyncio.create_subprocess_shell(
cast("str | bytes", command),
command,
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd,
env=env,
start_new_session=start_new_session,
**kwargs,
)
else:
process = await asyncio.create_subprocess_exec(
*command,
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd,
env=env,
start_new_session=start_new_session,
**kwargs,
)

stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
Expand Down
47 changes: 30 additions & 17 deletions src/anyio/_backends/_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import array
import math
import os
import socket
import sys
import types
Expand All @@ -25,7 +26,6 @@
ContextManager,
Coroutine,
Generic,
Mapping,
NoReturn,
Sequence,
TypeVar,
Expand Down Expand Up @@ -60,7 +60,7 @@
from .._core._synchronization import ResourceGuard
from .._core._tasks import CancelScope as BaseCancelScope
from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType
from ..abc._eventloop import AsyncBackend
from ..abc._eventloop import AsyncBackend, StrOrBytesPath
from ..streams.memory import MemoryObjectSendStream

if sys.version_info >= (3, 10):
Expand Down Expand Up @@ -967,26 +967,39 @@ def create_blocking_portal(cls) -> abc.BlockingPortal:
@classmethod
async def open_process(
cls,
command: str | bytes | Sequence[str | bytes],
command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
shell: bool,
stdin: int | IO[Any] | None,
stdout: int | IO[Any] | None,
stderr: int | IO[Any] | None,
cwd: str | bytes | PathLike | None = None,
env: Mapping[str, str] | None = None,
start_new_session: bool = False,
**kwargs: Any,
) -> Process:
process = await trio.lowlevel.open_process( # type: ignore[misc]
command, # type: ignore[arg-type]
stdin=stdin,
stdout=stdout,
stderr=stderr,
shell=shell,
cwd=cwd,
env=env,
start_new_session=start_new_session,
)
def convert_item(item: StrOrBytesPath) -> str:
str_or_bytes = os.fspath(item)
if isinstance(str_or_bytes, str):
return str_or_bytes
else:
return os.fsdecode(str_or_bytes)

if isinstance(command, (str, bytes, PathLike)):
process = await trio.lowlevel.open_process(
convert_item(command),
stdin=stdin,
stdout=stdout,
stderr=stderr,
shell=True,
**kwargs,
)
else:
process = await trio.lowlevel.open_process(
[convert_item(item) for item in command],
stdin=stdin,
stdout=stdout,
stderr=stderr,
shell=False,
**kwargs,
)

stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None
stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None
stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None
Expand Down
125 changes: 97 additions & 28 deletions src/anyio/_core/_subprocesses.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,41 @@
from __future__ import annotations

from collections.abc import AsyncIterable, Mapping, Sequence
import sys
from collections.abc import AsyncIterable, Iterable, Mapping, Sequence
from io import BytesIO
from os import PathLike
from subprocess import DEVNULL, PIPE, CalledProcessError, CompletedProcess
from typing import IO, Any, cast
from typing import IO, Any, Union, cast

from ..abc import Process
from ._eventloop import get_async_backend
from ._tasks import create_task_group

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias

StrOrBytesPath: TypeAlias = Union[str, bytes, "PathLike[str]", "PathLike[bytes]"]


async def run_process(
command: str | bytes | Sequence[str | bytes],
command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
input: bytes | None = None,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
check: bool = True,
cwd: str | bytes | PathLike[str] | None = None,
cwd: StrOrBytesPath | None = None,
env: Mapping[str, str] | None = None,
startupinfo: Any = None,
creationflags: int = 0,
start_new_session: bool = False,
pass_fds: Sequence[int] = (),
user: str | int | None = None,
group: str | int | None = None,
extra_groups: Iterable[str | int] | None = None,
umask: int = -1,
) -> CompletedProcess[bytes]:
"""
Run an external command in a subprocess and wait until it completes.
Expand All @@ -40,8 +55,20 @@ async def run_process(
command
:param env: if not ``None``, this mapping replaces the inherited environment
variables from the parent process
:param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
to specify process startup parameters (Windows only)
:param creationflags: flags that can be used to control the creation of the
subprocess (see :class:`subprocess.Popen` for the specifics)
:param start_new_session: if ``true`` the setsid() system call will be made in the
child process prior to the execution of the subprocess. (POSIX only)
:param pass_fds: sequence of file descriptors to keep open between the parent and
child processes. (POSIX only)
:param user: effective user to run the process as (Python >= 3.9, POSIX only)
:param group: effective group to run the process as (Python >= 3.9, POSIX only)
:param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9,
POSIX only)
:param umask: if not negative, this umask is applied in the child process before
running the given command (Python >= 3.9, POSIX only)
:return: an object representing the completed process
:raises ~subprocess.CalledProcessError: if ``check`` is ``True`` and the process
exits with a nonzero return code
Expand All @@ -62,7 +89,14 @@ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:
stderr=stderr,
cwd=cwd,
env=env,
startupinfo=startupinfo,
creationflags=creationflags,
start_new_session=start_new_session,
pass_fds=pass_fds,
user=user,
group=group,
extra_groups=extra_groups,
umask=umask,
) as process:
stream_contents: list[bytes | None] = [None, None]
async with create_task_group() as tg:
Expand All @@ -86,14 +120,21 @@ async def drain_stream(stream: AsyncIterable[bytes], index: int) -> None:


async def open_process(
command: str | bytes | Sequence[str | bytes],
command: StrOrBytesPath | Sequence[StrOrBytesPath],
*,
stdin: int | IO[Any] | None = PIPE,
stdout: int | IO[Any] | None = PIPE,
stderr: int | IO[Any] | None = PIPE,
cwd: str | bytes | PathLike[str] | None = None,
cwd: StrOrBytesPath | None = None,
env: Mapping[str, str] | None = None,
startupinfo: Any = None,
creationflags: int = 0,
start_new_session: bool = False,
pass_fds: Sequence[int] = (),
user: str | int | None = None,
group: str | int | None = None,
extra_groups: Iterable[str | int] | None = None,
umask: int = -1,
) -> Process:
"""
Start an external command in a subprocess.
Expand All @@ -111,30 +152,58 @@ async def open_process(
:param cwd: If not ``None``, the working directory is changed before executing
:param env: If env is not ``None``, it must be a mapping that defines the
environment variables for the new process
:param creationflags: flags that can be used to control the creation of the
subprocess (see :class:`subprocess.Popen` for the specifics)
:param startupinfo: an instance of :class:`subprocess.STARTUPINFO` that can be used
to specify process startup parameters (Windows only)
:param start_new_session: if ``true`` the setsid() system call will be made in the
child process prior to the execution of the subprocess. (POSIX only)
:param pass_fds: sequence of file descriptors to keep open between the parent and
child processes. (POSIX only)
:param user: effective user to run the process as (Python >= 3.9; POSIX only)
:param group: effective group to run the process as (Python >= 3.9; POSIX only)
:param extra_groups: supplementary groups to set in the subprocess (Python >= 3.9;
POSIX only)
:param umask: if not negative, this umask is applied in the child process before
running the given command (Python >= 3.9; POSIX only)
:return: an asynchronous process object
"""
if isinstance(command, (str, bytes)):
return await get_async_backend().open_process(
command,
shell=True,
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd,
env=env,
start_new_session=start_new_session,
)
else:
return await get_async_backend().open_process(
command,
shell=False,
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd,
env=env,
start_new_session=start_new_session,
)
kwargs: dict[str, Any] = {}
if user is not None:
if sys.version_info < (3, 9):
raise TypeError("the 'user' argument requires Python 3.9 or later")

kwargs["user"] = user

if group is not None:
if sys.version_info < (3, 9):
raise TypeError("the 'group' argument requires Python 3.9 or later")

kwargs["group"] = group

if extra_groups is not None:
if sys.version_info < (3, 9):
raise TypeError("the 'extra_groups' argument requires Python 3.9 or later")

kwargs["extra_groups"] = group

if umask >= 0:
if sys.version_info < (3, 9):
raise TypeError("the 'umask' argument requires Python 3.9 or later")

kwargs["umask"] = umask

return await get_async_backend().open_process(
command,
stdin=stdin,
stdout=stdout,
stderr=stderr,
cwd=cwd,
env=env,
startupinfo=startupinfo,
creationflags=creationflags,
start_new_session=start_new_session,
pass_fds=pass_fds,
**kwargs,
)
Loading

0 comments on commit 2532e43

Please sign in to comment.