Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

collect based on tenant id #104

Merged
merged 17 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// This can be used to network with other containers or the host.
// "forwardPorts": [5000, 5432],
"forwardPorts": [
3306, 6379
3306, 6379, 8001
],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "bash .devcontainer/postCreate.sh",
Expand Down
5 changes: 5 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ services:
network_mode: service:app
container_name: "fixbackend-redis-${USER}"

redisinsight:
image: redislabs/redisinsight:latest
network_mode: service:app
container_name: "fixbackend-redisinsight-${USER}"

db:
image: mysql:8
network_mode: service:app
Expand Down
2 changes: 1 addition & 1 deletion fixbackend/all_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from fixbackend.workspaces.models.orm import Organization, OrganizationInvite # noqa
from fixbackend.graph_db.service import GraphDatabaseAccessEntity # noqa
from fixbackend.cloud_accounts.models.orm import CloudAccount # noqa
from fixbackend.dispatcher.next_run_repository import NextRun # noqa
from fixbackend.dispatcher.next_run_repository import NextTenantRun # noqa
from fixbackend.metering.metering_repository import MeteringRecordEntity # noqa
from fixbackend.keyvalue.json_kv import JsonEntry # noqa
from fixbackend.subscription.subscription_repository import SubscriptionEntity # noqa
61 changes: 42 additions & 19 deletions fixbackend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from fastapi.exception_handlers import http_exception_handler
from fastapi.middleware.cors import CORSMiddleware
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.responses import HTMLResponse
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fixcloudutils.logging import setup_logger
from fixcloudutils.redis.event_stream import RedisStreamPublisher
Expand All @@ -51,7 +51,7 @@
from fixbackend.dispatcher.dispatcher_service import DispatcherService
from fixbackend.dispatcher.next_run_repository import NextRunRepository
from fixbackend.domain_events.consumers import CustomerIoEventConsumer
from fixbackend.domain_events.sender_impl import DomainEventSenderImpl
from fixbackend.domain_events.publisher_impl import DomainEventPublisherImpl
from fixbackend.events.router import websocket_router
from fixbackend.graph_db.service import GraphDatabaseAccessManager
from fixbackend.inventory.inventory_client import InventoryClient
Expand All @@ -63,9 +63,11 @@
from fixbackend.subscription.subscription_repository import SubscriptionRepository
from fixbackend.workspaces.repository import WorkspaceRepositoryImpl
from fixbackend.workspaces.router import workspaces_router
from fixbackend.errors import Unauthorized

log = logging.getLogger(__name__)
API_PREFIX = "/api"
domain_events_stream_name = "fixbackend:domain_events"


# noinspection PyUnresolvedReferences
Expand Down Expand Up @@ -118,21 +120,7 @@ async def setup_teardown_application(app: FastAPI) -> AsyncIterator[None]:
graph_db_access = deps.add(SN.graph_db_access, GraphDatabaseAccessManager(cfg, session_maker))
inventory_client = deps.add(SN.inventory_client, InventoryClient(cfg.inventory_url, http_client))
deps.add(SN.inventory, InventoryService(inventory_client))
deps.add(
SN.cloudaccount_publisher,
RedisStreamPublisher(readwrite_redis, "fixbackend::cloudaccount", f"fixbackend-{cfg.instance_id}"),
)
workspace_repo = deps.add(SN.workspace_repo, WorkspaceRepositoryImpl(session_maker, graph_db_access))
subscription_repo = deps.add(SN.subscription_repo, SubscriptionRepository(session_maker))
deps.add(
SN.aws_marketplace_handler,
AwsMarketplaceHandler(
subscription_repo, workspace_repo, boto_session, cfg.args.aws_marketplace_metering_sqs_url
),
)

domain_events_stream_name = "fixbackend::domain_events"
domain_event_redis_publisher = deps.add(
fixbackend_events = deps.add(
SN.domain_event_redis_stream_publisher,
RedisStreamPublisher(
readwrite_redis,
Expand All @@ -141,7 +129,17 @@ async def setup_teardown_application(app: FastAPI) -> AsyncIterator[None]:
keep_unprocessed_messages_for=timedelta(days=7),
),
)
deps.add(SN.domain_event_sender, DomainEventSenderImpl(domain_event_redis_publisher))
domain_event_publisher = deps.add(SN.domain_event_sender, DomainEventPublisherImpl(fixbackend_events))
workspace_repo = deps.add(
SN.workspace_repo, WorkspaceRepositoryImpl(session_maker, graph_db_access, domain_event_publisher)
)
subscription_repo = deps.add(SN.subscription_repo, SubscriptionRepository(session_maker))
deps.add(
SN.aws_marketplace_handler,
AwsMarketplaceHandler(
subscription_repo, workspace_repo, boto_session, cfg.args.aws_marketplace_metering_sqs_url
),
)
deps.add(
SN.customerio_consumer,
CustomerIoEventConsumer(http_client, cfg, readwrite_redis, domain_events_stream_name),
Expand Down Expand Up @@ -169,6 +167,7 @@ async def setup_teardown_dispatcher(_: FastAPI) -> AsyncIterator[None]:
),
)
rw_redis = deps.add(SN.readwrite_redis, create_redis(cfg.redis_readwrite_url))
temp_store_redis = deps.add(SN.temp_store_redis, create_redis(cfg.redis_temp_store_url))
engine = deps.add(
SN.async_engine,
create_async_engine(
Expand All @@ -185,9 +184,29 @@ async def setup_teardown_dispatcher(_: FastAPI) -> AsyncIterator[None]:
metering_repo = deps.add(SN.metering_repo, MeteringRepository(session_maker))
collect_queue = deps.add(SN.collect_queue, RedisCollectQueue(arq_redis))
db_access = deps.add(SN.graph_db_access, GraphDatabaseAccessManager(cfg, session_maker))
fixbackend_events = deps.add(
SN.domain_event_redis_stream_publisher,
RedisStreamPublisher(
rw_redis,
domain_events_stream_name,
"dispatching",
keep_unprocessed_messages_for=timedelta(days=7),
),
)
domain_event_sender = deps.add(SN.domain_event_sender, DomainEventPublisherImpl(fixbackend_events))
deps.add(
SN.dispatching,
DispatcherService(rw_redis, cloud_accounts, next_run_repo, metering_repo, collect_queue, db_access),
DispatcherService(
rw_redis,
cloud_accounts,
next_run_repo,
metering_repo,
collect_queue,
db_access,
domain_event_sender,
temp_store_redis,
domain_events_stream_name,
),
)

async with deps:
Expand All @@ -214,6 +233,10 @@ async def setup_teardown_dispatcher(_: FastAPI) -> AsyncIterator[None]:
allow_headers=["X-Fix-Csrf"],
)

@app.exception_handler(Unauthorized)
async def unauthorized_handler(request: Request, exception: Unauthorized) -> Response:
return JSONResponse(status_code=403, content={"message": str(exception)})
aquamatthias marked this conversation as resolved.
Show resolved Hide resolved

class EndpointFilter(logging.Filter):
endpoints_to_filter: ClassVar[Set[str]] = {
"/health",
Expand Down
14 changes: 7 additions & 7 deletions fixbackend/auth/user_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from fixbackend.auth.user_verifier import UserVerifier, UserVerifierDependency
from fixbackend.config import Config, ConfigDependency
from fixbackend.domain_events.events import UserRegistered
from fixbackend.domain_events.sender import DomainEventSender
from fixbackend.domain_events.dependencies import DomainEventSenderDependency
from fixbackend.domain_events.publisher import DomainEventPublisher
from fixbackend.domain_events.dependencies import DomainEventPublisherDependency
from fixbackend.workspaces.repository import WorkspaceRepository, WorkspaceRepositoryDependency


Expand All @@ -38,14 +38,14 @@ def __init__(
password_helper: PasswordHelperProtocol | None,
user_verifier: UserVerifier,
workspace_repository: WorkspaceRepository,
domain_events_sender: DomainEventSender,
domain_events_publisher: DomainEventPublisher,
):
super().__init__(user_repository, password_helper)
self.user_verifier = user_verifier
self.reset_password_token_secret = config.secret
self.verification_token_secret = config.secret
self.workspace_repository = workspace_repository
self.domain_events_sender = domain_events_sender
self.domain_events_publisher = domain_events_publisher

async def on_after_register(self, user: User, request: Request | None = None) -> None:
if user.is_verified: # oauth2 users are already verified
Expand All @@ -62,17 +62,17 @@ async def on_after_verify(self, user: User, request: Request | None = None) -> N
async def create_default_workspace(self, user: User) -> None:
org_slug = re.sub("[^a-zA-Z0-9-]", "-", user.email)
org = await self.workspace_repository.create_workspace(user.email, org_slug, user)
await self.domain_events_sender.publish(UserRegistered(user_id=user.id, email=user.email, tenant_id=org.id))
await self.domain_events_publisher.publish(UserRegistered(user_id=user.id, email=user.email, tenant_id=org.id))


async def get_user_manager(
config: ConfigDependency,
user_repository: UserRepositoryDependency,
user_verifier: UserVerifierDependency,
workspace_repository: WorkspaceRepositoryDependency,
domain_event_sender: DomainEventSenderDependency,
domain_event_publisher: DomainEventPublisherDependency,
) -> AsyncIterator[UserManager]:
yield UserManager(config, user_repository, None, user_verifier, workspace_repository, domain_event_sender)
yield UserManager(config, user_repository, None, user_verifier, workspace_repository, domain_event_publisher)


UserManagerDependency = Annotated[UserManager, Depends(get_user_manager)]
38 changes: 19 additions & 19 deletions fixbackend/cloud_accounts/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@

from attrs import evolve
from fastapi import Depends
from fixcloudutils.redis.event_stream import RedisStreamPublisher
from fixcloudutils.redis.pub_sub import RedisPubSubPublisher

from fixbackend.cloud_accounts.models import AwsCloudAccess, CloudAccount
from fixbackend.cloud_accounts.repository import CloudAccountRepository, CloudAccountRepositoryDependency
from fixbackend.dependencies import FixDependency
from fixbackend.domain_events.dependencies import DomainEventSenderDependency
from fixbackend.domain_events.events import AwsAccountDiscovered
from fixbackend.domain_events.sender import DomainEventSender
from fixbackend.domain_events.dependencies import DomainEventPublisherDependency
from fixbackend.domain_events.events import AwsAccountDeleted, AwsAccountDiscovered
from fixbackend.domain_events.publisher import DomainEventPublisher
from fixbackend.errors import Unauthorized
from fixbackend.ids import CloudAccountId, ExternalId, WorkspaceId
from fixbackend.workspaces.repository import WorkspaceRepository, WorkspaceRepositoryDependency

Expand Down Expand Up @@ -56,15 +56,13 @@ def __init__(
self,
workspace_repository: WorkspaceRepository,
cloud_account_repository: CloudAccountRepository,
publisher: RedisStreamPublisher,
pubsub_publisher: RedisPubSubPublisher,
domain_event_sender: DomainEventSender,
domain_event_publisher: DomainEventPublisher,
) -> None:
self.workspace_repository = workspace_repository
self.cloud_account_repository = cloud_account_repository
self.publisher = publisher
self.pubsub_publisher = pubsub_publisher
self.domain_event_sender = domain_event_sender
self.domain_events = domain_event_publisher

async def create_aws_account(
self, workspace_id: WorkspaceId, account_id: str, role_name: str, external_id: ExternalId
Expand Down Expand Up @@ -107,41 +105,43 @@ async def account_already_exists(workspace_id: WorkspaceId, account_id: str) ->
"workspace_id": str(result.workspace_id),
"aws_account_id": account_id,
}
await self.publisher.publish(kind="cloud_account_created", message=message)
await self.pubsub_publisher.publish(
kind="cloud_account_created", message=message, channel=f"tenant-events::{workspace_id}"
)
await self.domain_event_sender.publish(
AwsAccountDiscovered(
cloud_account_id=result.id, tenant_id=workspace_id, cloud_id="aws", aws_account_id=account_id
)
await self.domain_events.publish(
AwsAccountDiscovered(cloud_account_id=result.id, tenant_id=workspace_id, aws_account_id=account_id)
)
return result

async def delete_cloud_account(self, cloud_account_id: CloudAccountId, workspace_id: WorkspaceId) -> None:
account = await self.cloud_account_repository.get(cloud_account_id)
if not account or account.workspace_id != workspace_id:
raise ValueError("Cloud account does not exist")
if account is None:
return None # account already deleted, do nothing
if account.workspace_id != workspace_id:
raise Unauthorized("Deletion of cloud accounts is only allowed by the owning organization.")

await self.cloud_account_repository.delete(cloud_account_id)
await self.publisher.publish("cloud_account_deleted", {"id": str(cloud_account_id)})
match account.access:
case AwsCloudAccess(account_id, _, _):
await self.domain_events.publish(AwsAccountDeleted(cloud_account_id, workspace_id, account_id))
case _:
pass


def get_cloud_account_service(
workspace_repository_dependency: WorkspaceRepositoryDependency,
cloud_account_repository_dependency: CloudAccountRepositoryDependency,
fix_dependency: FixDependency,
domain_event_sender_dependency: DomainEventSenderDependency,
domain_event_publisher_dependency: DomainEventPublisherDependency,
) -> CloudAccountService:
redis_publisher = RedisPubSubPublisher(
redis=fix_dependency.readwrite_redis, channel="cloud_accounts", publisher_name="cloud_account_service"
)
return CloudAccountServiceImpl(
workspace_repository_dependency,
cloud_account_repository_dependency,
fix_dependency.cloudaccount_publisher,
redis_publisher,
domain_event_sender_dependency,
domain_event_publisher_dependency,
)


Expand Down
4 changes: 4 additions & 0 deletions fixbackend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Config(BaseSettings):
redis_readwrite_url: str
redis_readonly_url: str
redis_queue_url: str
redis_temp_store_url: str
cdn_endpoint: str
cdn_bucket: str
fixui_sha: str
Expand Down Expand Up @@ -93,6 +94,9 @@ def parse_args(argv: Optional[Sequence[str]] = None) -> Namespace:
parser.add_argument(
"--redis-readonly-url", default=os.environ.get("REDIS_READONLY_URL", "redis://localhost:6379/0")
)
parser.add_argument(
"--redis-temp-store-url", default=os.environ.get("REDIS_TEMP_STORE_URL", "redis://localhost:6379/1")
)
parser.add_argument("--redis-queue-url", default=os.environ.get("REDIS_QUEUE_URL", "redis://localhost:6379/5"))
parser.add_argument("--redis-password", default=os.environ.get("REDIS_PASSWORD"))
parser.add_argument("--skip-migrations", default=False, action="store_true")
Expand Down
15 changes: 5 additions & 10 deletions fixbackend/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@

from arq import ArqRedis
from fastapi.params import Depends
from fixcloudutils.redis.event_stream import RedisStreamPublisher
from fixcloudutils.service import Dependencies
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncEngine

from fixbackend.certificates.cert_store import CertificateStore
from fixbackend.domain_events.sender import DomainEventSender
from fixbackend.domain_events.sender_impl import DomainEventSenderImpl
from fixbackend.domain_events.publisher import DomainEventPublisher
from fixbackend.domain_events.publisher_impl import DomainEventPublisherImpl
from fixbackend.graph_db.service import GraphDatabaseAccessManager
from fixbackend.inventory.inventory_service import InventoryService
from fixbackend.types import AsyncSessionMaker
Expand All @@ -33,6 +32,7 @@ class ServiceNames:
arq_redis = "arq_redis"
readonly_redis = "readonly_redis"
readwrite_redis = "readwrite_redis"
temp_store_redis = "temp_store_redis"
collect_queue = "collect_queue"
async_engine = "async_engine"
session_maker = "session_maker"
Expand All @@ -43,7 +43,6 @@ class ServiceNames:
inventory = "inventory"
inventory_client = "inventory_client"
dispatching = "dispatching"
cloudaccount_publisher = "cloudaccount_publisher"
certificate_store = "certificate_store"
domain_event_redis_stream_publisher = "domain_event_redis_stream_publisher"
domain_event_sender = "domain_event_sender"
Expand Down Expand Up @@ -82,17 +81,13 @@ def readwrite_redis(self) -> Redis:
def graph_database_access(self) -> GraphDatabaseAccessManager:
return self.service(ServiceNames.graph_db_access, GraphDatabaseAccessManager)

@property
def cloudaccount_publisher(self) -> RedisStreamPublisher:
return self.service(ServiceNames.cloudaccount_publisher, RedisStreamPublisher)

@property
def certificate_store(self) -> CertificateStore:
return self.service(ServiceNames.certificate_store, CertificateStore)

@property
def domain_event_sender(self) -> DomainEventSender:
return self.service(ServiceNames.domain_event_sender, DomainEventSenderImpl)
def domain_event_sender(self) -> DomainEventPublisher:
return self.service(ServiceNames.domain_event_sender, DomainEventPublisherImpl)


# placeholder for dependencies, will be replaced during the app initialization
Expand Down
30 changes: 30 additions & 0 deletions fixbackend/dispatcher/collect_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from attrs import frozen
from typing import Literal

from cattrs.preconf.json import make_converter
from attrs import evolve
from fixbackend.ids import CloudAccountId
from uuid import UUID
from datetime import datetime

json_converter = make_converter()

json_converter.register_structure_hook(UUID, lambda v, _: UUID(v))
json_converter.register_unstructure_hook(UUID, lambda v: str(v))


@frozen
class AccountCollectInProgress:
account_id: CloudAccountId
started_at: datetime
status: Literal["in_progress", "done"] = "in_progress"

def done(self) -> "AccountCollectInProgress":
return evolve(self, status="done")

def to_json_str(self) -> str:
return json_converter.dumps(self)

@staticmethod
def from_json_bytes(value: bytes) -> "AccountCollectInProgress":
return json_converter.loads(value, AccountCollectInProgress)
Loading