fix: use modernized and standardized OpenTelemetry when tracing #1172

Open
wants to merge 16 commits into
base: main
10 changes: 9 additions & 1 deletion .github/workflows/integration-tests-against-emulator.yaml
@@ -24,9 +24,17 @@ jobs:
python-version: 3.8
- name: Install nox
run: python -m pip install nox
- name: Run system tests
- name: Run system tests (without extended tracing)
run: nox -s system
env:
SPANNER_EMULATOR_HOST: localhost:9010
GOOGLE_CLOUD_PROJECT: emulator-test-project
GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true
EXTENDED_TRACING_ENABLED: false
- name: Run system tests (with extended tracing)
Contributor

Why do we need a separate job for extended tracing? What are we validating with this job?

Contributor Author

It's to ensure that extended tracing works as expected, because there is no other mechanism to test it.

Contributor

EXTENDED_TRACING_ENABLED will add SQL statements to span attributes, right?
I don't think we need to run a job explicitly for that unless the system tests include tests that specifically validate this behavior.

run: nox -s system
env:
SPANNER_EMULATOR_HOST: localhost:9010
GOOGLE_CLOUD_PROJECT: emulator-test-project
GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE: true
EXTENDED_TRACING_ENABLED: true
63 changes: 55 additions & 8 deletions docs/opentelemetry-tracing.rst
@@ -9,32 +9,79 @@ To take advantage of these traces, we first need to install OpenTelemetry:
.. code-block:: sh

pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation

# [Optional] Installs the Cloud Trace exporter; you can use any exporter of your choice
pip install opentelemetry-exporter-google-cloud
pip install opentelemetry-exporter-gcp-trace

We also need to tell OpenTelemetry which exporter to use. To export Spanner traces to `Cloud Trace <https://cloud.google.com/trace>`_, add the following lines to your application:

.. code:: python

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.sampling import ProbabilitySampler
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
# BatchExportSpanProcessor exports spans to Cloud Trace
# BatchSpanProcessor exports spans to Cloud Trace
# in a separate thread to not block on the main thread
from opentelemetry.sdk.trace.export import BatchExportSpanProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Create and export one trace every 1000 requests
sampler = ProbabilitySampler(1/1000)
sampler = TraceIdRatioBased(1/1000)
# Use the default tracer provider
trace.set_tracer_provider(TracerProvider(sampler=sampler))
trace.get_tracer_provider().add_span_processor(
# Initialize the cloud tracing exporter
BatchExportSpanProcessor(CloudTraceSpanExporter())
BatchSpanProcessor(CloudTraceSpanExporter())
)


Alternatively, you can pass a tracer provider to the Cloud Spanner client at
initialization; otherwise the global tracer provider is used:

.. code:: python

from google.cloud import spanner
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter

# Create and export one trace every 1000 requests
sampler = TraceIdRatioBased(1/1000)
tracerProvider = TracerProvider(sampler=sampler)
tracerProvider.add_span_processor(
# Initialize the cloud tracing exporter
BatchSpanProcessor(CloudTraceSpanExporter())
)

options = dict(tracer_provider=tracerProvider)
# Pass the tracer provider while creating the Spanner client.
spanner_client = spanner.Client(observability_options=options)
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)


To get more fine-grained traces from gRPC, enable the gRPC instrumentation by installing the following package:

.. code-block:: sh

pip install opentelemetry-instrumentation-grpc

and then in your Python code, please add the following lines:

.. code:: python

from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient
grpc_client_instrumentor = GrpcInstrumentorClient()
grpc_client_instrumentor.instrument()


Generated Spanner traces should now be available in `Cloud Trace <https://console.cloud.google.com/traces>`_.

Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_.

To annotate your spans with SQL statements, set the environment variable
`SPANNER_ENABLE_EXTENDED_TRACING=true` or set the `enable_extended_tracing` field to `True` in the observability options passed to the Cloud Spanner client, like this:

.. code:: python

tracerProvider = TracerProvider(sampler=sampler)
opts = dict(tracer_provider=tracerProvider, enable_extended_tracing=True)
spanner_client = spanner.Client(project_id, observability_options=opts)
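
The precedence between the environment variable and the client option can be sketched as follows. This is a simplified, self-contained model rather than the library's code: the helper name `extended_tracing_enabled` and its `environ` parameter are hypothetical, and it assumes, as in this change, that only the exact string `true` enables the environment variable.

```python
import os


def extended_tracing_enabled(observability_options=None, environ=None):
    """Hypothetical helper: return True if SQL statements should be annotated."""
    environ = os.environ if environ is None else environ
    # The environment variable turns the feature on for every client.
    if environ.get("SPANNER_ENABLE_EXTENDED_TRACING", "") == "true":
        return True
    # Otherwise fall back to the per-client option, which defaults to off.
    options = observability_options or {}
    return bool(options.get("enable_extended_tracing", False))


if __name__ == "__main__":
    print(extended_tracing_enabled(environ={}))
    print(extended_tracing_enabled({"enable_extended_tracing": True}, environ={}))
    print(extended_tracing_enabled(environ={"SPANNER_ENABLE_EXTENDED_TRACING": "true"}))
```

Under this model the environment variable can only turn the feature on; a client-level `False` does not override it.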
71 changes: 71 additions & 0 deletions examples/grpc_instrumentation_enabled.py
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License

import os

import google.cloud.spanner as spanner
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry import trace

# Enable the gRPC instrumentation if you'd like more introspection.
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient

grpc_client_instrumentor = GrpcInstrumentorClient()
grpc_client_instrumentor.instrument()


def main():
    # Set up common variables shared by Spanner and tracing.
    project_id = os.environ.get('SPANNER_PROJECT_ID')

    # Set up OpenTelemetry tracing and the Cloud Trace exporter.
    sampler = ALWAYS_ON
    tracerProvider = TracerProvider(sampler=sampler)
    traceExporter = CloudTraceSpanExporter(project_id)
    tracerProvider.add_span_processor(
        BatchSpanProcessor(traceExporter))
    trace.set_tracer_provider(tracerProvider)
    # Retrieve a tracer from the provider configured above.
    tracer = tracerProvider.get_tracer('cloud.google.com/python/spanner', spanner.__version__)
Contributor

I don't think this is needed. In this case the QueryInformationSchema span will be shown as generated by the Python Spanner client, but it is actually generated by the customer application.

Can you update the module name and version to the application's name and version and see how the output changes?
Can you share the results of both, with and without the change?


    # Set up the Cloud Spanner client.
    # Here we pass the tracerProvider directly into the Spanner client.
    opts = dict(tracer_provider=tracerProvider)
    spanner_client = spanner.Client(project_id, observability_options=opts)

    instance = spanner_client.instance('test-instance')
    database = instance.database('test-db')

    # Now run our queries.
    with tracer.start_as_current_span('QueryInformationSchema'):
        with database.snapshot() as snapshot:
            with tracer.start_as_current_span('InformationSchema'):
                info_schema = snapshot.execute_sql(
                    'SELECT * FROM INFORMATION_SCHEMA.TABLES')
                for row in info_schema:
                    print(row)

    with tracer.start_as_current_span('ServerTimeQuery'):
        with database.snapshot() as snapshot:
            # Query the server's current timestamp.
            data = snapshot.execute_sql('SELECT CURRENT_TIMESTAMP()')
            for row in data:
                print(row)


if __name__ == '__main__':
    main()
70 changes: 70 additions & 0 deletions examples/trace.py
@@ -0,0 +1,70 @@
# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License

import os

import google.cloud.spanner as spanner
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_ON
from opentelemetry import trace


def main():
    # Set up common variables shared by Spanner and tracing.
    project_id = os.environ.get('SPANNER_PROJECT_ID')

    # Set up OpenTelemetry tracing and the Cloud Trace exporter.
    sampler = ALWAYS_ON
    tracerProvider = TracerProvider(sampler=sampler)
    traceExporter = CloudTraceSpanExporter(project_id)
    tracerProvider.add_span_processor(
        BatchSpanProcessor(traceExporter))
    trace.set_tracer_provider(tracerProvider)
    # Retrieve a tracer from the provider configured above.
    tracer = tracerProvider.get_tracer('MyPackage')

    # Set up the Cloud Spanner client.
    spanner_client = spanner.Client(project_id)
    # Alternatively, you can pass the tracerProvider directly into
    # the Spanner client; otherwise the global tracer provider is used.
    if False:
        opts = dict(tracer_provider=tracerProvider)
        spanner_client = spanner.Client(project_id, observability_options=opts)

    instance = spanner_client.instance('test-instance')
    database = instance.database('test-db')

    # Now run our queries.
    with tracer.start_as_current_span('QueryInformationSchema'):
        with database.snapshot() as snapshot:
            with tracer.start_as_current_span('InformationSchema'):
                info_schema = snapshot.execute_sql(
                    'SELECT * FROM INFORMATION_SCHEMA.TABLES')
                for row in info_schema:
                    print(row)

    with tracer.start_as_current_span('ServerTimeQuery'):
        with database.snapshot() as snapshot:
            # Query the server's current timestamp.
            data = snapshot.execute_sql('SELECT CURRENT_TIMESTAMP()')
            for row in data:
                print(row)


if __name__ == '__main__':
    main()
3 changes: 2 additions & 1 deletion google/cloud/spanner_v1/_helpers.py
@@ -358,8 +358,9 @@ class _SessionWrapper(object):
:param session: the session used to perform the commit
"""

def __init__(self, session):
def __init__(self, session, observability_options=None):
self._session = session
self._observability_options = observability_options


def _metadata_with_prefix(prefix, **kw):
70 changes: 63 additions & 7 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
@@ -15,39 +15,95 @@
"""Manages OpenTelemetry trace creation and handling"""

from contextlib import contextmanager
import os

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.spanner_v1 import SpannerClient
from google.cloud.spanner_v1 import gapic_version as LIB_VERSION

try:
from opentelemetry import trace
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv.attributes import (
OTEL_SCOPE_NAME,
OTEL_SCOPE_VERSION,
)

HAS_OPENTELEMETRY_INSTALLED = True
except ImportError:
DB_SYSTEM = SpanAttributes.DB_SYSTEM
Contributor

  1. We are changing attribute conventions here. As discussed earlier, do we need to make this change under a separate option to avoid breaking changes?
  2. I also see that we might need to change span names to be consistent with this document, which is a breaking change anyway.

So should we make these changes directly, or protect them behind an option?
My opinion: since we are refactoring the observability portion, let's make all the changes directly and warn customers in the release notes. A separate option would make the code messier.
@surbhigarg92 Can you give your thoughts on this?

Contributor

@odeke-em

  1. For attribute conventions, let's enable them under a separate flag to avoid breaking changes.
  2. Let's not change any existing span names, as that could be a breaking change. Instead, can you add additional spans that could help with debugging?
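
One way to keep the existing attribute names as the default while offering the newer semantic-convention names is a single opt-in flag. A minimal sketch, assuming a hypothetical `use_semconv_attributes` flag and a hypothetical `base_attributes` helper (neither exists in this PR); the legacy and new key names are the ones that appear in this diff:

```python
# Legacy attribute keys and the semantic-convention keys that replace them.
LEGACY_TO_SEMCONV = {
    "db.type": "db.system",
    "db.url": "db.connection_string",
    "db.instance": "db.name",
    "net.host.name": "net.host.name",  # same key under both schemes
}


def base_attributes(endpoint, database_name, use_semconv_attributes=False):
    """Hypothetical helper: build base span attributes under either naming scheme."""
    legacy = {
        "db.type": "spanner",
        "db.url": endpoint,
        "db.instance": database_name,
        "net.host.name": endpoint,
    }
    if not use_semconv_attributes:
        return legacy
    # Rename each key; the values stay the same.
    return {LEGACY_TO_SEMCONV[key]: value for key, value in legacy.items()}
```

With the flag off, existing dashboards and queries keyed on `db.type` or `db.instance` would keep working; turning it on swaps only the key names.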

DB_NAME = SpanAttributes.DB_NAME
DB_CONNECTION_STRING = SpanAttributes.DB_CONNECTION_STRING
NET_HOST_NAME = SpanAttributes.NET_HOST_NAME
DB_STATEMENT = SpanAttributes.DB_STATEMENT
except ImportError as e:
HAS_OPENTELEMETRY_INSTALLED = False
DB_STATEMENT = 'db.statement'


EXTENDED_TRACING_ENABLED = os.environ.get('SPANNER_ENABLE_EXTENDED_TRACING', '') == 'true'
LIB_FQNAME = 'cloud.google.com/python/spanner'
TRACER_NAME = LIB_FQNAME
TRACER_VERSION = LIB_VERSION

def get_tracer(tracer_provider=None):
"""
get_tracer is a utility to unify and simplify retrieval of the tracer without
leaking implementation details, given that retrieving a tracer requires providing
the fully qualified library name and version.
When tracer_provider is set, the tracer is retrieved from it; otherwise this
falls back to the global tracer provider, using this library's name and version.
"""
if not tracer_provider:
# Acquire the global tracer provider.
tracer_provider = trace.get_tracer_provider()

return tracer_provider.get_tracer(TRACER_NAME, TRACER_VERSION)


@contextmanager
def trace_call(name, session, extra_attributes=None):
def trace_call(name, session, extra_attributes=None, observability_options=None):
if not HAS_OPENTELEMETRY_INSTALLED or not session:
# Empty context manager. Users will have to check if the generated value is None or a span
yield None
return

tracer = trace.get_tracer(__name__)
tracer = None
if observability_options:
tracerProvider = observability_options.get('tracer_provider', None)
Contributor

This variable name should be lowercase.

if tracerProvider:
tracer = get_tracer(tracerProvider)

if tracer is None: # Acquire the global tracer if none was provided.
tracer = get_tracer()

# It is imperative that we properly record the Cloud Spanner
# endpoint, which tracks whether we are connecting to the emulator
# or to production.
spanner_endpoint = os.getenv("SPANNER_EMULATOR_HOST")
if not spanner_endpoint:
spanner_endpoint = SpannerClient.DEFAULT_ENDPOINT

# Set base attributes that we know for every trace created
attributes = {
"db.type": "spanner",
"db.url": SpannerClient.DEFAULT_ENDPOINT,
"db.instance": session._database.name,
"net.host.name": SpannerClient.DEFAULT_ENDPOINT,
DB_SYSTEM: "spanner",
DB_CONNECTION_STRING: spanner_endpoint,
DB_NAME: session._database.name,
NET_HOST_NAME: spanner_endpoint,
OTEL_SCOPE_NAME: LIB_FQNAME,
OTEL_SCOPE_VERSION: TRACER_VERSION,
}

if extra_attributes:
attributes.update(extra_attributes)

extended_tracing = EXTENDED_TRACING_ENABLED or (
observability_options and
observability_options.get('enable_extended_tracing', False))

if not extended_tracing:
attributes.pop(DB_STATEMENT, None)

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
) as span:
Expand Down
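
The attribute handling in `trace_call` can be modeled without OpenTelemetry installed. The sketch below is a simplified stand-in, not the library's implementation: `build_span_attributes` and its parameters are hypothetical, the scope attributes are omitted, and the default endpoint value is an assumption standing in for `SpannerClient.DEFAULT_ENDPOINT`.

```python
# Assumed stand-in for SpannerClient.DEFAULT_ENDPOINT.
DEFAULT_ENDPOINT = "spanner.googleapis.com"
DB_STATEMENT = "db.statement"


def build_span_attributes(database_name, environ, extra_attributes=None,
                          extended_tracing=False):
    """Hypothetical model of how trace_call assembles span attributes."""
    # Prefer the emulator host when set, so spans show which backend was used.
    endpoint = environ.get("SPANNER_EMULATOR_HOST") or DEFAULT_ENDPOINT
    attributes = {
        "db.system": "spanner",
        "db.connection_string": endpoint,
        "db.name": database_name,
        "net.host.name": endpoint,
    }
    if extra_attributes:
        attributes.update(extra_attributes)
    # SQL text is dropped unless extended tracing was explicitly enabled.
    if not extended_tracing:
        attributes.pop(DB_STATEMENT, None)
    return attributes
```

Passing `environ` explicitly, instead of reading `os.environ` inside the function, is a choice made for this sketch so the emulator and production paths can both be exercised in a unit test.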
6 changes: 4 additions & 2 deletions google/cloud/spanner_v1/batch.py
@@ -205,7 +205,8 @@ def commit(
max_commit_delay=max_commit_delay,
request_options=request_options,
)
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
with trace_call("CloudSpanner.Commit", self._session, trace_attributes,
observability_options=self._observability_options):
method = functools.partial(
api.commit,
request=request,
@@ -318,7 +319,8 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
request_options=request_options,
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
)
with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes):
with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes,
observability_options=self._observability_options):
method = functools.partial(
api.batch_write,
request=request,