Skip to content

Commit

Permalink
feat: limit max parallel download & upload
Browse files Browse the repository at this point in the history
Run download / upload tasks in a pool with concurrency limits to improve stability.
  • Loading branch information
y-young committed Jan 21, 2024
1 parent 0d945b0 commit 91061c2
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 21 deletions.
16 changes: 16 additions & 0 deletions docs/getting-started/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ Chunk size when writing downloaded files, in bytes.

_Added in v2.6.0._

## MAX_PARALLEL_DOWNLOAD

:material-lightbulb-on: Optional, defaults to `5`

Maximum number of parallel downloads.

_Added in v2.7.0._

## MAX_PARALLEL_UPLOAD

:material-lightbulb-on: Optional, defaults to `5`

Maximum number of parallel uploads.

_Added in v2.7.0._

## HTTP_PROXY

:material-lightbulb-on: Optional, defaults to your environment
Expand Down
16 changes: 16 additions & 0 deletions docs/getting-started/configuration.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ _在 v2 中新增。_

_在 v2.6.0 中新增。_

## MAX_PARALLEL_DOWNLOAD

:material-lightbulb-on: 可选,默认为 `5`

同时下载的最大文件数。

_在 v2.7.0 中新增。_

## MAX_PARALLEL_UPLOAD

:material-lightbulb-on: 可选,默认为 `5`

同时上传的最大文件数。

_在 v2.7.0 中新增。_

## HTTP_PROXY

:material-lightbulb-on: 可选,默认遵循环境变量
Expand Down
2 changes: 2 additions & 0 deletions nazurin/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
RETRIES: int = env.int("RETRIES", default=5)
TIMEOUT: int = env.int("TIMEOUT", default=20)
DOWNLOAD_CHUNK_SIZE: int = env.int("DOWNLOAD_CHUNK_SIZE", default=4096)
MAX_PARALLEL_DOWNLOAD: int = env.int("MAX_PARALLEL_DOWNLOAD", default=5)
MAX_PARALLEL_UPLOAD: int = env.int("MAX_PARALLEL_UPLOAD", default=5)
PROXY: str = env.str("HTTP_PROXY", default=None)
UA: str = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
Expand Down
12 changes: 5 additions & 7 deletions nazurin/models/illust.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
from dataclasses import dataclass, field
from typing import List

from nazurin.config import MAX_PARALLEL_DOWNLOAD
from nazurin.utils import Request
from nazurin.utils.helpers import run_in_pool
from nazurin.utils.network import NazurinRequestSession

from .caption import Caption
Expand Down Expand Up @@ -31,9 +32,6 @@ async def download(
self, *, request_class: NazurinRequestSession = Request, **kwargs
):
async with request_class(**kwargs) as session:
tasks = []
for file in self.all_files:
if not file.url:
continue
tasks.append(asyncio.create_task(file.download(session)))
await asyncio.gather(*tasks)
files = filter(lambda file: file.url, self.all_files)
tasks = [file.download(session) for file in files]
await run_in_pool(tasks, MAX_PARALLEL_DOWNLOAD)
5 changes: 3 additions & 2 deletions nazurin/storage/googledrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
from pydrive2.drive import GoogleDrive as GDrive
from pydrive2.drive import GoogleDriveFile

from nazurin.config import STORAGE_DIR, env
from nazurin.config import MAX_PARALLEL_UPLOAD, STORAGE_DIR, env
from nazurin.models import File
from nazurin.utils import logger
from nazurin.utils.decorators import Cache, async_wrap
from nazurin.utils.exceptions import NazurinError
from nazurin.utils.helpers import run_in_pool

GD_FOLDER = env.str("GD_FOLDER")
GD_CREDENTIALS = env.str(
Expand Down Expand Up @@ -77,7 +78,7 @@ async def store(self, files: List[File]):
folders[destination] = folder_id

tasks = [self.upload(item, folders) for item in files]
await asyncio.gather(*tasks)
await run_in_pool(tasks, MAX_PARALLEL_UPLOAD)

@staticmethod
@Cache.lru()
Expand Down
5 changes: 3 additions & 2 deletions nazurin/storage/mega.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from mega import Mega as mega
from mega.errors import RequestError

from nazurin.config import NAZURIN_DATA, env
from nazurin.config import MAX_PARALLEL_UPLOAD, NAZURIN_DATA, env
from nazurin.database import Database
from nazurin.models import File
from nazurin.utils import logger
from nazurin.utils.decorators import Cache, async_wrap, network_retry
from nazurin.utils.exceptions import NazurinError
from nazurin.utils.helpers import run_in_pool

MEGA_USER = env.str("MEGA_USER")
MEGA_PASS = env.str("MEGA_PASS")
Expand Down Expand Up @@ -91,7 +92,7 @@ async def store(self, files: List[File]):
folders[destination] = folder_id

tasks = [self.upload(file, folders) for file in files]
await asyncio.gather(*tasks)
await run_in_pool(tasks, MAX_PARALLEL_UPLOAD)

@network_retry
@Cache.lru()
Expand Down
6 changes: 3 additions & 3 deletions nazurin/storage/onedrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

from humanize import naturalsize

from nazurin.config import NAZURIN_DATA, STORAGE_DIR, env
from nazurin.config import MAX_PARALLEL_UPLOAD, NAZURIN_DATA, STORAGE_DIR, env
from nazurin.database import Database
from nazurin.models import File
from nazurin.utils import Request, logger
from nazurin.utils.decorators import Cache, network_retry
from nazurin.utils.exceptions import NazurinError
from nazurin.utils.helpers import read_by_chunks, sanitize_path
from nazurin.utils.helpers import read_by_chunks, run_in_pool, sanitize_path

OD_FOLDER = STORAGE_DIR
OD_CLIENT = env.str("OD_CLIENT")
Expand Down Expand Up @@ -67,7 +67,7 @@ async def store(self, files: List[File]):
await asyncio.gather(*tasks)

tasks = [self.upload(file) for file in files]
await asyncio.gather(*tasks)
await run_in_pool(tasks, MAX_PARALLEL_UPLOAD)

@network_retry
async def find_folder(self, name: str) -> Optional[str]:
Expand Down
6 changes: 3 additions & 3 deletions nazurin/storage/s3.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import asyncio
import mimetypes
import pathlib
from typing import List

from minio import Minio

from nazurin.config import env
from nazurin.config import MAX_PARALLEL_UPLOAD, env
from nazurin.models import File
from nazurin.utils import logger
from nazurin.utils.decorators import async_wrap
from nazurin.utils.helpers import run_in_pool

with env.prefixed("S3_"):
ENDPOINT = env.str("ENDPOINT", default="s3.amazonaws.com")
Expand Down Expand Up @@ -50,6 +50,6 @@ async def store(self, files: List[File]):
await self.check_bucket()

tasks = [self.upload(file) for file in files]
await asyncio.gather(*tasks)
await run_in_pool(tasks, MAX_PARALLEL_UPLOAD)

return True
6 changes: 3 additions & 3 deletions nazurin/storage/telegram.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
from typing import List

from humanize import naturalsize

from nazurin import bot
from nazurin.config import env
from nazurin.config import MAX_PARALLEL_UPLOAD, env
from nazurin.models import File
from nazurin.utils import logger
from nazurin.utils.helpers import run_in_pool

ALBUM_ID = env.int("ALBUM_ID")

Expand All @@ -26,5 +26,5 @@ async def store(self, files: List[File]):
)
continue
tasks.append(bot.send_doc(file, chat_id=ALBUM_ID))
await asyncio.gather(*tasks)
await run_in_pool(tasks, MAX_PARALLEL_UPLOAD)
return True
13 changes: 12 additions & 1 deletion nazurin/utils/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import pathlib
import re
Expand All @@ -8,10 +9,11 @@
from mimetypes import guess_type
from pathlib import Path
from string import capwords
from typing import Callable, List, Union
from typing import Callable, Coroutine, Iterable, List, Union

import aiofiles
import aiofiles.os
import aiojobs
from aiogram.types import Message
from aiogram.utils.exceptions import (
BadRequest,
Expand Down Expand Up @@ -211,3 +213,12 @@ def check_image(path: Union[str, os.PathLike]) -> bool:
except OSError as error:
logger.warning("Invalid image {}: {}", path, error)
return False


async def run_in_pool(tasks: Iterable[Coroutine], pool_size: int) -> List:
    """Run coroutines concurrently, allowing at most ``pool_size`` at a time.

    Args:
        tasks: Coroutines to execute. Each is awaited exactly once.
        pool_size: Maximum number of coroutines running concurrently;
            must be a positive integer.

    Returns:
        Results of the coroutines, in the same order as ``tasks``.

    Raises:
        Whatever the first failing coroutine raises (propagated by
        ``asyncio.gather``).
    """
    # A semaphore caps concurrency without needing an external scheduler;
    # unlike the aiojobs-based version, there is no scheduler object that
    # could be leaked if one of the tasks raises.
    semaphore = asyncio.Semaphore(pool_size)

    async def _run(task: Coroutine):
        # Hold a slot for the full duration of the coroutine.
        async with semaphore:
            return await task

    return await asyncio.gather(*[_run(task) for task in tasks])

0 comments on commit 91061c2

Please sign in to comment.