diff --git a/CHANGELOG.md b/CHANGELOG.md index 93334ef8a..72095450c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,18 @@ We appreciate your patience while we speedily work towards a stable release of t +### 0.65.9 (2024-10-31) + +Output from `Sandbox.exec` can now be discarded (`/dev/null`), printed to `stdout`, or stored for later consumption. This behavior is controlled via the new `stdout` and `stderr` arguments, which each take a `StreamType` value. + + + +### 0.65.8 (2024-10-31) + +- Fixed a bug where the `Image.imports` context manager would not correctly propagate ImportError when using a `modal.Cls`. + + + ### 0.65.2 (2024-10-30) * Fix issue where `modal run` wouldn't exit for 10s if there was a failure during app creation diff --git a/modal/_container_entrypoint.py b/modal/_container_entrypoint.py index c660367c9..6191be689 100644 --- a/modal/_container_entrypoint.py +++ b/modal/_container_entrypoint.py @@ -629,10 +629,16 @@ def import_class_service( if isinstance(cls, Cls): # The cls decorator is in global scope - method_partials = synchronizer._translate_in(cls._get_partial_functions()) + _cls = synchronizer._translate_in(cls) + method_partials = _cls._get_partial_functions() + function = _cls._class_service_function else: # Undecorated user class - find all methods method_partials = _find_partial_methods_for_user_cls(cls, _PartialFunctionFlags.all()) + function = None + + if function: + code_deps = function.deps(only_explicit_mounts=True) user_cls_instance = get_user_class_instance(cls, cls_args, cls_kwargs) diff --git a/modal/container_process.py b/modal/container_process.py index 9310825cd..6d949f563 100644 --- a/modal/container_process.py +++ b/modal/container_process.py @@ -11,6 +11,7 @@ from .client import _Client from .exception import InteractiveTimeoutError, InvalidError from .io_streams import _StreamReader, _StreamWriter +from .stream_type import StreamType class _ContainerProcess: @@ -20,11 +21,21 @@ class _ContainerProcess: _stdin: _StreamWriter _returncode: Optional[int] = None - def 
__init__(self, process_id: str, client: _Client) -> None: + def __init__( + self, + process_id: str, + client: _Client, + stdout: StreamType = StreamType.PIPE, + stderr: StreamType = StreamType.PIPE, + ) -> None: self._process_id = process_id self._client = client - self._stdout = _StreamReader(api_pb2.FILE_DESCRIPTOR_STDOUT, process_id, "container_process", self._client) - self._stderr = _StreamReader(api_pb2.FILE_DESCRIPTOR_STDERR, process_id, "container_process", self._client) + self._stdout = _StreamReader( + api_pb2.FILE_DESCRIPTOR_STDOUT, process_id, "container_process", self._client, stream_type=stdout + ) + self._stderr = _StreamReader( + api_pb2.FILE_DESCRIPTOR_STDERR, process_id, "container_process", self._client, stream_type=stderr + ) self._stdin = _StreamWriter(process_id, "container_process", self._client) @property diff --git a/modal/io_streams.py b/modal/io_streams.py index a22dd87c0..88d0dbac5 100644 --- a/modal/io_streams.py +++ b/modal/io_streams.py @@ -1,15 +1,17 @@ # Copyright Modal Labs 2022 import asyncio -from typing import TYPE_CHECKING, AsyncIterator, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, AsyncGenerator, Literal, Optional, Tuple, Union from grpclib import Status from grpclib.exceptions import GRPCError, StreamTerminatedError +from modal.exception import ClientClosed, InvalidError from modal_proto import api_pb2 from ._utils.async_utils import synchronize_api from ._utils.grpc_utils import RETRYABLE_GRPC_STATUS_CODES, retry_transient_errors from .client import _Client +from .stream_type import StreamType if TYPE_CHECKING: pass @@ -17,7 +19,7 @@ async def _sandbox_logs_iterator( sandbox_id: str, file_descriptor: int, last_entry_id: Optional[str], client: _Client -) -> AsyncIterator[Tuple[Optional[api_pb2.TaskLogs], str]]: +) -> AsyncGenerator[Tuple[Optional[str], str], None]: req = api_pb2.SandboxGetLogsRequest( sandbox_id=sandbox_id, file_descriptor=file_descriptor, @@ -35,22 +37,19 @@ async def 
_sandbox_logs_iterator( async def _container_process_logs_iterator( - process_id: str, file_descriptor: int, last_entry_id: Optional[str], client: _Client -): + process_id: str, file_descriptor: int, client: _Client +) -> AsyncGenerator[Optional[str], None]: req = api_pb2.ContainerExecGetOutputRequest( exec_id=process_id, timeout=55, - last_batch_index=last_entry_id or 0, file_descriptor=file_descriptor, ) async for batch in client.stub.ContainerExecGetOutput.unary_stream(req): if batch.HasField("exit_code"): - yield (None, batch.batch_index) + yield None break for item in batch.items: - # TODO: do this on the server. - if item.file_descriptor == file_descriptor: - yield (item.message, batch.batch_index) + yield item.message class _StreamReader: @@ -80,21 +79,39 @@ def __init__( object_id: str, object_type: Literal["sandbox", "container_process"], client: _Client, + stream_type: StreamType = StreamType.PIPE, by_line: bool = False, # if True, streamed logs are further processed into complete lines. ) -> None: """mdmd:hidden""" + self._file_descriptor = file_descriptor self._object_type = object_type self._object_id = object_id self._client = client self._stream = None self._last_entry_id = None - self._buffer = "" + self._line_buffer = "" self._by_line = by_line # Whether the reader received an EOF. Once EOF is True, it returns # an empty string for any subsequent reads (including async for) self.eof = False + if not isinstance(stream_type, StreamType): + raise TypeError(f"stream_type must be of type StreamType, got {type(stream_type)}") + + # We only support piping sandbox logs because they're meant to be durable logs stored + # on the user's application. + if object_type == "sandbox" and stream_type != StreamType.PIPE: + raise ValueError("Sandbox streams must be piped.") + self._stream_type = stream_type + + if self._object_type == "container_process": + # Container process streams need to be consumed as they are produced, + # otherwise the process will block. 
Use a buffer to store the stream + # until the client consumes it. + self._container_process_buffer = [] + self._consume_container_process_task = asyncio.create_task(self._consume_container_process_stream()) + @property def file_descriptor(self): return self._file_descriptor @@ -124,7 +141,64 @@ async def read(self) -> str: return data - async def _get_logs(self) -> AsyncIterator[Optional[str]]: + async def _consume_container_process_stream(self): + """ + Consumes the container process stream and stores the messages in the buffer. + """ + if self._stream_type == StreamType.DEVNULL: + return + + completed = False + retries_remaining = 10 + while not completed: + try: + iterator = _container_process_logs_iterator(self._object_id, self._file_descriptor, self._client) + + async for message in iterator: + if self._stream_type == StreamType.STDOUT and message: + print(message, end="") + elif self._stream_type == StreamType.PIPE: + self._container_process_buffer.append(message) + if message is None: + completed = True + break + + except (GRPCError, StreamTerminatedError, ClientClosed) as exc: + if retries_remaining > 0: + retries_remaining -= 1 + if isinstance(exc, GRPCError): + if exc.status in RETRYABLE_GRPC_STATUS_CODES: + await asyncio.sleep(1.0) + continue + elif isinstance(exc, StreamTerminatedError): + continue + elif isinstance(exc, ClientClosed): + # If the client was closed, the user has triggered a cleanup. + break + raise exc + + async def _stream_container_process(self) -> AsyncGenerator[Tuple[Optional[str], str], None]: + """mdmd:hidden + Streams the container process buffer to the reader. 
+ """ + entry_id = 0 + if self._last_entry_id: + entry_id = int(self._last_entry_id) + 1 + + while True: + if entry_id >= len(self._container_process_buffer): + await asyncio.sleep(0.1) + continue + + item = self._container_process_buffer[entry_id] + + yield (item, str(entry_id)) + if item is None: + break + + entry_id += 1 + + async def _get_logs(self) -> AsyncGenerator[Optional[str], None]: """mdmd:hidden Streams sandbox or process logs from the server to the reader. @@ -133,6 +207,9 @@ async def _get_logs(self) -> AsyncIterator[Optional[str]]: When the stream receives an EOF, it yields None. Once an EOF is received, subsequent invocations will not yield logs. """ + if self._stream_type != StreamType.PIPE: + raise InvalidError("Logs can only be retrieved using the PIPE stream type.") + if self.eof: yield None return @@ -147,9 +224,7 @@ async def _get_logs(self) -> AsyncIterator[Optional[str]]: self._object_id, self._file_descriptor, self._last_entry_id, self._client ) else: - iterator = _container_process_logs_iterator( - self._object_id, self._file_descriptor, self._last_entry_id, self._client - ) + iterator = self._stream_container_process() async for message, entry_id in iterator: self._last_entry_id = entry_id @@ -169,28 +244,29 @@ async def _get_logs(self) -> AsyncIterator[Optional[str]]: continue raise - async def _get_logs_by_line(self) -> AsyncIterator[Optional[str]]: + async def _get_logs_by_line(self) -> AsyncGenerator[Optional[str], None]: """mdmd:hidden Processes logs from the server and yields complete lines only. 
""" async for message in self._get_logs(): if message is None: - if self._buffer: - yield self._buffer - self._buffer = "" + if self._line_buffer: + yield self._line_buffer + self._line_buffer = "" yield None else: - self._buffer += message - while "\n" in self._buffer: - line, self._buffer = self._buffer.split("\n", 1) + self._line_buffer += message + while "\n" in self._line_buffer: + line, self._line_buffer = self._line_buffer.split("\n", 1) yield line + "\n" def __aiter__(self): """mdmd:hidden""" - if self._by_line: - self._stream = self._get_logs_by_line() - else: - self._stream = self._get_logs() + if not self._stream: + if self._by_line: + self._stream = self._get_logs_by_line() + else: + self._stream = self._get_logs() return self async def __anext__(self): diff --git a/modal/sandbox.py b/modal/sandbox.py index 5ee1c5e96..35da56faf 100644 --- a/modal/sandbox.py +++ b/modal/sandbox.py @@ -29,6 +29,7 @@ from .object import _get_environment_name, _Object from .scheduler_placement import SchedulerPlacement from .secret import _Secret +from .stream_type import StreamType _default_image: _Image = _Image.debian_slim() @@ -396,7 +397,16 @@ async def _get_task_id(self): await asyncio.sleep(0.5) return self._task_id - async def exec(self, *cmds: str, pty_info: Optional[api_pb2.PTYInfo] = None): + async def exec( + self, + *cmds: str, + # Deprecated: internal use only + pty_info: Optional[api_pb2.PTYInfo] = None, + stdout: StreamType = StreamType.PIPE, + stderr: StreamType = StreamType.PIPE, + # Internal option to set terminal size and metadata + _pty_info: Optional[api_pb2.PTYInfo] = None, + ): """Execute a command in the Sandbox and return a [`ContainerProcess`](/docs/reference/modal.ContainerProcess#modalcontainer_process) handle. 
@@ -419,11 +429,11 @@ async def exec(self, *cmds: str, pty_info: Optional[api_pb2.PTYInfo] = None): api_pb2.ContainerExecRequest( task_id=task_id, command=cmds, - pty_info=pty_info, + pty_info=_pty_info or pty_info, runtime_debug=config.get("function_runtime_debug"), ) ) - return _ContainerProcess(resp.exec_id, self._client) + return _ContainerProcess(resp.exec_id, self._client, stdout=stdout, stderr=stderr) @property def stdout(self) -> _StreamReader: diff --git a/modal/stream_type.py b/modal/stream_type.py new file mode 100644 index 000000000..e096febc2 --- /dev/null +++ b/modal/stream_type.py @@ -0,0 +1,15 @@ +# Copyright Modal Labs 2022 +import subprocess +from enum import Enum + + +class StreamType(Enum): + # Discard all logs from the stream. + DEVNULL = subprocess.DEVNULL + # Store logs in a pipe to be read by the client. + PIPE = subprocess.PIPE + # Print logs to stdout immediately. + STDOUT = subprocess.STDOUT + + def __repr__(self): + return f"{self.__module__}.{self.__class__.__name__}.{self.name}" diff --git a/modal_proto/api.proto b/modal_proto/api.proto index d1f183f66..dbade75b5 100644 --- a/modal_proto/api.proto +++ b/modal_proto/api.proto @@ -241,9 +241,13 @@ enum TaskState { } enum VolumeFsVersion { + option allow_alias = true; UNSPECIFIED = 0; V1 = 1; V2 = 2; + VOLUME_FS_VERSION_UNSPECIFIED = 0; + VOLUME_FS_VERSION_V1 = 1; + VOLUME_FS_VERSION_V2 = 2; } enum WebhookAsyncMode { diff --git a/modal_version/_version_generated.py b/modal_version/_version_generated.py index 30da65249..ba34acc06 100644 --- a/modal_version/_version_generated.py +++ b/modal_version/_version_generated.py @@ -1,4 +1,4 @@ # Copyright Modal Labs 2024 # Note: Reset this value to -1 whenever you make a minor `0.X` release of the client. 
-build_number = 7 # git: 1a2e341 +build_number = 10 # git: 754fa03 diff --git a/test/container_test.py b/test/container_test.py index 2bfdf2d53..8d85b2a67 100644 --- a/test/container_test.py +++ b/test/container_test.py @@ -1059,6 +1059,18 @@ def test_cls_enter_uses_event_loop(servicer): assert _unwrap_scalar(ret) == True +@skip_github_non_linux +def test_cls_with_image(servicer): + ret = _run_container( + servicer, + "test.supports.class_with_image", + "ClassWithImage.*", + inputs=_get_inputs(((), {}), method_name="image_is_hydrated"), + is_class=True, + ) + assert _unwrap_scalar(ret) == True + + @skip_github_non_linux def test_container_heartbeats(servicer): _run_container(servicer, "test.supports.functions", "square") diff --git a/test/supports/class_with_image.py b/test/supports/class_with_image.py new file mode 100644 index 000000000..1dac8bfa5 --- /dev/null +++ b/test/supports/class_with_image.py @@ -0,0 +1,12 @@ +# Copyright Modal Labs 2024 +import modal + +image = modal.Image.debian_slim() +app = modal.App(image=image) + + +@app.cls() +class ClassWithImage: + @modal.method() + def image_is_hydrated(self): + return image.is_hydrated diff --git a/test/telemetry/tracing_module_1.py b/test/telemetry/tracing_module_1.py index f1a432de2..e43e342ac 100644 --- a/test/telemetry/tracing_module_1.py +++ b/test/telemetry/tracing_module_1.py @@ -1,5 +1,5 @@ # Copyright Modal Labs 2022 -from . import tracing_module_2 # noqa +from . import tracing_module_2 # noqa def foo():