
Alair Joao Tavares

Posted on • Originally published at activi.dev

Migrating to Vertex AI: A Practical Guide to LLM Tooling Refactoring and SSE Streaming Stability in Python

Introduction: The Inevitable Evolution of AI Infrastructure

In the fast-paced world of generative AI, the tools and platforms we use are constantly evolving. What was state-of-the-art six months ago might be legacy today. For many developers who started building with early-access SDKs like the Google Generative AI library, the time has come to migrate to more robust, enterprise-grade platforms. This isn't just about keeping up with trends; it's about unlocking scalability, enhanced security, and a richer feature set that's crucial for production applications.

This article chronicles a practical journey of migrating a Python-based LLM-powered chat system from the legacy Google Generative AI SDK to the comprehensive Vertex AI platform. We'll dive deep into the architectural shifts, the code-level refactoring, and the unexpected challenges that arise. Specifically, we will cover:

  1. The Rationale for Migration: Why move from a simple SDK to a full-fledged MLOps platform like Vertex AI?
  2. Refactoring Tool Declarations: How to adapt your function-calling tools to Vertex AI's more structured and strongly-typed format.
  3. Implementing a Stable SSE Streaming Endpoint: The nuts and bolts of building a real-time, responsive chat experience using Server-Sent Events (SSE).
  4. Post-Migration Hardening: Tackling common stability issues like cache locking and content filtering to ensure a smooth user experience.

Whether you're facing a similar migration or just want to understand how to build resilient LLM applications on Google Cloud, this guide will provide you with actionable insights and battle-tested code examples.

Section 1: From API Keys to Service Accounts - The Vertex AI Shift

The most immediate difference when moving from the standalone Google Generative AI SDK to Vertex AI is the authentication and initialization model. The former often relies on a simple API key, which is convenient for development but less secure for production environments.

Vertex AI, being an integral part of Google Cloud Platform (GCP), embraces a more secure, identity-based approach using Application Default Credentials (ADC). This typically involves a service account with specific IAM (Identity and Access Management) permissions. This shift is a significant step up in terms of security and production readiness.

Before: The Legacy SDK

With the old SDK, your initialization code might have looked something like this, using an API key stored as an environment variable:

# Legacy google.generativeai SDK approach
import google.generativeai as genai
import os

# Authentication via API Key
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

# Model initialization
model = genai.GenerativeModel('gemini-pro')

After: The Vertex AI SDK

The Vertex AI SDK integrates seamlessly with the GCP ecosystem. Once you've authenticated your environment (e.g., using gcloud auth application-default login for local development or by attaching a service account to a VM/Cloud Run instance), the initialization becomes much cleaner.

# The modern Vertex AI SDK approach
import vertexai
from vertexai.generative_models import GenerativeModel, Part

# No explicit API key needed! ADC handles authentication.
vertexai.init(project="your-gcp-project-id", location="us-central1")

# Model initialization
model = GenerativeModel("gemini-1.0-pro-001")

This approach not only eliminates the need to manage API keys in your application code but also allows for fine-grained permissions. You can grant the service account access only to the Vertex AI API and nothing else, adhering to the principle of least privilege.

Section 2: Refactoring Function Calling with Structured Schemas

Function calling is a cornerstone of creating sophisticated AI agents that can interact with external systems. This was one of the areas that required the most significant refactoring during the migration. The legacy SDK used a more loosely-defined method for declaring tools, while Vertex AI enforces a more rigid, typed schema.

This new structure, based on google.cloud.aiplatform_v1.types, is more verbose but also more explicit and less error-prone. It forces you to clearly define the name, description, and parameter types for each tool, which ultimately leads to better model performance and more reliable function calls.

Let's imagine we have a simple Python function to fetch a user's profile.

# The function our AI model will learn to call
def get_user_profile(user_id: int) -> dict:
    """Fetches a user's profile from the database.

    Args:
        user_id (int): The unique identifier for the user.

    Returns:
        dict: A dictionary containing the user's name and email, or an error message.
    """
    # In a real application, this would query a database or another service
    users = {
        101: {"name": "Alice", "email": "alice@example.com"},
        102: {"name": "Bob", "email": "bob@example.com"}
    }
    return users.get(user_id, {"error": "User not found"})

Before: Simple Tool Declaration

The old SDK might have allowed for a simpler, dictionary-based declaration. While quick to write, it lacked strong typing.

After: Using FunctionDeclaration and Schema

With Vertex AI, we define the same tool using a structured approach. This ensures the model receives a perfectly formatted OpenAPI schema, improving its ability to correctly identify when to call the function and with what arguments.

from vertexai.generative_models import Tool
from google.cloud.aiplatform_v1.types import (
    FunctionDeclaration,
    Schema,
    Type,
)

# Define the tool for the get_user_profile function
get_user_profile_tool = FunctionDeclaration(
    name="get_user_profile",
    description="Fetches a user's profile from the database using their ID.",
    parameters=Schema(
        type=Type.OBJECT,
        properties={
            "user_id": Schema(
                type=Type.NUMBER,
                description="The unique identifier for the user."
            )
        },
        required=["user_id"],
    ),
)

# Wrap it in the GenerativeModel's Tool object
my_tool_kit = Tool(
    function_declarations=[get_user_profile_tool],
)

# Now, when calling the model, you pass this tool configuration
model = GenerativeModel(
    "gemini-1.0-pro-001",
    tools=[my_tool_kit]
)

# Example usage:
# response = model.generate_content("What is the email for user 101?")
# print(response.candidates[0].content.parts[0].function_call)

This code is more robust. It explicitly defines the user_id parameter as a NUMBER and marks it as required. This level of detail is invaluable for complex integrations and reduces the likelihood of the model hallucinating function calls with incorrect parameters.
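Declaring the tool is only half the story: when the model decides to call it, your application receives a function_call part and must execute the matching Python function itself. That dispatch step can be sketched as a plain name-to-callable registry (a hypothetical helper, not part of the SDK, reusing the get_user_profile function from above):

```python
# A minimal dispatcher for executing function calls returned by the model.
# The registry maps declared tool names to local callables; the args dict
# is what you would pull out of the model's function_call.args.

def get_user_profile(user_id: int) -> dict:
    """Same lookup as in the example above."""
    users = {
        101: {"name": "Alice", "email": "alice@example.com"},
        102: {"name": "Bob", "email": "bob@example.com"},
    }
    return users.get(user_id, {"error": "User not found"})

TOOL_REGISTRY = {
    "get_user_profile": get_user_profile,
}

def dispatch_function_call(name: str, args: dict) -> dict:
    """Execute a declared tool by name, guarding against unknown names."""
    func = TOOL_REGISTRY.get(name)
    if func is None:
        # Never let the model invoke something you did not declare.
        return {"error": f"Unknown tool: {name}"}
    return func(**args)
```

In a full round trip you would wrap the returned dict in a function response part (the SDK exposes Part.from_function_response for this) and send it back to the model so it can compose the final answer.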

Section 3: Building a Resilient SSE Streaming Endpoint

For a chat application, real-time feedback is non-negotiable. Waiting for the model to generate its entire response before displaying anything leads to a poor user experience. The solution is streaming, and a fantastic, lightweight way to implement this on the web is with Server-Sent Events (SSE).

An SSE endpoint is a long-lived HTTP connection where the server can push data to the client as it becomes available. This is a perfect match for LLM streaming responses.
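The wire format itself is minimal: each event is a `data:` line followed by a blank line. A small framing helper (hypothetical, not tied to any framework) keeps that detail in one place and lets json.dumps handle escaping:

```python
import json

def format_sse(payload: dict) -> str:
    """Frame a payload as a single SSE event: 'data: <json>\\n\\n'.

    json.dumps escapes newlines inside the payload, so user text can
    never break the event framing."""
    return f"data: {json.dumps(payload)}\n\n"
```

Because the newline inside the text is escaped into the JSON string, `format_sse({"text": "a\nb"})` still yields exactly one event terminated by the double newline.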

Here’s a practical example of how to create an SSE endpoint in a Django application that streams the response from Vertex AI.

# In your Django views.py

from django.http import StreamingHttpResponse
import vertexai
from vertexai.generative_models import GenerativeModel
import json

def stream_chat_response(request):
    """An SSE endpoint to stream chat responses from Vertex AI."""
    try:
        # Assume the user's message is sent as a POST request
        data = json.loads(request.body)
        user_message = data.get('message')
        if not user_message:
            # A simple generator for handling errors
            def error_stream():
                yield "data: {\"error\": \"Message is required.\"}\n\n"
            return StreamingHttpResponse(error_stream(), content_type='text/event-stream')

        vertexai.init(project="your-gcp-project-id", location="us-central1")
        model = GenerativeModel("gemini-1.0-pro-001")

        # Request a streaming response: stream=True returns an iterator of chunks
        responses = model.generate_content(user_message, stream=True)

        def event_stream():
            """The generator function that yields SSE formatted data."""
            for chunk in responses:
                # Access the text content safely
                if chunk.candidates and chunk.candidates[0].content.parts:
                    text_chunk = chunk.candidates[0].content.parts[0].text

                    # Practical issue: The model sometimes includes instructional tags.
                    # We should strip these before sending to the client.
                    if "[FOLLOW_UPS:]" in text_chunk:
                        text_chunk = text_chunk.split("[FOLLOW_UPS:]")[0].strip()

                    if text_chunk:
                        # Format as SSE: data: <json_string>\n\n
                        # Using json.dumps ensures proper escaping of special characters
                        formatted_data = json.dumps({"text": text_chunk})
                        yield f"data: {formatted_data}\n\n"

        return StreamingHttpResponse(event_stream(), content_type='text/event-stream')

    except Exception as e:
        # Bind the message now: 'e' is unbound once the except block exits,
        # but the generator below runs lazily, after this view has returned.
        error_message = str(e)
        def exception_stream():
            yield f"data: {json.dumps({'error': error_message})}\n\n"
        return StreamingHttpResponse(exception_stream(), content_type='text/event-stream')


A key lesson learned during implementation is that the raw stream from the model isn't always pristine. As shown in the code, we had to explicitly strip out artifacts like [FOLLOW_UPS:] tags that are meant for internal logic, not for display. Always inspect the raw stream and add necessary filtering to your backend before passing it to the frontend.
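That filtering step is worth factoring into a small, testable function rather than leaving it inline in the view. The `[FOLLOW_UPS:]` tag below is the internal marker from our own prompts; substitute whatever artifacts your prompts produce:

```python
def sanitize_chunk(text: str, markers=("[FOLLOW_UPS:]",)) -> str:
    """Drop everything from the first internal marker onward.

    Mirrors the inline logic in the SSE view. Note a caveat: a streamed
    chunk can end mid-marker, so a production version would also buffer
    a partial marker at the end of a chunk; this sketch only handles
    markers that arrive whole within one chunk."""
    for marker in markers:
        if marker in text:
            text = text.split(marker)[0].strip()
    return text
```

Keeping sanitization separate means you can unit-test it against captured raw streams without spinning up the endpoint at all.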

Section 4: Post-Migration Stability and Best Practices

Going live with a migration is just the beginning. The real test is how the system performs under load and over time. We encountered a few subtle but important issues that required attention.

1. Handling Cache Locking with Atomic Operations

To improve performance and reduce costs, we cache LLM responses for common queries. However, with a streaming endpoint, multiple users could potentially trigger the same query simultaneously, leading to a race condition where several processes try to write to the same cache key at once. This can corrupt the cache or cause deadlocks.

Instead of a simple cache.set(), which can be problematic in a concurrent environment, it's better to use an atomic operation like cache.add(). This operation only sets the key if it doesn't already exist, making it inherently safe from race conditions.

# A conceptual example of safe caching in a streaming context

from django.core.cache import cache

def get_or_stream_response(query):
    cache_key = f"llm_response:{hash(query)}"

    # Check cache first
    cached_response = cache.get(cache_key)
    if cached_response:
        return cached_response

    # Stream the response from the model
    model = GenerativeModel("gemini-1.0-pro-001")
    responses = model.generate_content(query, stream=True)

    full_response_parts = []
    for chunk in responses:
        # In the real endpoint each chunk would also be yielded to the client
        # (as in the SSE example); here we simply accumulate the full text.
        if chunk.text:
            full_response_parts.append(chunk.text)

    # Once the full response is collected, try to cache it atomically.
    full_response_text = "".join(full_response_parts)

    # cache.add() is atomic. It will only succeed for the first process
    # that completes the stream for this query.
    cache.add(cache_key, full_response_text, timeout=3600) # timeout in seconds

    return full_response_text

By collecting the full response and then performing a single cache.add() operation after the stream is complete, we ensure that only one process wins the race to populate the cache, preventing data corruption.
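The "only the first writer wins" property can be demonstrated without a running cache backend. Under CPython, dict.setdefault has the same add-if-absent semantics as cache.add(), which makes the race easy to simulate with threads (a stand-in for the cache, not Django's actual implementation):

```python
import threading

# A plain dict stands in for the cache; setdefault is add-if-absent,
# analogous to cache.add() (atomic for built-in dicts under CPython).
fake_cache: dict = {}
results: list = []

def finish_stream(worker_id: int) -> None:
    """Each worker tries to populate the same key after 'streaming'."""
    stored = fake_cache.setdefault("llm_response:q1", f"response-from-{worker_id}")
    results.append(stored)

threads = [threading.Thread(target=finish_stream, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Whichever worker won the race, every worker observes the same value.
assert len(set(results)) == 1
```

The same reasoning applies to cache.add(): later writers fail silently, readers see one consistent value, and no response is ever half-overwritten by a concurrent stream.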

2. Configuring Safety Settings

Vertex AI provides robust, configurable safety filters to prevent the model from generating harmful, unethical, or inappropriate content. While the default settings are well-balanced, you should fine-tune them for your specific application's context.

You can block content that reaches a certain threshold for categories like harassment, hate speech, and sexually explicit content.

from vertexai.generative_models import HarmCategory, HarmBlockThreshold

# Example of setting stricter safety settings
safety_config = {
    HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
    HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
}

model = GenerativeModel(
    "gemini-1.0-pro-001",
    safety_settings=safety_config
)

# Now, any calls to this model instance will use these settings
# response = model.generate_content("A potentially unsafe prompt")
# The response may be empty with a 'finish_reason' of 'SAFETY'

Configuring these settings is a crucial step in building a responsible and user-friendly AI application.
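When a response is blocked, the candidate arrives with a finish_reason of SAFETY and no usable text, so your endpoint should detect that case and return a graceful fallback instead of an empty stream. The check can be sketched against minimal stand-in objects (hypothetical shapes for testing; the real SDK exposes finish_reason as an enum on each candidate):

```python
from types import SimpleNamespace

FALLBACK_TEXT = "Sorry, I can't help with that request."

def safe_text(candidate) -> str:
    """Return the candidate's text, or a fallback if the safety filter fired.

    Compares finish_reason by name so the sketch also works with plain
    string stubs; the real SDK value is an enum with a .name attribute."""
    reason = getattr(candidate.finish_reason, "name", str(candidate.finish_reason))
    if reason == "SAFETY":
        return FALLBACK_TEXT
    return candidate.content.parts[0].text

# Stub candidates to exercise both paths without calling the API.
blocked = SimpleNamespace(finish_reason="SAFETY", content=None)
ok = SimpleNamespace(
    finish_reason="STOP",
    content=SimpleNamespace(parts=[SimpleNamespace(text="All good.")]),
)
```

Surfacing a deliberate fallback message also gives the frontend something consistent to render, instead of forcing it to special-case an empty SSE stream.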

Conclusion: Embracing the Enterprise-Ready Future

Migrating from the legacy Google Generative AI SDK to Vertex AI is more than a simple dependency swap; it's a strategic move towards a more scalable, secure, and feature-rich architecture. The process demands careful attention to detail, particularly in refactoring tool declarations and implementing robust streaming mechanisms.

Key Takeaways:

  • Embrace Structured Tooling: The explicit FunctionDeclaration and Schema approach in Vertex AI, while verbose, leads to more reliable and predictable function calling.
  • SSE is Ideal for Chat: Server-Sent Events provide a simple and effective way to stream LLM responses, but always remember to sanitize the raw output from the model.
  • Plan for Production Stability: Real-world challenges like cache race conditions and content moderation must be addressed proactively with atomic operations and well-configured safety settings.

By investing the time to navigate this migration, you position your application to leverage the full power of Google Cloud's MLOps ecosystem, ensuring it can grow and adapt in the ever-changing landscape of generative AI.
