Skip to content

Commit

Permalink
botocore: add basic tracing for bedrock ConverseStream (#3204)
Browse files Browse the repository at this point in the history
* Add tracing for ConverseStream

* Add converse stream example
  • Loading branch information
xrmx authored Jan 27, 2025
1 parent 2756c1e commit 0bb1c42
Show file tree
Hide file tree
Showing 11 changed files with 395 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3161](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3161))
- `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock InvokeModel API
([#3200](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3200))
- `opentelemetry-opentelemetry-botocore` Add basic support for GenAI attributes for AWS Bedrock ConverseStream API
([#3204](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3204))

### Fixed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ Available examples
------------------

- `converse.py` uses `bedrock-runtime` `Converse API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html>_`.
- `converse_stream.py` uses `bedrock-runtime` `ConverseStream API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html>_`.
- `invoke_model.py` uses `bedrock-runtime` `InvokeModel API <https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_InvokeModel.html>_`.

Setup
-----
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os

import boto3


def main():
client = boto3.client("bedrock-runtime")
stream = client.converse_stream(
modelId=os.getenv("CHAT_MODEL", "amazon.titan-text-lite-v1"),
messages=[
{
"role": "user",
"content": [{"text": "Write a short poem on OpenTelemetry."}],
},
],
)

response = ""
for event in stream["stream"]:
if "contentBlockDelta" in event:
response += event["contentBlockDelta"]["delta"]["text"]
print(response)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,15 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
}

_safe_invoke(extension.extract_attributes, attributes)
end_span_on_exit = extension.should_end_span_on_exit()

with self._tracer.start_as_current_span(
call_context.span_name,
kind=call_context.span_kind,
attributes=attributes,
# tracing streaming services require to close the span manually
# at a later time after the stream has been consumed
end_on_exit=end_span_on_exit,
) as span:
_safe_invoke(extension.before_service_call, span)
self._call_request_hook(span, call_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import logging
from typing import Any

from botocore.eventstream import EventStream
from botocore.response import StreamingBody

from opentelemetry.instrumentation.botocore.extensions.bedrock_utils import (
ConverseStreamWrapper,
)
from opentelemetry.instrumentation.botocore.extensions.types import (
_AttributeMapT,
_AwsSdkExtension,
Expand Down Expand Up @@ -62,7 +66,14 @@ class _BedrockRuntimeExtension(_AwsSdkExtension):
Amazon Bedrock Runtime</a>.
"""

_HANDLED_OPERATIONS = {"Converse", "InvokeModel"}
_HANDLED_OPERATIONS = {"Converse", "ConverseStream", "InvokeModel"}
_DONT_CLOSE_SPAN_ON_END_OPERATIONS = {"ConverseStream"}

def should_end_span_on_exit(self):
return (
self._call_context.operation
not in self._DONT_CLOSE_SPAN_ON_END_OPERATIONS
)

def extract_attributes(self, attributes: _AttributeMapT):
if self._call_context.operation not in self._HANDLED_OPERATIONS:
Expand All @@ -77,7 +88,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
GenAiOperationNameValues.CHAT.value
)

# Converse
# Converse / ConverseStream
if inference_config := self._call_context.params.get(
"inferenceConfig"
):
Expand Down Expand Up @@ -251,6 +262,20 @@ def on_success(self, span: Span, result: dict[str, Any]):
return

if not span.is_recording():
if not self.should_end_span_on_exit():
span.end()
return

# ConverseStream
if "stream" in result and isinstance(result["stream"], EventStream):

def stream_done_callback(response):
self._converse_on_success(span, response)
span.end()

result["stream"] = ConverseStreamWrapper(
result["stream"], stream_done_callback
)
return

# Converse
Expand Down Expand Up @@ -328,3 +353,6 @@ def on_error(self, span: Span, exception: _BotoClientErrorT):
span.set_status(Status(StatusCode.ERROR, str(exception)))
if span.is_recording():
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)

if not self.should_end_span_on_exit():
span.end()
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Includes work from:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

from botocore.eventstream import EventStream
from wrapt import ObjectProxy


# pylint: disable=abstract-method
class ConverseStreamWrapper(ObjectProxy):
"""Wrapper for botocore.eventstream.EventStream"""

def __init__(
self,
stream: EventStream,
stream_done_callback,
):
super().__init__(stream)

self._stream_done_callback = stream_done_callback
# accumulating things in the same shape of non-streaming version
# {"usage": {"inputTokens": 0, "outputTokens": 0}, "stopReason": "finish"}
self._response = {}

def __iter__(self):
for event in self.__wrapped__:
self._process_event(event)
yield event

def _process_event(self, event):
if "messageStart" in event:
# {'messageStart': {'role': 'assistant'}}
pass

if "contentBlockDelta" in event:
# {'contentBlockDelta': {'delta': {'text': "Hello"}, 'contentBlockIndex': 0}}
pass

if "contentBlockStop" in event:
# {'contentBlockStop': {'contentBlockIndex': 0}}
pass

if "messageStop" in event:
# {'messageStop': {'stopReason': 'end_turn'}}
if stop_reason := event["messageStop"].get("stopReason"):
self._response["stopReason"] = stop_reason

if "metadata" in event:
# {'metadata': {'usage': {'inputTokens': 12, 'outputTokens': 15, 'totalTokens': 27}, 'metrics': {'latencyMs': 2980}}}
if usage := event["metadata"].get("usage"):
self._response["usage"] = {}
if input_tokens := usage.get("inputTokens"):
self._response["usage"]["inputTokens"] = input_tokens

if output_tokens := usage.get("outputTokens"):
self._response["usage"]["outputTokens"] = output_tokens

self._stream_done_callback(self._response)
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ def should_trace_service_call(self) -> bool: # pylint:disable=no-self-use
"""
return True

def should_end_span_on_exit(self) -> bool: # pylint:disable=no-self-use
"""Returns if the span should be closed automatically on exit
Extensions might override this function to disable automatic closing
of the span if they need to close it at a later time themselves.
"""
return True

def extract_attributes(self, attributes: _AttributeMapT):
"""Callback which gets invoked before the span is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def assert_completion_attributes_from_streaming_body(
)


def assert_completion_attributes(
def assert_converse_completion_attributes(
span: ReadableSpan,
request_model: str,
response: dict[str, Any] | None,
Expand Down Expand Up @@ -128,6 +128,34 @@ def assert_completion_attributes(
)


def assert_converse_stream_completion_attributes(
span: ReadableSpan,
request_model: str,
input_tokens: int | None = None,
output_tokens: int | None = None,
finish_reason: tuple[str] | None = None,
operation_name: str = "chat",
request_top_p: int | None = None,
request_temperature: int | None = None,
request_max_tokens: int | None = None,
request_stop_sequences: list[str] | None = None,
):
return assert_all_attributes(
span,
request_model,
input_tokens,
output_tokens,
finish_reason,
operation_name,
request_top_p,
request_temperature,
request_max_tokens,
tuple(request_stop_sequences)
if request_stop_sequences is not None
else request_stop_sequences,
)


def assert_equal_or_not_present(value, attribute_name, span):
if value is not None:
assert value == span.attributes[attribute_name]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}],
"inferenceConfig": {"maxTokens": 10, "temperature": 0.8, "topP": 1, "stopSequences":
["|"]}}'
headers:
Content-Length:
- '170'
Content-Type:
- !!binary |
YXBwbGljYXRpb24vanNvbg==
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjNUMDk1MTU2Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTA0YmY4MjVjLTAxMTY5NjdhYWM1NmIxM2RlMDI1N2QwMjtQYXJlbnQ9MDdkM2U3N2Rl
OGFjMzJhNDtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
ZGQ1MTZiNTEtOGU1Yi00NGYyLTk5MzMtZjAwYzBiOGFkYWYw
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/amazon.titan-text-lite-v1/converse-stream
response:
body:
string: !!binary |
AAAAlAAAAFLEwW5hCzpldmVudC10eXBlBwAMbWVzc2FnZVN0YXJ0DTpjb250ZW50LXR5cGUHABBh
cHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsicCI6ImFiY2RlZmdoaWprbG1u
b3BxcnN0dXZ3Iiwicm9sZSI6ImFzc2lzdGFudCJ9P+wfRAAAAMQAAABXjLhVJQs6ZXZlbnQtdHlw
ZQcAEWNvbnRlbnRCbG9ja0RlbHRhDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTpt
ZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2NrSW5kZXgiOjAsImRlbHRhIjp7InRleHQi
OiJIaSEgSG93IGNhbiBJIGhlbHAgeW91In0sInAiOiJhYmNkZWZnaGlqa2xtbm9wcXJzdHUifeBJ
9mIAAACJAAAAVlvc+UsLOmV2ZW50LXR5cGUHABBjb250ZW50QmxvY2tTdG9wDTpjb250ZW50LXR5
cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVldmVudHsiY29udGVudEJsb2Nr
SW5kZXgiOjAsInAiOiJhYmNkZSJ95xzwrwAAAKcAAABRu0n9jQs6ZXZlbnQtdHlwZQcAC21lc3Nh
Z2VTdG9wDTpjb250ZW50LXR5cGUHABBhcHBsaWNhdGlvbi9qc29uDTptZXNzYWdlLXR5cGUHAAVl
dmVudHsicCI6ImFiY2RlZmdoaWprbG1ub3BxcnN0dXZ3eHl6QUJDREVGR0hJSiIsInN0b3BSZWFz
b24iOiJtYXhfdG9rZW5zIn1LR3pNAAAAygAAAE5X40OECzpldmVudC10eXBlBwAIbWV0YWRhdGEN
OmNvbnRlbnQtdHlwZQcAEGFwcGxpY2F0aW9uL2pzb24NOm1lc3NhZ2UtdHlwZQcABWV2ZW50eyJt
ZXRyaWNzIjp7ImxhdGVuY3lNcyI6NjA4fSwicCI6ImFiY2RlZmdoaWprIiwidXNhZ2UiOnsiaW5w
dXRUb2tlbnMiOjgsIm91dHB1dFRva2VucyI6MTAsInRvdGFsVG9rZW5zIjoxOH19iiQr+w==
headers:
Connection:
- keep-alive
Content-Type:
- application/vnd.amazon.eventstream
Date:
- Thu, 23 Jan 2025 09:51:56 GMT
Set-Cookie: test_set_cookie
Transfer-Encoding:
- chunked
x-amzn-RequestId:
- 2b74a5d3-615a-4f81-b00f-f0b10a618e23
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": [{"text": "Say this is a test"}]}]}'
headers:
Content-Length:
- '77'
Content-Type:
- !!binary |
YXBwbGljYXRpb24vanNvbg==
User-Agent:
- !!binary |
Qm90bzMvMS4zNS41NiBtZC9Cb3RvY29yZSMxLjM1LjU2IHVhLzIuMCBvcy9saW51eCM2LjEuMC0x
MDM0LW9lbSBtZC9hcmNoI3g4Nl82NCBsYW5nL3B5dGhvbiMzLjEwLjEyIG1kL3B5aW1wbCNDUHl0
aG9uIGNmZy9yZXRyeS1tb2RlI2xlZ2FjeSBCb3RvY29yZS8xLjM1LjU2
X-Amz-Date:
- !!binary |
MjAyNTAxMjNUMDk1MTU3Wg==
X-Amz-Security-Token:
- test_aws_security_token
X-Amzn-Trace-Id:
- !!binary |
Um9vdD0xLTI5NzA1OTZhLTEyZWI5NDk2ODA1ZjZhYzE5YmU3ODM2NztQYXJlbnQ9Y2M0OTA0YWE2
ZjQ2NmYxYTtTYW1wbGVkPTE=
amz-sdk-invocation-id:
- !!binary |
MjQzZWY2ZDgtNGJhNy00YTVlLWI0MGEtYThiNDE2ZDIzYjhk
amz-sdk-request:
- !!binary |
YXR0ZW1wdD0x
authorization:
- Bearer test_aws_authorization
method: POST
uri: https://bedrock-runtime.us-east-1.amazonaws.com/model/does-not-exist/converse-stream
response:
body:
string: '{"message":"The provided model identifier is invalid."}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json
Date:
- Thu, 23 Jan 2025 09:51:57 GMT
Set-Cookie: test_set_cookie
x-amzn-ErrorType:
- ValidationException:http://internal.amazon.com/coral/com.amazon.bedrock/
x-amzn-RequestId:
- 358b122c-d045-4d8f-a5bb-b0bd8cf6ee59
status:
code: 400
message: Bad Request
version: 1
Loading

0 comments on commit 0bb1c42

Please sign in to comment.