Skip to content

Commit

Permalink
Improve: run_before_script() (#959)
Browse files Browse the repository at this point in the history
#958 causes the captured output to produce too many newlines.
Modernize our `run_before_script()`.
  • Loading branch information
tony authored Feb 2, 2025
2 parents 425bf3c + 03e6e68 commit 7bc8cc0
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 31 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ $ pipx install --suffix=@next 'tmuxp' --pip-args '\--pre' --force

<!-- Maintainers, insert changes / features for the next release here -->

### Bug fixes

- `run_before_script()`: Fix output issue (#959)

## tmuxp 1.52.0 (2025-02-02)

_Maintenance only, no bug fixes or new features_
Expand Down
91 changes: 61 additions & 30 deletions src/tmuxp/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,72 @@ def run_before_script(
script_file: str | pathlib.Path,
cwd: pathlib.Path | None = None,
) -> int:
"""Execute a shell script, wraps :meth:`subprocess.check_call()` in a try/catch."""
"""Execute shell script, ``tee``-ing output to both terminal (if TTY) and buffer."""
script_cmd = shlex.split(str(script_file))

try:
proc = subprocess.Popen(
shlex.split(str(script_file)),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
script_cmd,
cwd=cwd,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, # decode to str
errors="backslashreplace",
encoding="utf-8",
)
if proc.stdout is not None:
for line in iter(proc.stdout.readline, ""):
sys.stdout.write(line)
proc.wait()

if proc.returncode and proc.stderr is not None:
stderr = proc.stderr.read()
proc.stderr.close()
stderr_strlist = stderr.split("\n")
stderr_str = "\n".join(list(filter(None, stderr_strlist))) # filter empty

raise exc.BeforeLoadScriptError(
proc.returncode,
os.path.abspath(script_file), # NOQA: PTH100
stderr_str,
)
except OSError as e:
if e.errno == 2:
raise exc.BeforeLoadScriptNotExists(
e,
os.path.abspath(script_file), # NOQA: PTH100
) from e
raise
return proc.returncode
except FileNotFoundError as e:
raise exc.BeforeLoadScriptNotExists(
e,
os.path.abspath(script_file), # NOQA: PTH100
) from e

out_buffer = []
err_buffer = []

# While process is running, read lines from stdout/stderr
# and write them to this process's stdout/stderr if isatty
is_out_tty = sys.stdout.isatty()
is_err_tty = sys.stderr.isatty()

# You can do a simple loop reading in real-time:
while True:
# Use .poll() to check if the child has exited
return_code = proc.poll()

# Read one line from stdout, if available
line_out = proc.stdout.readline() if proc.stdout else ""

# Read one line from stderr, if available
line_err = proc.stderr.readline() if proc.stderr else ""

if line_out:
out_buffer.append(line_out)
if is_out_tty:
sys.stdout.write(line_out)
sys.stdout.flush()

if line_err:
err_buffer.append(line_err)
if is_err_tty:
sys.stderr.write(line_err)
sys.stderr.flush()

# If no more data from pipes and process ended, break
if not line_out and not line_err and return_code is not None:
break

# At this point, the process has finished
return_code = proc.wait()

if return_code != 0:
# Join captured stderr lines for your exception
stderr_str = "".join(err_buffer).strip()
raise exc.BeforeLoadScriptError(
return_code,
os.path.abspath(script_file), # NOQA: PTH100
stderr_str,
)

return return_code


def oh_my_zsh_auto_title() -> None:
Expand Down
55 changes: 54 additions & 1 deletion tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

import pathlib
import sys
import typing as t

import pytest
Expand Down Expand Up @@ -35,8 +37,59 @@ def test_run_before_script_raise_BeforeLoadScriptError_if_retcode() -> None:
run_before_script(script_file)


def test_return_stdout_if_ok(capsys: pytest.CaptureFixture[str]) -> None:
@pytest.fixture
def temp_script(tmp_path: pathlib.Path) -> pathlib.Path:
"""Fixture of an example script that prints "Hello, world!"."""
script = tmp_path / "test_script.sh"
script.write_text(
"""#!/bin/sh
echo "Hello, World!"
exit 0
"""
)
script.chmod(0o755)
return script


@pytest.mark.parametrize(
["isatty_value", "expected_output"],
[
(True, "Hello, World!"), # if stdout is a TTY, output should be passed through
(False, ""), # if not a TTY, output is not written to sys.stdout
],
)
def test_run_before_script_isatty(
temp_script: pathlib.Path,
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
isatty_value: bool,
expected_output: str,
) -> None:
"""Verify behavior of ``isatty()``, which we mock in `run_before_script()`."""
# Mock sys.stdout.isatty() to return the desired value.
monkeypatch.setattr(sys.stdout, "isatty", lambda: isatty_value)

# Run the script.
returncode = run_before_script(temp_script)

# Assert that the script ran successfully.
assert returncode == 0

out, _err = capsys.readouterr()

# In TTY mode, we expect the output; in non-TTY mode, we expect it to be suppressed.
assert expected_output in out


def test_return_stdout_if_ok(
capsys: pytest.CaptureFixture[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""run_before_script() returns stdout if script succeeds."""
# Simulate sys.stdout.isatty() + sys.stderr.isatty()
monkeypatch.setattr(sys.stdout, "isatty", lambda: True)
monkeypatch.setattr(sys.stderr, "isatty", lambda: True)

script_file = FIXTURE_PATH / "script_complete.sh"

run_before_script(script_file)
Expand Down

0 comments on commit 7bc8cc0

Please sign in to comment.