Skip to content

Commit

Permalink
Merge branch 'kramstrom/mod-2454-async-map-2' of github.com:modal-lab…
Browse files Browse the repository at this point in the history
…s/modal-client into kramstrom/mod-2454-async-map-2
  • Loading branch information
kramstrom committed Oct 31, 2024
2 parents 4733f97 + d3833db commit 84115de
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 34 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ We appreciate your patience while we speedily work towards a stable release of t

<!-- NEW CONTENT GENERATED BELOW. PLEASE PRESERVE THIS COMMENT. -->

### 0.65.9 (2024-10-31)

Output from `Sandbox.exec` can now be discarded (`/dev/null`), printed to `stdout`, or stored in a pipe for later consumption. This behavior can be controlled via the new `StreamType` arguments.



### 0.65.8 (2024-10-31)

- Fixed a bug where the `Image.imports` context manager would not correctly propagate `ImportError` when using a `modal.Cls`.



### 0.65.2 (2024-10-30)

* Fix issue where `modal run` wouldn't exit for 10s if there was a failure during app creation
Expand Down
8 changes: 7 additions & 1 deletion modal/_container_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,16 @@ def import_class_service(

if isinstance(cls, Cls):
# The cls decorator is in global scope
method_partials = synchronizer._translate_in(cls._get_partial_functions())
_cls = synchronizer._translate_in(cls)
method_partials = _cls._get_partial_functions()
function = _cls._class_service_function
else:
# Undecorated user class - find all methods
method_partials = _find_partial_methods_for_user_cls(cls, _PartialFunctionFlags.all())
function = None

if function:
code_deps = function.deps(only_explicit_mounts=True)

user_cls_instance = get_user_class_instance(cls, cls_args, cls_kwargs)

Expand Down
17 changes: 14 additions & 3 deletions modal/container_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .client import _Client
from .exception import InteractiveTimeoutError, InvalidError
from .io_streams import _StreamReader, _StreamWriter
from .stream_type import StreamType


class _ContainerProcess:
Expand All @@ -20,11 +21,21 @@ class _ContainerProcess:
_stdin: _StreamWriter
_returncode: Optional[int] = None

def __init__(self, process_id: str, client: _Client) -> None:
def __init__(
self,
process_id: str,
client: _Client,
stdout: StreamType = StreamType.PIPE,
stderr: StreamType = StreamType.PIPE,
) -> None:
self._process_id = process_id
self._client = client
self._stdout = _StreamReader(api_pb2.FILE_DESCRIPTOR_STDOUT, process_id, "container_process", self._client)
self._stderr = _StreamReader(api_pb2.FILE_DESCRIPTOR_STDERR, process_id, "container_process", self._client)
self._stdout = _StreamReader(
api_pb2.FILE_DESCRIPTOR_STDOUT, process_id, "container_process", self._client, stream_type=stdout
)
self._stderr = _StreamReader(
api_pb2.FILE_DESCRIPTOR_STDERR, process_id, "container_process", self._client, stream_type=stderr
)
self._stdin = _StreamWriter(process_id, "container_process", self._client)

@property
Expand Down
126 changes: 101 additions & 25 deletions modal/io_streams.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
# Copyright Modal Labs 2022
import asyncio
from typing import TYPE_CHECKING, AsyncIterator, Literal, Optional, Tuple, Union
from typing import TYPE_CHECKING, AsyncGenerator, Literal, Optional, Tuple, Union

from grpclib import Status
from grpclib.exceptions import GRPCError, StreamTerminatedError

from modal.exception import ClientClosed, InvalidError
from modal_proto import api_pb2

from ._utils.async_utils import synchronize_api
from ._utils.grpc_utils import RETRYABLE_GRPC_STATUS_CODES, retry_transient_errors
from .client import _Client
from .stream_type import StreamType

if TYPE_CHECKING:
pass


async def _sandbox_logs_iterator(
sandbox_id: str, file_descriptor: int, last_entry_id: Optional[str], client: _Client
) -> AsyncIterator[Tuple[Optional[api_pb2.TaskLogs], str]]:
) -> AsyncGenerator[Tuple[Optional[str], str], None]:
req = api_pb2.SandboxGetLogsRequest(
sandbox_id=sandbox_id,
file_descriptor=file_descriptor,
Expand All @@ -35,22 +37,19 @@ async def _sandbox_logs_iterator(


async def _container_process_logs_iterator(
process_id: str, file_descriptor: int, last_entry_id: Optional[str], client: _Client
):
process_id: str, file_descriptor: int, client: _Client
) -> AsyncGenerator[Optional[str], None]:
req = api_pb2.ContainerExecGetOutputRequest(
exec_id=process_id,
timeout=55,
last_batch_index=last_entry_id or 0,
file_descriptor=file_descriptor,
)
async for batch in client.stub.ContainerExecGetOutput.unary_stream(req):
if batch.HasField("exit_code"):
yield (None, batch.batch_index)
yield None
break
for item in batch.items:
# TODO: do this on the server.
if item.file_descriptor == file_descriptor:
yield (item.message, batch.batch_index)
yield item.message


class _StreamReader:
Expand Down Expand Up @@ -80,21 +79,39 @@ def __init__(
object_id: str,
object_type: Literal["sandbox", "container_process"],
client: _Client,
stream_type: StreamType = StreamType.PIPE,
by_line: bool = False, # if True, streamed logs are further processed into complete lines.
) -> None:
"""mdmd:hidden"""

self._file_descriptor = file_descriptor
self._object_type = object_type
self._object_id = object_id
self._client = client
self._stream = None
self._last_entry_id = None
self._buffer = ""
self._line_buffer = ""
self._by_line = by_line
# Whether the reader received an EOF. Once EOF is True, it returns
# an empty string for any subsequent reads (including async for)
self.eof = False

if not isinstance(stream_type, StreamType):
raise TypeError(f"stream_type must be of type StreamType, got {type(stream_type)}")

# We only support piping sandbox logs because they're meant to be durable logs stored
# on the user's application.
if object_type == "sandbox" and stream_type != StreamType.PIPE:
raise ValueError("Sandbox streams must be piped.")
self._stream_type = stream_type

if self._object_type == "container_process":
# Container process streams need to be consumed as they are produced,
# otherwise the process will block. Use a buffer to store the stream
# until the client consumes it.
self._container_process_buffer = []
self._consume_container_process_task = asyncio.create_task(self._consume_container_process_stream())

@property
def file_descriptor(self):
return self._file_descriptor
Expand Down Expand Up @@ -124,7 +141,64 @@ async def read(self) -> str:

return data

async def _get_logs(self) -> AsyncIterator[Optional[str]]:
async def _consume_container_process_stream(self):
"""
Consumes the container process stream and stores the messages in the buffer.
"""
if self._stream_type == StreamType.DEVNULL:
return

completed = False
retries_remaining = 10
while not completed:
try:
iterator = _container_process_logs_iterator(self._object_id, self._file_descriptor, self._client)

async for message in iterator:
if self._stream_type == StreamType.STDOUT and message:
print(message, end="")
elif self._stream_type == StreamType.PIPE:
self._container_process_buffer.append(message)
if message is None:
completed = True
break

except (GRPCError, StreamTerminatedError, ClientClosed) as exc:
if retries_remaining > 0:
retries_remaining -= 1
if isinstance(exc, GRPCError):
if exc.status in RETRYABLE_GRPC_STATUS_CODES:
await asyncio.sleep(1.0)
continue
elif isinstance(exc, StreamTerminatedError):
continue
elif isinstance(exc, ClientClosed):
# If the client was closed, the user has triggered a cleanup.
break
raise exc

async def _stream_container_process(self) -> AsyncGenerator[Tuple[Optional[str], str], None]:
"""mdmd:hidden
Streams the container process buffer to the reader.
"""
entry_id = 0
if self._last_entry_id:
entry_id = int(self._last_entry_id) + 1

while True:
if entry_id >= len(self._container_process_buffer):
await asyncio.sleep(0.1)
continue

item = self._container_process_buffer[entry_id]

yield (item, str(entry_id))
if item is None:
break

entry_id += 1

async def _get_logs(self) -> AsyncGenerator[Optional[str], None]:
"""mdmd:hidden
Streams sandbox or process logs from the server to the reader.
Expand All @@ -133,6 +207,9 @@ async def _get_logs(self) -> AsyncIterator[Optional[str]]:
When the stream receives an EOF, it yields None. Once an EOF is received,
subsequent invocations will not yield logs.
"""
if self._stream_type != StreamType.PIPE:
raise InvalidError("Logs can only be retrieved using the PIPE stream type.")

if self.eof:
yield None
return
Expand All @@ -147,9 +224,7 @@ async def _get_logs(self) -> AsyncIterator[Optional[str]]:
self._object_id, self._file_descriptor, self._last_entry_id, self._client
)
else:
iterator = _container_process_logs_iterator(
self._object_id, self._file_descriptor, self._last_entry_id, self._client
)
iterator = self._stream_container_process()

async for message, entry_id in iterator:
self._last_entry_id = entry_id
Expand All @@ -169,28 +244,29 @@ async def _get_logs(self) -> AsyncIterator[Optional[str]]:
continue
raise

async def _get_logs_by_line(self) -> AsyncIterator[Optional[str]]:
async def _get_logs_by_line(self) -> AsyncGenerator[Optional[str], None]:
"""mdmd:hidden
Processes logs from the server and yields complete lines only.
"""
async for message in self._get_logs():
if message is None:
if self._buffer:
yield self._buffer
self._buffer = ""
if self._line_buffer:
yield self._line_buffer
self._line_buffer = ""
yield None
else:
self._buffer += message
while "\n" in self._buffer:
line, self._buffer = self._buffer.split("\n", 1)
self._line_buffer += message
while "\n" in self._line_buffer:
line, self._line_buffer = self._line_buffer.split("\n", 1)
yield line + "\n"

def __aiter__(self):
"""mdmd:hidden"""
if self._by_line:
self._stream = self._get_logs_by_line()
else:
self._stream = self._get_logs()
if not self._stream:
if self._by_line:
self._stream = self._get_logs_by_line()
else:
self._stream = self._get_logs()
return self

async def __anext__(self):
Expand Down
16 changes: 13 additions & 3 deletions modal/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .object import _get_environment_name, _Object
from .scheduler_placement import SchedulerPlacement
from .secret import _Secret
from .stream_type import StreamType

_default_image: _Image = _Image.debian_slim()

Expand Down Expand Up @@ -396,7 +397,16 @@ async def _get_task_id(self):
await asyncio.sleep(0.5)
return self._task_id

async def exec(self, *cmds: str, pty_info: Optional[api_pb2.PTYInfo] = None):
async def exec(
self,
*cmds: str,
# Deprecated: internal use only
pty_info: Optional[api_pb2.PTYInfo] = None,
stdout: StreamType = StreamType.PIPE,
stderr: StreamType = StreamType.PIPE,
# Internal option to set terminal size and metadata
_pty_info: Optional[api_pb2.PTYInfo] = None,
):
"""Execute a command in the Sandbox and return
a [`ContainerProcess`](/docs/reference/modal.ContainerProcess#modalcontainer_process) handle.
Expand All @@ -419,11 +429,11 @@ async def exec(self, *cmds: str, pty_info: Optional[api_pb2.PTYInfo] = None):
api_pb2.ContainerExecRequest(
task_id=task_id,
command=cmds,
pty_info=pty_info,
pty_info=_pty_info or pty_info,
runtime_debug=config.get("function_runtime_debug"),
)
)
return _ContainerProcess(resp.exec_id, self._client)
return _ContainerProcess(resp.exec_id, self._client, stdout=stdout, stderr=stderr)

@property
def stdout(self) -> _StreamReader:
Expand Down
15 changes: 15 additions & 0 deletions modal/stream_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright Modal Labs 2022
import subprocess
from enum import Enum


class StreamType(Enum):
    """How output from a sandbox or container-process stream is handled."""

    # Discard everything written to the stream.
    DEVNULL = subprocess.DEVNULL
    # Buffer the stream so the client can read it later.
    PIPE = subprocess.PIPE
    # Echo the stream to the local stdout as it arrives.
    STDOUT = subprocess.STDOUT

    def __repr__(self):
        """Return the fully-qualified member name, e.g. ``modal.stream_type.StreamType.PIPE``."""
        cls = type(self)
        return f"{cls.__module__}.{cls.__name__}.{self.name}"
4 changes: 4 additions & 0 deletions modal_proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,13 @@ enum TaskState {
}

enum VolumeFsVersion {
  // Allows the prefixed names below to alias the legacy unprefixed values.
  option allow_alias = true;
  UNSPECIFIED = 0;
  V1 = 1;
  V2 = 2;
  // Preferred, namespaced aliases of the legacy values above.
  VOLUME_FS_VERSION_UNSPECIFIED = 0;
  VOLUME_FS_VERSION_V1 = 1;
  VOLUME_FS_VERSION_V2 = 2;
}

enum WebhookAsyncMode {
Expand Down
2 changes: 1 addition & 1 deletion modal_version/_version_generated.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright Modal Labs 2024

# Note: Reset this value to -1 whenever you make a minor `0.X` release of the client.
build_number = 7 # git: 1a2e341
build_number = 10 # git: 754fa03
12 changes: 12 additions & 0 deletions test/container_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,18 @@ def test_cls_enter_uses_event_loop(servicer):
assert _unwrap_scalar(ret) == True


@skip_github_non_linux
def test_cls_with_image(servicer):
    """Run a container for a class with an attached image and check that the
    image is hydrated by the time the method executes."""
    ret = _run_container(
        servicer,
        "test.supports.class_with_image",
        "ClassWithImage.*",
        inputs=_get_inputs(((), {}), method_name="image_is_hydrated"),
        is_class=True,
    )
    # `is True` rather than `== True` (flake8 E712); the method returns a bool.
    assert _unwrap_scalar(ret) is True


@skip_github_non_linux
def test_container_heartbeats(servicer):
_run_container(servicer, "test.supports.functions", "square")
Expand Down
Loading

0 comments on commit 84115de

Please sign in to comment.