Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance/extract-api #3

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
28 changes: 24 additions & 4 deletions API/auth/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import Union

from fastapi.security import APIKeyHeader
from fastapi import Depends, Header, HTTPException
from osm_login_python.core import Auth
from pydantic import BaseModel, Field
Expand All @@ -9,6 +10,11 @@
from src.config import get_oauth_credentials


Raw_Data_Access_Token = APIKeyHeader(
name="Access_Token", description="Access Token to Authorize User"
)


class UserRole(Enum):
ADMIN = 1
STAFF = 2
Expand All @@ -21,6 +27,16 @@ class AuthUser(BaseModel):
img_url: Union[str, None]
role: UserRole = Field(default=UserRole.GUEST.value)

class Config:
json_schema_extra = {
"example": {
"id": "123",
"username": "HOT Team",
"img_url": "https://hotteamimage.com",
"role": UserRole.GUEST.value,
}
}


osm_auth = Auth(*get_oauth_credentials())

Expand All @@ -43,11 +59,15 @@ def get_osm_auth_user(access_token):
return user


def login_required(access_token: str = Header(...)):
def login_required(access_token: str = Depends(Raw_Data_Access_Token)):
return get_osm_auth_user(access_token)


def get_optional_user(access_token: str = Header(default=None)) -> AuthUser:
def get_optional_user(
access_token: str = Header(
default=None, description="Access Token to Authorize User"
)
) -> AuthUser:
if access_token:
return get_osm_auth_user(access_token)
else:
Expand All @@ -58,7 +78,7 @@ def get_optional_user(access_token: str = Header(default=None)) -> AuthUser:
def admin_required(user: AuthUser = Depends(login_required)):
db_user = get_user_from_db(user.id)
if not db_user["role"] is UserRole.ADMIN.value:
raise HTTPException(status_code=403, detail="User is not an admin")
raise HTTPException(status_code=403, detail=[{"msg": "User is not an admin"}])
return user


Expand All @@ -70,5 +90,5 @@ def staff_required(user: AuthUser = Depends(login_required)):
db_user["role"] is UserRole.STAFF.value
or db_user["role"] is UserRole.ADMIN.value
):
raise HTTPException(status_code=403, detail="User is not a staff")
raise HTTPException(status_code=403, detail=[{"msg": "User is not a staff"}])
return user
122 changes: 104 additions & 18 deletions API/auth/routers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
import json

from fastapi import APIRouter, Depends, Request
from fastapi import APIRouter, Depends, Request, Query, Path
from pydantic import BaseModel

from src.app import Users
from src.validation.models import ErrorMessage, common_responses

from . import AuthUser, admin_required, login_required, osm_auth, staff_required

router = APIRouter(prefix="/auth", tags=["Auth"])


@router.get("/login/")
@router.get(
"/login",
responses={
200: {
"description": "A Login URL",
"content": {
"application/json": {
"example": {
"login_url": "https://www.openstreetmap.org/oauth2/authorize/"
}
}
},
},
500: {"model": ErrorMessage},
},
)
def login_url(request: Request):
"""Generate Login URL for authentication using OAuth2 Application registered with OpenStreetMap.
Click on the download url returned to get access_token.
Expand All @@ -25,7 +41,7 @@ def login_url(request: Request):
return login_url


@router.get("/callback/")
@router.get("/callback", responses={500: {"model": ErrorMessage}})
def callback(request: Request):
"""Performs token exchange between OpenStreetMap and Raw Data API

Expand All @@ -42,18 +58,27 @@ def callback(request: Request):
return access_token


@router.get("/me/", response_model=AuthUser)
@router.get(
"/me",
response_model=AuthUser,
responses={**common_responses},
response_description="User Information",
)
def my_data(user_data: AuthUser = Depends(login_required)):
"""Read the access token and provide user details from OSM user's API endpoint,
also integrated with underpass .

Parameters:None

Returns: user_data
Returns: user_data\n
User Role :
ADMIN = 1
STAFF = 2
GUEST = 3

Raises:
- HTTPException 403: Due to authentication error(Wrong access token).
- HTTPException 500: Internal server error.
"""
return user_data

Expand All @@ -62,9 +87,19 @@ class User(BaseModel):
osm_id: int
role: int

class Config:
json_schema_extra = {"example": {"osm_id": 123, "role": 1}}


# Create user
@router.post("/users/", response_model=dict)
@router.post(
"/users",
response_model=dict,
responses={
**common_responses,
"200": {"content": {"application/json": {"example": {"osm_id": 123}}}},
},
)
async def create_user(params: User, user_data: AuthUser = Depends(admin_required)):
"""
Creates a new user and returns the user's information.
Expand All @@ -80,15 +115,26 @@ async def create_user(params: User, user_data: AuthUser = Depends(admin_required
- Dict[str, Any]: A dictionary containing the osm_id of the newly created user.

Raises:
- HTTPException: If the user creation fails.
- HTTPException 403: If the user creation fails due to insufficient permission.
- HTTPException 500: If the user creation fails due to internal server error.
"""
auth = Users()
return auth.create_user(params.osm_id, params.role)


# Read user by osm_id
@router.get("/users/{osm_id}", response_model=dict)
async def read_user(osm_id: int, user_data: AuthUser = Depends(staff_required)):
@router.get(
"/users/{osm_id}",
responses={
**common_responses,
"200": {"content": {"application/json": {"example": {"osm_id": 1, "role": 2}}}},
"404": {"model": ErrorMessage},
},
)
async def read_user(
osm_id: int = Path(description="The OSM ID of the User to Retrieve"),
user_data: AuthUser = Depends(staff_required),
):
"""
Retrieves user information based on the given osm_id.
User Role :
Expand All @@ -103,17 +149,28 @@ async def read_user(osm_id: int, user_data: AuthUser = Depends(staff_required)):
- Dict[str, Any]: A dictionary containing user information.

Raises:
- HTTPException: If the user with the given osm_id is not found.
- HTTPException 403: If the user has insufficient permission.
- HTTPException 404: If the user with the given osm_id is not found.
- HTTPException 500: If it fails due to internal server error.
"""
auth = Users()

return auth.read_user(osm_id)


# Update user by osm_id
@router.put("/users/{osm_id}", response_model=dict)
@router.put(
"/users/{osm_id}",
responses={
**common_responses,
"200": {"content": {"application/json": {"example": {"osm_id": 1, "role": 1}}}},
"404": {"model": ErrorMessage},
},
)
async def update_user(
osm_id: int, update_data: User, user_data: AuthUser = Depends(admin_required)
update_data: User,
user_data: AuthUser = Depends(admin_required),
osm_id: int = Path(description="The OSM ID of the User to Update"),
):
"""
Updates user information based on the given osm_id.
Expand All @@ -129,15 +186,27 @@ async def update_user(
- Dict[str, Any]: A dictionary containing the updated user information.

Raises:
- HTTPException: If the user with the given osm_id is not found.
- HTTPException 403: If the user has insufficient permission.
- HTTPException 404: If the user with the given osm_id is not found.
- HTTPException 500: If it fails due to internal server error.
"""
auth = Users()
return auth.update_user(osm_id, update_data)


# Delete user by osm_id
@router.delete("/users/{osm_id}", response_model=dict)
async def delete_user(osm_id: int, user_data: AuthUser = Depends(admin_required)):
@router.delete(
"/users/{osm_id}",
responses={
**common_responses,
"200": {"content": {"application/json": {"example": {"osm_id": 1, "role": 1}}}},
"404": {"model": ErrorMessage},
},
)
async def delete_user(
user_data: AuthUser = Depends(admin_required),
osm_id: int = Path(description="The OSM ID of the User to Delete"),
):
"""
Deletes a user based on the given osm_id.

Expand All @@ -148,16 +217,29 @@ async def delete_user(osm_id: int, user_data: AuthUser = Depends(admin_required)
- Dict[str, Any]: A dictionary containing the deleted user information.

Raises:
- HTTPException: If the user with the given osm_id is not found.
- HTTPException 403: If the user has insufficient permission.
- HTTPException 404: If the user with the given osm_id is not found.
- HTTPException 500: If it fails due to internal server error.
"""
auth = Users()
return auth.delete_user(osm_id)


# Get all users
@router.get("/users/", response_model=list)
@router.get(
"/users",
response_model=list,
responses={
**common_responses,
"200": {
"content": {"application/json": {"example": [{"osm_id": 1, "role": 2}]}}
},
},
)
async def read_users(
skip: int = 0, limit: int = 10, user_data: AuthUser = Depends(staff_required)
skip: int = Query(0, description="The Number of Users to Skip"),
limit: int = Query(10, description="The Maximum Number of Users to Retrieve"),
user_data: AuthUser = Depends(staff_required),
):
"""
Retrieves a list of users with optional pagination.
Expand All @@ -168,6 +250,10 @@ async def read_users(

Returns:
- List[Dict[str, Any]]: A list of dictionaries containing user information.

Raises:
- HTTPException 403: If it fails due to insufficient permission.
- HTTPException 500: If it fails due to internal server error.
"""
auth = Users()
return auth.read_users(skip, limit)
7 changes: 6 additions & 1 deletion API/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@

os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"

app = FastAPI(title="Raw Data API ", swagger_ui_parameters={"syntaxHighlight": False})
app = FastAPI(
title="Raw Data API ",
description="""The Raw Data API allows you to transform
and export OpenStreetMap (OSM) data in different GIS file formats""",
swagger_ui_parameters={"syntaxHighlight": False},
)
app.include_router(auth_router)
app.include_router(raw_data_router)
app.include_router(tasks_router)
Expand Down
Loading