diff --git a/.github/workflows/integration-tests-against-emulator.yaml b/.github/workflows/integration-tests-against-emulator.yaml index 3a4390219d..88571c5e70 100644 --- a/.github/workflows/integration-tests-against-emulator.yaml +++ b/.github/workflows/integration-tests-against-emulator.yaml @@ -24,9 +24,17 @@ jobs: python-version: 3.8 - name: Install nox run: python -m pip install nox - - name: Run system tests + - name: Run system tests (without extended tracing) run: nox -s system env: SPANNER_EMULATOR_HOST: localhost:9010 GOOGLE_CLOUD_PROJECT: emulator-test-project GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true + EXTENDED_TRACING_ENABLED: false + - name: Run system tests (with extended tracing) + run: nox -s system + env: + SPANNER_EMULATOR_HOST: localhost:9010 + GOOGLE_CLOUD_PROJECT: emulator-test-project + GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true + EXTENDED_TRACING_ENABLED: true diff --git a/docs/opentelemetry-tracing.rst b/docs/opentelemetry-tracing.rst index 9b3dea276f..93c59c7a72 100644 --- a/docs/opentelemetry-tracing.rst +++ b/docs/opentelemetry-tracing.rst @@ -9,9 +9,7 @@ To take advantage of these traces, we first need to install OpenTelemetry: .. code-block:: sh pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation - - # [Optional] Installs the cloud monitoring exporter, however you can use any exporter of your choice - pip install opentelemetry-exporter-google-cloud + pip install opentelemetry-exporter-gcp-trace We also need to tell OpenTelemetry which exporter to use. To export Spanner traces to `Cloud Tracing `_, add the following lines to your application: @@ -19,22 +17,71 @@ We also need to tell OpenTelemetry which exporter to use. To export Spanner trac from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.trace.sampling import ProbabilitySampler + from opentelemetry.sdk.trace.sampling import TraceIdRatioBased from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter - # BatchExportSpanProcessor exports spans to Cloud Trace + # BatchSpanProcessor exports spans to Cloud Trace # in a seperate thread to not block on the main thread - from opentelemetry.sdk.trace.export import BatchExportSpanProcessor + from opentelemetry.sdk.trace.export import BatchSpanProcessor # Create and export one trace every 1000 requests - sampler = ProbabilitySampler(1/1000) + sampler = TraceIdRatioBased(1/1000) # Use the default tracer provider trace.set_tracer_provider(TracerProvider(sampler=sampler)) trace.get_tracer_provider().add_span_processor( # Initialize the cloud tracing exporter - BatchExportSpanProcessor(CloudTraceSpanExporter()) + BatchSpanProcessor(CloudTraceSpanExporter()) + ) + + +Alternatively you can pass in a tracer provider into the Cloud Spanner +initialization, otherwise the global tracer will be used: + +.. code:: python + + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.sampling import TraceIdRatioBased + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + + # Create and export one trace every 1000 requests + sampler = TraceIdRatioBased(1/1000) + tracerProvider = TracerProvider(sampler=sampler) + tracerProvider.add_span_processor( + # Initialize the cloud tracing exporter + BatchSpanProcessor(CloudTraceSpanExporter()) ) + options = dict(tracer_provider=tracerProvider) + # Pass the tracer provider while creating the Spanner client. + spanner_client = spanner.Client(observability_options=options) + instance = spanner_client.instance(instance_id) + database = instance.database(database_id) + + +To get more fine-grained traces from gRPC, you can enable the gRPC instrumentation by the following + +.. code-block:: sh + + pip install opentelemetry-instrumentation-grpc + +and then in your Python code, please add the following lines: + +.. code:: python + + from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient + grpc_client_instrumentor = GrpcInstrumentorClient() + grpc_client_instrumentor.instrument() + + Generated spanner traces should now be available on `Cloud Trace `_. Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request. For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs `_ + +To allow for SQL statements to be annotated in your spans, please set +the environment variable `SPANNER_ENABLE_EXTENDED_TRACING=true` or please set the configuration field `enable_extended_tracing` to `True` when configuring the Cloud Spanner client, like this: + +.. code:: python + + tracerProvider = TracerProvider(sampler=sampler) + opts = dict(tracer_provider=tracerProvider, enable_extended_tracing=true) + spanner_client = spanner.Client(project_id, observability_options=opts) diff --git a/examples/grpc_instrumentation_enabled.py b/examples/grpc_instrumentation_enabled.py new file mode 100644 index 0000000000..98a16cee69 --- /dev/null +++ b/examples/grpc_instrumentation_enabled.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +import os +import time + +import google.cloud.spanner as spanner +from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.sampling import ALWAYS_ON +from opentelemetry import trace + +# Enable the gRPC instrumentation if you'd like more introspection. +from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient +grpc_client_instrumentor = GrpcInstrumentorClient() +grpc_client_instrumentor.instrument() + +def main(): + # Setup common variables that'll be used between Spanner and traces. + project_id = os.environ.get('SPANNER_PROJECT_ID') + + # Setup OpenTelemetry, trace and Cloud Trace exporter. + sampler = ALWAYS_ON + tracerProvider = TracerProvider(sampler=sampler) + traceExporter = CloudTraceSpanExporter(project_id) + tracerProvider.add_span_processor( + BatchSpanProcessor(traceExporter)) + trace.set_tracer_provider(tracerProvider) + # Retrieve the set shared tracer. + tracer = tracerProvider.get_tracer('cloud.google.com/python/spanner', spanner.__version__) + + # Setup the Cloud Spanner Client. + # Here we directly pass in the tracerProvider into the spanner client. + opts = dict(tracer_provider=tracerProvider) + spanner_client = spanner.Client(project_id, observability_options=opts) + + instance = spanner_client.instance('test-instance') + database = instance.database('test-db') + + # Now run our queries + with tracer.start_as_current_span('QueryInformationSchema'): + with database.snapshot() as snapshot: + with tracer.start_as_current_span('InformationSchema'): + info_schema = snapshot.execute_sql( + 'SELECT * FROM INFORMATION_SCHEMA.TABLES') + for row in info_schema: + print(row) + + with tracer.start_as_current_span('ServerTimeQuery'): + with database.snapshot() as snapshot: + # Purposefully issue a bad SQL statement to examine exceptions + # that get recorded and a ERROR span status. + data = snapshot.execute_sql('SELECT CURRENT_TIMESTAMP()') + for row in data: + print(row) + +if __name__ == '__main__': + main() diff --git a/examples/trace.py b/examples/trace.py new file mode 100644 index 0000000000..fd61f3aa04 --- /dev/null +++ b/examples/trace.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License + +import os +import time + +import google.cloud.spanner as spanner +from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.sampling import ALWAYS_ON +from opentelemetry import trace + +def main(): + # Setup common variables that'll be used between Spanner and traces. + project_id = os.environ.get('SPANNER_PROJECT_ID') + + # Setup OpenTelemetry, trace and Cloud Trace exporter. + sampler = ALWAYS_ON + tracerProvider = TracerProvider(sampler=sampler) + traceExporter = CloudTraceSpanExporter(project_id) + tracerProvider.add_span_processor( + BatchSpanProcessor(traceExporter)) + trace.set_tracer_provider(tracerProvider) + # Retrieve the set shared tracer. + tracer = tracerProvider.get_tracer('MyPackage') + + # Setup the Cloud Spanner Client. + spanner_client = spanner.Client(project_id) + # Alternatively you can directly pass in the tracerProvider into + # the spanner client, otherwise the global tracer shall be used. + if False: + opts = dict(tracer_provider=tracerProvider) + spanner_client = spanner.Client(project_id, observability_options=opts) + + instance = spanner_client.instance('test-instance') + database = instance.database('test-db') + + # Now run our queries + with tracer.start_as_current_span('QueryInformationSchema'): + with database.snapshot() as snapshot: + with tracer.start_as_current_span('InformationSchema'): + info_schema = snapshot.execute_sql( + 'SELECT * FROM INFORMATION_SCHEMA.TABLES') + for row in info_schema: + print(row) + + with tracer.start_as_current_span('ServerTimeQuery'): + with database.snapshot() as snapshot: + # Purposefully issue a bad SQL statement to examine exceptions + # that get recorded and a ERROR span status. + data = snapshot.execute_sql('SELECT CURRENT_TIMESTAMP()') + for row in data: + print(row) + + +if __name__ == '__main__': + main() diff --git a/google/cloud/spanner_v1/_helpers.py b/google/cloud/spanner_v1/_helpers.py index a1d6a60cb0..21a21b5992 100644 --- a/google/cloud/spanner_v1/_helpers.py +++ b/google/cloud/spanner_v1/_helpers.py @@ -358,8 +358,9 @@ class _SessionWrapper(object): :param session: the session used to perform the commit """ - def __init__(self, session): + def __init__(self, session, observability_options=None): self._session = session + self._observability_options = observability_options def _metadata_with_prefix(prefix, **kw): diff --git a/google/cloud/spanner_v1/_opentelemetry_tracing.py b/google/cloud/spanner_v1/_opentelemetry_tracing.py index 8f9f8559ef..161c2666c0 100644 --- a/google/cloud/spanner_v1/_opentelemetry_tracing.py +++ b/google/cloud/spanner_v1/_opentelemetry_tracing.py @@ -15,39 +15,95 @@ """Manages OpenTelemetry trace creation and handling""" from contextlib import contextmanager +import os from google.api_core.exceptions import GoogleAPICallError from google.cloud.spanner_v1 import SpannerClient +from google.cloud.spanner_v1 import gapic_version as LIB_VERSION try: from opentelemetry import trace from opentelemetry.trace.status import Status, StatusCode + from opentelemetry.semconv.trace import SpanAttributes + from opentelemetry.semconv.attributes import ( + OTEL_SCOPE_NAME, + OTEL_SCOPE_VERSION, + ) HAS_OPENTELEMETRY_INSTALLED = True -except ImportError: + DB_SYSTEM = SpanAttributes.DB_SYSTEM + DB_NAME = SpanAttributes.DB_NAME + DB_CONNECTION_STRING = SpanAttributes.DB_CONNECTION_STRING + NET_HOST_NAME = SpanAttributes.NET_HOST_NAME + DB_STATEMENT = SpanAttributes.DB_STATEMENT +except ImportError as e: HAS_OPENTELEMETRY_INSTALLED = False + DB_STATEMENT = 'db.statement' + + +EXTENDED_TRACING_ENABLED = os.environ.get('SPANNER_ENABLE_EXTENDED_TRACING', '') == 'true' +LIB_FQNAME = 'cloud.google.com/python/spanner' +TRACER_NAME = LIB_FQNAME +TRACER_VERSION = LIB_VERSION + +def get_tracer(tracer_provider=None): + """ + get_tracer is a utility to unify and simplify retrieval of the tracer, without + leaking implementation details given that retrieving a tracer requires providing + the full qualified library name and version. + When the tracer_provider is set, it'll retrieve the tracer from it, otherwise + it'll fall back to the global tracer provider and use this library's specific semantics. + """ + if not tracer_provider: + # Acquire the global tracer provider. + tracer_provider = trace.get_tracer_provider() + + return tracer_provider.get_tracer(TRACER_NAME, TRACER_VERSION) @contextmanager -def trace_call(name, session, extra_attributes=None): +def trace_call(name, session, extra_attributes=None, observability_options=None): if not HAS_OPENTELEMETRY_INSTALLED or not session: # Empty context manager. Users will have to check if the generated value is None or a span yield None return - tracer = trace.get_tracer(__name__) + tracer = None + if observability_options: + tracerProvider = observability_options.get('tracer_provider', None) + if tracerProvider: + tracer = get_tracer(tracerProvider) + + if tracer is None: # Acquire the global tracer if none was provided. + tracer = get_tracer() + + # It is imperative that we properly record the Cloud Spanner + # endpoint tracks whether we are connecting to the emulator + # or to production. + spanner_endpoint = os.getenv("SPANNER_EMULATOR_HOST") + if not spanner_endpoint: + spanner_endpoint = SpannerClient.DEFAULT_ENDPOINT # Set base attributes that we know for every trace created attributes = { - "db.type": "spanner", - "db.url": SpannerClient.DEFAULT_ENDPOINT, - "db.instance": session._database.name, - "net.host.name": SpannerClient.DEFAULT_ENDPOINT, + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: spanner_endpoint, + DB_NAME: session._database.name, + NET_HOST_NAME: spanner_endpoint, + OTEL_SCOPE_NAME: LIB_FQNAME, + OTEL_SCOPE_VERSION: TRACER_VERSION, } if extra_attributes: attributes.update(extra_attributes) + extended_tracing = EXTENDED_TRACING_ENABLED or ( + observability_options and + observability_options.get('enable_extended_tracing', False)) + + if not extended_tracing: + attributes.pop(DB_STATEMENT, None) + with tracer.start_as_current_span( name, kind=trace.SpanKind.CLIENT, attributes=attributes ) as span: diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index e3d681189c..07cb3df5d5 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -205,7 +205,8 @@ def commit( max_commit_delay=max_commit_delay, request_options=request_options, ) - with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + with trace_call("CloudSpanner.Commit", self._session, trace_attributes, + observability_options=self._observability_options): method = functools.partial( api.commit, request=request, @@ -318,7 +319,8 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals request_options=request_options, exclude_txn_from_change_streams=exclude_txn_from_change_streams, ) - with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes): + with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes, + observability_options=self._observability_options): method = functools.partial( api.batch_write, request=request, diff --git a/google/cloud/spanner_v1/client.py b/google/cloud/spanner_v1/client.py index f8f3fdb72c..871b6a2369 100644 --- a/google/cloud/spanner_v1/client.py +++ b/google/cloud/spanner_v1/client.py @@ -146,6 +146,7 @@ def __init__( query_options=None, route_to_leader_enabled=True, directed_read_options=None, + observability_options=None, ): self._emulator_host = _get_spanner_emulator_host() @@ -187,6 +188,7 @@ def __init__( self._route_to_leader_enabled = route_to_leader_enabled self._directed_read_options = directed_read_options + self._observability_options = observability_options @property def credentials(self): @@ -371,6 +373,7 @@ def instance( self._emulator_host, labels, processing_units, + self._observability_options, ) def list_instances(self, filter_="", page_size=None): diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 6bd4f3703e..c991930d2f 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -156,6 +156,7 @@ def __init__( database_role=None, enable_drop_protection=False, proto_descriptors=None, + observability_options=None, ): self.database_id = database_id self._instance = instance @@ -178,6 +179,7 @@ def __init__( self._reconciling = False self._directed_read_options = self._instance._client.directed_read_options self._proto_descriptors = proto_descriptors + self._observability_options = observability_options if pool is None: pool = BurstyPool(database_role=database_role) diff --git a/google/cloud/spanner_v1/instance.py b/google/cloud/spanner_v1/instance.py index a67e0e630b..cbb8cd35a3 100644 --- a/google/cloud/spanner_v1/instance.py +++ b/google/cloud/spanner_v1/instance.py @@ -122,6 +122,7 @@ def __init__( emulator_host=None, labels=None, processing_units=None, + observability_options=None, ): self.instance_id = instance_id self._client = client @@ -145,6 +146,7 @@ def __init__( if labels is None: labels = {} self.labels = labels + self._observability_options = observability_options def _update_from_pb(self, instance_pb): """Refresh self from the server-provided protobuf. @@ -436,6 +438,7 @@ def database( # should be only set for tests if tests want to use interceptors enable_interceptors_in_tests=False, proto_descriptors=None, + observability_options=None, ): """Factory to create a database within this instance. @@ -499,6 +502,7 @@ def database( database_role=database_role, enable_drop_protection=enable_drop_protection, proto_descriptors=proto_descriptors, + observability_options=observability_options or self._observability_options, ) else: return TestDatabase( @@ -511,6 +515,7 @@ def database( database_dialect=database_dialect, database_role=database_role, enable_drop_protection=enable_drop_protection, + observability_options=observability_options or self._observability_options, ) def list_databases(self, page_size=None): diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index 28280282f4..8645eb4e35 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -63,12 +63,13 @@ class Session(object): _session_id = None _transaction = None - def __init__(self, database, labels=None, database_role=None): + def __init__(self, database, labels=None, database_role=None, observability_options=None): self._database = database if labels is None: labels = {} self._labels = labels self._database_role = database_role + self._observability_options = observability_options def __lt__(self, other): return self._session_id < other._session_id @@ -142,7 +143,8 @@ def create(self): if self._labels: request.session.labels = self._labels - with trace_call("CloudSpanner.CreateSession", self, self._labels): + with trace_call("CloudSpanner.CreateSession", self, self._labels, + observability_options=self._observability_options): session_pb = api.create_session( request=request, metadata=metadata, @@ -169,7 +171,8 @@ def exists(self): ) ) - with trace_call("CloudSpanner.GetSession", self) as span: + opts = self._observability_options + with trace_call("CloudSpanner.GetSession", self, observability_options=opts) as span: try: api.get_session(name=self.name, metadata=metadata) if span: @@ -194,7 +197,8 @@ def delete(self): raise ValueError("Session ID not set by back-end") api = self._database.spanner_api metadata = _metadata_with_prefix(self._database.name) - with trace_call("CloudSpanner.DeleteSession", self): + opts = self._observability_options + with trace_call("CloudSpanner.DeleteSession", self, observability_options=opts): api.delete_session(name=self.name, metadata=metadata) def ping(self): @@ -226,6 +230,9 @@ def snapshot(self, **kw): if self._session_id is None: raise ValueError("Session has not been created.") + # Pass the observability options in to create the snapshot. + kw['observability_options'] = self._observability_options + return Snapshot(self, **kw) def read(self, table, columns, keyset, index="", limit=0, column_info=None): diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index 3bc1a746bd..329e265513 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -38,7 +38,10 @@ _check_rst_stream_error, _SessionWrapper, ) -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import ( + trace_call, + DB_STATEMENT, +) from google.cloud.spanner_v1.streamed import StreamedResultSet from google.cloud.spanner_v1 import RequestOptions @@ -56,6 +59,7 @@ def _restart_on_unavailable( attributes=None, transaction=None, transaction_selector=None, + observability_options=None, ): """Restart iteration after :exc:`.ServiceUnavailable`. @@ -84,7 +88,7 @@ def _restart_on_unavailable( ) request.transaction = transaction_selector - with trace_call(trace_name, session, attributes): + with trace_call(trace_name, session, attributes, observability_options=observability_options): iterator = method(request=request) while True: try: @@ -104,7 +108,7 @@ def _restart_on_unavailable( break except ServiceUnavailable: del item_buffer[:] - with trace_call(trace_name, session, attributes): + with trace_call(trace_name, session, attributes, observability_options=observability_options): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -119,7 +123,7 @@ def _restart_on_unavailable( if not resumable_error: raise del item_buffer[:] - with trace_call(trace_name, session, attributes): + with trace_call(trace_name, session, attributes, observability_options=observability_options): request.resume_token = resume_token if transaction is not None: transaction_selector = transaction._make_txn_selector() @@ -306,7 +310,7 @@ def read( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadOnlyTransaction", + "Snapshot.read", self._session, trace_attributes, transaction=self, @@ -322,7 +326,7 @@ def read( iterator = _restart_on_unavailable( restart, request, - "CloudSpanner.ReadOnlyTransaction", + "Snapshot.read", self._session, trace_attributes, transaction=self, @@ -488,7 +492,7 @@ def execute_sql( timeout=timeout, ) - trace_attributes = {"db.statement": sql} + trace_attributes = {DB_STATEMENT: sql} if self._transaction_id is None: # lock is added to handle the inline begin for first rpc @@ -598,7 +602,8 @@ def partition_read( trace_attributes = {"table_id": table, "columns": columns} with trace_call( - "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes + "CloudSpanner.PartitionReadOnlyTransaction", self._session, trace_attributes, + observability_options=self._observability_options, ): method = functools.partial( api.partition_read, @@ -696,11 +701,12 @@ def partition_query( partition_options=partition_options, ) - trace_attributes = {"db.statement": sql} + trace_attributes = {DB_STATEMENT: sql} with trace_call( "CloudSpanner.PartitionReadWriteTransaction", self._session, trace_attributes, + observability_options=self._observability_options, ): method = functools.partial( api.partition_query, @@ -761,6 +767,7 @@ def __init__( exact_staleness=None, multi_use=False, transaction_id=None, + observability_options=None, ): super(Snapshot, self).__init__(session) opts = [read_timestamp, min_read_timestamp, max_staleness, exact_staleness] @@ -784,6 +791,7 @@ def __init__( self._exact_staleness = exact_staleness self._multi_use = multi_use self._transaction_id = transaction_id + self._observability_options = observability_options def _make_txn_selector(self): """Helper for :meth:`read`.""" @@ -843,7 +851,8 @@ def begin(self): (_metadata_with_leader_aware_routing(database._route_to_leader_enabled)) ) txn_selector = self._make_txn_selector() - with trace_call("CloudSpanner.BeginTransaction", self._session): + opts = self._observability_options + with trace_call("CloudSpanner.BeginTransaction", self._session, observability_options=opts): method = functools.partial( api.begin_transaction, session=self._session.name, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index c872cc380d..a9321f30dd 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -32,7 +32,10 @@ from google.cloud.spanner_v1 import TransactionOptions from google.cloud.spanner_v1.snapshot import _SnapshotBase from google.cloud.spanner_v1.batch import _BatchBase -from google.cloud.spanner_v1._opentelemetry_tracing import trace_call +from google.cloud.spanner_v1._opentelemetry_tracing import ( + DB_STATEMENT, + trace_call, +) from google.cloud.spanner_v1 import RequestOptions from google.api_core import gapic_v1 from google.api_core.exceptions import InternalServerError @@ -110,7 +113,8 @@ def _execute_request( """ transaction = self._make_txn_selector() request.transaction = transaction - with trace_call(trace_name, session, attributes): + with trace_call(trace_name, session, attributes, + observability_options=self._observability_options): method = functools.partial(method, request=request) response = _retry( method, @@ -147,7 +151,8 @@ def begin(self): read_write=TransactionOptions.ReadWrite(), exclude_txn_from_change_streams=self.exclude_txn_from_change_streams, ) - with trace_call("CloudSpanner.BeginTransaction", self._session): + with trace_call("CloudSpanner.BeginTransaction", self._session, + observability_options=self._observability_options): method = functools.partial( api.begin_transaction, session=self._session.name, @@ -175,7 +180,8 @@ def rollback(self): database._route_to_leader_enabled ) ) - with trace_call("CloudSpanner.Rollback", self._session): + with trace_call("CloudSpanner.Rollback", self._session, + observability_options=self._observability_options): method = functools.partial( api.rollback, session=self._session.name, @@ -248,7 +254,8 @@ def commit( max_commit_delay=max_commit_delay, request_options=request_options, ) - with trace_call("CloudSpanner.Commit", self._session, trace_attributes): + with trace_call("CloudSpanner.Commit", self._session, trace_attributes, + observability_options=self._observability_options): method = functools.partial( api.commit, request=request, @@ -369,7 +376,7 @@ def execute_update( request_options = RequestOptions(request_options) request_options.transaction_tag = self.transaction_tag - trace_attributes = {"db.statement": dml} + trace_attributes = {DB_STATEMENT: dml} request = ExecuteSqlRequest( session=self._session.name, @@ -495,7 +502,7 @@ def batch_update( trace_attributes = { # Get just the queries from the DML statement batch - "db.statement": ";".join([statement.sql for statement in parsed]) + DB_STATEMENT: ";".join([statement.sql for statement in parsed]) } request = ExecuteBatchDmlRequest( session=self._session.name, diff --git a/setup.py b/setup.py index 98b1a61748..9efd0f3626 100644 --- a/setup.py +++ b/setup.py @@ -47,9 +47,10 @@ ] extras = { "tracing": [ - "opentelemetry-api >= 1.1.0", - "opentelemetry-sdk >= 1.1.0", - "opentelemetry-instrumentation >= 0.20b0, < 0.23dev", + "opentelemetry-api >= 1.24.0", + "opentelemetry-sdk >= 1.24.0", + "opentelemetry-instrumentation >= 0.46b0", + "opentelemetry-semantic-conventions >= 0.46b0", ], "libcst": "libcst >= 0.2.5", } diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 20170203f5..ec9cd244b5 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -10,9 +10,10 @@ grpc-google-iam-v1==0.12.4 libcst==0.2.5 proto-plus==1.22.0 sqlparse==0.4.4 -opentelemetry-api==1.1.0 -opentelemetry-sdk==1.1.0 -opentelemetry-instrumentation==0.20b0 +opentelemetry-api>=1.24.0 +opentelemetry-sdk>=1.24.0 +opentelemetry-instrumentation==0.46b0 +opentelemetry-semantic-conventions==0.46b0 protobuf==3.20.2 deprecated==1.2.14 grpc-interceptor==0.15.4 diff --git a/tests/_helpers.py b/tests/_helpers.py index 42178fd439..c9ab05d865 100644 --- a/tests/_helpers.py +++ b/tests/_helpers.py @@ -1,3 +1,4 @@ +import os import unittest import mock @@ -10,16 +11,37 @@ ) from opentelemetry.trace.status import StatusCode + from opentelemetry.semconv.trace import SpanAttributes + from opentelemetry.semconv.attributes import ( + OTEL_SCOPE_NAME, + OTEL_SCOPE_VERSION, + ) + trace.set_tracer_provider(TracerProvider()) HAS_OPENTELEMETRY_INSTALLED = True + + DB_SYSTEM = SpanAttributes.DB_SYSTEM + DB_NAME = SpanAttributes.DB_NAME + DB_CONNECTION_STRING = SpanAttributes.DB_CONNECTION_STRING + NET_HOST_NAME = SpanAttributes.NET_HOST_NAME + DB_STATEMENT = SpanAttributes.DB_STATEMENT + except ImportError: HAS_OPENTELEMETRY_INSTALLED = False StatusCode = mock.Mock() + DB_SYSTEM = "db.system" + DB_NAME = "db.name" + DB_CONNECTION_STRING = "db.connection_string" + NET_HOST_NAME = "net.host.name" + DB_STATEMENT = "db.statement" + OTEL_SCOPE_NAME = "otel.scope.name" + OTEL_SCOPE_VERSION = "otel.scope.version" _TEST_OT_EXPORTER = None _TEST_OT_PROVIDER_INITIALIZED = False +EXTENDED_TRACING_ENABLED = os.environ.get('SPANNER_ENABLE_EXTENDED_TRACING', '') == 'true' def get_test_ot_exporter(): diff --git a/tests/system/test_session_api.py b/tests/system/test_session_api.py index 31e38f967a..0731b2c1f1 100644 --- a/tests/system/test_session_api.py +++ b/tests/system/test_session_api.py @@ -341,10 +341,10 @@ def assert_span_attributes( def _make_attributes(db_instance, **kwargs): attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "net.host.name": "spanner.googleapis.com", - "db.instance": db_instance, + ot_helpers.DB_SYSTEM: "spanner", + ot_helpers.DB_CONNECTION_STRING: "spanner.googleapis.com", + ot_helpers.DB_NAME: db_instance, + ot_helpers.NET_HOST_NAME: "spanner.googleapis.com", } attributes.update(kwargs) diff --git a/tests/unit/test__opentelemetry_tracing.py b/tests/unit/test__opentelemetry_tracing.py index 25870227bf..8165758eb7 100644 --- a/tests/unit/test__opentelemetry_tracing.py +++ b/tests/unit/test__opentelemetry_tracing.py @@ -12,8 +12,16 @@ from google.api_core.exceptions import GoogleAPICallError from google.cloud.spanner_v1 import _opentelemetry_tracing -from tests._helpers import OpenTelemetryBase, HAS_OPENTELEMETRY_INSTALLED - +from tests._helpers import ( + OpenTelemetryBase, + StatusCode, + DB_SYSTEM, + DB_NAME, + DB_CONNECTION_STRING, + HAS_OPENTELEMETRY_INSTALLED, + NET_HOST_NAME, + OTEL_SCOPE_NAME, +) def _make_rpc_error(error_cls, trailing_metadata=None): import grpc @@ -51,14 +59,15 @@ class TestTracing(OpenTelemetryBase): def test_trace_call(self): extra_attributes = { "attribute1": "value1", - # Since our database is mocked, we have to override the db.instance parameter so it is a string - "db.instance": "database_name", + # Since our database is mocked, we have to override the DB_NAME parameter so it is a string + DB_NAME: "database_name", } expected_attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + NET_HOST_NAME: "spanner.googleapis.com", + OTEL_SCOPE_NAME: "cloud.google.com/python/spanner", } expected_attributes.update(extra_attributes) @@ -78,13 +87,15 @@ def test_trace_call(self): self.assertEqual(span.status.status_code, StatusCode.OK) def test_trace_error(self): - extra_attributes = {"db.instance": "database_name"} + extra_attributes = {DB_NAME: "database_name"} expected_attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + NET_HOST_NAME: "spanner.googleapis.com", + OTEL_SCOPE_NAME: "cloud.google.com/python/spanner", } + expected_attributes.update(extra_attributes) with self.assertRaises(GoogleAPICallError): @@ -104,13 +115,15 @@ def test_trace_error(self): self.assertEqual(span.status.status_code, StatusCode.ERROR) def test_trace_grpc_error(self): - extra_attributes = {"db.instance": "database_name"} + extra_attributes = {DB_NAME: "database_name"} expected_attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com:443", - "net.host.name": "spanner.googleapis.com:443", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + NET_HOST_NAME: "spanner.googleapis.com", + OTEL_SCOPE_NAME: "cloud.google.com/python/spanner", } + expected_attributes.update(extra_attributes) with self.assertRaises(GoogleAPICallError): @@ -127,13 +140,15 @@ def test_trace_grpc_error(self): self.assertEqual(span.status.status_code, StatusCode.ERROR) def test_trace_codeless_error(self): - extra_attributes = {"db.instance": "database_name"} + extra_attributes = {DB_NAME: "database_name"} expected_attributes = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com:443", - "net.host.name": "spanner.googleapis.com:443", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + NET_HOST_NAME: "spanner.googleapis.com", + OTEL_SCOPE_NAME: "cloud.google.com/python/spanner", } + expected_attributes.update(extra_attributes) with self.assertRaises(GoogleAPICallError): diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index ee96decf5e..f675094f57 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -14,7 +14,14 @@ import unittest -from tests._helpers import OpenTelemetryBase, StatusCode +from tests._helpers import ( + OpenTelemetryBase, + StatusCode, + DB_SYSTEM, + DB_NAME, + DB_CONNECTION_STRING, + NET_HOST_NAME, +) from google.cloud.spanner_v1 import RequestOptions TABLE_NAME = "citizens" @@ -24,10 +31,10 @@ ["bharney@example.com", "Bharney", "Rhubble", 31], ] BASE_ATTRIBUTES = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "db.instance": "testing", - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + DB_NAME: "testing", + NET_HOST_NAME: "spanner.googleapis.com", } diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index d4052f0ae3..d47669ac2e 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -20,6 +20,10 @@ OpenTelemetryBase, StatusCode, HAS_OPENTELEMETRY_INSTALLED, + DB_SYSTEM, + DB_NAME, + DB_CONNECTION_STRING, + NET_HOST_NAME, ) @@ -46,10 +50,10 @@ class TestSession(OpenTelemetryBase): SESSION_NAME = DATABASE_NAME + "/sessions/" + SESSION_ID DATABASE_ROLE = "dummy-role" BASE_ATTRIBUTES = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "db.instance": DATABASE_NAME, - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + DB_NAME: DATABASE_NAME, + NET_HOST_NAME: "spanner.googleapis.com", } def _getTargetClass(self): diff --git a/tests/unit/test_snapshot.py b/tests/unit/test_snapshot.py index bf5563dcfd..d810bc9fae 100644 --- a/tests/unit/test_snapshot.py +++ b/tests/unit/test_snapshot.py @@ -20,7 +20,13 @@ from tests._helpers import ( OpenTelemetryBase, StatusCode, + EXTENDED_TRACING_ENABLED, HAS_OPENTELEMETRY_INSTALLED, + DB_STATEMENT, + DB_SYSTEM, + DB_NAME, + DB_CONNECTION_STRING, + NET_HOST_NAME, ) from google.cloud.spanner_v1.param_types import INT64 from google.api_core.retry import Retry @@ -41,10 +47,10 @@ SECONDS = 3 MICROS = 123456 BASE_ATTRIBUTES = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "db.instance": "testing", - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + DB_NAME: "testing", + NET_HOST_NAME: "spanner.googleapis.com", } DIRECTED_READ_OPTIONS = { "include_replicas": { @@ -531,10 +537,10 @@ def test_iteration_w_multiple_span_creation(self): self.assertEqual( dict(span.attributes), { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "db.instance": "testing", - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + DB_NAME: "testing", + NET_HOST_NAME: "spanner.googleapis.com", }, ) @@ -862,11 +868,18 @@ def test_execute_sql_other_error(self): self.assertEqual(derived._execute_sql_count, 1) - self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", - status=StatusCode.ERROR, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), - ) + if EXTENDED_TRACING_ENABLED: + self.assertSpanAttributes( + "CloudSpanner.ReadWriteTransaction", + status=StatusCode.ERROR, + attributes=dict(BASE_ATTRIBUTES, **{DB_STATEMENT: SQL_QUERY}), + ) + else: + self.assertSpanAttributes( + "CloudSpanner.ReadWriteTransaction", + status=StatusCode.ERROR, + attributes=BASE_ATTRIBUTES, + ) def _execute_sql_helper( self, @@ -1018,11 +1031,18 @@ def _execute_sql_helper( self.assertEqual(derived._execute_sql_count, sql_count + 1) - self.assertSpanAttributes( - "CloudSpanner.ReadWriteTransaction", - status=StatusCode.OK, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}), - ) + if EXTENDED_TRACING_ENABLED: + self.assertSpanAttributes( + "CloudSpanner.ReadWriteTransaction", + status=StatusCode.OK, + attributes=dict(BASE_ATTRIBUTES, **{DB_STATEMENT: SQL_QUERY_WITH_PARAM}), + ) + else: + self.assertSpanAttributes( + "CloudSpanner.ReadWriteTransaction", + status=StatusCode.OK, + attributes=BASE_ATTRIBUTES, + ) def test_execute_sql_wo_multi_use(self): self._execute_sql_helper(multi_use=False) @@ -1363,11 +1383,18 @@ def _partition_query_helper( timeout=timeout, ) - self.assertSpanAttributes( - "CloudSpanner.PartitionReadWriteTransaction", - status=StatusCode.OK, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY_WITH_PARAM}), - ) + if EXTENDED_TRACING_ENABLED: + self.assertSpanAttributes( + "CloudSpanner.PartitionReadWriteTransaction", + status=StatusCode.OK, + attributes=dict(BASE_ATTRIBUTES, **{DB_STATEMENT: SQL_QUERY_WITH_PARAM}), + ) + else: + self.assertSpanAttributes( + "CloudSpanner.PartitionReadWriteTransaction", + status=StatusCode.OK, + attributes=BASE_ATTRIBUTES, + ) def test_partition_query_other_error(self): database = _Database() @@ -1381,11 +1408,18 @@ def test_partition_query_other_error(self): with self.assertRaises(RuntimeError): list(derived.partition_query(SQL_QUERY)) - self.assertSpanAttributes( - "CloudSpanner.PartitionReadWriteTransaction", - status=StatusCode.ERROR, - attributes=dict(BASE_ATTRIBUTES, **{"db.statement": SQL_QUERY}), - ) + if EXTENDED_TRACING_ENABLED: + self.assertSpanAttributes( + "CloudSpanner.PartitionReadWriteTransaction", + status=StatusCode.ERROR, + attributes=dict(BASE_ATTRIBUTES, **{DB_STATEMENT: SQL_QUERY}), + ) + else: + self.assertSpanAttributes( + "CloudSpanner.PartitionReadWriteTransaction", + status=StatusCode.ERROR, + attributes=BASE_ATTRIBUTES, + ) def test_partition_query_single_use_raises(self): with self.assertRaises(ValueError): diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index ab5479eb3c..4625d53b08 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -45,7 +45,13 @@ from google.api_core import gapic_v1 -from tests._helpers import OpenTelemetryBase +from tests._helpers import ( + OpenTelemetryBase, + DB_SYSTEM, + DB_NAME, + DB_CONNECTION_STRING, + NET_HOST_NAME, +) TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -110,10 +116,10 @@ class TestTransaction(OpenTelemetryBase): TRANSACTION_TAG = "transaction-tag" BASE_ATTRIBUTES = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "db.instance": "testing", - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + DB_NAME: "testing", + NET_HOST_NAME: "spanner.googleapis.com", } def _getTargetClass(self): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index b40ae8843f..a7867f083e 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -21,7 +21,14 @@ from google.api_core.retry import Retry from google.api_core import gapic_v1 -from tests._helpers import OpenTelemetryBase, StatusCode +from tests._helpers import ( + OpenTelemetryBase, + StatusCode, + DB_SYSTEM, + DB_NAME, + DB_CONNECTION_STRING, + NET_HOST_NAME, +) TABLE_NAME = "citizens" COLUMNS = ["email", "first_name", "last_name", "age"] @@ -53,10 +60,10 @@ class TestTransaction(OpenTelemetryBase): TRANSACTION_TAG = "transaction-tag" BASE_ATTRIBUTES = { - "db.type": "spanner", - "db.url": "spanner.googleapis.com", - "db.instance": "testing", - "net.host.name": "spanner.googleapis.com", + DB_SYSTEM: "spanner", + DB_CONNECTION_STRING: "spanner.googleapis.com", + DB_NAME: "testing", + NET_HOST_NAME: "spanner.googleapis.com", } def _getTargetClass(self):