From e4985dea5b7f32413214d340e05c4866b381272b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Sun, 13 Oct 2024 16:02:10 -0400 Subject: [PATCH] Emit a span for workflow activation handling --- temporalio/worker/_workflow_instance.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 1ca70a23..5e2fd7b9 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -42,6 +42,8 @@ cast, ) +from google.protobuf.json_format import MessageToJson +from opentelemetry import trace from typing_extensions import Self, TypeAlias, TypedDict import temporalio.activity @@ -75,6 +77,7 @@ ) logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) # Set to true to log all cases where we're ignoring things during delete LOG_IGNORE_DURING_DELETE = False @@ -325,6 +328,21 @@ def get_thread_id(self) -> Optional[int]: def activate( self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: + with tracer.start_as_current_span("HandleWorkflowActivation") as span: + span.set_attribute("rpc.method", "WorkflowActivation") + span.set_attribute("rpc.request.type", "WorkflowActivation") + span.set_attribute("rpc.request.payload", MessageToJson(act)) + span.set_attribute("temporalWorkflowID", self._info.workflow_id) + span.set_attribute("temporal.worker", True) + completion = self._activate(act) + span.set_attribute("rpc.response.type", "WorkflowActivationCompletion") + span.set_attribute("rpc.response.payload", MessageToJson(completion)) + trace.get_tracer_provider().force_flush() # type: ignore + return completion + + def _activate( + self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: # Reset current completion, time, and whether replaying self._current_completion = (