Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: save kemono post revision #96

Merged
merged 3 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions nazurin/sites/kemono/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from datetime import datetime, timezone
from mimetypes import guess_type
from typing import Tuple
from typing import ClassVar, Tuple, Union

from bs4 import BeautifulSoup

Expand All @@ -15,10 +15,12 @@


class Kemono:
API_BASE: ClassVar[str] = "https://kemono.su/api/v1"

@network_retry
async def get_post(self, service: str, user_id: str, post_id: str) -> dict:
"""Fetch an post."""
api = f"https://kemono.su/api/v1/{service}/user/{user_id}/post/{post_id}"
api = f"{self.API_BASE}/{service}/user/{user_id}/post/{post_id}"
async with Request() as request:
async with request.get(api) as response:
response.raise_for_status()
Expand All @@ -29,6 +31,27 @@
post["username"] = username
return post

@network_retry
async def get_post_revision(
self, service: str, user_id: str, post_id: str, revision_id: str
) -> dict:
"""Fetch a post revision."""
api = f"{self.API_BASE}/{service}/user/{user_id}/post/{post_id}/revisions"
async with Request() as request:
async with request.get(api) as response:
response.raise_for_status()
revisions = await response.json()
post = None
for revision in revisions:
if str(revision["revision_id"]) == revision_id:
post = revision
break
if not post:
raise NazurinError("Post revision not found")
username = await self.get_username(service, user_id)
post["username"] = username
return post

@network_retry
async def get_username(self, service: str, user_id: str) -> str:
url = f"https://kemono.su/{service}/user/{user_id}"
Expand All @@ -43,8 +66,13 @@
username = tag.get("content", "")
return username

async def fetch(self, service: str, user_id: str, post_id: str) -> Illust:
post = await self.get_post(service, user_id, post_id)
async def fetch(

Check warning on line 69 in nazurin/sites/kemono/api.py

View workflow job for this annotation

GitHub Actions / build (3.8)

Too many local variables (17/15)

Check warning on line 69 in nazurin/sites/kemono/api.py

View workflow job for this annotation

GitHub Actions / build (3.8)

Too many local variables (17/15)
self, service: str, user_id: str, post_id: str, revision_id: Union[str, None]
) -> Illust:
if revision_id:
post = await self.get_post_revision(service, user_id, post_id, revision_id)
else:
post = await self.get_post(service, user_id, post_id)
caption = self.build_caption(post)

images = []
Expand Down Expand Up @@ -113,16 +141,24 @@
filename = FILENAME.format_map(context)
return (DESTINATION.format_map(context), filename + extension)

@staticmethod
def get_url(post: dict) -> str:
url = (
f"https://kemono.su/{post['service']}"
f"/user/{post['user']}/post/{post['id']}"
)
revision = post.get("revision_id")
if revision:
url += f"/revision/{post['revision_id']}"
return url

@staticmethod
def build_caption(post) -> Caption:
return Caption(
{
"title": post["title"],
"author": "#" + post["username"],
"url": (
f"https://kemono.su/{post['service']}"
f"/user/{post['user']}/post/{post['id']}"
),
"url": Kemono.get_url(post),
}
)

Expand Down
9 changes: 6 additions & 3 deletions nazurin/sites/kemono/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@
# https://kemono.party/dlsite/user/RG12345/post/RE12345
# https://kemono.party/gumroad/user/12345/post/aBc1d2
# https://kemono.su/subscribestar/user/abcdef/post/12345
r"kemono\.(?:party|su)/(\w+)/user/([\w-]+)/post/([\w-]+)"
# https://kemono.su/fanbox/user/12345/post/12345/revision/12345
r"kemono\.(?:party|su)/(\w+)/user/([\w-]+)/post/([\w-]+)(?:/revision/(\d+))?"
]


async def handle(match) -> Illust:
service = match.group(1)
user_id = match.group(2)
post_id = match.group(3)
revision_id = match.group(4)
db = Database().driver()
collection = db.collection(COLLECTION)

illust = await Kemono().fetch(service, user_id, post_id)
illust = await Kemono().fetch(service, user_id, post_id, revision_id)
illust.metadata["collected_at"] = time()
await collection.insert("_".join([service, user_id, post_id]), illust.metadata)
identifier = filter(lambda x: x, [service, user_id, post_id, revision_id])
await collection.insert("_".join(identifier), illust.metadata)
return illust
Loading