Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cards] Fix config issue permanently. #2205

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions metaflow/plugins/cards/card_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class CardDecorator(StepDecorator):

card_creator = None

_config_values = None

_config_file_name = None

task_finished_decos = 0

def __init__(self, *args, **kwargs):
super(CardDecorator, self).__init__(*args, **kwargs)
self._task_datastore = None
Expand Down Expand Up @@ -106,6 +112,25 @@ def _set_card_counts_per_step(cls, step_name, total_count):
def _increment_step_counter(cls):
cls.step_counter += 1

@classmethod
def _increment_completed_counter(cls):
cls.task_finished_decos += 1

@classmethod
def _set_config_values(cls, config_values):
cls._config_values = config_values

@classmethod
def _set_config_file_name(cls, flow):
# Only create a config file from the very first card decorator.
if cls._config_values and not cls._config_file_name:
with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", delete=False
) as config_file:
config_value = dump_config_values(flow)
json.dump(config_value, config_file)
cls._config_file_name = config_file.name

def step_init(
self, flow, graph, step_name, decorators, environment, flow_datastore, logger
):
Expand All @@ -116,11 +141,13 @@ def step_init(

# We check for configuration options. We do this here before they are
# converted to properties.
self._config_values = [
(config.name, ConfigInput.make_key_name(config.name))
for _, config in flow._get_parameters()
if config.IS_CONFIG_PARAMETER
]
self._set_config_values(
[
(config.name, ConfigInput.make_key_name(config.name))
for _, config in flow._get_parameters()
if config.IS_CONFIG_PARAMETER
]
)

self.card_options = self.attributes["options"]

Expand Down Expand Up @@ -159,15 +186,11 @@ def task_pre_step(

# If we have configs, we need to dump them to a file so we can re-use them
# when calling the card creation subprocess.
if self._config_values:
with tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8", delete=False
) as config_file:
config_value = dump_config_values(flow)
json.dump(config_value, config_file)
self._config_file_name = config_file.name
else:
self._config_file_name = None
# Since a step can contain multiple card decorators, and all the card creation processes
# will reference the same config file (because of how the CardCreator is created (only single class instance)),
# we need to ensure that a single config file is being referenced for all card create commands.
# This config file will be removed when the last card decorator has finished creating its card.
self._set_config_file_name(flow)

card_type = self.attributes["type"]
card_class = get_card_class(card_type)
Expand Down Expand Up @@ -246,12 +269,14 @@ def task_finished(
self.card_creator.create(mode="render", final=True, **create_options)
self.card_creator.create(mode="refresh", final=True, **create_options)

# Unlink the config file if it exists
if self._config_file_name:
try:
os.unlink(self._config_file_name)
except Exception as e:
pass
Comment on lines -249 to -254
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier we were naively removing the config file when the same config file is referenced for all CardCreators. So this broke configs when many cards are present.

self._increment_completed_counter()
if self.task_finished_decos == self.total_decos_on_step[step_name]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would move this to a function (it's called twice).

# Unlink the config file if it exists
if self._config_file_name:
try:
os.unlink(self._config_file_name)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we only remove the config when all the decorators are either done with task_finished or with task_exception

except Exception as e:
pass

@staticmethod
def _options(mapping):
Expand Down Expand Up @@ -286,3 +311,15 @@ def _create_top_level_args(self, flow):
top_level_options["local-config-file"] = self._config_file_name

return list(self._options(top_level_options))

def task_exception(
self, exception, step_name, flow, graph, retry_count, max_user_code_retries
):
self._increment_completed_counter()
if self.task_finished_decos == self.total_decos_on_step[step_name]:
# Unlink the config file if it exists
if self._config_file_name:
try:
os.unlink(self._config_file_name)
except Exception as e:
pass
Loading