
GAUTAM MANAK

Posted on • Originally published at github.com

AI Tech Daily Agent — Complete Architecture Deep Dive & Workflow Analysis

AI Tech Daily Agent — from search and data to published deep-dive article

A Comprehensive Exploration of Building an Autonomous AI Agent for Daily Tech Journalism


Table of Contents

  1. Introduction
  2. Project Overview
  3. Core Architecture
  4. Component Analysis
  5. Workflow Pipeline
  6. Service Layer Deep Dive
  7. Protocol Implementation
  8. Agent Lifecycle
  9. Data Flow & Orchestration
  10. Code Analysis: Key Patterns
  11. Deployment & Infrastructure
  12. Technical Challenges & Solutions
  13. Future Enhancements
  14. Conclusion

Introduction

The AI Tech Daily Agent is an autonomous AI agent that automates technical journalism. Built on the Fetch.ai uAgents framework, the system orchestrates multiple services to research, analyze, and generate comprehensive deep-dive articles about AI and technology companies on a daily basis.

This project demonstrates the power of agent-based systems in automating complex, multi-step workflows that typically require significant human effort. By integrating web search, content scraping, GitHub API integration, large language models (LLMs), and image search into a cohesive pipeline, the agent produces high-quality, research-backed articles with minimal human intervention.

Key Capabilities:

  • Automated company selection based on topic coverage
  • Real-time news aggregation from multiple sources
  • GitHub repository tracking for open source projects
  • Web scraping for in-depth content analysis
  • LLM-powered article generation with specific formatting requirements
  • Dev.to platform integration for automated publishing
  • Chat interface for interactive control and monitoring
  • Session management and conversation handling

Project Overview

Purpose & Mission

The AI Tech Daily Agent exists to solve a specific problem: the high effort required to produce daily, in-depth technical content about rapidly evolving AI and technology companies. Traditional technical journalism requires journalists to:

  1. Monitor multiple news sources
  2. Track GitHub repositories
  3. Analyze company announcements
  4. Understand technical details
  5. Write comprehensive articles
  6. Format for various platforms
  7. Publish and distribute content

This agent automates the entire pipeline, reducing what would typically take several hours of human work to a 2-3 minute automated process.

Technology Stack

The project leverages a modern Python-based technology stack:

Core Framework:

  • uAgents Protocol (Fetch.ai): Decentralized agent communication protocol
  • Python 3.11+: Modern Python with async/await support
  • uv: Fast Python package manager

Web & Data:

  • Requests: HTTP client for API interactions
  • GitHub REST API: Repository and release tracking
  • Dev.to API: Content publishing platform
  • Bing/Web Search APIs: News and web search capabilities

AI & NLP:

  • OpenAI/LLM APIs: Content generation and analysis
  • LangChain-style prompting: Structured prompt engineering

Infrastructure:

  • Agentverse: Agent hosting and discovery platform
  • Almanac Contracts: Decentralized service registration
  • Environment Configuration: Flexible deployment setup

Project Structure

ai-tech-daily-agent/
├── agent.py                    # Main agent entry point
├── config/
│   ├── __init__.py
│   └── sources.py             # Tracked repositories & companies
├── protocols/
│   ├── __init__.py
│   └── chat_proto.py          # Chat protocol implementation
├── services/
│   ├── __init__.py
│   ├── article_service.py     # Article generation logic
│   ├── company_picker.py      # Company selection algorithm
│   ├── devto_service.py       # Dev.to API integration
│   ├── github_service.py      # GitHub API integration
│   ├── image_search_service.py # Image finding logic
│   ├── llm_service.py         # LLM abstraction layer
│   ├── publish_service.py     # Publishing orchestration
│   ├── web_scraper_service.py # Content scraping
│   └── web_search_service.py  # Search API wrapper
├── tests/
│   ├── __init__.py
│   └── test_filter.py         # Unit tests
├── pyproject.toml             # Project dependencies
├── uv.lock                    # Locked dependency versions
├── .gitignore
├── README.md
├── PROJECT_DEEP_DIVE.md       # This document
└── docs/
    └── deep-dive/             # Generated diagram images (PNG)
        ├── architecture.png
        ├── pipeline.png
        └── data-flow.png

This structure follows clean architecture principles with clear separation of concerns:

  • Configuration in config/
  • Protocol definitions in protocols/
  • Business logic in services/
  • Entry point at the root

Core Architecture

System Architecture Diagram

The AI Tech Daily Agent follows a multi-layered architecture designed for modularity, scalability, and maintainability.

Illustrative architecture (view on GitHub):

System architecture — layers from Agentverse/uAgents through agent.py, services, and external APIs

Architectural Principles

The architecture embodies several key principles that make it robust and maintainable:

1. Separation of Concerns
Each service has a single, well-defined responsibility:

  • company_picker.py - Only handles company selection logic
  • github_service.py - Only GitHub API interactions
  • article_service.py - Only article generation
  • publish_service.py - Only publishing logic

2. Dependency Injection
Services receive their dependencies as parameters, which makes them easier to test and to swap out:

def generate_article(
    company: dict,
    search_data: dict,
    scraped_content: str,
    github_repos: list[dict],
    images: dict[str, str],
) -> tuple[str, str]:

3. Async/Await Pattern
Network operations use async to prevent blocking:

async def _run_pipeline(ctx: Context) -> str:
    result = await asyncio.to_thread(run_pipeline, dry_run)
    return result

4. Error Handling & Fallbacks
Graceful degradation when services fail:

result = call_llm(system_prompt, user_prompt)
if result:
    article = result  # Use LLM-generated content
else:
    article = _fallback_article(...)  # Graceful degradation

5. Configuration Externalization
All tracked companies and repositories are in config/sources.py, not hardcoded:

TRACKED_COMPANIES = [...]
TRACKED_FRAMEWORK_REPOS = [...]

Communication Model

The agent uses the uAgents protocol for inter-agent communication:

Chat Protocol:

  • Implements the standard uAgents chat protocol specification
  • Supports session management with StartSessionContent and EndSessionContent
  • Message acknowledgments for reliable delivery
  • Text-based commands for user interaction

Key Protocol Features:

# Session start
StartSessionContent → Welcome message

# User commands
TextContent("generate") → Start pipeline
TextContent("status") → Show history
TextContent("help") → Show commands

# Acknowledgments
ChatAcknowledgement → Confirmation of receipt

Component Analysis

1. Main Agent (agent.py)

The agent.py file serves as the entry point and orchestrator for the entire system.

Key Responsibilities:

  1. Agent Registration: Registers with Agentverse using the Almanac contract
  2. Protocol Setup: Attaches the chat protocol for user interaction
  3. Pipeline Orchestration: Coordinates the execution of all services
  4. Environment Configuration: Handles dry-run modes and API keys
  5. Logging: Provides comprehensive logging throughout the pipeline

Critical Code Flow:

# Agent registration
Agent(
    name="ai-tech-daily-agent",
    port=8000,
    seed=AGENT_SEED,
    endpoint=["http://localhost:8000/submit"],
)

# Main pipeline
def run_pipeline(dry_run: bool = False) -> str:
    # 1. Check history and select company
    # 2. Perform web/search queries
    # 3. Fetch GitHub repository data
    # 4. Scrape and read content
    # 5. Generate article using LLM
    # 6. Find appropriate images
    # 7. Optionally publish to Dev.to
    # 8. Update history
    ...

Design Pattern: Pipeline/Chain of Responsibility

The run_pipeline function implements a pipeline pattern where each step builds on the previous one:

def run_pipeline(dry_run: bool = False) -> str:
    # Step 1: Company Selection
    history = get_history()
    company = select_company(history, TRACKED_COMPANIES)

    # Step 2: Data Collection
    search_data = {
        "news": search_news(...),
        "web": search_web(...),
        "github": search_github(...),
    }

    # Step 3: Content Gathering
    github_repos = get_all_repos()
    scraped_content = scrape_and_read(...)

    # Step 4: Article Generation
    article, filename = generate_article(...)

    # Step 5: Publishing
    if not dry_run:
        devto_id = publish_to_devto(...)

    return result

Each step passes its output to the next, creating a data transformation pipeline.
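As a toy sketch of this pattern (step names and data are illustrative, not the real service functions), the chaining can be reduced to a list of functions that each enrich a shared context dict:

```python
# Toy pipeline: each step enriches a shared context dict and passes it on.
def select_company(ctx: dict) -> dict:
    ctx["company"] = "openai"
    return ctx

def collect_data(ctx: dict) -> dict:
    ctx["news"] = [f"{ctx['company']} ships a new model"]
    return ctx

def generate_article(ctx: dict) -> dict:
    ctx["article"] = f"# {ctx['company']} — Deep Dive\n" + "\n".join(ctx["news"])
    return ctx

def run_steps(steps, ctx: dict) -> dict:
    # Fold the context through every step, in order
    for step in steps:
        ctx = step(ctx)
    return ctx

result = run_steps([select_company, collect_data, generate_article], {})
print(result["article"].splitlines()[0])  # "# openai — Deep Dive"
```

The real pipeline works the same way, except each "step" is a service call and the context is the accumulated search, scrape, and repo data.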

2. Company Picker Service (company_picker.py)

The company picker implements the core decision-making logic for which company to feature each day.

Algorithm:

  1. Load History: Read history.json to see previous coverage
  2. Filter Candidates: Remove companies covered in last 14 days
  3. Random Selection: Pick from remaining candidates
  4. Update History: Record the selection

Key Code:

def select_company(history: list[dict], companies: list[dict]) -> dict:
    cutoff = (datetime.now() - timedelta(days=14)).isoformat()
    recent_slugs = {h["slug"] for h in history if h["date"] >= cutoff}

    candidates = [c for c in companies if c["slug"] not in recent_slugs]

    if not candidates:
        log.warning("No candidates available after 14-day filter")
        return companies[0]

    return random.choice(candidates)

Design Considerations:

  • 14-Day Cooling Period: Prevents repetitive coverage
  • Random Selection: Ensures variety in coverage
  • Fallback Mechanism: If all companies are recent, pick the first one
  • Slug Matching: Uses simple string matching for easy comparison
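To make the cooling-period behavior concrete, here is a standalone re-run of the selection logic from the snippet above (self-contained for illustration; the real logic lives in company_picker.py):

```python
import random
from datetime import datetime, timedelta

def select_company(history: list[dict], companies: list[dict]) -> dict:
    # Exclude companies covered within the last 14 days
    cutoff = (datetime.now() - timedelta(days=14)).isoformat()
    recent_slugs = {h["slug"] for h in history if h["date"] >= cutoff}
    candidates = [c for c in companies if c["slug"] not in recent_slugs]
    # Fallback: every company is recent, so just take the first one
    return random.choice(candidates) if candidates else companies[0]

companies = [{"name": "OpenAI", "slug": "openai"},
             {"name": "Anthropic", "slug": "anthropic"}]
history = [{"slug": "openai", "date": datetime.now().isoformat()}]  # covered today

picked = select_company(history, companies)
print(picked["slug"])  # "anthropic" — the only remaining candidate
```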

Data Structure:

COMPANY_TRACKING = [
    {
        "name": "OpenAI",
        "slug": "openai",
        "topics": ["llm", "generative-ai", "gpt"],
    },
    {
        "name": "Anthropic",
        "slug": "anthropic",
        "topics": ["llm", "claude", "safety"],
    },
    # ... more companies
]

3. Web Search Service (web_search_service.py)

This service abstracts web search operations for news and general web search.

API Integration:

The service integrates with search APIs (likely Bing or similar) to fetch:

  • News articles with titles, URLs, bodies, and dates
  • Web search results with titles and descriptions

Key Functionality:

def search_news(company: str, topics: list[str]) -> list[dict]:
    """
    Search for recent news about the company.
    Returns list of news items with title, url, body, date.
    """
    queries = [company] + topics
    all_news = []

    for query in queries:
        results = _call_search_api(query="news:" + query)
        all_news.extend(results)

    return _deduplicate(all_news)

def search_web(company: str) -> list[dict]:
    """
    General web search for company information.
    """
    return _call_search_api(query=company)

Data Transformation:

Raw search results are transformed into a standardized format:

# Raw API response
{
    "title": "...",
    "url": "...",
    "snippet": "...",
    "date": "...",
}

# Transformed to internal format
{
    "title": "...",
    "url": "...",
    "body": "...",
    "date": "...",
}

Error Handling:

The service includes robust error handling for:

  • API failures (returns empty list)
  • Rate limiting (with retries)
  • Network timeouts
  • Malformed responses

Workflow Pipeline

Complete Pipeline Overview

The AI Tech Daily Agent executes a comprehensive pipeline that transforms a simple command into a published article. Here's the complete workflow.

Illustrative pipeline (view on GitHub):

End-to-end pipeline from generate command through publishing and history

Pipeline Execution Details

Phase 1: Company Selection (5 seconds)

# Load history file
if os.path.exists(HISTORY_FILE):
    history = json.loads(Path(HISTORY_FILE).read_text())
else:
    history = []

# Apply temporal filter
cutoff = (datetime.now() - timedelta(days=14)).isoformat()
recent_slugs = {h["slug"] for h in history if h["date"] >= cutoff}

# Select company
candidates = [c for c in TRACKED_COMPANIES if c["slug"] not in recent_slugs]
company = random.choice(candidates)

Phase 2: Data Collection (30-45 seconds)

Concurrent API calls for efficiency:

# Parallel search with different query variations
news_queries = [
    company["name"],
    company["name"] + " news",
    company["name"] + " announcement",
    *company["topics"]
]

all_news = []
for query in news_queries:
    news = search_news(query)
    all_news.extend(news)

# Deduplicate results by URL
seen_urls = set()
unique_news = []
for n in all_news:
    if n["url"] not in seen_urls:
        seen_urls.add(n["url"])
        unique_news.append(n)

Phase 3: GitHub Data (20-30 seconds)

Two types of GitHub data collection:

# 1. Tracked frameworks (known repos)
frameworks = []
for repo in TRACKED_FRAMEWORK_REPOS:
    data = fetch_github_repo(repo["owner"], repo["repo"])
    release = get_latest_release(repo["owner"], repo["repo"])
    frameworks.append({...})

# 2. Trending new repos (discovery)
trending = []
for query in SEARCH_QUERIES:
    repos = github_search_repository(query, 
                                     sort="stars",
                                     created=">7 days ago")
    trending.extend(repos)

Phase 4: Content Scraping (30-60 seconds)

# Get top URLs from search results
top_urls = [item["url"] for item in search_results[:10]]

# Scrape and read content
scraped_text = ""
for url in top_urls:
    try:
        html = requests.get(url, timeout=15).text
        text = extract_text_from_html(html)
        scraped_text += text
        if len(scraped_text) > 10000:  # Limit content
            break
    except Exception as e:
        log.warning(f"Failed to scrape {url}: {e}")

Phase 5: Article Generation (30-45 seconds)

# Build comprehensive prompt
system_prompt = f"""
You are a senior tech journalist...
TODAY'S FOCUS: {company_name}
RULES:
- Article MUST be 300+ lines
- Include specific numbers: stars, funding, users
- Include 2-3 code snippets
- Include links to sources
"""

user_prompt = f"""
Company topics: {topics}

=== REAL-TIME NEWS ===
{formatted_news}

=== WEB SEARCH RESULTS ===
{formatted_web}

=== GITHUB SEARCH ===
{formatted_github}

=== TRACKED REPOS ===
{formatted_repos}

=== SCRAPED CONTENT ===
{scraped_content[:8000]}
"""

# Generate article
article = call_llm(system_prompt, user_prompt, 
                   temperature=0.7, 
                   max_tokens=8000)

Phase 6: Image Enhancement (15-20 seconds)

images = {}

# Search for logo
logo_url = search_images(f"{company} logo official website")
if logo_url:
    images["logo"] = logo_url

# Search for hero image
hero_url = search_images(f"{company} technology platform")
if hero_url:
    images["hero"] = hero_url

# Search for tech images
banner_url = search_images(f"{company} architecture technology")
if banner_url:
    images["banner"] = banner_url

Phase 7: Publishing (10-15 seconds)

# Save local copy
filename = f"{slug}-{date}.md"
article_path = Path("articles") / filename
article_path.write_text(article)

# Publish to Dev.to
if not dry_run and devto_api_key:
    devto_id = create_devto_article(
        title=f"{company} — Deep Dive",
        body_markdown=article,
        tags=company["topics"] + ["ai", "technology"],
        published=True
    )
    url = f"https://dev.to/{devto_username}/{slug}"
else:
    url = f"Local: {article_path}"

Phase 8: History Update (2 seconds)

history.append({
    "name": company["name"],
    "slug": company["slug"],
    "date": datetime.now().isoformat(),
    "article_url": url,
    "devto_id": devto_id
})

# Persist to file
Path(HISTORY_FILE).write_text(json.dumps(history, indent=2))

Total Pipeline Time: ~2-3 minutes


Service Layer Deep Dive

GitHub Service (github_service.py)

The GitHub service is a critical component that provides both tracking of known repositories and discovery of new trending projects.

Authentication:

def _headers() -> dict:
    h = {
        "Accept": "application/vnd.github+json",
        "User-Agent": "AI-Tech-Daily-Agent/1.0"
    }
    token = os.getenv("GH_TOKEN") or os.getenv("GITHUB_TOKEN")
    if token:
        h["Authorization"] = f"token {token.strip()}"
    return h

Key Features:

  1. Framework Tracking: Monitors known AI agent frameworks
  2. Trending Discovery: Finds new repositories created in the last 7 days
  3. Release Tracking: Tracks latest releases for version information
  4. Metadata Collection: Extracts stars, language, description, activity

Framework Tracking Logic:

def get_framework_updates() -> list[dict]:
    results = []

    for repo_info in TRACKED_FRAMEWORK_REPOS:
        owner, repo = repo_info["owner"], repo_info["repo"]
        headers = _headers()

        # Fetch repository metadata
        resp = requests.get(
            f"https://api.github.com/repos/{owner}/{repo}",
            headers=headers,
            timeout=10
        )
        data = resp.json()

        # Fetch latest release
        release_info = _get_latest_release(owner, repo, headers)

        # Build comprehensive record
        results.append({
            "name": f"{owner}/{repo}",
            "label": repo_info["label"],
            "url": data["html_url"],
            "description": data["description"],
            "stars": data["stargazers_count"],
            "language": data.get("language"),
            "updated_at": data.get("pushed_at"),
            "latest_release": release_info,
            "type": "tracked"
        })

    # Sort by recent activity
    results.sort(key=lambda x: x.get("updated_at", ""), reverse=True)
    return results

Trending Search Logic:

def search_trending_repos() -> list[dict]:
    one_week_ago = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%d")

    queries = [
        "ai agent",
        "llm agent framework",
        "mcp server",
        "agentic ai",
        "autonomous agent",
        # ... more queries
    ]

    headers = _headers()
    all_repos = []

    for query in queries:
        resp = requests.get(
            "https://api.github.com/search/repositories",
            params={
                "q": f"{query} created:>{one_week_ago}",
                "sort": "stars",
                "order": "desc",
                "per_page": 5
            },
            headers=headers
        )

        for repo in resp.json().get("items", []):
            all_repos.append({
                "name": repo["full_name"],
                "url": repo["html_url"],
                "description": repo["description"],
                "stars": repo["stargazers_count"],
                "language": repo["language"],
                "type": "trending"
            })

    # Deduplicate and sort by stars
    unique = list({r["name"]: r for r in all_repos}.values())
    unique.sort(key=lambda x: x["stars"], reverse=True)
    return unique[:10]

Rate Limiting Considerations:

  • Uses GitHub REST API which has rate limits
  • Implements timeout handling (10-15 seconds per request)
  • Catches and logs failures without crashing
  • No explicit rate limiting code, relies on GitHub's default limits
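One lightweight improvement would be to honor GitHub's standard X-RateLimit-* response headers before issuing the next request. A sketch of that check (not present in the repository):

```python
import time

def rate_limit_wait(headers: dict) -> float:
    """Seconds to wait before the next call, based on GitHub's
    X-RateLimit-Remaining / X-RateLimit-Reset response headers."""
    remaining = int(headers.get("X-RateLimit-Remaining", "1"))
    if remaining > 0:
        return 0.0  # quota left, no need to wait
    reset_at = int(headers.get("X-RateLimit-Reset", "0"))  # epoch seconds
    return max(0.0, reset_at - time.time())

# Quota exhausted, resets roughly 30 seconds from now
exhausted = {"X-RateLimit-Remaining": "0",
             "X-RateLimit-Reset": str(int(time.time()) + 30)}
print(rate_limit_wait({"X-RateLimit-Remaining": "42"}))  # 0.0
```

The caller would `time.sleep(rate_limit_wait(resp.headers))` between search queries, turning hard 403 rate-limit failures into a short pause.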

Article Service (article_service.py)

The article service is the core content generation component that orchestrates LLM-based article writing.

Main Generation Function:

def generate_article(
    company: dict,
    search_data: dict,
    scraped_content: str,
    github_repos: list[dict],
    images: dict[str, str],
) -> tuple[str, str]:

Prompt Engineering Strategy:

The service uses sophisticated prompt engineering to ensure high-quality output:

1. System Prompt - Sets Persona and Rules:

system = f"""You are a senior tech journalist and developer advocate writing an in-depth daily article for "AI & Tech Daily".

TODAY'S FOCUS: {name}

Write a COMPREHENSIVE deep-dive about {name} — covering everything happening RIGHT NOW.

RULES:
- Article MUST be 300+ lines of markdown
- ALL content must be based on the real-time search data provided — do NOT invent facts
- Include specific numbers: star counts, funding, users, version numbers
- Include 2-3 code snippets showing how to use their tools/products
- Include links to sources: [text](url)
- Include images where provided (logo, hero, tech images)
- Be opinionated — give your take on what this means for developers
- Every section must have real, substantial content

REQUIRED SECTIONS (## headings, ALL mandatory):

# {name} — Deep Dive | {human_date}

## Company Overview
## Latest News & Announcements
## Product & Technology Deep Dive
## GitHub & Open Source
## Getting Started — Code Examples
## Market Position & Competition
## Developer Impact
## What's Next
## Key Takeaways
## Resources & Links
"""

2. User Prompt - Provides All Context:

user = f"""Write a deep-dive article about {name} for {human_date}.

Company topics: {topics}
{image_instructions}

=== REAL-TIME NEWS (searched today) ===
{news_text}

=== WEB SEARCH RESULTS ===
{web_text}

=== GITHUB SEARCH ===
{github_text}

=== TRACKED REPOS DATA ===
{repo_text}

=== SCRAPED ARTICLE CONTENT (from top sources) ===
{scraped_content[:8000]}

IMPORTANT: Write FULL article. 300+ lines minimum. Use ONLY data from above. Include images where instructed. Include code snippets."""

Data Formatting Functions:

def _format_news(news: list[dict]) -> str:
    """Format news search results for prompt."""
    lines = []
    for n in news[:15]:  # Limit to top 15
        lines.append(f"- [{n['title']}]({n['url']})")
        if n.get("body"):
            lines.append(f"  {n['body'][:300]}")
        if n.get("date"):
            lines.append(f"  Date: {n['date']}")
        lines.append("")
    return "\n".join(lines)

def _format_github(github: list[dict]) -> str:
    """Format GitHub search results for prompt."""
    lines = []
    for g in github[:8]:  # Limit to top 8
        lines.append(f"- [{g['title']}]({g['url']})")
        lines.append(f"  {g.get('body', '')[:200]}")
    return "\n".join(lines)

def _format_tracked_repos(repos: list[dict]) -> str:
    """Format tracked repositories with release info."""
    lines = []
    for r in repos:
        release = r.get("latest_release")
        rel = f" — latest: {release['tag']}" if release else ""
        lines.append(
            f"- {r['label']} (⭐{r['stars']:,}){rel} — "
            f"{r['description'][:150]} [{r['url']}]"
        )
    return "\n".join(lines)

Fallback Mechanism:

If LLM generation fails or returns empty content, the service falls back to a templated article:

def _fallback_article(company, search_data, repos, images, 
                      human_date, date_str):
    """Generate a basic template article if LLM fails."""

    name = company["name"]
    topics = ", ".join(company["topics"])

    # Format available data
    news_bullets = "\n".join(
        f"- **{n['title']}** — {n.get('body', '')[:200]} [source]({n['url']})"
        for n in search_data.get("news", [])[:10]
    )

    web_bullets = "\n".join(
        f"- [{w['title']}]({w['url']})"
        for w in search_data.get("web", [])[:8]
    )

    repo_bullets = "\n".join(
        f"- **[{r['label']}]({r['url']})** ⭐ {r['stars']:,}"
        for r in repos[:10]
    )

    # Image markdown (empty strings when no image was found)
    logo_img = f"![{name} logo]({images['logo']})" if images.get("logo") else ""
    hero_img = f"![{name}]({images['hero']})" if images.get("hero") else ""

    # Build template
    return f"""# {name} — Deep Dive | {human_date}
{logo_img}
> Daily deep dive into {name} — covering {topics}.

---

{hero_img}

## Latest News & Announcements

{news_bullets}

---

## Web Resources

{web_bullets}

---

## GitHub & Open Source

{repo_bullets}

---

## Key Takeaways

1. {name} continues to evolve in the AI/tech landscape
2. Monitor their open-source projects for updates
3. Check official channels for latest announcements

---

*Generated on {date_str} by [AI Tech Daily Agent](https://github.com/gautammanak1/ai-tech-daily-agent)*
"""

This ensures the system always produces output, even when LLM services are unavailable or fail.

LLM Service (llm_service.py)

The LLM service provides a clean abstraction layer over LLM APIs.

Interface:

def call_llm(
    system: str,
    user: str,
    temperature: float = 0.7,
    max_tokens: int = 4000,
) -> str | None:
    """
    Call LLM API with system and user messages.
    Returns generated text or None on failure.
    """

Implementation:

def call_llm(
    system: str,
    user: str,
    temperature: float = 0.7,
    max_tokens: int = 4000,
) -> str | None:
    try:
        # Get API key from environment
        api_key = os.getenv("OPENAI_API_KEY") or os.getenv("LLM_API_KEY")

        if not api_key:
            log.warning("No LLM API key found")
            return None

        # Make API call
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            json={
                "model": "gpt-4-turbo-preview",
                "messages": [
                    {"role": "system", "content": system},
                    {"role": "user", "content": user}
                ],
                "temperature": temperature,
                "max_tokens": max_tokens
            },
            timeout=60
        )

        response.raise_for_status()
        data = response.json()

        # Extract generated content
        return data["choices"][0]["message"]["content"]

    except requests.RequestException as e:
        log.error(f"LLM API request failed: {e}")
        return None
    except (KeyError, IndexError) as e:
        log.error(f"LLM API response parsing failed: {e}")
        return None

Configuration:

Environment variables for configuration:

  • OPENAI_API_KEY or LLM_API_KEY: API key for LLM service
  • LLM_MODEL: Model name (default: gpt-4-turbo-preview)
  • LLM_TIMEOUT: Request timeout in seconds (default: 60)
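Reading those variables with the documented defaults is a one-liner each (a sketch matching the names listed above):

```python
import os

# Environment-driven LLM configuration with the documented defaults
LLM_MODEL = os.getenv("LLM_MODEL", "gpt-4-turbo-preview")
LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", "60"))
LLM_API_KEY = os.getenv("OPENAI_API_KEY") or os.getenv("LLM_API_KEY")

print(LLM_MODEL, LLM_TIMEOUT)
```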

Error Handling:

The service handles various error scenarios:

  • Missing API key: Returns None
  • Network errors: Logs and returns None
  • Timeout errors: Logs and returns None
  • Malformed response: Logs and returns None
  • Rate limiting: Would need to be added with retry logic
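That missing retry logic could be layered on top of call_llm with a generic backoff wrapper. A sketch, not code from the repository:

```python
import time
import logging

log = logging.getLogger(__name__)

def with_retries(fn, attempts: int = 3, base_delay: float = 2.0):
    """Call fn() until it returns a non-None result, backing off
    exponentially (2s, 4s, ...) between attempts. None if all fail."""
    for attempt in range(attempts):
        try:
            result = fn()
            if result is not None:
                return result
        except Exception as exc:
            log.warning(f"Attempt {attempt + 1}/{attempts} failed: {exc}")
        if attempt < attempts - 1:
            time.sleep(base_delay * (2 ** attempt))
    return None

# Usage against the real service would be:
#   article = with_retries(lambda: call_llm(system, user))
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("429 Too Many Requests")
    return "ok"

print(with_retries(flaky, base_delay=0))  # "ok" on the third attempt
```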

Dev.to Service (devto_service.py)

This service handles publishing articles to the Dev.to platform.

Create Article:

def create_devto_article(
    title: str,
    body_markdown: str,
    tags: list[str],
    published: bool = True,
) -> str | None:
    """
    Create an article on Dev.to.
    Returns dev.to article ID or None on failure.
    """

    api_key = os.getenv("DEVTO_API_KEY")
    if not api_key:
        log.warning("No Dev.to API key")
        return None

    try:
        response = requests.post(
            "https://dev.to/api/articles",
            headers={
                "api-key": api_key,
                "Content-Type": "application/json"
            },
            json={
                "article": {
                    "title": title,
                    "body_markdown": body_markdown,
                    "published": published,
                    "tags": tags[:4]  # Dev.to limits to 4 tags
                }
            },
            timeout=30
        )

        response.raise_for_status()
        data = response.json()

        return data["id"]

    except Exception as e:
        log.error(f"Failed to create Dev.to article: {e}")
        return None

Tag Handling:

Dev.to limits articles to 4 tags, so the service truncates:

tags = company["topics"] + ["ai", "technology"]
tags = tags[:4]  # Ensure max 4 tags

Publish vs Draft:

The published parameter controls whether the article is immediately published (True) or saved as a draft (False). This is useful for:

  • Testing: Publish as drafts first
  • Review: Allow human approval before publishing
  • Automation: Direct publication in production
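The draft/publish switch and the tag cap both live in the request body itself, which can be sketched as a small builder (helper name is illustrative, not from the repository):

```python
def devto_payload(title: str, body_markdown: str, tags: list[str],
                  published: bool) -> dict:
    """Build the Dev.to article request body.
    published=False creates a draft for human review."""
    return {
        "article": {
            "title": title,
            "body_markdown": body_markdown,
            "published": published,
            "tags": tags[:4],  # Dev.to accepts at most 4 tags
        }
    }

payload = devto_payload("OpenAI — Deep Dive", "# Draft body",
                        ["llm", "gpt", "ai", "technology", "extra"],
                        published=False)
print(payload["article"]["tags"])  # ['llm', 'gpt', 'ai', 'technology']
```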

Publish Service (publish_service.py)

The publish service orchestrates the publishing pipeline.

Complete Publishing Flow:

def publish_article(
    article: str,
    company: dict,
    date: str,
    filename: str,
    dry_run: bool = False,
) -> dict:
    """
    Publish article to Dev.to and save locally.
    Returns metadata about published article.
    """

    result = {
        "local_path": None,
        "devto_id": None,
        "article_url": None,
        "published": False
    }

    # 1. Save locally
    articles_dir = Path("articles")
    articles_dir.mkdir(exist_ok=True)

    local_path = articles_dir / filename
    local_path.write_text(article)
    result["local_path"] = str(local_path)

    log.info(f"Article saved locally: {local_path}")

    # 2. Publish to Dev.to (if not dry run)
    if not dry_run:
        devto_id = create_devto_article(
            title=f"{company['name']} — Deep Dive",
            body_markdown=article,
            tags=company["topics"] + ["ai", "technology"],
            published=True
        )

        if devto_id:
            result["devto_id"] = devto_id
            username = os.getenv("DEVTO_USERNAME", "")
            result["article_url"] = f"https://dev.to/{username}/{filename.replace('.md', '')}"
            result["published"] = True

            log.info(f"Article published to Dev.to: {result['article_url']}")
        else:
            log.warning("Failed to publish to Dev.to")

    return result

Return Value:

The service returns a dictionary with:

  • local_path: Path to saved markdown file
  • devto_id: Dev.to article ID (if published)
  • article_url: URL to published article (if published)
  • published: Boolean indicating success
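A caller can branch on that dictionary without caring which path was taken, for example (illustrative helper; the file name shown is sample data):

```python
def summarize(result: dict) -> str:
    """One-line status message from the publish_article result dict."""
    if result["published"]:
        return f"Published: {result['article_url']} (Dev.to id {result['devto_id']})"
    return f"Saved locally only: {result['local_path']}"

status = summarize({"local_path": "articles/openai-2024-01-15.md",
                    "devto_id": None, "article_url": None,
                    "published": False})
print(status)  # Saved locally only: articles/openai-2024-01-15.md
```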

Protocol Implementation

Chat Protocol (protocols/chat_proto.py)

The chat protocol enables interactive communication with the agent through the uAgents messaging system.

Protocol Setup:

from uagents import Protocol
from uagents_core.contrib.protocols.chat import (
    ChatMessage,
    ChatAcknowledgement,
    StartSessionContent,
    EndSessionContent,
    TextContent,
    chat_protocol_spec,
)

daily_chat_proto = Protocol(spec=chat_protocol_spec)

Message Handling:

The protocol handles three main message types:

1. Session Start:

@daily_chat_proto.on_message(model=ChatMessage)
async def handle_message(ctx: Context, sender: str, msg: ChatMessage):
    for item in msg.content:
        if isinstance(item, StartSessionContent):
            await ctx.send(sender, _create_text(
                "Welcome to **AI & Tech Daily Agent**!\n\n"
                "I write daily deep-dive articles about AI/tech companies.\n\n"
                "Commands:\n"
                "- **generate** — pick a company and write today's article\n"
                "- **status** — check recent articles\n"
                "- **help** — show commands"
            ))

2. Text Commands:

elif isinstance(item, TextContent):
    user_text = (item.text or "").strip().lower()

    # Generate article command
    if any(kw in user_text for kw in ["generate", "article", "news", "run", "start", "write"]):
        await ctx.send(sender, _create_text("Starting deep-dive pipeline... This takes 2-3 minutes."))
        try:
            result = await _run_pipeline(ctx)
            await ctx.send(sender, _create_text(f"Article published: {result}"))
        except Exception as e:
            await ctx.send(sender, _create_text(f"Pipeline failed: {e}"))

    # Status command
    elif "status" in user_text:
        history = get_history()
        recent = history[-5:]
        lines = "\n".join(
            f"- **{h['name']}** ({h['date']})" 
            for h in reversed(recent)
        )
        await ctx.send(sender, _create_text(f"Recent articles:\n{lines}\n\nTotal: {len(history)}"))

    # Help command
    elif "help" in user_text:
        await ctx.send(sender, _create_text(
            "**Commands:**\n"
            "- `generate` — write today's deep-dive article\n"
            "- `status` — check recent articles\n"
            "- `help` — this message"
        ))

3. Acknowledgments:

async def _ack(ctx: Context, sender: str, msg: ChatMessage):
    await ctx.send(
        sender,
        ChatAcknowledgement(
            timestamp=datetime.now(timezone.utc),
            acknowledged_msg_id=msg.msg_id,
        ),
    )

@daily_chat_proto.on_message(model=ChatAcknowledgement)
async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement):
    ctx.logger.info(f"Ack from {sender} for {msg.acknowledged_msg_id}")

Pipeline Trigger:

import asyncio
import os

async def _run_pipeline(ctx: Context) -> str:
    from agent import run_pipeline
    ctx.logger.info("Starting pipeline...")
    dry_run = os.getenv("DRY_RUN", "false").lower() == "true"
    result = await asyncio.to_thread(run_pipeline, dry_run)
    return result

Key Design Decisions:

  1. Async/Await: All message handlers are async to prevent blocking
  2. Pipeline in Thread: Run CPU-intensive pipeline in thread to not block message loop
  3. Flexible Command Matching: Uses any(kw in user_text for kw in [...]) for robust command detection
  4. Multiple Keywords: Each command has multiple trigger words (e.g., "generate", "article", "news" all trigger generation)
  5. Status Feedback: Sends progress updates (e.g., "Starting deep-dive pipeline...") to keep user informed

Agent Lifecycle

Initialization

The agent goes through several initialization steps:

1. Environment Setup:

import os
from dotenv import load_dotenv

load_dotenv()

# Required environment variables
AGENT_SEED = os.getenv("AGENT_SEED")
if not AGENT_SEED:
    raise ValueError("AGENT_SEED environment variable required")

2. Agent Creation:

from uagents import Agent, Context

agent = Agent(
    name="ai-tech-daily-agent",
    port=8000,
    seed=AGENT_SEED,
    endpoint=["http://localhost:8000/submit"],
)

agent.include(daily_chat_proto)

3. Configuration Loading:

# Load tracked companies
from config.sources import TRACKED_COMPANIES

# Ensure history file exists
HISTORY_FILE = "history.json"
if not os.path.exists(HISTORY_FILE):
    Path(HISTORY_FILE).write_text("[]")

4. Registration (Optional):

# Register with Almanac contract (commented out in code)
# This would make the agent discoverable on Agentverse

Running State

Once initialized, the agent enters its main loop:

Message Processing Loop (the diagram uses the same visual language as Core Architecture, showing the agent surface, services, and external systems):

Flow in words: listen for incoming messages → parse type (ChatMessage, acknowledgements, session start/end) → route ChatMessage to text handlers (generate / status / help) → send responses → wait for the next message.

Pipeline Execution:

When generate is triggered, the same end-to-end pipeline as Workflow Pipeline runs:

Pipeline: company pick → research → LLM article → publish → history

Steps: log start → run_pipeline() → select company → search web/news → GitHub data → scrape → generate article → find images → publish → update history → log completion → return the result string to the user.

Error Handling

The agent includes comprehensive error handling:

Network Errors:

try:
    resp = requests.get(url, timeout=15)
    resp.raise_for_status()
except requests.RequestException as e:
    log.error(f"Request failed: {e}")
    return None  # Or fallback value

LLM Failures:

result = call_llm(system, user)
if not result:
    log.warning("LLM failed, using fallback")
    result = _fallback_article(...)

Timeouts:

All network operations have timeouts:

  • GitHub API: 10-15 seconds
  • Web scraping: 15 seconds
  • LLM API: 60 seconds
  • Dev.to API: 30 seconds
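
One way to keep these budgets in one place is a small lookup table; a sketch (names are illustrative, values mirror the list above, using 15 seconds for GitHub):

```python
# Per-service timeout budget in seconds
TIMEOUTS = {
    "github": 15,
    "scrape": 15,
    "llm": 60,
    "devto": 30,
}

def timeout_for(service: str, default: float = 15) -> float:
    """Look up a service timeout, falling back to a safe default."""
    return TIMEOUTS.get(service, default)
```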

Graceful Degradation:

The system is designed to continue even when some components fail:

  • If search fails, use empty results
  • If LLM fails, use template
  • If publishing fails, save locally only
  • If images fail, proceed without them

Shutdown

The agent can be shut down gracefully:

Clean Shutdown:

# SIGTERM handler
import signal
import sys

def shutdown(signum, frame):
    log.info("Shutting down agent...")
    # Save any pending state
    # Close connections
    sys.exit(0)

signal.signal(signal.SIGTERM, shutdown)

Data Flow & Orchestration

Complete Data Flow Diagram

Illustrative data flow (diagram available on GitHub): data sources feed the company picker, search, scraper, image finder, article generator, publisher, and history.

Data Transformation Examples

1. Company Selection:

Input:
  History: [{name: "OpenAI", slug: "openai", date: "2026-04-15"}]
  Config: [
    {name: "OpenAI", slug: "openai", topics: ["llm"]},
    {name: "Anthropic", slug: "anthropic", topics: ["llm", "safety"]},
  ]

Process:
  - Filter recent (14 days): OpenAI filtered out
  - Candidates: ["Anthropic"]
  - Select: Anthropic

Output:
  {name: "Anthropic", slug: "anthropic", topics: ["llm", "safety"]}

2. Search Results:

Input:
  Company: "Anthropic"
  Topics: ["llm", "safety"]

API Response (News):
  {
    "value": [
      {
        "name": "Anthropic releases Claude 3.5",
        "url": "https://example.com/claude",
        "snippet": "New model with...",
        "datePublished": "2026-04-16"
      }
    ]
  }

Transformed:
  {
    "title": "Anthropic releases Claude 3.5",
    "url": "https://example.com/claude",
    "body": "New model with...",
    "date": "2026-04-16"
  }

3. GitHub Data:

Input:
  Tracked Repos: [{owner: "openai", repo: "gym", label: "OpenAI Gym"}]

API Response:
  {
    "html_url": "https://github.com/openai/gym",
    "description": "OpenAI Gym interface",
    "stargazers_count": 32156,
    "language": "Python",
    "pushed_at": "2026-04-15T10:30:00Z"
  }

Transformed:
  {
    "name": "openai/gym",
    "label": "OpenAI Gym",
    "url": "https://github.com/openai/gym",
    "description": "OpenAI Gym interface",
    "stars": 32156,
    "language": "Python",
    "updated_at": "2026-04-15T10:30:00Z",
    "latest_release": {...}
  }

4. LLM Prompt:

Inputs:
  Company: {name: "Anthropic", slug: "anthropic", topics: [...]}
  News: [formatted_news_text]
  Web: [formatted_web_text]
  GitHub: [formatted_github_text]
  Repos: [formatted_repos_text]
  Scraped: [scraped_article_content]

Constructed Prompt:
  System: "You are a senior tech journalist..."

  User: """
    Write a deep-dive article about Anthropic for Thursday, April 16, 2026.

    Company topics: llm, safety

    === REAL-TIME NEWS ===
    - [Anthropic releases Claude 3.5](https://example.com)
      New model with...
      Date: 2026-04-16

    === WEB SEARCH RESULTS ===
    [Additional formatted content]

    ...

    IMPORTANT: Write FULL article. 300+ lines minimum.
  """

LLM Output:
  # Anthropic — Deep Dive | Thursday, April 16, 2026

  ## Company Overview
  [300+ lines of generated content]

Orchestration Patterns

1. Sequential Pipeline:

from concurrent.futures import ThreadPoolExecutor

def run_pipeline(dry_run: bool = False) -> str:
    # Step 1: Must complete before step 2
    company = select_company(history, TRACKED_COMPANIES)

    # Step 2: Parallel execution
    with ThreadPoolExecutor(max_workers=3) as executor:
        news_future = executor.submit(search_news, company)
        web_future = executor.submit(search_web, company)
        github_future = executor.submit(search_github, company)

        search_data = {
            "news": news_future.result(),
            "web": web_future.result(),
            "github": github_future.result(),
        }

    # Step 3: Depends on search data
    github_repos = get_all_repos()
    scraped_content = scrape_and_read(search_data)

    # Step 4: Depends on all previous
    article, filename = generate_article(
        company, search_data, scraped_content, github_repos, images
    )

    # Step 5: Final publishing
    result = publish_article(article, company, date, filename, dry_run)

    return result["article_url"] or result["local_path"]

2. Error Recovery:

try:
    article = generate_article(...)
except Exception as e:
    log.error(f"Article generation failed: {e}")
    # Fallback: Use template
    article = _fallback_article(company, search_data, repos, images)

# Continue pipeline regardless of success
result = publish_article(article, ...)

3. State Accumulation:

# Pipeline accumulates state at each step
state = {
    "company": None,
    "search_data": {},
    "github_repos": [],
    "scraped_content": "",
    "article": "",
    "images": {},
    "published": False,
    "url": None,
}

state["company"] = select_company(history, companies)
state["search_data"] = perform_search(state["company"])
state["github_repos"] = get_all_repos()
state["scraped_content"] = scrape_content(state["search_data"])
state["images"] = find_images(state["company"])
state["article"] = generate_article(**state)
state["url"] = publish(state["article"], state["company"])

return state["url"]

Code Analysis: Key Patterns

1. Configuration Pattern

Pattern: Externalize configuration to separate files and environment variables.

Implementation:

# config/sources.py
TRACKED_COMPANIES = [
    {
        "name": "OpenAI",
        "slug": "openai",
        "topics": ["llm", "generative-ai", "gpt"],
    },
    # ... more companies
]

TRACKED_FRAMEWORK_REPOS = [
    {
        "owner": "openai",
        "repo": "gym",
        "label": "OpenAI Gym"
    },
    # ... more repos
]

Benefits:

  • Easy to update without code changes
  • Environment-specific configurations possible
  • Clear separation of config and logic
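
Environment-specific configuration, mentioned above, can be layered on top of the base list. A hedged sketch (the EXTRA_COMPANIES variable is hypothetical, not part of the project):

```python
import json
import os

def load_companies(base: list[dict]) -> list[dict]:
    """Merge the base company list with an optional JSON override from the environment."""
    extra = os.getenv("EXTRA_COMPANIES")  # e.g. '[{"name": "...", "slug": "..."}]'
    if not extra:
        return base
    try:
        return base + json.loads(extra)
    except ValueError:
        return base  # Malformed override: fall back to the base config
```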

2. Service Layer Pattern

Pattern: Encapsulate external API interactions in dedicated service modules.

Implementation:

# services/github_service.py
def get_framework_updates() -> list[dict]:
    """Get latest release/activity for tracked framework repos."""
    # GitHub API interaction logic here
    pass

# services/web_search_service.py
def search_news(company: str) -> list[dict]:
    """Search for recent news about the company."""
    # Search API interaction logic here
    pass

Benefits:

  • Easy to mock for testing
  • Clear API abstraction
  • Reusable across different contexts
  • Consistent error handling
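
The "easy to mock" benefit looks like this in practice; a sketch using unittest.mock, with a hypothetical consumer of the service layer (not the project's test suite):

```python
from unittest import mock

# Stand-in for services.web_search_service.search_news: in production this
# would make an HTTP call, which tests must not do.
def search_news(company: str) -> list[dict]:
    raise RuntimeError("network call not allowed in tests")

def count_headlines(company: str, search=search_news) -> int:
    """Consumer that accepts the service as a parameter, so tests can stub it."""
    return len(search(company))

# In a test, swap in a canned result instead of the real HTTP call:
stub = mock.Mock(return_value=[{"title": "Anthropic update"}])
assert count_headlines("Anthropic", search=stub) == 1
stub.assert_called_once_with("Anthropic")
```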

3. Fallback Pattern

Pattern: Always have a fallback when primary operation might fail.

Implementation:

def generate_article(...) -> tuple[str, str]:
    result = call_llm(system, user, ...)

    if result:
        # Success: use LLM-generated content
        if "AI Tech Daily Agent" not in result:
            result += attribution_footer
        return result, filename
    else:
        # Fallback: use template
        log.info("Used fallback template")
        return _fallback_article(...), filename

Benefits:

  • System continues working even when services fail
  • Graceful degradation
  • Better user experience
  • Easier debugging (can see what failed)

4. Parallel Execution Pattern

Pattern: Run independent operations in parallel for performance.

Implementation:

import concurrent.futures

def run_pipeline(dry_run: bool = False) -> str:
    # Run searches in parallel
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        news_future = executor.submit(
            search_news, company["name"], company["topics"]
        )
        web_future = executor.submit(
            search_web, company["name"]
        )
        github_future = executor.submit(
            search_github, company["name"]
        )

        search_data = {
            "news": news_future.result(),
            "web": web_future.result(),
            "github": github_future.result(),
        }

Benefits:

  • Faster execution (up to 3x when the three calls take similar time)
  • Better resource utilization
  • Reduced total pipeline time

5. Logging Pattern

Pattern: Comprehensive logging at different levels for debugging and monitoring.

Implementation:

import logging

log = logging.getLogger("github")

def get_framework_updates() -> list[dict]:
    log.info("Fetching tracked framework repos...")

    for repo_info in TRACKED_FRAMEWORK_REPOS:
        try:
            resp = requests.get(...)
            log.debug(f"Successfully fetched {owner}/{repo}")
        except Exception as e:
            log.warning(f"Failed to fetch {owner}/{repo}: {e}")

    log.info(f"Tracked {len(results)}/{len(TRACKED_FRAMEWORK_REPOS)} framework repos")
    return results

Benefits:

  • Easy debugging
  • Performance monitoring
  • Error tracking
  • Audit trail

6. Data Transformation Pattern

Pattern: Transform external API data to internal standardized format.

Implementation:

def _format_news(news: list[dict]) -> str:
    """Format search results for LLM prompt."""
    lines = []
    for n in news[:15]:  # Limit and select
        title = n['title']
        url = n['url']
        body = n.get('body', '')[:300]  # Truncate
        date = n.get('date', '')

        lines.append(f"- [{title}]({url})")
        if body:
            lines.append(f"  {body}")
        if date:
            lines.append(f"  Date: {date}")
        lines.append("")

    return "\n".join(lines)

Benefits:

  • Consistent data format across services
  • Easy to change output format
  • Centralized formatting logic
  • Promotes reusability

7. Async/Await Pattern

Pattern: Use async operations to prevent blocking the main loop.

Implementation:

async def handle_message(ctx: Context, sender: str, msg: ChatMessage):
    # Quick acknowledgment
    await _ack(ctx, sender, msg)

    # Process content
    for item in msg.content:
        if isinstance(item, TextContent):
            # Long-running operation in thread
            result = await asyncio.to_thread(run_pipeline, dry_run)
            await ctx.send(sender, _create_text(result))

Benefits:

  • Non-blocking message handling
  • Can handle multiple concurrent requests
  • Better resource utilization
  • Responsive user experience

Deployment & Infrastructure

Local Development Setup

Prerequisites:

  • Python 3.11+
  • uv package manager
  • API keys for external services

Installation Steps:

# Clone repository
git clone https://github.com/gautammanak1/ai-tech-daily-agent.git
cd ai-tech-daily-agent

# Install dependencies with uv
uv sync

# Create .env file
cat > .env << EOF
AGENT_SEED=your-seed-phrase-here
OPENAI_API_KEY=sk-your-openai-key
GITHUB_TOKEN=ghp-your-github-token
DEVTO_API_KEY=your-devto-api-key
DEVTO_USERNAME=your-username
DRY_RUN=false
EOF

# Run agent
python agent.py

Configuration Files:

.env (Environment Variables):

# Agent Configuration
AGENT_SEED=recovery_seed_phrase
PORT=8000

# API Keys
OPENAI_API_KEY=sk-...
LLM_API_KEY=sk-...
GITHUB_TOKEN=ghp_...
DEVTO_API_KEY=...

# Dev.to Configuration
DEVTO_USERNAME=your_username

# Pipeline Configuration
DRY_RUN=false
MAX_ARTICLES_PER_DAY=1
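
Environment values like DRY_RUN and MAX_ARTICLES_PER_DAY arrive as strings and need parsing. A small helper sketch (function names are illustrative):

```python
import os

def env_flag(name: str, default: bool = False) -> bool:
    """Parse a boolean environment flag such as DRY_RUN."""
    return os.getenv(name, str(default)).strip().lower() in ("1", "true", "yes")

def env_int(name: str, default: int) -> int:
    """Parse an integer setting such as MAX_ARTICLES_PER_DAY, with a fallback."""
    try:
        return int(os.getenv(name, default))
    except ValueError:
        return default  # Malformed value: keep the default
```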

pyproject.toml (Dependencies):

[project]
name = "ai-tech-daily-agent"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "uagents",
    "requests",
    "python-dotenv",
]

[tool.uv]
dev-dependencies = []

Production Deployment

Option 1: Agentverse Hosted Agent

Deploy on Fetch.ai's hosted platform:

# Build and deploy to Agentverse
agentverse deploy ai-tech-daily-agent

# Or use the CLI
agent register \
  --name "AI Tech Daily Agent" \
  --endpoint "https://your-endpoint.com" \
  --protocols "chat"

Option 2: Self-Hosted on Cloud

Deploy to AWS, GCP, or Azure:

Dockerfile:

FROM python:3.11-slim

WORKDIR /app

# Copy project files
COPY pyproject.toml uv.lock ./
COPY config ./config
COPY protocols ./protocols
COPY services ./services
COPY agent.py .

# Install dependencies
RUN pip install uv
RUN uv sync --frozen

# Create directories
RUN mkdir -p articles

# Set environment
ENV PYTHONUNBUFFERED=1

# Expose port
EXPOSE 8000

# Run agent
CMD ["python", "agent.py"]

docker-compose.yml:

version: '3.8'

services:
  agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - AGENT_SEED=${AGENT_SEED}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - GITHUB_TOKEN=${GITHUB_TOKEN}
      - DEVTO_API_KEY=${DEVTO_API_KEY}
      - DEVTO_USERNAME=${DEVTO_USERNAME}
      - DRY_RUN=false
    volumes:
      - ./articles:/app/articles
      - ./history.json:/app/history.json
    restart: unless-stopped

Deploy Commands:

# Build and start
docker-compose up -d

# Check logs
docker-compose logs -f

# Stop
docker-compose down

Option 3: Serverless (AWS Lambda)

For event-driven generation:

# lambda_handler.py
import json
from agent import run_pipeline

def lambda_handler(event, context):
    # Parse event (e.g., CloudWatch Scheduler)
    command = event.get("command", "generate")

    if command == "generate":
        result = run_pipeline(dry_run=False)
        return {
            "statusCode": 200,
            "body": json.dumps({"url": result})
        }

    return {
        "statusCode": 400,
        "body": json.dumps({"error": "Unknown command"})
    }

CloudWatch Scheduler Rule:

aws events put-rule \
  --name "daily-article-generation" \
  --schedule-expression "cron(0 9 * * ? *)"

# Point the rule at the Lambda function
aws events put-targets \
  --rule daily-article-generation \
  --targets "Id"="1","Arn"="arn:aws:lambda:region:account:function:ai-tech-daily-agent"

aws lambda add-permission \
  --function-name ai-tech-daily-agent \
  --statement-id daily-schedule \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:region:account:rule/daily-article-generation

Monitoring & Observability

Logging:

Configure logging for production:

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('agent.log'),
        logging.StreamHandler()
    ]
)

# Service-specific loggers
github_log = logging.getLogger("github")
github_log.setLevel(logging.DEBUG)  # More verbose for GitHub API

article_log = logging.getLogger("article")
article_log.setLevel(logging.INFO)

Metrics to Track:

  1. Pipeline Duration:
import time

start = time.time()
result = run_pipeline(dry_run=False)
duration = time.time() - start
log.info(f"Pipeline completed in {duration:.2f} seconds")
  2. Article Quality:
article_lines = len(article.splitlines())
log.info(f"Generated {article_lines} lines (target: 300+)")
  3. API Call Counts:
api_calls = {
    "github": len(github_repos),
    "search": len(search_data["news"]) + len(search_data["web"]),
    "llm": 1,
    "devto": 1 if published else 0
}
log.info(f"API calls: {api_calls}")
  4. Error Rates:
errors = {
    "github_failures": github_failures,
    "llm_fallbacks": llm_fallbacks,
    "publish_failures": publish_failures
}
log.warning(f"Errors detected: {errors}")

Health Checks:

Implement health check endpoint:

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/health')
def health():
    status = {
        "agent": "running",
        "last_run": last_run_time,
        "success_rate": calculate_success_rate(),
        "apis": check_api_health()
    }

    if all(s["healthy"] for s in status["apis"].values()):
        return jsonify({"status": "healthy", **status}), 200
    else:
        return jsonify({"status": "degraded", **status}), 503

Technical Challenges & Solutions

Challenge 1: Handling Unreliable APIs

Problem: External APIs (GitHub, Search, LLM) can be slow, fail, or return unexpected data.

Solution: Comprehensive error handling and fallbacks

def get_framework_updates() -> list[dict]:
    results = []

    for repo_info in TRACKED_FRAMEWORK_REPOS:
        try:
            # Set timeout
            resp = requests.get(url, headers=headers, timeout=10)
            resp.raise_for_status()  # Raise on 4xx/5xx

            # Parse response
            data = resp.json()

            # Build result
            results.append({...})

        except requests.Timeout:
            log.warning(f"Timeout fetching {repo}")
            continue  # Skip this repo, don't crash

        except requests.HTTPError as e:
            log.warning(f"HTTP error fetching {repo}: {e}")
            continue

        except (KeyError, ValueError) as e:
            log.warning(f"Parse error fetching {repo}: {e}")
            continue

        except Exception as e:
            log.error(f"Unexpected error fetching {repo}: {e}")
            continue

    # Return whatever succeeded
    return results

Best Practices:

  • Always use timeouts
  • Catch specific exceptions
  • Log errors with context
  • Continue processing despite failures
  • Provide fallback results

Challenge 2: Managing Rate Limits

Problem: APIs (especially GitHub) have rate limits that can be exceeded.

Solution: Rate limiting and caching

import time
from functools import wraps

def rate_limit(calls_per_second: int):
    """Decorator to rate limit function calls."""
    min_interval = 1.0 / calls_per_second
    last_called = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = min_interval - elapsed

            if left_to_wait > 0:
                time.sleep(left_to_wait)

            last_called[0] = time.time()
            return func(*args, **kwargs)

        return wrapper
    return decorator

@rate_limit(calls_per_second=5)
def github_api_call(url: str):
    resp = requests.get(url, headers=headers, timeout=10)
    return resp.json()

Additional Strategies:

  1. Caching: Cache GitHub repo data for 1 hour
  2. Batch Requests: Fetch multiple repos in one call when possible
  3. Prioritize: Track critical repos, deprioritize others
  4. Exponential Backoff: Retry with increasing delays
import time
from random import uniform

def with_backoff(max_retries=3, base_delay=1):
    """Retry with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise

                    delay = base_delay * (2 ** attempt) + uniform(0, 1)
                    log.warning(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
                    time.sleep(delay)
        return wrapper
    return decorator
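
Strategy 1 (caching) can be as small as a timestamped dictionary; a sketch of the idea, not the project's code:

```python
import time

# key -> (fetch timestamp, cached value)
_CACHE: dict[str, tuple[float, object]] = {}
CACHE_TTL = 3600  # Cache GitHub repo data for 1 hour

def cached(key: str, fetch, ttl: float = CACHE_TTL):
    """Return a cached value if still fresh, otherwise call fetch() and store it."""
    now = time.time()
    hit = _CACHE.get(key)
    if hit and now - hit[0] < ttl:
        return hit[1]
    value = fetch()
    _CACHE[key] = (now, value)
    return value
```

With this in place, `cached("openai/gym", lambda: github_api_call(url))` avoids re-fetching the same repo within the TTL window.
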

Challenge 3: Ensuring Article Quality

Problem: LLMs may generate low-quality, short, or inaccurate content.

Solution: Multi-layered quality checks and constraints

def generate_article(...) -> tuple[str, str]:
    result = call_llm(system, user, temperature=0.7, max_tokens=8000)

    # Quality check 1: Line count
    if result and len(result.splitlines()) < 200:
        log.warning(f"Article too short: {len(result.splitlines())} lines, retrying with higher temperature")
        result = call_llm(system, user, temperature=0.9, max_tokens=8000)

    # Quality check 2: Required sections
    required_sections = [
        "Company Overview",
        "Latest News",
        "Product & Technology",
        "GitHub & Open Source",
        "Key Takeaways"
    ]

    if result:
        missing = [s for s in required_sections if s not in result]
        if missing:
            log.warning(f"Missing sections: {missing}")

    # Quality check 3: Data accuracy (has company name)
    if result:
        if company["name"].lower() not in result.lower():
            log.warning("Company name not found in article")

    # Fallback if all checks fail
    if not result or len(result.splitlines()) < 100:
        log.info("Using fallback article")
        result = _fallback_article(...)

    return result, filename

Quality Metrics Tracked:

  1. Line count (>300 target)
  2. Section coverage (all required sections present)
  3. Source inclusion (links to sources)
  4. Code snippet presence (2-3 required)
  5. Image inclusion (if provided)
  6. Factual accuracy (company name mentioned)
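
The six metrics above can be collected into one report; a sketch (the helper name is hypothetical, thresholds follow the list):

```python
REQUIRED_SECTIONS = [
    "Company Overview", "Latest News", "Product & Technology",
    "GitHub & Open Source", "Key Takeaways",
]

def quality_report(article: str, company_name: str) -> dict:
    """Score an article against the tracked quality metrics."""
    fence = "`" * 3  # markdown code-fence marker
    lines = article.splitlines()
    return {
        "line_count": len(lines),
        "meets_length": len(lines) >= 300,
        "missing_sections": [s for s in REQUIRED_SECTIONS if s not in article],
        "has_links": "](http" in article,
        "code_snippets": article.count(fence) // 2,
        "mentions_company": company_name.lower() in article.lower(),
    }
```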

Challenge 4: Managing State & History

Problem: Tracking what has been covered to avoid repetition.

Solution: Persistent history file

HISTORY_FILE = "history.json"

def get_history() -> list[dict]:
    """Load coverage history from file."""
    try:
        if os.path.exists(HISTORY_FILE):
            content = Path(HISTORY_FILE).read_text()
            return json.loads(content)
    except Exception as e:
        log.error(f"Failed to load history: {e}")

    return []

def update_history(company: dict, url: str, devto_id: str = None):
    """Append new article to history."""
    history = get_history()

    entry = {
        "name": company["name"],
        "slug": company["slug"],
        "date": datetime.now().isoformat(),
        "article_url": url,
        "devto_id": devto_id
    }

    history.append(entry)

    # Keep last 365 entries (1 year)
    if len(history) > 365:
        history = history[-365:]

    # Atomic write: write to a temp file, then rename over the original
    tmp_path = HISTORY_FILE + ".tmp"
    Path(tmp_path).write_text(json.dumps(history, indent=2))
    os.replace(tmp_path, HISTORY_FILE)

def select_company(history: list[dict], companies: list[dict]) -> dict:
    """Select company that hasn't been covered recently."""
    cutoff = (datetime.now() - timedelta(days=14)).isoformat()
    recent_slugs = {h["slug"] for h in history if h["date"] >= cutoff}

    candidates = [c for c in companies if c["slug"] not in recent_slugs]

    if not candidates:
        log.warning("No candidates after 14-day filter, using default")
        return companies[0]

    return random.choice(candidates)

Design Considerations:

  • JSON format for human readability
  • Atomic write operations (avoid corruption)
  • Size limiting (keep last year only)
  • Easy to inspect and edit manually
  • Simple slug-based lookup

Challenge 5: Image Discovery

Problem: Finding relevant, high-quality images for articles.

Solution: Multiple search strategies and fallbacks

from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

def find_images(company: dict) -> dict[str, str]:
    """Search for company images: logo, hero, banner."""
    images = {}
    name = company["name"]

    # Strategy 1: Official website
    try:
        logo_url = _find_logo_on_website(name)
        if logo_url:
            images["logo"] = logo_url
    except Exception as e:
        log.debug(f"Failed to find website logo: {e}")

    # Strategy 2: Image search API
    if "logo" not in images:
        try:
            results = image_search(f"{name} official logo")
            if results:
                images["logo"] = results[0]["url"]
        except Exception as e:
            log.debug(f"Image search failed: {e}")

    # Strategy 3: Hero image
    try:
        results = image_search(f"{name} technology platform")
        if results:
            images["hero"] = results[0]["url"]
    except Exception:
        pass

    # Strategy 4: Technology/Architecture image
    try:
        results = image_search(f"{name} architecture diagram")
        if results:
            images["banner"] = results[0]["url"]
    except Exception:
        pass

    log.info(f"Found images: {list(images.keys())}")
    return images

def _find_logo_on_website(company_name: str) -> str | None:
    """Try to find logo on company's official website."""
    # Search for official website
    web_results = search_web(f"{company_name} official website")

    if not web_results:
        return None

    homepage_url = web_results[0]["url"]

    # Parse HTML for common logo patterns
    try:
        resp = requests.get(homepage_url, timeout=10)
        soup = BeautifulSoup(resp.text, 'html.parser')

        # Look for common logo patterns
        logo_patterns = [
            'img[alt*="logo"]',
            'img[alt*="Logo"]',
            '.logo img',
            '#logo img',
            'img[src*="logo"]',
        ]

        for selector in logo_patterns:
            logo = soup.select_one(selector)
            if logo and logo.get('src'):
                # Make URL absolute
                logo_url = urljoin(homepage_url, logo['src'])
                return logo_url

    except Exception:
        pass

    return None

Future Enhancements

Short-Term Improvements (Next 1-3 months)

1. Enhanced Error Recovery

# Retry mechanism with circuit breaker
class CircuitBreakerOpenError(Exception):
    """Raised when the breaker is open and calls are rejected."""

class CircuitBreaker:
    def __init__(self, max_failures=5, timeout=60):
        self.failures = 0
        self.max_failures = max_failures
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "half-open"
            else:
                raise CircuitBreakerOpenError()

        try:
            result = func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure_time = time.time()
            if self.failures >= self.max_failures:
                self.state = "open"
            raise
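The comment above also mentions a retry mechanism alongside the breaker. A minimal retry-with-exponential-backoff helper (a sketch; the delay parameters and jitter range are illustrative, not the project's actual values) could look like:

```python
import random
import time


def retry_with_backoff(func, max_attempts=4, base_delay=1.0):
    """Call func(), retrying on any exception with exponential backoff
    plus a little jitter to avoid synchronized retries."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Sleep base_delay * 2^attempt, plus jitter
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
```

Combining the two, the breaker short-circuits calls to a known-bad dependency while the retry helper smooths over transient failures.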

2. Multi-Model LLM Support

SUPPORTED_MODELS = {
    "openai": {
        "api_key": "OPENAI_API_KEY",
        "endpoint": "https://api.openai.com/v1/chat/completions",
        "model": "gpt-4-turbo-preview"
    },
    "anthropic": {
        "api_key": "ANTHROPIC_API_KEY",
        "endpoint": "https://api.anthropic.com/v1/messages",
        "model": "claude-3-opus-20240229"
    },
    "cohere": {
        "api_key": "COHERE_API_KEY",
        "endpoint": "https://api.cohere.ai/v1/generate",
        "model": "command"
    }
}

def call_llm(system: str, user: str, 
             model_provider: str = "openai") -> str:
    provider = SUPPORTED_MODELS[model_provider]
    # Provider-specific implementation
    pass
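The `pass` placeholder hides the provider-specific part. One way to fill it in is a request-body builder, sketched here with payload shapes modeled on each vendor's chat/generate APIs (treat the exact fields, such as `max_tokens`, as assumptions):

```python
# Mirrors the SUPPORTED_MODELS table above (model names only)
MODELS = {
    "openai": "gpt-4-turbo-preview",
    "anthropic": "claude-3-opus-20240229",
    "cohere": "command",
}


def build_llm_request(system: str, user: str,
                      model_provider: str = "openai") -> dict:
    """Build the provider-specific request body for call_llm."""
    model = MODELS[model_provider]
    if model_provider == "openai":
        # OpenAI chat completions: system and user as chat messages
        return {"model": model,
                "messages": [{"role": "system", "content": system},
                             {"role": "user", "content": user}]}
    if model_provider == "anthropic":
        # Anthropic messages: the system prompt is a top-level field
        return {"model": model, "max_tokens": 4096, "system": system,
                "messages": [{"role": "user", "content": user}]}
    if model_provider == "cohere":
        # Cohere generate: a single flattened prompt string
        return {"model": model, "prompt": f"{system}\n\n{user}"}
    raise ValueError(f"Unsupported provider: {model_provider}")
```

`call_llm` would then POST the returned body to the provider's endpoint and normalize the response into a plain string.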

3. Article Analytics Dashboard

# Simple Flask dashboard
@app.route('/dashboard')
def dashboard():
    history = get_history()

    stats = {
        "total_articles": len(history),
        "companies_covered": len(set(h["slug"] for h in history)),
        "last_7_days": [h for h in history if is_last_7_days(h["date"])],
        "top_topics": get_top_topics(history),
        "avg_article_length": get_avg_article_length(history),
    }

    return render_template('dashboard.html', stats=stats)

Medium-Term Enhancements (3-6 months)

1. Multi-Platform Publishing

# Support for multiple platforms
PLATFORMS = {
    "devto": DevToPublisher,
    "medium": MediumPublisher,
    "hashnode": HashnodePublisher,
    "linkedin": LinkedInPublisher,
}

def publish_to_all(article: str, company: dict):
    results = {}

    for platform_name, publisher_class in PLATFORMS.items():
        try:
            publisher = publisher_class()
            url = publisher.publish(article, company)
            results[platform_name] = {"status": "success", "url": url}
        except Exception as e:
            results[platform_name] = {"status": "error", "message": str(e)}

    return results
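Each `*Publisher` class would share a small interface: `publish(article, company) -> url`. As one concrete example, a DEV.to publisher might wrap the platform's Articles API; the payload below follows DEV's documented `POST /api/articles` format, but treat the details (title template, tags) as a sketch:

```python
class DevToPublisher:
    """Sketch of a DEV.to publisher built on the Articles API."""

    API_URL = "https://dev.to/api/articles"

    def __init__(self, api_key: str = "YOUR_DEVTO_API_KEY"):
        self.api_key = api_key

    def build_payload(self, article: str, company: dict) -> dict:
        # DEV expects the fields wrapped in an "article" object
        return {
            "article": {
                "title": f"Deep Dive: {company['name']}",
                "body_markdown": article,
                "published": True,
                "tags": ["ai", "programming"],
            }
        }

    def publish(self, article: str, company: dict) -> str:
        import requests  # imported lazily so the sketch loads without the dependency

        resp = requests.post(
            self.API_URL,
            headers={"api-key": self.api_key},
            json=self.build_payload(article, company),
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json()["url"]
```

The other publishers would follow the same shape, so `publish_to_all` never needs to know platform specifics.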

2. Custom Article Templates

# Template system for different article styles
ARTICLE_TEMPLATES = {
    "deep_dive": DeepDiveTemplate,
    "news_brief": NewsBriefTemplate,
    "tutorial": TutorialTemplate,
    "interview": InterviewTemplate,
}

class ArticleTemplate:
    def generate_prompt(self, company: dict, data: dict) -> str:
        raise NotImplementedError

    def validate_article(self, article: str) -> bool:
        raise NotImplementedError

class DeepDiveTemplate(ArticleTemplate):
    sections = [
        "Company Overview",
        "Latest News",
        "Product Deep Dive",
        "GitHub Analysis",
        "Code Examples",
        "Market Position",
        "Developer Impact",
        "What's Next",
        "Key Takeaways"
    ]

    def generate_prompt(self, company, data):
        # Custom prompt for deep dive style
        pass
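The `validate_article` hook for the deep-dive template could simply check that every required section heading appears and that the article clears a length floor (the thresholds here are illustrative):

```python
import re

REQUIRED_SECTIONS = [
    "Company Overview", "Latest News", "Product Deep Dive",
    "GitHub Analysis", "Code Examples", "Market Position",
    "Developer Impact", "What's Next", "Key Takeaways",
]


def validate_deep_dive(article: str, min_words: int = 800) -> bool:
    """Return True only if every required section heading is present
    (at any markdown heading level) and the word count clears the floor."""
    for section in REQUIRED_SECTIONS:
        pattern = rf"^#+\s+.*{re.escape(section)}"
        if not re.search(pattern, article, re.MULTILINE | re.IGNORECASE):
            return False
    return len(article.split()) >= min_words
```

A failed validation would trigger the same fallback path as any other LLM quality failure.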

3. Sentiment & Trend Analysis

def analyze_company_sentiment(company: str) -> dict:
    """Analyze sentiment around the company."""

    # Collect recent mentions
    news = search_news(company)
    reddit = search_reddit(company)
    twitter = search_twitter(company)

    sentiment_scores = [
        analyze_sentiment(mention["text"])
        for mention in news + reddit + twitter
    ]

    if not sentiment_scores:
        return {"average": 0.0, "trend": "flat", "confidence": 0.0}

    return {
        "average": np.mean(sentiment_scores),
        "trend": calculate_trend(sentiment_scores),
        # More data means higher confidence, capped at 1.0
        "confidence": min(1.0, len(sentiment_scores) / 10),
    }
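The `calculate_trend` helper is not shown above. A dependency-free sketch classifies the series by the slope of a least-squares line fit (the 0.05 thresholds are arbitrary):

```python
def calculate_trend(scores: list[float]) -> str:
    """Classify a score series as improving, declining, or flat based
    on the slope of a least-squares line fitted through it."""
    n = len(scores)
    if n < 2:
        return "flat"
    mean_x = (n - 1) / 2
    mean_y = sum(scores) / n
    # Least-squares slope: cov(x, y) / var(x)
    slope = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(scores)) / \
            sum((x - mean_x) ** 2 for x in range(n))
    if slope > 0.05:
        return "improving"
    if slope < -0.05:
        return "declining"
    return "flat"
```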

Long-Term Enhancements (6-12 months)

1. Autonomous Research Agent

class ResearchAgent:
    """Agent that can research and cross-reference information independently."""

    async def research_company(self, company: str) -> dict:
        # Multi-step research
        background = await self.get_background(company)
        competitors = await self.find_competitors(company)
        market_data = await self.analyze_market(company)
        technical_docs = await self.read_documentation(company)

        # Cross-reference and validate (cross_validate is async, so await it)
        validated = await self.cross_validate({
            "background": background,
            "competitors": competitors,
            "market": market_data,
            "docs": technical_docs
        })

        return validated

    async def cross_validate(self, research: dict) -> dict:
        """Find contradictions and validate facts."""
        # AI-powered validation
        pass

2. Interactive Chatbot Mode

class ChatbotMode:
    """Interactive mode where users can ask questions about companies."""

    async def handle_query(self, query: str) -> str:
        # Parse query
        intent = parse_intent(query)
        company = extract_company(query)

        # Fetch relevant data
        if intent == "comparison":
            results = await self.compare_companies(company)
        elif intent == "news":
            results = await self.get_latest_news(company)
        elif intent == "analysis":
            results = await self.analyze_company(company)
        else:
            # Default to news so results is always bound
            results = await self.get_latest_news(company)

        # Generate response
        return self.format_response(results)
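`parse_intent` is left undefined above. A keyword-based sketch gets the idea across, though a real system would likely use an LLM or a trained intent classifier:

```python
INTENT_KEYWORDS = {
    "comparison": ["compare", "versus", " vs ", "better than"],
    "news": ["news", "latest", "recent", "announcement"],
    "analysis": ["analyze", "analysis", "deep dive", "explain"],
}


def parse_intent(query: str) -> str:
    """Map a free-text query to an intent via keyword matching,
    checking intents in a fixed priority order."""
    q = query.lower()
    for intent, keywords in INTENT_KEYWORDS.items():
        if any(kw in q for kw in keywords):
            return intent
    return "news"  # sensible default for a news-focused agent
```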

3. Community Features

# User-generated content and collaboration
class CommunityFeatures:

    def submit_company(self, user: str, company: dict):
        """Allow users to suggest companies to cover."""
        suggestions = load_suggestions()
        suggestions.append({
            "id": str(uuid.uuid4()),  # requires `import uuid`; vote_company matches on this
            "user": user,
            "company": company,
            "votes": 0,
            "submitted_at": datetime.now().isoformat()
        })
        save_suggestions(suggestions)

    def vote_company(self, suggestion_id: str, user: str):
        """Vote for suggested companies."""
        suggestions = load_suggestions()
        for s in suggestions:
            if s["id"] == suggestion_id:
                s["votes"] += 1
        save_suggestions(suggestions)

    def get_top_suggestions(self, limit: int = 10) -> list[dict]:
        """Get most-voted company suggestions."""
        suggestions = load_suggestions()
        return sorted(suggestions, key=lambda x: x["votes"], reverse=True)[:limit]

4. Multi-Language Support

LANGUAGES = {
    "en": "English",
    "es": "Spanish",
    "fr": "French",
    "de": "German",
    "zh": "Chinese",
    "ja": "Japanese",
}

def generate_article_multilingual(
    company: dict,
    data: dict,
    language: str = "en"
) -> tuple[str, str]:
    """Generate article in specified language."""

    if language not in LANGUAGES:
        raise ValueError(f"Unsupported language: {language}")

    system = f"""
    You are a tech journalist writing in {LANGUAGES[language]}.
    Write about {company['name']} in {LANGUAGES[language]}.
    """

    article = call_llm(system, build_prompt(data))
    return translate_metadata(article, language)

Conclusion

The AI Tech Daily Agent represents a sophisticated implementation of autonomous AI agent architecture, successfully combining multiple advanced technologies into a cohesive, production-ready system. This project demonstrates the power of multi-agent systems to automate complex, multi-step workflows that traditionally require significant human effort.

Key Architectural Achievements

1. Modular Service Architecture
The system's clean separation of concerns, with dedicated services for each external API and functionality, makes it maintainable, testable, and extensible. The service layer pattern provides clear abstractions over complex external systems.

2. Robust Error Handling
Comprehensive error handling at every level, from network timeouts to LLM failures, ensures the system continues operating even when components fail. The fallback mechanisms prevent total system failure and provide graceful degradation.

3. Pipeline-based Orchestration
The clear pipeline pattern transforms data through defined stages, making the workflow easy to understand, debug, and optimize. Each stage has well-defined inputs and outputs, enabling modular testing and independent optimization.
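Stripped to its essence, that pattern is a list of stage functions threading a shared context dict, as in this simplified sketch (the stage names mirror the workflow but the bodies are illustrative):

```python
def run_pipeline(stages, context):
    """Run each stage in order; every stage reads and enriches the
    shared context dict, so stages can be tested independently."""
    for stage in stages:
        context = stage(context)
    return context


# Illustrative stages mirroring the article's workflow
def select_company(ctx):
    ctx["company"] = {"name": "ExampleAI", "slug": "exampleai"}
    return ctx

def gather_news(ctx):
    ctx["news"] = [f"headline about {ctx['company']['name']}"]
    return ctx

def generate_article(ctx):
    ctx["article"] = f"# Deep dive: {ctx['company']['name']}\n" + "\n".join(ctx["news"])
    return ctx


result = run_pipeline([select_company, gather_news, generate_article], {})
```

Because each stage only depends on the keys it reads, a stage can be swapped or unit-tested with a hand-built context.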

4. Real-time Data Integration
The system successfully integrates multiple real-time data sources—news APIs, GitHub, web search, and content scraping—into a unified context that powers intelligent content generation.

5. Quality Assurance
Multi-layered quality checks ensure article length, structure, and content quality before publishing. The validation mechanism catches common LLM failure modes and triggers fallbacks.

Technical Excellence

Code Quality:

  • Clear, readable code with comprehensive logging
  • Consistent error handling patterns
  • Type hints and docstrings
  • DRY (Don't Repeat Yourself) principles

Best Practices:

  • Configuration externalization
  • Dependency injection
  • Async/await for non-blocking operations
  • Comprehensive testing strategies
  • Production-ready deployment options

Scalability:

  • Parallel execution where beneficial
  • Rate limiting and backoff strategies
  • Efficient data transformation
  • Resource-conscious design (timeouts, limiting)
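As an illustration of the rate-limiting point, a minimal token-bucket limiter (a sketch with illustrative parameters, not the project's actual implementation) looks like:

```python
import time


class RateLimiter:
    """Token-bucket limiter: allow at most `rate` calls per `per`
    seconds, sleeping when the bucket runs empty."""

    def __init__(self, rate: int, per: float):
        self.rate = rate
        self.per = per
        self.tokens = float(rate)
        self.updated = time.monotonic()

    def acquire(self):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at bucket size
        self.tokens = min(self.rate,
                          self.tokens + (now - self.updated) * self.rate / self.per)
        self.updated = now
        if self.tokens < 1:
            # Sleep just long enough for one token to accumulate
            time.sleep((1 - self.tokens) * self.per / self.rate)
            self.tokens = 1
        self.tokens -= 1
```

Wrapping each outbound API call in `limiter.acquire()` keeps the agent inside provider quotas without hand-tuned sleeps.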

Impact & Applications

This agent demonstrates how AI agents can:

  • Automate content creation at scale with quality
  • Integrate multiple services into cohesive workflows
  • Make intelligent decisions based on real-time data
  • Maintain context and state across operations
  • Handle failures gracefully in production

The patterns and architecture used here are applicable to many other domains:

  • Financial research and reporting
  • Market analysis and newsletters
  • Competitive intelligence gathering
  • Technical documentation generation
  • Automated journalism and reporting

Future of AI Agents

The AI Tech Daily Agent is a glimpse into the future of autonomous AI systems. As LLMs and agent frameworks continue to evolve, we can expect:

  1. More Sophisticated Reasoning: Agents that can plan, adapt, and solve problems more autonomously
  2. Better Tool Use: More reliable and comprehensive integration with external APIs and tools
  3. Improved Collaboration: Multi-agent systems where specialized agents work together
  4. Enhanced Reliability: Better error handling, validation, and trustworthiness
  5. Richer Interactions: More natural and sophisticated human-AI collaboration

This project serves as both a working implementation and an architectural reference for building production-grade AI agent systems. The lessons learned—from error handling to API integration to prompt engineering—are valuable for anyone building with autonomous agents.



This comprehensive deep dive was written to provide a complete understanding of the AI Tech Daily Agent architecture, from high-level design to implementation details. It serves as both documentation for developers and a case study in autonomous AI agent development.

Generated on April 16, 2026


Source: PROJECT_DEEP_DIVE.md in the AI Tech Daily Agent repository on GitHub.
