Fix Windows Threading Issues #385

Open

Wants to merge 24 commits into main from bugfix/windows_multiple_workers

Changes from 17 commits

Commits (24)
134963b  Fix bug on windows with uvicorn when multiple workers. (Dec 4, 2024)
cb8e349  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Dec 4, 2024)
47efcc6  Force socket to listen before starting server (Dec 5, 2024)
fd977d4  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Dec 5, 2024)
a2c269a  Update src/litserve/server.py (aniketmaurya, Dec 6, 2024)
0f5c29b  Fix Ctrl+C on windows (Dec 8, 2024)
2b27f14  Merge remote-tracking branch 'origin/bugfix/windows_multiple_workers'… (Dec 8, 2024)
494fcf9  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Dec 8, 2024)
f9c3141  Fix comments - Ctrl+C on Windows (Dec 8, 2024)
583a249  Merge branch 'main' into bugfix/windows_multiple_workers (FrsECM, Dec 8, 2024)
d05e664  Update src/litserve/server.py (FrsECM, Dec 8, 2024)
cbc9be5  Update src/litserve/server.py (FrsECM, Dec 8, 2024)
2d598ed  Fix threading import Thread (Dec 8, 2024)
d84e445  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Dec 8, 2024)
3d7ff88  Merge branch 'main' into bugfix/windows_multiple_workers (aniketmaurya, Dec 10, 2024)
3d51d28  Merge branch 'main' into bugfix/windows_multiple_workers (Borda, Dec 11, 2024)
2478dcd  Merge branch 'main' into bugfix/windows_multiple_workers (aniketmaurya, Dec 11, 2024)
3103bb8  Increase test timeout => 30mn (Dec 13, 2024)
63b1963  Merge branch 'main' into bugfix/windows_multiple_workers (FrsECM, Dec 13, 2024)
bde6fdb  Merge branch 'main' into bugfix/windows_multiple_workers (aniketmaurya, Jan 5, 2025)
d1d2c70  Fix default self._uvicorn_servers (FrsECM, Jan 24, 2025)
5f16eaa  Fix sockets iteration (FrsECM, Jan 24, 2025)
5756de4  Merge branch 'upstream/main' into bugfix/windows_multiple_workers (FrsECM, Jan 24, 2025)
85a6a1c  [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot], Jan 24, 2025)
56 changes: 45 additions & 11 deletions src/litserve/server.py
@@ -26,10 +26,13 @@
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from multiprocessing.context import Process
from queue import Empty
from threading import Thread
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union

import uvicorn
import uvicorn.server
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security import APIKeyHeader
@@ -233,6 +236,7 @@ def __init__(
self.model_metadata = model_metadata
self._connector = _Connector(accelerator=accelerator, devices=devices)
self._callback_runner = CallbackRunner(callbacks)
self._uvicorn_servers = None

specs = spec if spec is not None else []
self._specs = specs if isinstance(specs, Sequence) else [specs]
@@ -559,19 +563,40 @@ def run(
elif api_server_worker_type is None:
api_server_worker_type = "process"

manager, litserve_workers = self.launch_inference_worker(num_api_servers)
manager, inference_workers = self.launch_inference_worker(num_api_servers)

self.verify_worker_status()
try:
servers = self._start_server(port, num_api_servers, log_level, sockets, api_server_worker_type, **kwargs)
uvicorn_workers = self._start_server(
port, num_api_servers, log_level, sockets, api_server_worker_type, **kwargs
)
print(f"Swagger UI is available at http://0.0.0.0:{port}/docs")
for s in servers:
s.join()
if sys.platform != "win32":
# On Linux, kill signal will be captured by uvicorn.
# => They will join and raise a KeyboardInterrupt, allowing to Shutdown server.
for uw in uvicorn_workers:
uw: Union[Process, Thread]
uw.join()
else:
# On Windows, kill signal is captured by inference workers.
# => They will join and raise a KeyboardInterrupt, allowing to Shutdown Server
for iw in inference_workers:
iw: Process
iw.join()
except KeyboardInterrupt:
# Graceful shutdown on Ctrl+C.
if sys.platform == "win32":
# Ask the uvicorn servers to exit; this cleanly stops their threads on Windows.
for us in self._uvicorn_servers:
us: uvicorn.Server
us.should_exit = True
finally:
print("Shutting down LitServe")
for w in litserve_workers:
w.terminate()
w.join()
for iw in inference_workers:
iw: Process
iw.terminate()
iw.join()
manager.shutdown()

def _prepare_app_run(self, app: FastAPI):
@@ -581,16 +606,24 @@ def _prepare_app_run(self, app: FastAPI):
app.add_middleware(RequestCountMiddleware, active_counter=active_counter)

def _start_server(self, port, num_uvicorn_servers, log_level, sockets, uvicorn_worker_type, **kwargs):
servers = []
workers = []
self._uvicorn_servers = []
for response_queue_id in range(num_uvicorn_servers):
self.app.response_queue_id = response_queue_id
if self.lit_spec:
self.lit_spec.response_queue_id = response_queue_id
app: FastAPI = copy.copy(self.app)

self._prepare_app_run(app)

config = uvicorn.Config(app=app, host="0.0.0.0", port=port, log_level=log_level, **kwargs)
if sys.platform == "win32" and num_uvicorn_servers > 1:
logger.debug("Enable Windows explicit socket sharing...")
# We make sure sockets is listening...
# It prevents further [WinError 10022]
[sock.listen(config.backlog) for sock in sockets]
FrsECM marked this conversation as resolved.
Show resolved Hide resolved
# We add worker to say unicorn to use a shared socket (win32)
# https://github.com/encode/uvicorn/pull/802
config.workers = num_uvicorn_servers
server = uvicorn.Server(config=config)
if uvicorn_worker_type == "process":
ctx = mp.get_context("fork")
@@ -600,8 +633,9 @@ def _start_server(self, port, num_uvicorn_servers, log_level, sockets, uvicorn_w
else:
raise ValueError("Invalid value for api_server_worker_type. Must be 'process' or 'thread'")
w.start()
servers.append(w)
return servers
workers.append(w)
self._uvicorn_servers.append(server)
return workers

def setup_auth(self):
if hasattr(self.lit_api, "authorize") and callable(self.lit_api.authorize):
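
For the shutdown half of the fix, here is a minimal standalone sketch of the same pattern used in run() above: wait on the worker, and when Ctrl+C raises KeyboardInterrupt, set should_exit on the uvicorn.Server so it winds down cleanly. The single thread-based worker, port 8000, and the 1-second join timeout are illustrative assumptions, not litserve code:

import threading

import uvicorn
from fastapi import FastAPI

app = FastAPI()


def main():
    config = uvicorn.Config(app=app, host="0.0.0.0", port=8000, log_level="info")
    server = uvicorn.Server(config=config)
    # uvicorn skips installing its own signal handlers when it does not run on
    # the main thread, so Ctrl+C is delivered to the main thread below.
    worker = threading.Thread(target=server.run)
    worker.start()
    try:
        # Join with a timeout so KeyboardInterrupt stays responsive on Windows,
        # where an indefinite join can delay Ctrl+C handling.
        while worker.is_alive():
            worker.join(timeout=1.0)
    except KeyboardInterrupt:
        # The PR sets this flag only on win32, since elsewhere uvicorn's own
        # signal handling takes over; here it is always set because the server
        # never installed signal handlers of its own.
        server.should_exit = True
        worker.join()


if __name__ == "__main__":
    main()

This mirrors the run() change above: the uvicorn (or, on Windows, inference) workers are joined, and on KeyboardInterrupt each server stored in self._uvicorn_servers gets should_exit = True.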