Commit
Add in an async task to migrate the data over
jbradberry committed Aug 10, 2023
1 parent 6db663e commit 14992ce
Showing 2 changed files with 107 additions and 1 deletion.
53 changes: 53 additions & 0 deletions awx/main/models/__init__.py
@@ -3,6 +3,7 @@

# Django
from django.conf import settings # noqa
from django.db import connection
from django.db.models.signals import pre_delete # noqa

# AWX
@@ -99,6 +100,58 @@
User.add_to_class('accessible_objects', user_accessible_objects)


def convert_jsonfields():
    if connection.vendor != 'postgresql':
        return

    # fmt: off
    fields = [
        ('main_activitystream', 'id', (
            'deleted_actor',
            'setting',
        )),
        ('main_job', 'unifiedjob_ptr_id', (
            'survey_passwords',
        )),
        ('main_joblaunchconfig', 'id', (
            'char_prompts',
            'survey_passwords',
        )),
        ('main_notification', 'id', (
            'body',
        )),
        ('main_unifiedjob', 'id', (
            'job_env',
        )),
        ('main_workflowjob', 'unifiedjob_ptr_id', (
            'char_prompts',
            'survey_passwords',
        )),
        ('main_workflowjobnode', 'id', (
            'char_prompts',
            'survey_passwords',
        )),
    ]
    # fmt: on

    with connection.cursor() as cursor:
        for table, pkfield, columns in fields:
            # Do the renamed old columns still exist? If so, run the task.
            old_columns = ','.join(f"'{column}_old'" for column in columns)
            cursor.execute(
                f"""
                select count(1) from information_schema.columns
                where
                    table_name = %s and column_name in ({old_columns});
                """,
                (table,),
            )
            if cursor.fetchone()[0]:
                from awx.main.tasks.system import migrate_jsonfield

                migrate_jsonfield.apply_async([table, pkfield, columns])

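For illustration only (not part of this commit): for the first entry in the fields list above, the existence check that convert_jsonfields() builds expands to roughly the SQL below, with the table name bound through the %s placeholder. A non-zero count means the renamed *_old columns are still present, so the migrate_jsonfield task is queued for that table.

select count(1) from information_schema.columns
where
    table_name = 'main_activitystream'
    and column_name in ('deleted_actor_old', 'setting_old');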

def cleanup_created_modified_by(sender, **kwargs):
    # work around a bug in django-polymorphic that doesn't properly
    # handle cascades for reverse foreign keys on the polymorphic base model
55 changes: 54 additions & 1 deletion awx/main/tasks/system.py
@@ -2,6 +2,7 @@
from collections import namedtuple
import functools
import importlib
import itertools
import json
import logging
import os
@@ -14,7 +15,7 @@

# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError
from django.db import connection, transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str
@@ -48,6 +49,7 @@
    SmartInventoryMembership,
    Job,
    HostMetric,
    convert_jsonfields,
)
from awx.main.constants import ACTIVE_STATES
from awx.main.dispatch.publish import task
@@ -86,6 +88,11 @@ def dispatch_startup():
    if settings.IS_K8S:
        write_receptor_config()

    try:
        convert_jsonfields()
    except Exception:
        logger.exception("Failed json field conversion, skipping.")

    startup_logger.debug("Syncing Schedules")
    for sch in Schedule.objects.all():
        try:
@@ -129,6 +136,52 @@ def inform_cluster_of_shutdown():
        logger.exception('Encountered problem with normal shutdown signal.')


@task(queue=get_task_queuename)
def migrate_jsonfield(table, pkfield, columns):
    batchsize = 10000
    with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
        if not acquired:
            return

        from django.db.migrations.executor import MigrationExecutor

        # If Django is currently running migrations, wait until it is done.
        while True:
            executor = MigrationExecutor(connection)
            if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
                break
            time.sleep(120)

        logger.warning(f"Migrating json fields for {table}: {', '.join(columns)}")

        with connection.cursor() as cursor:
            for i in itertools.count(0, batchsize):
                # Are there even any rows in the table beyond this point?
                cursor.execute(f"select count(1) from {table} where {pkfield} >= %s limit 1;", (i,))
                if not cursor.fetchone()[0]:
                    break

                column_expr = ', '.join(f"{colname} = {colname}_old::jsonb" for colname in columns)
                # If any of the old columns have non-null values, the data needs to be cast and copied over.
                empty_expr = ' or '.join(f"{colname}_old is not null" for colname in columns)
                cursor.execute(  # Only clobber the new fields if there is non-null data in the old ones.
                    f"""
                    update {table}
                    set {column_expr}
                    where {pkfield} >= %s and {pkfield} < %s
                        and {empty_expr};
                    """,
                    (i, i + batchsize),
                )
                rows = cursor.rowcount
                logger.debug(f"Batch {i} to {i + batchsize} copied on {table}, {rows} rows affected.")

            column_expr = ', '.join(f"DROP COLUMN {column}_old" for column in columns)
            cursor.execute(f"ALTER TABLE {table} {column_expr};")

    logger.warning(f"Migration of {table} to jsonb is finished.")

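For illustration only (not part of this commit): for the ('main_job', 'unifiedjob_ptr_id', ('survey_passwords',)) entry, the first batch of the UPDATE built by migrate_jsonfield expands to roughly the SQL below (the bounds 0 and 10000 are passed as query parameters), and once every batch has been processed the renamed column is dropped.

update main_job
set survey_passwords = survey_passwords_old::jsonb
where unifiedjob_ptr_id >= 0 and unifiedjob_ptr_id < 10000
    and survey_passwords_old is not null;

ALTER TABLE main_job DROP COLUMN survey_passwords_old;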

@task(queue=get_task_queuename)
def apply_cluster_membership_policies():
    from awx.main.signals import disable_activity_stream
