expire revisions on remote servers #37

Merged (13 commits) on Apr 11, 2024
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -32,6 +32,7 @@ repos:
- types-PyYAML==5.4.0
- types-setuptools
- types-tzlocal==4.2
- types-aiofiles==23.2.0.20240311
exclude: tests
args:
- --check-untyped-defs
9 changes: 9 additions & 0 deletions changelog.d/20230626_005856_jb_issue_30_server_migration.rst
@@ -0,0 +1,9 @@
- Support backup job migration across servers

- Add `tags {set, add, remove}` subcommand

- Add `expire` subcommand

- logging: improve exception formatting

- logging: add taskid
3 changes: 3 additions & 0 deletions changelog.d/20240205_012140_jb_issue_30_server_migration.rst
@@ -0,0 +1,3 @@
.. A new scriv changelog fragment.

- Add `push` and `pull` subcommands
3 changes: 3 additions & 0 deletions changelog.d/20240205_012340_jb_issue_30_server_migration.rst
@@ -0,0 +1,3 @@
.. A new scriv changelog fragment.

- Add `server:` selector to revision spec
3 changes: 3 additions & 0 deletions changelog.d/20240402_125207_jb_issue_30_server_migration.rst
@@ -0,0 +1,3 @@
.. A new scriv changelog fragment.

- Coordinate backups for the same job between servers
7 changes: 7 additions & 0 deletions doc/man-backy.rst
@@ -170,6 +170,13 @@ Subcommand-specific options
Trust state. Ordered by date, oldest first.
* A tag with the **tag:** prefix. Selects all revisions with this tag.
Ordered by date, oldest first.
* A server with the **server:** prefix. Selects all revisions located on
this server. The current server can be selected with an empty string.
Ordered by date, oldest first.
* The key word **local** selects all revisions located on the current
server (`server:`).
* The key word **remote** selects all revisions located on remote servers
(`not(server:)`).
* An inclusive range using two single revision specifiers separated with two
dots. The single revision specifiers may be omitted, in which case the
**first** and/or **last** revision is assumed.
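The three server-related selectors above are easiest to read as a predicate over a revision's `server` attribute. The following helper is a hypothetical illustration of that reading, not backy's actual revision-spec parser:

```python
def matches_server_token(revision, token: str) -> bool:
    # Hypothetical interpretation of the selectors documented above:
    #   "server:NAME" -> revisions stored on server NAME ("" = current server)
    #   "local"       -> shorthand for server:""
    #   "remote"      -> shorthand for not(server:)
    if token.startswith("server:"):
        return revision.server == token[len("server:"):]
    if token == "local":
        return revision.server == ""
    if token == "remote":
        return revision.server != ""
    raise ValueError(f"unknown selector: {token}")
```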
15 changes: 12 additions & 3 deletions lib.nix
@@ -23,6 +23,12 @@ let
scriv = super.scriv.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.setuptools ];
});
backports-tarfile = super.backports-tarfile.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.setuptools ];
});
docutils = super.docutils.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.flit-core ];
});
execnet = super.execnet.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.hatchling super.hatch-vcs ];
});
@@ -48,10 +54,13 @@ let
# replace poetry to avoid dependency on vulnerable python-cryptography package
nativeBuildInputs = [ super.poetry-core ] ++ builtins.filter (p: p.pname or "" != "poetry") old.nativeBuildInputs;
});
aiofiles = super.aiofiles.overrideAttrs (old: {
buildInputs = (old.buildInputs or []) ++ [ super.hatchling super.hatch-vcs ];
});
nh3 =
let
getCargoHash = version: {
"0.2.15" = "sha256-fetAE3cj9hh4SoPE72Bqco5ytUMiDqbazeS2MHdUibM=";
"0.2.17" = "sha256-WomlVzKOUfcgAWGJInSvZn9hm+bFpgc4nJbRiyPCU64=";
}.${version} or (
lib.warn "Unknown nh3 version: '${version}'. Please update getCargoHash." lib.fakeHash
);
@@ -75,7 +84,7 @@ let
cryptography =
let
getCargoHash = version: {
"41.0.7" = "sha256-VeZhKisCPDRvmSjGNwCgJJeVj65BZ0Ge+yvXbZw86Rw";
"42.0.5" = "sha256-Pw3ftpcDMfZr/w6US5fnnyPVsFSB9+BuIKazDocYjTU=";
}.${version} or (
lib.warn "Unknown cryptography version: '${version}'. Please update getCargoHash." lib.fakeHash
);
@@ -118,7 +127,7 @@ in

devShells = {
default = mkShellNoCC {
BACKY_CMD = "backy";
BACKY_CMD = "${poetryEnv}/bin/backy";
packages = [
poetryEnv
poetry
1,243 changes: 696 additions & 547 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -56,6 +56,8 @@ aiohttp = "^3.8.4"
rich = "^13.3.2"
yarl = "1.9.2"
frozenlist = "1.4.0"
aiofiles = "^23.2.1"
aioshutil = "^1.3"

[tool.poetry.dev-dependencies]
pre-commit = "^3.3.3"
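The two new runtime dependencies, `aiofiles` and `aioshutil`, provide asynchronous file operations. A minimal sketch of the kind of non-blocking file work they enable is shown below; the file names and the helper are illustrative assumptions, not code from this pull request:

```python
import asyncio

import aiofiles
import aioshutil


async def copy_and_size(src: str, dst: str) -> int:
    # aioshutil wraps shutil in a thread pool, so a large copy does not block
    # the daemon's event loop; aiofiles does the same for plain file I/O.
    await aioshutil.copyfile(src, dst)
    async with aiofiles.open(dst, "rb") as f:
        data = await f.read()
    return len(data)


if __name__ == "__main__":
    print(asyncio.run(copy_and_size("rev.old", "rev.new")))
```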
138 changes: 123 additions & 15 deletions src/backy/api.py
@@ -1,15 +1,30 @@
import datetime
import re
from json import JSONEncoder
from typing import Any, List, Tuple
from pathlib import Path
from typing import TYPE_CHECKING, Any, List, Tuple

from aiohttp import hdrs, web
from aiohttp.web_exceptions import HTTPAccepted, HTTPNotFound, HTTPUnauthorized
from aiohttp.web_exceptions import (
HTTPAccepted,
HTTPBadRequest,
HTTPForbidden,
HTTPNotFound,
HTTPPreconditionFailed,
HTTPServiceUnavailable,
HTTPUnauthorized,
)
from aiohttp.web_middlewares import middleware
from aiohttp.web_runner import AppRunner, TCPSite
from structlog.stdlib import BoundLogger

import backy.daemon
from backy.backup import Backup
from backy.revision import Revision
from backy.scheduler import Job
from backy.utils import generate_taskid

if TYPE_CHECKING:
from backy.daemon import BackyDaemon


class BackyJSONEncoder(JSONEncoder):
@@ -23,14 +38,14 @@ def default(self, o: Any) -> Any:


class BackyAPI:
daemon: "backy.daemon.BackyDaemon"
daemon: "BackyDaemon"
sites: dict[Tuple[str, int], TCPSite]
runner: AppRunner
tokens: dict
log: BoundLogger

def __init__(self, daemon, log):
self.log = log.bind(subsystem="api")
self.log = log.bind(subsystem="api", job_name="~")
self.daemon = daemon
self.sites = {}
self.app = web.Application(
@@ -42,6 +57,14 @@ def __init__(self, daemon, log):
web.post("/v1/reload", self.reload_daemon),
web.get("/v1/jobs", self.get_jobs),
web.post("/v1/jobs/{job_name}/run", self.run_job),
web.get("/v1/backups", self.list_backups),
web.post("/v1/backups/{backup_name}/purge", self.run_purge),
web.post("/v1/backups/{backup_name}/touch", self.touch_backup),
web.get("/v1/backups/{backup_name}/revs", self.get_revs),
web.put(
"/v1/backups/{backup_name}/revs/{rev_spec}/tags",
self.put_tags,
),
]
)

@@ -67,7 +90,7 @@ async def reconfigure(
)
await site.start()
self.log.info("added-site", site=site.name)
for bind_addr, site in self.sites.items():
for bind_addr, site in list(self.sites.items()):
if bind_addr in bind_addrs:
continue
await site.stop()
@@ -77,9 +100,12 @@
@middleware
async def log_conn(self, request: web.Request, handler):
request["log"] = self.log.bind(
path=request.path, query=request.query_string
sub_taskid=request.headers.get("taskid"),
taskid=generate_taskid(),
)
request["log"].debug(
"new-conn", path=request.path, query=request.query_string
)
request["log"].debug("new-conn")
try:
resp = await handler(request)
except Exception as e:
@@ -107,8 +133,7 @@ async def require_auth(self, request: web.Request, handler):
request["log"].info("auth-token-unknown")
raise HTTPUnauthorized()
request["client"] = client
request["log"] = request["log"].bind(client=client)
request["log"].debug("auth-passed")
request["log"] = request["log"].bind(job_name="~" + client)
return await handler(request)

@middleware
@@ -121,26 +146,109 @@ async def to_json(self, request: web.Request, handler):
else:
return web.json_response(resp, dumps=BackyJSONEncoder().encode)

async def get_status(self, request: web.Request):
filter = request.query.get("filter", "")
async def get_status(
self, request: web.Request
) -> List["BackyDaemon.StatusDict"]:
filter = request.query.get("filter", None)
request["log"].info("get-status", filter=filter)
if filter:
filter = re.compile(filter)
return self.daemon.status(filter)

async def reload_daemon(self, request: web.Request):
request["log"].info("reload-daemon")
self.daemon.reload()

async def get_jobs(self, request: web.Request):
async def get_jobs(self, request: web.Request) -> List[Job]:
request["log"].info("get-jobs")
return list(self.daemon.jobs.values())

async def get_job(self, request: web.Request):
async def get_job(self, request: web.Request) -> Job:
name = request.match_info.get("job_name")
request["log"].info("get-job", name=name)
try:
name = request.match_info.get("job_name", None)
return self.daemon.jobs[name]
except KeyError:
request["log"].info("get-job-not-found", name=name)
raise HTTPNotFound()

async def run_job(self, request: web.Request):
j = await self.get_job(request)
request["log"].info("run-job", name=j.name)
j.run_immediately.set()
raise HTTPAccepted()

async def list_backups(self, request: web.Request) -> List[str]:
request["log"].info("list-backups")
return list(self.daemon.dead_backups.keys())

async def get_backup(
self, request: web.Request, allow_active: bool
) -> Backup:
name = request.match_info.get("backup_name")
request["log"].info("get-backups", name=name)
if name in self.daemon.dead_backups:
return self.daemon.dead_backups[name]
if name in self.daemon.jobs:
if allow_active:
return self.daemon.jobs[name].backup
request["log"].info("get-backups-forbidden", name=name)
raise HTTPForbidden()
request["log"].info("get-backups-not-found", name=name)
raise HTTPNotFound()

async def run_purge(self, request: web.Request):
backup = await self.get_backup(request, False)
request["log"].info("run-purge", name=backup.name)
backup.set_purge_pending()
raise HTTPAccepted()

async def touch_backup(self, request: web.Request):
backup = await self.get_backup(request, True)
request["log"].info("touch-backup", name=backup.name)
backup.touch()

async def get_revs(self, request: web.Request) -> List[Revision]:
backup = await self.get_backup(request, True)
request["log"].info("get-revs", name=backup.name)
backup.scan()
return backup.get_history(
local=True, clean=request.query.get("only_clean", "") == "1"
)

async def put_tags(self, request: web.Request):
json = await request.json()
try:
old_tags = set(json["old_tags"])
new_tags = set(json["new_tags"])
except KeyError:
request["log"].info("put-tags-bad-request")
raise HTTPBadRequest()
autoremove = request.query.get("autoremove", "") == "1"
spec = request.match_info.get("rev_spec")
backup = await self.get_backup(request, False)
request["log"].info(
"put-tags",
name=backup.name,
old_tags=old_tags,
new_tags=new_tags,
spec=spec,
autoremove=autoremove,
)
backup.scan()
try:
if not backup.tags(
"set",
spec,
new_tags,
old_tags,
autoremove=autoremove,
force=True,
):
raise HTTPPreconditionFailed()
except KeyError:
request["log"].info("put-tags-rev-not-found")
raise HTTPNotFound()
except BlockingIOError:
request["log"].info("put-tags-locked")
raise HTTPServiceUnavailable()
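The new endpoints are plain aiohttp handlers, so any HTTP client can drive them. Below is a minimal client sketch for the tags endpoint; the port, the authorization header name, and the example values are assumptions, while the `taskid` header, the JSON body, and the `autoremove` query flag follow `log_conn()` and `put_tags()` above:

```python
import asyncio

import aiohttp


async def retag(base_url: str, token: str, backup: str, rev_spec: str) -> None:
    # Assumed auth header; the "taskid" header mirrors what log_conn() reads.
    headers = {"Authorization": f"Bearer {token}", "taskid": "manual-retag"}
    async with aiohttp.ClientSession(base_url=base_url, headers=headers) as session:
        async with session.put(
            f"/v1/backups/{backup}/revs/{rev_spec}/tags",
            json={"old_tags": ["daily"], "new_tags": ["daily", "keep"]},
            params={"autoremove": "1"},
        ) as resp:
            # put_tags() answers 412 when the old_tags precondition fails.
            resp.raise_for_status()


asyncio.run(retag("http://localhost:6023", "secret-token", "vm00", "last"))
```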
4 changes: 2 additions & 2 deletions src/backy/backends/chunked/__init__.py
@@ -50,7 +50,7 @@ def open(self, mode: str = "rb", parent: Optional[Revision] = None) -> File: #
def purge(self) -> None:
self.log.debug("purge")
used_chunks: Set[Hash] = set()
for revision in self.backup.history:
for revision in self.backup.local_history:
if revision.backend_type != "chunked":
continue
used_chunks.update(
@@ -65,7 +65,7 @@ def verify(self):
verified_chunks: Set[Hash] = set()

# Load verified chunks to avoid duplicate work
for revision in self.backup.clean_history:
for revision in self.backup.get_history(clean=True, local=True):
if (
revision.trust != Trust.VERIFIED
or revision.backend_type != "chunked"
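Purge and verify now only consider revisions whose data is actually present on this host. A plausible shape for that filtering is sketched below; it is an assumption derived from the `revision.server` attribute used in the test that follows, not the project's actual `Backup.get_history()` implementation:

```python
def get_history(revisions, *, clean=False, local=False):
    # Hypothetical filter: remote revisions carry the owning server's name
    # (the test below sets revision.server = "remote"), so "local" is taken
    # to mean an empty server attribute.
    result = list(revisions)
    if local:
        result = [r for r in result if not r.server]
    if clean:
        # Placeholder predicate: keep only revisions whose backup finished.
        result = [r for r in result if getattr(r, "stats", {}).get("duration")]
    return result
```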
3 changes: 3 additions & 0 deletions src/backy/backends/chunked/tests/test_backend.py
@@ -37,6 +37,9 @@ def test_purge(simple_file_config, log):
f.write(b"asdf")
f.close()
r.materialize()
remote = Revision(b, log) # remote revision without local data
remote.server = "remote"
remote.materialize()
b.scan()
# Reassign as the scan will create a new reference
r = b.history[0]