Skip to content

Commit

Permalink
Feature: New Metrics endpoint for CRN & CNN (#528)
Browse files Browse the repository at this point in the history
* Feature: New metric endpoint for CRN & CNN

* Internal: Unit Test for metric endpoint

* Fix: Remove unused import

* Refactor: Unit Test & remove too big message content for unit

* Fix: mypy error

* Refactor: easier way to parse sql query

* Fix typo

---------

Co-authored-by: Mike Hukiewitz <[email protected]>
  • Loading branch information
1yam and MHHukiewitz authored Dec 11, 2023
1 parent aaa60a5 commit 19df1e4
Show file tree
Hide file tree
Showing 6 changed files with 572 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""ccn_metric_view
Revision ID: 08602db6c78f
Revises: 3bf484f2cc95
Create Date: 2023-11-27 17:03:28.184344
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "08602db6c78f"
down_revision = "3bf484f2cc95"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute(
"""
create or replace view ccn_metric_view AS
WITH json_data AS (
SELECT
item_hash,
jsonb_array_elements(content->'content'->'metrics'->'ccn') as ccn_data
FROM messages
WHERE channel = 'aleph-scoring' AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4'
)
SELECT
item_hash,
(ccn_data->>'measured_at')::float as measured_at,
ccn_data->>'node_id' as node_id,
(ccn_data->>'base_latency')::float as base_latency,
(ccn_data->>'metrics_latency')::float as metrics_latency,
(ccn_data->>'aggregate_latency')::float as aggregate_latency,
(ccn_data->>'base_latency_ipv4')::float as base_latency_ipv4,
(ccn_data->>'file_download_latency')::float as file_download_latency,
(ccn_data->>'pending_messages')::int as pending_messages,
(ccn_data->>'eth_height_remaining')::int as eth_height_remaining
FROM json_data;
"""
)

op.execute(
"""
create or replace view crn_metric_view AS
WITH json_data AS (
SELECT
item_hash,
jsonb_array_elements(content->'content'->'metrics'->'crn') as crn_data
FROM messages
WHERE channel = 'aleph-scoring' AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4'
)
SELECT
item_hash as item_hash,
(crn_data->>'measured_at')::float as measured_at,
crn_data->>'node_id' as node_id,
(crn_data->>'base_latency')::float as base_latency,
(crn_data->>'base_latency_ipv4')::float as base_latency_ipv4,
(crn_data->>'full_check_latency')::float as full_check_latency,
(crn_data->>'diagnostic_vm_latency')::float as diagnostic_vm_latency
FROM json_data;
"""
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.execute("DROP VIEW ccn_metric_view;")
op.execute("DROP VIEW crn_metric_view;")

# ### end Alembic commands ###
124 changes: 124 additions & 0 deletions src/aleph/db/accessors/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from typing import Optional

from sqlalchemy import select, text
from sqlalchemy.orm.session import Session

from aleph.types.db_session import DbSession


def _parse_ccn_result(result):
keys = [
"item_hash",
"measured_at",
"base_latency",
"base_latency_ipv4",
"metrics_latency",
"aggregate_latency",
"file_download_latency",
"pending_messages",
"eth_height_remaining",
]

# Transpose the result and create a dictionary
result_dict = {key: list(values) for key, values in zip(keys, zip(*result))}

return result_dict


def _parse_crn_result(result):
keys = [
"item_hash",
"measured_at",
"base_latency",
"base_latency_ipv4",
"full_check_latency",
"diagnostic_vm_latency",
]

# Transpose the result and create a dictionary
result_dict = {key: list(values) for key, values in zip(keys, zip(*result))}

return result_dict


def _build_metric_filter(select_stmt, node_id, start_date, end_date, sort_order):
if node_id:
select_stmt = select_stmt.where(text("node_id = :node_id")).params(
node_id=node_id
)
if start_date:
select_stmt = select_stmt.where(text("measured_at >= :start_date")).params(
start_date=start_date
)
if end_date:
select_stmt = select_stmt.where(text("measured_at <= :end_date")).params(
end_date=end_date
)
if sort_order:
select_stmt = select_stmt.order_by(text(f"measured_at {sort_order}"))
return select_stmt


def query_metric_ccn(
session: Session,
node_id: Optional[str] = None,
start_date: Optional[float] = None,
end_date: Optional[float] = None,
sort_order: Optional[str] = None,
):
select_stmt = select(
[
text("item_hash"),
text("measured_at"),
text("base_latency"),
text("base_latency_ipv4"),
text("metrics_latency"),
text("aggregate_latency"),
text("file_download_latency"),
text("pending_messages"),
text("eth_height_remaining"),
]
).select_from(text("ccn_metric_view"))

select_stmt = _build_metric_filter(
select_stmt=select_stmt,
node_id=node_id,
start_date=start_date,
end_date=end_date,
sort_order=sort_order,
)

result = session.execute(select_stmt).fetchall()

return _parse_ccn_result(result=result)


def query_metric_crn(
session: DbSession,
node_id: str,
start_date: Optional[float] = None,
end_date: Optional[float] = None,
sort_order: Optional[str] = None,
):
select_stmt = select(
[
text("item_hash"),
text("measured_at"),
text("base_latency"),
text("base_latency_ipv4"),
text("full_check_latency"),
text("diagnostic_vm_latency"),
]
).select_from(text("crn_metric_view"))

select_stmt = _build_metric_filter(
select_stmt=select_stmt,
node_id=node_id,
start_date=start_date,
end_date=end_date,
sort_order=sort_order,
)

result = session.execute(select_stmt).fetchall()

return _parse_crn_result(result=result)
75 changes: 73 additions & 2 deletions src/aleph/web/controllers/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import asyncio
import logging
from dataclasses import asdict
from typing import Dict
from typing import Dict, Optional

import aiohttp_jinja2
from aiohttp import web
from pydantic import BaseModel

from aleph.db.accessors.metrics import query_metric_ccn, query_metric_crn
from aleph.types.db_session import DbSessionFactory
from aleph.web.controllers.app_state_getters import get_node_cache_from_request, get_session_factory_from_request
from aleph.web.controllers.app_state_getters import (
get_node_cache_from_request,
get_session_factory_from_request,
)
from aleph.web.controllers.metrics import format_dataclass_for_prometheus, get_metrics

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,3 +79,69 @@ async def metrics_json(request: web.Request) -> web.Response:
text=(await get_metrics(session=session, node_cache=node_cache)).to_json(),
content_type="application/json",
)


class Metrics(BaseModel):
start_date: Optional[float] = None
end_date: Optional[float] = None
sort: Optional[str] = None


def _get_node_id_from_request(request: web.Request) -> str:
address = request.match_info.get("node_id")
if address is None:
raise web.HTTPUnprocessableEntity(body="node_id must be specified.")
return address


async def ccn_metric(request: web.Request) -> web.Response:
"""Fetch metrics for CCN node id"""

session_factory: DbSessionFactory = get_session_factory_from_request(request)
query_params = Metrics.parse_obj(request.query)

node_id = _get_node_id_from_request(request)

with session_factory() as session:
ccn = query_metric_ccn(
session,
node_id=node_id,
start_date=query_params.start_date,
end_date=query_params.end_date,
sort_order=query_params.sort,
)
if not ccn:
raise web.HTTPNotFound()

if not ccn["item_hash"]:
raise web.HTTPNotFound()

result = {"metrics": ccn}
return web.json_response(result)


async def crn_metric(request: web.Request) -> web.Response:
"""Fetch Metric for crn."""

session_factory: DbSessionFactory = get_session_factory_from_request(request)
query_params = Metrics.parse_obj(request.query)

node_id = _get_node_id_from_request(request)

with session_factory() as session:
crn = query_metric_crn(
session,
node_id=node_id,
start_date=query_params.start_date,
end_date=query_params.end_date,
sort_order=query_params.sort,
)

if not crn:
raise web.HTTPNotFound()

if not crn["item_hash"]:
raise web.HTTPNotFound()

result = {"metrics": crn}
return web.json_response(result)
7 changes: 4 additions & 3 deletions src/aleph/web/controllers/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def register_routes(app: web.Application):
app.router.add_get("/metrics", main.metrics)
app.router.add_get("/metrics.json", main.metrics_json)

app.router.add_get("/api/v0/core/{node_id}/metrics", main.ccn_metric)
app.router.add_get("/api/v0/compute/{node_id}/metrics", main.crn_metric)

app.router.add_get(
"/api/v0/aggregates/{address}.json", aggregates.address_aggregate
)
Expand Down Expand Up @@ -58,9 +61,7 @@ def register_routes(app: web.Application):
app.router.add_get(
"/api/v0/addresses/{address}/balance", accounts.get_account_balance
)
app.router.add_get(
"/api/v0/addresses/{address}/files", accounts.get_account_files
)
app.router.add_get("/api/v0/addresses/{address}/files", accounts.get_account_files)

app.router.add_post("/api/v0/ipfs/add_json", storage.add_ipfs_json_controller)
app.router.add_post("/api/v0/storage/add_json", storage.add_storage_json_controller)
Expand Down
Loading

0 comments on commit 19df1e4

Please sign in to comment.