Alain Airom

Scaling Output, Not Headcount: The Business Case for AI-Driven Development

How AI-driven development helps (and helped me 😉) gain productivity!

Introduction

AI-driven code development is fundamentally shifting the software engineering paradigm, moving away from simple “automated typing” toward the creation of high-level, reliable application frameworks. The true value-add lies in leveraging LLMs and agentic tools to orchestrate complex architectural patterns — such as autonomous agents that handle data retrieval or API integration — which serve as a robust skeleton for the entire application. By automating the scaffolding, boilerplate, and initial testing layers, these tools significantly reduce the cognitive load on developers, allowing them to bypass the friction of repetitive setup.

This strategic shift empowers engineering teams to focus their expertise on high-value logic, creative problem-solving, and advanced system optimization. Consequently, applications move through the testing and validation phases with greater structural integrity, reaching production readiness much faster. Ultimately, this accelerated velocity doesn’t just benefit the development team; it ensures a shorter time-to-market for innovative features, directly translating into a more responsive and satisfying experience for the end user.


From Vision to Velocity: A December Sprint

Last December, our team faced a high-stakes challenge: deliver a high-impact demonstration of watsonx Orchestrate under a compressed timeline. The goal was to build a sophisticated, autonomous agent capable of bridging the gap between static enterprise data and actionable AI.

Specifically, the agent needed to navigate an enterprise SharePoint repository, extract deep insights from complex PDF, Excel, and Word files, and seamlessly feed that data into a Retrieval-Augmented Generation (RAG) pipeline. This project served as the perfect theater to showcase a powerful end-to-end technical stack:

  • watsonx Orchestrate & the ADK: Used to build ad-hoc agents that don’t just follow scripts but understand intent and orchestrate complex workflows.
  • Docling: Our go-to tool for document processing, ensuring that even the most stubborn layouts in PDFs and spreadsheets were converted into clean, RAG-ready data.
  • Milvus: A widely used vector database, serving here as the store for the RAG pipeline.
  • The “Human in the Loop”: Featuring Bob, our developer who leveraged these AI-driven tools to build, test, and deploy the entire application from A to Z in record time.

This sprint proved that when you combine a robust agentic framework with the right processing tools, you shift the developer’s role from “troubleshooter” to “architect,” resulting in a production-ready solution that exceeds user expectations.

The Accelerator: The Strategic Shift from Scaffolding to Orchestration

In traditional development, a developer might spend days writing the “glue code” required to connect a data source like SharePoint to an application. This includes handling OAuth flows, managing API rate limits, and structuring the data ingestion pipeline for a RAG (Retrieval-Augmented Generation) system.

By using watsonx Orchestrate, the focus shifts. Instead of writing the glue, Bob uses the Agent Development Kit (ADK) to define the intent. He configures a specialized agent to navigate the SharePoint directory, extract relevant documents, and feed them into a vector database.
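To give a flavor of what “defining the intent” looks like, an ADK native agent is declared in a short YAML file roughly along the following lines. This sketch is illustrative only: the field names follow the ADK’s documented native-agent format but are not the exact definition produced in this project, and the model id and tool names are placeholders.

# sharepoint_rag_agent.yaml - illustrative sketch; verify fields against the current ADK documentation
spec_version: v1
kind: native
name: sharepoint_rag_agent
description: >
  Answers questions over an enterprise SharePoint library by retrieving
  Docling-processed chunks from a Milvus collection.
llm: watsonx/ibm/granite-3-8b-instruct   # placeholder model id
instructions: >
  When asked about enterprise documents, search the indexed SharePoint content
  and ground every answer in the retrieved chunks.
tools:
  - sharepoint_document_search   # placeholder tool name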


Okay… But Practically, What Was Done?

Once I gave Bob my prompt, specified which sites and tools to use, and authorized access to the Docling and ADK (Agent Development Kit) documentation, Bob used a ‘curl’ process to ingest the technical specifications. This enabled the automated generation of the full project, ensuring that the resulting agents were built according to the very latest architectural standards.

watsonx_sharepoint_tool/
├── main.py                          # Main entry point
├── requirements.txt                 # Python dependencies
├── .env.example                     # Example environment variables
├── config/
│   └── config.yaml                  # Configuration file
├── src/
│   ├── orchestrator.py              # Main orchestration logic
│   ├── sharepoint/
│   │   ├── __init__.py
│   │   └── sharepoint_client.py     # SharePoint integration
│   ├── document_processor/
│   │   ├── __init__.py
│   │   └── docling_processor.py     # Docling document processing
│   └── vector_store/
│       ├── __init__.py
│       ├── embedding_manager.py     # Embedding generation
│       └── milvus_client.py         # Milvus vector database
└── logs/                            # Log files (auto-created)
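One small gap worth filling: the requirements.txt listed in the tree above is not reproduced later in this post. Judging from the imports across the generated modules, it would contain roughly the following (package names inferred from the imports, version pins omitted):

# requirements.txt (reconstructed from the modules' imports; pin versions as needed)
office365-rest-python-client   # SharePoint access (office365.*)
docling                        # document conversion
pymilvus                       # Milvus vector database client
sentence-transformers          # embedding models
torch                          # backend for sentence-transformers
numpy
loguru                         # logging
python-dotenv                  # .env loading
PyYAML                         # config.yaml parsing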

The outcome was impressive. Building a watsonx Orchestrate Tool requires strict adherence to a specific hierarchy and file structure. Bob mastered this complexity instantly, generating the necessary folders and the essential __init__.py files with the precision of a seasoned system architect.

So without further ado, here is what was practically delivered ⬇️

  • Connection to SharePoint 🗂️
# SharePoint Credentials
SHAREPOINT_USERNAME=your-email@company.com
SHAREPOINT_PASSWORD=your-password
# Or use App-based authentication
SHAREPOINT_CLIENT_ID=your-client-id
SHAREPOINT_CLIENT_SECRET=your-client-secret
SHAREPOINT_TENANT_ID=your-tenant-id

# Milvus Credentials (if authentication is enabled)
MILVUS_USER=
MILVUS_PASSWORD=

# Optional: IBM watsonx API credentials (if using watsonx embeddings)
WATSONX_API_KEY=
WATSONX_PROJECT_ID=
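The .env file above holds the secrets; everything else lives in config/config.yaml, which the orchestrator loads at startup. The generated file is not reproduced in this post, but from the keys the orchestrator reads it would look roughly like this (values are the code’s own defaults; the site URL is a placeholder):

# config/config.yaml - reconstructed sketch based on the keys read by the orchestrator
sharepoint:
  site_url: "https://your-tenant.sharepoint.com/sites/your-site"   # placeholder
  document_library: "Shared Documents"
  file_extensions: [".pdf", ".docx", ".doc", ".xlsx", ".xls"]

docling:
  extract_tables: true
  extract_images: false
  ocr_enabled: true

chunking:
  chunk_size: 512
  chunk_overlap: 50
  min_chunk_size: 100

embedding:
  model_name: "sentence-transformers/all-MiniLM-L6-v2"
  batch_size: 32
  normalize_embeddings: true

milvus:
  host: "localhost"
  port: 19530
  collection_name: "sharepoint_documents"
  index_type: "IVF_FLAT"
  metric_type: "L2"
  nlist: 128

logging:
  level: "INFO"
  log_file: "logs/watsonx_sharepoint.log"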
  • SharePoint Client Code
#  sharepoint_client.py
"""
SharePoint Client for authenticating and fetching documents.
"""
import os
from typing import List, Dict, Optional
from pathlib import Path
from office365.runtime.auth.user_credential import UserCredential
from office365.runtime.auth.client_credential import ClientCredential
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from loguru import logger
import tempfile


class SharePointClient:
    """Client for interacting with SharePoint."""

    def __init__(
        self,
        site_url: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        tenant_id: Optional[str] = None
    ):
        """
        Initialize SharePoint client with credentials.

        Args:
            site_url: SharePoint site URL
            username: User email (for user authentication)
            password: User password (for user authentication)
            client_id: App client ID (for app authentication)
            client_secret: App client secret (for app authentication)
            tenant_id: Azure AD tenant ID (for app authentication)
        """
        self.site_url = site_url
        self.ctx = None

        # Authenticate using provided credentials
        if username and password:
            logger.info("Authenticating with user credentials")
            self._authenticate_user(username, password)
        elif client_id and client_secret and tenant_id:
            logger.info("Authenticating with app credentials")
            self._authenticate_app(client_id, client_secret, tenant_id)
        else:
            raise ValueError(
                "Must provide either (username, password) or "
                "(client_id, client_secret, tenant_id)"
            )

    def _authenticate_user(self, username: str, password: str):
        """Authenticate using user credentials."""
        try:
            credentials = UserCredential(username, password)
            self.ctx = ClientContext(self.site_url).with_credentials(credentials)
            # Test connection
            web = self.ctx.web
            self.ctx.load(web)
            self.ctx.execute_query()
            logger.info(f"Successfully authenticated to: {web.properties['Title']}")
        except Exception as e:
            logger.error(f"Authentication failed: {e}")
            raise

    def _authenticate_app(self, client_id: str, client_secret: str, tenant_id: str):
        """Authenticate using app credentials."""
        try:
            credentials = ClientCredential(client_id, client_secret)
            self.ctx = ClientContext(self.site_url).with_credentials(credentials)
            # Test connection
            web = self.ctx.web
            self.ctx.load(web)
            self.ctx.execute_query()
            logger.info(f"Successfully authenticated to: {web.properties['Title']}")
        except Exception as e:
            logger.error(f"App authentication failed: {e}")
            raise

    def list_documents(
        self,
        library_name: str = "Shared Documents",
        file_extensions: Optional[List[str]] = None
    ) -> List[Dict]:
        """
        List all documents in a SharePoint library.

        Args:
            library_name: Name of the document library
            file_extensions: List of file extensions to filter (e.g., ['.pdf', '.docx'])

        Returns:
            List of document metadata dictionaries
        """
        if file_extensions is None:
            file_extensions = ['.pdf', '.docx', '.doc', '.xlsx', '.xls']

        try:
            logger.info(f"Fetching documents from library: {library_name}")

            # Get the document library
            library = self.ctx.web.lists.get_by_title(library_name)
            items = library.items.get().execute_query()

            documents = []
            for item in items:
                file_ref = item.properties.get('FileRef', '')
                file_ext = Path(file_ref).suffix.lower()

                if file_ext in file_extensions:
                    doc_info = {
                        'name': item.properties.get('FileLeafRef', ''),
                        'path': file_ref,
                        'size': item.properties.get('File_x0020_Size', 0),
                        'modified': item.properties.get('Modified', ''),
                        'author': item.properties.get('Author', {}).get('Title', ''),
                        'extension': file_ext
                    }
                    documents.append(doc_info)

            logger.info(f"Found {len(documents)} documents")
            return documents

        except Exception as e:
            logger.error(f"Error listing documents: {e}")
            raise

    def download_document(self, file_path: str, local_path: Optional[str] = None) -> str:
        """
        Download a document from SharePoint.

        Args:
            file_path: SharePoint file path
            local_path: Local path to save file (if None, uses temp directory)

        Returns:
            Path to downloaded file
        """
        try:
            logger.info(f"Downloading: {file_path}")

            # Get file from SharePoint
            file = self.ctx.web.get_file_by_server_relative_url(file_path)

            # Determine local path
            if local_path is None:
                temp_dir = tempfile.mkdtemp()
                file_name = Path(file_path).name
                local_path = os.path.join(temp_dir, file_name)

            # Download file
            with open(local_path, 'wb') as local_file:
                file.download(local_file).execute_query()

            logger.info(f"Downloaded to: {local_path}")
            return local_path

        except Exception as e:
            logger.error(f"Error downloading document: {e}")
            raise

    def download_all_documents(
        self,
        library_name: str = "Shared Documents",
        file_extensions: Optional[List[str]] = None,
        output_dir: Optional[str] = None
    ) -> List[Dict]:
        """
        Download all documents from a SharePoint library.

        Args:
            library_name: Name of the document library
            file_extensions: List of file extensions to filter
            output_dir: Directory to save files (if None, uses temp directory)

        Returns:
            List of dictionaries with document info and local paths
        """
        if output_dir is None:
            output_dir = tempfile.mkdtemp()
        else:
            os.makedirs(output_dir, exist_ok=True)

        documents = self.list_documents(library_name, file_extensions)
        downloaded_docs = []

        for doc in documents:
            try:
                local_path = self.download_document(
                    doc['path'],
                    os.path.join(output_dir, doc['name'])
                )
                doc['local_path'] = local_path
                downloaded_docs.append(doc)
            except Exception as e:
                logger.error(f"Failed to download {doc['name']}: {e}")
                continue

        logger.info(f"Downloaded {len(downloaded_docs)} documents to {output_dir}")
        return downloaded_docs

# Made with Bob
  • Docling Processing Code ⚙️
#  docling_processor.py
"""
Document processor using Docling for Word, Excel, and PDF files.
"""
from typing import List, Dict, Optional
from pathlib import Path
from loguru import logger
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions


class DoclingProcessor:
    """Process documents using Docling."""

    def __init__(
        self,
        extract_tables: bool = True,
        extract_images: bool = False,
        ocr_enabled: bool = True
    ):
        """
        Initialize Docling processor.

        Args:
            extract_tables: Whether to extract tables
            extract_images: Whether to extract images
            ocr_enabled: Whether to enable OCR for scanned documents
        """
        self.extract_tables = extract_tables
        self.extract_images = extract_images
        self.ocr_enabled = ocr_enabled

        # Configure pipeline options
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = ocr_enabled
        pipeline_options.do_table_structure = extract_tables

        # Initialize document converter (PDF pipeline options are wrapped in format_options)
        self.converter = DocumentConverter(
            allowed_formats=[
                InputFormat.PDF,
                InputFormat.DOCX,
                InputFormat.XLSX,
            ],
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            }
        )

        logger.info("Docling processor initialized")

    def process_document(self, file_path: str) -> Dict:
        """
        Process a single document.

        Args:
            file_path: Path to the document file

        Returns:
            Dictionary containing processed document data
        """
        try:
            logger.info(f"Processing document: {file_path}")

            # Convert document
            result = self.converter.convert(file_path)

            # Extract text content
            text_content = result.document.export_to_markdown()

            # Extract metadata
            metadata = {
                'file_path': file_path,
                'file_name': Path(file_path).name,
                'file_type': Path(file_path).suffix,
                'num_pages': len(result.document.pages) if hasattr(result.document, 'pages') else 1,
                'title': result.document.name if hasattr(result.document, 'name') else Path(file_path).stem,
            }

            # Extract tables if enabled
            tables = []
            if self.extract_tables and hasattr(result.document, 'tables'):
                for table in result.document.tables:
                    tables.append({
                        'data': table.export_to_dataframe().to_dict() if hasattr(table, 'export_to_dataframe') else {},
                        'caption': getattr(table, 'caption', '')
                    })

            processed_doc = {
                'text': text_content,
                'metadata': metadata,
                'tables': tables,
                'success': True
            }

            logger.info(f"Successfully processed: {file_path}")
            return processed_doc

        except Exception as e:
            logger.error(f"Error processing document {file_path}: {e}")
            return {
                'text': '',
                'metadata': {'file_path': file_path, 'error': str(e)},
                'tables': [],
                'success': False
            }

    def process_documents(self, file_paths: List[str]) -> List[Dict]:
        """
        Process multiple documents.

        Args:
            file_paths: List of file paths to process

        Returns:
            List of processed document dictionaries
        """
        processed_docs = []

        for file_path in file_paths:
            doc = self.process_document(file_path)
            processed_docs.append(doc)

        successful = sum(1 for doc in processed_docs if doc['success'])
        logger.info(f"Processed {successful}/{len(file_paths)} documents successfully")

        return processed_docs

    def extract_text_chunks(
        self,
        text: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        min_chunk_size: int = 100
    ) -> List[str]:
        """
        Split text into chunks with overlap.

        Args:
            text: Text to split
            chunk_size: Maximum size of each chunk
            chunk_overlap: Number of characters to overlap between chunks
            min_chunk_size: Minimum size for a chunk to be included

        Returns:
            List of text chunks
        """
        if not text or len(text) < min_chunk_size:
            return [text] if text else []

        chunks = []
        start = 0
        text_length = len(text)

        while start < text_length:
            end = start + chunk_size

            # If this is not the last chunk, try to break at a sentence or word boundary
            if end < text_length:
                # Look for sentence boundary (., !, ?)
                for i in range(end, max(start + min_chunk_size, end - 100), -1):
                    if text[i] in '.!?\n':
                        end = i + 1
                        break
                else:
                    # Look for word boundary
                    for i in range(end, max(start + min_chunk_size, end - 50), -1):
                        if text[i].isspace():
                            end = i
                            break

            chunk = text[start:end].strip()
            if len(chunk) >= min_chunk_size:
                chunks.append(chunk)

            # Move start position with overlap
            start = end - chunk_overlap if end < text_length else text_length

        logger.debug(f"Split text into {len(chunks)} chunks")
        return chunks

    def process_and_chunk_document(
        self,
        file_path: str,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        min_chunk_size: int = 100
    ) -> Dict:
        """
        Process a document and split it into chunks.

        Args:
            file_path: Path to the document
            chunk_size: Maximum size of each chunk
            chunk_overlap: Number of characters to overlap
            min_chunk_size: Minimum chunk size

        Returns:
            Dictionary with processed document and chunks
        """
        # Process document
        doc = self.process_document(file_path)

        if not doc['success']:
            return doc

        # Create chunks
        chunks = self.extract_text_chunks(
            doc['text'],
            chunk_size,
            chunk_overlap,
            min_chunk_size
        )

        doc['chunks'] = chunks
        doc['num_chunks'] = len(chunks)

        return doc

# Made with Bob
  • Milvus Client and Embedding Manager Code 🗄️
# milvus_client.py
"""
Milvus vector database client for storing and querying embeddings.
"""
from typing import List, Dict, Optional, Any
from pymilvus import (
    connections,
    Collection,
    CollectionSchema,
    FieldSchema,
    DataType,
    utility
)
from loguru import logger
import numpy as np


class MilvusClient:
    """Client for interacting with Milvus vector database."""

    def __init__(
        self,
        host: str = "localhost",
        port: int = 19530,
        collection_name: str = "sharepoint_documents",
        dimension: int = 384,
        index_type: str = "IVF_FLAT",
        metric_type: str = "L2",
        nlist: int = 128,
        user: Optional[str] = None,
        password: Optional[str] = None
    ):
        """
        Initialize Milvus client.

        Args:
            host: Milvus server host
            port: Milvus server port
            collection_name: Name of the collection
            dimension: Dimension of embedding vectors
            index_type: Type of index (IVF_FLAT, IVF_SQ8, HNSW, etc.)
            metric_type: Distance metric (L2, IP, COSINE)
            nlist: Number of cluster units (for IVF indexes)
            user: Username for authentication
            password: Password for authentication
        """
        self.host = host
        self.port = port
        self.collection_name = collection_name
        self.dimension = dimension
        self.index_type = index_type
        self.metric_type = metric_type
        self.nlist = nlist

        # Connect to Milvus
        logger.info(f"Connecting to Milvus at {host}:{port}")
        connections.connect(
            alias="default",
            host=host,
            port=port,
            user=user or "",
            password=password or ""
        )

        # Create or load collection
        self.collection = self._get_or_create_collection()
        logger.info(f"Milvus client initialized with collection: {collection_name}")

    def _get_or_create_collection(self) -> Collection:
        """Get existing collection or create a new one."""
        if utility.has_collection(self.collection_name):
            logger.info(f"Loading existing collection: {self.collection_name}")
            collection = Collection(self.collection_name)
        else:
            logger.info(f"Creating new collection: {self.collection_name}")
            collection = self._create_collection()

        return collection

    def _create_collection(self) -> Collection:
        """Create a new collection with schema."""
        # Define schema
        fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dimension),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
            FieldSchema(name="document_name", dtype=DataType.VARCHAR, max_length=512),
            FieldSchema(name="document_path", dtype=DataType.VARCHAR, max_length=1024),
            FieldSchema(name="chunk_index", dtype=DataType.INT64),
            FieldSchema(name="metadata", dtype=DataType.JSON)
        ]

        schema = CollectionSchema(
            fields=fields,
            description="SharePoint documents with embeddings"
        )

        # Create collection
        collection = Collection(
            name=self.collection_name,
            schema=schema
        )

        # Create index
        index_params = {
            "metric_type": self.metric_type,
            "index_type": self.index_type,
            "params": {"nlist": self.nlist}
        }

        collection.create_index(
            field_name="embedding",
            index_params=index_params
        )

        logger.info(f"Created collection with index: {self.index_type}")
        return collection

    def insert_documents(self, documents: List[Dict]) -> int:
        """
        Insert documents with embeddings into Milvus.

        Args:
            documents: List of document dictionaries with embeddings

        Returns:
            Number of vectors inserted
        """
        try:
            embeddings = []
            texts = []
            document_names = []
            document_paths = []
            chunk_indices = []
            metadata_list = []

            # Prepare data for insertion
            for doc in documents:
                if not doc.get('success') or 'embeddings' not in doc:
                    continue

                doc_metadata = doc.get('metadata', {})
                doc_name = doc_metadata.get('file_name', 'unknown')
                doc_path = doc_metadata.get('file_path', 'unknown')

                for idx, (chunk, embedding) in enumerate(zip(doc['chunks'], doc['embeddings'])):
                    embeddings.append(embedding.tolist() if isinstance(embedding, np.ndarray) else embedding)
                    texts.append(chunk[:65535])  # Truncate if needed
                    document_names.append(doc_name[:512])
                    document_paths.append(doc_path[:1024])
                    chunk_indices.append(idx)
                    metadata_list.append({
                        'num_pages': doc_metadata.get('num_pages', 0),
                        'file_type': doc_metadata.get('file_type', ''),
                        'title': doc_metadata.get('title', '')
                    })

            if not embeddings:
                logger.warning("No embeddings to insert")
                return 0

            # Insert data
            logger.info(f"Inserting {len(embeddings)} vectors into Milvus")

            entities = [
                embeddings,
                texts,
                document_names,
                document_paths,
                chunk_indices,
                metadata_list
            ]

            insert_result = self.collection.insert(entities)
            self.collection.flush()

            logger.info(f"Successfully inserted {len(embeddings)} vectors")
            return len(embeddings)

        except Exception as e:
            logger.error(f"Error inserting documents: {e}")
            raise

    def search(
        self,
        query_embedding: np.ndarray,
        top_k: int = 5,
        output_fields: Optional[List[str]] = None
    ) -> List[Dict]:
        """
        Search for similar vectors.

        Args:
            query_embedding: Query embedding vector
            top_k: Number of results to return
            output_fields: Fields to include in results

        Returns:
            List of search results
        """
        try:
            if output_fields is None:
                output_fields = ["text", "document_name", "document_path", "chunk_index", "metadata"]

            # Load collection
            self.collection.load()

            # Prepare query
            search_params = {
                "metric_type": self.metric_type,
                "params": {"nprobe": 10}
            }

            # Search
            results = self.collection.search(
                data=[query_embedding.tolist() if isinstance(query_embedding, np.ndarray) else query_embedding],
                anns_field="embedding",
                param=search_params,
                limit=top_k,
                output_fields=output_fields
            )

            # Format results
            formatted_results = []
            for hits in results:
                for hit in hits:
                    result = {
                        'id': hit.id,
                        'distance': hit.distance,
                        'score': 1 / (1 + hit.distance) if self.metric_type == "L2" else hit.distance
                    }
                    # Add output fields
                    for field in output_fields:
                        result[field] = hit.entity.get(field)
                    formatted_results.append(result)

            return formatted_results

        except Exception as e:
            logger.error(f"Error searching: {e}")
            raise

    def delete_collection(self):
        """Delete the collection."""
        try:
            utility.drop_collection(self.collection_name)
            logger.info(f"Deleted collection: {self.collection_name}")
        except Exception as e:
            logger.error(f"Error deleting collection: {e}")
            raise

    def get_collection_stats(self) -> Dict[str, Any]:
        """Get collection statistics."""
        try:
            stats = self.collection.num_entities
            return {
                'collection_name': self.collection_name,
                'num_entities': stats,
                'dimension': self.dimension
            }
        except Exception as e:
            logger.error(f"Error getting stats: {e}")
            raise

# Made with Bob

# embedding_manager.py
"""
Embedding manager for generating embeddings from text chunks.
"""
from typing import List, Optional
import torch
from sentence_transformers import SentenceTransformer
from loguru import logger
import numpy as np


class EmbeddingManager:
    """Manage text embeddings using sentence transformers."""

    def __init__(
        self,
        model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
        batch_size: int = 32,
        normalize_embeddings: bool = True,
        device: Optional[str] = None
    ):
        """
        Initialize embedding manager.

        Args:
            model_name: Name of the sentence transformer model
            batch_size: Batch size for encoding
            normalize_embeddings: Whether to normalize embeddings
            device: Device to use ('cuda', 'cpu', or None for auto)
        """
        self.model_name = model_name
        self.batch_size = batch_size
        self.normalize_embeddings = normalize_embeddings

        # Determine device
        if device is None:
            self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        else:
            self.device = device

        logger.info(f"Loading embedding model: {model_name} on {self.device}")

        # Load model
        self.model = SentenceTransformer(model_name, device=self.device)
        self.embedding_dimension = self.model.get_sentence_embedding_dimension()

        logger.info(f"Model loaded. Embedding dimension: {self.embedding_dimension}")

    def encode_text(self, text: str) -> np.ndarray:
        """
        Encode a single text into an embedding.

        Args:
            text: Text to encode

        Returns:
            Embedding vector as numpy array
        """
        try:
            embedding = self.model.encode(
                text,
                normalize_embeddings=self.normalize_embeddings,
                show_progress_bar=False
            )
            return embedding
        except Exception as e:
            logger.error(f"Error encoding text: {e}")
            raise

    def encode_batch(self, texts: List[str]) -> np.ndarray:
        """
        Encode multiple texts into embeddings.

        Args:
            texts: List of texts to encode

        Returns:
            Array of embedding vectors
        """
        try:
            logger.info(f"Encoding {len(texts)} texts in batches of {self.batch_size}")

            embeddings = self.model.encode(
                texts,
                batch_size=self.batch_size,
                normalize_embeddings=self.normalize_embeddings,
                show_progress_bar=True,
                convert_to_numpy=True
            )

            logger.info(f"Successfully encoded {len(texts)} texts")
            return embeddings

        except Exception as e:
            logger.error(f"Error encoding batch: {e}")
            raise

    def encode_documents(self, documents: List[dict]) -> List[dict]:
        """
        Encode document chunks and add embeddings to document data.

        Args:
            documents: List of document dictionaries with 'chunks' field

        Returns:
            Documents with added 'embeddings' field
        """
        all_chunks = []
        chunk_to_doc_map = []

        # Collect all chunks and track which document they belong to
        for doc_idx, doc in enumerate(documents):
            if 'chunks' in doc and doc['chunks']:
                for chunk in doc['chunks']:
                    all_chunks.append(chunk)
                    chunk_to_doc_map.append(doc_idx)

        if not all_chunks:
            logger.warning("No chunks found in documents")
            return documents

        # Encode all chunks at once
        logger.info(f"Encoding {len(all_chunks)} chunks from {len(documents)} documents")
        embeddings = self.encode_batch(all_chunks)

        # Distribute embeddings back to documents
        for doc in documents:
            doc['embeddings'] = []

        for chunk_idx, doc_idx in enumerate(chunk_to_doc_map):
            documents[doc_idx]['embeddings'].append(embeddings[chunk_idx])

        # Add embedding metadata
        for doc in documents:
            if 'embeddings' in doc:
                doc['num_embeddings'] = len(doc['embeddings'])
                doc['embedding_dimension'] = self.embedding_dimension

        logger.info(f"Added embeddings to {len(documents)} documents")
        return documents

    def get_embedding_dimension(self) -> int:
        """Get the dimension of embeddings produced by this model."""
        return self.embedding_dimension

# Made with Bob
  • And finally, the Orchestrator Code and the main program 🏪
# orchestrator.py
"""
Main orchestrator for the watsonx SharePoint tool.
Coordinates SharePoint access, document processing, and vector storage.
"""
import os
from typing import Optional, List, Dict
from pathlib import Path
import yaml
from dotenv import load_dotenv
from loguru import logger

from sharepoint.sharepoint_client import SharePointClient
from document_processor.docling_processor import DoclingProcessor
from vector_store.embedding_manager import EmbeddingManager
from vector_store.milvus_client import MilvusClient


class WatsonxSharePointOrchestrator:
    """Main orchestrator for the SharePoint to Milvus pipeline."""

    def __init__(self, config_path: str = "config/config.yaml"):
        """
        Initialize the orchestrator.

        Args:
            config_path: Path to configuration file
        """
        # Load environment variables
        load_dotenv()

        # Load configuration
        self.config = self._load_config(config_path)

        # Setup logging
        self._setup_logging()

        # Initialize components
        self.sharepoint_client = None
        self.docling_processor = None
        self.embedding_manager = None
        self.milvus_client = None

        logger.info("Orchestrator initialized")

    def _load_config(self, config_path: str) -> Dict:
        """Load configuration from YAML file."""
        try:
            with open(config_path, 'r') as f:
                config = yaml.safe_load(f)
            logger.info(f"Configuration loaded from {config_path}")
            return config
        except Exception as e:
            logger.error(f"Error loading config: {e}")
            raise

    def _setup_logging(self):
        """Setup logging configuration."""
        log_config = self.config.get('logging', {})
        log_level = log_config.get('level', 'INFO')
        log_file = log_config.get('log_file', 'logs/watsonx_sharepoint.log')

        # Create logs directory
        os.makedirs(os.path.dirname(log_file), exist_ok=True)

        # Configure logger
        logger.add(
            log_file,
            rotation="10 MB",
            retention="7 days",
            level=log_level
        )
        logger.info(f"Logging configured: level={log_level}, file={log_file}")

    def connect_sharepoint(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        tenant_id: Optional[str] = None
    ):
        """
        Connect to SharePoint with provided credentials.

        Args:
            username: SharePoint username (from env if not provided)
            password: SharePoint password (from env if not provided)
            client_id: App client ID (from env if not provided)
            client_secret: App client secret (from env if not provided)
            tenant_id: Tenant ID (from env if not provided)
        """
        # Get credentials from environment if not provided
        username = username or os.getenv('SHAREPOINT_USERNAME')
        password = password or os.getenv('SHAREPOINT_PASSWORD')
        client_id = client_id or os.getenv('SHAREPOINT_CLIENT_ID')
        client_secret = client_secret or os.getenv('SHAREPOINT_CLIENT_SECRET')
        tenant_id = tenant_id or os.getenv('SHAREPOINT_TENANT_ID')

        site_url = self.config['sharepoint']['site_url']

        self.sharepoint_client = SharePointClient(
            site_url=site_url,
            username=username,
            password=password,
            client_id=client_id,
            client_secret=client_secret,
            tenant_id=tenant_id
        )

        logger.info("SharePoint client connected")

    def initialize_processors(self):
        """Initialize document processor and embedding manager."""
        # Initialize Docling processor
        docling_config = self.config.get('docling', {})
        self.docling_processor = DoclingProcessor(
            extract_tables=docling_config.get('extract_tables', True),
            extract_images=docling_config.get('extract_images', False),
            ocr_enabled=docling_config.get('ocr_enabled', True)
        )

        # Initialize embedding manager
        embedding_config = self.config.get('embedding', {})
        self.embedding_manager = EmbeddingManager(
            model_name=embedding_config.get('model_name', 'sentence-transformers/all-MiniLM-L6-v2'),
            batch_size=embedding_config.get('batch_size', 32),
            normalize_embeddings=embedding_config.get('normalize_embeddings', True)
        )

        logger.info("Processors initialized")

    def initialize_milvus(self):
        """Initialize Milvus vector database client."""
        milvus_config = self.config.get('milvus', {})

        self.milvus_client = MilvusClient(
            host=milvus_config.get('host', 'localhost'),
            port=milvus_config.get('port', 19530),
            collection_name=milvus_config.get('collection_name', 'sharepoint_documents'),
            dimension=self.embedding_manager.get_embedding_dimension(),
            index_type=milvus_config.get('index_type', 'IVF_FLAT'),
            metric_type=milvus_config.get('metric_type', 'L2'),
            nlist=milvus_config.get('nlist', 128),
            user=os.getenv('MILVUS_USER'),
            password=os.getenv('MILVUS_PASSWORD')
        )

        logger.info("Milvus client initialized")

    def process_sharepoint_documents(
        self,
        library_name: Optional[str] = None,
        output_dir: Optional[str] = None
    ) -> List[Dict]:
        """
        Download and process documents from SharePoint.

        Args:
            library_name: SharePoint library name (from config if not provided)
            output_dir: Directory to save downloaded files

        Returns:
            List of processed documents
        """
        if self.sharepoint_client is None:
            raise RuntimeError("SharePoint client not connected. Call connect_sharepoint() first.")

        if self.docling_processor is None:
            raise RuntimeError("Processors not initialized. Call initialize_processors() first.")

        # Get library name from config if not provided
        library_name = library_name or self.config['sharepoint']['document_library']
        file_extensions = self.config['sharepoint']['file_extensions']

        # Download documents
        logger.info(f"Downloading documents from library: {library_name}")
        downloaded_docs = self.sharepoint_client.download_all_documents(
            library_name=library_name,
            file_extensions=file_extensions,
            output_dir=output_dir
        )

        # Process documents with chunking
        logger.info(f"Processing {len(downloaded_docs)} documents")
        chunking_config = self.config.get('chunking', {})

        processed_docs = []
        for doc in downloaded_docs:
            processed = self.docling_processor.process_and_chunk_document(
                file_path=doc['local_path'],
                chunk_size=chunking_config.get('chunk_size', 512),
                chunk_overlap=chunking_config.get('chunk_overlap', 50),
                min_chunk_size=chunking_config.get('min_chunk_size', 100)
            )
            processed_docs.append(processed)

        logger.info(f"Processed {len(processed_docs)} documents")
        return processed_docs

    def generate_embeddings(self, documents: List[Dict]) -> List[Dict]:
        """
        Generate embeddings for processed documents.

        Args:
            documents: List of processed documents with chunks

        Returns:
            Documents with embeddings added
        """
        if self.embedding_manager is None:
            raise RuntimeError("Embedding manager not initialized. Call initialize_processors() first.")

        logger.info("Generating embeddings for documents")
        documents_with_embeddings = self.embedding_manager.encode_documents(documents)

        return documents_with_embeddings

    def store_in_milvus(self, documents: List[Dict]) -> int:
        """
        Store documents with embeddings in Milvus.

        Args:
            documents: Documents with embeddings

        Returns:
            Number of vectors inserted
        """
        if self.milvus_client is None:
            raise RuntimeError("Milvus client not initialized. Call initialize_milvus() first.")

        logger.info("Storing documents in Milvus")
        num_inserted = self.milvus_client.insert_documents(documents)

        return num_inserted

    def run_full_pipeline(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        tenant_id: Optional[str] = None,
        library_name: Optional[str] = None
    ) -> Dict:
        """
        Run the complete pipeline from SharePoint to Milvus.

        Args:
            username: SharePoint username
            password: SharePoint password
            client_id: App client ID
            client_secret: App client secret
            tenant_id: Tenant ID
            library_name: SharePoint library name

        Returns:
            Summary of the pipeline execution
        """
        try:
            logger.info("Starting full pipeline execution")

            # Step 1: Connect to SharePoint
            logger.info("Step 1: Connecting to SharePoint")
            self.connect_sharepoint(username, password, client_id, client_secret, tenant_id)

            # Step 2: Initialize processors
            logger.info("Step 2: Initializing processors")
            self.initialize_processors()

            # Step 3: Initialize Milvus
            logger.info("Step 3: Initializing Milvus")
            self.initialize_milvus()

            # Step 4: Download and process documents
            logger.info("Step 4: Processing SharePoint documents")
            processed_docs = self.process_sharepoint_documents(library_name)

            # Step 5: Generate embeddings
            logger.info("Step 5: Generating embeddings")
            docs_with_embeddings = self.generate_embeddings(processed_docs)

            # Step 6: Store in Milvus
            logger.info("Step 6: Storing in Milvus")
            num_inserted = self.store_in_milvus(docs_with_embeddings)

            # Get statistics
            stats = self.milvus_client.get_collection_stats()

            summary = {
                'success': True,
                'documents_processed': len(processed_docs),
                'vectors_inserted': num_inserted,
                'collection_stats': stats
            }

            logger.info(f"Pipeline completed successfully: {summary}")
            return summary

        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def search_documents(self, query: str, top_k: int = 5) -> List[Dict]:
        """
        Search for documents similar to the query.

        Args:
            query: Search query text
            top_k: Number of results to return

        Returns:
            List of search results
        """
        if self.embedding_manager is None or self.milvus_client is None:
            raise RuntimeError("Managers not initialized")

        # Generate query embedding
        query_embedding = self.embedding_manager.encode_text(query)

        # Search in Milvus
        results = self.milvus_client.search(query_embedding, top_k)

        return results

# Made with Bob

# main.py
"""
Main entry point for the watsonx SharePoint tool.
"""
import argparse
import sys
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).parent / 'src'))

from orchestrator import WatsonxSharePointOrchestrator
from loguru import logger


def main():
    """Main function to run the tool."""
    parser = argparse.ArgumentParser(
        description='watsonx SharePoint to Milvus Document Processing Tool'
    )

    parser.add_argument(
        '--config',
        type=str,
        default='config/config.yaml',
        help='Path to configuration file'
    )

    parser.add_argument(
        '--username',
        type=str,
        help='SharePoint username (or set SHAREPOINT_USERNAME env var)'
    )

    parser.add_argument(
        '--password',
        type=str,
        help='SharePoint password (or set SHAREPOINT_PASSWORD env var)'
    )

    parser.add_argument(
        '--client-id',
        type=str,
        help='SharePoint app client ID (or set SHAREPOINT_CLIENT_ID env var)'
    )

    parser.add_argument(
        '--client-secret',
        type=str,
        help='SharePoint app client secret (or set SHAREPOINT_CLIENT_SECRET env var)'
    )

    parser.add_argument(
        '--tenant-id',
        type=str,
        help='Azure AD tenant ID (or set SHAREPOINT_TENANT_ID env var)'
    )

    parser.add_argument(
        '--library',
        type=str,
        help='SharePoint document library name (uses config default if not specified)'
    )

    parser.add_argument(
        '--search',
        type=str,
        help='Search query (if provided, runs search instead of ingestion)'
    )

    parser.add_argument(
        '--top-k',
        type=int,
        default=5,
        help='Number of search results to return (default: 5)'
    )

    args = parser.parse_args()

    try:
        # Initialize orchestrator
        logger.info("Initializing watsonx SharePoint Orchestrator")
        orchestrator = WatsonxSharePointOrchestrator(config_path=args.config)

        if args.search:
            # Search mode
            logger.info(f"Running search for: {args.search}")

            # Initialize components for search
            orchestrator.initialize_processors()
            orchestrator.initialize_milvus()

            # Perform search
            results = orchestrator.search_documents(args.search, args.top_k)

            # Display results
            print(f"\n{'='*80}")
            print(f"Search Results for: '{args.search}'")
            print(f"{'='*80}\n")

            for i, result in enumerate(results, 1):
                print(f"Result {i}:")
                print(f"  Document: {result.get('document_name', 'N/A')}")
                print(f"  Path: {result.get('document_path', 'N/A')}")
                print(f"  Chunk Index: {result.get('chunk_index', 'N/A')}")
                print(f"  Score: {result.get('score', 0):.4f}")
                print(f"  Text Preview: {result.get('text', '')[:200]}...")
                print()

        else:
            # Ingestion mode
            logger.info("Running full pipeline")

            # Run full pipeline
            summary = orchestrator.run_full_pipeline(
                username=args.username,
                password=args.password,
                client_id=args.client_id,
                client_secret=args.client_secret,
                tenant_id=args.tenant_id,
                library_name=args.library
            )

            # Display summary
            print(f"\n{'='*80}")
            print("Pipeline Execution Summary")
            print(f"{'='*80}\n")

            if summary['success']:
                print(f"✓ Status: SUCCESS")
                print(f"✓ Documents Processed: {summary['documents_processed']}")
                print(f"✓ Vectors Inserted: {summary['vectors_inserted']}")
                print(f"\nCollection Statistics:")
                stats = summary['collection_stats']
                print(f"  - Collection Name: {stats['collection_name']}")
                print(f"  - Total Entities: {stats['num_entities']}")
                print(f"  - Embedding Dimension: {stats['dimension']}")
            else:
                print(f"✗ Status: FAILED")
                print(f"✗ Error: {summary.get('error', 'Unknown error')}")
                sys.exit(1)

            print(f"\n{'='*80}\n")

        logger.info("Execution completed successfully")

    except Exception as e:
        logger.error(f"Execution failed: {e}")
        print(f"\n✗ Error: {e}\n")
        sys.exit(1)


if __name__ == '__main__':
    main()

# Made with Bob
  • All this with a fully documented procedure (with tips for troubleshooting). 🤩
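Beyond the command-line entry point in main.py, the generated orchestrator can also be driven programmatically. A minimal usage sketch, assuming it is run from the project root (the query string is a placeholder):

# run_pipeline.py - minimal programmatic usage sketch (paths and query are placeholders)
import sys
from pathlib import Path

# Make the generated modules importable, mirroring what main.py does
sys.path.insert(0, str(Path(__file__).parent / "src"))

from orchestrator import WatsonxSharePointOrchestrator

orchestrator = WatsonxSharePointOrchestrator(config_path="config/config.yaml")

# Ingest: SharePoint -> Docling -> embeddings -> Milvus (credentials come from .env)
summary = orchestrator.run_full_pipeline(library_name="Shared Documents")
print(summary)

# Retrieve: semantic search over the indexed chunks
if summary["success"]:
    for hit in orchestrator.search_documents("onboarding process for new hires", top_k=3):
        print(hit["document_name"], round(hit["score"], 4), hit["text"][:120])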


etc...

Conclusion: Redefining Production Velocity

The December sprint was more than just a successful demonstration; it was a blueprint for the future of software engineering. By combining the intent-based orchestration of watsonx Orchestrate, the high-fidelity document parsing of Docling, and the AI-augmented development capabilities of Bob, we transformed a complex integration challenge into a production-ready reality in record time.

The key takeaways from this journey underscore a fundamental shift in how we build:

  • Framework over Friction: We moved past the “manual labor” of directory structures and boilerplate, allowing the AI to handle the rigorous architectural requirements of the ADK flawlessly.
  • Intelligence at Scale: By “curling” and ingesting live documentation, the development process stayed aligned with the latest technical standards, ensuring the resulting agents were robust and reliable.
  • Developer Empowerment: This approach didn’t replace the developer; it elevated them. By automating the “ceremony” of coding, Bob was able to focus on high-level system design and the end-user experience.

Ultimately, AI-driven development isn’t about cutting corners — it’s about accelerating the path to quality. When we empower developers to build on strong, automated frameworks, we don’t just ship code faster; we deliver smarter, more resilient products that satisfy end users and solve enterprise challenges at the speed of thought.

Thanks for reading 🔆
