diff --git a/runtimes/eoapi/stac/eoapi/stac/app.py b/runtimes/eoapi/stac/eoapi/stac/app.py index 65a2b0b..5cda148 100644 --- a/runtimes/eoapi/stac/eoapi/stac/app.py +++ b/runtimes/eoapi/stac/eoapi/stac/app.py @@ -3,17 +3,21 @@ import logging from contextlib import asynccontextmanager +import attr import jinja2 from eoapi.auth_utils import OpenIdConnectAuth, OpenIdConnectSettings from fastapi import FastAPI from fastapi.responses import ORJSONResponse from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import ( + CollectionUri, + EmptyRequest, ItemCollectionUri, - create_get_request_model, + ItemUri, create_post_request_model, create_request_model, ) +from stac_fastapi.api.routes import create_async_endpoint from stac_fastapi.extensions.core import ( FieldsExtension, FilterExtension, @@ -23,12 +27,14 @@ ) from stac_fastapi.extensions.third_party import BulkTransactionExtension from stac_fastapi.pgstac.config import Settings -from stac_fastapi.pgstac.core import CoreCrudClient from stac_fastapi.pgstac.db import close_db_connection, connect_to_db from stac_fastapi.pgstac.extensions import QueryExtension from stac_fastapi.pgstac.extensions.filter import FiltersClient from stac_fastapi.pgstac.transactions import BulkTransactionsClient, TransactionsClient from stac_fastapi.pgstac.types.search import PgstacSearch +from stac_fastapi.types.search import BaseSearchGetRequest +from stac_pydantic import api +from stac_pydantic.shared import MimeTypes from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware from starlette.requests import Request @@ -36,8 +42,9 @@ from starlette.templating import Jinja2Templates from starlette_cramjam.middleware import CompressionMiddleware +from .client import PgSTACClient from .config import ApiSettings -from .extension import TiTilerExtension +from .extension import HTMLorGeoJSONGetRequest, HTMLorJSONGetRequest, TiTilerExtension from .logs import init_logging jinja2_env = jinja2.Environment( @@ -115,19 +122,121 @@ async def lifespan(app: FastAPI): ) # Custom Models -items_get_model = ItemCollectionUri if any(isinstance(ext, TokenPaginationExtension) for ext in extensions): items_get_model = create_request_model( model_name="ItemCollectionUri", base_model=ItemCollectionUri, - mixins=[TokenPaginationExtension().GET], + mixins=[TokenPaginationExtension().GET, HTMLorGeoJSONGetRequest], + request_type="GET", + ) +else: + items_get_model = create_request_model( + model_name="ItemCollectionUri", + base_model=ItemCollectionUri, + mixins=[HTMLorGeoJSONGetRequest], request_type="GET", ) -search_get_model = create_get_request_model(extensions) +search_get_model = create_request_model( + "SearchGetRequest", + base_model=BaseSearchGetRequest, + extensions=extensions, + mixins=[HTMLorGeoJSONGetRequest], + request_type="GET", +) search_post_model = create_post_request_model(extensions, base_model=PgstacSearch) +collections_get_model = create_request_model( + model_name="CollectionsModel", + base_model=EmptyRequest, + mixins=[HTMLorJSONGetRequest], + request_type="GET", +) +collection_get_model = create_request_model( + model_name="CollectionUri", + base_model=CollectionUri, + mixins=[HTMLorJSONGetRequest], + request_type="GET", +) +item_get_model = create_request_model( + model_name="ItemUri", + base_model=ItemUri, + mixins=[HTMLorGeoJSONGetRequest], + request_type="GET", +) +conformance_get_model = create_request_model( + model_name="ConformanceModel", + base_model=EmptyRequest, + mixins=[HTMLorJSONGetRequest], + request_type="GET", +) +landing_get_model = create_request_model( + model_name="LandingModel", + base_model=EmptyRequest, + mixins=[HTMLorJSONGetRequest], + request_type="GET", +) + + +@attr.s +class CustomStacApi(StacApi): + def register_landing_page(self): + """Register landing page (GET /). + + Returns: + None + """ + self.router.add_api_route( + name="Landing Page", + path="/", + response_model=( + api.LandingPage if self.settings.enable_response_models else None + ), + responses={ + 200: { + "content": { + MimeTypes.json.value: {}, + }, + "model": api.LandingPage, + }, + }, + response_class=self.response_class, + response_model_exclude_unset=False, + response_model_exclude_none=True, + methods=["GET"], + endpoint=create_async_endpoint(self.client.landing_page, landing_get_model), + ) + + def register_conformance_classes(self): + """Register conformance classes (GET /conformance). + + Returns: + None + """ + self.router.add_api_route( + name="Conformance Classes", + path="/conformance", + response_model=( + api.Conformance if self.settings.enable_response_models else None + ), + responses={ + 200: { + "content": { + MimeTypes.json.value: {}, + }, + "model": api.Conformance, + }, + }, + response_class=self.response_class, + response_model_exclude_unset=True, + response_model_exclude_none=True, + methods=["GET"], + endpoint=create_async_endpoint( + self.client.conformance, conformance_get_model + ), + ) + -api = StacApi( +stac_api = CustomStacApi( # type: ignore app=FastAPI( title=api_settings.name, lifespan=lifespan, @@ -143,14 +252,17 @@ async def lifespan(app: FastAPI): description=api_settings.name, settings=settings, extensions=extensions, - client=CoreCrudClient(post_request_model=search_post_model), + client=PgSTACClient(post_request_model=search_post_model), # type: ignore items_get_request_model=items_get_model, + item_get_request_model=item_get_model, + collections_get_request_model=collections_get_model, + collection_get_request_model=collection_get_model, search_get_request_model=search_get_model, search_post_request_model=search_post_model, response_class=ORJSONResponse, middlewares=middlewares, ) -app = api.app +app = stac_api.app @app.get("/index.html", response_class=HTMLResponse) diff --git a/runtimes/eoapi/stac/eoapi/stac/client.py b/runtimes/eoapi/stac/eoapi/stac/client.py new file mode 100644 index 0000000..bd84766 --- /dev/null +++ b/runtimes/eoapi/stac/eoapi/stac/client.py @@ -0,0 +1,442 @@ +"""eoapi.stac client.""" + +import re +from enum import Enum +from typing import Any, List, Literal, Optional, get_args +from urllib.parse import urljoin + +import attr +import jinja2 +from fastapi import Request +from stac_fastapi.pgstac.core import CoreCrudClient +from stac_fastapi.types.requests import get_base_url +from stac_fastapi.types.stac import ( + Collection, + Collections, + Conformance, + Item, + ItemCollection, + LandingPage, +) +from stac_pydantic.links import Relations +from stac_pydantic.shared import MimeTypes +from starlette.templating import Jinja2Templates, _TemplateResponse + +ResponseType = Literal["json", "html"] +GeoJSONResponseType = Literal["geojson", "html"] + +jinja2_env = jinja2.Environment( + loader=jinja2.ChoiceLoader([jinja2.PackageLoader(__package__, "templates")]) +) +DEFAULT_TEMPLATES = Jinja2Templates(env=jinja2_env) + + +class MediaType(str, Enum): + """Responses Media types formerly known as MIME types.""" + + xml = "application/xml" + json = "application/json" + ndjson = "application/ndjson" + geojson = "application/geo+json" + geojsonseq = "application/geo+json-seq" + schemajson = "application/schema+json" + html = "text/html" + text = "text/plain" + csv = "text/csv" + openapi30_json = "application/vnd.oai.openapi+json;version=3.0" + openapi30_yaml = "application/vnd.oai.openapi;version=3.0" + pbf = "application/x-protobuf" + mvt = "application/vnd.mapbox-vector-tile" + + +def accept_media_type(accept: str, mediatypes: List[MediaType]) -> Optional[MediaType]: + """Return MediaType based on accept header and available mediatype. + + Links: + - https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html + - https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept + + """ + accept_values = {} + for m in accept.replace(" ", "").split(","): + values = m.split(";") + if len(values) == 1: + name = values[0] + quality = 1.0 + else: + name = values[0] + groups = dict([param.split("=") for param in values[1:]]) # type: ignore + try: + q = groups.get("q") + quality = float(q) if q else 1.0 + except ValueError: + quality = 0 + + # if quality is 0 we ignore encoding + if quality: + accept_values[name] = quality + + # Create Preference matrix + media_preference = { + v: [n for (n, q) in accept_values.items() if q == v] + for v in sorted(set(accept_values.values()), reverse=True) + } + + # Loop through available compression and encoding preference + for _, pref in media_preference.items(): + for media in mediatypes: + if media.value in pref: + return media + + # If no specified encoding is supported but "*" is accepted, + # take one of the available compressions. + if "*" in accept_values and mediatypes: + return mediatypes[0] + + return None + + +@attr.s +class PgSTACClient(CoreCrudClient): + templates: Jinja2Templates = attr.ib(default=DEFAULT_TEMPLATES) + + def create_html_response( + self, + request: Request, + data: Any, + template_name: str, + title: Optional[str] = None, + router_prefix: Optional[str] = None, + **kwargs: Any, + ) -> _TemplateResponse: + """Create Template response.""" + + router_prefix = request.app.state.router_prefix + + urlpath = request.url.path + if root_path := request.app.root_path: + urlpath = re.sub(r"^" + root_path, "", urlpath) + + if router_prefix: + urlpath = re.sub(r"^" + router_prefix, "", urlpath) + + crumbs = [] + baseurl = str(request.base_url).rstrip("/") + + if router_prefix: + baseurl += router_prefix + + crumbpath = str(baseurl) + if urlpath == "/": + urlpath = "" + + for crumb in urlpath.split("/"): + crumbpath = crumbpath.rstrip("/") + part = crumb + if part is None or part == "": + part = "Home" + crumbpath += f"/{crumb}" + crumbs.append({"url": crumbpath.rstrip("/"), "part": part.capitalize()}) + + return self.templates.TemplateResponse( + request, + name=f"{template_name}.html", + context={ + "response": data, + "template": { + "api_root": baseurl, + "params": request.query_params, + "title": title or template_name, + }, + "crumbs": crumbs, + "url": baseurl + urlpath, + "params": str(request.url.query), + **kwargs, + }, + ) + + async def landing_page( + self, + request: Request, + f: Optional[str] = None, + **kwargs, + ) -> LandingPage: + """Landing page. + + Called with `GET /`. + + Returns: + API landing page, serving as an entry point to the API. + """ + base_url = get_base_url(request) + + landing_page = self._landing_page( + base_url=base_url, + conformance_classes=self.conformance_classes(), + extension_schemas=[], + ) + + # Add Queryables link + if self.extension_is_enabled("FilterExtension"): + landing_page["links"].append( + { + "rel": Relations.queryables.value, + "type": MimeTypes.jsonschema.value, + "title": "Queryables", + "href": urljoin(base_url, "queryables"), + "method": "GET", + } + ) + + # Add Aggregation links + if self.extension_is_enabled("AggregationExtension"): + landing_page["links"].extend( + [ + { + "rel": "aggregate", + "type": "application/json", + "title": "Aggregate", + "href": urljoin(base_url, "aggregate"), + }, + { + "rel": "aggregations", + "type": "application/json", + "title": "Aggregations", + "href": urljoin(base_url, "aggregations"), + }, + ] + ) + + # NOTE: We need a custom `landing_page()` method because we need to force the result of `all_collections` + # to be of type json + # Add Collections links + collections = await self.all_collections(request=request, f="json") + + for collection in collections["collections"]: + landing_page["links"].append( + { + "rel": Relations.child.value, + "type": MimeTypes.json.value, + "title": collection.get("title") or collection.get("id"), + "href": urljoin(base_url, f"collections/{collection['id']}"), + } + ) + + # Add OpenAPI URL + landing_page["links"].append( + { + "rel": Relations.service_desc.value, + "type": MimeTypes.openapi.value, + "title": "OpenAPI service description", + "href": str(request.url_for("openapi")), + } + ) + + # Add human readable service-doc + landing_page["links"].append( + { + "rel": Relations.service_doc.value, + "type": MimeTypes.html.value, + "title": "OpenAPI service documentation", + "href": str(request.url_for("swagger_ui_html")), + } + ) + + landing = LandingPage(**landing_page) + + output_type: Optional[MediaType] + if f: + output_type = MediaType[f] + else: + accepted_media = [MediaType[v] for v in get_args(ResponseType)] + output_type = accept_media_type( + request.headers.get("accept", ""), accepted_media + ) + + if output_type == MediaType.html: + return self.create_html_response( + request, + landing, + template_name="landing", + title=landing["title"], + ) + + return landing + + async def conformance( + self, + request: Request, + f: Optional[str] = None, + **kwargs, + ) -> Conformance: + """Conformance classes. + + Called with `GET /conformance`. + + Returns: + Conformance classes which the server conforms to. + """ + conforms_to = Conformance(conformsTo=self.conformance_classes()) + + output_type: Optional[MediaType] + if f: + output_type = MediaType[f] + else: + accepted_media = [MediaType[v] for v in get_args(ResponseType)] + output_type = accept_media_type( + request.headers.get("accept", ""), accepted_media + ) + + if output_type == MediaType.html: + return self.create_html_response( + request, + conforms_to, + template_name="conformance", + ) + + return conforms_to + + async def all_collections( + self, + request: Request, + *args, + f: Optional[str] = None, + **kwargs, + ) -> Collections: + collections = await super().all_collections(request, *args, **kwargs) + + output_type: Optional[MediaType] + if f: + output_type = MediaType[f] + else: + accepted_media = [MediaType[v] for v in get_args(ResponseType)] + output_type = accept_media_type( + request.headers.get("accept", ""), accepted_media + ) + + if output_type == MediaType.html: + return self.create_html_response( + request, + collections, + template_name="collections", + title="Collections list", + ) + + return collections + + async def get_collection( + self, + collection_id: str, + request: Request, + *args, + f: Optional[str] = None, + **kwargs, + ) -> Collection: + collection = await super().get_collection( + collection_id, request, *args, **kwargs + ) + + output_type: Optional[MediaType] + if f: + output_type = MediaType[f] + else: + accepted_media = [MediaType[v] for v in get_args(ResponseType)] + output_type = accept_media_type( + request.headers.get("accept", ""), accepted_media + ) + + if output_type == MediaType.html: + return self.create_html_response( + request, + collection, + template_name="collection", + title=f"{collection_id} collection", + ) + + return collection + + async def item_collection( + self, + collection_id: str, + request: Request, + *args, + f: Optional[str] = None, + **kwargs, + ) -> ItemCollection: + items = await super().item_collection(collection_id, request, *args, **kwargs) + + output_type: Optional[MediaType] + if f: + output_type = MediaType[f] + else: + accepted_media = [MediaType[v] for v in get_args(GeoJSONResponseType)] + output_type = accept_media_type( + request.headers.get("accept", ""), accepted_media + ) + + if output_type == MediaType.html: + return self.create_html_response( + request, + items, + template_name="items", + title=f"{collection_id} items", + ) + + return items + + async def get_item( + self, + item_id: str, + collection_id: str, + request: Request, + *args, + f: Optional[str] = None, + **kwargs, + ) -> Item: + item = await super().get_item(item_id, collection_id, request, *args, **kwargs) + + output_type: Optional[MediaType] + if f: + output_type = MediaType[f] + else: + accepted_media = [MediaType[v] for v in get_args(GeoJSONResponseType)] + output_type = accept_media_type( + request.headers.get("accept", ""), accepted_media + ) + + if output_type == MediaType.html: + return self.create_html_response( + request, + item, + template_name="item", + title=f"{collection_id}/{item_id} item", + ) + + return item + + async def get_search( + self, + request: Request, + *args, + f: Optional[str] = None, + **kwargs, + ) -> ItemCollection: + items = await super().get_search(request, *args, **kwargs) + + output_type: Optional[MediaType] + if f: + output_type = MediaType[f] + else: + accepted_media = [MediaType[v] for v in get_args(GeoJSONResponseType)] + output_type = accept_media_type( + request.headers.get("accept", ""), accepted_media + ) + + if output_type == MediaType.html: + return self.create_html_response( + request, + items, + template_name="search", + ) + + return items diff --git a/runtimes/eoapi/stac/eoapi/stac/extension.py b/runtimes/eoapi/stac/eoapi/stac/extension.py index 180fcd7..cd0942d 100644 --- a/runtimes/eoapi/stac/eoapi/stac/extension.py +++ b/runtimes/eoapi/stac/eoapi/stac/extension.py @@ -1,12 +1,13 @@ """TiTiler extension.""" -from typing import Optional +from typing import Annotated, Literal, Optional from urllib.parse import urlencode import attr from fastapi import APIRouter, FastAPI, HTTPException, Path, Query from fastapi.responses import RedirectResponse from stac_fastapi.types.extension import ApiExtension +from stac_fastapi.types.search import APIRequest from starlette.requests import Request @@ -104,3 +105,23 @@ async def stac_viewer( return RedirectResponse(url) app.include_router(self.router, tags=["TiTiler Extension"]) + + +@attr.s +class HTMLorJSONGetRequest(APIRequest): + """HTML or JSON output.""" + + f: Annotated[ + Optional[Literal["json", "html"]], + Query(description="Response MediaType."), + ] = attr.ib(default=None) + + +@attr.s +class HTMLorGeoJSONGetRequest(APIRequest): + """HTML or GeoJSON output.""" + + f: Annotated[ + Optional[Literal["geojson", "html"]], + Query(description="Response MediaType."), + ] = attr.ib(default=None) diff --git a/runtimes/eoapi/stac/eoapi/stac/templates/collection.html b/runtimes/eoapi/stac/eoapi/stac/templates/collection.html new file mode 100644 index 0000000..9289d93 --- /dev/null +++ b/runtimes/eoapi/stac/eoapi/stac/templates/collection.html @@ -0,0 +1,82 @@ +{% include "header.html" %} +{% if params %} + {% set urlq = url + '?' + params + '&' %} + {% else %} + {% set urlq = url + '?' %} +{% endif %} + + + +
{{ response.description or response.title or response.id }}
+ {% if "keywords" in response and length(response.keywords) > 0 %} ++ {% for keyword in response.keywords %} + {{ keyword }} + {% endfor %} +
+
+ Number of matching collections: {{ response.numberMatched }}
+ Number of returned collections: {{ response.numberReturned }}
+ Page: of
+
Title | +Description | +
---|---|
{{ collection.title or collection.id }} | +{{ collection.description or collection.title or collection.id }} | +
This API implements the conformance classes from standards and community specifications that are listed below.
+ +