diff --git a/.github/workflows/pyaleph-ci.yml b/.github/workflows/pyaleph-ci.yml
index b42e7cb5b..1ea752c53 100644
--- a/.github/workflows/pyaleph-ci.yml
+++ b/.github/workflows/pyaleph-ci.yml
@@ -69,7 +69,8 @@ jobs:
           sudo cp .github/openssl-ci.cnf /etc/ssl/openssl.cnf
           export OPENSSL_CONF=/etc/ssl/openssl.cnf
           touch config.yml # Fake config file for alembic
-          pytest -v .
+          # TODO: determine why ResourceWarning warnings occur in some tests.
+          pytest -Werror -Wignore::ResourceWarning -v .
   build:
     runs-on: ubuntu-22.04
     needs: tests
diff --git a/src/aleph/api_entrypoint.py b/src/aleph/api_entrypoint.py
index deaf03d48..ed97ec57a 100644
--- a/src/aleph/api_entrypoint.py
+++ b/src/aleph/api_entrypoint.py
@@ -48,9 +48,9 @@ async def configure_aiohttp_app(
     # TODO: find a way to close the node cache when exiting the API process, not closing it causes
     # a warning.
     await node_cache.open()
+    # TODO: same, find a way to call await ipfs_service.close() on shutdown
+    ipfs_service = IpfsService.new(config)
 
-    ipfs_client = make_ipfs_client(config)
-    ipfs_service = IpfsService(ipfs_client=ipfs_client)
     storage_service = StorageService(
         storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
         ipfs_service=ipfs_service,
diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py
index f2a5812b0..ea699e21c 100644
--- a/src/aleph/chains/ethereum.py
+++ b/src/aleph/chains/ethereum.py
@@ -1,10 +1,10 @@
 import asyncio
 import functools
+import importlib.resources
 import json
 import logging
 from typing import AsyncIterator, Dict, Tuple
 
-import pkg_resources
 from aleph_message.models import Chain
 from configmanager import Config
 from eth_account import Account
@@ -48,16 +48,16 @@ def get_web3(config) -> Web3:
 
 
 async def get_contract_abi():
-    return json.loads(
-        pkg_resources.resource_string(
-            "aleph.chains", "assets/ethereum_sc_abi.json"
-        ).decode("utf-8")
+    contract_abi_resource = (
+        importlib.resources.files("aleph.chains.assets") / "ethereum_sc_abi.json"
     )
+    with contract_abi_resource.open("r", encoding="utf-8") as f:
+        return json.load(f)
 
 
 async def get_contract(config, web3: Web3):
     return web3.eth.contract(
-        config.ethereum.sync_contract.value, abi=await get_contract_abi()
+        address=config.ethereum.sync_contract.value, abi=await get_contract_abi()
     )
diff --git a/src/aleph/commands.py b/src/aleph/commands.py
index b43fd089e..50fca476b 100644
--- a/src/aleph/commands.py
+++ b/src/aleph/commands.py
@@ -136,11 +136,10 @@ async def main(args: List[str]) -> None:
     mq_channel = await mq_conn.channel()
 
     node_cache = await init_node_cache(config)
-    async with node_cache:
+    async with node_cache, IpfsService.new(config) as ipfs_service:
         # Reset the cache
         await node_cache.reset()
 
-        ipfs_service = IpfsService(ipfs_client=make_ipfs_client(config))
         storage_service = StorageService(
             storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
             ipfs_service=ipfs_service,
diff --git a/src/aleph/db/accessors/files.py b/src/aleph/db/accessors/files.py
index 0c7cae609..cea15418e 100644
--- a/src/aleph/db/accessors/files.py
+++ b/src/aleph/db/accessors/files.py
@@ -84,7 +84,7 @@ def insert_message_file_pin(
 
 
 def count_file_pins(session: DbSession, file_hash: str) -> int:
     select_count_stmt = select(func.count()).select_from(
-        select(FilePinDb).where(FilePinDb.file_hash == file_hash)
+        select(FilePinDb).where(FilePinDb.file_hash == file_hash).subquery()
     )
     return session.execute(select_count_stmt).scalar_one()
diff --git a/src/aleph/db/accessors/messages.py b/src/aleph/db/accessors/messages.py
index 4a764601c..0f88caa86 100644
--- a/src/aleph/db/accessors/messages.py
+++ b/src/aleph/db/accessors/messages.py
@@ -181,7 +181,7 @@ def count_matching_messages(
         include_confirmations=False,
         page=1,
         pagination=0,
-    )
+    ).subquery()
     select_count_stmt = select(func.count()).select_from(select_stmt)
     return session.execute(select_count_stmt).scalar_one()
diff --git a/src/aleph/db/accessors/posts.py b/src/aleph/db/accessors/posts.py
index 50a1e72d4..eb30e3114 100644
--- a/src/aleph/db/accessors/posts.py
+++ b/src/aleph/db/accessors/posts.py
@@ -315,10 +315,10 @@ def count_matching_posts(
             pagination=0,
             start_date=start_date,
            end_date=end_date,
-        )
+        ).subquery()
     else:
         # Without filters, counting the number of original posts is faster.
-        select_stmt = select(PostDb).where(PostDb.amends.is_(None))
+        select_stmt = select(PostDb).where(PostDb.amends.is_(None)).subquery()
 
     select_count_stmt = select(func.count()).select_from(select_stmt)
     return session.execute(select_count_stmt).scalar_one()
diff --git a/src/aleph/jobs/fetch_pending_messages.py b/src/aleph/jobs/fetch_pending_messages.py
index bc3e4d6db..719469f3f 100644
--- a/src/aleph/jobs/fetch_pending_messages.py
+++ b/src/aleph/jobs/fetch_pending_messages.py
@@ -177,9 +177,7 @@ async def fetch_messages_task(config: Config):
 
     async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    ) as node_cache:
-        ipfs_client = make_ipfs_client(config)
-        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+    ) as node_cache, IpfsService.new(config) as ipfs_service:
         storage_service = StorageService(
             storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
             ipfs_service=ipfs_service,
diff --git a/src/aleph/jobs/process_pending_messages.py b/src/aleph/jobs/process_pending_messages.py
index c825062e7..6e45a4214 100644
--- a/src/aleph/jobs/process_pending_messages.py
+++ b/src/aleph/jobs/process_pending_messages.py
@@ -20,7 +20,6 @@
 from aleph.handlers.message_handler import MessageHandler
 from aleph.services.cache.node_cache import NodeCache
 from aleph.services.ipfs import IpfsService
-from aleph.services.ipfs.common import make_ipfs_client
 from aleph.services.storage.fileystem_engine import FileSystemStorageEngine
 from aleph.storage import StorageService
 from aleph.toolkit.logging import setup_logging
@@ -156,9 +155,7 @@ async def fetch_and_process_messages_task(config: Config):
 
     async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    ) as node_cache:
-        ipfs_client = make_ipfs_client(config)
-        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+    ) as node_cache, IpfsService.new(config) as ipfs_service:
         storage_service = StorageService(
             storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
             ipfs_service=ipfs_service,
diff --git a/src/aleph/jobs/process_pending_txs.py b/src/aleph/jobs/process_pending_txs.py
index 77f609377..be352a36b 100644
--- a/src/aleph/jobs/process_pending_txs.py
+++ b/src/aleph/jobs/process_pending_txs.py
@@ -131,9 +131,7 @@ async def handle_txs_task(config: Config):
 
     async with NodeCache(
         redis_host=config.redis.host.value, redis_port=config.redis.port.value
-    ) as node_cache:
-        ipfs_client = make_ipfs_client(config)
-        ipfs_service = IpfsService(ipfs_client=ipfs_client)
+    ) as node_cache, IpfsService.new(config) as ipfs_service:
         storage_service = StorageService(
             storage_engine=FileSystemStorageEngine(folder=config.storage.folder.value),
             ipfs_service=ipfs_service,
diff --git a/src/aleph/services/cache/node_cache.py b/src/aleph/services/cache/node_cache.py
index 24d6d8842..48c68dfe2 100644
--- a/src/aleph/services/cache/node_cache.py
+++ b/src/aleph/services/cache/node_cache.py
@@ -39,7 +39,7 @@ async def __aenter__(self):
 
     async def close(self):
         if self.redis_client:
-            await self.redis_client.close()
+            await self.redis_client.aclose()
             self._redis_client = None
 
     async def __aexit__(self, exc_type, exc_val, exc_tb):
diff --git a/src/aleph/services/ipfs/service.py b/src/aleph/services/ipfs/service.py
index 7d7bbf827..0c3cd33cf 100644
--- a/src/aleph/services/ipfs/service.py
+++ b/src/aleph/services/ipfs/service.py
@@ -2,11 +2,13 @@
 import concurrent
 import json
 import logging
-from typing import IO, Optional, Union, Dict
+from typing import IO, Optional, Union, Dict, Self
 
 import aiohttp
 import aioipfs
+from configmanager import Config
 
+from aleph.services.ipfs.common import make_ipfs_client
 from aleph.services.utils import get_IP
 from aleph.types.message_status import FileUnavailable
 from aleph.utils import run_in_executor
@@ -20,6 +22,20 @@ class IpfsService:
     def __init__(self, ipfs_client: aioipfs.AsyncIPFS):
         self.ipfs_client = ipfs_client
 
+    @classmethod
+    def new(cls, config: Config) -> Self:
+        ipfs_client = make_ipfs_client(config)
+        return cls(ipfs_client=ipfs_client)
+
+    async def __aenter__(self):
+        return self
+
+    async def close(self):
+        await self.ipfs_client.close()
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.close()
+
     async def connect(self, peer: str) -> Dict:
         return await self.ipfs_client.swarm.connect(peer)
diff --git a/src/aleph/services/utils.py b/src/aleph/services/utils.py
index 4b0513552..9d5472861 100644
--- a/src/aleph/services/utils.py
+++ b/src/aleph/services/utils.py
@@ -19,7 +19,7 @@ async def get_ip4_from_service() -> str:
     async with aiohttp.ClientSession() as session:
         async with session.get(IP4_SERVICE_URL) as resp:
             resp.raise_for_status()
-            ip = await resp.text()
+            ip = await resp.text(encoding="utf-8")
 
             if is_valid_ip4(ip):
                 return ip
diff --git a/src/aleph/web/__init__.py b/src/aleph/web/__init__.py
index bc363c9c9..216087a6b 100644
--- a/src/aleph/web/__init__.py
+++ b/src/aleph/web/__init__.py
@@ -31,8 +31,8 @@ def init_cors(app: web.Application):
         cors.add(route)
 
 
-def create_aiohttp_app(debug: bool = False) -> web.Application:
-    app = web.Application(client_max_size=1024**2 * 64, debug=debug)
+def create_aiohttp_app() -> web.Application:
+    app = web.Application(client_max_size=1024**2 * 64)
 
     tpl_path = pkg_resources.resource_filename("aleph.web", "templates")
     jinja_loader = jinja2.ChoiceLoader(
diff --git a/src/aleph/web/controllers/accounts.py b/src/aleph/web/controllers/accounts.py
index 23dde87ef..687275641 100644
--- a/src/aleph/web/controllers/accounts.py
+++ b/src/aleph/web/controllers/accounts.py
@@ -58,7 +58,7 @@ async def addresses_stats_view(request: web.Request):
 
 
 def _get_address_from_request(request: web.Request) -> str:
     address = request.match_info.get("address")
     if address is None:
-        raise web.HTTPUnprocessableEntity(body="Address must be specified.")
+        raise web.HTTPUnprocessableEntity(text="Address must be specified.")
     return address
@@ -88,7 +88,7 @@ async def get_account_files(request: web.Request) -> web.Response:
     try:
         query_params = GetAccountFilesQueryParams.parse_obj(request.query)
     except ValidationError as e:
-        raise web.HTTPUnprocessableEntity(body=e.json(indent=4))
+        raise web.HTTPUnprocessableEntity(text=e.json(indent=4))
 
     session_factory: DbSessionFactory = get_session_factory_from_request(request)
diff --git a/src/aleph/web/controllers/aggregates.py b/src/aleph/web/controllers/aggregates.py
index 0d996d3f3..8c77e2dfa 100644
--- a/src/aleph/web/controllers/aggregates.py
+++ b/src/aleph/web/controllers/aggregates.py
@@ -1,6 +1,6 @@
-import logging
 import datetime as dt
-from typing import List, Optional, Any, Dict, Tuple, Literal, Sequence
+import logging
+from typing import List, Optional, Dict
 
 from aiohttp import web
 from pydantic import BaseModel, validator, ValidationError
@@ -67,7 +67,7 @@ async def address_aggregate(request: web.Request) -> web.Response:
     )
 
     if not aggregates:
-        return web.HTTPNotFound(text="No aggregate found for this address")
+        raise web.HTTPNotFound(text="No aggregate found for this address")
 
     output = {
         "address": address,
diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py
index 47d49655a..b25f84762 100644
--- a/src/aleph/web/controllers/messages.py
+++ b/src/aleph/web/controllers/messages.py
@@ -213,7 +213,7 @@ async def view_messages_list(request: web.Request) -> web.Response:
     try:
         query_params = MessageQueryParams.parse_obj(request.query)
     except ValidationError as e:
-        raise web.HTTPUnprocessableEntity(body=e.json(indent=4))
+        raise web.HTTPUnprocessableEntity(text=e.json(indent=4))
 
     # If called from the messages/page/{page}.json endpoint, override the page
     # parameters with the URL one
@@ -358,7 +358,7 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse:
     try:
         query_params = WsMessageQueryParams.parse_obj(request.query)
     except ValidationError as e:
-        raise web.HTTPUnprocessableEntity(body=e.json(indent=4))
+        raise web.HTTPUnprocessableEntity(text=e.json(indent=4))
 
     history = query_params.history
 
@@ -482,7 +482,7 @@ async def view_message(request: web.Request):
     try:
         item_hash = ItemHash(item_hash_str)
     except ValueError:
-        raise web.HTTPUnprocessableEntity(body=f"Invalid message hash: {item_hash_str}")
+        raise web.HTTPUnprocessableEntity(text=f"Invalid message hash: {item_hash_str}")
 
     session_factory: DbSessionFactory = request.app["session_factory"]
     with session_factory() as session:
diff --git a/src/aleph/web/controllers/p2p.py b/src/aleph/web/controllers/p2p.py
index 4bbb54ac6..a1b308e0c 100644
--- a/src/aleph/web/controllers/p2p.py
+++ b/src/aleph/web/controllers/p2p.py
@@ -127,7 +127,7 @@ async def pub_message(request: web.Request):
     try:
         request_data = PubMessageRequest.parse_obj(await request.json())
     except ValidationError as e:
-        raise web.HTTPUnprocessableEntity(body=e.json(indent=4))
+        raise web.HTTPUnprocessableEntity(text=e.json(indent=4))
     except ValueError:
         # Body must be valid JSON
         raise web.HTTPUnprocessableEntity()
diff --git a/src/aleph/web/controllers/posts.py b/src/aleph/web/controllers/posts.py
index 16014b5a8..70cbd57f9 100644
--- a/src/aleph/web/controllers/posts.py
+++ b/src/aleph/web/controllers/posts.py
@@ -170,7 +170,7 @@ def get_query_params(request: web.Request) -> PostQueryParams:
     try:
         query_params = PostQueryParams.parse_obj(request.query)
     except ValidationError as e:
-        raise web.HTTPUnprocessableEntity(body=e.json(indent=4))
+        raise web.HTTPUnprocessableEntity(text=e.json(indent=4))
 
     path_page = get_path_page(request)
     if path_page:
@@ -228,7 +228,7 @@ async def view_posts_list_v1(request) -> web.Response:
     try:
         query_params = PostQueryParams.parse_obj(request.query)
     except ValidationError as e:
-        raise web.HTTPUnprocessableEntity(body=e.json(indent=4))
+        raise web.HTTPUnprocessableEntity(text=e.json(indent=4))
 
     path_page = get_path_page(request)
     if path_page:
diff --git a/src/aleph/web/controllers/utils.py b/src/aleph/web/controllers/utils.py
index 4762bd472..ce1d13b82 100644
--- a/src/aleph/web/controllers/utils.py
+++ b/src/aleph/web/controllers/utils.py
@@ -247,7 +247,7 @@ def validate_message_dict(message_dict: Mapping[str, Any]) -> BasePendingMessage
     try:
         return parse_message(message_dict)
     except InvalidMessageException as e:
-        raise web.HTTPUnprocessableEntity(body=str(e))
+        raise web.HTTPUnprocessableEntity(text=str(e))
 
 
 class PublicationStatus(BaseModel):
diff --git a/tests/api/test_list_messages.py b/tests/api/test_list_messages.py
index 69b4534f1..8e81f8ad7 100644
--- a/tests/api/test_list_messages.py
+++ b/tests/api/test_list_messages.py
@@ -119,6 +119,7 @@ async def test_get_messages_filter_by_chain(fixture_messages, ccn_api_client):
 
 
 @pytest.mark.asyncio
 async def test_get_messages_filter_invalid_chain(fixture_messages, ccn_api_client):
     response = await fetch_messages_by_chain(api_client=ccn_api_client, chain="2CHAINZ")
+    text = await response.text()
     assert response.status == 422, await response.text()
diff --git a/tests/api/test_storage.py b/tests/api/test_storage.py
index 1d37cfb74..fc97c93a9 100644
--- a/tests/api/test_storage.py
+++ b/tests/api/test_storage.py
@@ -6,10 +6,10 @@
 import aiohttp
 import orjson
 import pytest
-import requests
+import pytest_asyncio
 from aleph_message.models import ItemHash, Chain
+from in_memory_storage_engine import InMemoryStorageEngine
 
-from aleph.chains.connector import ChainConnector
 from aleph.chains.signature_verifier import SignatureVerifier
 from aleph.db.accessors.files import get_file
 from aleph.db.models import AlephBalanceDb
@@ -19,7 +19,6 @@
 from aleph.types.message_status import MessageStatus
 from aleph.web.controllers.app_state_getters import APP_STATE_SIGNATURE_VERIFIER, APP_STATE_STORAGE_SERVICE
 from aleph.web.controllers.utils import BroadcastStatus, PublicationStatus
-from in_memory_storage_engine import InMemoryStorageEngine
 
 IPFS_ADD_FILE_URI = "/api/v0/ipfs/add_file"
 IPFS_ADD_JSON_URI = "/api/v0/ipfs/add_json"
@@ -60,8 +59,8 @@
 }
 
 
-@pytest.fixture
-def api_client(ccn_api_client, mocker):
+@pytest_asyncio.fixture
+async def api_client(ccn_test_aiohttp_app, mocker, aiohttp_client):
     ipfs_service = mocker.AsyncMock()
     ipfs_service.add_bytes = mocker.AsyncMock(return_value=EXPECTED_FILE_CID)
     ipfs_service.add_file = mocker.AsyncMock(
@@ -82,15 +81,15 @@
         }
     )
 
-    ccn_api_client.app[APP_STATE_STORAGE_SERVICE] = StorageService(
+    ccn_test_aiohttp_app[APP_STATE_STORAGE_SERVICE] = StorageService(
         storage_engine=InMemoryStorageEngine(files={}),
         ipfs_service=ipfs_service,
         node_cache=mocker.AsyncMock(),
     )
+    ccn_test_aiohttp_app[APP_STATE_SIGNATURE_VERIFIER] = SignatureVerifier()
 
-    ccn_api_client.app[APP_STATE_SIGNATURE_VERIFIER] = SignatureVerifier()
-
-    return ccn_api_client
+    client = await aiohttp_client(ccn_test_aiohttp_app)
+    return client
 
 
 async def add_file(
diff --git a/tests/chains/test_ethereum.py b/tests/chains/test_ethereum.py
new file mode 100644
index 000000000..506a37e98
--- /dev/null
+++ b/tests/chains/test_ethereum.py
@@ -0,0 +1,18 @@
+import pytest
+from configmanager import Config
+from web3 import Web3
+
+from aleph.chains.ethereum import get_contract
+
+
+@pytest.fixture
+def web3():
+    return Web3()
+
+
+@pytest.mark.asyncio
+async def test_get_contract(mock_config: Config, web3: Web3):
+    contract = await get_contract(config=mock_config, web3=web3)
+    # The type hint provided by the web3 library is clearly wrong. This is a simple check
+    # to ensure that we get a proper web3 object. Improve as needed.
+    assert contract.w3 == web3
diff --git a/tests/conftest.py b/tests/conftest.py
index 67b333930..3f1c4cb1b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,10 +1,12 @@
 import asyncio
 import contextlib
+import datetime as dt
 import json
 import logging
 import os
 import shutil
 import sys
+from decimal import Decimal
 from pathlib import Path
 from typing import Protocol, List
 
@@ -43,8 +45,12 @@
 from aleph.types.files import FileType, FileTag
 from aleph.types.message_status import MessageStatus
 from aleph.web import create_aiohttp_app
-from decimal import Decimal
-import datetime as dt
+from aleph.web.controllers.app_state_getters import (
+    APP_STATE_CONFIG,
+    APP_STATE_P2P_CLIENT,
+    APP_STATE_STORAGE_SERVICE,
+    APP_STATE_SESSION_FACTORY,
+)
 
 # Add the helpers to the PYTHONPATH.
 # Note: mark the "helpers" directory as a source directory to tell PyCharm
@@ -115,7 +121,6 @@ async def node_cache(mock_config: Config):
     yield node_cache
 
 
-
 @pytest_asyncio.fixture
 async def test_storage_service(mock_config: Config, mocker) -> StorageService:
     data_folder = Path("./data")
@@ -126,31 +131,37 @@ async def test_storage_service(mock_config: Config, mocker) -> StorageService:
     data_folder.mkdir(parents=True)
 
     storage_engine = FileSystemStorageEngine(folder=data_folder)
-    ipfs_client = make_ipfs_client(mock_config)
-    ipfs_service = IpfsService(ipfs_client=ipfs_client)
-    storage_service = StorageService(
-        storage_engine=storage_engine,
-        ipfs_service=ipfs_service,
-        node_cache=mocker.AsyncMock(),
-    )
-    return storage_service
+    async with IpfsService.new(mock_config) as ipfs_service:
+        storage_service = StorageService(
+            storage_engine=storage_engine,
+            ipfs_service=ipfs_service,
+            node_cache=mocker.AsyncMock(),
+        )
+        yield storage_service
 
 
-@pytest_asyncio.fixture
-async def ccn_api_client(
-    mocker, aiohttp_client, mock_config, session_factory: DbSessionFactory
-):
+
+@pytest.fixture
+def ccn_test_aiohttp_app(mocker, mock_config, session_factory):
     # Make aiohttp return the stack trace on 500 errors
     event_loop = asyncio.get_event_loop()
     event_loop.set_debug(True)
 
-    app = create_aiohttp_app(debug=True)
-    app["config"] = mock_config
-    app["p2p_client"] = mocker.AsyncMock()
-    app["storage_service"] = mocker.AsyncMock()
-    app["session_factory"] = session_factory
-    client = await aiohttp_client(app)
+    app = create_aiohttp_app()
+    app[APP_STATE_CONFIG] = mock_config
+    app[APP_STATE_P2P_CLIENT] = mocker.AsyncMock()
+    app[APP_STATE_STORAGE_SERVICE] = mocker.AsyncMock()
+    app[APP_STATE_SESSION_FACTORY] = session_factory
+
+    return app
+
+
+@pytest_asyncio.fixture
+async def ccn_api_client(
+    aiohttp_client,
+    ccn_test_aiohttp_app,
+):
+    client = await aiohttp_client(ccn_test_aiohttp_app)
     return client
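Note (not part of the patch): a minimal usage sketch of the IpfsService lifecycle the hunks above converge on, assuming a loaded configmanager Config as in the job entry points; the helper name run_with_services is hypothetical.

# Illustrative sketch only. IpfsService.new() builds the aioipfs client, and the
# async context manager closes it (and the NodeCache Redis client) on exit, so
# long-running tasks no longer leave sessions open.
from configmanager import Config

from aleph.services.cache.node_cache import NodeCache
from aleph.services.ipfs import IpfsService


async def run_with_services(config: Config) -> None:  # hypothetical helper
    async with NodeCache(
        redis_host=config.redis.host.value, redis_port=config.redis.port.value
    ) as node_cache, IpfsService.new(config) as ipfs_service:
        # Both clients are open here; build StorageService etc. as the jobs do.
        ...
    # Leaving the block awaits NodeCache.close() and IpfsService.close().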