From 53a9f107cae5f7eeb703d0f21fc2734584fc5ee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Moura?= Date: Fri, 18 Oct 2024 08:32:06 -0300 Subject: [PATCH] Avoiding exceptions --- src/crewai/telemetry/telemetry.py | 890 +++++++++++++++--------------- 1 file changed, 442 insertions(+), 448 deletions(-) diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index f6a018f274..a08ccd96f6 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -65,7 +65,7 @@ def __init__(self): self.provider.add_span_processor(processor) self.ready = True - except BaseException as e: + except Exception as e: if isinstance( e, (SystemExit, KeyboardInterrupt, GeneratorExit, asyncio.CancelledError), @@ -83,404 +83,33 @@ def set_tracer(self): self.ready = False self.trace_set = False + def _safe_telemetry_operation(self, operation): + if not self.ready: + return + try: + operation() + except Exception: + pass + def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None): """Records the creation of a crew.""" - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Crew Created") - self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, - ) - self._add_attribute(span, "python_version", platform.python_version()) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) - self._add_attribute(span, "crew_process", crew.process) - self._add_attribute(span, "crew_memory", crew.memory) - self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) - self._add_attribute(span, "crew_number_of_agents", len(crew.agents)) - if crew.share_crew: - self._add_attribute( - span, - "crew_agents", - json.dumps( - [ - { - "key": agent.key, - "id": str(agent.id), - "role": agent.role, - "goal": agent.goal, - "backstory": agent.backstory, - "verbose?": agent.verbose, - "max_iter": agent.max_iter, - "max_rpm": agent.max_rpm, - "i18n": agent.i18n.prompt_file, - "function_calling_llm": ( - agent.function_calling_llm.model - if agent.function_calling_llm - else "" - ), - "llm": agent.llm.model, - "delegation_enabled?": agent.allow_delegation, - "allow_code_execution?": agent.allow_code_execution, - "max_retry_limit": agent.max_retry_limit, - "tools_names": [ - tool.name.casefold() - for tool in agent.tools or [] - ], - } - for agent in crew.agents - ] - ), - ) - self._add_attribute( - span, - "crew_tasks", - json.dumps( - [ - { - "key": task.key, - "id": str(task.id), - "description": task.description, - "expected_output": task.expected_output, - "async_execution?": task.async_execution, - "human_input?": task.human_input, - "agent_role": ( - task.agent.role if task.agent else "None" - ), - "agent_key": task.agent.key if task.agent else None, - "context": ( - [task.description for task in task.context] - if task.context - else None - ), - "tools_names": [ - tool.name.casefold() - for tool in task.tools or [] - ], - } - for task in crew.tasks - ] - ), - ) - self._add_attribute(span, "platform", platform.platform()) - self._add_attribute(span, "platform_release", platform.release()) - self._add_attribute(span, "platform_system", platform.system()) - self._add_attribute(span, "platform_version", platform.version()) - self._add_attribute(span, "cpus", os.cpu_count()) - self._add_attribute( - span, "crew_inputs", json.dumps(inputs) if inputs else None - ) - else: - self._add_attribute( - span, - "crew_agents", - json.dumps( - [ - { - "key": agent.key, - "id": str(agent.id), - "role": agent.role, - "verbose?": agent.verbose, - "max_iter": agent.max_iter, - "max_rpm": agent.max_rpm, - "function_calling_llm": ( - agent.function_calling_llm.model - if agent.function_calling_llm - else "" - ), - "llm": agent.llm.model, - "delegation_enabled?": agent.allow_delegation, - "allow_code_execution?": agent.allow_code_execution, - "max_retry_limit": agent.max_retry_limit, - "tools_names": [ - tool.name.casefold() - for tool in agent.tools or [] - ], - } - for agent in crew.agents - ] - ), - ) - self._add_attribute( - span, - "crew_tasks", - json.dumps( - [ - { - "key": task.key, - "id": str(task.id), - "async_execution?": task.async_execution, - "human_input?": task.human_input, - "agent_role": ( - task.agent.role if task.agent else "None" - ), - "agent_key": task.agent.key if task.agent else None, - "tools_names": [ - tool.name.casefold() - for tool in task.tools or [] - ], - } - for task in crew.tasks - ] - ), - ) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def task_started(self, crew: Crew, task: Task) -> Span | None: - """Records task started in a crew.""" - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - - created_span = tracer.start_span("Task Created") - - self._add_attribute(created_span, "crew_key", crew.key) - self._add_attribute(created_span, "crew_id", str(crew.id)) - self._add_attribute(created_span, "task_key", task.key) - self._add_attribute(created_span, "task_id", str(task.id)) - - if crew.share_crew: - self._add_attribute( - created_span, "formatted_description", task.description - ) - self._add_attribute( - created_span, "formatted_expected_output", task.expected_output - ) - - created_span.set_status(Status(StatusCode.OK)) - created_span.end() - - span = tracer.start_span("Task Execution") - - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) - self._add_attribute(span, "task_key", task.key) - self._add_attribute(span, "task_id", str(task.id)) - - if crew.share_crew: - self._add_attribute(span, "formatted_description", task.description) - self._add_attribute( - span, "formatted_expected_output", task.expected_output - ) - - return span - except Exception: - pass - - return None - - def task_ended(self, span: Span, task: Task, crew: Crew): - """Records task execution in a crew.""" - if self.ready: - try: - if crew.share_crew: - self._add_attribute( - span, - "task_output", - task.output.raw if task.output else "", - ) - - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int): - """Records the repeated usage 'error' of a tool by an agent.""" - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Tool Repeated Usage") - self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, - ) - self._add_attribute(span, "tool_name", tool_name) - self._add_attribute(span, "attempts", attempts) - if llm: - self._add_attribute(span, "llm", llm.model) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def tool_usage(self, llm: Any, tool_name: str, attempts: int): - """Records the usage of a tool by an agent.""" - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Tool Usage") - self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, - ) - self._add_attribute(span, "tool_name", tool_name) - self._add_attribute(span, "attempts", attempts) - if llm: - self._add_attribute(span, "llm", llm.model) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def tool_usage_error(self, llm: Any): - """Records the usage of a tool by an agent.""" - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Tool Usage Error") - self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, - ) - if llm: - self._add_attribute(span, "llm", llm.model) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def individual_test_result_span( - self, crew: Crew, quality: float, exec_time: int, model_name: str - ): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Crew Individual Test Result") - - self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, - ) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) - self._add_attribute(span, "quality", str(quality)) - self._add_attribute(span, "exec_time", str(exec_time)) - self._add_attribute(span, "model_name", model_name) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def test_execution_span( - self, - crew: Crew, - iterations: int, - inputs: dict[str, Any] | None, - model_name: str, - ): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Crew Test Execution") - - self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, - ) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) - self._add_attribute(span, "iterations", str(iterations)) - self._add_attribute(span, "model_name", model_name) - - if crew.share_crew: - self._add_attribute( - span, "inputs", json.dumps(inputs) if inputs else None - ) - - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def deploy_signup_error_span(self): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Deploy Signup Error") - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def start_deployment_span(self, uuid: Optional[str] = None): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Start Deployment") - if uuid: - self._add_attribute(span, "uuid", uuid) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def create_crew_deployment_span(self): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Create Crew Deployment") - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def get_crew_logs_span(self, uuid: Optional[str], log_type: str = "deployment"): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Get Crew Logs") - self._add_attribute(span, "log_type", log_type) - if uuid: - self._add_attribute(span, "uuid", uuid) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def remove_crew_span(self, uuid: Optional[str] = None): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Remove Crew") - if uuid: - self._add_attribute(span, "uuid", uuid) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass - - def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): - """Records the complete execution of a crew. - This is only collected if the user has opted-in to share the crew. - """ - self.crew_creation(crew, inputs) - if (self.ready) and (crew.share_crew): - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Crew Execution") - self._add_attribute( - span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, - ) - self._add_attribute(span, "crew_key", crew.key) - self._add_attribute(span, "crew_id", str(crew.id)) - self._add_attribute( - span, "crew_inputs", json.dumps(inputs) if inputs else None - ) + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Crew Created") + self._add_attribute( + span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + self._add_attribute(span, "python_version", platform.python_version()) + self._add_attribute(span, "crew_key", crew.key) + self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute(span, "crew_process", crew.process) + self._add_attribute(span, "crew_memory", crew.memory) + self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) + self._add_attribute(span, "crew_number_of_agents", len(crew.agents)) + if crew.share_crew: self._add_attribute( span, "crew_agents", @@ -496,8 +125,15 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): "max_iter": agent.max_iter, "max_rpm": agent.max_rpm, "i18n": agent.i18n.prompt_file, + "function_calling_llm": ( + agent.function_calling_llm.model + if agent.function_calling_llm + else "" + ), "llm": agent.llm.model, "delegation_enabled?": agent.allow_delegation, + "allow_code_execution?": agent.allow_code_execution, + "max_retry_limit": agent.max_retry_limit, "tools_names": [ tool.name.casefold() for tool in agent.tools or [] ], @@ -512,12 +148,15 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): json.dumps( [ { + "key": task.key, "id": str(task.id), "description": task.description, "expected_output": task.expected_output, "async_execution?": task.async_execution, "human_input?": task.human_input, - "agent_role": task.agent.role if task.agent else "None", + "agent_role": ( + task.agent.role if task.agent else "None" + ), "agent_key": task.agent.key if task.agent else None, "context": ( [task.description for task in task.context] @@ -532,78 +171,433 @@ def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): ] ), ) - return span - except Exception: - pass - - def end_crew(self, crew, final_string_output): - if (self.ready) and (crew.share_crew): - try: + self._add_attribute(span, "platform", platform.platform()) + self._add_attribute(span, "platform_release", platform.release()) + self._add_attribute(span, "platform_system", platform.system()) + self._add_attribute(span, "platform_version", platform.version()) + self._add_attribute(span, "cpus", os.cpu_count()) self._add_attribute( - crew._execution_span, - "crewai_version", - pkg_resources.get_distribution("crewai").version, + span, "crew_inputs", json.dumps(inputs) if inputs else None ) + else: self._add_attribute( - crew._execution_span, "crew_output", final_string_output + span, + "crew_agents", + json.dumps( + [ + { + "key": agent.key, + "id": str(agent.id), + "role": agent.role, + "verbose?": agent.verbose, + "max_iter": agent.max_iter, + "max_rpm": agent.max_rpm, + "function_calling_llm": ( + agent.function_calling_llm.model + if agent.function_calling_llm + else "" + ), + "llm": agent.llm.model, + "delegation_enabled?": agent.allow_delegation, + "allow_code_execution?": agent.allow_code_execution, + "max_retry_limit": agent.max_retry_limit, + "tools_names": [ + tool.name.casefold() for tool in agent.tools or [] + ], + } + for agent in crew.agents + ] + ), ) self._add_attribute( - crew._execution_span, - "crew_tasks_output", + span, + "crew_tasks", json.dumps( [ { + "key": task.key, "id": str(task.id), - "description": task.description, - "output": task.output.raw_output, + "async_execution?": task.async_execution, + "human_input?": task.human_input, + "agent_role": ( + task.agent.role if task.agent else "None" + ), + "agent_key": task.agent.key if task.agent else None, + "tools_names": [ + tool.name.casefold() for tool in task.tools or [] + ], } for task in crew.tasks ] ), ) - crew._execution_span.set_status(Status(StatusCode.OK)) - crew._execution_span.end() - except Exception: - pass + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def task_started(self, crew: Crew, task: Task) -> Span | None: + """Records task started in a crew.""" + + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + + created_span = tracer.start_span("Task Created") + + self._add_attribute(created_span, "crew_key", crew.key) + self._add_attribute(created_span, "crew_id", str(crew.id)) + self._add_attribute(created_span, "task_key", task.key) + self._add_attribute(created_span, "task_id", str(task.id)) + + if crew.share_crew: + self._add_attribute( + created_span, "formatted_description", task.description + ) + self._add_attribute( + created_span, "formatted_expected_output", task.expected_output + ) + + created_span.set_status(Status(StatusCode.OK)) + created_span.end() + + span = tracer.start_span("Task Execution") + + self._add_attribute(span, "crew_key", crew.key) + self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute(span, "task_key", task.key) + self._add_attribute(span, "task_id", str(task.id)) + + if crew.share_crew: + self._add_attribute(span, "formatted_description", task.description) + self._add_attribute( + span, "formatted_expected_output", task.expected_output + ) + + return span + + return self._safe_telemetry_operation(operation) + + def task_ended(self, span: Span, task: Task, crew: Crew): + """Records task execution in a crew.""" + + def operation(): + if crew.share_crew: + self._add_attribute( + span, + "task_output", + task.output.raw if task.output else "", + ) + + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int): + """Records the repeated usage 'error' of a tool by an agent.""" + + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Tool Repeated Usage") + self._add_attribute( + span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + self._add_attribute(span, "tool_name", tool_name) + self._add_attribute(span, "attempts", attempts) + if llm: + self._add_attribute(span, "llm", llm.model) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def tool_usage(self, llm: Any, tool_name: str, attempts: int): + """Records the usage of a tool by an agent.""" + + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Tool Usage") + self._add_attribute( + span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + self._add_attribute(span, "tool_name", tool_name) + self._add_attribute(span, "attempts", attempts) + if llm: + self._add_attribute(span, "llm", llm.model) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def tool_usage_error(self, llm: Any): + """Records the usage of a tool by an agent.""" + + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Tool Usage Error") + self._add_attribute( + span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + if llm: + self._add_attribute(span, "llm", llm.model) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def individual_test_result_span( + self, crew: Crew, quality: float, exec_time: int, model_name: str + ): + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Crew Individual Test Result") + + self._add_attribute( + span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + self._add_attribute(span, "crew_key", crew.key) + self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute(span, "quality", str(quality)) + self._add_attribute(span, "exec_time", str(exec_time)) + self._add_attribute(span, "model_name", model_name) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def test_execution_span( + self, + crew: Crew, + iterations: int, + inputs: dict[str, Any] | None, + model_name: str, + ): + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Crew Test Execution") + + self._add_attribute( + span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + self._add_attribute(span, "crew_key", crew.key) + self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute(span, "iterations", str(iterations)) + self._add_attribute(span, "model_name", model_name) + + if crew.share_crew: + self._add_attribute( + span, "inputs", json.dumps(inputs) if inputs else None + ) + + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def deploy_signup_error_span(self): + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Deploy Signup Error") + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def start_deployment_span(self, uuid: Optional[str] = None): + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Start Deployment") + if uuid: + self._add_attribute(span, "uuid", uuid) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def create_crew_deployment_span(self): + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Create Crew Deployment") + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def get_crew_logs_span(self, uuid: Optional[str], log_type: str = "deployment"): + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Get Crew Logs") + self._add_attribute(span, "log_type", log_type) + if uuid: + self._add_attribute(span, "uuid", uuid) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def remove_crew_span(self, uuid: Optional[str] = None): + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Remove Crew") + if uuid: + self._add_attribute(span, "uuid", uuid) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) + + def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): + """Records the complete execution of a crew. + This is only collected if the user has opted-in to share the crew. + """ + self.crew_creation(crew, inputs) + + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Crew Execution") + self._add_attribute( + span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + self._add_attribute(span, "crew_key", crew.key) + self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute( + span, "crew_inputs", json.dumps(inputs) if inputs else None + ) + self._add_attribute( + span, + "crew_agents", + json.dumps( + [ + { + "key": agent.key, + "id": str(agent.id), + "role": agent.role, + "goal": agent.goal, + "backstory": agent.backstory, + "verbose?": agent.verbose, + "max_iter": agent.max_iter, + "max_rpm": agent.max_rpm, + "i18n": agent.i18n.prompt_file, + "llm": agent.llm.model, + "delegation_enabled?": agent.allow_delegation, + "tools_names": [ + tool.name.casefold() for tool in agent.tools or [] + ], + } + for agent in crew.agents + ] + ), + ) + self._add_attribute( + span, + "crew_tasks", + json.dumps( + [ + { + "id": str(task.id), + "description": task.description, + "expected_output": task.expected_output, + "async_execution?": task.async_execution, + "human_input?": task.human_input, + "agent_role": task.agent.role if task.agent else "None", + "agent_key": task.agent.key if task.agent else None, + "context": ( + [task.description for task in task.context] + if task.context + else None + ), + "tools_names": [ + tool.name.casefold() for tool in task.tools or [] + ], + } + for task in crew.tasks + ] + ), + ) + return span + + if crew.share_crew: + return self._safe_telemetry_operation(operation) + return None + + def end_crew(self, crew, final_string_output): + def operation(): + self._add_attribute( + crew._execution_span, + "crewai_version", + pkg_resources.get_distribution("crewai").version, + ) + self._add_attribute( + crew._execution_span, "crew_output", final_string_output + ) + self._add_attribute( + crew._execution_span, + "crew_tasks_output", + json.dumps( + [ + { + "id": str(task.id), + "description": task.description, + "output": task.output.raw_output, + } + for task in crew.tasks + ] + ), + ) + crew._execution_span.set_status(Status(StatusCode.OK)) + crew._execution_span.end() + + if crew.share_crew: + self._safe_telemetry_operation(operation) def _add_attribute(self, span, key, value): """Add an attribute to a span.""" - try: + + def operation(): return span.set_attribute(key, value) - except Exception: - pass + + self._safe_telemetry_operation(operation) def flow_creation_span(self, flow_name: str): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Flow Creation") - self._add_attribute(span, "flow_name", flow_name) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Flow Creation") + self._add_attribute(span, "flow_name", flow_name) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) def flow_plotting_span(self, flow_name: str, node_names: list[str]): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Flow Plotting") - self._add_attribute(span, "flow_name", flow_name) - self._add_attribute(span, "node_names", json.dumps(node_names)) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Flow Plotting") + self._add_attribute(span, "flow_name", flow_name) + self._add_attribute(span, "node_names", json.dumps(node_names)) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation) def flow_execution_span(self, flow_name: str, node_names: list[str]): - if self.ready: - try: - tracer = trace.get_tracer("crewai.telemetry") - span = tracer.start_span("Flow Execution") - self._add_attribute(span, "flow_name", flow_name) - self._add_attribute(span, "node_names", json.dumps(node_names)) - span.set_status(Status(StatusCode.OK)) - span.end() - except Exception: - pass + def operation(): + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Flow Execution") + self._add_attribute(span, "flow_name", flow_name) + self._add_attribute(span, "node_names", json.dumps(node_names)) + span.set_status(Status(StatusCode.OK)) + span.end() + + self._safe_telemetry_operation(operation)