How AI-Driven Development Helps (It Certainly Helped Me 😉) Boost Productivity!
Introduction
AI-driven code development is fundamentally shifting the software engineering paradigm, moving away from simple “automated typing” toward the creation of high-level, reliable application frameworks. The true value-add lies in leveraging LLMs and agentic tools to orchestrate complex architectural patterns — such as autonomous agents that handle data retrieval or API integration — which serve as a robust skeleton for the entire application. By automating the scaffolding, boilerplate, and initial testing layers, these tools significantly reduce the cognitive load on developers, allowing them to bypass the friction of repetitive setup.
This strategic shift empowers engineering teams to focus their expertise on high-value logic, creative problem-solving, and advanced system optimization. Consequently, applications move through the testing and validation phases with greater structural integrity, reaching production readiness much faster. Ultimately, this accelerated velocity doesn’t just benefit the development team; it ensures a shorter time-to-market for innovative features, directly translating into a more responsive and satisfying experience for the end user.
From Vision to Velocity: A December Sprint
Last December, our team faced a high-stakes challenge: deliver a high-impact demonstration of watsonx Orchestrate under a compressed timeline. The goal was to build a sophisticated, autonomous agent capable of bridging the gap between static enterprise data and actionable AI.
Specifically, the agent needed to navigate an enterprise SharePoint repository, extract deep insights from complex PDF, Excel, and Word files, and seamlessly feed that data into a Retrieval-Augmented Generation (RAG) pipeline. This project served as the perfect theater to showcase a powerful end-to-end technical stack:
- watsonx Orchestrate & the ADK: Used to build ad-hoc agents that don’t just follow scripts but understand intent and orchestrate complex workflows.
- Docling: Our go-to tool for document processing, ensuring that even the most stubborn layouts in PDFs and spreadsheets were converted into clean, RAG-ready data.
- Milvus: A widely used vector database serving as the retrieval backbone of the RAG pipeline.
- The “Human in the Loop”: Featuring Bob, our developer who leveraged these AI-driven tools to build, test, and deploy the entire application from A to Z in record time.
This sprint proved that when you combine a robust agentic framework with the right processing tools, you shift the developer’s role from “troubleshooter” to “architect,” resulting in a production-ready solution that exceeds user expectations.
The Accelerator: The Strategic Shift from Scaffolding to Orchestration
In traditional development, a developer might spend days writing the “glue code” required to connect a data source like SharePoint to an application. This includes handling OAuth flows, managing API rate limits, and structuring the data ingestion pipeline for a RAG (Retrieval-Augmented Generation) system.
By using watsonx Orchestrate, the focus shifts. Instead of writing the glue, Bob uses the Agent Development Kit (ADK) to define the intent. He configures a specialized agent to navigate the SharePoint directory, extract relevant documents, and feed them into a vector database.
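To make that shift concrete, here is a minimal, hypothetical sketch of how the generated pipeline could be exposed as an ADK Python tool. The @tool decorator and import follow the ADK's documented tool pattern, but the exact signature, the tool name, and the wrapper function below are my own illustration and should be checked against the ADK documentation linked at the end of this post.

# Hypothetical sketch: wrapping the generated pipeline as a watsonx Orchestrate (ADK) tool.
# The decorator and import follow the ADK Python tool pattern; verify details against the ADK docs.
from ibm_watsonx_orchestrate.agent_builder.tools import tool

from orchestrator import WatsonxSharePointOrchestrator  # generated class, shown later in this post


@tool(name="ingest_sharepoint_library", description="Ingest a SharePoint library into Milvus for RAG")
def ingest_sharepoint_library(library_name: str) -> dict:
    """Download, parse (Docling), embed, and index a SharePoint document library."""
    orchestrator = WatsonxSharePointOrchestrator(config_path="config/config.yaml")
    return orchestrator.run_full_pipeline(library_name=library_name)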
Okay… But What Was Done in Practice?
Once I gave Bob my prompt and specified which sites and tools to use, and after authorizing access to the Docling and ADK (Agent Development Kit) documentation, Bob used a curl-based process to ingest the technical specifications. This enabled the automated generation of the full project and ensured that the resulting agents were built against the very latest architectural standards.
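Purely for illustration (the real step used curl), this is the kind of fetch involved; the URLs are simply the documentation links listed at the end of this post.

# Illustrative sketch: fetching reference documentation so it can be supplied to Bob as context.
import urllib.request

DOC_URLS = [
    "https://developer.watson-orchestrate.ibm.com/",        # ADK documentation
    "https://github.com/ibm/ibm-watsonx-orchestrate-adk/",  # ADK public repository
]

for url in DOC_URLS:
    with urllib.request.urlopen(url) as response:
        page = response.read().decode("utf-8", errors="ignore")
    print(f"Fetched {len(page)} characters from {url}")

Here is the project structure Bob generated: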
watsonx_sharepoint_tool/
├── main.py # Main entry point
├── requirements.txt # Python dependencies
├── .env.example # Example environment variables
├── config/
│ └── config.yaml # Configuration file
├── src/
│ ├── orchestrator.py # Main orchestration logic
│ ├── sharepoint/
│ │ ├── __init__.py
│ │ └── sharepoint_client.py # SharePoint integration
│ ├── document_processor/
│ │ ├── __init__.py
│ │ └── docling_processor.py # Docling document processing
│ └── vector_store/
│ ├── __init__.py
│ ├── embedding_manager.py # Embedding generation
│ └── milvus_client.py # Milvus vector database
└── logs/ # Log files (auto-created)
The outcome was impressive. Building a watsonx Orchestrate Tool requires strict adherence to a specific hierarchy and file structure. Bob mastered this complexity instantly, generating the necessary folders and the essential __init__.py files with the precision of a seasoned system architect.
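For reference, here is a sketch of what config/config.yaml can look like. Every key below is one the generated orchestrator (shown further down) actually reads; the site URL is a placeholder and the values are simply the code's defaults.

# Illustrative config.yaml; keys mirror what the generated orchestrator reads.
sharepoint:
  site_url: "https://yourtenant.sharepoint.com/sites/YourSite"   # placeholder
  document_library: "Shared Documents"
  file_extensions: [".pdf", ".docx", ".doc", ".xlsx", ".xls"]
docling:
  extract_tables: true
  extract_images: false
  ocr_enabled: true
chunking:
  chunk_size: 512
  chunk_overlap: 50
  min_chunk_size: 100
embedding:
  model_name: "sentence-transformers/all-MiniLM-L6-v2"
  batch_size: 32
  normalize_embeddings: true
milvus:
  host: "localhost"
  port: 19530
  collection_name: "sharepoint_documents"
  index_type: "IVF_FLAT"
  metric_type: "L2"
  nlist: 128
logging:
  level: "INFO"
  log_file: "logs/watsonx_sharepoint.log"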
So without further ado, here is what was actually delivered ⬇️
- Connection to SharePoint 🗂️
# SharePoint Credentials
SHAREPOINT_USERNAME=your-email@company.com
SHAREPOINT_PASSWORD=your-password
# Or use App-based authentication
SHAREPOINT_CLIENT_ID=your-client-id
SHAREPOINT_CLIENT_SECRET=your-client-secret
SHAREPOINT_TENANT_ID=your-tenant-id
# Milvus Credentials (if authentication is enabled)
MILVUS_USER=
MILVUS_PASSWORD=
# Optional: IBM watsonx API credentials (if using watsonx embeddings)
WATSONX_API_KEY=
WATSONX_PROJECT_ID=
- SharePoint Client Code
# sharepoint_client.py
"""
SharePoint Client for authenticating and fetching documents.
"""
import os
from typing import List, Dict, Optional
from pathlib import Path
from office365.runtime.auth.user_credential import UserCredential
from office365.runtime.auth.client_credential import ClientCredential
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from loguru import logger
import tempfile
class SharePointClient:
"""Client for interacting with SharePoint."""
def __init__(
self,
site_url: str,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
tenant_id: Optional[str] = None
):
"""
Initialize SharePoint client with credentials.
Args:
site_url: SharePoint site URL
username: User email (for user authentication)
password: User password (for user authentication)
client_id: App client ID (for app authentication)
client_secret: App client secret (for app authentication)
tenant_id: Azure AD tenant ID (for app authentication)
"""
self.site_url = site_url
self.ctx = None
# Authenticate using provided credentials
if username and password:
logger.info("Authenticating with user credentials")
self._authenticate_user(username, password)
elif client_id and client_secret and tenant_id:
logger.info("Authenticating with app credentials")
self._authenticate_app(client_id, client_secret, tenant_id)
else:
raise ValueError(
"Must provide either (username, password) or "
"(client_id, client_secret, tenant_id)"
)
def _authenticate_user(self, username: str, password: str):
"""Authenticate using user credentials."""
try:
credentials = UserCredential(username, password)
self.ctx = ClientContext(self.site_url).with_credentials(credentials)
# Test connection
web = self.ctx.web
self.ctx.load(web)
self.ctx.execute_query()
logger.info(f"Successfully authenticated to: {web.properties['Title']}")
except Exception as e:
logger.error(f"Authentication failed: {e}")
raise
def _authenticate_app(self, client_id: str, client_secret: str, tenant_id: str):
"""Authenticate using app credentials."""
try:
credentials = ClientCredential(client_id, client_secret)
self.ctx = ClientContext(self.site_url).with_credentials(credentials)
# Test connection
web = self.ctx.web
self.ctx.load(web)
self.ctx.execute_query()
logger.info(f"Successfully authenticated to: {web.properties['Title']}")
except Exception as e:
logger.error(f"App authentication failed: {e}")
raise
def list_documents(
self,
library_name: str = "Shared Documents",
file_extensions: Optional[List[str]] = None
) -> List[Dict]:
"""
List all documents in a SharePoint library.
Args:
library_name: Name of the document library
file_extensions: List of file extensions to filter (e.g., ['.pdf', '.docx'])
Returns:
List of document metadata dictionaries
"""
if file_extensions is None:
file_extensions = ['.pdf', '.docx', '.doc', '.xlsx', '.xls']
try:
logger.info(f"Fetching documents from library: {library_name}")
# Get the document library
library = self.ctx.web.lists.get_by_title(library_name)
items = library.items.get().execute_query()
documents = []
for item in items:
file_ref = item.properties.get('FileRef', '')
file_ext = Path(file_ref).suffix.lower()
if file_ext in file_extensions:
doc_info = {
'name': item.properties.get('FileLeafRef', ''),
'path': file_ref,
'size': item.properties.get('File_x0020_Size', 0),
'modified': item.properties.get('Modified', ''),
'author': item.properties.get('Author', {}).get('Title', ''),
'extension': file_ext
}
documents.append(doc_info)
logger.info(f"Found {len(documents)} documents")
return documents
except Exception as e:
logger.error(f"Error listing documents: {e}")
raise
def download_document(self, file_path: str, local_path: Optional[str] = None) -> str:
"""
Download a document from SharePoint.
Args:
file_path: SharePoint file path
local_path: Local path to save file (if None, uses temp directory)
Returns:
Path to downloaded file
"""
try:
logger.info(f"Downloading: {file_path}")
# Get file from SharePoint
file = self.ctx.web.get_file_by_server_relative_url(file_path)
# Determine local path
if local_path is None:
temp_dir = tempfile.mkdtemp()
file_name = Path(file_path).name
local_path = os.path.join(temp_dir, file_name)
# Download file
with open(local_path, 'wb') as local_file:
file.download(local_file).execute_query()
logger.info(f"Downloaded to: {local_path}")
return local_path
except Exception as e:
logger.error(f"Error downloading document: {e}")
raise
def download_all_documents(
self,
library_name: str = "Shared Documents",
file_extensions: Optional[List[str]] = None,
output_dir: Optional[str] = None
) -> List[Dict]:
"""
Download all documents from a SharePoint library.
Args:
library_name: Name of the document library
file_extensions: List of file extensions to filter
output_dir: Directory to save files (if None, uses temp directory)
Returns:
List of dictionaries with document info and local paths
"""
if output_dir is None:
output_dir = tempfile.mkdtemp()
else:
os.makedirs(output_dir, exist_ok=True)
documents = self.list_documents(library_name, file_extensions)
downloaded_docs = []
for doc in documents:
try:
local_path = self.download_document(
doc['path'],
os.path.join(output_dir, doc['name'])
)
doc['local_path'] = local_path
downloaded_docs.append(doc)
except Exception as e:
logger.error(f"Failed to download {doc['name']}: {e}")
continue
logger.info(f"Downloaded {len(downloaded_docs)} documents to {output_dir}")
return downloaded_docs
# Made with Bob
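As a quick illustration (my addition, not part of Bob's output), here is how this client could be exercised on its own. The site URL and library name are placeholders, and the import assumes src/ is on the Python path, as in the generated main.py.

# Hypothetical standalone usage of the generated SharePoint client.
from sharepoint.sharepoint_client import SharePointClient  # assumes src/ is on sys.path

client = SharePointClient(
    site_url="https://yourtenant.sharepoint.com/sites/Engineering",  # placeholder site
    client_id="your-client-id",
    client_secret="your-client-secret",
    tenant_id="your-tenant-id",
)

docs = client.list_documents(library_name="Shared Documents", file_extensions=[".pdf", ".docx"])
print(f"{len(docs)} matching documents found")

downloaded = client.download_all_documents(output_dir="./downloads")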
- Docling Processing Code ⚙️
# docling_processor.py
"""
Document processor using Docling for Word, Excel, and PDF files.
"""
from typing import List, Dict, Optional
from pathlib import Path
from loguru import logger
from docling.document_converter import DocumentConverter, PdfFormatOption
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
class DoclingProcessor:
"""Process documents using Docling."""
def __init__(
self,
extract_tables: bool = True,
extract_images: bool = False,
ocr_enabled: bool = True
):
"""
Initialize Docling processor.
Args:
extract_tables: Whether to extract tables
extract_images: Whether to extract images
ocr_enabled: Whether to enable OCR for scanned documents
"""
self.extract_tables = extract_tables
self.extract_images = extract_images
self.ocr_enabled = ocr_enabled
# Configure pipeline options
pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = ocr_enabled
pipeline_options.do_table_structure = extract_tables
        # Initialize document converter (PDF pipeline options are wrapped in PdfFormatOption)
        self.converter = DocumentConverter(
            allowed_formats=[
                InputFormat.PDF,
                InputFormat.DOCX,
                InputFormat.XLSX,
            ],
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=pipeline_options)
            },
        )
logger.info("Docling processor initialized")
def process_document(self, file_path: str) -> Dict:
"""
Process a single document.
Args:
file_path: Path to the document file
Returns:
Dictionary containing processed document data
"""
try:
logger.info(f"Processing document: {file_path}")
# Convert document
result = self.converter.convert(file_path)
# Extract text content
text_content = result.document.export_to_markdown()
# Extract metadata
metadata = {
'file_path': file_path,
'file_name': Path(file_path).name,
'file_type': Path(file_path).suffix,
'num_pages': len(result.document.pages) if hasattr(result.document, 'pages') else 1,
'title': result.document.name if hasattr(result.document, 'name') else Path(file_path).stem,
}
# Extract tables if enabled
tables = []
if self.extract_tables and hasattr(result.document, 'tables'):
for table in result.document.tables:
tables.append({
'data': table.export_to_dataframe().to_dict() if hasattr(table, 'export_to_dataframe') else {},
'caption': getattr(table, 'caption', '')
})
processed_doc = {
'text': text_content,
'metadata': metadata,
'tables': tables,
'success': True
}
logger.info(f"Successfully processed: {file_path}")
return processed_doc
except Exception as e:
logger.error(f"Error processing document {file_path}: {e}")
return {
'text': '',
'metadata': {'file_path': file_path, 'error': str(e)},
'tables': [],
'success': False
}
def process_documents(self, file_paths: List[str]) -> List[Dict]:
"""
Process multiple documents.
Args:
file_paths: List of file paths to process
Returns:
List of processed document dictionaries
"""
processed_docs = []
for file_path in file_paths:
doc = self.process_document(file_path)
processed_docs.append(doc)
successful = sum(1 for doc in processed_docs if doc['success'])
logger.info(f"Processed {successful}/{len(file_paths)} documents successfully")
return processed_docs
def extract_text_chunks(
self,
text: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
min_chunk_size: int = 100
) -> List[str]:
"""
Split text into chunks with overlap.
Args:
text: Text to split
chunk_size: Maximum size of each chunk
chunk_overlap: Number of characters to overlap between chunks
min_chunk_size: Minimum size for a chunk to be included
Returns:
List of text chunks
"""
if not text or len(text) < min_chunk_size:
return [text] if text else []
chunks = []
start = 0
text_length = len(text)
while start < text_length:
end = start + chunk_size
# If this is not the last chunk, try to break at a sentence or word boundary
if end < text_length:
# Look for sentence boundary (., !, ?)
for i in range(end, max(start + min_chunk_size, end - 100), -1):
if text[i] in '.!?\n':
end = i + 1
break
else:
# Look for word boundary
for i in range(end, max(start + min_chunk_size, end - 50), -1):
if text[i].isspace():
end = i
break
chunk = text[start:end].strip()
if len(chunk) >= min_chunk_size:
chunks.append(chunk)
# Move start position with overlap
start = end - chunk_overlap if end < text_length else text_length
logger.debug(f"Split text into {len(chunks)} chunks")
return chunks
def process_and_chunk_document(
self,
file_path: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
min_chunk_size: int = 100
) -> Dict:
"""
Process a document and split it into chunks.
Args:
file_path: Path to the document
chunk_size: Maximum size of each chunk
chunk_overlap: Number of characters to overlap
min_chunk_size: Minimum chunk size
Returns:
Dictionary with processed document and chunks
"""
# Process document
doc = self.process_document(file_path)
if not doc['success']:
return doc
# Create chunks
chunks = self.extract_text_chunks(
doc['text'],
chunk_size,
chunk_overlap,
min_chunk_size
)
doc['chunks'] = chunks
doc['num_chunks'] = len(chunks)
return doc
# Made with Bob
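And a short usage sketch (again mine, not Bob's): processing one of the downloaded files and chunking it for RAG. The file name is a placeholder.

# Hypothetical usage of the generated Docling processor.
from document_processor.docling_processor import DoclingProcessor  # assumes src/ is on sys.path

processor = DoclingProcessor(extract_tables=True, extract_images=False, ocr_enabled=True)

doc = processor.process_and_chunk_document(
    "./downloads/quarterly_report.pdf",  # placeholder file from the SharePoint download step
    chunk_size=512,
    chunk_overlap=50,
    min_chunk_size=100,
)

if doc["success"]:
    print(f"{doc['metadata']['file_name']}: {doc['num_chunks']} chunks, {len(doc['tables'])} tables")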
- Milvus Client and Embedding Manager Code 🗄️
# milvus_client.py
"""
Milvus vector database client for storing and querying embeddings.
"""
from typing import List, Dict, Optional, Any
from pymilvus import (
connections,
Collection,
CollectionSchema,
FieldSchema,
DataType,
utility
)
from loguru import logger
import numpy as np
class MilvusClient:
"""Client for interacting with Milvus vector database."""
def __init__(
self,
host: str = "localhost",
port: int = 19530,
collection_name: str = "sharepoint_documents",
dimension: int = 384,
index_type: str = "IVF_FLAT",
metric_type: str = "L2",
nlist: int = 128,
user: Optional[str] = None,
password: Optional[str] = None
):
"""
Initialize Milvus client.
Args:
host: Milvus server host
port: Milvus server port
collection_name: Name of the collection
dimension: Dimension of embedding vectors
index_type: Type of index (IVF_FLAT, IVF_SQ8, HNSW, etc.)
metric_type: Distance metric (L2, IP, COSINE)
nlist: Number of cluster units (for IVF indexes)
user: Username for authentication
password: Password for authentication
"""
self.host = host
self.port = port
self.collection_name = collection_name
self.dimension = dimension
self.index_type = index_type
self.metric_type = metric_type
self.nlist = nlist
# Connect to Milvus
logger.info(f"Connecting to Milvus at {host}:{port}")
connections.connect(
alias="default",
host=host,
port=port,
user=user or "",
password=password or ""
)
# Create or load collection
self.collection = self._get_or_create_collection()
logger.info(f"Milvus client initialized with collection: {collection_name}")
def _get_or_create_collection(self) -> Collection:
"""Get existing collection or create a new one."""
if utility.has_collection(self.collection_name):
logger.info(f"Loading existing collection: {self.collection_name}")
collection = Collection(self.collection_name)
else:
logger.info(f"Creating new collection: {self.collection_name}")
collection = self._create_collection()
return collection
def _create_collection(self) -> Collection:
"""Create a new collection with schema."""
# Define schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=self.dimension),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="document_name", dtype=DataType.VARCHAR, max_length=512),
FieldSchema(name="document_path", dtype=DataType.VARCHAR, max_length=1024),
FieldSchema(name="chunk_index", dtype=DataType.INT64),
FieldSchema(name="metadata", dtype=DataType.JSON)
]
schema = CollectionSchema(
fields=fields,
description="SharePoint documents with embeddings"
)
# Create collection
collection = Collection(
name=self.collection_name,
schema=schema
)
# Create index
index_params = {
"metric_type": self.metric_type,
"index_type": self.index_type,
"params": {"nlist": self.nlist}
}
collection.create_index(
field_name="embedding",
index_params=index_params
)
logger.info(f"Created collection with index: {self.index_type}")
return collection
def insert_documents(self, documents: List[Dict]) -> int:
"""
Insert documents with embeddings into Milvus.
Args:
documents: List of document dictionaries with embeddings
Returns:
Number of vectors inserted
"""
try:
embeddings = []
texts = []
document_names = []
document_paths = []
chunk_indices = []
metadata_list = []
# Prepare data for insertion
for doc in documents:
if not doc.get('success') or 'embeddings' not in doc:
continue
doc_metadata = doc.get('metadata', {})
doc_name = doc_metadata.get('file_name', 'unknown')
doc_path = doc_metadata.get('file_path', 'unknown')
for idx, (chunk, embedding) in enumerate(zip(doc['chunks'], doc['embeddings'])):
embeddings.append(embedding.tolist() if isinstance(embedding, np.ndarray) else embedding)
texts.append(chunk[:65535]) # Truncate if needed
document_names.append(doc_name[:512])
document_paths.append(doc_path[:1024])
chunk_indices.append(idx)
metadata_list.append({
'num_pages': doc_metadata.get('num_pages', 0),
'file_type': doc_metadata.get('file_type', ''),
'title': doc_metadata.get('title', '')
})
if not embeddings:
logger.warning("No embeddings to insert")
return 0
# Insert data
logger.info(f"Inserting {len(embeddings)} vectors into Milvus")
entities = [
embeddings,
texts,
document_names,
document_paths,
chunk_indices,
metadata_list
]
insert_result = self.collection.insert(entities)
self.collection.flush()
logger.info(f"Successfully inserted {len(embeddings)} vectors")
return len(embeddings)
except Exception as e:
logger.error(f"Error inserting documents: {e}")
raise
def search(
self,
query_embedding: np.ndarray,
top_k: int = 5,
output_fields: Optional[List[str]] = None
) -> List[Dict]:
"""
Search for similar vectors.
Args:
query_embedding: Query embedding vector
top_k: Number of results to return
output_fields: Fields to include in results
Returns:
List of search results
"""
try:
if output_fields is None:
output_fields = ["text", "document_name", "document_path", "chunk_index", "metadata"]
# Load collection
self.collection.load()
# Prepare query
search_params = {
"metric_type": self.metric_type,
"params": {"nprobe": 10}
}
# Search
results = self.collection.search(
data=[query_embedding.tolist() if isinstance(query_embedding, np.ndarray) else query_embedding],
anns_field="embedding",
param=search_params,
limit=top_k,
output_fields=output_fields
)
# Format results
formatted_results = []
for hits in results:
for hit in hits:
result = {
'id': hit.id,
'distance': hit.distance,
'score': 1 / (1 + hit.distance) if self.metric_type == "L2" else hit.distance
}
# Add output fields
for field in output_fields:
result[field] = hit.entity.get(field)
formatted_results.append(result)
return formatted_results
except Exception as e:
logger.error(f"Error searching: {e}")
raise
def delete_collection(self):
"""Delete the collection."""
try:
utility.drop_collection(self.collection_name)
logger.info(f"Deleted collection: {self.collection_name}")
except Exception as e:
logger.error(f"Error deleting collection: {e}")
raise
def get_collection_stats(self) -> Dict[str, Any]:
"""Get collection statistics."""
try:
stats = self.collection.num_entities
return {
'collection_name': self.collection_name,
'num_entities': stats,
'dimension': self.dimension
}
except Exception as e:
logger.error(f"Error getting stats: {e}")
raise
# Made with Bob
# embedding_manager.py
"""
Embedding manager for generating embeddings from text chunks.
"""
from typing import List, Optional
import torch
from sentence_transformers import SentenceTransformer
from loguru import logger
import numpy as np
class EmbeddingManager:
"""Manage text embeddings using sentence transformers."""
def __init__(
self,
model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
batch_size: int = 32,
normalize_embeddings: bool = True,
device: Optional[str] = None
):
"""
Initialize embedding manager.
Args:
model_name: Name of the sentence transformer model
batch_size: Batch size for encoding
normalize_embeddings: Whether to normalize embeddings
device: Device to use ('cuda', 'cpu', or None for auto)
"""
self.model_name = model_name
self.batch_size = batch_size
self.normalize_embeddings = normalize_embeddings
# Determine device
if device is None:
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
else:
self.device = device
logger.info(f"Loading embedding model: {model_name} on {self.device}")
# Load model
self.model = SentenceTransformer(model_name, device=self.device)
self.embedding_dimension = self.model.get_sentence_embedding_dimension()
logger.info(f"Model loaded. Embedding dimension: {self.embedding_dimension}")
def encode_text(self, text: str) -> np.ndarray:
"""
Encode a single text into an embedding.
Args:
text: Text to encode
Returns:
Embedding vector as numpy array
"""
try:
embedding = self.model.encode(
text,
normalize_embeddings=self.normalize_embeddings,
show_progress_bar=False
)
return embedding
except Exception as e:
logger.error(f"Error encoding text: {e}")
raise
def encode_batch(self, texts: List[str]) -> np.ndarray:
"""
Encode multiple texts into embeddings.
Args:
texts: List of texts to encode
Returns:
Array of embedding vectors
"""
try:
logger.info(f"Encoding {len(texts)} texts in batches of {self.batch_size}")
embeddings = self.model.encode(
texts,
batch_size=self.batch_size,
normalize_embeddings=self.normalize_embeddings,
show_progress_bar=True,
convert_to_numpy=True
)
logger.info(f"Successfully encoded {len(texts)} texts")
return embeddings
except Exception as e:
logger.error(f"Error encoding batch: {e}")
raise
def encode_documents(self, documents: List[dict]) -> List[dict]:
"""
Encode document chunks and add embeddings to document data.
Args:
documents: List of document dictionaries with 'chunks' field
Returns:
Documents with added 'embeddings' field
"""
all_chunks = []
chunk_to_doc_map = []
# Collect all chunks and track which document they belong to
for doc_idx, doc in enumerate(documents):
if 'chunks' in doc and doc['chunks']:
for chunk in doc['chunks']:
all_chunks.append(chunk)
chunk_to_doc_map.append(doc_idx)
if not all_chunks:
logger.warning("No chunks found in documents")
return documents
# Encode all chunks at once
logger.info(f"Encoding {len(all_chunks)} chunks from {len(documents)} documents")
embeddings = self.encode_batch(all_chunks)
# Distribute embeddings back to documents
for doc in documents:
doc['embeddings'] = []
for chunk_idx, doc_idx in enumerate(chunk_to_doc_map):
documents[doc_idx]['embeddings'].append(embeddings[chunk_idx])
# Add embedding metadata
for doc in documents:
if 'embeddings' in doc:
doc['num_embeddings'] = len(doc['embeddings'])
doc['embedding_dimension'] = self.embedding_dimension
logger.info(f"Added embeddings to {len(documents)} documents")
return documents
def get_embedding_dimension(self) -> int:
"""Get the dimension of embeddings produced by this model."""
return self.embedding_dimension
# Made with Bob
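To tie the two together, a small illustrative sketch (my own, not Bob's): embed the chunked documents, push them into Milvus, then run an ad-hoc similarity query. The query string is a placeholder, and processed_docs is the output of the Docling step above.

# Hypothetical end-to-end use of the embedding manager and the Milvus client.
from vector_store.embedding_manager import EmbeddingManager  # assumes src/ is on sys.path
from vector_store.milvus_client import MilvusClient

embedder = EmbeddingManager()  # defaults to all-MiniLM-L6-v2, 384-dimensional vectors
docs_with_embeddings = embedder.encode_documents(processed_docs)  # processed_docs: Docling output

milvus = MilvusClient(
    host="localhost",
    port=19530,
    dimension=embedder.get_embedding_dimension(),
)
milvus.insert_documents(docs_with_embeddings)

query_vector = embedder.encode_text("What are the data retention rules?")  # placeholder query
for hit in milvus.search(query_vector, top_k=3):
    print(hit["document_name"], round(hit["score"], 4))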
- And finally, the Orchestrator code and the main program 🏪
# orchestrator.py
"""
Main orchestrator for the watsonx SharePoint tool.
Coordinates SharePoint access, document processing, and vector storage.
"""
import os
from typing import Optional, List, Dict
from pathlib import Path
import yaml
from dotenv import load_dotenv
from loguru import logger
from sharepoint.sharepoint_client import SharePointClient
from document_processor.docling_processor import DoclingProcessor
from vector_store.embedding_manager import EmbeddingManager
from vector_store.milvus_client import MilvusClient
class WatsonxSharePointOrchestrator:
"""Main orchestrator for the SharePoint to Milvus pipeline."""
def __init__(self, config_path: str = "config/config.yaml"):
"""
Initialize the orchestrator.
Args:
config_path: Path to configuration file
"""
# Load environment variables
load_dotenv()
# Load configuration
self.config = self._load_config(config_path)
# Setup logging
self._setup_logging()
# Initialize components
self.sharepoint_client = None
self.docling_processor = None
self.embedding_manager = None
self.milvus_client = None
logger.info("Orchestrator initialized")
def _load_config(self, config_path: str) -> Dict:
"""Load configuration from YAML file."""
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
logger.info(f"Configuration loaded from {config_path}")
return config
except Exception as e:
logger.error(f"Error loading config: {e}")
raise
def _setup_logging(self):
"""Setup logging configuration."""
log_config = self.config.get('logging', {})
log_level = log_config.get('level', 'INFO')
log_file = log_config.get('log_file', 'logs/watsonx_sharepoint.log')
# Create logs directory
os.makedirs(os.path.dirname(log_file), exist_ok=True)
# Configure logger
logger.add(
log_file,
rotation="10 MB",
retention="7 days",
level=log_level
)
logger.info(f"Logging configured: level={log_level}, file={log_file}")
def connect_sharepoint(
self,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
tenant_id: Optional[str] = None
):
"""
Connect to SharePoint with provided credentials.
Args:
username: SharePoint username (from env if not provided)
password: SharePoint password (from env if not provided)
client_id: App client ID (from env if not provided)
client_secret: App client secret (from env if not provided)
tenant_id: Tenant ID (from env if not provided)
"""
# Get credentials from environment if not provided
username = username or os.getenv('SHAREPOINT_USERNAME')
password = password or os.getenv('SHAREPOINT_PASSWORD')
client_id = client_id or os.getenv('SHAREPOINT_CLIENT_ID')
client_secret = client_secret or os.getenv('SHAREPOINT_CLIENT_SECRET')
tenant_id = tenant_id or os.getenv('SHAREPOINT_TENANT_ID')
site_url = self.config['sharepoint']['site_url']
self.sharepoint_client = SharePointClient(
site_url=site_url,
username=username,
password=password,
client_id=client_id,
client_secret=client_secret,
tenant_id=tenant_id
)
logger.info("SharePoint client connected")
def initialize_processors(self):
"""Initialize document processor and embedding manager."""
# Initialize Docling processor
docling_config = self.config.get('docling', {})
self.docling_processor = DoclingProcessor(
extract_tables=docling_config.get('extract_tables', True),
extract_images=docling_config.get('extract_images', False),
ocr_enabled=docling_config.get('ocr_enabled', True)
)
# Initialize embedding manager
embedding_config = self.config.get('embedding', {})
self.embedding_manager = EmbeddingManager(
model_name=embedding_config.get('model_name', 'sentence-transformers/all-MiniLM-L6-v2'),
batch_size=embedding_config.get('batch_size', 32),
normalize_embeddings=embedding_config.get('normalize_embeddings', True)
)
logger.info("Processors initialized")
def initialize_milvus(self):
"""Initialize Milvus vector database client."""
milvus_config = self.config.get('milvus', {})
self.milvus_client = MilvusClient(
host=milvus_config.get('host', 'localhost'),
port=milvus_config.get('port', 19530),
collection_name=milvus_config.get('collection_name', 'sharepoint_documents'),
dimension=self.embedding_manager.get_embedding_dimension(),
index_type=milvus_config.get('index_type', 'IVF_FLAT'),
metric_type=milvus_config.get('metric_type', 'L2'),
nlist=milvus_config.get('nlist', 128),
user=os.getenv('MILVUS_USER'),
password=os.getenv('MILVUS_PASSWORD')
)
logger.info("Milvus client initialized")
def process_sharepoint_documents(
self,
library_name: Optional[str] = None,
output_dir: Optional[str] = None
) -> List[Dict]:
"""
Download and process documents from SharePoint.
Args:
library_name: SharePoint library name (from config if not provided)
output_dir: Directory to save downloaded files
Returns:
List of processed documents
"""
if self.sharepoint_client is None:
raise RuntimeError("SharePoint client not connected. Call connect_sharepoint() first.")
if self.docling_processor is None:
raise RuntimeError("Processors not initialized. Call initialize_processors() first.")
# Get library name from config if not provided
library_name = library_name or self.config['sharepoint']['document_library']
file_extensions = self.config['sharepoint']['file_extensions']
# Download documents
logger.info(f"Downloading documents from library: {library_name}")
downloaded_docs = self.sharepoint_client.download_all_documents(
library_name=library_name,
file_extensions=file_extensions,
output_dir=output_dir
)
# Process documents with chunking
logger.info(f"Processing {len(downloaded_docs)} documents")
chunking_config = self.config.get('chunking', {})
processed_docs = []
for doc in downloaded_docs:
processed = self.docling_processor.process_and_chunk_document(
file_path=doc['local_path'],
chunk_size=chunking_config.get('chunk_size', 512),
chunk_overlap=chunking_config.get('chunk_overlap', 50),
min_chunk_size=chunking_config.get('min_chunk_size', 100)
)
processed_docs.append(processed)
logger.info(f"Processed {len(processed_docs)} documents")
return processed_docs
def generate_embeddings(self, documents: List[Dict]) -> List[Dict]:
"""
Generate embeddings for processed documents.
Args:
documents: List of processed documents with chunks
Returns:
Documents with embeddings added
"""
if self.embedding_manager is None:
raise RuntimeError("Embedding manager not initialized. Call initialize_processors() first.")
logger.info("Generating embeddings for documents")
documents_with_embeddings = self.embedding_manager.encode_documents(documents)
return documents_with_embeddings
def store_in_milvus(self, documents: List[Dict]) -> int:
"""
Store documents with embeddings in Milvus.
Args:
documents: Documents with embeddings
Returns:
Number of vectors inserted
"""
if self.milvus_client is None:
raise RuntimeError("Milvus client not initialized. Call initialize_milvus() first.")
logger.info("Storing documents in Milvus")
num_inserted = self.milvus_client.insert_documents(documents)
return num_inserted
def run_full_pipeline(
self,
username: Optional[str] = None,
password: Optional[str] = None,
client_id: Optional[str] = None,
client_secret: Optional[str] = None,
tenant_id: Optional[str] = None,
library_name: Optional[str] = None
) -> Dict:
"""
Run the complete pipeline from SharePoint to Milvus.
Args:
username: SharePoint username
password: SharePoint password
client_id: App client ID
client_secret: App client secret
tenant_id: Tenant ID
library_name: SharePoint library name
Returns:
Summary of the pipeline execution
"""
try:
logger.info("Starting full pipeline execution")
# Step 1: Connect to SharePoint
logger.info("Step 1: Connecting to SharePoint")
self.connect_sharepoint(username, password, client_id, client_secret, tenant_id)
# Step 2: Initialize processors
logger.info("Step 2: Initializing processors")
self.initialize_processors()
# Step 3: Initialize Milvus
logger.info("Step 3: Initializing Milvus")
self.initialize_milvus()
# Step 4: Download and process documents
logger.info("Step 4: Processing SharePoint documents")
processed_docs = self.process_sharepoint_documents(library_name)
# Step 5: Generate embeddings
logger.info("Step 5: Generating embeddings")
docs_with_embeddings = self.generate_embeddings(processed_docs)
# Step 6: Store in Milvus
logger.info("Step 6: Storing in Milvus")
num_inserted = self.store_in_milvus(docs_with_embeddings)
# Get statistics
stats = self.milvus_client.get_collection_stats()
summary = {
'success': True,
'documents_processed': len(processed_docs),
'vectors_inserted': num_inserted,
'collection_stats': stats
}
logger.info(f"Pipeline completed successfully: {summary}")
return summary
except Exception as e:
logger.error(f"Pipeline failed: {e}")
return {
'success': False,
'error': str(e)
}
def search_documents(self, query: str, top_k: int = 5) -> List[Dict]:
"""
Search for documents similar to the query.
Args:
query: Search query text
top_k: Number of results to return
Returns:
List of search results
"""
if self.embedding_manager is None or self.milvus_client is None:
raise RuntimeError("Managers not initialized")
# Generate query embedding
query_embedding = self.embedding_manager.encode_text(query)
# Search in Milvus
results = self.milvus_client.search(query_embedding, top_k)
return results
# Made with Bob
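For programmatic use outside the CLI, a minimal sketch (assuming credentials live in .env and the config file matches the keys shown earlier); the query string is a placeholder.

# Hypothetical programmatic use of the generated orchestrator.
from orchestrator import WatsonxSharePointOrchestrator  # assumes src/ is on sys.path

orchestrator = WatsonxSharePointOrchestrator(config_path="config/config.yaml")
summary = orchestrator.run_full_pipeline()  # credentials are read from .env by connect_sharepoint()

if summary["success"]:
    results = orchestrator.search_documents("onboarding checklist for new vendors", top_k=3)  # placeholder
    for r in results:
        print(r["document_name"], r["chunk_index"], round(r["score"], 4))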
# main.py
"""
Main entry point for the watsonx SharePoint tool.
"""
import argparse
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent / 'src'))
from orchestrator import WatsonxSharePointOrchestrator
from loguru import logger
def main():
"""Main function to run the tool."""
parser = argparse.ArgumentParser(
description='watsonx SharePoint to Milvus Document Processing Tool'
)
parser.add_argument(
'--config',
type=str,
default='config/config.yaml',
help='Path to configuration file'
)
parser.add_argument(
'--username',
type=str,
help='SharePoint username (or set SHAREPOINT_USERNAME env var)'
)
parser.add_argument(
'--password',
type=str,
help='SharePoint password (or set SHAREPOINT_PASSWORD env var)'
)
parser.add_argument(
'--client-id',
type=str,
help='SharePoint app client ID (or set SHAREPOINT_CLIENT_ID env var)'
)
parser.add_argument(
'--client-secret',
type=str,
help='SharePoint app client secret (or set SHAREPOINT_CLIENT_SECRET env var)'
)
parser.add_argument(
'--tenant-id',
type=str,
help='Azure AD tenant ID (or set SHAREPOINT_TENANT_ID env var)'
)
parser.add_argument(
'--library',
type=str,
help='SharePoint document library name (uses config default if not specified)'
)
parser.add_argument(
'--search',
type=str,
help='Search query (if provided, runs search instead of ingestion)'
)
parser.add_argument(
'--top-k',
type=int,
default=5,
help='Number of search results to return (default: 5)'
)
args = parser.parse_args()
try:
# Initialize orchestrator
logger.info("Initializing watsonx SharePoint Orchestrator")
orchestrator = WatsonxSharePointOrchestrator(config_path=args.config)
if args.search:
# Search mode
logger.info(f"Running search for: {args.search}")
# Initialize components for search
orchestrator.initialize_processors()
orchestrator.initialize_milvus()
# Perform search
results = orchestrator.search_documents(args.search, args.top_k)
# Display results
print(f"\n{'='*80}")
print(f"Search Results for: '{args.search}'")
print(f"{'='*80}\n")
for i, result in enumerate(results, 1):
print(f"Result {i}:")
print(f" Document: {result.get('document_name', 'N/A')}")
print(f" Path: {result.get('document_path', 'N/A')}")
print(f" Chunk Index: {result.get('chunk_index', 'N/A')}")
print(f" Score: {result.get('score', 0):.4f}")
print(f" Text Preview: {result.get('text', '')[:200]}...")
print()
else:
# Ingestion mode
logger.info("Running full pipeline")
# Run full pipeline
summary = orchestrator.run_full_pipeline(
username=args.username,
password=args.password,
client_id=args.client_id,
client_secret=args.client_secret,
tenant_id=args.tenant_id,
library_name=args.library
)
# Display summary
print(f"\n{'='*80}")
print("Pipeline Execution Summary")
print(f"{'='*80}\n")
if summary['success']:
print(f"✓ Status: SUCCESS")
print(f"✓ Documents Processed: {summary['documents_processed']}")
print(f"✓ Vectors Inserted: {summary['vectors_inserted']}")
print(f"\nCollection Statistics:")
stats = summary['collection_stats']
print(f" - Collection Name: {stats['collection_name']}")
print(f" - Total Entities: {stats['num_entities']}")
print(f" - Embedding Dimension: {stats['dimension']}")
else:
print(f"✗ Status: FAILED")
print(f"✗ Error: {summary.get('error', 'Unknown error')}")
sys.exit(1)
print(f"\n{'='*80}\n")
logger.info("Execution completed successfully")
except Exception as e:
logger.error(f"Execution failed: {e}")
print(f"\n✗ Error: {e}\n")
sys.exit(1)
if __name__ == '__main__':
main()
# Made with Bob
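And two example invocations, inferred directly from the argparse options above; the search query is a placeholder.

# Ingestion mode: pull the configured SharePoint library, process it, and index it in Milvus
python main.py --config config/config.yaml

# Search mode: query the vector store that was just populated
python main.py --search "What does the travel policy say about per diems?" --top-k 3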
- All of this, delivered with a fully documented procedure (including troubleshooting tips). 🤩
Conclusion: Redefining Production Velocity
The December sprint was more than just a successful demonstration; it was a blueprint for the future of software engineering. By combining the intent-based orchestration of watsonx Orchestrate, the high-fidelity document parsing of Docling, and the AI-augmented development capabilities of Bob, we transformed a complex integration challenge into a production-ready reality in record time.
The key takeaways from this journey underscore a fundamental shift in how we build:
- Framework over Friction: We moved past the “manual labor” of directory structures and boilerplate, allowing the AI to handle the rigorous architectural requirements of the ADK flawlessly.
- Intelligence at Scale: By “curling” and ingesting live documentation, the development process stayed aligned with the latest technical standards, ensuring the resulting agents were robust and reliable.
- Developer Empowerment: This approach didn’t replace the developer; it elevated them. By automating the “ceremony” of coding, Bob was able to focus on high-level system design and the end-user experience.
Ultimately, AI-driven development isn’t about cutting corners — it’s about accelerating the path to quality. When we empower developers to build on strong, automated frameworks, we don’t just ship code faster; we deliver smarter, more resilient products that satisfy end users and solve enterprise challenges at the speed of thought.
Thanks for reading 🔆
Links
- IBM Project Bob: https://www.ibm.com/products/bob
- IBM watsonx Orchestrate Agent Developer Kit Documentation: https://developer.watson-orchestrate.ibm.com/
- IBM watsonx Orchestrate Agent Developer Kit Public Repository: https://github.com/ibm/ibm-watsonx-orchestrate-adk/
- watsonx Orchestrate Documentation: https://www.ibm.com/docs/en/watsonx/watson-orchestrate/base