Commit: Merge branch 'devel' into ulimits_to_docker-compose

Andersson007 authored Jul 31, 2023
2 parents c758b12 + 35a576f commit 327a201
Showing 53 changed files with 397 additions and 351 deletions.
4 changes: 2 additions & 2 deletions .github/pr_labeler.yml
@@ -15,5 +15,5 @@

"dependencies":
- any: ["awx/ui/package.json"]
- any: ["awx/requirements/*.txt"]
- any: ["awx/requirements/requirements.in"]
- any: ["requirements/*.txt"]
- any: ["requirements/requirements.in"]
3 changes: 2 additions & 1 deletion awx/api/generics.py
@@ -232,7 +232,8 @@ def finalize_response(self, request, response, *args, **kwargs):

         response = super(APIView, self).finalize_response(request, response, *args, **kwargs)
         time_started = getattr(self, 'time_started', None)
-        response['X-API-Product-Version'] = get_awx_version()
+        if request.user.is_authenticated:
+            response['X-API-Product-Version'] = get_awx_version()
         response['X-API-Product-Name'] = server_product_name()
 
         response['X-API-Node'] = settings.CLUSTER_HOST_ID
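Note: the generics.py hunk above stops disclosing the product version to anonymous clients while keeping the product-name header for everyone. A minimal, self-contained sketch of the resulting behavior (the stub return values are illustrative, not AWX's real lookups):

    def get_awx_version():
        return "x.y.z"  # stand-in for the real version lookup

    def server_product_name():
        return "AWX"  # stand-in for the real product-name lookup

    def finalize_headers(is_authenticated):
        headers = {}
        # After this commit, the version header is only sent to authenticated users.
        if is_authenticated:
            headers['X-API-Product-Version'] = get_awx_version()
        # The product-name header is still sent to every client.
        headers['X-API-Product-Name'] = server_product_name()
        return headers

    assert 'X-API-Product-Version' not in finalize_headers(False)
    assert 'X-API-Product-Name' in finalize_headers(False)
    assert finalize_headers(True)['X-API-Product-Version'] == "x.y.z"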
212 changes: 71 additions & 141 deletions awx/main/scheduler/task_manager.py
@@ -25,7 +25,6 @@
     InventoryUpdate,
     Job,
     Project,
-    ProjectUpdate,
     UnifiedJob,
     WorkflowApproval,
     WorkflowJob,
@@ -165,7 +164,6 @@ def spawn_workflow_graph_jobs(self):
logger.warning("Workflow manager has reached time out while processing running workflows, exiting loop early")
ScheduleWorkflowManager().schedule()
# Do not process any more workflow jobs. Stop here.
# Maybe we should schedule another WorkflowManager run
break
dag = WorkflowDAG(workflow_job)
status_changed = False
@@ -180,8 +178,8 @@
                 workflow_job.save(update_fields=['status', 'start_args'])
                 status_changed = True
             else:
-                workflow_nodes = dag.mark_dnr_nodes()
-                WorkflowJobNode.objects.bulk_update(workflow_nodes, ['do_not_run'])
+                dnr_nodes = dag.mark_dnr_nodes()
+                WorkflowJobNode.objects.bulk_update(dnr_nodes, ['do_not_run'])
                 # If workflow is now done, we do special things to mark it as done.
                 is_done = dag.is_workflow_done()
                 if is_done:
@@ -261,6 +259,7 @@
                     job.status = 'failed'
                     job.save(update_fields=['status', 'job_explanation'])
                     job.websocket_emit_status('failed')
+                    ScheduleWorkflowManager().schedule()
 
         # TODO: should we emit a status on the socket here similar to tasks.py awx_periodic_scheduler() ?
         # emit_websocket_notification('/socket.io/jobs', '', dict(id=))
@@ -281,184 +280,115 @@ def _schedule(self):
 class DependencyManager(TaskBase):
     def __init__(self):
         super().__init__(prefix="dependency_manager")
+        self.all_projects = {}
+        self.all_inventory_sources = {}
 
-    def create_project_update(self, task, project_id=None):
-        if project_id is None:
-            project_id = task.project_id
-        project_task = Project.objects.get(id=project_id).create_project_update(_eager_fields=dict(launch_type='dependency'))
-
-        # Project created 1 second behind
-        project_task.created = task.created - timedelta(seconds=1)
-        project_task.status = 'pending'
-        project_task.save()
-        logger.debug('Spawned {} as dependency of {}'.format(project_task.log_format, task.log_format))
-        return project_task
-
-    def create_inventory_update(self, task, inventory_source_task):
-        inventory_task = InventorySource.objects.get(id=inventory_source_task.id).create_inventory_update(_eager_fields=dict(launch_type='dependency'))
-
-        inventory_task.created = task.created - timedelta(seconds=2)
-        inventory_task.status = 'pending'
-        inventory_task.save()
-        logger.debug('Spawned {} as dependency of {}'.format(inventory_task.log_format, task.log_format))
-
-        return inventory_task
-
-    def add_dependencies(self, task, dependencies):
-        with disable_activity_stream():
-            task.dependent_jobs.add(*dependencies)
-
-    def get_inventory_source_tasks(self):
+    def cache_projects_and_sources(self, task_list):
+        project_ids = set()
         inventory_ids = set()
-        for task in self.all_tasks:
+        for task in task_list:
             if isinstance(task, Job):
-                inventory_ids.add(task.inventory_id)
-        self.all_inventory_sources = [invsrc for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True)]
-
-    def get_latest_inventory_update(self, inventory_source):
-        latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
-        if not latest_inventory_update.exists():
-            return None
-        return latest_inventory_update.first()
-
-    def should_update_inventory_source(self, job, latest_inventory_update):
-        now = tz_now()
-
-        if latest_inventory_update is None:
-            return True
+                if task.project_id:
+                    project_ids.add(task.project_id)
+                if task.inventory_id:
+                    inventory_ids.add(task.inventory_id)
+            elif isinstance(task, InventoryUpdate):
+                if task.inventory_source and task.inventory_source.source_project_id:
+                    project_ids.add(task.inventory_source.source_project_id)
+
+        for proj in Project.objects.filter(id__in=project_ids, scm_update_on_launch=True):
+            self.all_projects[proj.id] = proj
+
+        for invsrc in InventorySource.objects.filter(inventory_id__in=inventory_ids, update_on_launch=True):
+            self.all_inventory_sources.setdefault(invsrc.inventory_id, [])
+            self.all_inventory_sources[invsrc.inventory_id].append(invsrc)
+
+    @staticmethod
+    def should_update_again(update, cache_timeout):
         '''
-        If there's already an inventory update utilizing this job that's about to run
-        then we don't need to create one
+        If it has never updated, we need to update
+        If there is already an update in progress then we do not need to create a new one
+        If the last update failed, we always need to try and update again
+        If current time is more than cache_timeout after last update, then we need a new one
         '''
-        if latest_inventory_update.status in ['waiting', 'pending', 'running']:
-            return False
-
-        timeout_seconds = timedelta(seconds=latest_inventory_update.inventory_source.update_cache_timeout)
-        if (latest_inventory_update.finished + timeout_seconds) < now:
-            return True
-        if latest_inventory_update.inventory_source.update_on_launch is True and latest_inventory_update.status in ['failed', 'canceled', 'error']:
-            return True
-        return False
-
-    def get_latest_project_update(self, project_id):
-        latest_project_update = ProjectUpdate.objects.filter(project=project_id, job_type='check').order_by("-created")
-        if not latest_project_update.exists():
-            return None
-        return latest_project_update.first()
-
-    def should_update_related_project(self, job, latest_project_update):
-        now = tz_now()
-
-        if latest_project_update is None:
+        if (update is None) or (update.status in ['failed', 'canceled', 'error']):
             return True
 
-        if latest_project_update.status in ['failed', 'canceled']:
-            return True
-
-        '''
-        If there's already a project update utilizing this job that's about to run
-        then we don't need to create one
-        '''
-        if latest_project_update.status in ['waiting', 'pending', 'running']:
+        if update.status in ['waiting', 'pending', 'running']:
             return False
 
-        '''
-        If the latest project update has a created time == job_created_time-1
-        then consider the project update found. This is so we don't enter an infinite loop
-        of updating the project when cache timeout is 0.
-        '''
-        if (
-            latest_project_update.project.scm_update_cache_timeout == 0
-            and latest_project_update.launch_type == 'dependency'
-            and latest_project_update.created == job.created - timedelta(seconds=1)
-        ):
-            return False
-        '''
-        Normal Cache Timeout Logic
-        '''
-        timeout_seconds = timedelta(seconds=latest_project_update.project.scm_update_cache_timeout)
-        if (latest_project_update.finished + timeout_seconds) < now:
-            return True
-        return False
+        return bool(((update.finished + timedelta(seconds=cache_timeout))) < tz_now())
 
+    def get_or_create_project_update(self, project_id):
+        project = self.all_projects.get(project_id, None)
+        if project is not None:
+            latest_project_update = project.project_updates.filter(job_type='check').order_by("-created").first()
+            if self.should_update_again(latest_project_update, project.scm_update_cache_timeout):
+                project_task = project.create_project_update(_eager_fields=dict(launch_type='dependency'))
+                project_task.signal_start()
+                return [project_task]
+            else:
+                return [latest_project_update]
+        return []
+
     def gen_dep_for_job(self, task):
-        created_dependencies = []
         dependencies = []
         # TODO: Can remove task.project None check after scan-job-default-playbook is removed
-        if task.project is not None and task.project.scm_update_on_launch is True:
-            latest_project_update = self.get_latest_project_update(task.project_id)
-            if self.should_update_related_project(task, latest_project_update):
-                latest_project_update = self.create_project_update(task)
-                created_dependencies.append(latest_project_update)
-            dependencies.append(latest_project_update)
-
-        # Inventory created 2 seconds behind job
+        dependencies = self.get_or_create_project_update(task.project_id)
 
         try:
             start_args = json.loads(decrypt_field(task, field_name="start_args"))
         except ValueError:
             start_args = dict()
-        # generator for inventory sources related to this task
-        task_inv_sources = (invsrc for invsrc in self.all_inventory_sources if invsrc.inventory_id == task.inventory_id)
-        for inventory_source in task_inv_sources:
+        # update-on-launch inventory sources related to this task
+        for inventory_source in self.all_inventory_sources.get(task.inventory_id, []):
             if "inventory_sources_already_updated" in start_args and inventory_source.id in start_args['inventory_sources_already_updated']:
                 continue
-            if not inventory_source.update_on_launch:
-                continue
-            latest_inventory_update = self.get_latest_inventory_update(inventory_source)
-            if self.should_update_inventory_source(task, latest_inventory_update):
-                inventory_task = self.create_inventory_update(task, inventory_source)
-                created_dependencies.append(inventory_task)
+            latest_inventory_update = inventory_source.inventory_updates.order_by("-created").first()
+            if self.should_update_again(latest_inventory_update, inventory_source.update_cache_timeout):
+                inventory_task = inventory_source.create_inventory_update(_eager_fields=dict(launch_type='dependency'))
+                inventory_task.signal_start()
                 dependencies.append(inventory_task)
             else:
                 dependencies.append(latest_inventory_update)
 
-        if dependencies:
-            self.add_dependencies(task, dependencies)
-
-        return created_dependencies
+        return dependencies
 
     def gen_dep_for_inventory_update(self, inventory_task):
-        created_dependencies = []
         if inventory_task.source == "scm":
             invsrc = inventory_task.inventory_source
-            if not invsrc.source_project.scm_update_on_launch:
-                return created_dependencies
-
-            latest_src_project_update = self.get_latest_project_update(invsrc.source_project_id)
-            if self.should_update_related_project(inventory_task, latest_src_project_update):
-                latest_src_project_update = self.create_project_update(inventory_task, project_id=invsrc.source_project_id)
-                created_dependencies.append(latest_src_project_update)
-                self.add_dependencies(inventory_task, [latest_src_project_update])
-                latest_src_project_update.scm_inventory_updates.add(inventory_task)
-        return created_dependencies
+            if invsrc:
+                return self.get_or_create_project_update(invsrc.source_project_id)
+        return []
 
     @timeit
     def generate_dependencies(self, undeped_tasks):
-        created_dependencies = []
+        dependencies = []
+        self.cache_projects_and_sources(undeped_tasks)
         for task in undeped_tasks:
             task.log_lifecycle("acknowledged")
             if type(task) is Job:
-                created_dependencies += self.gen_dep_for_job(task)
+                job_deps = self.gen_dep_for_job(task)
             elif type(task) is InventoryUpdate:
-                created_dependencies += self.gen_dep_for_inventory_update(task)
+                job_deps = self.gen_dep_for_inventory_update(task)
             else:
                 continue
+            if job_deps:
+                dependencies += job_deps
+                with disable_activity_stream():
+                    task.dependent_jobs.add(*dependencies)
+                logger.debug(f'Linked {[dep.log_format for dep in dependencies]} as dependencies of {task.log_format}')
 
         UnifiedJob.objects.filter(pk__in=[task.pk for task in undeped_tasks]).update(dependencies_processed=True)
 
-        return created_dependencies
-
-    def process_tasks(self):
-        deps = self.generate_dependencies(self.all_tasks)
-        self.generate_dependencies(deps)
-        self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(deps))
+        return dependencies
 
     @timeit
     def _schedule(self):
         self.get_tasks(dict(status__in=["pending"], dependencies_processed=False))
 
         if len(self.all_tasks) > 0:
-            self.get_inventory_source_tasks()
-            self.process_tasks()
+            deps = self.generate_dependencies(self.all_tasks)
+            undeped_deps = [dep for dep in deps if dep.dependencies_processed is False]
+            self.generate_dependencies(undeped_deps)
+            self.subsystem_metrics.inc(f"{self.prefix}_pending_processed", len(self.all_tasks) + len(undeped_deps))
             ScheduleTaskManager().schedule()
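Note: the task_manager.py hunk above replaces several per-dependency query helpers with bulk caching (cache_projects_and_sources) plus one consolidated staleness check, should_update_again, shared by project and inventory updates. A minimal standalone sketch of that check, assuming a simple update object with status and finished attributes:

    from datetime import datetime, timedelta, timezone
    from types import SimpleNamespace

    def should_update_again(update, cache_timeout):
        # Mirrors the consolidated check in the diff:
        # never updated or last update failed -> update again;
        # update already in flight -> reuse it;
        # otherwise a new update is needed only once the cache timeout expires.
        if (update is None) or (update.status in ['failed', 'canceled', 'error']):
            return True
        if update.status in ['waiting', 'pending', 'running']:
            return False
        return (update.finished + timedelta(seconds=cache_timeout)) < datetime.now(timezone.utc)

    now = datetime.now(timezone.utc)
    fresh = SimpleNamespace(status='successful', finished=now)
    stale = SimpleNamespace(status='successful', finished=now - timedelta(hours=1))
    running = SimpleNamespace(status='running', finished=None)

    assert should_update_again(None, 60)        # never updated
    assert not should_update_again(fresh, 60)   # recent success within timeout
    assert should_update_again(stale, 60)       # cache timeout expired
    assert not should_update_again(running, 60) # in-flight update is reused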
5 changes: 1 addition & 4 deletions awx/main/tasks/system.py
@@ -843,10 +843,7 @@ def delete_inventory(inventory_id, user_id, retries=5):
     user = None
     with ignore_inventory_computed_fields(), ignore_inventory_group_removal(), impersonate(user):
         try:
-            i = Inventory.objects.get(id=inventory_id)
-            for host in i.hosts.iterator():
-                host.job_events_as_primary_host.update(host=None)
-            i.delete()
+            Inventory.objects.get(id=inventory_id).delete()
             emit_channel_notification('inventories-status_changed', {'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'})
             logger.debug('Deleted inventory {} as user {}.'.format(inventory_id, user_id))
         except Inventory.DoesNotExist:
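Note: the system.py hunk above drops a per-host Python loop that nulled out job-event references before deleting the inventory. A single queryset delete can only replace that loop if the database schema resolves the dangling references itself; the diff does not show the schema side, so the mechanism is an assumption here. A standalone sketch of the idea using SQLite's ON DELETE SET NULL:

    import sqlite3

    con = sqlite3.connect(":memory:")
    con.execute("PRAGMA foreign_keys = ON")
    con.execute("CREATE TABLE host (id INTEGER PRIMARY KEY)")
    con.execute(
        "CREATE TABLE job_event ("
        " id INTEGER PRIMARY KEY,"
        " host_id INTEGER REFERENCES host(id) ON DELETE SET NULL)"
    )
    con.execute("INSERT INTO host (id) VALUES (1)")
    con.execute("INSERT INTO job_event (id, host_id) VALUES (10, 1)")

    # Deleting the host needs no Python loop: the database nulls the reference.
    con.execute("DELETE FROM host WHERE id = 1")
    assert con.execute("SELECT host_id FROM job_event WHERE id = 10").fetchone()[0] is None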
(Diff truncated: 49 of the 53 changed files are not shown.)
