DEV Community

Alain Airom


Securing the Cloud-Native Frontier: Vault Security Meets Agentic Automation with watsonx Orchestrate and IBM Bob Code Assistant

Combining sets of tools to automate security implementation for a Kubernetes cluster!

In the rapidly evolving landscape of Kubernetes, managing sensitive data like API keys, database credentials, and certificates is a critical challenge. Modern enterprises need more than just a storage vault; they need a proactive, intelligent ecosystem that not only secures secrets but also monitors them and automates the surrounding workflows.
This post explores a powerful combination: the Vault Secrets Operator for robust Kubernetes security, integrated with the agentic capabilities of watsonx Orchestrate, all accelerated by the IBM Bob Code Assistant.


TL;DR 1: What is HashiCorp Vault?

HashiCorp Vault is an identity-based secrets and encryption management system designed to secure modern computing environments. At its core, Vault provides a centralized way to store and control access to sensitive information such as API keys, passwords, certificates, and database credentials. Unlike traditional static secret storage, Vault excels in dynamic environments by offering “encryption as a service” and the ability to generate dynamic, short-lived credentials on-demand. By leveraging robust authentication methods and fine-grained authorization policies, it ensures that only authorized users and applications can access the secrets they need, while maintaining a detailed audit log for compliance and security forensics.
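In practice, "identity-based" means a client first trades a proof of identity for a Vault token, and only then reads secrets. The minimal sketch below uses Python's standard library to build those login requests without sending anything. It assumes the Kubernetes and AppRole auth methods are enabled at their default mount paths (auth/kubernetes and auth/approle). The Vault address, role names and credentials are placeholders.

```python
import json
import urllib.request

# Default location where Kubernetes mounts a pod's service account token (JWT).
SA_TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"


def kubernetes_login_request(vault_addr: str, role: str, jwt: str) -> urllib.request.Request:
    """Build a login request for Vault's Kubernetes auth method (assumed default mount)."""
    body = json.dumps({"role": role, "jwt": jwt}).encode()
    return urllib.request.Request(
        f"{vault_addr.rstrip('/')}/v1/auth/kubernetes/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def approle_login_request(vault_addr: str, role_id: str, secret_id: str) -> urllib.request.Request:
    """Build a login request for Vault's AppRole auth method (assumed default mount)."""
    body = json.dumps({"role_id": role_id, "secret_id": secret_id}).encode()
    return urllib.request.Request(
        f"{vault_addr.rstrip('/')}/v1/auth/approle/login",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def client_token_from_login(response_body: bytes) -> str:
    """Extract the Vault token from a login response (auth.client_token)."""
    return json.loads(response_body)["auth"]["client_token"]


def authenticated_headers(token: str) -> dict:
    """Headers for later API calls; Vault reads the caller's token from X-Vault-Token."""
    return {"X-Vault-Token": token}
```

Inside a pod, the JWT would be read from the file at SA_TOKEN_PATH. To complete the login, send the request with urllib.request.urlopen and pass the response body to client_token_from_login. The returned token then goes into the X-Vault-Token header of every later call, and Vault checks the policies attached to it on each request.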


TL;DR 2: What is IBM watsonx Orchestrate?

IBM watsonx Orchestrate is an interactive AI-powered platform designed to automate and streamline complex business workflows through the use of “agentic” automation. It acts as a digital personal assistant or “orchestrator” that enables users to delegate tasks using natural language. By connecting to a vast ecosystem of enterprise applications — such as Salesforce, SAP, and Workday — watsonx Orchestrate uses AI agents to reason through problems, execute multi-step processes across different systems, and adapt to feedback. It empowers teams by removing manual, repetitive “work about work,” allowing employees to focus on higher-value strategic initiatives while the platform handles the execution, monitoring, and integration of tasks in the background.

watsonx Orchestrate provides an Agent Development Kit (ADK), a set of tools that makes it easy to build and deploy agents on IBM watsonx Orchestrate. It is packaged as a Python library and command-line tool that lets builders configure agents that run on the IBM watsonx Orchestrate platform. The ADK also supports integrating agents and tools built with other frameworks.

These agents and tools can be developed and run locally with the watsonx Orchestrate Developer Edition, a fully self-contained local copy of watsonx Orchestrate that runs on your laptop or desktop, so you can iterate rapidly in isolation.

Once you are satisfied with what you built, it is possible to connect the ADK to a production instance of watsonx Orchestrate to share what you have built locally with your team and run at scale!
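The sketch below is a rough illustration of an ADK tool: a plain, typed and documented Python function that an agent could call. It assumes the ADK exposes a tool decorator at ibm_watsonx_orchestrate.agent_builder.tools, so verify this against the ADK version you install. The fallback no-op decorator exists only so the file also runs without the ADK. The function summarize_secret_changes and its input shape are hypothetical. They are modeled on the change records produced by the monitoring agent described in this post.

```python
from collections import Counter

try:
    # Assumed ADK import path; check the ADK documentation for your version.
    from ibm_watsonx_orchestrate.agent_builder.tools import tool
except ImportError:
    # ADK not installed: a no-op stand-in so the sketch still runs locally.
    def tool(func=None, **_kwargs):
        if func is None:
            return lambda f: f
        return func


@tool
def summarize_secret_changes(changes: list[dict]) -> str:
    """Summarize Kubernetes secret changes for a human operator.

    Args:
        changes: change records, each with 'change_type', 'namespace' and 'name'.

    Returns:
        A one-line summary such as "2 changes: ADDED=1, DELETED=1".
    """
    if not changes:
        return "No secret changes"
    counts = Counter(c["change_type"] for c in changes)
    detail = ", ".join(f"{kind}={n}" for kind, n in sorted(counts.items()))
    return f"{len(changes)} changes: {detail}"
```

With the ADK installed, the ADK documentation describes importing a Python tool file through the orchestrate CLI (along the lines of orchestrate tools import -k python -f <file>) and attaching it to an agent. Check the exact flags against your version.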


The Project Core: Vault Secrets Operator

The foundation of this solution is the Vault Secrets Operator, a Kubernetes-native tool designed to synchronize secrets from HashiCorp Vault directly into Kubernetes Secrets.
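At its simplest, "synchronizing" means taking the key/value pairs read from Vault and writing them into the data field of a Kubernetes Secret, where every value must be base64-encoded. The sketch below builds such a Secret manifest as a plain Python dict. The label vault.ibm.com/managed-by and the annotation vault.ibm.com/source-path are hypothetical conventions for tracing a Secret back to Vault. Nothing in this post says the operator sets them.

```python
import base64


def build_k8s_secret(name: str, namespace: str, vault_path: str, vault_data: dict) -> dict:
    """Turn Vault key/value pairs into an Opaque Kubernetes Secret manifest."""
    encoded = {
        # Secret.data values must be base64-encoded strings.
        key: base64.b64encode(str(value).encode("utf-8")).decode("ascii")
        for key, value in vault_data.items()
    }
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "type": "Opaque",
        "metadata": {
            "name": name,
            "namespace": namespace,
            # Hypothetical bookkeeping label and annotation.
            "labels": {"vault.ibm.com/managed-by": "vault-secrets-operator"},
            "annotations": {"vault.ibm.com/source-path": vault_path},
        },
        "data": encoded,
    }
```

With the official Kubernetes Python client, the same dict could be passed as the body argument of CoreV1Api().create_namespaced_secret(namespace, body). The Go operator would do the equivalent through its controller-runtime client.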

Intelligence at Scale: The Monitoring Agent

While the operator handles the “plumbing” of secret synchronization, the watsonx-agent (a Python-based monitoring component) adds a layer of intelligent oversight. This agent tracks secret changes — including additions, updates, and deletions — and reports them through various notification channels like Slack or Webhooks.
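The secret_monitor.py module is not reproduced in this post. The following is therefore a hypothetical sketch of how its change detection could work: snapshot each Secret as a digest of its data, then compare two snapshots. The record fields (change_type, namespace, name, vault_path, timestamp, changed_keys) are chosen to match what the NotificationService code in this post consumes. The snapshot layout and the annotation used for vault_path are assumptions.

```python
import hashlib
import json
from datetime import datetime, timezone


def snapshot(secrets: list[dict]) -> dict:
    """Map 'namespace/name' -> digest, key set and Vault path for each Secret dict."""
    snap = {}
    for s in secrets:
        meta = s["metadata"]
        data = s.get("data") or {}
        # Only a digest of the data is kept, never the secret values themselves.
        digest = hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()
        snap[f"{meta['namespace']}/{meta['name']}"] = {
            "hash": digest,
            "keys": set(data),
            # Assumed annotation; falls back to 'unknown' when absent.
            "vault_path": (meta.get("annotations") or {}).get("vault.ibm.com/source-path", "unknown"),
        }
    return snap


def diff_snapshots(old: dict, new: dict) -> list[dict]:
    """Compare two snapshots and return ADDED / UPDATED / DELETED change records."""
    now = datetime.now(timezone.utc).isoformat()
    changes = []
    for key in sorted(old.keys() | new.keys()):
        namespace, name = key.split("/", 1)
        before, after = old.get(key), new.get(key)
        record = {"namespace": namespace, "name": name, "timestamp": now,
                  "vault_path": (after or before)["vault_path"]}
        if before is None:
            changes.append({**record, "change_type": "ADDED"})
        elif after is None:
            changes.append({**record, "change_type": "DELETED"})
        elif before["hash"] != after["hash"]:
            changes.append({**record, "change_type": "UPDATED", "changed_keys": {
                "added": sorted(after["keys"] - before["keys"]),
                "removed": sorted(before["keys"] - after["keys"]),
            }})
    return changes
```

The monitoring loop would keep the previous snapshot as its baseline, diff it against a fresh one each cycle, and then replace the baseline.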

vault-secrets-operator/
├── operator/                          # Kubernetes Operator
│   ├── cmd/
│   │   └── main.go                   # Operator entry point
│   ├── pkg/
│   │   ├── apis/vault/v1alpha1/      # Custom Resource Definitions
│   │   │   ├── types.go              # VaultSecret CRD types
│   │   │   └── register.go           # Scheme registration
│   │   ├── controller/               # Controller implementation
│   │   │   └── vaultsecret_controller.go
│   │   └── vault/                    # Vault client
│   │       └── client.go             # Vault API wrapper
│   ├── config/
│   │   ├── crd/                      # CRD manifests
│   │   │   └── vault.ibm.com_vaultsecrets.yaml
│   │   ├── rbac/                     # RBAC configurations
│   │   ├── manager/                  # Manager configurations
│   │   └── samples/                  # Example resources
│   │       ├── vaultsecret-kubernetes-auth.yaml
│   │       └── vaultsecret-approle-auth.yaml
│   ├── go.mod                        # Go dependencies
│   └── Dockerfile                    # Operator container image
│
├── watsonx-agent/                    # Monitoring Agent (Python)
│   ├── src/
│   │   ├── __init__.py               # Package initialization
│   │   ├── main.py                   # Agent entry point
│   │   ├── secret_monitor.py         # Secret monitoring logic
│   │   └── notification_service.py   # Notification handling
│   ├── config/                       # Agent configurations
│   ├── requirements.txt              # Python dependencies
│   └── Dockerfile                    # Agent container image
│
├── deploy/kubernetes/                # Deployment manifests
│   ├── operator-deployment.yaml      # Operator deployment
│   └── watsonx-agent-deployment.yaml # Agent deployment
│
├── docs/                             # Documentation
│   ├── diagrams/
│   │   └── architecture.md           # Architecture diagrams
│   ├── DEPLOYMENT.md                 # Deployment guide
│   └── QUICKSTART.md                 # Quick start guide
│
├── Makefile                          # Build and deployment automation
└── README.md                         # Main documentation
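The tree implies a VaultSecret custom resource in the vault.ibm.com group at version v1alpha1 (see config/crd/vault.ibm.com_vaultsecrets.yaml and pkg/apis/vault/v1alpha1), but the post does not list its spec. The sketch below builds such an object in Python. Every field under spec (vaultPath, kvVersion, authMethod, role, refreshInterval, targetSecretName) is a hypothetical name, so check types.go and the config/samples manifests for the real schema.

```python
def build_vault_secret(name: str, namespace: str, vault_path: str, role: str,
                       kv_version: int = 2, refresh_interval: str = "1h") -> dict:
    """Build a VaultSecret custom resource body (all spec fields are hypothetical)."""
    if kv_version not in (1, 2):
        raise ValueError("kv_version must be 1 or 2")
    return {
        "apiVersion": "vault.ibm.com/v1alpha1",
        "kind": "VaultSecret",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "vaultPath": vault_path,              # hypothetical: path inside Vault
            "kvVersion": kv_version,              # hypothetical: KV engine version
            "authMethod": "kubernetes",           # hypothetical: samples also cover AppRole
            "role": role,                         # hypothetical: Vault role to log in as
            "refreshInterval": refresh_interval,  # hypothetical: rotation interval
            "targetSecretName": name,             # hypothetical: Secret to create/update
        },
    }
```

Assuming the plural is vaultsecrets (as the CRD file name suggests), the body could be submitted with the Kubernetes Python client via CustomObjectsApi().create_namespaced_custom_object("vault.ibm.com", "v1alpha1", namespace, "vaultsecrets", body).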

Main Components

1-Kubernetes Operator


Synchronizes secrets from HashiCorp Vault into Kubernetes Secrets

  • Multiple authentication methods (Kubernetes, AppRole, Token)
  • Automatic secret rotation with configurable intervals
  • Support for KV v1 and KV v2 secret engines
  • Status reporting and health checks
  • Finalizers for proper cleanup
  • Metrics endpoint for monitoring
package main

import (
 "flag"
 "os"

 "k8s.io/apimachinery/pkg/runtime"
 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 ctrl "sigs.k8s.io/controller-runtime"
 "sigs.k8s.io/controller-runtime/pkg/healthz"
 "sigs.k8s.io/controller-runtime/pkg/log/zap"

 vaultv1alpha1 "github.com/ibm/vault-secrets-operator/operator/pkg/apis/vault/v1alpha1"
 "github.com/ibm/vault-secrets-operator/operator/pkg/controller"
)

var (
 scheme   = runtime.NewScheme()
 setupLog = ctrl.Log.WithName("setup")
)

func init() {
 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
 utilruntime.Must(vaultv1alpha1.AddToScheme(scheme))
}

func main() {
 var metricsAddr string
 var enableLeaderElection bool
 var probeAddr string

 flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.")
 flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.")
 flag.BoolVar(&enableLeaderElection, "leader-elect", false,
  "Enable leader election for controller manager. "+
   "Enabling this will ensure there is only one active controller manager.")

 opts := zap.Options{
  Development: true,
 }
 opts.BindFlags(flag.CommandLine)
 flag.Parse()

 ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

 mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
  Scheme:                 scheme,
  MetricsBindAddress:     metricsAddr,
  Port:                   9443,
  HealthProbeBindAddress: probeAddr,
  LeaderElection:         enableLeaderElection,
  LeaderElectionID:       "vault-secrets-operator.ibm.com",
 })
 if err != nil {
  setupLog.Error(err, "unable to start manager")
  os.Exit(1)
 }

 if err = (&controller.VaultSecretReconciler{
  Client: mgr.GetClient(),
  Scheme: mgr.GetScheme(),
 }).SetupWithManager(mgr); err != nil {
  setupLog.Error(err, "unable to create controller", "controller", "VaultSecret")
  os.Exit(1)
 }

 if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
  setupLog.Error(err, "unable to set up health check")
  os.Exit(1)
 }
 if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
  setupLog.Error(err, "unable to set up ready check")
  os.Exit(1)
 }

 setupLog.Info("starting manager")
 if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
  setupLog.Error(err, "problem running manager")
  os.Exit(1)
 }
}

// Made with Bob
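Supporting "KV v1 and KV v2 secret engines" matters because the two engines differ on the wire. For a KV v2 mount, Vault's HTTP API inserts data/ between the mount and the secret path, and nests the key/value pairs one level deeper (data.data, next to data.metadata). KV v1 returns them directly under data. The Python sketch below handles both and is not the operator's actual Go code in client.go. The mount names secret and kv are only common examples.

```python
def kv_read_path(mount: str, path: str, kv_version: int) -> str:
    """Return the HTTP API path used to read a secret from a KV mount."""
    mount, path = mount.strip("/"), path.strip("/")
    if kv_version == 2:
        return f"/v1/{mount}/data/{path}"
    if kv_version == 1:
        return f"/v1/{mount}/{path}"
    raise ValueError(f"unsupported KV version: {kv_version}")


def kv_unwrap(response: dict, kv_version: int) -> dict:
    """Extract the key/value pairs from a parsed KV read response."""
    if kv_version == 2:
        # KV v2 shape: {"data": {"data": {...}, "metadata": {...}}}
        return response["data"]["data"]
    if kv_version == 1:
        # KV v1 shape: {"data": {...}}
        return response["data"]
    raise ValueError(f"unsupported KV version: {kv_version}")
```

Keeping the version switch in two small functions means the rest of the reconciliation logic can work with a plain dict of keys and values, whichever engine backs the path.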

2-watsonx Orchestrate Monitoring Agent

Monitors Kubernetes secrets for changes and sends notifications

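  • Hourly secret change detection (configurable through the CHECK_INTERVAL cron expression; only the hourly "0 * * * *" form and "*/N" minute intervals are parsed, anything else falls back to hourly)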
"""
Vault Secrets Monitor Agent - Main Application
watsonx Orchestrate monitoring agent for Kubernetes secrets
"""

import os
import sys
import time
import signal
import logging
import schedule
from http.server import HTTPServer, BaseHTTPRequestHandler
from threading import Thread
from kubernetes import client, config
from dotenv import load_dotenv

from secret_monitor import SecretMonitor
from notification_service import NotificationService


# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=os.getenv('LOG_LEVEL', 'INFO').upper(),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('logs/agent.log') if os.path.exists('logs') else logging.NullHandler()
    ]
)

logger = logging.getLogger(__name__)


class HealthCheckHandler(BaseHTTPRequestHandler):
    """HTTP handler for health checks"""

    def do_GET(self):
        """Handle GET requests"""
        if self.path == '/health':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(b'{"status": "healthy"}')
        elif self.path == '/ready':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(b'{"status": "ready"}')
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, format, *args):
        """Suppress default logging"""
        pass


class VaultSecretsMonitor:
    """Main monitoring application"""

    def __init__(self):
        """Initialize the monitor"""
        self.config = self._load_config()
        self.k8s_client = self._init_kubernetes_client()
        self.secret_monitor = SecretMonitor(
            self.k8s_client,
            self.config['namespace']
        )
        self.notification_service = NotificationService(self.config)
        self.running = True

    def _load_config(self) -> dict:
        """Load configuration from environment variables"""
        return {
            'namespace': os.getenv('NAMESPACE', 'default'),
            'check_interval': os.getenv('CHECK_INTERVAL', '0 * * * *'),
            'cluster_name': os.getenv('CLUSTER_NAME', 'default-cluster'),
            'slack_webhook': os.getenv('SLACK_WEBHOOK'),
            'notification_webhook': os.getenv('NOTIFICATION_WEBHOOK'),
            'health_port': int(os.getenv('HEALTH_PORT', '8080'))
        }

    def _init_kubernetes_client(self) -> client.CoreV1Api:
        """Initialize Kubernetes client"""
        try:
            # Try to load in-cluster config first
            config.load_incluster_config()
            logger.info("Loaded in-cluster Kubernetes configuration")
        except config.ConfigException:
            try:
                # Fall back to kubeconfig
                config.load_kube_config()
                logger.info("Loaded kubeconfig configuration")
            except config.ConfigException as e:
                logger.error(f"Failed to load Kubernetes configuration: {e}")
                raise

        return client.CoreV1Api()

    def monitor_secrets(self):
        """Main monitoring function"""
        logger.info(
            f"Starting secret monitoring cycle for cluster: {self.config['cluster_name']}"
        )

        try:
            # Check for changes
            changes = self.secret_monitor.check_for_changes()

            if changes:
                logger.info(f"Detected {len(changes)} secret changes")

                # Send notifications
                self.notification_service.send_notifications(
                    changes,
                    self.config['cluster_name']
                )

                logger.info("Notifications sent successfully")
            else:
                logger.info("No secret changes detected")

            # Update baseline for next check
            self.secret_monitor.update_baseline()

        except Exception as e:
            logger.error(f"Error during monitoring cycle: {e}", exc_info=True)

            # Send error notification
            try:
                self.notification_service.send_error_notification(
                    e,
                    self.config['cluster_name']
                )
            except Exception as notify_error:
                logger.error(f"Failed to send error notification: {notify_error}")

    def start_health_server(self):
        """Start health check HTTP server"""
        port = self.config['health_port']
        server = HTTPServer(('0.0.0.0', port), HealthCheckHandler)

        def serve():
            logger.info(f"Health check server listening on port {port}")
            server.serve_forever()

        thread = Thread(target=serve, daemon=True)
        thread.start()

    def setup_schedule(self):
        """Setup monitoring schedule"""
        # Parse cron expression (simplified - supports hourly format)
        interval = self.config['check_interval']

        if interval == '0 * * * *':  # Every hour
            schedule.every().hour.do(self.monitor_secrets)
            logger.info("Scheduled monitoring every hour")
        elif interval.startswith('*/'):  # Every N minutes
            minutes = int(interval.split('/')[1].split()[0])
            schedule.every(minutes).minutes.do(self.monitor_secrets)
            logger.info(f"Scheduled monitoring every {minutes} minutes")
        else:
            # Default to hourly
            schedule.every().hour.do(self.monitor_secrets)
            logger.info("Scheduled monitoring every hour (default)")

    def run(self):
        """Run the monitoring agent"""
        logger.info(
            f"Starting Vault Secrets Monitor Agent for cluster: {self.config['cluster_name']}"
        )

        # Start health check server
        self.start_health_server()

        # Initialize baseline
        self.secret_monitor.initialize_baseline()
        logger.info("Baseline initialized")

        # Run initial check
        self.monitor_secrets()

        # Setup schedule
        self.setup_schedule()

        # Run scheduler
        logger.info("Monitoring agent started successfully")
        while self.running:
            schedule.run_pending()
            time.sleep(1)

    def stop(self):
        """Stop the monitoring agent"""
        logger.info("Stopping monitoring agent")
        self.running = False


def signal_handler(signum, frame):
    """Handle shutdown signals"""
    logger.info(f"Received signal {signum}, shutting down gracefully")
    if 'monitor' in globals():
        monitor.stop()
    sys.exit(0)


if __name__ == '__main__':
    # Setup signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    try:
        # Create and run monitor
        monitor = VaultSecretsMonitor()
        monitor.run()
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down")
        if 'monitor' in locals():
            monitor.stop()
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)

# Made with Bob
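setup_schedule recognizes only two shapes of CHECK_INTERVAL and silently falls back to hourly for anything else, including typos. A slightly stricter variant first converts the expression into a number of minutes and rejects anything it cannot represent. The function name interval_minutes is hypothetical. Like the original, it supports only the "0 * * * *" and "*/N * * * *" forms, not full cron syntax.

```python
import re

# Matches "*/N * * * *" (every N minutes) after whitespace normalization.
_EVERY_N_MINUTES = re.compile(r"^\*/(\d+) \* \* \* \*$")


def interval_minutes(expr: str) -> int:
    """Translate the supported cron subset into a polling interval in minutes."""
    expr = " ".join(expr.split())  # collapse repeated spaces between fields
    if expr == "0 * * * *":
        return 60
    match = _EVERY_N_MINUTES.match(expr)
    if match and 1 <= int(match.group(1)) <= 59:
        return int(match.group(1))
    raise ValueError(f"unsupported CHECK_INTERVAL: {expr!r}")
```

In setup_schedule, the result could be passed to schedule.every(minutes).minutes.do(self.monitor_secrets). A ValueError at startup then surfaces a misconfigured interval instead of hiding it behind the hourly default. As in the original, "hourly" means every 60 minutes from start-up, not at the top of each hour.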
  • Multiple notification channels: Slack webhooks, generic webhooks, or both
"""
Notification Service Module
Handles sending notifications about secret changes
"""

import logging
import requests
from typing import List, Dict, Optional
from datetime import datetime


class NotificationService:
    """Handles sending notifications for secret changes"""

    def __init__(self, config: Dict):
        """
        Initialize the notification service

        Args:
            config: Configuration dictionary with webhook URLs
        """
        self.config = config
        self.logger = logging.getLogger(__name__)

    def send_notifications(self, changes: List[Dict], cluster_name: str) -> None:
        """
        Send notifications for detected changes

        Args:
            changes: List of change dictionaries
            cluster_name: Name of the cluster
        """

        # Send to Slack if configured
        if self.config.get('slack_webhook'):
            try:
                self._send_slack_notification(changes, cluster_name)
                self.logger.info("Slack notification sent")
            except Exception as e:
                self.logger.error(f"Failed to send Slack notification: {e}")

        # Send to generic webhook if configured
        if self.config.get('notification_webhook'):
            try:
                self._send_webhook_notification(changes, cluster_name)
                self.logger.info("Webhook notification sent")
            except Exception as e:
                self.logger.error(f"Failed to send webhook notification: {e}")

        self.logger.info("All notifications processed")

    def _send_slack_notification(self, changes: List[Dict], cluster_name: str) -> None:
        """Send Slack notification"""
        message = self._format_slack_message(changes, cluster_name)

        response = requests.post(
            self.config['slack_webhook'],
            json=message,
            headers={'Content-Type': 'application/json'},
            timeout=10
        )
        response.raise_for_status()

    def _format_slack_message(self, changes: List[Dict], cluster_name: str) -> Dict:
        """Format message for Slack"""
        summary = self._get_change_summary(changes)

        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"🔐 Vault Secrets Changes Detected - {cluster_name}",
                    "emoji": True
                }
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f"*Summary:* {summary['total']} changes detected\n"
                        f"• Added: {summary['added']}\n"
                        f"• Updated: {summary['updated']}\n"
                        f"• Deleted: {summary['deleted']}\n"
                        f"• Metadata Updated: {summary['metadata_updated']}"
                    )
                }
            },
            {"type": "divider"}
        ]

        # Add details for each change (limit to 10)
        for change in changes[:10]:
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": self._format_change_for_slack(change)
                }
            })

        if len(changes) > 10:
            blocks.append({
                "type": "context",
                "elements": [{
                    "type": "mrkdwn",
                    "text": f"_... and {len(changes) - 10} more changes_"
                }]
            })

        blocks.append({
            "type": "context",
            "elements": [{
                "type": "mrkdwn",
                "text": f"Timestamp: {datetime.utcnow().isoformat()}"
            }]
        })

        return {"blocks": blocks}

    def _format_change_for_slack(self, change: Dict) -> str:
        """Format a single change for Slack"""
        emoji_map = {
            'ADDED': '',
            'UPDATED': '🔄',
            'DELETED': '',
            'METADATA_UPDATED': 'ℹ️'
        }

        emoji = emoji_map.get(change['change_type'], '')
        text = (
            f"{emoji} *{change['change_type']}*: "
            f"`{change['namespace']}/{change['name']}`\n"
            f"Vault Path: `{change['vault_path']}`\n"
        )

        if change['change_type'] == 'UPDATED' and 'changed_keys' in change:
            changed_keys = change['changed_keys']
            if changed_keys['added']:
                text += f"Keys Added: {', '.join(changed_keys['added'])}\n"
            if changed_keys['removed']:
                text += f"Keys Removed: {', '.join(changed_keys['removed'])}\n"

        return text

    def _send_webhook_notification(self, changes: List[Dict], cluster_name: str) -> None:
        """Send generic webhook notification"""
        payload = {
            'cluster': cluster_name,
            'timestamp': datetime.utcnow().isoformat(),
            'summary': self._get_change_summary(changes),
            'changes': [
                {
                    'type': c['change_type'],
                    'namespace': c['namespace'],
                    'name': c['name'],
                    'vault_path': c['vault_path'],
                    'timestamp': c['timestamp']
                }
                for c in changes
            ]
        }

        response = requests.post(
            self.config['notification_webhook'],
            json=payload,
            headers={'Content-Type': 'application/json'},
            timeout=10
        )
        response.raise_for_status()

    def send_error_notification(self, error: Exception, cluster_name: str) -> None:
        """Send error notification"""
        error_message = {
            'cluster': cluster_name,
            'timestamp': datetime.utcnow().isoformat(),
            'error': {
                'message': str(error),
                'type': type(error).__name__
            }
        }

        try:
            if self.config.get('slack_webhook'):
                blocks = [
                    {
                        "type": "header",
                        "text": {
                            "type": "plain_text",
                            "text": f"⚠️ Vault Secrets Monitor Error - {cluster_name}",
                            "emoji": True
                        }
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": f"*Error:* {str(error)}"
                        }
                    }
                ]

                requests.post(
                    self.config['slack_webhook'],
                    json={"blocks": blocks},
                    timeout=10
                )

            if self.config.get('notification_webhook'):
                requests.post(
                    self.config['notification_webhook'],
                    json=error_message,
                    timeout=10
                )

            self.logger.info("Error notification sent")
        except Exception as e:
            self.logger.error(f"Failed to send error notification: {e}")

    def _get_change_summary(self, changes: List[Dict]) -> Dict:
        """Get summary of changes"""
        return {
            'total': len(changes),
            'added': sum(1 for c in changes if c['change_type'] == 'ADDED'),
            'updated': sum(1 for c in changes if c['change_type'] == 'UPDATED'),
            'deleted': sum(1 for c in changes if c['change_type'] == 'DELETED'),
            'metadata_updated': sum(1 for c in changes if c['change_type'] == 'METADATA_UPDATED')
        }

# Made with Bob
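On the receiving end, anything that accepts an HTTP POST can consume the generic webhook, for example a small service that forwards events into an automation flow. The sketch below is a minimal standard-library receiver. It handles both the change payload built by _send_webhook_notification (cluster, timestamp, summary, changes) and the error payload built by send_error_notification (cluster, timestamp, error). The summarize_payload and serve names and the default port are hypothetical. The sketch has no TLS or request authentication.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def summarize_payload(payload: dict) -> str:
    """Render one of the agent's webhook payloads as a short human-readable line."""
    if "error" in payload:
        err = payload["error"]
        return f"[{payload['cluster']}] monitor error {err['type']}: {err['message']}"
    s = payload["summary"]
    names = ", ".join(f"{c['type']} {c['namespace']}/{c['name']}" for c in payload["changes"][:3])
    return (f"[{payload['cluster']}] {s['total']} change(s) "
            f"(+{s['added']} ~{s['updated']} -{s['deleted']}): {names}")


class WebhookHandler(BaseHTTPRequestHandler):
    """Accept POSTed JSON notifications and print a summary of each."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = json.loads(self.rfile.read(length))
            print(summarize_payload(payload))
            self.send_response(204)  # accepted, no response body
        except (ValueError, KeyError):
            self.send_response(400)  # malformed JSON or unexpected payload shape
        self.end_headers()


def serve(port: int = 9000) -> None:
    """Block forever, handling webhook POSTs on the given port."""
    HTTPServer(("0.0.0.0", port), WebhookHandler).serve_forever()
```

Calling serve() and pointing NOTIFICATION_WEBHOOK at http://<host>:9000/ would print one line per monitoring cycle that detects changes, plus one line per error notification.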

Conclusion

The synergy between HashiCorp Vault’s industry-standard security and the agentic, AI-driven automation of watsonx Orchestrate represents the next step in cloud-native operations. By leveraging the Vault Secrets Operator for reliable secret synchronization and IBM Bob Code Assistant to accelerate the development of intelligent monitoring agents, teams can achieve a “secure-by-design” architecture that is both robust and highly automated. This approach not only reduces the risk of secret sprawl but also frees up developers to focus on building value, knowing their security posture is being actively managed by an intelligent agentic ecosystem.

Thanks for reading and thanks to “Bob” ☺️

