Skip to content

Commit

Permalink
Implement plugin data streaming using EventSource
Browse files Browse the repository at this point in the history
  • Loading branch information
Wrench56 committed Jul 16, 2024
1 parent 05027ae commit ab8bc84
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 31 deletions.
6 changes: 6 additions & 0 deletions src/backend/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import logging.config

import asyncio
import atexit
import signal
import sys
Expand Down Expand Up @@ -37,7 +38,12 @@ def init_logger():


def cleanup() -> None:
logging.info('Cleaning up...')
processes.terminate_subprocesses()
loop = asyncio.get_event_loop()
for task in asyncio.all_tasks(loop=loop):
task.cancel()
loop.stop()

# Raises SystemExit
sys.exit()
Expand Down
12 changes: 12 additions & 0 deletions src/backend/plugins/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
from api import expose
from plugins.base_plugin import Plugin
from plugins import priority
from utils.flag import Flag


_PLUGINS: Dict[str, Plugin] = {}
_PLUGINS_UPDATED = Flag(True)


def load_all() -> None:
Expand All @@ -26,6 +28,7 @@ def load_all() -> None:


def load(name: str) -> Optional[Plugin]:
_PLUGINS_UPDATED.set()
try:
source = f'plugins.plugins.{name}.backend.main'
plugin: Plugin = importlib.import_module(source).init()
Expand Down Expand Up @@ -81,6 +84,7 @@ def get_plugin_names() -> Tuple[str, ...]:


def get_plugin_statuses() -> List[Dict[str, Any]]:
_PLUGINS_UPDATED.reset()
result = []
for name, _ in priority.fetch_plugins(reload=False):
if name in _PLUGINS:
Expand All @@ -89,3 +93,11 @@ def get_plugin_statuses() -> List[Dict[str, Any]]:
result.append({'name': name, 'status': False})

return result


def is_updated() -> bool:
return _PLUGINS_UPDATED.get()


def set_update_flag() -> None:
_PLUGINS_UPDATED.set()
4 changes: 3 additions & 1 deletion src/backend/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
asyncio
fastapi
uvicorn[standard]
sse_starlette
uvicorn[standard]
25 changes: 20 additions & 5 deletions src/backend/server/main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from typing import Any, Optional
from typing import Any, AsyncGenerator

import asyncio
import datetime
import inspect
import json
import logging

from fastapi import BackgroundTasks, FastAPI, Request, WebSocket
from fastapi.responses import FileResponse, ORJSONResponse, PlainTextResponse
from fastapi.staticfiles import StaticFiles
from sse_starlette.sse import EventSourceResponse

from api import expose
from db import users
Expand Down Expand Up @@ -217,9 +220,21 @@ async def ws_plugins(websocket: WebSocket, plugin: str, endpoint: str) -> Any:
return response


@app.get('/plugins/status', response_class=ORJSONResponse)
async def plugin_status(request: Request) -> ORJSONResponse:
@app.get('/plugins/status', response_class=EventSourceResponse)
async def plugin_status(request: Request) -> EventSourceResponse:
if not database.uuid_exists(request.cookies.get('auth_cookie')):
return ORJSONResponse({'ok': False, 'error': 'Auth failed'})
return EventSourceResponse(content=iter(('Authentication failed',)), status_code=401)
handler.set_update_flag()

return ORJSONResponse({'ok': True, 'plugins': handler.get_plugin_statuses()})
async def event_generator() -> AsyncGenerator[str, None]:
while True:
if await request.is_disconnected():
break

if handler.is_updated():
yield json.dumps(
{'ok': True, 'plugins': handler.get_plugin_statuses()}
)
await asyncio.sleep(2.0)

return EventSourceResponse(event_generator(), media_type='text/event-stream')
15 changes: 15 additions & 0 deletions src/backend/utils/flag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class Flag:
def __init__(self, default: bool = True) -> None:
self._flag = default

def set(self) -> None:
self._flag = True

def reset(self) -> None:
self._flag = False

def get(self) -> bool:
return self._flag

def __repr__(self) -> str:
return f'Flag({self._flag})'
32 changes: 20 additions & 12 deletions src/frontend/src/lib/components/plugins/PluginsContainer.svelte
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
<script lang="ts">
import { onMount } from "svelte";
import type { PluginStatus } from "../shared/api/plugins.type";
import type {
PluginStatus,
PluginStatusContainer,
} from "../shared/api/plugins.type";
import Error from "./Error.svelte";
import Plugin from "./Plugin.svelte";
import Separator from "../shared/Separator.svelte";
import { initEventSources, pluginStatusStream } from "../shared/api/streams";
let plugins: Array<PluginStatus> = [];
let error: string;
function updatePluginsList(data: PluginStatusContainer) {
if (data.ok == true) {
plugins = data.plugins;
}
}
onMount(() => {
fetch("/plugins/status")
.then((response) => response.json())
.then((responseJson) => {
if (responseJson.ok == true) {
plugins = responseJson.plugins;
} else {
plugins = [];
error = responseJson.error;
}
});
initEventSources();
updatePluginsList(pluginStatusStream.fetch());
const unsubscribe = pluginStatusStream.subscribe(
(data: PluginStatusContainer) => updatePluginsList(data)
);
return () => {
unsubscribe();
};
});
</script>


<Separator />
{#if plugins.length == 0}
<Error {error} />
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/lib/components/shared/api/plugins.type.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
export type PluginStatusContainer = {
ok: boolean;
plugins: Array<PluginStatus>;
};

export type PluginStatus = {
name: string;
status: boolean;
Expand Down
49 changes: 49 additions & 0 deletions src/frontend/src/lib/components/shared/api/streams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
type CallbackFunction = (data: any) => void;
let pluginStatusStream = undefined;

const createEventSource = (
url: string,
defaultData: object
): {
subscribe: (listener: CallbackFunction) => () => boolean;
fetch: () => object;
close: () => void;
} => {
const eventSource = new EventSource(url);
const listeners: Set<CallbackFunction> = new Set();
let cachedData: object = defaultData;

eventSource.onmessage = (event: MessageEvent) => {
cachedData = JSON.parse(event.data);
listeners.forEach((listener) => listener(cachedData));
};

eventSource.onerror = () => {
const errorData = defaultData;
listeners.forEach((listener) => listener(errorData));
};

return {
subscribe: (listener) => {
listeners.add(listener);
return () => listeners.delete(listener);
},
close: () => {
eventSource.close();
},
fetch: () => {
return cachedData;
},
};
};

export function initEventSources() {
if (pluginStatusStream == undefined) {
pluginStatusStream = createEventSource("/plugins/status", {
ok: false,
plugins: [],
});
}
}

export { pluginStatusStream };
37 changes: 24 additions & 13 deletions src/frontend/src/lib/components/shared/statusbar/Plugins.svelte
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
<script lang="ts">
import { onMount } from "svelte";
import { initEventSources, pluginStatusStream } from "../api/streams";
import type { PluginStatusContainer } from "../api/plugins.type";
let out_of: string = "";
let status: string = "success";
let content: string = "LOAD*";
function updateStatus(data: PluginStatusContainer) {
if (data.ok) {
status = "success";
out_of = `/${data.plugins.length}`;
content = data.plugins.length.toString();
} else {
status = "failure";
out_of = "";
content = "ERROR";
}
}
onMount(() => {
fetch("/plugins/status")
.then((response) => response.json())
.then((responseJson) => {
if (responseJson.ok == true) {
status = "success";
out_of = `/${responseJson.plugins.length}`;
content = responseJson.plugins.length;
} else {
status = "failure";
out_of = "";
content = "ERROR";
}
});
initEventSources();
updateStatus(pluginStatusStream.fetch());
const unsubscribe = pluginStatusStream.subscribe(
(data: PluginStatusContainer) => updateStatus(data)
);
return () => {
unsubscribe();
};
});
</script>

Expand Down

0 comments on commit ab8bc84

Please sign in to comment.