Skip to content

Commit

Permalink
add PoC scripts for gtk_application_inhibit
Browse files Browse the repository at this point in the history
  • Loading branch information
fohrloop committed Sep 25, 2024
1 parent aa026da commit 5ce3042
Show file tree
Hide file tree
Showing 3 changed files with 374 additions and 0 deletions.
63 changes: 63 additions & 0 deletions scripts/gtk_application_inhibit_poc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

import warnings

with warnings.catch_warnings():
# Ignore the PyGIWarning: Gtk was imported without specifying a version
# first. This should work on GtK 3 and 4.
warnings.filterwarnings(action="ignore")
from gi.repository import Gio, Gtk


class Inhibitor:
"""Inhibitor which uses GTK, namely the gtk_application_inhibit()
Docs: https://docs.gtk.org/gtk3/method.Application.inhibit.html
"""

def __init__(self):
self.app: Gtk.Application | None = None
self.cookie: int | None = None

def start(self, *_) -> None:
self.app = Gtk.Application(
application_id="io.readthedocs.wakepy",
flags=Gio.ApplicationFlags.IS_SERVICE | Gio.ApplicationFlags.NON_UNIQUE,
)

try:
# Cannot use the inhibit() if the app is not registered first.
# Docs: https://lazka.github.io/pgi-docs/Gio-2.0/classes/Application.html#Gio.Application.register
self.app.register()

# Docs: https://lazka.github.io/pgi-docs/#Gtk-4.0/classes/Application.html#Gtk.Application.inhibit
cookie = self.app.inhibit(
Gtk.ApplicationWindow(application=self.app),
Gtk.ApplicationInhibitFlags(8), # prevent idle
"wakelock requested (wakepy)",
)
if not cookie:
raise RuntimeError(
"Failed to inhibit the system (Gtk.Application.inhibit did not return a "
"non-zero cookie)"
)

self.cookie = cookie

# The hold() keeps the app alive even without a window.
# Docs: https://lazka.github.io/pgi-docs/Gio-2.0/classes/Application.html#Gio.Application.hold
self.app.hold()

except Exception as error:
self.app.quit()
raise RuntimeError(f"Failed to inhibit the system: {error}")

def stop(self) -> None:
if self.cookie:
# Counterpart to hold().
self.app.uninhibit(self.cookie)
self.cookie = None

self.app.release()
self.app.quit()
self.app = None
170 changes: 170 additions & 0 deletions scripts/inhibitor_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
from __future__ import annotations

import importlib.util
import sys
import typing
import warnings
from pathlib import Path
from socket import AF_UNIX, SOCK_STREAM, socket

if sys.version_info < (3, 8): # pragma: no-cover-if-py-gte-38
from typing_extensions import Protocol
else: # pragma: no-cover-if-py-lt-38
from typing import Protocol

if typing.TYPE_CHECKING:
from typing import Type


class Inhibitor(Protocol):
"""The Inhibitor protocol. An inhibitor module should provide a class
called Inhibitor which implements this protocol."""

def start(self, *args) -> None: ...
def stop(self) -> None: ...


CLIENT_CONNECTION_TIMEOUT = 60
"""Time to wait (seconds) for the client to connect to the server."""
CLIENT_MESSAGE_TIMEOUT = 1
"""Time to wait (seconds) for each message from the client."""


class InhibitorServer:
"""A very simple class for inhibiting suspend/idle.
Communicates with a main process using a Unix domain socket.
What happens when run() is called:
1. When the process starts, inhibit() is called. If it succeeds, this
process sends "INHIBIT_OK". If it fails, this process sends
"INHIBIT_ERROR" and exits.
2. This process waits indefinitely for a "QUIT" message.
3. When "QUIT" (or empty string) is received, uninhibit() is called. If it
succeeds, this process sends "UNINHIBIT_OK". If it fails, this process
sends "UNINHIBIT_ERROR". Then, this process exits.
"""

def __init__(self):
self._inhibitor: Inhibitor | None = None

def run(self, socket_path: str, inhibitor_module: str, *inhibit_args) -> None:
"""Inhibit the system using inhibitor_module and wait for a quit
message at socket_path.
Parameters
----------
inhibitor_module : str
The python module that contains the Inhibitor class
socket_path : str
The path to the Unix domain socket which is used for communication.
"""
server_socket = socket(AF_UNIX, SOCK_STREAM)
Path(socket_path).expanduser().unlink(missing_ok=True)
server_socket.bind(socket_path)

try:
self._run(server_socket, inhibitor_module, *inhibit_args)
finally:
server_socket.close()

def _run(self, server_socket: socket, inhibitor_module: str, *inhibit_args) -> None:
server_socket.listen(1) # Only allow 1 connection at a time
client_socket = self._get_client_socket(server_socket)
client_socket.settimeout(CLIENT_MESSAGE_TIMEOUT)

try:
self.inhibit(inhibitor_module, *inhibit_args)
self.send_message(client_socket, "INHIBIT_OK")
except Exception as error:
self.send_message(client_socket, f"INHIBIT_ERROR:{error}")
sys.exit(0)

while True:
# Called every `CLIENT_MESSAGE_TIMEOUT` seconds.
should_quit = self.check_for_quit_message(client_socket)
if should_quit:
break

try:
self.uninhibit()
self.send_message(client_socket, "UNINHIBIT_OK")
except Exception as error:
self.send_message(client_socket, f"UNINHIBIT_ERROR:{error}")
sys.exit(0)

@staticmethod
def _get_client_socket(server_socket: socket) -> socket:
server_socket.settimeout(CLIENT_CONNECTION_TIMEOUT)

try:
client_socket, _ = server_socket.accept()
except TimeoutError as e:
raise TimeoutError(
f"Client did not connect within {CLIENT_CONNECTION_TIMEOUT} seconds."
) from e
except KeyboardInterrupt:
print("Interrupted manually. Exiting.")
sys.exit(0)

return client_socket

def inhibit(self, inhibitor_module: str, *inhibit_args) -> None:
"""Inhibit using the Inhibitor class in the given `inhibitor_module`.
In case the operation fails, raises a RuntimeError."""
inhibitor_class = self.get_inhibitor_class(inhibitor_module)
self._inhibitor = inhibitor_class()
self._inhibitor.start(*inhibit_args)

@staticmethod
def get_inhibitor_class(inhibitor_module_path: str) -> Type[Inhibitor]:
try:
module_name = "__wakepy_inhibitor"
spec = importlib.util.spec_from_file_location(
module_name, inhibitor_module_path
)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
except ImportError as e:
raise ImportError(
f"{e} | Used python interpreter: {sys.executable}."
) from e
return module.Inhibitor

def uninhibit(self) -> None:
"""Uninhibit what was inhibited. In case the operation fails, raises a
RuntimeError."""
if self._inhibitor:
self._inhibitor.stop()
self._inhibitor = None
else:
warnings.warn("Called uninhibit before inhibit -> doing nothing.")

def send_message(self, client_socket: socket, message: str) -> None:
client_socket.sendall(message.encode())

def check_for_quit_message(self, sock: socket) -> bool:
# waits until the socket gets a message
try:
request = sock.recv(1024).decode()
except TimeoutError:
return False
print(f"Received request: {request}")
# if the client disconnects, empty string is returned. This will make
# sure that the server process quits automatically when it's not needed
# anymore.
return request == "QUIT" or request == ""


if __name__ == "__main__":
if len(sys.argv) < 3:
print(
f"Usage: python {__file__} <socket_path> <inhibitor_module> [inhibit_args...]"
)
sys.exit(1)

# Get the socket path from the command-line arguments
InhibitorServer().run(
socket_path=sys.argv[1], inhibitor_module=sys.argv[2], *sys.argv[3:]
)
141 changes: 141 additions & 0 deletions scripts/main_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import subprocess
import sys
import time
from contextlib import contextmanager
from pathlib import Path
from socket import AF_UNIX, SOCK_STREAM, socket

# Path to the Unix domain socket file
# TODO: Add unique identifier
SOCKET_PATH = "/tmp/wakepy.socket"

python_path = "/usr/bin/python"

inhibitor_modules_root = Path(__file__).parent


def start_inhibit_server(
socket_path: str, python_path: str, inhibitor_relpath: str, *inhibitor_args: object
):
"""
Parameters
----------
python_path : str
The path to the python interpreter
"""
socket_pth = Path(socket_path).expanduser()
# Remove the file so we can just wait the file to appear and know that
# the server is ready.
socket_pth.unlink(missing_ok=True)
inhibitor_module = str(inhibitor_modules_root / inhibitor_relpath)

# TODO: Get the inhibiitor server full filepath
cmd = [
python_path,
"inhibitor_server.py",
socket_path,
inhibitor_module,
*inhibitor_args,
]
subprocess.Popen(cmd)
try:
wait_until_file_exists(socket_pth)
except Exception as e:
raise RuntimeError(f"Something went wrong while calling {cmd}") from e


def wait_until_file_exists(
file_path: Path, total_wait_time: float = 2, wait_time_per_cycle=0.001
) -> None:
"""Waits until a file exists or the total_wait_time is reached.
Parameters
----------
file_path : Path
The path to the file
total_wait_time : float, optional
The total time to wait. Default: 2 (seconds)
wait_time_per_cycle : float, optional
The time to wait between each cycle. Default: 0.001 (seconds)
Raises
------
FileNotFoundError
If the file does not exist after the total_wait_time
"""

for _ in range(int(total_wait_time / wait_time_per_cycle)):
if file_path.exists():
break
time.sleep(wait_time_per_cycle)
else:
raise FileNotFoundError(f"File {file_path} does not exists. Wait")


@contextmanager
def inhibited_with_python_module(socket_path: str):

client_socket = socket(AF_UNIX, SOCK_STREAM)
try:
client_socket.connect(socket_path)
except ConnectionRefusedError:
raise RuntimeError("Must start the server first.")
client_socket.settimeout(1)

try:
handle_initial(client_socket)
except Exception:
client_socket.close()
raise

yield

try:
uninhibit(client_socket)
finally:
client_socket.close()


def handle_initial(client_socket: socket):
response = client_socket.recv(1024).decode()

print(f"Response from server: {response}")
if response.startswith("INHIBIT_ERROR"):
errtext = response.split(":", maxsplit=1)[1]
raise RuntimeError(errtext)
elif response != "INHIBIT_OK": # should never happen
raise RuntimeError("Failed to inhibit the system")


def uninhibit(client_socket: socket):
client_socket.sendall("QUIT".encode())
response = client_socket.recv(1024).decode()
print(f"Response from side process: {response}")

if response.startswith("UNINHIBIT_ERROR"):
errtext = response.split(":", maxsplit=1)[-1]
raise RuntimeError(f"Failed to uninhibit the system: {errtext}")
elif response != "UNINHIBIT_OK": # should never happen
raise RuntimeError("Failed to uninhibit the system")


def start_server_and_inhibit(
socket_path: str,
python_path: str,
inhibitor_relpath: str,
*inhibitor_args: object,
):
start_inhibit_server(socket_path, python_path, inhibitor_relpath, *inhibitor_args)
with inhibited_with_python_module(socket_path):
print("doing work")


if __name__ == "__main__":
if len(sys.argv) != 2:
print(f"Usage: python {__file__} <socket_path>")
sys.exit(1)
socket_path = sys.argv[1]

with inhibited_with_python_module(socket_path):
print("doing work")
time.sleep(100)

0 comments on commit 5ce3042

Please sign in to comment.