Jobs audit. pool.update audit (#14357)
themylogin authored Sep 3, 2024
1 parent 1218dea commit 10a978c
Showing 4 changed files with 88 additions and 12 deletions.
25 changes: 23 additions & 2 deletions src/middlewared/middlewared/job.py
@@ -274,7 +274,8 @@ class Job:
pipes: Pipes
logs_fd: None

def __init__(self, middleware, method_name, serviceobj, method, args, options, pipes, on_progress_cb, app):
def __init__(self, middleware, method_name, serviceobj, method, args, options, pipes, on_progress_cb, app,
audit_callback):
self._finished = asyncio.Event()
self.middleware = middleware
self.method_name = method_name
@@ -285,6 +286,7 @@ def __init__(self, middleware, method_name, serviceobj, method, args, options, p
self.pipes = pipes or Pipes(input_=None, output=None)
self.on_progress_cb = on_progress_cb
self.app = app
self.audit_callback = audit_callback

self.id = None
self.lock = None
@@ -306,6 +308,8 @@ def __init__(self, middleware, method_name, serviceobj, method, args, options, p
self.loop = self.middleware.loop
self.future = None
self.wrapped = []
self.on_finish_cb = None
self.on_finish_cb_called = False

self.logs_path = None
self.logs_fd = None
@@ -500,6 +504,7 @@ async def run(self, queue):

queue.release_lock(self)
self._finished.set()
await self.call_on_finish_cb()
send_job_event(self.middleware, 'CHANGED', self, self.__encode__())
if self.options['transient']:
queue.remove(self.id)
@@ -517,6 +522,8 @@ async def __run_body(self):
if hasattr(self.method, '_pass_app'):
prepend.append(self.app)
prepend.append(self)
if getattr(self.method, 'audit_callback', None):
prepend.append(self.audit_callback)
# Make sure args are not altered during job run
args = prepend + copy.deepcopy(self.args)
if asyncio.iscoroutinefunction(self.method):
@@ -638,7 +645,7 @@ async def receive(middleware, job_dict, logs):
serviceobj = middleware._services[service_name]
methodobj = getattr(serviceobj, method_name)
job = Job(middleware, job_dict['method'], serviceobj, methodobj, job_dict['arguments'], methodobj._job, None,
None, None)
None, None, None)
job.id = job_dict['id']
job.description = job_dict['description']
if logs is not None:
@@ -696,6 +703,20 @@ def start_logging(self):
async def logs_fd_write(self, data):
await self.middleware.run_in_thread(self.logs_fd.write, data)

async def set_on_finish_cb(self, cb):
self.on_finish_cb = cb
if self.on_finish_cb_called:
await self.call_on_finish_cb()

async def call_on_finish_cb(self):
if self.on_finish_cb:
try:
await self.on_finish_cb(self)
except Exception:
logger.warning('Failed to run on finish callback', exc_info=True)

self.on_finish_cb_called = True


class JobProgressBuffer:
"""
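
The `Job` class grows a pair of hooks, `set_on_finish_cb()` and `call_on_finish_cb()`: `run()` invokes the latter right after `_finished` is set, and a callback registered later still fires because `set_on_finish_cb()` checks the `on_finish_cb_called` flag. A minimal, self-contained sketch of that handshake (a toy class, not the middlewared `Job`):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FinishHook:
    """Toy reduction of the Job on-finish callback handshake above."""

    def __init__(self):
        self.on_finish_cb = None
        self.on_finish_cb_called = False

    async def set_on_finish_cb(self, cb):
        # Register the callback; if the job already finished, fire it now.
        self.on_finish_cb = cb
        if self.on_finish_cb_called:
            await self.call_on_finish_cb()

    async def call_on_finish_cb(self):
        if self.on_finish_cb:
            try:
                await self.on_finish_cb(self)
            except Exception:
                logger.warning('Failed to run on finish callback', exc_info=True)
        # Recorded even when no callback is registered yet, so a late
        # set_on_finish_cb() still runs the callback exactly once.
        self.on_finish_cb_called = True


async def demo():
    async def audit(job):
        print('job finished, callback ran')

    hook = FinishHook()
    await hook.call_on_finish_cb()      # job completes before anyone subscribes
    await hook.set_on_finish_cb(audit)  # late registration still fires the callback


asyncio.run(demo())
```

The flag is what allows `call_with_audit()` in main.py (below) to attach its audit callback after `_call()` returns, even if the job has already reached a terminal state by then.
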
31 changes: 23 additions & 8 deletions src/middlewared/middlewared/main.py
@@ -6,7 +6,7 @@
from .apidocs import routes as apidocs_routes
from .common.event_source.manager import EventSourceManager
from .event import Events
from .job import Job, JobsQueue
from .job import Job, JobsQueue, State
from .pipe import Pipes, Pipe
from .restful import parse_credentials, authenticate, create_application, copy_multipart_to_pipe, RESTfulAPI
from .role import ROLES, RoleManager
@@ -1311,15 +1311,17 @@ def pipe(self, buffered=False):
return Pipe(self, buffered)

def _call_prepare(
self, name, serviceobj, methodobj, params, app=None, audit_callback=None, job_on_progress_cb=None, pipes=None,
in_event_loop: bool = True,
self, name, serviceobj, methodobj, params, *, app=None, audit_callback=None, job_on_progress_cb=None,
pipes=None, in_event_loop: bool = True,
):
"""
:param in_event_loop: Whether we are in the event loop thread.
:return:
"""
audit_callback = audit_callback or (lambda message: None)

params = list(params)

args = []
if hasattr(methodobj, '_pass_app'):
if methodobj._pass_app['require'] and app is None:
@@ -1330,8 +1332,7 @@ def _call_prepare(
if getattr(methodobj, 'audit_callback', None):
args.append(audit_callback)

if params:
args.extend(params)
args.extend(params)

# If the method is marked as a @job we need to create a new
# entry to keep track of its state.
@@ -1340,7 +1341,8 @@
if serviceobj._config.process_pool:
job_options['process'] = True
# Create a job instance with required args
job = Job(self, name, serviceobj, methodobj, list(params), job_options, pipes, job_on_progress_cb, app)
job = Job(self, name, serviceobj, methodobj, params, job_options, pipes, job_on_progress_cb, app,
audit_callback)
# Add the job to the queue.
# At this point an `id` is assigned to the job.
# Job might be replaced with an already existing job if `lock_queue_size` is used.
@@ -1497,11 +1499,23 @@ def can_subscribe(self, app, name):

async def call_with_audit(self, method, serviceobj, methodobj, params, app, **kwargs):
audit_callback_messages = []

async def log_audit_message_for_method(success):
await self.log_audit_message_for_method(method, methodobj, params, app, True, True, success,
audit_callback_messages)

async def job_on_finish_cb(job):
await log_audit_message_for_method(job.state == State.SUCCESS)

success = False
job = None
try:
result = await self._call(method, serviceobj, methodobj, params, app=app,
audit_callback=audit_callback_messages.append, **kwargs)
success = True
if isinstance(result, Job):
job = result
await job.set_on_finish_cb(job_on_finish_cb)

expose_secrets = True
if app and app.authenticated_credentials:
@@ -1516,8 +1530,9 @@ async def call_with_audit(self, method, serviceobj, methodobj, params, app, **kw

result = self.dump_result(methodobj, result, expose_secrets)
finally:
await self.log_audit_message_for_method(method, methodobj, params, app, True, True, success,
audit_callback_messages)
# If the method is a job, audit message will be logged by `job_on_finish_cb`
if job is None:
await log_audit_message_for_method(success)

return result

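
The net effect in `call_with_audit()`: plain methods are still audited in the `finally` block, but when `_call()` hands back a `Job`, the audit entry is deferred to `job_on_finish_cb` and success is judged by the job's terminal state instead of by whether the call that enqueued it returned cleanly. A condensed, runnable illustration of that control flow (`FakeJob` and the `audit_log` parameter are stand-ins for this sketch, not middlewared APIs):

```python
import asyncio
from enum import Enum, auto


class State(Enum):
    SUCCESS = auto()
    FAILED = auto()


class FakeJob:
    """Stand-in for middlewared.job.Job; the real one fires the callback
    from Job.run() once the job reaches a terminal state."""

    def __init__(self, state):
        self.state = state

    async def set_on_finish_cb(self, cb):
        await cb(self)  # in this toy model the job has already finished


async def call_with_audit(call, audit_log):
    success = False
    job = None
    try:
        result = await call()
        success = True
        if isinstance(result, FakeJob):
            job = result

            async def on_finish(j):
                # Success is taken from the job's final state, not from the
                # fact that enqueueing the job succeeded.
                audit_log(j.state == State.SUCCESS)

            await job.set_on_finish_cb(on_finish)
        return result
    finally:
        if job is None:
            audit_log(success)  # non-job methods: audit immediately


async def main():
    async def failing_job_method():
        return FakeJob(State.FAILED)

    await call_with_audit(failing_job_method,
                          lambda ok: print('audit success =', ok))


asyncio.run(main())
```

Running the sketch prints `audit success = False`, which is exactly the case the old code got wrong: the call itself succeeded, but the job it spawned did not.
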
5 changes: 3 additions & 2 deletions src/middlewared/middlewared/plugins/pool_/pool.py
@@ -688,9 +688,9 @@ async def do_create(self, job, data):
('rm', {'name': 'deduplication'}),
('rm', {'name': 'checksum'}),
('edit', {'name': 'topology', 'method': lambda x: setattr(x, 'update', True)}),
), audit='Pool update')
), audit='Pool update', audit_callback=True)
@job(lock='pool_createupdate')
async def do_update(self, job, id_, data):
async def do_update(self, job, audit_callback, id_, data):
"""
Update pool of `id`, adding the new topology.
@@ -715,6 +715,7 @@ async def do_update(self, job, id_, data):
}
"""
pool = await self.get_instance(id_)
audit_callback(pool['name'])

disks = vdevs = None
if 'topology' in data:
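
On the plugin side, `audit_callback=True` in the decorator arguments makes the framework pass an extra callable into `do_update()` (placed before the regular parameters, as `__run_body` above shows for jobs); the method calls it with the pool name so the audit description reads, say, "Pool update tank" rather than the bare "Pool update" prefix. A toy decorator illustrating that wiring (not the real middlewared decorator, whose name is outside the visible hunk; the 'tank' lookup stands in for `get_instance()`):

```python
def audited(audit_prefix):
    """Toy decorator showing the audit_callback wiring; the real decorator
    also validates the method's API schema."""
    def wrap(method):
        def call(*args, **kwargs):
            messages = []
            try:
                return method(messages.append, *args, **kwargs)
            finally:
                # The real code writes an audit database entry; here we print it.
                print('AUDIT:', ' '.join([audit_prefix, *messages]))
        return call
    return wrap


@audited('Pool update')
def do_update(audit_callback, id_, data):
    pool_name = {1: 'tank'}.get(id_, 'unknown')  # stand-in for get_instance()
    audit_callback(pool_name)                    # description: "Pool update tank"
    return {'id': id_, **data}


do_update(1, {'autotrim': 'ON'})  # prints: AUDIT: Pool update tank
```

The integration test below checks exactly this: the audit entry's description is `Pool update test` because `another_pool()` creates a pool named `test`.
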
39 changes: 39 additions & 0 deletions tests/api2/test_audit_pool.py
@@ -0,0 +1,39 @@
import pytest

from middlewared.test.integration.assets.pool import another_pool
from middlewared.test.integration.utils import call
from middlewared.test.integration.utils.audit import expect_audit_log


def test_pool_update_audit_success():
with another_pool() as pool:
params = [pool['id'], {'autotrim': 'ON'}]
with expect_audit_log([{
'event_data': {
'authenticated': True,
'authorized': True,
'method': 'pool.update',
'params': params,
'description': f'Pool update test',
},
'success': True,
}]):
call('pool.update', *params, job=True)


def test_pool_update_audit_error():
with another_pool() as pool:
params = [pool['id'], {'topology': {'spares': ['nonexistent']}}]

with expect_audit_log([{
'event_data': {
'authenticated': True,
'authorized': True,
'method': 'pool.update',
'params': params,
'description': f'Pool update test',
},
'success': False,
}]):
with pytest.raises(Exception):
call('pool.update', *params, job=True)
