From 19df1e4169c8afe070059296c1b23ca1b40d4343 Mon Sep 17 00:00:00 2001 From: 1yam <40899431+1yam@users.noreply.github.com> Date: Mon, 11 Dec 2023 10:32:23 +0100 Subject: [PATCH] Feature: New Metrics endpoint for CRN & CNN (#528) * Feature: New metric endpoint for CRN & CNN * Internal: Unit Test for metric endpoint * Fix: Remove unused import * Refactor: Unit Test & remove too big message content for unit * Fix: mypy error * Refactor: easier way to parse sql query * Fix typo --------- Co-authored-by: Mike Hukiewitz <70762838+MHHukiewitz@users.noreply.github.com> --- .../0020_08602db6c78f_ccn_metric_view.py | 74 ++++++++ src/aleph/db/accessors/metrics.py | 124 ++++++++++++ src/aleph/web/controllers/main.py | 75 +++++++- src/aleph/web/controllers/routes.py | 7 +- tests/api/fixtures/test-metric.json | 178 ++++++++++++++++++ tests/api/test_new_metric.py | 119 ++++++++++++ 6 files changed, 572 insertions(+), 5 deletions(-) create mode 100644 deployment/migrations/versions/0020_08602db6c78f_ccn_metric_view.py create mode 100644 src/aleph/db/accessors/metrics.py create mode 100644 tests/api/fixtures/test-metric.json create mode 100644 tests/api/test_new_metric.py diff --git a/deployment/migrations/versions/0020_08602db6c78f_ccn_metric_view.py b/deployment/migrations/versions/0020_08602db6c78f_ccn_metric_view.py new file mode 100644 index 000000000..cc17b41dd --- /dev/null +++ b/deployment/migrations/versions/0020_08602db6c78f_ccn_metric_view.py @@ -0,0 +1,74 @@ +"""ccn_metric_view + +Revision ID: 08602db6c78f +Revises: 3bf484f2cc95 +Create Date: 2023-11-27 17:03:28.184344 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "08602db6c78f" +down_revision = "3bf484f2cc95" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.execute( + """ + create or replace view ccn_metric_view AS + WITH json_data AS ( + SELECT + item_hash, + jsonb_array_elements(content->'content'->'metrics'->'ccn') as ccn_data + FROM messages + WHERE channel = 'aleph-scoring' AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4' + ) + SELECT + item_hash, + (ccn_data->>'measured_at')::float as measured_at, + ccn_data->>'node_id' as node_id, + (ccn_data->>'base_latency')::float as base_latency, + (ccn_data->>'metrics_latency')::float as metrics_latency, + (ccn_data->>'aggregate_latency')::float as aggregate_latency, + (ccn_data->>'base_latency_ipv4')::float as base_latency_ipv4, + (ccn_data->>'file_download_latency')::float as file_download_latency, + (ccn_data->>'pending_messages')::int as pending_messages, + (ccn_data->>'eth_height_remaining')::int as eth_height_remaining + FROM json_data; + """ + ) + + op.execute( + """ + create or replace view crn_metric_view AS + WITH json_data AS ( + SELECT + item_hash, + jsonb_array_elements(content->'content'->'metrics'->'crn') as crn_data + FROM messages + WHERE channel = 'aleph-scoring' AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4' + ) + SELECT + item_hash as item_hash, + (crn_data->>'measured_at')::float as measured_at, + crn_data->>'node_id' as node_id, + (crn_data->>'base_latency')::float as base_latency, + (crn_data->>'base_latency_ipv4')::float as base_latency_ipv4, + (crn_data->>'full_check_latency')::float as full_check_latency, + (crn_data->>'diagnostic_vm_latency')::float as diagnostic_vm_latency + FROM json_data; + """ + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.execute("DROP VIEW ccn_metric_view;") + op.execute("DROP VIEW crn_metric_view;") + + # ### end Alembic commands ### diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py new file mode 100644 index 000000000..045ebf59f --- /dev/null +++ b/src/aleph/db/accessors/metrics.py @@ -0,0 +1,124 @@ +from typing import Optional + +from sqlalchemy import select, text +from sqlalchemy.orm.session import Session + +from aleph.types.db_session import DbSession + + +def _parse_ccn_result(result): + keys = [ + "item_hash", + "measured_at", + "base_latency", + "base_latency_ipv4", + "metrics_latency", + "aggregate_latency", + "file_download_latency", + "pending_messages", + "eth_height_remaining", + ] + + # Transpose the result and create a dictionary + result_dict = {key: list(values) for key, values in zip(keys, zip(*result))} + + return result_dict + + +def _parse_crn_result(result): + keys = [ + "item_hash", + "measured_at", + "base_latency", + "base_latency_ipv4", + "full_check_latency", + "diagnostic_vm_latency", + ] + + # Transpose the result and create a dictionary + result_dict = {key: list(values) for key, values in zip(keys, zip(*result))} + + return result_dict + + +def _build_metric_filter(select_stmt, node_id, start_date, end_date, sort_order): + if node_id: + select_stmt = select_stmt.where(text("node_id = :node_id")).params( + node_id=node_id + ) + if start_date: + select_stmt = select_stmt.where(text("measured_at >= :start_date")).params( + start_date=start_date + ) + if end_date: + select_stmt = select_stmt.where(text("measured_at <= :end_date")).params( + end_date=end_date + ) + if sort_order: + select_stmt = select_stmt.order_by(text(f"measured_at {sort_order}")) + return select_stmt + + +def query_metric_ccn( + session: Session, + node_id: Optional[str] = None, + start_date: Optional[float] = None, + end_date: Optional[float] = None, + sort_order: Optional[str] = None, +): + select_stmt = select( + [ + text("item_hash"), + text("measured_at"), + text("base_latency"), + text("base_latency_ipv4"), + text("metrics_latency"), + text("aggregate_latency"), + text("file_download_latency"), + text("pending_messages"), + text("eth_height_remaining"), + ] + ).select_from(text("ccn_metric_view")) + + select_stmt = _build_metric_filter( + select_stmt=select_stmt, + node_id=node_id, + start_date=start_date, + end_date=end_date, + sort_order=sort_order, + ) + + result = session.execute(select_stmt).fetchall() + + return _parse_ccn_result(result=result) + + +def query_metric_crn( + session: DbSession, + node_id: str, + start_date: Optional[float] = None, + end_date: Optional[float] = None, + sort_order: Optional[str] = None, +): + select_stmt = select( + [ + text("item_hash"), + text("measured_at"), + text("base_latency"), + text("base_latency_ipv4"), + text("full_check_latency"), + text("diagnostic_vm_latency"), + ] + ).select_from(text("crn_metric_view")) + + select_stmt = _build_metric_filter( + select_stmt=select_stmt, + node_id=node_id, + start_date=start_date, + end_date=end_date, + sort_order=sort_order, + ) + + result = session.execute(select_stmt).fetchall() + + return _parse_crn_result(result=result) diff --git a/src/aleph/web/controllers/main.py b/src/aleph/web/controllers/main.py index 8088926d9..7fcb7aa8e 100644 --- a/src/aleph/web/controllers/main.py +++ b/src/aleph/web/controllers/main.py @@ -1,13 +1,18 @@ import asyncio import logging from dataclasses import asdict -from typing import Dict +from typing import Dict, Optional import aiohttp_jinja2 from aiohttp import web +from pydantic import BaseModel +from aleph.db.accessors.metrics import query_metric_ccn, query_metric_crn from aleph.types.db_session import DbSessionFactory -from aleph.web.controllers.app_state_getters import get_node_cache_from_request, get_session_factory_from_request +from aleph.web.controllers.app_state_getters import ( + get_node_cache_from_request, + get_session_factory_from_request, +) from aleph.web.controllers.metrics import format_dataclass_for_prometheus, get_metrics logger = logging.getLogger(__name__) @@ -74,3 +79,69 @@ async def metrics_json(request: web.Request) -> web.Response: text=(await get_metrics(session=session, node_cache=node_cache)).to_json(), content_type="application/json", ) + + +class Metrics(BaseModel): + start_date: Optional[float] = None + end_date: Optional[float] = None + sort: Optional[str] = None + + +def _get_node_id_from_request(request: web.Request) -> str: + address = request.match_info.get("node_id") + if address is None: + raise web.HTTPUnprocessableEntity(body="node_id must be specified.") + return address + + +async def ccn_metric(request: web.Request) -> web.Response: + """Fetch metrics for CCN node id""" + + session_factory: DbSessionFactory = get_session_factory_from_request(request) + query_params = Metrics.parse_obj(request.query) + + node_id = _get_node_id_from_request(request) + + with session_factory() as session: + ccn = query_metric_ccn( + session, + node_id=node_id, + start_date=query_params.start_date, + end_date=query_params.end_date, + sort_order=query_params.sort, + ) + if not ccn: + raise web.HTTPNotFound() + + if not ccn["item_hash"]: + raise web.HTTPNotFound() + + result = {"metrics": ccn} + return web.json_response(result) + + +async def crn_metric(request: web.Request) -> web.Response: + """Fetch Metric for crn.""" + + session_factory: DbSessionFactory = get_session_factory_from_request(request) + query_params = Metrics.parse_obj(request.query) + + node_id = _get_node_id_from_request(request) + + with session_factory() as session: + crn = query_metric_crn( + session, + node_id=node_id, + start_date=query_params.start_date, + end_date=query_params.end_date, + sort_order=query_params.sort, + ) + + if not crn: + raise web.HTTPNotFound() + + if not crn["item_hash"]: + raise web.HTTPNotFound() + + result = {"metrics": crn} + return web.json_response(result) diff --git a/src/aleph/web/controllers/routes.py b/src/aleph/web/controllers/routes.py index 75f490c15..e09ebd0fe 100644 --- a/src/aleph/web/controllers/routes.py +++ b/src/aleph/web/controllers/routes.py @@ -28,6 +28,9 @@ def register_routes(app: web.Application): app.router.add_get("/metrics", main.metrics) app.router.add_get("/metrics.json", main.metrics_json) + app.router.add_get("/api/v0/core/{node_id}/metrics", main.ccn_metric) + app.router.add_get("/api/v0/compute/{node_id}/metrics", main.crn_metric) + app.router.add_get( "/api/v0/aggregates/{address}.json", aggregates.address_aggregate ) @@ -58,9 +61,7 @@ def register_routes(app: web.Application): app.router.add_get( "/api/v0/addresses/{address}/balance", accounts.get_account_balance ) - app.router.add_get( - "/api/v0/addresses/{address}/files", accounts.get_account_files - ) + app.router.add_get("/api/v0/addresses/{address}/files", accounts.get_account_files) app.router.add_post("/api/v0/ipfs/add_json", storage.add_ipfs_json_controller) app.router.add_post("/api/v0/storage/add_json", storage.add_storage_json_controller) diff --git a/tests/api/fixtures/test-metric.json b/tests/api/fixtures/test-metric.json new file mode 100644 index 000000000..9e18ec5dd --- /dev/null +++ b/tests/api/fixtures/test-metric.json @@ -0,0 +1,178 @@ +[ + { + "item_hash": "56c82c6d3b28b76456594b4b57154b6826a6d5fb97d355d0428e5ca7d08193b9", + "type": "POST", + "chain": "ETH", + "sender": "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + "signature": "0x40e307f1645718dc245a8ed754e1f652ce60be914cae780eb73fbe164a5372de3c2e631dbc4cfa7c548d85d1d0f935defa332f116f9226b6fc4ecc53f2edd2901b", + "item_type": "storage", + "item_content": null, + "content": { + "time": 1701261226.1406364, + "type": "aleph-network-metrics", + "address": "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + "content": { + "tags": ["mainnet"], + "metrics": { + "ccn": [ + { + "asn": 16276, + "url": "http://142.4.199.242:4024/", + "as_name": "OVH, FR", + "node_id": "b8b9104da69c54e58531212234fa31f49ef4c668a39a0bf6793322407857b821", + "version": "v0.5.1", + "txs_total": 9, + "measured_at": 1701261022.881753, + "base_latency": 0.2345137596130371, + "metrics_latency": 0.39575982093811035, + "pending_messages": 0, + "aggregate_latency": 0.12717533111572266, + "base_latency_ipv4": 0.2345137596130371, + "eth_height_remaining": 224, + "file_download_latency": 0.22372746467590332 + }, + { + "asn": 24940, + "url": "http://65.109.36.183:4024/", + "as_name": "HETZNER-AS, DE", + "node_id": "2e7cd67ff8f556b0b3fb8a2ef8ab0e8e1466cfa279dd7b9bfbc8aba92e0c5672", + "version": "v0.5.1", + "txs_total": 0, + "measured_at": 1701261021.777543, + "base_latency": 0.10312747955322266, + "metrics_latency": 0.3111591339111328, + "pending_messages": 0, + "aggregate_latency": 0.06761360168457031, + "base_latency_ipv4": 0.10312747955322266, + "eth_height_remaining": 248, + "file_download_latency": 0.10559391975402832 + } + ], + "crn": [ + { + "asn": 21409, + "url": "https://rnode01.canucks.icu/", + "as_name": "IKOULA, FR", + "node_id": "5adbe916593ff10ace96d4d9841a8532f81c07bf8905c78178e64639a61685f5", + "version": "0.3.0", + "measured_at": 1701261121.356539, + "base_latency": 0.14533090591430664, + "base_latency_ipv4": 0.13749980926513672, + "full_check_latency": 0.4012613296508789, + "diagnostic_vm_latency": 0.07122135162353516 + }, + { + "asn": 24940, + "url": "https://kimchibitfwd4rcn11.bitfwd.xyz/", + "as_name": "HETZNER-AS, DE", + "node_id": "d491f38ec66fe23a9c9ad398a04fd4dcb44a115b948ef612db844caea85cd59a", + "version": "0.3.0", + "measured_at": 1701261122.643961, + "base_latency": 0.03546738624572754, + "base_latency_ipv4": 0.05208730697631836, + "full_check_latency": 0.6230778694152832, + "diagnostic_vm_latency": 0.05135464668273926 + } + ], + "server": "151.115.63.76", + "server_asn": 12876, + "server_as_name": "Online SAS, FR" + }, + "version": "1.0" + } + }, + "time": 1701261226.148742, + "channel": "aleph-scoring", + "size": 126038, + "confirmations": [], + "confirmed": false + }, + { + "item_hash": "172bab8f624fff1be70a19fecd45ff51fa4f833a34074451c7d79ece19bf37f0", + "type": "POST", + "chain": "ETH", + "sender": "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + "signature": "0xb6ef922442f25c71a9ce60e3fbd92115648d2d312e8e93e02f3af254b69ef4bc6f6f02f027500e2248d5174d8d6a9cfedca2c59c2d188a2213e5b13cf74b7ea01c", + "item_type": "storage", + "item_content": null, + "content": { + "time": 1701264226.1443472, + "type": "aleph-network-metrics", + "address": "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + "content": { + "tags": ["mainnet"], + "metrics": { + "ccn": [ + { + "asn": 201814, + "url": "http://193.34.212.124:4024/", + "as_name": "MEVSPACE, PL", + "node_id": "b8b9104da69c54e58531212234fa31f49ef4c668a39a0bf6793322407857b821", + "version": "v0.5.1", + "txs_total": 0, + "measured_at": 1701264026.243765, + "base_latency": 0.009937286376953125, + "metrics_latency": 0.43869686126708984, + "pending_messages": 0, + "aggregate_latency": 0.014438152313232422, + "base_latency_ipv4": 0.009937286376953125, + "eth_height_remaining": 4672, + "file_download_latency": 0.007421016693115234 + }, + { + "asn": 28753, + "url": "http://212.95.53.162:4024/", + "as_name": "LEASEWEB-DE-FRA-10, DE", + "node_id": "2e7cd67ff8f556b0b3fb8a2ef8ab0e8e1466cfa279dd7b9bfbc8aba92e0c5672", + "version": "v0.5.1", + "txs_total": 0, + "measured_at": 1701264002.795776, + "base_latency": 0.04943251609802246, + "metrics_latency": 1.2472684383392334, + "pending_messages": 0, + "aggregate_latency": 0.06596589088439941, + "base_latency_ipv4": 0.04943251609802246, + "eth_height_remaining": 156, + "file_download_latency": 0.055548906326293945 + } + ], + "crn": [ + { + "asn": 24940, + "url": "https://aleph8.serverrg.eu/", + "as_name": "HETZNER-AS, DE", + "node_id": "5adbe916593ff10ace96d4d9841a8532f81c07bf8905c78178e64639a61685f5", + "version": "0.3.0", + "measured_at": 1701264120.645951, + "base_latency": 0.1416456699371338, + "base_latency_ipv4": 0.15400004386901855, + "full_check_latency": 0.5233616828918457, + "diagnostic_vm_latency": 0.06812047958374023 + }, + { + "asn": 63023, + "url": "https://crabble.xyz/", + "as_name": "AS-GLOBALTELEHOST, US", + "node_id": "d491f38ec66fe23a9c9ad398a04fd4dcb44a115b948ef612db844caea85cd59a", + "version": "0.3.0", + "measured_at": 1701264107.823557, + "base_latency": 0.11734175682067871, + "base_latency_ipv4": 0.37552332878112793, + "full_check_latency": 1.1796252727508545, + "diagnostic_vm_latency": 0.14794635772705078 + } + ], + "server": "151.115.63.76", + "server_asn": 12876, + "server_as_name": "Online SAS, FR" + }, + "version": "1.0" + } + }, + "time": 1701264226.153022, + "channel": "aleph-scoring", + "size": 126050, + "confirmations": [], + "confirmed": false + } +] diff --git a/tests/api/test_new_metric.py b/tests/api/test_new_metric.py new file mode 100644 index 000000000..37f88a3f0 --- /dev/null +++ b/tests/api/test_new_metric.py @@ -0,0 +1,119 @@ +from typing import Any, Dict, Sequence + +import pytest +import pytest_asyncio + +from aleph.types.db_session import DbSessionFactory + +from .conftest import _load_fixtures + + +def _generate_uri(node_type: str, node_id: str) -> str: + return f"/api/v0/{node_type}/{node_id}/metrics" + + +@pytest_asyncio.fixture +async def fixture_metrics_messages( + session_factory: DbSessionFactory, +) -> Sequence[Dict[str, Any]]: + return await _load_fixtures(session_factory, "test-metric.json") + + +@pytest.mark.asyncio +async def test_node_core_metrics(fixture_metrics_messages, ccn_api_client): + uri = _generate_uri( + "core", "b8b9104da69c54e58531212234fa31f49ef4c668a39a0bf6793322407857b821" + ) + response = await ccn_api_client.get(uri) + test_data = await response.json() + + assert response.status == 200 + assert ( + test_data["metrics"]["item_hash"][0] + == "56c82c6d3b28b76456594b4b57154b6826a6d5fb97d355d0428e5ca7d08193b9" + ) + assert ( + test_data["metrics"]["item_hash"][1] + == "172bab8f624fff1be70a19fecd45ff51fa4f833a34074451c7d79ece19bf37f0" + ) + + +@pytest.mark.asyncio +async def test_node_core_metrics_sort(fixture_metrics_messages, ccn_api_client): + uri = _generate_uri( + "core", "2e7cd67ff8f556b0b3fb8a2ef8ab0e8e1466cfa279dd7b9bfbc8aba92e0c5672" + ) + response = await ccn_api_client.get(uri, params={"sort": "DESC"}) + test_data = await response.json() + + assert response.status == 200 + assert ( + test_data["metrics"]["item_hash"][1] + == "56c82c6d3b28b76456594b4b57154b6826a6d5fb97d355d0428e5ca7d08193b9" + ) + assert ( + test_data["metrics"]["item_hash"][0] + == "172bab8f624fff1be70a19fecd45ff51fa4f833a34074451c7d79ece19bf37f0" + ) + + +@pytest.mark.asyncio +async def test_node_core_metrics_end_date(fixture_metrics_messages, ccn_api_client): + uri = _generate_uri( + "core", "b8b9104da69c54e58531212234fa31f49ef4c668a39a0bf6793322407857b821" + ) + response = await ccn_api_client.get(uri, params={"end_date": 1701261023}) + test_data = await response.json() + + assert response.status == 200 + assert ( + test_data["metrics"]["item_hash"][0] + == "56c82c6d3b28b76456594b4b57154b6826a6d5fb97d355d0428e5ca7d08193b9" + ) + + +@pytest.mark.asyncio +async def test_node_core_metrics_start_date(fixture_metrics_messages, ccn_api_client): + uri = _generate_uri( + "core", "b8b9104da69c54e58531212234fa31f49ef4c668a39a0bf6793322407857b821" + ) + response = await ccn_api_client.get(uri, params={"start_date": 1701261023}) + test_data = await response.json() + + assert response.status == 200 + assert ( + test_data["metrics"]["item_hash"][0] + == "172bab8f624fff1be70a19fecd45ff51fa4f833a34074451c7d79ece19bf37f0" + ) + + +@pytest.mark.asyncio +async def test_node_core_not_exist(fixture_metrics_messages, ccn_api_client): + uri = _generate_uri("core", "This_is_a_node_id") + response = await ccn_api_client.get(uri) + + assert response.status == 404 + + +@pytest.mark.asyncio +async def test_node_compute_metric(fixture_metrics_messages, ccn_api_client): + uri = _generate_uri( + "compute", "d491f38ec66fe23a9c9ad398a04fd4dcb44a115b948ef612db844caea85cd59a" + ) + response = await ccn_api_client.get(uri) + test_data = await response.json() + + assert response.status == 200 + assert ( + test_data["metrics"]["item_hash"][0] + == "56c82c6d3b28b76456594b4b57154b6826a6d5fb97d355d0428e5ca7d08193b9" + ) + +@pytest.mark.asyncio +async def test_node_compute_metric_not_exist(fixture_metrics_messages, ccn_api_client): + uri = _generate_uri( + "compute", "This_is_a_node_id" + ) + response = await ccn_api_client.get(uri) + + assert response.status == 404 \ No newline at end of file