Skip to content

Commit

Permalink
workers and site_dir
Browse files Browse the repository at this point in the history
  • Loading branch information
yzqzss committed Jan 9, 2025
1 parent 42d9a6f commit c02312a
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 63 deletions.
11 changes: 9 additions & 2 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions src/discuz_logger/define/post.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class Post:
replycredit: str
position: str
groupid: str
number: str
dbdateline: str
groupiconid: str

adminid: Optional[str] = None
attachments: Optional[list] = None
imagelist: Optional[list] = None
memberstatus: Optional[str] = None
username: Optional[str] = None
username: Optional[str] = None
attachlist: Optional[list] = None
number: Optional[str] = None
118 changes: 62 additions & 56 deletions src/discuz_logger/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
from pathlib import Path
import traceback
from typing import Coroutine, Tuple

import httpx
Expand All @@ -22,78 +23,83 @@ async def thread_worker(queue: asyncio.Queue[Tuple[MobileApi, ThreadMeta, Path]]
no_first_check = True
while True:
site, thread_meta, thread_dir = await queue.get()
thread_dir.mkdir(parents=True, exist_ok=True)

maxposition = maxposition_last_run = -1
fetched_postion = -2
thread_json_path = thread_dir / "thread.json"
if thread_json_path.exists():
if no_first_check:
continue
thread = json.loads(thread_json_path.read_text())
maxposition_last_run = int(thread["maxposition"])
viewthread = None
tqd = None
page = 1
while fetched_postion < maxposition:
for t in range(3):
try:
viewthread, _ = await site.viewthread(thread_meta.tid, page=page)
except Exception as e:
if t == 2:
raise e
print(f"tid={thread_meta.tid} page={page} ,retry={t} error:{e}")
await asyncio.sleep(10)
try:
thread_dir.mkdir(parents=True, exist_ok=True)

maxposition = maxposition_last_run = -1
fetched_postion = -2
thread_json_path = thread_dir / "thread.json"
if thread_json_path.exists():
if no_first_check:
continue
maxposition = APIHelper.get_maxposition(viewthread)

if maxposition == maxposition_last_run:
break
if len(APIHelper.get_posts(viewthread)) == 0:
assert int(thread_meta.readperm) != 0
print(f"tid={thread_meta.tid} readperm:{thread_meta.readperm}")
break

if tqd is None:
tqd = tqdm(desc=f"tid={thread_meta.tid} subj:{thread_meta.subject}", unit="posts", dynamic_ncols=True)

for post, post_raw in APIHelper.get_posts(viewthread):
fetched_postion = int(post.position) if int(post.position) > fetched_postion else fetched_postion

with open(thread_dir / f"pid-{post.pid}.json", "w") as f:
f.write(json_dump(post_raw))

if fetched_postion < 0:
fetched_postion = 0

tqd.total = maxposition
tqd.n = fetched_postion
tqd.refresh()

page += 1

assert viewthread is not None
thread_json_path.write_text(json_dump(APIHelper.get_thread(viewthread)))

tqd.close() if tqd is not None else None
queue.task_done()
thread = json.loads(thread_json_path.read_text())
maxposition_last_run = int(thread["maxposition"])
viewthread = None
page = 1
while fetched_postion < maxposition:
if tqd is None:
tqd = tqdm(desc=f"tid={thread_meta.tid} subj:{thread_meta.subject}", unit="posts", dynamic_ncols=True)

for t in range(3):
try:
viewthread, r = await site.viewthread(thread_meta.tid, page=page)
except Exception as e:
if t == 2:
raise e
print(f"tid={thread_meta.tid} page={page} ,retry={t} error:{e}")
await asyncio.sleep(10)
continue
assert viewthread is not None
maxposition = APIHelper.get_maxposition(viewthread)

if maxposition == maxposition_last_run:
break
if len(APIHelper.get_posts(viewthread)) == 0:
print(f"tid={thread_meta.tid} page={page} no post. {viewthread['Message']}")
break

for post, post_raw in APIHelper.get_posts(viewthread):
fetched_postion = int(post.position) if int(post.position) > fetched_postion else fetched_postion

with open(thread_dir / f"pid-{post.pid}.json", "w") as f:
f.write(json_dump(post_raw))

if fetched_postion < 0:
fetched_postion = 0

tqd.total = maxposition
if tqd.n == fetched_postion:
print(f"tid={thread_meta.tid} in_loop? {fetched_postion}. break")
break
tqd.n = fetched_postion
tqd.refresh()

page += 1
assert viewthread is not None
thread_json_path.write_text(json_dump(APIHelper.get_thread(viewthread)))
except Exception:
traceback.print_exc()
finally:
tqd.close() if tqd is not None else None
queue.task_done()


async def _main():
args = arg_parser()

forum_queue: asyncio.Queue[Coroutine] = asyncio.Queue(maxsize=5)
threads_queue: asyncio.Queue[Tuple[MobileApi, ThreadMeta, Path]] = asyncio.Queue(maxsize=100)
for i in range(5):
for i in range(args.workers // 2 + 1):
asyncio.create_task(forum_worker(forum_queue))
for i in range(3):
for i in range(args.workers):
asyncio.create_task(thread_worker(threads_queue))

transport = httpx.AsyncHTTPTransport(retries=5, http1=True, http2=True)
client = httpx.AsyncClient(transport=transport, headers={"User-Agent": "saveweb/0.1 ([email protected])"}, timeout=30)
site = MobileApi(client, args.site)
check, r = await site.check_mobile_api()
site_dir = Path("data") / "site" / check.mysiteid
site_dir = Path("data") / "site" / (check.mysiteid or check.sitename)
site_dir.mkdir(parents=True, exist_ok=True)
with open(site_dir / "check.json", "w") as f:
f.write(json_dump(r.json()))
Expand Down
7 changes: 4 additions & 3 deletions src/discuz_logger/mobile_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ async def forumdisplay(self, fid, page=1, orderby="dateline", filter="author"):
async def iter_threads(self, fid: str | int):
page = 1
forumdisplay = await self.forumdisplay(fid, page=page)
assert int(forumdisplay["Variables"]["forum"]["threads"]) <= int(
forumdisplay["Variables"]["forum"]["threadcount"]
)
threads_total: int = int(forumdisplay["Variables"]["forum"]["threads"])
threads_fetched: int = 0
tqd = tqdm(total=threads_total, desc=f"fid={fid}", unit="threads",dynamic_ncols=True)
Expand All @@ -94,4 +91,8 @@ async def iter_threads(self, fid: str | int):

tqd.total = threads_total
tqd.update(len(threads))

if len(threads) == 0:
print(f'fid={fid}: {forumdisplay["Message"]}')
break
tqd.close()
2 changes: 2 additions & 0 deletions src/discuz_logger/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
@dataclass
class Args:
    """Command-line options, one field per flag registered by ``arg_parser``."""

    # Value of the required ``--site`` option.
    site: str
    # Value of the ``--workers`` option. The default mirrors the command-line
    # default so code that builds ``Args(site=...)`` directly keeps working.
    workers: int = 5

def _positive_int(value):
    """Convert a command-line string to an int, rejecting values below 1."""
    number = int(value)
    if number < 1:
        raise argparse.ArgumentTypeError(
            f"expected a positive integer, got {value!r}"
        )
    return number


def arg_parser(argv=None):
    """Parse the command-line options into an ``Args`` instance.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, which is the
            behavior callers had before this parameter existed.

    Returns:
        An ``Args`` built from the parsed ``--site`` and ``--workers`` values.

    Raises:
        SystemExit: Raised by argparse when ``--site`` is missing or
            ``--workers`` is not a positive integer.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--site",
        type=str,
        required=True,
        help="site to process (required)",
    )
    # The value is used as a worker count, so zero or a negative number would
    # leave nothing running; reject it at parse time with a clear message.
    parser.add_argument(
        "--workers",
        type=_positive_int,
        default=5,
        help="number of workers, must be >= 1 (default: %(default)s)",
    )
    return Args(**vars(parser.parse_args(argv)))


Expand Down

0 comments on commit c02312a

Please sign in to comment.