From 247e352034e20f1c6064a432e6e54d5a2ceaa0aa Mon Sep 17 00:00:00 2001 From: Johann Bahl Date: Thu, 15 Jun 2023 01:12:53 +0200 Subject: [PATCH] add taskid --- ...26_005856_jb_issue_30_server_migration.rst | 2 ++ src/backy/api.py | 4 ++- src/backy/backup.py | 35 ++++++++++--------- src/backy/client.py | 9 +++-- src/backy/daemon.py | 2 +- src/backy/logging.py | 12 ++----- src/backy/main.py | 27 +++++++++----- src/backy/scheduler.py | 28 ++++++++++++--- src/backy/tests/test_client.py | 24 +++++++------ src/backy/tests/test_main.py | 6 ++-- src/backy/utils.py | 5 +++ 11 files changed, 96 insertions(+), 58 deletions(-) diff --git a/changelog.d/20230626_005856_jb_issue_30_server_migration.rst b/changelog.d/20230626_005856_jb_issue_30_server_migration.rst index beaffa57..740875c7 100644 --- a/changelog.d/20230626_005856_jb_issue_30_server_migration.rst +++ b/changelog.d/20230626_005856_jb_issue_30_server_migration.rst @@ -3,3 +3,5 @@ - Add `tags {set, add, remove}` subcommand - Add `expire` subcommand + +- logging: add taskid diff --git a/src/backy/api.py b/src/backy/api.py index 2dbbc47e..1b80b308 100644 --- a/src/backy/api.py +++ b/src/backy/api.py @@ -96,7 +96,9 @@ async def reconfigure( @middleware async def log_conn(self, request: web.Request, handler): request["log"] = self.log.bind( - path=request.path, query=request.query_string + path=request.path, + query=request.query_string, + remote_taskid=request.headers.get("taskid"), ) try: resp = await handler(request) diff --git a/src/backy/backup.py b/src/backy/backup.py index 845dbc56..3dd839d9 100644 --- a/src/backy/backup.py +++ b/src/backy/backup.py @@ -182,7 +182,7 @@ def touch(self): def set_purge_pending(self): open(p.join(self.path, ".purge_pending"), "w").close() - def remove_purge_pending(self): + def clear_purge_pending(self): path = p.join(self.path, ".purge_pending") if p.exists(path): os.remove(path) @@ -346,7 +346,7 @@ def verify(self, revision=None): def purge(self): backend = self.backend_factory(self.history[0], self.log) backend.purge() - self.remove_purge_pending() + self.clear_purge_pending() ################# # Restoring @@ -555,17 +555,18 @@ def find(self, spec) -> Revision: ################### # Syncing Revisions + # called by the scheduler without a subprocess @locked(target=".backup", mode="exclusive") - async def push_metadata(self, peers): - self.log.info("push-metadata-start") - async with APIClientManager(peers, self.log) as apis: + async def push_metadata(self, peers, taskid: str, log): + log.info("push-metadata-start") + async with APIClientManager(peers, taskid, log) as apis: peers_with_removals = set() for r in self.history: if not r.pending_changes: continue - self.log.debug( + log.debug( "push-metadata-updating-tags", server=r.location, rev_uuid=r.uuid, @@ -582,37 +583,37 @@ async def push_metadata(self, peers): isinstance(e, ClientResponseError) and e.status == HTTPPreconditionFailed.status_code ): - self.log.warning( + log.warning( "push-metadata-unexpected-server-state", expected_tags=r.orig_tags, ) elif isinstance(e, ClientConnectorError): - self.log.debug("pull-metadata-connection-error") + log.debug("pull-metadata-connection-error") else: - self.log.exception("push-metadata-error") + log.exception("push-metadata-error") r.remove(force=True) for s in peers_with_removals: - self.log.debug("push-metadata-purging-remote", server=s) + log.debug("push-metadata-purging-remote", server=s) try: await apis[s].run_purge(self.name) except ClientError: - self.log.warning( + log.warning( 
"push-metadata-remote-purge-error", exc_info=True, ) continue @locked(target=".backup", mode="exclusive") - async def pull_metadata(self, peers: dict): - self.log.info("pull-metadata-start") - async with APIClientManager(peers, self.log) as apis: + async def pull_metadata(self, peers: dict, taskid: str, log): + log.info("pull-metadata-start") + async with APIClientManager(peers, taskid, log) as apis: await asyncio.gather( - *[self._pull_metadata(apis[server]) for server in apis] + *[self._pull_metadata(apis[server], log) for server in apis] ) - async def _pull_metadata(self, api: APIClient): - log = self.log.bind(server=api.server_name) + async def _pull_metadata(self, api: APIClient, log): + log = log.bind(server=api.server_name) try: await api.touch_backup(self.name) remote_revs = await api.get_revs(self) diff --git a/src/backy/client.py b/src/backy/client.py index f4704a6f..b04a0817 100644 --- a/src/backy/client.py +++ b/src/backy/client.py @@ -19,18 +19,20 @@ class APIClientManager: connector: TCPConnector peers: dict clients: dict[str, "APIClient"] + taskid: str log: BoundLogger - def __init__(self, peers, log): + def __init__(self, peers, taskid, log): self.connector = TCPConnector() self.peers = peers self.clients = dict() + self.taskid = taskid self.log = log.bind(subsystem="APIClientManager") def __getitem__(self, name): if name and name not in self.clients: self.clients[name] = APIClient.from_conf( - name, self.peers[name], self.log, self.connector + name, self.peers[name], self.taskid, self.log, self.connector ) return self.clients[name] @@ -59,6 +61,7 @@ def __init__( server_name: str, url: str, token: str, + taskid: str, log, connector=None, ): @@ -67,7 +70,7 @@ def __init__( self.server_name = server_name self.session = aiohttp.ClientSession( url, - headers={hdrs.AUTHORIZATION: "Bearer " + token}, + headers={hdrs.AUTHORIZATION: "Bearer " + token, "taskid": taskid}, raise_for_status=True, timeout=ClientTimeout(30, connect=10), connector=connector, diff --git a/src/backy/daemon.py b/src/backy/daemon.py index 08c378dd..0935811f 100644 --- a/src/backy/daemon.py +++ b/src/backy/daemon.py @@ -113,8 +113,8 @@ def _apply_config(self): job = self.jobs[name] if config != job.last_config: self.log.info("changed-job", job_name=name) - job.configure(config) job.stop() + job.configure(config) job.start() for name, job in list(self.jobs.items()): diff --git a/src/backy/logging.py b/src/backy/logging.py index 6410a7f8..ac1ca2af 100644 --- a/src/backy/logging.py +++ b/src/backy/logging.py @@ -201,9 +201,9 @@ def write(line): + " " ) - pid = event_dict.pop("pid", None) - if pid is not None: - write(DIM + str(pid) + RESET_ALL + " ") + taskid = event_dict.pop("taskid", None) + if taskid is not None: + write(DIM + str(taskid) + RESET_ALL + " ") level = event_dict.pop("level", None) if level is not None: @@ -278,11 +278,6 @@ def write(line): return {"console": console_io.getvalue(), "file": log_io.getvalue()} -def add_pid(logger, method_name, event_dict): - event_dict["pid"] = os.getpid() - return event_dict - - def process_exc_info(logger, name, event_dict): """Transforms exc_info to the exception tuple format returned by sys.exc_info(). 
Uses the the same logic as as structlog's format_exc_info() @@ -336,7 +331,6 @@ def init_logging( ) processors = [ - add_pid, structlog.processors.add_log_level, process_exc_info, format_exc_info, diff --git a/src/backy/main.py b/src/backy/main.py index 02a8536d..c9dc1c4e 100644 --- a/src/backy/main.py +++ b/src/backy/main.py @@ -16,7 +16,7 @@ import backy.backup import backy.daemon -from backy.utils import format_datetime_local +from backy.utils import format_datetime_local, generate_taskid from . import logging from .client import APIClient, CLIClient @@ -36,10 +36,12 @@ class Command(object): """Proxy between CLI calls and actual backup code.""" path: str + taskid: str log: BoundLogger - def __init__(self, path, log): + def __init__(self, path, taskid, log): self.path = path + self.taskid = taskid self.log = log def status(self): @@ -136,15 +138,17 @@ def verify(self, revision): def client(self, config, peer, url, token, apifunc, **kwargs): async def run(): if url and token: - api = APIClient("", url, token, self.log) + api = APIClient("", url, token, self.taskid, self.log) else: d = backy.daemon.BackyDaemon(config, self.log) d._read_config() if peer: - api = APIClient.from_conf(peer, d.peers[peer], self.log) + api = APIClient.from_conf( + peer, d.peers[peer], self.taskid, self.log + ) else: api = APIClient.from_conf( - "", d.api_cli_default, self.log + "", d.api_cli_default, self.taskid, self.log ) async with CLIClient(api, self.log) as c: await getattr(c, apifunc)(**kwargs) @@ -186,7 +190,7 @@ def setup_argparser(): default=argparse.SUPPRESS, help=( "file name to write log output in. " - "(default: /var/log/backy.log for `scheduler`, " + "(default: /var/log/backy.log for `scheduler`, ignored for `client`, " "$backupdir/backy.log otherwise)" ), ) @@ -200,6 +204,12 @@ def setup_argparser(): "(default: %(default)s)" ), ) + parser.add_argument( + "-t", + "--taskid", + default=generate_taskid(), + help=("id to include in log messages" "(default: %(default)s)"), + ) subparsers = parser.add_subparsers() @@ -515,10 +525,10 @@ def main(): or (args.func == "client" and args.apifunc == "check") else "", ) - log = structlog.stdlib.get_logger(subsystem="command") + log = structlog.stdlib.get_logger(subsystem="command", taskid=args.taskid) log.debug("invoked", args=" ".join(sys.argv)) - command = Command(args.backupdir, log) + command = Command(args.backupdir, args.taskid, log) func = getattr(command, args.func) # Pass over to function @@ -527,6 +537,7 @@ def main(): del func_args["verbose"] del func_args["backupdir"] del func_args["logfile"] + del func_args["taskid"] try: log.debug("parsed", func=args.func, func_args=func_args) diff --git a/src/backy/scheduler.py b/src/backy/scheduler.py index 82e16687..b07388df 100644 --- a/src/backy/scheduler.py +++ b/src/backy/scheduler.py @@ -17,7 +17,12 @@ from .backup import Backup from .ext_deps import BACKY_CMD -from .utils import SafeFile, format_datetime_local, time_or_event +from .utils import ( + SafeFile, + format_datetime_local, + generate_taskid, + time_or_event, +) class Job(object): @@ -35,6 +40,7 @@ class Job(object): run_immediately: asyncio.Event errors: int = 0 backoff: int = 0 + taskid: str = "" log: BoundLogger _task: Optional[asyncio.Task] = None @@ -146,7 +152,10 @@ async def run_forever(self): self.backoff = 0 self.log.debug("loop-started") while True: - self.backup.scan() + self.taskid = generate_taskid() + self.log = self.log.bind(job_name=self.name + f"[{self.taskid}]") + + self.backup = Backup(self.path, self.log) next_time, next_tags 
= self.schedule.next( backy.utils.now(), self.spread, self.backup @@ -188,7 +197,6 @@ async def run_forever(self): async with self.daemon.backup_semaphores[speed]: self.update_status(f"running ({speed})") - self.update_config() await self.run_backup(next_tags) await self.pull_metadata() await self.run_expiry() @@ -216,15 +224,21 @@ async def run_forever(self): self.update_status("finished") async def pull_metadata(self): - await self.backup.pull_metadata(self.daemon.peers) + await self.backup.pull_metadata( + self.daemon.peers, self.taskid, self.log + ) async def push_metadata(self): - await self.backup.push_metadata(self.daemon.peers) + await self.backup.push_metadata( + self.daemon.peers, self.taskid, self.log + ) async def run_backup(self, tags): self.log.info("backup-started", tags=", ".join(tags)) proc = await asyncio.create_subprocess_exec( BACKY_CMD, + "-t", + self.taskid, "-b", self.path, "-l", @@ -260,6 +274,8 @@ async def run_expiry(self): self.log.info("expiry-started") proc = await asyncio.create_subprocess_exec( BACKY_CMD, + "-t", + self.taskid, "-b", self.path, "-l", @@ -294,6 +310,8 @@ async def run_purge(self): self.log.info("purge-started") proc = await asyncio.create_subprocess_exec( BACKY_CMD, + "-t", + self.taskid, "-b", self.path, "-l", diff --git a/src/backy/tests/test_client.py b/src/backy/tests/test_client.py index e2302e74..027509ef 100644 --- a/src/backy/tests/test_client.py +++ b/src/backy/tests/test_client.py @@ -58,10 +58,12 @@ async def test_api_wrong_token(api, token, method, endpoint, aiohttp_client): async def api_client(api, aiohttp_client, log): client = await aiohttp_client( api.app, - headers={hdrs.AUTHORIZATION: "Bearer testtoken"}, + headers={hdrs.AUTHORIZATION: "Bearer testtoken", "taskid": "ABCD"}, raise_for_status=True, ) - api_client = APIClient("", "http://localhost:0", "", log) + api_client = APIClient( + "", "http://localhost:0", "token", "task", log + ) await api_client.session.close() api_client.session = client return api_client @@ -208,9 +210,9 @@ async def test_cli_check_ok(daemon, cli_client): assert ( Ellipsis( """\ -... D - api/new-conn path='/v1/status' query='filter=' -... D - api/auth-passed client='cli' path='/v1/status' query='filter=' -... D - api/request-result client='cli' path='/v1/status' query='filter=' response=... +... D - api/new-conn path='/v1/status' query='filter=' remote_taskid='ABCD' +... D - api/auth-passed client='cli' path='/v1/status' query='filter=' remote_taskid='ABCD' +... D - api/request-result client='cli' path='/v1/status' query='filter=' remote_taskid='ABCD' response=... ... I - CLIClient/check-within-sla num=2 """ ) @@ -233,9 +235,9 @@ async def test_cli_check_too_old(daemon, clock, cli_client, log): assert ( Ellipsis( """\ -... D - api/new-conn path='/v1/status' query='filter=' -... D - api/auth-passed client='cli' path='/v1/status' query='filter=' -... D - api/request-result client='cli' path='/v1/status' query='filter=' response=... +... D - api/new-conn path='/v1/status' query='filter=' remote_taskid='ABCD' +... D - api/auth-passed client='cli' path='/v1/status' query='filter=' remote_taskid='ABCD' +... D - api/request-result client='cli' path='/v1/status' query='filter=' remote_taskid='ABCD' response=... ... C test01 CLIClient/check-sla-violation last_time='2015-08-30 07:06:47+00:00' sla_overdue=172800.0 ... D - CLIClient/check-jobs-failed failed_jobs=1 """ @@ -256,9 +258,9 @@ async def test_cli_check_manual_tags(daemon, cli_client, log): assert ( Ellipsis( """\ -... 
D - api/new-conn path='/v1/status' query='filter=' -... D - api/auth-passed client='cli' path='/v1/status' query='filter=' -... D - api/request-result client='cli' path='/v1/status' query='filter=' response=... +... D - api/new-conn path='/v1/status' query='filter=' remote_taskid='ABCD' +... D - api/auth-passed client='cli' path='/v1/status' query='filter=' remote_taskid='ABCD' +... D - api/request-result client='cli' path='/v1/status' query='filter=' remote_taskid='ABCD' response=... ... I test01 CLIClient/check-manual-tags manual_tags='manual:test' ... I - CLIClient/check-within-sla num=2 """ diff --git a/src/backy/tests/test_main.py b/src/backy/tests/test_main.py index 30aa1c40..c9250f6b 100644 --- a/src/backy/tests/test_main.py +++ b/src/backy/tests/test_main.py @@ -28,7 +28,7 @@ def test_display_usage(capsys, argv): out, err = capsys.readouterr() assert ( """\ -usage: pytest [-h] [-v] [-l LOGFILE] [-b BACKUPDIR] +usage: pytest [-h] [-v] [-l LOGFILE] [-b BACKUPDIR] [-t TASKID] {client,backup,restore,purge,find,status,nbd-server,\ upgrade,scheduler,distrust,verify,forget,tags,expire} ... @@ -47,7 +47,7 @@ def test_display_help(capsys, argv): assert ( Ellipsis( """\ -usage: pytest [-h] [-v] [-l LOGFILE] [-b BACKUPDIR] +usage: pytest [-h] [-v] [-l LOGFILE] [-b BACKUPDIR] [-t TASKID] {client,backup,restore,purge,find,status,nbd-server,\ upgrade,scheduler,distrust,verify,forget,tags,expire} ... @@ -379,7 +379,7 @@ def do_raise(*args, **kw): def test_commands_wrapper_status(backup, tmpdir, capsys, clock, tz_berlin, log): - commands = backy.main.Command(str(tmpdir), log) + commands = backy.main.Command(str(tmpdir), "AAAA", log) revision = Revision(backup, log, "1") revision.timestamp = backy.utils.now() diff --git a/src/backy/utils.py b/src/backy/utils.py index f79b0851..7bcd3698 100644 --- a/src/backy/utils.py +++ b/src/backy/utils.py @@ -1,4 +1,5 @@ import asyncio +import base64 import contextlib import datetime import hashlib @@ -470,3 +471,7 @@ def format_datetime_local(dt): dt.astimezone(tz).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M:%S"), tz, ) + + +def generate_taskid(): + return base64.b32encode(random.randbytes(3)).decode("utf-8")[:4]
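A minimal, illustrative sketch (not part of the patch, appended for reviewers) of how the new task id travels end to end. The header name "taskid", the log key remote_taskid, and the helper in src/backy/utils.py are taken from the diff above; the example values and the placeholder token are made up:

    import base64
    import random

    def generate_taskid():
        # mirrors the new helper in src/backy/utils.py: 3 random bytes,
        # base32-encoded, truncated to 4 characters, e.g. "Q3ZA"
        return base64.b32encode(random.randbytes(3)).decode("utf-8")[:4]

    taskid = generate_taskid()

    # client side (src/backy/client.py): every API request carries the id
    # in a "taskid" header next to the bearer token.
    headers = {"Authorization": "Bearer <token>", "taskid": taskid}

    # server side (src/backy/api.py): log_conn() binds the header value to
    # the request logger, so it appears as remote_taskid=... in the daemon
    # log, matching the expected output in the updated client tests.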