Skip to content

Commit

Permalink
expire revisions on remote servers
Browse files Browse the repository at this point in the history
  • Loading branch information
Johann Bahl committed May 22, 2023
1 parent 5fa55f0 commit 5353686
Show file tree
Hide file tree
Showing 27 changed files with 2,300 additions and 569 deletions.
16 changes: 16 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ Changelog

- Fix crash for non-numeric Ceph version strings

- Restore default value for missing entries on daemon reload

- Crash on invalid config while reloading

- Replace prettytable with rich

- Replace telnet shell with HTTP API

- Support backup job migration across servers

- Add `tags {set, add, remove}` subcommand

- Add `expire` subcommand

- Migrate `backy check` to `backy client check` and use the new HTTP API

2.4.3 (2019-04-17)
==================

Expand Down
7 changes: 0 additions & 7 deletions README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,6 @@ configurable.
Features
========

Telnet shell
------------

Telnet into localhost port 6023 to get an interactive console. The console can
currently be used to inspect the scheduler's live status.


Self-check
----------

Expand Down
26 changes: 4 additions & 22 deletions doc/man-backy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -241,24 +241,6 @@ environment variables like **CEPH_CLUSTER** or **CEPH_ARGS**.
**backy scheduler** processes exit cleanly on SIGTERM.


Telnet shell
------------

The schedules opens a telnet server (default: localhost port 6023) for live
inspection. The telnet interface accepts the following commands:

jobs [REGEX]
Prints an overview of all configured jobs together with their last and
next backup run. An optional (extended) regular expression restricts output
to matching job names.

status
Dumps internal server status details.

quit
Exits the telnet shell.


Files
-----

Expand Down Expand Up @@ -288,12 +270,12 @@ config
status-interval
Update status file every N seconds (default: 30).

telnet-addrs
Comma-separated list of listen addresses for the telnet server
api-addrs
Comma-separated list of listen addresses for the api server
(default: 127.0.0.1, ::1).

telnet-port
Port number of the telnet server (default: 6023).
api-port
Port number of the api server (default: 6023).

.. _schedules:

Expand Down
517 changes: 480 additions & 37 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ consulate-fc-nix-test = "1.1.0a1"
humanize = "^4.4.0"
mmh3 = "^3.0"
packaging = "^22.0"
prettytable = "^3.6.0"
python-lzo = "^1.14"
requests = "^2.28.0"
shortuuid = "^1.0.11"
structlog = "^22.3.0"
telnetlib3 = "^2.0.0"
tzlocal = "^4.2"
colorama = "^0.4.6"
aiohttp = "^3.8.4"
rich = "^13.3.2"

[tool.poetry.dev-dependencies]
pre-commit = "^2.21.0"
pytest = "^7.2.0"
pytest-aiohttp = "^1.0.4"
pytest-asyncio = "^0.20.3"
pytest-cache = "^1.0"
pytest-cov = "^4.0.0"
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ def version():
install_requires=[
"consulate",
"packaging",
"prettytable",
"tzlocal",
"PyYaml",
"setuptools",
"shortuuid",
"python-lzo",
"telnetlib3>=1.0",
"humanize",
"mmh3",
"structlog",
"aiohttp",
"rich",
],
extras_require={
"test": [
Expand Down
233 changes: 233 additions & 0 deletions src/backy/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
import datetime
import re
from json import JSONEncoder
from os import path as p
from typing import Any, Tuple

from aiohttp import hdrs, web
from aiohttp.web_exceptions import (
HTTPAccepted,
HTTPBadRequest,
HTTPForbidden,
HTTPNotFound,
HTTPPreconditionFailed,
HTTPPreconditionRequired,
HTTPServiceUnavailable,
HTTPUnauthorized,
)
from aiohttp.web_middlewares import middleware
from aiohttp.web_runner import AppRunner, TCPSite
from structlog.stdlib import BoundLogger

import backy.daemon
from backy.backup import Backup
from backy.revision import Revision


class BackyJSONEncoder(JSONEncoder):
def default(self, o: Any) -> Any:
if hasattr(o, "to_dict"):
return o.to_dict()
elif isinstance(o, datetime.datetime):
return o.isoformat()
else:
super().default(o)


class BackyAPI:
daemon: "backy.daemon.BackyDaemon"
sites: dict[Tuple[str, int], TCPSite]
runner: AppRunner
tokens: dict
log: BoundLogger

def __init__(self, daemon, log):
self.log = log.bind(subsystem="api")
self.daemon = daemon
self.sites = {}
self.app = web.Application(
middlewares=[self.log_conn, self.require_auth, self.to_json]
)
self.app.add_routes(
[
web.get("/status", self.get_status),
web.post("/reload", self.reload_daemon),
web.get("/jobs", self.get_jobs),
# web.get("/jobs/{job_name}", self.get_job),
web.post("/jobs/{job_name}/run", self.run_job),
web.get("/backups", self.list_backups),
# web.get("/backups/{backup_name}", self.get_backup),
web.post("/backups/{backup_name}/purge", self.run_purge),
web.post("/backups/{backup_name}/touch", self.touch_backup),
web.get("/backups/{backup_name}/revs", self.get_revs),
# web.get("/backups/{backup_name}/revs/{rev_spec}", self.get_rev),
web.put(
"/backups/{backup_name}/revs/{rev_spec}/tags", self.put_tags
),
]
)

async def start(self):
self.runner = AppRunner(self.app)
await self.runner.setup()

async def stop(self):
await self.runner.cleanup()
self.sites = {}

async def reconfigure(self, tokens, addrs, port):
self.log.debug("reconfigure")
self.tokens = tokens
endpoints = [(addr, port) for addr in addrs if addr and port]
for ep in endpoints:
if ep not in self.sites:
self.sites[ep] = site = TCPSite(self.runner, ep[0], ep[1])
await site.start()
self.log.info("added-site", site=site.name)
for ep, site in list(self.sites.items()):
if ep not in endpoints:
await site.stop()
del self.sites[ep]
self.log.info("deleted-site", site=site.name)

@middleware
async def log_conn(self, request: web.Request, handler):
request["log"] = self.log.bind(
path=request.path, query=request.query_string
)
try:
resp = await handler(request)
except Exception as e:
if not isinstance(e, web.HTTPException):
request["log"].exception("error-handling-request")
else:
request["log"].debug(
"request-result", status_code=e.status_code
)
raise
request["log"].debug("request-result", response=resp.body)
return resp

@middleware
async def require_auth(self, request: web.Request, handler):
request["log"].debug("new-conn")
token = request.headers.get(hdrs.AUTHORIZATION, "")
if not token.startswith("Bearer "):
request["log"].info("auth-invalid-token")
raise HTTPUnauthorized()
token = token.removeprefix("Bearer ")
if len(token) < 3: # avoid potential truthiness edge cases
request["log"].info("auth-token-too-short")
raise HTTPUnauthorized()
client = self.tokens.get(token, None)
if not client:
request["log"].info("auth-token-unknown")
raise HTTPUnauthorized()
request["client"] = client
request["log"] = request["log"].bind(client=client)
request["log"].debug("auth-passed")
return await handler(request)

@middleware
async def to_json(self, request: web.Request, handler):
resp = await handler(request)
if isinstance(resp, web.Response):
return resp
elif resp is None:
raise web.HTTPNoContent()
else:
return web.json_response(resp, dumps=BackyJSONEncoder().encode)

async def get_status(self, request: web.Request):
filter = request.query.get("filter", "")
if filter:
filter = re.compile(filter)
return self.daemon.status(filter)

async def reload_daemon(self, request: web.Request):
self.daemon.reload()

async def get_jobs(self, request: web.Request):
return list(self.daemon.jobs.values())

async def get_job(self, request: web.Request):
try:
name = request.match_info.get("job_name", None)
return self.daemon.jobs[name]
except KeyError:
raise HTTPNotFound()

async def run_job(self, request: web.Request):
j = await self.get_job(request)
j.run_immediately.set()
raise HTTPAccepted()

async def list_backups(self, request: web.Request):
backups = self.daemon.find_all_backups()
return [b for b in backups if b not in self.daemon.jobs]

async def get_backup(self, request: web.Request) -> Backup:
name = request.match_info.get("backup_name", None)
if not name:
raise HTTPNotFound()
if name in self.daemon.jobs:
raise HTTPForbidden()
try:
return Backup(p.join(self.daemon.base_dir, name), request["log"])
except FileNotFoundError:
raise HTTPNotFound()

async def run_purge(self, request: web.Request):
backup = await self.get_backup(request)
backup.set_purge_pending()
raise HTTPAccepted()

async def touch_backup(self, request: web.Request):
backup = await self.get_backup(request)
backup.touch()

async def get_revs(self, request: web.Request):
backup = await self.get_backup(request)
if request.query.get("only_clean", "") == "1":
revs = backup.clean_history
else:
revs = backup.history
return [r for r in revs if not r.location]

async def get_rev(self, request: web.Request) -> Revision:
spec = request.match_info.get("rev_spec", None)
backup = await self.get_backup(request)
try:
rev = backup.find(spec)
if rev.location:
raise HTTPNotFound()
return rev
except KeyError:
raise HTTPNotFound()

async def put_tags(self, request: web.Request):
json = await request.json()
if "old_tags" not in json:
raise HTTPPreconditionRequired()
old_tags = set(json["old_tags"])
if "new_tags" not in json:
raise HTTPBadRequest()
new_tags = set(json["new_tags"])

autoremove = request.query.get("autoremove", "") == "1"
spec = request.match_info.get("rev_spec", None)
backup = await self.get_backup(request)
try:
if not backup.tags(
"set",
spec,
new_tags,
old_tags,
autoremove=autoremove,
force=True,
):
raise HTTPPreconditionFailed()
except KeyError:
raise HTTPNotFound()
except BlockingIOError:
raise HTTPServiceUnavailable()
2 changes: 1 addition & 1 deletion src/backy/backends/chunked/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, revision: Revision, log: BoundLogger):
self.revision.backup.path + "/chunks", log
)
self.store = self.STORES[path]
self.log = log
self.log = log.bind(subsystem="chunked-backend")

def open(self, mode="rb"):
if "w" in mode or "+" in mode and self.clone_parent:
Expand Down
2 changes: 0 additions & 2 deletions src/backy/backends/cowfile.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os.path

from structlog.stdlib import BoundLogger

import backy.revision
from backy.backends import BackyBackend
from backy.utils import CHUNK_SIZE, cp_reflink
Expand Down
Loading

0 comments on commit 5353686

Please sign in to comment.