-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Moebooru: Collect entire collection instead of single post #44
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import json | ||
import os | ||
import re | ||
from typing import List, Optional | ||
from urllib.parse import unquote | ||
|
||
|
@@ -13,103 +14,165 @@ | |
from nazurin.utils.exceptions import NazurinError | ||
from nazurin.utils.helpers import ensureExistence | ||
|
||
|
||
class Moebooru(object): | ||
def site(self, site_url: Optional[str] = 'yande.re'): | ||
def site(self, site_url: Optional[str] = "yande.re"): | ||
self.url = site_url | ||
return self | ||
|
||
async def getPost(self, post_id: int): | ||
url = 'https://' + self.url + '/post/show/' + str(post_id) | ||
url = "https://" + self.url + "/post/show/" + str(post_id) | ||
async with Request() as request: | ||
async with request.get(url) as response: | ||
try: | ||
response.raise_for_status() | ||
except ClientResponseError as err: | ||
raise NazurinError(err) from None | ||
response = await response.text() | ||
soup = BeautifulSoup(response, 'html.parser') | ||
soup = BeautifulSoup(response, "html.parser") | ||
tag = soup.find(id="post-view").find(recursive=False) | ||
if tag.name == 'script': | ||
if tag.name == "script": | ||
content = str.strip(tag.string) | ||
elif tag.name == 'div' and ('status-notice' in tag['class']): | ||
elif tag.name == "div" and ("status-notice" in tag["class"]): | ||
raise NazurinError(tag.get_text(strip=True)) | ||
else: | ||
logger.error(tag) | ||
raise NazurinError('Unknown error') | ||
raise NazurinError("Unknown error") | ||
|
||
info = content[19:-2] | ||
try: | ||
info = json.loads(info) | ||
post = info['posts'][0] | ||
tags = info['tags'] | ||
post = info["posts"][0] | ||
tags = info["tags"] | ||
except json.decoder.JSONDecodeError as err: | ||
logger.error(err) | ||
return post, tags | ||
|
||
async def getCollection(self, parent_id: int): | ||
url = "https://" + self.url + "/post?tags=parent:" + str(parent_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There's an official API for post search, so we don't need to bother with the regex here. |
||
async with Request() as request: | ||
async with request.get(url) as response: | ||
try: | ||
response.raise_for_status() | ||
except ClientResponseError as err: | ||
raise NazurinError(err) from None | ||
response = await response.text() | ||
|
||
# parse tags in Post.register_tags block | ||
tag_re = r"Post.register_tags\(\{([\s\S]+?)\}\)" | ||
tag_ma = re.search(tag_re, response, re.MULTILINE) | ||
if tag_ma: | ||
tags = "{" + tag_ma.group(1) + "}" | ||
try: | ||
tags = json.loads(tags) | ||
except json.decoder.JSONDecodeError as err: | ||
logger.error(err) | ||
|
||
# parse post data in Post.register blocks | ||
post_re = r"Post.register\(\{([\s\S]+?)\}\)" | ||
post_ma = re.finditer(post_re, response, re.MULTILINE) | ||
if not post_ma: | ||
raise NazurinError("No post find in parent collection") | ||
posts = [] | ||
for _, match in enumerate(post_ma, start=1): | ||
post = "{" + match.group(1) + "}" | ||
try: | ||
post = json.loads(post) | ||
if post["has_children"]: | ||
posts.insert(0, post) | ||
else: | ||
posts.append(post) | ||
except json.decoder.JSONDecodeError as err: | ||
logger.error(err) | ||
|
||
return posts, tags | ||
|
||
async def view(self, post_id: int) -> Illust: | ||
post, tags = await self.getPost(post_id) | ||
imgs = self.getImages(post) | ||
caption = self.buildCaption(post, tags) | ||
return Illust(imgs, caption, post) | ||
parent_id = post["parent_id"] | ||
if not parent_id and post["has_children"]: | ||
parent_id = post_id | ||
if parent_id: | ||
posts, tags = await self.getCollection(parent_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure that everyone needs both the parent and the children. Taking myself as an example: when collecting a post with a parent, I tend to choose the parent post, since in most cases it has the best quality. Therefore I would suggest giving users options like these:
|
||
else: | ||
posts = [post] | ||
imgs = self.getImages(posts) | ||
caption = self.buildCaption(posts, tags) | ||
metadata = {"url": caption["url"], "tags": tags, "posts": posts} | ||
return Illust(imgs, caption, metadata) | ||
|
||
def pool(self, pool_id: int, jpeg=False): | ||
client = moebooru(self.site) | ||
info = client.pool_posts(id=pool_id) | ||
posts = info['posts'] | ||
posts = info["posts"] | ||
imgs = list() | ||
for post in posts: | ||
if not jpeg: | ||
url = post['file_url'] | ||
url = post["file_url"] | ||
else: | ||
url = post['jpeg_url'] | ||
url = post["jpeg_url"] | ||
name, _ = self.parseUrl(url) | ||
imgs.append(Image(name, url)) | ||
caption = Caption({ | ||
'name': info['name'], | ||
'description': info['description'] | ||
}) | ||
caption = Caption({"name": info["name"], "description": info["description"]}) | ||
return imgs, caption | ||
|
||
async def download_pool(self, pool_id, jpeg=False): | ||
imgs, caption = self.pool(pool_id, jpeg) | ||
pool_name = caption['name'] | ||
pool_name = caption["name"] | ||
ensureExistence(os.path.join(TEMP_DIR, pool_name)) | ||
for key, img in enumerate(imgs): | ||
filename = str(key + 1) | ||
filename = '0' * (3 - len(filename)) + filename | ||
filename = "0" * (3 - len(filename)) + filename | ||
_, ext = self.parseUrl(img.url) | ||
filename += ext | ||
img.name = pool_name + '/' + img.name | ||
img.name = pool_name + "/" + img.name | ||
await img.download() # TODO | ||
|
||
def getImages(self, post) -> List[Image]: | ||
file_url = post['file_url'] | ||
name = unquote(os.path.basename(file_url)) | ||
imgs = [ | ||
Image(name, file_url, post['sample_url'], post['file_size'], | ||
post['width'], post['height']) | ||
] | ||
def getImages(self, posts) -> List[Image]: | ||
imgs = [] | ||
for post in posts: | ||
file_url = post["file_url"] | ||
name = unquote(os.path.basename(file_url)) | ||
imgs.append( | ||
Image( | ||
name, | ||
file_url, | ||
post["sample_url"], | ||
post["file_size"], | ||
post["width"], | ||
post["height"], | ||
) | ||
) | ||
return imgs | ||
|
||
def buildCaption(self, post, tags) -> Caption: | ||
def buildCaption(self, posts, tags) -> Caption: | ||
"""Build media caption from an post.""" | ||
title = post['tags'] | ||
source = post['source'] | ||
post = posts[0] | ||
parent_id = post["parent_id"] | ||
has_children = post["has_children"] | ||
if has_children: | ||
url = f"https://{self.url}/post?tags=parent:{str(post['id'])}" | ||
parent_id = post["id"] | ||
else: | ||
url = f"https://{self.url}/post/show/{str(post['id'])}" | ||
title = post["tags"] | ||
tag_string = artists = str() | ||
for tag, tag_type in tags.items(): | ||
if tag_type == 'artist': | ||
artists += tag + ' ' | ||
if tag_type == "artist": | ||
artists += tag + " " | ||
else: | ||
tag_string += '#' + tag + ' ' | ||
caption = Caption({ | ||
'title': title, | ||
'artists': artists, | ||
'url': 'https://' + self.url + '/post/show/' + str(post['id']), | ||
'tags': tag_string, | ||
'source': source, | ||
'parent_id': post['parent_id'], | ||
'has_children': post['has_children'] | ||
}) | ||
tag_string += "#" + tag + " " | ||
source = " ".join(list(set([p["source"] for p in posts]))).strip() | ||
caption = Caption( | ||
{ | ||
"title": title, | ||
"artists": artists, | ||
"url": url, | ||
"tags": tag_string, | ||
"source": source, | ||
"parent_id": parent_id, | ||
} | ||
) | ||
return caption | ||
|
||
def parseUrl(self, url: str) -> str: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please adjust your code formatter to eliminate these changes, including the extra blank line above and the single quotes. Code formatting issues should be dealt with later, in a separate commit if necessary.