diff --git a/maigret/checking.py b/maigret/checking.py index e5ef6dd9..4dfb5ba0 100644 --- a/maigret/checking.py +++ b/maigret/checking.py @@ -26,11 +26,7 @@ from . import errors from .activation import ParsingActivator, import_aiohttp_cookies from .errors import CheckError -from .executors import ( - AsyncExecutor, - AsyncioSimpleExecutor, - AsyncioProgressbarQueueExecutor, -) +from .executors import AsyncioQueueGeneratorExecutor from .result import MaigretCheckResult, MaigretCheckStatus from .sites import MaigretDatabase, MaigretSite from .types import QueryOptions, QueryResultWrapper @@ -670,18 +666,13 @@ async def maigret( await debug_ip_request(clearweb_checker, logger) # setup parallel executor - executor: Optional[AsyncExecutor] = None - if no_progressbar: - # TODO: switch to AsyncioProgressbarQueueExecutor with progress object mock - executor = AsyncioSimpleExecutor(logger=logger) - else: - executor = AsyncioProgressbarQueueExecutor( - logger=logger, - in_parallel=max_connections, - timeout=timeout + 0.5, - *args, - **kwargs, - ) + executor = AsyncioQueueGeneratorExecutor( + logger=logger, + in_parallel=max_connections, + timeout=timeout + 0.5, + *args, + **kwargs, + ) # make options objects for all the requests options: QueryOptions = {} @@ -728,13 +719,17 @@ async def maigret( }, ) - cur_results = await executor.run(tasks_dict.values()) - - # wait for executor timeout errors - await asyncio.sleep(1) + cur_results = [] + with alive_bar( + len(tasks_dict), title="Searching", force_tty=True, disable=no_progressbar + ) as progress: + async for result in executor.run(tasks_dict.values()): + cur_results.append(result) + progress() all_results.update(cur_results) + # rerun for failed sites sites = get_failed_sites(dict(cur_results)) attempts -= 1 diff --git a/maigret/executors.py b/maigret/executors.py index a548dc26..a14c4c2c 100644 --- a/maigret/executors.py +++ b/maigret/executors.py @@ -1,7 +1,7 @@ import asyncio import sys import time -from typing import Any, Iterable, List +from typing import Any, Iterable, List, Callable import alive_progress from alive_progress import alive_bar @@ -19,6 +19,7 @@ def create_task_func(): class AsyncExecutor: + # Deprecated: will be removed soon, don't use it def __init__(self, *args, **kwargs): self.logger = kwargs['logger'] @@ -34,6 +35,7 @@ async def _run(self, tasks: Iterable[QueryDraft]): class AsyncioSimpleExecutor(AsyncExecutor): + # Deprecated: will be removed soon, don't use it def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 100)) @@ -48,6 +50,7 @@ async def sem_task(f, args, kwargs): class AsyncioProgressbarExecutor(AsyncExecutor): + # Deprecated: will be removed soon, don't use it def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -71,6 +74,7 @@ async def track_task(task): class AsyncioProgressbarSemaphoreExecutor(AsyncExecutor): + # Deprecated: will be removed soon, don't use it def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.semaphore = asyncio.Semaphore(kwargs.get('in_parallel', 1)) @@ -174,3 +178,67 @@ async def _run(self, queries: Iterable[QueryDraft]): w.cancel() return self.results + + +class AsyncioQueueGeneratorExecutor: + # Deprecated: will be removed soon, don't use it + def __init__(self, *args, **kwargs): + self.workers_count = kwargs.get('in_parallel', 10) + self.queue = asyncio.Queue() + self.timeout = kwargs.get('timeout') + self.logger = kwargs['logger'] + self._results = asyncio.Queue() + self._stop_signal = object() + + async def worker(self): + """Process tasks from the queue and put results into the results queue.""" + while True: + task = await self.queue.get() + if task is self._stop_signal: + self.queue.task_done() + break + + try: + f, args, kwargs = task + query_future = f(*args, **kwargs) + query_task = create_task_func()(query_future) + + try: + result = await asyncio.wait_for(query_task, timeout=self.timeout) + except asyncio.TimeoutError: + result = kwargs.get('default') + await self._results.put(result) + except Exception as e: + self.logger.error(f"Error in worker: {e}") + finally: + self.queue.task_done() + + async def run(self, queries: Iterable[Callable[..., Any]]): + """Run workers to process queries in parallel.""" + start_time = time.time() + + # Add tasks to the queue + for t in queries: + await self.queue.put(t) + + # Create workers + workers = [ + asyncio.create_task(self.worker()) for _ in range(self.workers_count) + ] + + # Add stop signals + for _ in range(self.workers_count): + await self.queue.put(self._stop_signal) + + try: + while any(w.done() is False for w in workers) or not self._results.empty(): + try: + result = await asyncio.wait_for(self._results.get(), timeout=1) + yield result + except asyncio.TimeoutError: + pass + finally: + # Ensure all workers are awaited + await asyncio.gather(*workers) + self.execution_time = time.time() - start_time + self.logger.debug(f"Spent time: {self.execution_time}") diff --git a/maigret/maigret.py b/maigret/maigret.py index 69568b87..5c25382b 100755 --- a/maigret/maigret.py +++ b/maigret/maigret.py @@ -496,7 +496,9 @@ async def main(): if args.web is not None: from maigret.web.app import app - port = args.web if args.web else 5000 # args.web is either the specified port or 5000 by default + port = ( + args.web if args.web else 5000 + ) # args.web is either the specified port or 5000 by default app.run(port=port) return diff --git a/maigret/resources/data.json b/maigret/resources/data.json index 45e80d57..88407dde 100644 --- a/maigret/resources/data.json +++ b/maigret/resources/data.json @@ -7218,7 +7218,8 @@ "url": "https://gramho.com/explore-hashtag/{username}", "source": "Instagram", "usernameClaimed": "adam", - "usernameUnclaimed": "noonewouldeverusethis7" + "usernameUnclaimed": "noonewouldeverusethis7", + "disabled": true }, "Gravatar": { "tags": [ @@ -17476,7 +17477,7 @@ "method": "vimeo" }, "headers": { - "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzM5Njc3MjAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiNGJkNDE4NzktM2VhOS00ZWRiLWIzZDUtNjAyNjQ3YjMyNTVhIn0.kPbKREujSfYsisyF0pS_HskTapRlHBfVLRw4cis1ezk" + "Authorization": "jwt eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzQxMTc1NDAsInVzZXJfaWQiOm51bGwsImFwcF9pZCI6NTg0NzksInNjb3BlcyI6InB1YmxpYyIsInRlYW1fdXNlcl9pZCI6bnVsbCwianRpIjoiNDc4Y2ZhZGUtZjI0Yy00MDVkLTliYWItN2RlNGEzNGM4MzI5In0.guN7Fg8dqq7EYdckrJ-6Rdkj_5MOl6FaC4YUSOceDpU" }, "urlProbe": "https://api.vimeo.com/users/{username}?fields=name%2Cgender%2Cbio%2Curi%2Clink%2Cbackground_video%2Clocation_details%2Cpictures%2Cverified%2Cmetadata.public_videos.total%2Cavailable_for_hire%2Ccan_work_remotely%2Cmetadata.connections.videos.total%2Cmetadata.connections.albums.total%2Cmetadata.connections.followers.total%2Cmetadata.connections.following.total%2Cmetadata.public_videos.total%2Cmetadata.connections.vimeo_experts.is_enrolled%2Ctotal_collection_count%2Ccreated_time%2Cprofile_preferences%2Cmembership%2Cclients%2Cskills%2Cproject_types%2Crates%2Ccategories%2Cis_expert%2Cprofile_discovery%2Cwebsites%2Ccontact_emails&fetch_user_profile=1", "checkType": "status_code", diff --git a/maigret/submit.py b/maigret/submit.py index fb698e9b..fae40dd7 100644 --- a/maigret/submit.py +++ b/maigret/submit.py @@ -188,6 +188,7 @@ def extract_username_dialog(url): ) return entered_username if entered_username else supposed_username + # TODO: replace with checking.py/SimpleAiohttpChecker call @staticmethod async def get_html_response_to_compare( url: str, session: ClientSession = None, redirects=False, headers: Dict = None diff --git a/tests/test_executors.py b/tests/test_executors.py index e893773d..7a39897e 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -8,6 +8,7 @@ AsyncioProgressbarExecutor, AsyncioProgressbarSemaphoreExecutor, AsyncioProgressbarQueueExecutor, + AsyncioQueueGeneratorExecutor, ) logger = logging.getLogger(__name__) @@ -76,3 +77,35 @@ async def test_asyncio_progressbar_queue_executor(): assert await executor.run(tasks) == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8] assert executor.execution_time > 0.2 assert executor.execution_time < 0.4 + + +@pytest.mark.asyncio +async def test_asyncio_queue_generator_executor(): + tasks = [(func, [n], {}) for n in range(10)] + + executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=2) + results = [result async for result in executor.run(tasks)] + assert results == [0, 1, 3, 2, 4, 6, 7, 5, 9, 8] + assert executor.execution_time > 0.5 + assert executor.execution_time < 0.6 + + executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=3) + results = [result async for result in executor.run(tasks)] + assert results == [0, 3, 1, 4, 6, 2, 7, 9, 5, 8] + assert executor.execution_time > 0.4 + assert executor.execution_time < 0.5 + + executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=5) + results = [result async for result in executor.run(tasks)] + assert results in ( + [0, 3, 6, 1, 4, 7, 9, 2, 5, 8], + [0, 3, 6, 1, 4, 9, 7, 2, 5, 8], + ) + assert executor.execution_time > 0.3 + assert executor.execution_time < 0.4 + + executor = AsyncioQueueGeneratorExecutor(logger=logger, in_parallel=10) + results = [result async for result in executor.run(tasks)] + assert results == [0, 3, 6, 9, 1, 4, 7, 2, 5, 8] + assert executor.execution_time > 0.2 + assert executor.execution_time < 0.3 diff --git a/tests/test_submit.py b/tests/test_submit.py index 25883002..a9e17a93 100644 --- a/tests/test_submit.py +++ b/tests/test_submit.py @@ -1,9 +1,8 @@ import pytest -from unittest.mock import AsyncMock, MagicMock, patch -from maigret.submit import Submitter, MaigretSite, MaigretEngine +from unittest.mock import MagicMock, patch +from maigret.submit import Submitter from aiohttp import ClientSession from maigret.sites import MaigretDatabase -from maigret.settings import Settings import logging