Hi 👋, in this post we'll see how to use OpenTelemetry and Langfuse Cloud to observe LLM calls made via Strands Agents.
UV
Let's initialize our project with uv.
uv init strands-langfuse-demo
Copy the code shown in the video to the project, so that the contents look like below. The final code for this lab is available here.
$ cd strands-langfuse-demo/
$ ls -a
. .. .env .python-version README.md k8s_mcp_agent.py k8s_mcp_app.py main.py pyproject.toml
Let's activate the virtual environment.
uv venv
source .venv/bin/activate
We can now install our two dependencies: streamlit and strands-agents.
uv add streamlit==1.49.1
uv add strands-agents[otel]==1.6.0
We add the otel extra here, as we need it to enable telemetry.
K3S
I am using K3s to launch a single-node Kubernetes cluster.
curl -sfL https://get.k3s.io | sh -
Let's copy the kubeconfig, as we will refer to it in our agent configuration.
sudo cp /etc/rancher/k3s/k3s.yaml .
Langfuse
Go to Langfuse Cloud and create a new organization (e.g. my-org) and, inside it, a new project (e.g. my-llm-org). Then generate API keys for the project.
We need to add these variables to our .env file, so that it looks like below.
cat .env
AWS_ACCESS_KEY_ID=""
AWS_SECRET_ACCESS_KEY=""
AWS_DEFAULT_REGION=us-west-2
LANGFUSE_PUBLIC_KEY="pk-lf-.."
LANGFUSE_SECRET_KEY="sk-lf-.."
OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"
Let's use this code to load the env vars and set up telemetry with the OTLP headers.
$ cat set_telemetry.py
import os
import base64
from dotenv import load_dotenv
from strands.telemetry import StrandsTelemetry


def set_telemetry():
    load_dotenv()
    # Build the Basic Auth header from the Langfuse keys.
    LANGFUSE_AUTH = base64.b64encode(
        f"{os.environ.get('LANGFUSE_PUBLIC_KEY')}:{os.environ.get('LANGFUSE_SECRET_KEY')}".encode()
    ).decode()
    # Configure OpenTelemetry endpoint & headers
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"
    strands_telemetry = StrandsTelemetry()
    strands_telemetry.setup_otlp_exporter()
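To make the header construction concrete, here is a small standalone sketch of what OTEL_EXPORTER_OTLP_HEADERS ends up containing; the pk-lf-example/sk-lf-example values are placeholders, not real Langfuse keys.

```python
# Sketch of the Authorization header that set_telemetry builds,
# using placeholder keys instead of real Langfuse credentials.
import base64

public_key = "pk-lf-example"  # placeholder, not a real key
secret_key = "sk-lf-example"  # placeholder, not a real key

auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
header = f"Authorization=Basic {auth}"
print(header)
```

Langfuse's OTLP endpoint authenticates with HTTP Basic Auth, which is why the two keys are joined with a colon and base64-encoded.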
Since we load the env vars in set_telemetry, we can remove that from the agent code. These lines can be removed:
$ cat k8s_mcp_agent.py | grep dotenv
from dotenv import load_dotenv
load_dotenv()
And replace them with the new module we added.
$ cat k8s_mcp_agent.py | grep telemetry
from set_telemetry import set_telemetry
set_telemetry()
Streamlit
Let's now run our app.
streamlit run k8s_mcp_app.py
I tried a prompt to get the list of namespaces.
We can see the trace for this in Langfuse.
We can add custom attributes to traces; let's add one named session.id via the trace_attributes argument.
import uuid

self.agent = Agent(
    callback_handler=None,
    model="us.amazon.nova-micro-v1:0",
    tools=tools,
    trace_attributes={
        "session.id": str(uuid.uuid4()),
    },
)
We can now see it in the attributes section of the trace.
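Beyond session.id, Langfuse's OpenTelemetry integration also recognizes a few other well-known attribute names, such as user.id and langfuse.tags. A small sketch with made-up values ("demo-user" and the tag strings are illustrative only):

```python
# A sketch of a richer trace_attributes dict; Langfuse maps "session.id",
# "user.id", and "langfuse.tags" onto its session/user/tag views.
# The user id and tags below are made-up demo values.
import uuid

trace_attributes = {
    "session.id": str(uuid.uuid4()),
    "user.id": "demo-user",
    "langfuse.tags": ["strands", "k8s-demo"],
}
print(sorted(trace_attributes))
```

Attaching a stable session.id per chat session (rather than a fresh UUID per agent) would let Langfuse group all traces of one conversation together.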
The final agent code will be:
$ cat k8s_mcp_agent.py
import re
from mcp import stdio_client, StdioServerParameters
from strands import Agent
from strands.tools.mcp import MCPClient
import streamlit as st
from set_telemetry import set_telemetry
import uuid


def remove_html_tags(text_with_html):
    text_with_out_html = re.sub(r"<[^>]+>", "", text_with_html)
    return text_with_out_html


async def stream_result(stream):
    result = ""
    placeholder = st.empty()
    async for chunk in stream:
        if 'data' in chunk:
            result += chunk['data']
            result = remove_html_tags(result)
            placeholder.write(result)


class KubernetesMCPAgent:
    def __init__(self):
        set_telemetry()
        server_params = {
            "command": "npx",
            "args": [
                "-y",
                "kubernetes-mcp-server@latest"
            ],
            "env": {
                "KUBECONFIG": "k3s.yaml"
            }
        }
        self.stdio_mcp_client = MCPClient(lambda: stdio_client(
            StdioServerParameters(**server_params)
        ))
        with self.stdio_mcp_client:
            # Get the tools from the MCP server
            tools = self.stdio_mcp_client.list_tools_sync()
            # Create an agent with these tools
            self.agent = Agent(
                callback_handler=None,
                model="us.amazon.nova-micro-v1:0",
                tools=tools,
                trace_attributes={
                    "session.id": str(uuid.uuid4()),
                },
            )

    async def send_prompt(self, prompt):
        with self.stdio_mcp_client:
            stream = self.agent.stream_async(prompt)
            await stream_result(stream)


kubernetes_mcp_agent = KubernetesMCPAgent()
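The core of stream_result is the chunk-accumulation loop: only chunks carrying a 'data' key contribute text, and the accumulated result is stripped of HTML tags before display. A minimal standalone sketch of that logic, using a fake async stream instead of a real agent (so no Streamlit or MCP server is needed):

```python
# Sketch of the chunk-accumulation logic in stream_result, driven by a
# fake async stream that mimics Strands streaming events.
import asyncio
import re

def remove_html_tags(text_with_html):
    return re.sub(r"<[^>]+>", "", text_with_html)

async def fake_stream():
    # Only 'data' chunks carry response text; other events are skipped.
    for chunk in [{"data": "<b>kube-"}, {"data": "system</b>"}, {"event": "other"}]:
        yield chunk

async def collect(stream):
    result = ""
    async for chunk in stream:
        if "data" in chunk:
            result += chunk["data"]
            result = remove_html_tags(result)
    return result

cleaned = asyncio.run(collect(fake_stream()))
print(cleaned)  # kube-system
```

Note that tags are stripped on every iteration because a tag can arrive split across two chunks, as in the example above.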
That's it for this post; I hope it gave some insights into LLM observability. Thank you for reading!