From 952f473ba202a82bc10e779ca27ff84015b44aa9 Mon Sep 17 00:00:00 2001 From: David Perl Date: Mon, 15 Apr 2024 17:05:37 +0100 Subject: [PATCH 01/17] (#1682) Add OpenTelemetry tracing - Adds opentelemetry-api as dependency - Adds traces on RunEngine methods which take time: wait(), set(), complete(), ... --- pyproject.toml | 1 + src/bluesky/run_engine.py | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index b791748b66..d917a6a03b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dependencies = [ "msgpack", "msgpack-numpy", "numpy", + "opentelemetry-api", "super_state_machine", "toolz", "tqdm>=4.44", diff --git a/src/bluesky/run_engine.py b/src/bluesky/run_engine.py index d544ecd7f1..ffeb065dd5 100644 --- a/src/bluesky/run_engine.py +++ b/src/bluesky/run_engine.py @@ -17,6 +17,7 @@ from warnings import warn from event_model import DocumentNames +from opentelemetry import trace from super_state_machine.errors import TransitionError from super_state_machine.extras import PropertyMachine from super_state_machine.machines import StateMachine @@ -57,6 +58,9 @@ warn_if_msg_args_or_kwargs, ) +tracer = trace.get_tracer(__name__) +_SPAN_NAME_PREFIX = "Bluesky RunEngine" + current_task: typing.Callable[[typing.Optional[asyncio.AbstractEventLoop]], typing.Optional[asyncio.Task]] try: from asyncio import current_task @@ -2137,6 +2141,7 @@ def done_callback(status=None): return ret + @tracer.start_as_current_span(f"{_SPAN_NAME_PREFIX} complete") async def _complete(self, msg): """ Tell a flyer, 'stop collecting, whenever you are ready'. 
@@ -2179,6 +2184,7 @@ def done_callback(status=None): self._status_objs[group].add(ret) return ret + @tracer.start_as_current_span(f"{_SPAN_NAME_PREFIX} collect") async def _collect(self, msg): """ Collect data cached by a flyer and emit documents @@ -2208,6 +2214,7 @@ async def _RE_class(self, msg): """ return type(self) + @tracer.start_as_current_span(f"{_SPAN_NAME_PREFIX} set") async def _set(self, msg): """ Set a device and cache the returned status object. @@ -2221,6 +2228,7 @@ async def _set(self, msg): where arguments are passed through to `obj.set(*args, **kwargs)`. """ + trace.get_current_span().set_attribute("message", str(msg)) obj = check_supports(msg.obj, Movable) kwargs = dict(msg.kwargs) group = kwargs.pop("group", None) @@ -2261,6 +2269,7 @@ async def _trigger(self, msg): return ret + @tracer.start_as_current_span(f"{_SPAN_NAME_PREFIX} wait") async def _wait(self, msg): """Block progress until every object that was triggered or set with the keyword argument `group=` is done. 
@@ -2275,6 +2284,10 @@ async def _wait(self, msg): (group,) = msg.args else: group = msg.kwargs["group"] + if group: + trace.get_current_span().set_attribute("group", group) + else: + trace.get_current_span().set_attribute("no_group_given", "True") futs = list(self._groups.pop(group, [])) if futs: status_objs = self._status_objs.pop(group) From 84725f4cf856aeefa1925f8064ebcb3ff7d020d0 Mon Sep 17 00:00:00 2001 From: David Perl Date: Wed, 17 Apr 2024 14:06:37 +0100 Subject: [PATCH 02/17] (#1682) add a tracing decorator for plans --- src/bluesky/protocols.py | 2 ++ src/bluesky/run_engine.py | 8 +++----- src/bluesky/tracing.py | 25 +++++++++++++++++++++++++ src/bluesky/utils/__init__.py | 3 +++ 4 files changed, 33 insertions(+), 5 deletions(-) create mode 100644 src/bluesky/tracing.py diff --git a/src/bluesky/protocols.py b/src/bluesky/protocols.py index 5d30e38cc0..5b7dda7933 100644 --- a/src/bluesky/protocols.py +++ b/src/bluesky/protocols.py @@ -10,6 +10,7 @@ List, Literal, Optional, + ParamSpec, Protocol, Tuple, Type, @@ -66,6 +67,7 @@ class Reading(ReadingOptional): T = TypeVar("T") +P = ParamSpec("P") SyncOrAsync = Union[T, Awaitable[T]] SyncOrAsyncIterator = Union[Iterator[T], AsyncIterator[T]] diff --git a/src/bluesky/run_engine.py b/src/bluesky/run_engine.py index ffeb065dd5..bc36aa8c2e 100644 --- a/src/bluesky/run_engine.py +++ b/src/bluesky/run_engine.py @@ -34,9 +34,11 @@ Stageable, Status, Stoppable, + T, Triggerable, check_supports, ) +from .tracing import tracer from .utils import ( AsyncInput, CallbackRegistry, @@ -58,7 +60,6 @@ warn_if_msg_args_or_kwargs, ) -tracer = trace.get_tracer(__name__) _SPAN_NAME_PREFIX = "Bluesky RunEngine" current_task: typing.Callable[[typing.Optional[asyncio.AbstractEventLoop]], typing.Optional[asyncio.Task]] @@ -2287,7 +2288,7 @@ async def _wait(self, msg): if group: trace.get_current_span().set_attribute("group", group) else: - trace.get_current_span().set_attribute("no_group_given", "True") + 
trace.get_current_span().set_attribute("no_group_given", True) futs = list(self._groups.pop(group, [])) if futs: status_objs = self._status_objs.pop(group) @@ -2813,9 +2814,6 @@ def in_bluesky_event_loop() -> bool: return loop is _bluesky_event_loop -T = typing.TypeVar("T") - - def call_in_bluesky_event_loop(coro: typing.Awaitable[T], timeout: typing.Optional[float] = None) -> T: if _bluesky_event_loop is None or not _bluesky_event_loop.is_running(): # Quell "coroutine never awaited" warnings diff --git a/src/bluesky/tracing.py b/src/bluesky/tracing.py new file mode 100644 index 0000000000..c39368783e --- /dev/null +++ b/src/bluesky/tracing.py @@ -0,0 +1,25 @@ + +import functools +from typing import Callable, cast + +from opentelemetry.trace import Tracer, get_tracer + +from .protocols import P +from .utils import MsgGenerator + +tracer = get_tracer(__name__) + + +def trace_plan( + tracer: Tracer, span_name: str +) -> Callable[[Callable[P, MsgGenerator]], Callable[P, MsgGenerator]]: + """Wraps a generator function in tracer.start_as_current_span(span_name)""" + def wrap(f: Callable[P, MsgGenerator]) -> Callable[P, MsgGenerator]: + @functools.wraps(f) + def wrap_f(*args: P.args, **kwargs: P.kwargs): + with tracer.start_as_current_span(span_name): + yield from f(*args, **kwargs) + + return cast(Callable[P, MsgGenerator], wrap_f) + + return wrap diff --git a/src/bluesky/utils/__init__.py b/src/bluesky/utils/__init__.py index c3734608d0..a31b59391e 100644 --- a/src/bluesky/utils/__init__.py +++ b/src/bluesky/utils/__init__.py @@ -23,6 +23,7 @@ AsyncIterator, Callable, Dict, + Generator, List, Optional, Tuple, @@ -86,6 +87,8 @@ def __repr__(self): f"args={self.args}, kwargs={self.kwargs}, run={self.run!r})" ) +MsgGenerator = Generator[Msg, Any, None] + class RunEngineControlException(Exception): """Exception for signaling within the RunEngine.""" From d6407ff9bc388a643bf324ba8aa28a79cc80f99a Mon Sep 17 00:00:00 2001 From: David Perl Date: Wed, 17 Apr 2024 15:49:37 +0100 
Subject: [PATCH 03/17] (#1682) add traces on runs --- src/bluesky/run_engine.py | 27 +++++++++++++++++++++++++++ src/bluesky/tracing.py | 6 ++---- src/bluesky/utils/__init__.py | 1 + 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/bluesky/run_engine.py b/src/bluesky/run_engine.py index bc36aa8c2e..45f1554edc 100644 --- a/src/bluesky/run_engine.py +++ b/src/bluesky/run_engine.py @@ -18,6 +18,7 @@ from event_model import DocumentNames from opentelemetry import trace +from opentelemetry.trace import Span from super_state_machine.errors import TransitionError from super_state_machine.extras import PropertyMachine from super_state_machine.machines import StateMachine @@ -431,6 +432,8 @@ def __init__( # When set, RunEngine.__call__ should stop blocking. self._blocking_event = threading.Event() + self._run_tracing_spans: list[Span] = [] + # When cleared, RunEngine._run will pause until set. self._run_permit = None @@ -1319,6 +1322,7 @@ async def _abort_coro(self, reason): self._reason = reason self._exit_status = "abort" + self._destroy_open_run_tracing_spans() was_paused = self._state == "paused" self._state = "aborting" @@ -1422,6 +1426,7 @@ async def _halt_coro(self): if self._state.is_idle: raise TransitionError("RunEngine is already idle.") print("Halting: skipping cleanup and marking exit_status as 'abort'...") + self._destroy_open_run_tracing_spans() self._interrupted = True was_paused = self._state == "paused" self._state = "halting" @@ -1450,6 +1455,12 @@ async def _stop_movable_objects(self, *, success=True): else: self.log.debug("No 'stop' method available on %r", obj) + def _destroy_open_run_tracing_spans(self): + while len(self._run_tracing_spans): + _span = self._run_tracing_spans.pop() + _span.set_attribute("exit_status", "aborted") + _span.end() + async def _run(self): """Pull messages from the plan, process them, send results back. 
@@ -1816,6 +1827,10 @@ async def _open_run(self, msg): where **kwargs are any additional metadata that should go into the RunStart document """ + _span = tracer.start_span(f"{_SPAN_NAME_PREFIX} run") + _span.set_attribute("message", repr(msg)) + self._run_tracing_spans.append(_span) + # TODO extract this from the Msg run_key = msg.run if run_key in self._run_bundlers: @@ -1877,8 +1892,20 @@ async def _close_run(self, msg): ) from ke ret = await current_run.close_run(msg) del self._run_bundlers[run_key] + self._close_run_trace(msg) return ret + def _close_run_trace(self, msg): + exit_status = msg.exit_status if hasattr(msg, "exit_status") else self._exit_status + reason = msg.reason if hasattr(msg, "reason") else self._reason + try: + _span: Span = self._run_tracing_spans.pop() + _span.set_attribute("exit_status", exit_status) + _span.set_attribute("reason", reason) + _span.end() + except IndexError: + logger.warning("No open traces left to close!") + async def _create(self, msg): """Trigger the run engine to start bundling future obj.read() calls for an Event document diff --git a/src/bluesky/tracing.py b/src/bluesky/tracing.py index c39368783e..b1ee337986 100644 --- a/src/bluesky/tracing.py +++ b/src/bluesky/tracing.py @@ -1,4 +1,3 @@ - import functools from typing import Callable, cast @@ -10,10 +9,9 @@ tracer = get_tracer(__name__) -def trace_plan( - tracer: Tracer, span_name: str -) -> Callable[[Callable[P, MsgGenerator]], Callable[P, MsgGenerator]]: +def trace_plan(tracer: Tracer, span_name: str) -> Callable[[Callable[P, MsgGenerator]], Callable[P, MsgGenerator]]: """Wraps a generator function in tracer.start_as_current_span(span_name)""" + def wrap(f: Callable[P, MsgGenerator]) -> Callable[P, MsgGenerator]: @functools.wraps(f) def wrap_f(*args: P.args, **kwargs: P.kwargs): diff --git a/src/bluesky/utils/__init__.py b/src/bluesky/utils/__init__.py index a31b59391e..7c2b41cbb6 100644 --- a/src/bluesky/utils/__init__.py +++ b/src/bluesky/utils/__init__.py @@ 
-87,6 +87,7 @@ def __repr__(self): f"args={self.args}, kwargs={self.kwargs}, run={self.run!r})" ) + MsgGenerator = Generator[Msg, Any, None] From 20038e5c80aa9697b83adf0ecbe33ca8ff180dd4 Mon Sep 17 00:00:00 2001 From: David Perl Date: Thu, 18 Apr 2024 15:40:52 +0100 Subject: [PATCH 04/17] (#1682) import ParamSpec from typing extensions if necessary --- src/bluesky/protocols.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/bluesky/protocols.py b/src/bluesky/protocols.py index 5b7dda7933..bebcd81265 100644 --- a/src/bluesky/protocols.py +++ b/src/bluesky/protocols.py @@ -10,7 +10,6 @@ List, Literal, Optional, - ParamSpec, Protocol, Tuple, Type, @@ -31,6 +30,11 @@ # Squashes warning Dtype = Dtype # type: ignore +try: + from typing import ParamSpec +except ImportError: + from typing_extensions import ParamSpec # type: ignore + # TODO: these are not placed in Events by RE yet class ReadingOptional(TypedDict, total=False): From e0a7d4968080bfff2e7fc63170e940e94c34a890 Mon Sep 17 00:00:00 2001 From: David Perl Date: Fri, 19 Apr 2024 09:49:31 +0100 Subject: [PATCH 05/17] (#1682) add tests --- src/bluesky/tests/test_tracing.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 src/bluesky/tests/test_tracing.py diff --git a/src/bluesky/tests/test_tracing.py b/src/bluesky/tests/test_tracing.py new file mode 100644 index 0000000000..35923ca560 --- /dev/null +++ b/src/bluesky/tests/test_tracing.py @@ -0,0 +1,26 @@ +from typing import Generator + +from opentelemetry.trace import get_current_span + +from bluesky.plan_stubs import sleep +from bluesky.run_engine import RunEngine +from bluesky.tracing import trace_plan, tracer + + +@trace_plan(tracer, "test_plan") +def _test_plan(): + yield from sleep(0) + + +def test_trace_plan_wrapper_gives_back_generator(): + assert isinstance(_test_plan(), Generator) + + +def test_trace_plan_wrapper_opens_span_but_doesnt_do_anything_without_trace_provider(): + @trace_plan(tracer, 
"test_plan_2") + def test_plan_2(): + yield from sleep(0) + assert not get_current_span().is_recording() + + RE = RunEngine() + RE(test_plan_2()) From 40f53cdf14bfa34971e57e9ecf659a19500589c4 Mon Sep 17 00:00:00 2001 From: David Perl Date: Mon, 10 Jun 2024 12:58:36 +0100 Subject: [PATCH 06/17] (#1682) separate trace attributes for msg attributes --- src/bluesky/run_engine.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/bluesky/run_engine.py b/src/bluesky/run_engine.py index 8b4cfbf105..0c57c134a2 100644 --- a/src/bluesky/run_engine.py +++ b/src/bluesky/run_engine.py @@ -1829,7 +1829,8 @@ async def _open_run(self, msg): the RunStart document """ _span = tracer.start_span(f"{_SPAN_NAME_PREFIX} run") - _span.set_attribute("message", repr(msg)) + _set_span_msg_attributes(_span, msg) + self._run_tracing_spans.append(_span) # TODO extract this from the Msg @@ -2187,6 +2188,7 @@ async def _complete(self, msg): where is a hashable identifier. """ + _set_span_msg_attributes(trace.get_current_span(), msg) kwargs = dict(msg.kwargs) group = kwargs.pop("group", None) obj = check_supports(msg.obj, Flyable) @@ -2223,6 +2225,7 @@ async def _collect(self, msg): Msg('collect', flyer_object) Msg('collect', flyer_object, stream=True, return_payload=False, name="a_name") """ + _set_span_msg_attributes(trace.get_current_span(), msg) run_key = msg.run try: current_run = self._run_bundlers[run_key] @@ -2257,7 +2260,7 @@ async def _set(self, msg): where arguments are passed through to `obj.set(*args, **kwargs)`. """ - trace.get_current_span().set_attribute("message", str(msg)) + _set_span_msg_attributes(trace.get_current_span(), msg) obj = check_supports(msg.obj, Movable) kwargs = dict(msg.kwargs) group = kwargs.pop("group", None) @@ -2309,6 +2312,7 @@ async def _wait(self, msg): where ```` is any hashable key. 
""" + _set_span_msg_attributes(trace.get_current_span(), msg) if msg.args: (group,) = msg.args else: @@ -2784,6 +2788,12 @@ def ignore_exceptions(self, val): http://nsls-ii.github.io/bluesky/plans_intro.html#combining-plans """ +def _set_span_msg_attributes(span, msg): + span.set_attribute("msg.command", msg.command) + span.set_attribute("msg.args", msg.args) + span.set_attribute("msg.kwargs", msg.kwargs) + span.set_attribute("msg.obj", msg.obj) + def _default_md_validator(md): if "sample" in md and not (hasattr(md["sample"], "keys") or isinstance(md["sample"], str)): From aa15684006eef8366c3e3a1ad4d67614d9b43ba5 Mon Sep 17 00:00:00 2001 From: David Perl Date: Mon, 17 Jun 2024 11:14:23 +0100 Subject: [PATCH 07/17] (#1682) pin numpy dependency --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a6019f4928..d85e615302 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "historydict", "msgpack", "msgpack-numpy", - "numpy", + "numpy<2.0.0", "opentelemetry-api", "toolz", "tqdm>=4.44", From 1d650bc307a27068d071341e3995e91418a23588 Mon Sep 17 00:00:00 2001 From: David Perl Date: Mon, 17 Jun 2024 11:44:42 +0100 Subject: [PATCH 08/17] (#1682) add importlib_resources for old python --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index d85e615302..448ebe8271 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "cycler", "event-model>=1.19.8", "historydict", + "importlib-resources;python_version<'3.9'", "msgpack", "msgpack-numpy", "numpy<2.0.0", From 12ea0d57f2901f60761bb67c36c1697bac82b589 Mon Sep 17 00:00:00 2001 From: David Perl Date: Mon, 17 Jun 2024 17:18:08 +0100 Subject: [PATCH 09/17] (#1682) add a doc page --- docs/otel-tracing.rst | 8 ++++++++ pyproject.toml | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 docs/otel-tracing.rst diff --git 
a/docs/otel-tracing.rst b/docs/otel-tracing.rst new file mode 100644 index 0000000000..75530d56f3 --- /dev/null +++ b/docs/otel-tracing.rst @@ -0,0 +1,8 @@ +Tracing with Opentelemetry +========================== + +Bluesky is instrumented with [OpenTelemetry](https://opentelemetry.io/) tracing span hooks on runs, and on potentially long-running RunEngine methods such as `wait()` and `collect()`. This can allow you to analyze where your plans are spending a lot of time. + +By itself, this doesn't do anything. To collect tracing output you should configure an exporter in your application or plan, and a collector to send the data to. You can find an example of how to do that [here](https://opentelemetry.io/docs/languages/python/exporters/#usage). + +Tracing messages from the `RunEngine` are named `"Bluesky RunEngine "`, e.g. `"Bluesky RunEngine wait"`. Traces for runs are tagged with the `success`, `exit_status` as attributes, as well as the `reason` if one is available. Traces for methods are tagged with the content of the message (`msg.command`, `msg.args`, `msg.kwargs`, and `msg.obj`). Traces for `wait()` also log the `group` if one was given, or set `no_group_given` true if none was. 
\ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 448ebe8271..5b6c3c60f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -62,7 +62,8 @@ dev = [ "pipdeptree", "pre-commit", "pydata-sphinx-theme>=0.12", - "pyepics", + "pyepics<=3.5.2;python_version<'3.9'", # Currently broken on py3.8 + "pyepics;python_version>='3.9'", "pyqt5", "pytest", "pytest-cov", From c87ed8485bd0d058b9eb3e1cf97560bfb6c1e609 Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 08:28:49 +0100 Subject: [PATCH 10/17] (#1682) add doc page to tree --- docs/otel-tracing.rst | 4 +++- docs/userindex.rst | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/otel-tracing.rst b/docs/otel-tracing.rst index 75530d56f3..7cc714e792 100644 --- a/docs/otel-tracing.rst +++ b/docs/otel-tracing.rst @@ -5,4 +5,6 @@ Bluesky is instrumented with [OpenTelemetry](https://opentelemetry.io/) tracing By itself, this doesn't do anything. To collect tracing output you should configure an exporter in your application or plan, and a collector to send the data to. You can find an example of how to do that [here](https://opentelemetry.io/docs/languages/python/exporters/#usage). -Tracing messages from the `RunEngine` are named `"Bluesky RunEngine "`, e.g. `"Bluesky RunEngine wait"`. Traces for runs are tagged with the `success`, `exit_status` as attributes, as well as the `reason` if one is available. Traces for methods are tagged with the content of the message (`msg.command`, `msg.args`, `msg.kwargs`, and `msg.obj`). Traces for `wait()` also log the `group` if one was given, or set `no_group_given` true if none was. \ No newline at end of file +Tracing messages from the `RunEngine` are named `"Bluesky RunEngine "`, e.g. `"Bluesky RunEngine wait"`. Traces for runs are tagged with the `success`, `exit_status` as attributes, as well as the `reason` if one is available. 
Traces for methods are tagged with the content of the message (`msg.command`, `msg.args`, `msg.kwargs`, and `msg.obj`). Traces for `wait()` also log the `group` if one was given, or set `no_group_given` true if none was. Ophyd also has traces on `Status` objects to easily record how long they live for. + +Examples: \ No newline at end of file diff --git a/docs/userindex.rst b/docs/userindex.rst index 09a6d79193..ff28c8850f 100644 --- a/docs/userindex.rst +++ b/docs/userindex.rst @@ -19,6 +19,7 @@ User Documentation run_engine_api utils magics + otel-tracing from-pyepics-to-bluesky comparison-with-spec hardware-interfaces From 476aec0f30c678dedb233eb3dac1bd8066ba3f0f Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 09:42:54 +0100 Subject: [PATCH 11/17] (#1682) add demo data to doc --- docs/otel-tracing.rst | 141 +++++++++++++++++++++++++++++++++++++- src/bluesky/run_engine.py | 5 +- 2 files changed, 142 insertions(+), 4 deletions(-) diff --git a/docs/otel-tracing.rst b/docs/otel-tracing.rst index 7cc714e792..1519370b62 100644 --- a/docs/otel-tracing.rst +++ b/docs/otel-tracing.rst @@ -3,8 +3,145 @@ Tracing with Opentelemetry Bluesky is instrumented with [OpenTelemetry](https://opentelemetry.io/) tracing span hooks on runs, and on potentially long-running RunEngine methods such as `wait()` and `collect()`. This can allow you to analyze where your plans are spending a lot of time. -By itself, this doesn't do anything. To collect tracing output you should configure an exporter in your application or plan, and a collector to send the data to. You can find an example of how to do that [here](https://opentelemetry.io/docs/languages/python/exporters/#usage). +By itself, this doesn't do anything. To collect tracing output you should configure an exporter in your application or plan, and a collector to send the data to. You can find an example of how to do that [here](https://opentelemetry.io/docs/languages/python/exporters/#usage). 
Since the `start_as_current_span` decorator from the opentelemetry library doesn't work for generators, we also provide a `trace_plan` decorator in `bluesky.tracing` Tracing messages from the `RunEngine` are named `"Bluesky RunEngine "`, e.g. `"Bluesky RunEngine wait"`. Traces for runs are tagged with the `success`, `exit_status` as attributes, as well as the `reason` if one is available. Traces for methods are tagged with the content of the message (`msg.command`, `msg.args`, `msg.kwargs`, and `msg.obj`). Traces for `wait()` also log the `group` if one was given, or set `no_group_given` true if none was. Ophyd also has traces on `Status` objects to easily record how long they live for. -Examples: \ No newline at end of file +Examples: +--------- + +Using the following script, which runs a simple scan and sends traces to the console: + +.. literalinclude:: otel-tracing-demo.py + :language: python + +We obtain the following trace data. Note that the innermost spans are closed first, and therefore printed to the console first. 
+ +```json +{ + "name": "Ophyd Status", + "context": { + "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", + "span_id": "0x0f94466a988f72ea", + "trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": "0x4d6e0e523cfdfec8", + "start_time": "2024-06-18T08:40:56.378644Z", + "end_time": "2024-06-18T08:40:56.378896Z", + "status": { + "status_code": "UNSET" + }, + "attributes": { + "status_type": "MoveStatus", + "settle_time": 0, + "no_timeout_given": true, + "object_repr": "MoveStatus(done=False, pos=motor1, elapsed=0.0, success=False, settle_time=0.0)", + "device_name": "motor1", + "device_type": "SynAxis", + "kwargs": "{}", + "target": 5, + "start_time": 1718700056.37859, + "start_pos ": 0, + "unit": "mm", + "positioner_name": "motor1", + "positioner": "SynAxis(prefix='', name='motor1', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", + "finish_time": 1718700056.3788693, + "finish_pos": 5.0 + }, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } +} +{ + "name": "Bluesky RunEngine set", + "context": { + "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", + "span_id": "0x4d6e0e523cfdfec8", + "trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": "0x6d4272c02f6990f5", + "start_time": "2024-06-18T08:40:56.378468Z", + "end_time": "2024-06-18T08:40:56.378995Z", + "status": { + "status_code": "UNSET" + }, + "attributes": { + "msg.command": "set", + "msg.args": [ + 5 + ], + "msg.kwargs": "{\"group\": null}", + "msg.obj": "SynAxis(prefix='', name='motor1', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])" + }, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } +} +{ + "name": "demo plan", + "context": { + "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", + "span_id": "0x6d4272c02f6990f5", + "trace_state": "[]" 
+ }, + "kind": "SpanKind.INTERNAL", + "parent_id": null, + "start_time": "2024-06-18T08:40:56.378255Z", + "end_time": "2024-06-18T08:40:56.379044Z", + "status": { + "status_code": "UNSET" + }, + "attributes": {}, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } +} +{ + "name": "Bluesky RunEngine run", + "context": { + "trace_id": "0xedc3dae09fb45d4982e0af29f053141c", + "span_id": "0xb54c64dcdebaa81a", + "trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": null, + "start_time": "2024-06-18T08:40:56.377673Z", + "end_time": "2024-06-18T08:40:56.379335Z", + "status": { + "status_code": "UNSET" + }, + "attributes": { + "msg.command": "open_run", + "msg.args": [], + "msg.kwargs": "{}", + "msg.no_obj_given": true, + "exit_status": "success", + "reason": "" + }, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } +} +``` \ No newline at end of file diff --git a/src/bluesky/run_engine.py b/src/bluesky/run_engine.py index e42c5530d9..4d2036ebe9 100644 --- a/src/bluesky/run_engine.py +++ b/src/bluesky/run_engine.py @@ -3,6 +3,7 @@ import copy import functools import inspect +import json import sys import threading import typing @@ -2821,8 +2822,8 @@ def ignore_exceptions(self, val): def _set_span_msg_attributes(span, msg): span.set_attribute("msg.command", msg.command) span.set_attribute("msg.args", msg.args) - span.set_attribute("msg.kwargs", msg.kwargs) - span.set_attribute("msg.obj", msg.obj) + span.set_attribute("msg.kwargs", json.dumps(msg.kwargs)) + span.set_attribute("msg.obj", repr(msg.obj)) if msg.obj else span.set_attribute("msg.no_obj_given", True) def _default_md_validator(md): From 3ca0733517ef1128b580f0c3c14238f43b166343 Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 09:53:23 +0100 Subject: [PATCH 12/17] (#1682) add demo data to doc --- docs/otel-tracing-demo.py 
| 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 docs/otel-tracing-demo.py diff --git a/docs/otel-tracing-demo.py b/docs/otel-tracing-demo.py new file mode 100644 index 0000000000..1d4d9667f6 --- /dev/null +++ b/docs/otel-tracing-demo.py @@ -0,0 +1,33 @@ +from opentelemetry.sdk.resources import SERVICE_NAME, Resource + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter + +from bluesky.tracing import trace_plan, tracer +from bluesky.run_engine import RunEngine +import bluesky.plan_stubs as bps +from bluesky.plans import count +import bluesky.preprocessors as bpp +from ophyd.sim import det1 + +# Service name is required for most backends, +# and although it's not necessary for console export, +# it's good to set service name anyways. +resource = Resource(attributes={ + SERVICE_NAME: "bluesky-docs-example" +}) + +traceProvider = TracerProvider(resource=resource) +processor = BatchSpanProcessor(ConsoleSpanExporter()) +traceProvider.add_span_processor(processor) +trace.set_tracer_provider(traceProvider) + +RE = RunEngine() + +@bpp.run_decorator() +@trace_plan(tracer, "demo plan") +def test_tracing_plan(): + yield from bps.abs_set(det1._motor, 5) + +RE(test_tracing_plan()) From 77f19c2048932cd280447866c6fc441e8794fecf Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 10:57:19 +0100 Subject: [PATCH 13/17] (#1682) ruff format --- docs/otel-tracing-demo.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/otel-tracing-demo.py b/docs/otel-tracing-demo.py index 1d4d9667f6..88951f4696 100644 --- a/docs/otel-tracing-demo.py +++ b/docs/otel-tracing-demo.py @@ -14,9 +14,7 @@ # Service name is required for most backends, # and although it's not necessary for console export, # it's good to set service name anyways. 
-resource = Resource(attributes={ - SERVICE_NAME: "bluesky-docs-example" -}) +resource = Resource(attributes={SERVICE_NAME: "bluesky-docs-example"}) traceProvider = TracerProvider(resource=resource) processor = BatchSpanProcessor(ConsoleSpanExporter()) @@ -25,9 +23,11 @@ RE = RunEngine() + @bpp.run_decorator() @trace_plan(tracer, "demo plan") def test_tracing_plan(): yield from bps.abs_set(det1._motor, 5) + RE(test_tracing_plan()) From a57ae1919bbf1d61b4d561bce9eb55d48f65b8f9 Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 10:58:16 +0100 Subject: [PATCH 14/17] (#1682) ruff format --- docs/otel-tracing-demo.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/docs/otel-tracing-demo.py b/docs/otel-tracing-demo.py index 88951f4696..b2ef7224c4 100644 --- a/docs/otel-tracing-demo.py +++ b/docs/otel-tracing-demo.py @@ -1,15 +1,13 @@ -from opentelemetry.sdk.resources import SERVICE_NAME, Resource - from opentelemetry import trace +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter +from ophyd.sim import det1 -from bluesky.tracing import trace_plan, tracer -from bluesky.run_engine import RunEngine import bluesky.plan_stubs as bps -from bluesky.plans import count import bluesky.preprocessors as bpp -from ophyd.sim import det1 +from bluesky.run_engine import RunEngine +from bluesky.tracing import trace_plan, tracer # Service name is required for most backends, # and although it's not necessary for console export, From 3b1714aa21d068c09a800dfbc05d2f796b070155 Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 11:07:33 +0100 Subject: [PATCH 15/17] (#1682) fix docs formatting --- docs/otel-tracing.rst | 256 +++++++++++++++++++++--------------------- 1 file changed, 128 insertions(+), 128 deletions(-) diff --git a/docs/otel-tracing.rst b/docs/otel-tracing.rst index 
1519370b62..b4ac1ec76e 100644 --- a/docs/otel-tracing.rst +++ b/docs/otel-tracing.rst @@ -17,131 +17,131 @@ Using the following script, which runs a simple scan and sends traces to the con We obtain the following trace data. Note that the innermost spans are closed first, and therefore printed to the console first. -```json -{ - "name": "Ophyd Status", - "context": { - "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", - "span_id": "0x0f94466a988f72ea", - "trace_state": "[]" - }, - "kind": "SpanKind.INTERNAL", - "parent_id": "0x4d6e0e523cfdfec8", - "start_time": "2024-06-18T08:40:56.378644Z", - "end_time": "2024-06-18T08:40:56.378896Z", - "status": { - "status_code": "UNSET" - }, - "attributes": { - "status_type": "MoveStatus", - "settle_time": 0, - "no_timeout_given": true, - "object_repr": "MoveStatus(done=False, pos=motor1, elapsed=0.0, success=False, settle_time=0.0)", - "device_name": "motor1", - "device_type": "SynAxis", - "kwargs": "{}", - "target": 5, - "start_time": 1718700056.37859, - "start_pos ": 0, - "unit": "mm", - "positioner_name": "motor1", - "positioner": "SynAxis(prefix='', name='motor1', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", - "finish_time": 1718700056.3788693, - "finish_pos": 5.0 - }, - "events": [], - "links": [], - "resource": { - "attributes": { - "service.name": "bluesky-docs-example" - }, - "schema_url": "" - } -} -{ - "name": "Bluesky RunEngine set", - "context": { - "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", - "span_id": "0x4d6e0e523cfdfec8", - "trace_state": "[]" - }, - "kind": "SpanKind.INTERNAL", - "parent_id": "0x6d4272c02f6990f5", - "start_time": "2024-06-18T08:40:56.378468Z", - "end_time": "2024-06-18T08:40:56.378995Z", - "status": { - "status_code": "UNSET" - }, - "attributes": { - "msg.command": "set", - "msg.args": [ - 5 - ], - "msg.kwargs": "{\"group\": null}", - "msg.obj": "SynAxis(prefix='', name='motor1', read_attrs=['readback', 'setpoint'], 
configuration_attrs=['velocity', 'acceleration'])" - }, - "events": [], - "links": [], - "resource": { - "attributes": { - "service.name": "bluesky-docs-example" - }, - "schema_url": "" - } -} -{ - "name": "demo plan", - "context": { - "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", - "span_id": "0x6d4272c02f6990f5", - "trace_state": "[]" - }, - "kind": "SpanKind.INTERNAL", - "parent_id": null, - "start_time": "2024-06-18T08:40:56.378255Z", - "end_time": "2024-06-18T08:40:56.379044Z", - "status": { - "status_code": "UNSET" - }, - "attributes": {}, - "events": [], - "links": [], - "resource": { - "attributes": { - "service.name": "bluesky-docs-example" - }, - "schema_url": "" - } -} -{ - "name": "Bluesky RunEngine run", - "context": { - "trace_id": "0xedc3dae09fb45d4982e0af29f053141c", - "span_id": "0xb54c64dcdebaa81a", - "trace_state": "[]" - }, - "kind": "SpanKind.INTERNAL", - "parent_id": null, - "start_time": "2024-06-18T08:40:56.377673Z", - "end_time": "2024-06-18T08:40:56.379335Z", - "status": { - "status_code": "UNSET" - }, - "attributes": { - "msg.command": "open_run", - "msg.args": [], - "msg.kwargs": "{}", - "msg.no_obj_given": true, - "exit_status": "success", - "reason": "" - }, - "events": [], - "links": [], - "resource": { - "attributes": { - "service.name": "bluesky-docs-example" - }, - "schema_url": "" - } -} -``` \ No newline at end of file +.. 
code-block:: JSON + + { + "name": "Ophyd Status", + "context": { + "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", + "span_id": "0x0f94466a988f72ea", + "trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": "0x4d6e0e523cfdfec8", + "start_time": "2024-06-18T08:40:56.378644Z", + "end_time": "2024-06-18T08:40:56.378896Z", + "status": { + "status_code": "UNSET" + }, + "attributes": { + "status_type": "MoveStatus", + "settle_time": 0, + "no_timeout_given": true, + "object_repr": "MoveStatus(done=False, pos=motor1, elapsed=0.0, success=False, settle_time=0.0)", + "device_name": "motor1", + "device_type": "SynAxis", + "kwargs": "{}", + "target": 5, + "start_time": 1718700056.37859, + "start_pos ": 0, + "unit": "mm", + "positioner_name": "motor1", + "positioner": "SynAxis(prefix='', name='motor1', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])", + "finish_time": 1718700056.3788693, + "finish_pos": 5.0 + }, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } + } + { + "name": "Bluesky RunEngine set", + "context": { + "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", + "span_id": "0x4d6e0e523cfdfec8", + "trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": "0x6d4272c02f6990f5", + "start_time": "2024-06-18T08:40:56.378468Z", + "end_time": "2024-06-18T08:40:56.378995Z", + "status": { + "status_code": "UNSET" + }, + "attributes": { + "msg.command": "set", + "msg.args": [ + 5 + ], + "msg.kwargs": "{\"group\": null}", + "msg.obj": "SynAxis(prefix='', name='motor1', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])" + }, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } + } + { + "name": "demo plan", + "context": { + "trace_id": "0x6a5abd70e7f74967975ecf64a863a12d", + "span_id": "0x6d4272c02f6990f5", + 
"trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": null, + "start_time": "2024-06-18T08:40:56.378255Z", + "end_time": "2024-06-18T08:40:56.379044Z", + "status": { + "status_code": "UNSET" + }, + "attributes": {}, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } + } + { + "name": "Bluesky RunEngine run", + "context": { + "trace_id": "0xedc3dae09fb45d4982e0af29f053141c", + "span_id": "0xb54c64dcdebaa81a", + "trace_state": "[]" + }, + "kind": "SpanKind.INTERNAL", + "parent_id": null, + "start_time": "2024-06-18T08:40:56.377673Z", + "end_time": "2024-06-18T08:40:56.379335Z", + "status": { + "status_code": "UNSET" + }, + "attributes": { + "msg.command": "open_run", + "msg.args": [], + "msg.kwargs": "{}", + "msg.no_obj_given": true, + "exit_status": "success", + "reason": "" + }, + "events": [], + "links": [], + "resource": { + "attributes": { + "service.name": "bluesky-docs-example" + }, + "schema_url": "" + } + } From 6fd3fe96759f2bf13912b102d12a4b919373b6ff Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 11:26:53 +0100 Subject: [PATCH 16/17] (#1682) make json more robust --- docs/otel-tracing.rst | 13 ++++++++----- src/bluesky/run_engine.py | 2 +- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/docs/otel-tracing.rst b/docs/otel-tracing.rst index b4ac1ec76e..4ccc7f93ed 100644 --- a/docs/otel-tracing.rst +++ b/docs/otel-tracing.rst @@ -1,11 +1,14 @@ Tracing with Opentelemetry ========================== -Bluesky is instrumented with [OpenTelemetry](https://opentelemetry.io/) tracing span hooks on runs, and on potentially long-running RunEngine methods such as `wait()` and `collect()`. This can allow you to analyze where your plans are spending a lot of time. +.. 
role:: py(code) + :language: python + +Bluesky is instrumented with `OpenTelemetry <https://opentelemetry.io/>`_ tracing span hooks on runs, and on potentially long-running RunEngine methods such as :py:`wait()` and :py:`collect()`. This can allow you to analyze where your plans are spending a lot of time. -By itself, this doesn't do anything. To collect tracing output you should configure an exporter in your application or plan, and a collector to send the data to. You can find an example of how to do that [here](https://opentelemetry.io/docs/languages/python/exporters/#usage). Since the `start_as_current_span` decorator from the opentelemetry library doesn't work for generators, we also provide a `trace_plan` decorator in `bluesky.tracing` +By itself, this doesn't do anything. To collect tracing output you should configure an exporter in your application or plan, and a collector to send the data to. You can find an example of how to do that `here <https://opentelemetry.io/docs/languages/python/exporters/#usage>`_. Since the :py:`@start_as_current_span` decorator from the opentelemetry library doesn't work for generators, we also provide a :py:`@trace_plan` decorator in :py:`bluesky.tracing`. -Tracing messages from the `RunEngine` are named `"Bluesky RunEngine "`, e.g. `"Bluesky RunEngine wait"`. Traces for runs are tagged with the `success`, `exit_status` as attributes, as well as the `reason` if one is available. Traces for methods are tagged with the content of the message (`msg.command`, `msg.args`, `msg.kwargs`, and `msg.obj`). Traces for `wait()` also log the `group` if one was given, or set `no_group_given` true if none was. Ophyd also has traces on `Status` objects to easily record how long they live for. +Tracing messages from the :py:`RunEngine` are named :py:`"Bluesky RunEngine "`, e.g. :py:`"Bluesky RunEngine wait"`. Traces for runs are tagged with the :py:`success` and :py:`exit_status` attributes, as well as the :py:`reason` if one is available.
Traces for methods are tagged with the content of the message (:py:`msg.command`, :py:`msg.args`, :py:`msg.kwargs`, and :py:`msg.obj`). Traces for :py:`wait()` also log the :py:`group` if one was given, or set :py:`no_group_given` true if none was. Ophyd also has traces on :py:`Status` objects to easily record how long they live for. Examples: --------- @@ -15,10 +18,10 @@ Using the following script, which runs a simple scan and sends traces to the con .. literalinclude:: otel-tracing-demo.py :language: python -We obtain the following trace data. Note that the innermost spans are closed first, and therefore printed to the console first. +We obtain this trace data. Note that the innermost spans are closed first, and therefore printed to the console first. You can see that the :code:`"parent_id"` of the :code:`Status` span corresponds to the :code:`"span_id"` of the :code:`set()` span, and so on. .. code-block:: JSON - + { "name": "Ophyd Status", "context": { diff --git a/src/bluesky/run_engine.py b/src/bluesky/run_engine.py index 4d2036ebe9..0f97ed0c16 100644 --- a/src/bluesky/run_engine.py +++ b/src/bluesky/run_engine.py @@ -2822,7 +2822,7 @@ def ignore_exceptions(self, val): def _set_span_msg_attributes(span, msg): span.set_attribute("msg.command", msg.command) span.set_attribute("msg.args", msg.args) - span.set_attribute("msg.kwargs", json.dumps(msg.kwargs)) + span.set_attribute("msg.kwargs", json.dumps(msg.kwargs, default=repr)) span.set_attribute("msg.obj", repr(msg.obj)) if msg.obj else span.set_attribute("msg.no_obj_given", True) From d91912dc3a16a0567329ec2f86cac07f82d30ca7 Mon Sep 17 00:00:00 2001 From: David Perl Date: Tue, 18 Jun 2024 11:36:16 +0100 Subject: [PATCH 17/17] (#1682) use RE fixture in test --- src/bluesky/tests/test_tracing.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/bluesky/tests/test_tracing.py b/src/bluesky/tests/test_tracing.py index 35923ca560..835a269908 100644 --- a/src/bluesky/tests/test_tracing.py +++ 
b/src/bluesky/tests/test_tracing.py @@ -3,7 +3,6 @@ from opentelemetry.trace import get_current_span from bluesky.plan_stubs import sleep -from bluesky.run_engine import RunEngine from bluesky.tracing import trace_plan, tracer @@ -16,11 +15,10 @@ def test_trace_plan_wrapper_gives_back_generator(): assert isinstance(_test_plan(), Generator) -def test_trace_plan_wrapper_opens_span_but_doesnt_do_anything_without_trace_provider(): +def test_trace_plan_wrapper_opens_span_but_doesnt_do_anything_without_trace_provider(RE): @trace_plan(tracer, "test_plan_2") def test_plan_2(): yield from sleep(0) assert not get_current_span().is_recording() - RE = RunEngine() RE(test_plan_2())