Rubens Zimbres

Originally published at Medium

Develop a Financial Multi-Agent System with Dynamic Tools using Gemini and Google ADK Agents

Agent-Based Modeling (ABM) became a significant tool in academic research in the 1990s, drawing its foundational principles from Complexity Theory, which dates back to the work of Ludwig von Bertalanffy in the 1950s. At its core, ABM simulates systems that are not static but in a constant state of flux, shaped by continuous feedback between individual agents and their environment. From simple initial rules, intricate phenomena like self-organization and emergence can arise, where the system as a whole exhibits properties far more complex than the sum of its parts.

This dynamic nature leads to non-linear behavior, where small disturbances can trigger disproportionate and unexpected reconfigurations throughout the system. Consequently, these patterns are notoriously difficult to capture and predict with traditional analytical methods.

This long-standing challenge provides crucial context for today’s applications. When modern multi-agent projects using Large Language Models (LLMs) fail, it’s easy to blame the LLM or the agent-based architecture. However, the root cause often isn’t the framework but rather the inherent limitations of the underlying model (such as hallucinations), our own unrealistic expectation of predictable behavior from a probabilistic model, and the non-linear behavior of these systems.

Why Agents? A System Design Perspective on Using Agents for Production-Level Use

Development Simplicity and Speed: Instead of writing complex routing logic from scratch in a monolithic structure, the Agent Development Kit (ADK) handles the orchestration. It automatically manages how the main agent delegates tasks to specialized sub-agents.

Scalability and Microservices: A structured agent system allows each sub-agent to be treated as an independent microservice (think also A2A and MCP). This lets you scale the most demanding parts of your application, like document analysis, without affecting the others. In a monolithic structure, you would have to design the entire inter-service communication protocol from scratch, including service discovery and request/response schemas, which rarely makes sense.

Fault Tolerance and Redundancy: By isolating tasks into separate agents, the failure of one component (like the stock predictor) doesn’t crash the entire system. The main agent can continue to operate and handle other requests, ensuring the application remains available.

Latency and Cost of the Solution: Using a main agent as a smart router is cheaper and faster. It uses an efficient model to direct queries to the correct sub-agent, ensuring that powerful, expensive models are only used when absolutely necessary. You can swap between powerful LLMs and fine-tuned open-source LLMs according to the scope of each agent, saving costs.

Cybersecurity: Defining specific tools for each agent in a controlled system limits their capabilities. This reduces the risk of malicious prompts tricking an agent into performing unintended actions, creating a more secure boundary between user input and your tools, and it follows the cybersecurity principles of least privilege and separation of duties. In a monolithic solution with no agents, it is harder to delimit the scope of each component; where one “god agent” has access to all tools, you get the opposite of these principles: maximum privilege and no separation of duties, which creates a much larger and more vulnerable attack surface. Multi-agent systems are inherently more secure because of the boundaries and authorization scopes they enforce.

Specialization and Accuracy: Instead of one agent trying to do everything, multiple agents can specialize and eliminate bottlenecks. One becomes an expert at database queries, another at document analysis. This specialization leads to more accurate and reliable answers and also decreases the cost of the infrastructure needed to run the solution: instead of scaling the whole system horizontally, you scale only the parts that need it.

Modularity and Maintainability: Agent frameworks are modular. You can update or replace one agent (e.g., the stock predictor) without affecting the others. This makes the application much easier to maintain and upgrade.

Efficiency and Resource Management: The multi-agent architecture ensures agents will use the right tool for the job. Simple queries are handled by simple agents, while complex questions engage more powerful ones. This intelligent routing prevents wasting money and computational power.

Trade-offs between control and speed: Building your agent orchestration from scratch, without a framework, gives you maximum control and flexibility, allowing you to tailor every component and immediately use the latest LLM features, but at the cost of manually writing all the complex orchestration, state management, and routing logic yourself. For a company trying to gain a competitive advantage, a delay of even a few days or weeks of unnecessary development can be significant.

On the other hand, using an agent framework dramatically accelerates development by providing pre-built, production-ready solutions for these common problems and enforcing a scalable architecture. The trade-off is that the latest model launched yesterday may not yet be integrated into the framework's core. Once again, a delay of even a few days or weeks can be significant: getting access to a new model's breakthrough feature, like a much larger context window, a lower price point, or a new capability, before anyone else can be a major product differentiator, yet you will have to wait for the new model to be supported by the agent framework.

Agents in Finance

In today’s fast-paced financial markets, investors and analysts face the challenge of navigating a sea of interconnected information. Making informed decisions requires synthesizing vast quantities of data, from structured financial reports with precise metrics like revenue and net income to dense, unstructured documents such as annual SEC 10-K filings, which are filled with critical but often buried qualitative insights.

The traditional process of manually parsing these documents or writing complex database queries is not only time-consuming but also creates a significant barrier for those without specialized technical skills. This information overload creates a clear need for a more intuitive, efficient, and powerful way to access and interpret financial data, enabling users to ask direct questions and receive immediate, comprehensive answers.

To address this challenge, I developed the Financial AI Assistant, a conversational analytics platform that leverages the power of Google Cloud’s AI ecosystem. At its core, the system utilizes Vertex AI, with the efficient Gemini-2.5-flash model, to understand user queries, synthesize information, and generate natural language responses. The entire application is architected around Google’s Agent Development Kit (ADK), which orchestrates a team of specialized AI agents to handle different tasks, by using dynamic tools. For seamless and scalable deployment, the assistant is containerized and served via Google Cloud Run, with container images stored in Google Artifact Registry, providing a serverless, cost-effective solution that scales on demand. This powerful combination of services provides the foundation for a sophisticated yet accessible financial analysis tool.

This article details the journey of building this Financial AI Assistant, demonstrating how modern AI architectural patterns can be applied to the financial domain. We will explore the fusion of knowledge graphs for representing interconnected financial data, Retrieval-Augmented Generation (RAG) for extracting insights from unstructured SEC filings, and a multi-agent framework for intelligent task delegation.

By walking through the data ingestion pipeline, the agent design, and the final deployment process, this piece serves as a comprehensive guide for creating a domain-specific AI assistant. Ultimately, this project showcases how a multi-modal data approach, powered by advanced language models and cloud infrastructure, can transform complex financial analysis into a simple conversation.

Project Structure


Project structure

GitHub repo for this project:

⭐⭐⭐⭐⭐ if you like it

GitHub - RubensZimbres/Financial_ADK_Agent_Graph_Database: A multi-agent conversational financial analytics platform that combines company fundamentals analysis, SEC filing intelligence, and machine learning-based stock price prediction through an intuitive chat interface.

Fetching Data

First, we will fetch the data needed to populate the graph database. The data ingestion process is automated by a Python script that systematically gathers both structured and unstructured information for a predefined list of companies. For structured data, the script leverages the yfinance library to retrieve two key datasets:

  • it fetches annual income statements, which are then formatted and saved as JSON files, and
  • it downloads five years of historical daily stock prices, saving them as CSV files.
  • For unstructured qualitative data, the script interacts directly with the SEC EDGAR database. Using a company's unique CIK (Central Index Key) identifier, it makes API calls via the requests library to locate and download the full HTML text of the last five annual 10-K filings.

This entire workflow iterates through each company listed in a master companies.csv file, methodically populating a local directory structure with the financial, price, and filing data needed for the assistant's analysis.

I asked Gemini 2.5 Pro to generate this companies.csv example data with 500 examples:

ticker,company_name,cik
NVDA,NVIDIA CORP,1045810
MSFT,MICROSOFT CORP,789019
AAPL,Apple Inc.,320193
GOOGL,Alphabet Inc.,1652044
AMZN,AMAZON COM INC,1018724
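If you prefer not to hand-generate this file, the ticker-to-CIK mapping can also be pulled from SEC EDGAR's public company_tickers.json file. The snippet below is a small optional sketch (not part of the original pipeline) that builds companies.csv for a list of tickers; the JSON layout assumed here is the one EDGAR currently publishes:

# Optional sketch: build companies.csv from SEC's public ticker-to-CIK mapping
# instead of generating it with an LLM. Not part of the original pipeline.
import requests
import pandas as pd

SEC_USER_AGENT = "Your Name your-email@example.com"  # SEC asks for a descriptive User-Agent

def build_companies_csv(tickers, out_path="companies.csv"):
    url = "https://www.sec.gov/files/company_tickers.json"
    data = requests.get(url, headers={"User-Agent": SEC_USER_AGENT}, timeout=30).json()
    # The file is keyed by arbitrary indices; each entry holds cik_str, ticker and title.
    by_ticker = {entry["ticker"]: entry for entry in data.values()}
    rows = [
        {"ticker": t, "company_name": by_ticker[t]["title"], "cik": by_ticker[t]["cik_str"]}
        for t in tickers if t in by_ticker
    ]
    pd.DataFrame(rows, columns=["ticker", "company_name", "cik"]).to_csv(out_path, index=False)

build_companies_csv(["NVDA", "MSFT", "AAPL", "GOOGL", "AMZN"])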

The script for fetching financial data is this one:

import os
import requests
import pandas as pd
import json
import time
from datetime import datetime
from dotenv import load_dotenv
from tqdm import tqdm
import yfinance as yf 

# Load environment variables
load_dotenv()
SEC_USER_AGENT = os.getenv("SEC_USER_AGENT")

# --- Configuration ---
COMPANIES_CSV_PATH = "companies.csv"
FINANCIALS_DIR = "data/structured/financials"
PRICES_DIR = "data/structured/prices"
FILINGS_10K_DIR = "data/unstructured/10k"

# Create directories if they don't exist
os.makedirs(FINANCIALS_DIR, exist_ok=True)
os.makedirs(PRICES_DIR, exist_ok=True)
os.makedirs(FILINGS_10K_DIR, exist_ok=True)

def fetch_financial_statements(ticker: str):
    """Fetches annual income statements using yfinance."""
    print(f"Fetching financial statements for {ticker}...")
    try:
        stock = yf.Ticker(ticker)
        income_stmt = stock.income_stmt

        if income_stmt.empty:
            print(f" -> No financial data found for {ticker}")
            return

        data = income_stmt.transpose()
        data.index.name = 'date'
        data = data.reset_index()
        data['date'] = data['date'].astype(str) # Convert timestamp to string
        records = data.to_dict('records')

        with open(os.path.join(FINANCIALS_DIR, f"{ticker}_financials.json"), 'w') as f:
            json.dump(records, f, indent=4)
        print(f" -> Saved financials for {ticker}")
    except Exception as e:
        print(f"Error fetching financials for {ticker}: {e}")

def fetch_stock_prices(ticker: str):
    """Fetches the last 5 years of daily stock prices using yfinance."""
    print(f"Fetching stock prices for {ticker}...")
    try:
        stock = yf.Ticker(ticker)
        # Get 5 years of historical market data
        hist = stock.history(period="5y")

        if hist.empty:
            print(f" -> No price data found for {ticker}")
            return

        hist.to_csv(os.path.join(PRICES_DIR, f"{ticker}_prices.csv"))
        print(f" -> Saved prices for {ticker}")
    except Exception as e:
        print(f"Error fetching prices for {ticker}: {e}")

# --- SEC function ---

def fetch_10k_filings(ticker: str, cik: str):
    """Fetches the last 5 annual 10-K filings from the SEC EDGAR database."""
    print(f"Fetching 10-K filings for {ticker} (CIK: {cik})...")
    headers = {'User-Agent': SEC_USER_AGENT}

    submissions_url = f"https://data.sec.gov/submissions/CIK{cik.zfill(10)}.json"
    try:
        response = requests.get(submissions_url, headers=headers)
        response.raise_for_status()
        submissions = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error fetching submission history for {ticker}: {e}")
        return

    filing_count = 0
    recent_filings = submissions['filings']['recent']

    for i in range(len(recent_filings['form'])):
        if filing_count >= 5:
            break
        if recent_filings['form'][i] == '10-K':
            accession_no = recent_filings['accessionNumber'][i].replace('-', '')
            primary_doc_name = recent_filings['primaryDocument'][i]
            filing_date = recent_filings['filingDate'][i]
            year = filing_date.split('-')[0]

            doc_url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{accession_no}/{primary_doc_name}"

            print(f" -> Downloading 10-K for {year}...")
            try:
                time.sleep(0.2)
                doc_response = requests.get(doc_url, headers=headers)
                doc_response.raise_for_status()

                file_path = os.path.join(FILINGS_10K_DIR, f"{ticker}_10K_{year}.html")
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(doc_response.text)

                filing_count += 1
            except requests.exceptions.RequestException as e:
                print(f" Error downloading filing {doc_url}: {e}")

    print(f" -> Finished fetching filings for {ticker}")

if __name__ == "__main__":
    companies_df = pd.read_csv(COMPANIES_CSV_PATH)

    # Skip tickers containing '.' or '-' (e.g., share classes), which break file naming
    companies_df = companies_df[~companies_df['ticker'].str.contains(r'\.|\-')]

    for index, row in tqdm(companies_df.iterrows(), total=companies_df.shape[0], desc="Processing Companies"):
        ticker = row['ticker']
        cik = str(row['cik'])

        # --- Fetch and Save Data ---
        fetch_financial_statements(ticker)
        fetch_stock_prices(ticker)
        fetch_10k_filings(ticker, cik)

        time.sleep(0.5)

    print("\nData fetching complete. Check the 'data' directory.")

Once data is gathered, we will populate the Graph database using a populate_graph.py script.

Database Preparation

Before any data is loaded, the script performs a critical cleanup step. It runs a Cypher query (MATCH (n) DETACH DELETE n) to completely wipe all existing nodes and relationships from the database. It also attempts to drop any pre-existing vector index named filings. This ensures that each run starts with a clean slate, preventing data duplication or corruption from previous ingestions.

Phase 1: Ingesting Structured Data

This phase focuses on building the foundational skeleton of the graph with concrete company and financial data.

Company Node Creation: The process begins by reading the companies.csv file into a pandas DataFrame. The script then uses a MERGE operation in Cypher to create a Company node for each ticker. MERGE is used instead of CREATE to intelligently create a node only if it doesn’t already exist, preventing duplicates. Each Company node is populated with properties like its name , ticker , and CIK number.
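If you want to convince yourself of this idempotence, here is a minimal sketch (assuming a local Neo4j instance with the same credentials used by populate_graph.py below) that runs the same MERGE twice and still ends up with a single node:

# Minimal sketch of MERGE idempotence, assuming a local Neo4j instance with the
# same credentials as populate_graph.py.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    for _ in range(2):  # run the exact same statement twice
        session.run("MERGE (c:Company {ticker: 'NVDA'}) SET c.name = 'NVIDIA CORP'")
    n = session.run("MATCH (c:Company {ticker: 'NVDA'}) RETURN count(c) AS n").single()["n"]
    print(n)  # 1 -- MERGE matched the existing node; CREATE would have produced 2

driver.close()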

Financial Node Creation and Linking: Next, the script iterates through all JSON files in the structured financials directory. For each file, it extracts annual financial data points like revenue , net income , and EPS (Earnings Per Share). It then creates a distinct Financials node for each year of data. The most crucial step is linking these nodes: a HAS_FINANCIALS relationship is created from the parent Company node to each of its annual Financials nodes. This establishes the first set of connections in our graph.


Financials data sample

Phase 2: Ingesting Unstructured Data (SEC Filings)

This phase enriches the graph with qualitative insights extracted from text-heavy 10-K filings, combining Large Language Model (LLM) intelligence with vector search capabilities.


SEC 10-K filing

LLM-Powered Entity Extraction: The script first loads the HTML content of 10-K filings from the years 2020 to 2025. For each document, it takes the first 20,000 characters (as a sample), often containing the most critical summaries, and sends them to a Gemini model via a carefully crafted prompt. The prompt instructs the LLM to act as a financial analyst, extracting key entities like key_risks, management_outlook, and major_events and returning them in a structured JSON format.

Graph Construction: The structured JSON output from the LLM is used to weave a rich web of new nodes and relationships into the graph.

  • A Document node is created for the filing and linked to the corresponding Company with a FILED relationship.
  • The extracted entities (risks, events, strategies) are created as their own nodes (e.g., Risk, Event).
  • Multiple relationships are formed to show how everything is connected. For example, a Company HAS_RISK to a Risk node, and the Document MENTIONS_RISK to that same Risk node. This creates a detailed and queryable map of qualitative information.


Node visualization in the Graph Database


Graph database zoom

Vector Embedding for RAG

Finally, to enable semantic search, the script prepares the 10-K filings for Retrieval-Augmented Generation (RAG). It truncates each document’s content to the first 80,000 characters. This text is split into smaller, overlapping chunks (1500 characters each). By using a Vertex AI embedding model, each chunk is converted into a numerical vector.

The langchain_neo4j library then loads these chunks into Neo4j as Chunk nodes, with each node containing the original text and its corresponding vector embedding. A vector index named filings is automatically created on these Chunk nodes, allowing for ultra-fast semantic similarity searches later on.
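After populate_graph.py (shown below) has run, you can sanity-check the retrieval side directly from langchain_neo4j. The following is a small sketch, assuming the same connection settings, index name, and embedding model used during ingestion:

# Quick retrieval sanity check against the 'filings' vector index (a sketch,
# assuming the same Neo4j credentials and embedding model as populate_graph.py).
from langchain_google_vertexai import VertexAIEmbeddings
from langchain_neo4j import Neo4jVector

embeddings = VertexAIEmbeddings(model_name="text-embedding-005")

store = Neo4jVector.from_existing_index(
    embeddings,
    url="bolt://localhost:7687",
    username="neo4j",
    password="password",
    index_name="filings",
    text_node_property="text",
)

for doc in store.similarity_search("What are NVIDIA's key data center risks?", k=3):
    print(doc.page_content[:200], "\n---")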

See the populate_graph.py code:

# populate_graph.py
import os
import pandas as pd
import json
import math
from langchain_community.document_loaders import DirectoryLoader, UnstructuredHTMLLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_vertexai import VertexAI, VertexAIEmbeddings
from langchain_neo4j import Neo4jVector
from dotenv import load_dotenv
from tqdm import tqdm
from neo4j import GraphDatabase
import re

load_dotenv()
URI = os.getenv("NEO4J_URI", "bolt://localhost:7687")
AUTH = (os.getenv("NEO4J_USERNAME", "neo4j"), os.getenv("NEO4J_PASSWORD", "password"))
driver = GraphDatabase.driver(URI, auth=AUTH)
llm = VertexAI(model_name="gemini-2.5-flash", temperature=0)
embeddings = VertexAIEmbeddings(model_name="text-embedding-005")

def ingest_structured_data():
    """
    Loads company profiles from CSV and financial data from JSON files,
    then creates Company and Financials nodes in Neo4j.
    """
    print("Ingesting structured company and financial data...")
    companies_df = pd.read_csv('./companies.csv')
    company_records = companies_df.to_dict('records')
    ingest_companies_query = """
    UNWIND $records AS record
    MERGE (c:Company {ticker: record.ticker})
    SET c.name = record.company_name, c.cik = toString(record.cik)
    """
    with driver.session() as session:
        session.run(ingest_companies_query, records=company_records)

    financials_dir = './data/structured/financials'
    if os.path.exists(financials_dir):
        for filename in tqdm(os.listdir(financials_dir), desc="Ingesting Financials"):
            if filename.endswith(".json"):
                ticker = filename.split('_')[0].upper()
                with open(os.path.join(financials_dir, filename), 'r') as f:
                    financials_data = json.load(f)
                records_to_ingest = []
                for item in financials_data:
                    def get_value(key):
                        val = item.get(key)
                        if val is None or (isinstance(val, float) and math.isnan(val)):
                            return None
                        return val
                    record = {
                        'ticker': ticker,
                        'year': item.get('date', '').split('-')[0],
                        'revenue': get_value('Total Revenue'),
                        'netIncome': get_value('Net Income'),
                        'eps': get_value('Basic EPS') or get_value('Diluted EPS')
                    }
                    if record['year']:
                        records_to_ingest.append(record)
                ingest_financials_query = """
                UNWIND $records AS record
                MATCH (c:Company {ticker: record.ticker})
                MERGE (f:Financials {company: c.ticker, year: record.year})
                SET f.revenue = toFloat(record.revenue), f.netIncome = toFloat(record.netIncome), f.eps = toFloat(record.eps)
                MERGE (c)-[:HAS_FINANCIALS]->(f)
                """
                with driver.session() as session:
                    session.run(ingest_financials_query, records=records_to_ingest)
    else:
        print(f"Warning: Financials directory {financials_dir} not found")
    print("Structured data ingestion complete.")

def extract_entities_from_filing(doc):
    """
    Uses an LLM to extract structured entities from the first 20,000 characters of a 10-K filing.
    """
    filename = os.path.basename(doc.metadata.get('source', ''))
    match = re.search(r"([A-Z]+)_10K_(\d{4})", filename)
    if not match:
        print(f"Warning: Could not extract ticker and year from filename: {filename}")
        return None
    ticker, year = match.groups()
    extraction_prompt = f"""
    From the SEC 10-K filing document below for ticker {ticker} and year {year}, extract the following information.
    Focus on the "Risk Factors" and "Management's Discussion and Analysis" sections if possible.
    - key_risks: A list of the 3-5 most significant risks mentioned.
    - management_outlook: A concise, one-paragraph summary of management's outlook.
    - major_events: A list of 1-3 major events from that year.
    - strategic_focus: A list of key strategic areas mentioned.
    Return the information as a valid JSON object with these exact keys. If any information is not found, use an empty list or null.
    Do not include any other text, explanation, or markdown formatting.
    DOCUMENT (first 20000 characters):
    {doc.page_content[:20000]}
    """
    response = None
    try:
        response = llm.invoke(extraction_prompt)
        cleaned_response = response.strip().replace("```json", "").replace("```", "").strip()
        entities = json.loads(cleaned_response)
        entities['ticker'] = ticker
        entities['year'] = year
        return entities
    except (json.JSONDecodeError, Exception) as e:
        print(f"Error processing document {doc.metadata.get('source', 'Unknown')}: {e}")
        print(f"LLM Response was: {response}")
        return None

def ingest_unstructured_data():
    """
    MODIFIED:
    - Extracts entities using the first 20,000 characters.
    - Chunks and creates vector embeddings for the first 80,000 characters.
    """
    print("Ingesting data from 10-K filings (2020-2025)...")
    filings_dir = './data/unstructured/10k/'
    if not os.path.exists(filings_dir):
        print(f"Warning: Filings directory {filings_dir} not found. Skipping unstructured data ingestion.")
        return

    loader = DirectoryLoader(
        filings_dir, glob="**/*.html", loader_cls=UnstructuredHTMLLoader,
        show_progress=True, loader_kwargs={"unstructured_kwargs": {"strategy": "fast"}}, silent_errors=True
    )
    documents = loader.load()
    if not documents:
        print("No documents found. Skipping unstructured data ingestion.")
        return

    target_years = [str(y) for y in range(2020, 2026)]
    docs_to_process = []
    for doc in documents:
        filename = os.path.basename(doc.metadata.get('source', ''))
        if any(year in filename for year in target_years):
            docs_to_process.append(doc)
    if not docs_to_process:
        print("No documents found for target years. Skipping unstructured data ingestion.")
        return

    print(f"Loaded {len(docs_to_process)} documents for years {target_years[0]}-{target_years[-1]}")
    print("Extracting and linking entities from filings...")
    with driver.session() as session:
        for doc in tqdm(docs_to_process, desc="Processing Filings"):
            entities = extract_entities_from_filing(doc)
            if entities and entities.get('ticker'):
                # Cypher query for linking entities 
                link_query = """
                MATCH (c:Company {ticker: $ticker}) MERGE (doc:Document {source: $source})
                ON CREATE SET doc.year = $year, doc.type = '10-K' MERGE (c)-[:FILED]->(doc)
                SET doc.management_outlook = $management_outlook
                WITH c, doc UNWIND $key_risks AS risk_name WHERE risk_name IS NOT NULL AND risk_name <> ""
                MERGE (r:Risk {name: risk_name}) MERGE (c)-[:HAS_RISK]->(r) MERGE (doc)-[:MENTIONS_RISK]->(r)
                WITH c, doc UNWIND $major_events AS event_name WHERE event_name IS NOT NULL AND event_name <> ""
                MERGE (e:Event {name: event_name}) MERGE (c)-[:HAD_EVENT]->(e) MERGE (doc)-[:DESCRIBES_EVENT]->(e)
                WITH c, doc UNWIND $strategic_focus AS strategy_name WHERE strategy_name IS NOT NULL AND strategy_name <> ""
                MERGE (s:Strategy {name: strategy_name}) MERGE (c)-[:HAS_STRATEGY]->(s) MERGE (doc)-[:MENTIONS_STRATEGY]->(s)
                """
                params = {
                    "source": os.path.basename(doc.metadata.get('source')), "ticker": entities.get('ticker'),
                    "year": entities.get('year'), "management_outlook": entities.get('management_outlook'),
                    "key_risks": entities.get('key_risks', []), "major_events": entities.get('major_events', []),
                    "strategic_focus": entities.get('strategic_focus', [])
                }
                try:
                    session.run(link_query, params)
                except Exception as e:
                    print(f"Error executing link query for {entities.get('ticker')}: {e}")

    print("Splitting documents and creating vector embeddings (first 80,000 chars)...")

    docs_to_embed = []
    for doc in docs_to_process:
        truncated_doc = doc.copy()
        truncated_doc.page_content = doc.page_content[:80000] # Slice to 80,000
        docs_to_embed.append(truncated_doc)

    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=150)
    docs_for_vector = text_splitter.split_documents(docs_to_embed)

    try:
        Neo4jVector.from_documents(
            docs_for_vector, embeddings, url=URI, username=AUTH[0], password=AUTH[1],
            database="neo4j", index_name="filings", node_label="Chunk",
            text_node_property="text", embedding_node_property="embedding", create_id_index=True
        )
        print("Unstructured data ingestion and vector indexing complete.")
    except Exception as e:
        print(f"Error creating vector index: {e}")

if __name__ == "__main__":
    print("Clearing database...")
    with driver.session() as session:
        session.run("MATCH (n) DETACH DELETE n")
        try:
            # In Neo4j 5, indexes (including vector indexes) are dropped with DROP INDEX
            session.run("DROP INDEX filings IF EXISTS")
            print("Dropped existing vector index (if any).")
        except Exception as e:
            print(f"Could not drop vector index: {e}")
    ingest_structured_data()
    ingest_unstructured_data()
    print("\nDatabase population finished.")
    driver.close()

Once the database is populated, we will develop the agents. As we are using Google ADK, a single script handles all the agent’s tasks. The agentic system follows a sophisticated Root (frontend agent)/Sub-Agent (backend agent) architecture. This design mimics a team of financial analysts, with a lead analyst (the Root Agent) who intelligently delegates tasks to specialized analysts (the 3 Sub-Agents with different tools).


agents

The Agent Toolkit

Before defining the agents, we first create the core functions they will use to interact with the data and models. These are the tools (functions) that give the agents their capabilities.

query_graph_database: This tool is designed for structured data queries. When a user asks a question like “What was the revenue for NVDA in 2024?”, this function uses a Gemini LLM to dynamically write a Cypher query. It’s guided by a detailed prompt that includes the database schema and examples of correct queries. The generated Cypher is then executed against the Neo4j graph to fetch precise, factual answers. Here, prompt engineering is the key to success.

retrieve_from_documents: This tool handles qualitative questions by performing Retrieval-Augmented Generation (RAG). It follows a two-step process:

  • Retrieve: It converts the user’s question into a vector embedding and uses it to perform a similarity search on the vector index in Neo4j. This retrieves the most relevant text chunks from the 10-K filings.
  • Synthesize: These retrieved text chunks are combined with the original question in a new prompt to the LLM, which then synthesizes a comprehensive, human-readable answer based on the provided context.

predict_stock_price_tool: This is a straightforward tool that acts as a wrapper for a pre-trained machine learning model. It takes a single stock ticker, validates that it’s one of the available companies, and calls the predict_next_day_price function to get a next-day price prediction.

The Specialist Sub-Agents

With the tools defined, we create three distinct sub-agents, each with a specific role. Each agent is given a name, a tool, and a set of instructions that define its expertise.

Graph QA Agent: This is the quantitative analyst. It’s equipped exclusively with the query_graph_database tool. Its instructions tell it to handle questions about specific financial numbers (revenue, net income), company risks, events, and any query that requires pulling structured data from the knowledge graph.

Document RAG Agent: This is the qualitative researcher. It uses the retrieve_from_documents tool to answer questions that require understanding context and nuance, such as summarizing management’s outlook, explaining business strategies, or detailing risks mentioned in SEC filings.

Stock Predictor Agent: This is the forecaster. Its sole purpose is to use the predict_stock_price_tool. Its instructions are very strict: only activate for explicit requests to predict a stock price and always include a disclaimer that the prediction is not financial advice.

The Root Agent: Orchestrator

The root_agent acts as the team lead or the “brain” of the operation. It does not have any tools of its own. Instead, its “tools” are the three sub-agents. Its primary job is to perform intent recognition and delegation. Based on its detailed instructions, the Root Agent analyzes the incoming user query and determines which specialist is best suited for the job (the dynamic part of the system).

If the query asks for a specific number like “net income,” it delegates to the Graph QA Agent. If the query asks for a summary like “What did Apple say about AI?”, it delegates to the Document RAG Agent. If the query explicitly asks for a “prediction,” it delegates to the Stock Predictor Agent.

This layered, multi-agent approach makes the system modular, scalable, and highly effective at routing complex financial questions to the correct “expert” for a precise and relevant answer.

See the script for agents.py:

# agents.py
"""
Defines the ADK agent team for the financial data application.
This includes a root agent for orchestration and specialized sub-agents
for graph querying, document retrieval, and stock price predictions.
"""
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from ..neo4j_for_adk import graphdb
from app.models.predict import predict_next_day_price
from langchain_google_vertexai import VertexAIEmbeddings

# --- Setup ---
llm = LiteLlm(model="gemini-2.5-flash") # cheap and good model

embeddings = VertexAIEmbeddings(model_name="text-embedding-005")

# --- Tool Definitions ---
def query_graph_database(question: str) -> dict:
    """
    Generates a Cypher query for the financial graph and executes it.
    """
    schema = graphdb.send_query("CALL db.schema.visualization()")["query_result"]

    cypher_generation_prompt = f"""
    Task: Generate a Cypher statement to query a financial graph database.

    Schema: {schema}

    Instructions:
    - Use ONLY the provided relationship types and property keys.
    - The graph contains the following nodes and relationships:
      - (c:Company)-[:HAS_FINANCIALS]->(f:Financials)
      - (c:Company)-[:FILED]->(doc:Document)
      - (c:Company)-[:HAS_RISK]->(r:Risk)
      - (c:Company)-[:HAD_EVENT]->(e:Event)
      - (c:Company)-[:HAS_STRATEGY]->(s:Strategy)
      - (doc:Document)-[:MENTIONS_RISK]->(r:Risk)
      - (doc:Document)-[:DESCRIBES_EVENT]->(e:Event)
      - (doc:Document)-[:MENTIONS_STRATEGY]->(s:Strategy)
      - (chunk:Chunk) nodes with vector embeddings for document chunks

    - Key properties for nodes:
      - Company: `ticker` (e.g., 'NVDA'), `name`, `cik`
      - Financials: `company` (ticker), `year` (string like '2024'), `revenue`, `netIncome`, `eps`
      - Risk, Event, Strategy: `name`
      - Document: `source` (filename), `year`, `type`, `management_outlook`
      - Chunk: `text`, `embedding` (vector)

    - IMPORTANT: The Financials node uses `company` property (not ticker directly) and `year` is a STRING
    - Company tickers in your data: NVDA, MSFT, AAPL, GOOGL, AMZN

    Example Questions & Queries (ticker and year are database property names, not variables):
    - Question: "What was the revenue for NVDA in 2024?"
      Query: MATCH (c:Company {{ticker: 'NVDA'}})-[:HAS_FINANCIALS]->(f:Financials {{year: '2024'}}) RETURN f.revenue
    - Question: "What are the key risks for NVDA?"
      Query: MATCH (c:Company {{ticker: 'NVDA'}})-[:HAS_RISK]->(r:Risk) RETURN r.name
    - Question: "Show me financial trends for NVDA over the years"
      Query: MATCH (c:Company {{ticker: 'NVDA'}})-[:HAS_FINANCIALS]->(f:Financials) RETURN f.year, f.revenue, f.netIncome, f.eps ORDER BY f.year
    - Question: "What events happened at Apple?"
      Query: MATCH (c:Company {{ticker: 'AAPL'}})-[:HAD_EVENT]->(e:Event) RETURN e.name

    Question: {question}
    Return only the Cypher query, no explanation or formatting.
    """

    cypher_query = llm.llm_client.completion(
        model=llm.model,
        messages=[{"role": "user", "content": cypher_generation_prompt}],
        tools=[],  # no tools needed for this plain text-generation call
    ).choices[0].message.content.strip()

    cypher_query = cypher_query.replace("```cypher", "").replace("```", "").strip()
    print(f"Generated Cypher: {cypher_query}")

    return graphdb.send_query(cypher_query)

def retrieve_from_documents(question: str) -> dict:
    """
    Performs vector search on 10-K filing chunks and synthesizes an answer.
    """
    question_embedding = embeddings.embed_query(question)

    search_query = """
    CALL db.index.vector.queryNodes('filings', 5, $embedding) YIELD node, score
    RETURN node.text AS text, score
    ORDER BY score DESC
    """

    search_results = graphdb.send_query(search_query, {"embedding": question_embedding})

    if search_results['status'] == 'error' or not search_results['query_result']:
        return {"answer": "Could not retrieve relevant documents from filings.", "error": search_results.get('message', 'Unknown error')}

    context = "\n".join([r['text'] for r in search_results['query_result']])

    synthesis_prompt = f"""
    Based on the following context from SEC 10-K filings, answer the question comprehensively.

    Context from filings:
    {context}

    Question: {question}

    Instructions:
    - Provide a detailed answer based on the context
    - If the context doesn't contain relevant information, say so
    - Cite specific information from the filings when possible
    - Focus on the financial and strategic aspects mentioned

    Answer:
    """

    response = llm.llm_client.completion(
        model=llm.model,
        messages=[{"role": "user", "content": synthesis_prompt}],
        tools=[], 
    ).choices[0].message.content

    return {"answer": response}

def predict_stock_price_tool(ticker: str) -> dict:
    """
    A wrapper for the stock price prediction model.
    Input must be a single, valid stock ticker string from our available companies.
    """
    valid_tickers = {'NVDA', 'MSFT', 'AAPL', 'GOOGL', 'AMZN'}

    if not isinstance(ticker, str):
        return {"error": f"Invalid input type. Please provide a ticker as a string."}

    ticker = ticker.upper().strip()

    if ticker not in valid_tickers:
        return {"error": f"Ticker '{ticker}' not found. Available tickers: {', '.join(valid_tickers)}"}

    print(f"Predicting price for ticker: {ticker}")
    return predict_next_day_price(ticker)

# --- Sub-Agent Definitions ---
graph_qa_subagent = Agent(
    name="GraphQA_Agent",
    model=llm,
    tools=[query_graph_database],
    description="Use for questions about company financials (revenue, net income, EPS), risks, events, strategies, and any structured data queries. Works with tickers: NVDA, MSFT, AAPL, GOOGL, AMZN.",
    instruction="""
    Your task is to use the `query_graph_database` tool to answer questions about:
    - Financial metrics (revenue, net income, EPS) by company and year
    - Company risks, events, and strategic focuses
    - Comparisons between companies
    - Financial trends over time

    Always use the exact ticker symbols: NVDA, MSFT, AAPL, GOOGL, AMZN
    Remember that years are stored as strings (e.g., '2024', '2023').
    """
)

document_rag_subagent = Agent(
    name="DocumentRAG_Agent",
    model=llm,
    tools=[retrieve_from_documents],
    description="Use for qualitative questions about company strategy, management outlook, detailed business descriptions, or any information that requires reading through SEC 10-K filing text.",
    instruction="""
    Your task is to use the `retrieve_from_documents` tool to find detailed, qualitative information from SEC filings including:
    - Management's discussion and analysis
    - Business strategy and outlook
    - Detailed risk descriptions
    - Product and service descriptions
    - Market analysis and competitive positioning

    Provide comprehensive answers based on the retrieved document chunks.
    """
)

prediction_subagent = Agent(
    name="StockPricePredictor_Agent",
    model=llm,
    tools=[predict_stock_price_tool],
    description="Use ONLY to predict the next day's closing stock price. Works with tickers: NVDA, MSFT, AAPL, GOOGL, AMZN.",
    instruction="""
    Your only task is to use the `predict_stock_price_tool` for stock price predictions.

    IMPORTANT:
    - Only valid tickers: NVDA, MSFT, AAPL, GOOGL, AMZN
    - Input must be a single ticker string
    - Always include a disclaimer that predictions are estimates based on historical data and not financial advice
    """
)

# --- Root Agent Definition ---
root_agent = Agent(
    name="Financial_Root_Agent",
    model=llm,
    sub_agents=[graph_qa_subagent, document_rag_subagent, prediction_subagent],
    description="The main financial assistant that analyzes user queries and delegates to specialized agents for financial data analysis.",
    instruction="""
    You are a knowledgeable financial data assistant with access to data for these companies: NVDA, MSFT, AAPL, GOOGL, AMZN.

    DELEGATION GUIDELINES:
    - Use 'GraphQA_Agent' for:
      * Specific financial numbers (revenue, net income, EPS)
      * Company risks, events, strategies (structured data)
      * Financial comparisons and trends
      * Any query requiring precise data extraction

    - Use 'DocumentRAG_Agent' for:
      * Qualitative analysis and detailed explanations
      * Management outlook and business strategy discussions
      * Complex business descriptions
      * Questions requiring reading through filing narratives

    - Use 'StockPricePredictor_Agent' ONLY for:
      * Explicit requests to predict future stock prices
      * Must use valid tickers: NVDA, MSFT, AAPL, GOOGL, AMZN

    IMPORTANT NOTES:
    - Available companies: NVIDIA (NVDA), Microsoft (MSFT), Apple (AAPL), Alphabet/Google (GOOGL), Amazon (AMZN)
    - Financial data years: 2021-2024
    - Always include disclaimers for predictions
    - If uncertain about which agent to use, explain your reasoning
    """
)

Finally, we train the autoregressive predictive model for each stock. This process creates a custom machine learning model for every company that can be used by the Stock Predictor Agent.

Feature Engineering

To prepare the data for training, the script first performs feature engineering to create a set of predictive inputs from the raw historical data. This autoregressive model uses past values to predict future ones. The key features created include:

Lag Features: The closing prices of the last 10 days.

Rolling Window Features: 5-day and 20-day moving averages for both price and volume to capture recent trends.

Volume Features: The previous day’s trading volume.

The model’s target is to predict the stock’s closing price one day into the future.

Model Training and Saving

The script then iterates through each company’s price data. For each stock, it applies the feature engineering process and then trains a LightGBM Regressor model on the company’s entire historical dataset. Using the full history allows the model to make the most informed prediction possible for the next day.


Prices data

After a model is trained for a specific stock, two files are saved: the trained model object itself and a separate file containing the list of feature names the model expects. This ensures that the prediction tool can consistently provide the correct input structure. This loop repeats until a unique, serialized model exists for every stock. To get better results for specific stocks, consider tuning the hyperparameters with Optuna, as sketched after the training script below.

# train_predictor.py

import pandas as pd
import numpy as np
import joblib
import os
import lightgbm as lgb
from tqdm import tqdm

# --- Configuration ---
PRICES_DIR = "./data/structured/prices"
MODEL_DIR = "./app/models/saved_models" # Matching your project structure
os.makedirs(MODEL_DIR, exist_ok=True)

# --- Feature Engineering Parameters ---
WINDOW_SIZE = 10         
PREDICTION_HORIZON = 1   

def create_features(df):
    """Creates time-series features from a stock price DataFrame."""
    # Create a new DataFrame for features to avoid modifying the original
    featured_df = df[['Close', 'Volume']].copy()

    # 1. Lag Features (autoregressive part)
    for i in range(1, WINDOW_SIZE + 1):
        featured_df[f'Close_lag_{i}'] = featured_df['Close'].shift(i)

    # 2. Rolling Window Features
    featured_df['MA_5'] = featured_df['Close'].rolling(window=5).mean()
    featured_df['MA_20'] = featured_df['Close'].rolling(window=20).mean()

    # 3. Volume-based Features
    featured_df['Volume_lag_1'] = featured_df['Volume'].shift(1)
    featured_df['Volume_MA_5'] = featured_df['Volume'].rolling(window=5).mean()

    # 4. Create the target variable
    featured_df['target'] = featured_df['Close'].shift(-PREDICTION_HORIZON)

    featured_df.dropna(inplace=True)

    return featured_df

if __name__ == "__main__":
    price_files = [f for f in os.listdir(PRICES_DIR) if f.endswith('_prices.csv')]

    for file in tqdm(price_files, desc="Training Models for each stock"):
        ticker = file.split('_')[0]

        # Load data
        df = pd.read_csv(os.path.join(PRICES_DIR, file))
        df['Date'] = pd.to_datetime(df['Date'])
        df.set_index('Date', inplace=True)
        df.sort_index(inplace=True)

        # Create features
        data = create_features(df)

        if data.empty:
            print(f"Skipping {ticker}: Not enough data to create features.")
            continue

        # Define features (X) and target (y)
        X = data.drop(columns=['target', 'Close', 'Volume'])
        y = data['target']

        # Train the model
        print(f"\nTraining model for {ticker} with {len(X.columns)} features...")

        model = lgb.LGBMRegressor(
            random_state=42,
            n_estimators=200, # More estimators for better performance
            learning_rate=0.05,
            num_leaves=31
        )
        model.fit(X, y)

        # Save the trained model and the list of features it expects
        joblib.dump(model, os.path.join(MODEL_DIR, f"{ticker}_price_regressor.joblib"))
        joblib.dump(X.columns.tolist(), os.path.join(MODEL_DIR, f"{ticker}_features.joblib"))

        print(f" Model for {ticker} saved.")

    print("\nTraining complete! All models saved.")
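As mentioned above, per-ticker results can usually be improved with a hyperparameter search. Here is an illustrative Optuna sketch; the search ranges and the simple time-ordered holdout are my own assumptions, not part of the training script:

# Illustrative Optuna tuning sketch for one ticker's LightGBM model. Ranges and the
# 80/20 time-ordered split are assumptions; X and y come from create_features() above.
import optuna
import lightgbm as lgb
from sklearn.metrics import mean_absolute_error

def tune_lgbm(X, y, n_trials=50):
    """Train on the older 80% of the history, validate on the most recent 20%."""
    split = int(len(X) * 0.8)
    X_train, X_val = X.iloc[:split], X.iloc[split:]
    y_train, y_val = y.iloc[:split], y.iloc[split:]

    def objective(trial):
        params = {
            "n_estimators": trial.suggest_int("n_estimators", 100, 500),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "num_leaves": trial.suggest_int("num_leaves", 15, 63),
            "random_state": 42,
        }
        model = lgb.LGBMRegressor(**params)
        model.fit(X_train, y_train)
        return mean_absolute_error(y_val, model.predict(X_val))

    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=n_trials)
    return study.best_params

# Example usage: best_params = tune_lgbm(X, y); lgb.LGBMRegressor(**best_params).fit(X, y)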

… and also create the predict.py script, to be run when someone asks for a specific stock price prediction in the chat interface:

# app/models/predict.py

import pandas as pd
import numpy as np
import joblib
import os
from pathlib import Path

# --- Configuration ---
MODEL_DIR = Path(__file__).resolve().parent / "saved_models"
PRICES_DIR = Path(__file__).resolve().parent.parent.parent / "data/structured/prices"

def predict_next_day_price(ticker: str) -> dict:
    """
    Predicts the next day's closing price for a given stock ticker.

    Args:
        ticker: The stock ticker (e.g., 'AAPL').

    Returns:
        A dictionary with the predicted price or an error message.
    """
    try:
        # Load the trained model and its required features
        model = joblib.load(MODEL_DIR / f"{ticker}_price_regressor.joblib")
        features_list = joblib.load(MODEL_DIR / f"{ticker}_features.joblib")

        # Load the latest historical data for the ticker
        df = pd.read_csv(PRICES_DIR / f"{ticker}_prices.csv")
        df['Date'] = pd.to_datetime(df['Date'])
        df.set_index('Date', inplace=True)
        df.sort_index(inplace=True)

        # Take a slice of the last ~30 days to ensure rolling windows can be calculated
        latest_data = df.tail(30).copy()

        # 1. Lag Features
        for i in range(1, 11): # WINDOW_SIZE is 10
            latest_data[f'Close_lag_{i}'] = latest_data['Close'].shift(i)

        # 2. Rolling Window Features
        latest_data['MA_5'] = latest_data['Close'].rolling(window=5).mean()
        latest_data['MA_20'] = latest_data['Close'].rolling(window=20).mean()

        # 3. Volume-based Features
        latest_data['Volume_lag_1'] = latest_data['Volume'].shift(1)
        latest_data['Volume_MA_5'] = latest_data['Volume'].rolling(window=5).mean()

        prediction_features = latest_data.tail(1)

        prediction_features = prediction_features[features_list]

        predicted_price = model.predict(prediction_features)[0]

        return {
            "ticker": ticker,
            "predicted_next_day_close": round(float(predicted_price), 2)
        }

    except FileNotFoundError:
        return {"error": f"Model or data for ticker '{ticker}' not found. Please ensure it has been trained."}
    except Exception as e:
        return {"error": f"An error occurred during prediction for {ticker}: {e}"}

if __name__ == '__main__':
    sample_ticker = 'AAPL'
    prediction = predict_next_day_price(sample_ticker)

    if "error" in prediction:
        print(f"Error: {prediction['error']}")
    else:
        print(f"Prediction for {prediction['ticker']}:")
        print(f" Predicted Close Price for Tomorrow: ${prediction['predicted_next_day_close']}")

Now we can run our project using uvicorn:

uvicorn app.main:app --reload --port 8080

The app will run at http://127.0.0.1:8080
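The FastAPI entry point (app.main) is wired up in the repository; if you just want to exercise the agent team from a plain Python script, the ADK Runner can drive the root agent directly. The sketch below is illustrative: the import path for root_agent and the app_name are assumed from the project structure, and session-service details vary slightly across ADK versions:

# Illustrative local test of the agent team via the ADK Runner. The import path for
# root_agent is an assumption based on the project structure shown earlier.
import asyncio
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from app.agents.agents import root_agent  # adjust to your actual module path

async def main():
    session_service = InMemorySessionService()
    runner = Runner(agent=root_agent, app_name="financial_assistant", session_service=session_service)
    session = await session_service.create_session(app_name="financial_assistant", user_id="local-user")

    message = types.Content(role="user", parts=[types.Part(text="What was the revenue for NVDA in 2024?")])
    async for event in runner.run_async(user_id="local-user", session_id=session.id, new_message=message):
        if event.is_final_response():
            print(event.content.parts[0].text)

asyncio.run(main())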

For deployment on Cloud Run:

gcloud auth login
gcloud config set project YOUR-PROJECT

gcloud artifacts repositories create financial-assistant-repo \
    --repository-format=docker \
    --location=us-central1 \
    --description="Docker repository for financial assistant service"

gcloud builds submit --tag us-central1-docker.pkg.dev/YOUR-PROJECT/financial-assistant-repo/assistant-service:latest

gcloud run deploy financial-assistant-service \
    --image=us-central1-docker.pkg.dev/YOUR-PROJECT/financial-assistant-repo/assistant-service:latest \
    --platform=managed \
    --region=us-central1 \
    --allow-unauthenticated \
    --set-env-vars-from-file=.env \
    --min-instances 0 \
    --max-instances 3 \
    --cpu 4 \
    --memory 8192Mi \
    --concurrency 10

An article I wrote some years ago implements an ABM system using NetLogo (written in Scala and Java), getting input from the environment via Raspberry Pi sensors to simulate agents in a social network.

Clap ➕ if you liked ☺️☺️☺️

Acknowledgements

Google ML Developer Programs and Google Developers Program supported this work by providing Google Cloud Credits (and awesome tutorials for the Google Developer Experts)

🔗 https://developers.google.com/machine-learning 🔗
