Ashok Nagaraj

Simple validation webhook with Python

We will implement a simple validating admission webhook that does the following:

  • Input conditions:
      • Applies to all namespaces labeled ngaddons/validation-webhooks=enabled
      • Applies to object types: Deployments and Pods
      • Does not apply if the label ngaddons/bypass is set on the object

  • Validation logic:
      • Reject object creation unless all of these labels are set: 'ngaddons/ownerId', 'ngaddons/webexRoomId', 'ngaddons/appName'
      • Skip the checks if ngaddons/bypass is set
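
The rules above boil down to a small decision function. The following is a minimal sketch that does not depend on Flask, assuming the labels arrive as a plain dict (or None when the object has no labels). The helper name check_labels is hypothetical.

```python
REQUIRED_LABELS = ['ngaddons/ownerId', 'ngaddons/webexRoomId', 'ngaddons/appName']
BYPASS_LABEL = 'ngaddons/bypass'


def check_labels(labels):
    """Return (allowed, message) for an object's labels (hypothetical helper)."""
    labels = labels or {}  # treat a missing "labels" field as an empty dict
    if BYPASS_LABEL in labels:
        return True, f"Request bypassed as '{BYPASS_LABEL}' is set"
    missing = [name for name in REQUIRED_LABELS if name not in labels]
    if missing:
        return False, f"Missing labels: {missing}"
    return True, "Request has required labels"


if __name__ == "__main__":
    print(check_labels({}))
    print(check_labels({'ngaddons/bypass': '1'}))
```

Keeping the decision separate from the HTTP handling makes it easy to unit test without a cluster.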


Webhook code
import logging
import os
from flask import Flask, jsonify, request

app = Flask('webhook')
app.logger.addHandler(logging.StreamHandler())
app.logger.setLevel(logging.DEBUG)

#Health check
@app.route("/healthz", methods=['GET'])
def ping():
  return jsonify({'message': 'ok'})

REQUIRED_LABELS = ['ngaddons/ownerId', 'ngaddons/webexRoomId', 'ngaddons/appName']

@app.route('/validate', methods=['POST'])
def deployment_webhook():
  r = request.get_json(silent=True) or {}

  req = r.get('request', {})
  uid = '<no uid>'  # fallback so the except block can always reference uid
  try:
    if not req:
      return send_response(False, '<no uid>', "Invalid request, no payload.request found")

    uid = req.get("uid", '')
    app.logger.debug(f"+ uid: {uid}")
    if not uid:
      return send_response(False, '<no uid>', "Invalid request, no payload.request.uid found")

    # "labels" is absent when the object has no labels, so fall back to {}
    labels = req.get("object", {}).get("metadata", {}).get("labels") or {}
    if 'ngaddons/bypass' in labels:
      return send_response(True, uid, "Request bypassed as 'ngaddons/bypass' is set")

    missing = [ l for l in REQUIRED_LABELS if l not in labels ]
    app.logger.debug(f"+ missing: {missing}")
    if missing:
      return send_response(False, uid, f"Missing labels: {missing}")

  except Exception as e:
    return send_response(False, uid, f"Webhook exception: {e}")

  #Send OK
  return send_response(True, uid, "Request has required labels")


# Build the AdmissionReview response sent back to the API server
def send_response(allowed, uid, message):
  return jsonify({
    "apiVersion": "admission.k8s.io/v1",
    "kind": "AdmissionReview",
    "response": {
      "allowed": allowed,
      "uid": uid,
      "status": {"message": message}
    }
  })


if __name__ == "__main__":
  # A secret created with `kubectl create secret tls` is mounted as tls.crt and tls.key
  tls_crt = '/etc/ssl/tls.crt'
  tls_key = '/etc/ssl/tls.key'
  app.run(ssl_context=(tls_crt, tls_key), port=5000, host='0.0.0.0', debug=True)
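
To exercise the /validate endpoint without a cluster, it helps to build a request body by hand. The sketch below assumes only the fields this webhook reads (request.uid and request.object.metadata.labels); a real AdmissionReview sent by the API server carries many more. The helper name make_review is hypothetical.

```python
import json
import uuid


def make_review(labels=None, kind="Pod", name="testpod"):
    """Build a minimal AdmissionReview request body (hypothetical test helper)."""
    metadata = {"name": name}
    if labels:  # leave "labels" out entirely when none are given
        metadata["labels"] = dict(labels)
    return {
        "apiVersion": "admission.k8s.io/v1",
        "kind": "AdmissionReview",
        "request": {
            "uid": str(uuid.uuid4()),
            "operation": "CREATE",
            "object": {"kind": kind, "metadata": metadata},
        },
    }


if __name__ == "__main__":
    # Print a body that can be POSTed to /validate with any HTTP client.
    print(json.dumps(make_review({"ngaddons/bypass": "1"}), indent=2))
```

The response returned by the webhook should echo the same uid that was sent in the request.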
Dockerfile
# Image: ashoka007/check-labels:0.1
FROM python:3.8-slim
WORKDIR /app
COPY requirements.txt /app
RUN pip install -r requirements.txt
COPY app.py /app
CMD ["python", "app.py"]
Webhook manifest
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: ngaddons-check-labels # cluster-scoped resource, so no namespace is set
webhooks:
  - name: ngaddons.check-labels.webhook
    failurePolicy: Fail
    sideEffects: None
    admissionReviewVersions: ["v1","v1beta1"]
    namespaceSelector:
      matchLabels:
        ngaddons/validation-webhooks: enabled
    rules:
      - apiGroups: ["apps", ""]
        resources:
          - "deployments"
          - "pods"
        apiVersions:
          - "*"
        operations:
          - CREATE
    clientConfig:
      service:
        name: ${WEBHOOK_SERVICE_NAME} # to be substituted
        namespace: ${WEBHOOK_NAMESPACE} # to be substituted
        path: /validate # must match the Flask route exactly (no trailing slash)
      caBundle: ${CA_BUNDLE} # to be substituted
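
The ${...} placeholders in the manifest have to be filled in before it is applied. One possible way, sketched here with Python's string.Template (which uses the same ${NAME} syntax), is shown below. The snippet, the render helper, and the example values are all assumptions for illustration, not part of the original setup.

```python
from string import Template

# Shortened stand-in for the manifest above; only the placeholder lines matter.
MANIFEST_SNIPPET = """\
clientConfig:
  service:
    name: ${WEBHOOK_SERVICE_NAME}
    namespace: ${WEBHOOK_NAMESPACE}
  caBundle: ${CA_BUNDLE}
"""


def render(template_text, values):
    """Fill ${NAME} placeholders; raises KeyError if a value is missing."""
    return Template(template_text).substitute(values)


if __name__ == "__main__":
    values = {
        "WEBHOOK_SERVICE_NAME": "check-labels-svc",  # assumed service name
        "WEBHOOK_NAMESPACE": "check-labels",         # assumed namespace
        "CA_BUNDLE": "<base64 of certs/ca.crt>",     # placeholder, not a real bundle
    }
    print(render(MANIFEST_SNIPPET, values))
```

Using substitute rather than safe_substitute means a forgotten variable fails loudly instead of leaving a literal ${...} in the applied manifest.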
Create self-signed key-pair and ${CA_BUNDLE}
# conf/ext.cnf
# The DNS entry under [alt_names] must match the webhook service DNS name;
# substitute the ${...} placeholders before running openssl.
[ req ]
default_bits       = 2048
distinguished_name = req_distinguished_name
req_extensions     = req_ext
prompt             = no
[ req_distinguished_name ]
countryName                 = IN
stateOrProvinceName         = KAR
localityName                = BGL
organizationName            = ACME INC
commonName                  = check-labels 0.1
[ req_ext ]
subjectAltName = @alt_names
[alt_names]
DNS.1   = ${WEBHOOK_SERVICE_NAME}.${WEBHOOK_NAMESPACE}.svc

❯ openssl req -x509 -newkey rsa:4096 -nodes -out certs/ca.crt -keyout certs/ca.key -days 365 -config conf/ext.cnf -extensions req_ext
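
The caBundle field of the webhook manifest expects the PEM certificate base64-encoded on a single line. A minimal sketch of that step follows, assuming the certificate was written to certs/ca.crt by the openssl command above. The function name ca_bundle_from_pem is hypothetical.

```python
import base64
from pathlib import Path


def ca_bundle_from_pem(pem_bytes):
    """Base64-encode PEM bytes into a single-line caBundle value."""
    return base64.b64encode(pem_bytes).decode("ascii")


if __name__ == "__main__":
    cert_path = Path("certs/ca.crt")  # output path of the openssl command
    if cert_path.exists():
        print(ca_bundle_from_pem(cert_path.read_bytes()))
    else:
        print(f"{cert_path} not found; generate the certificate first")
```

The printed string is the value to use for ${CA_BUNDLE}.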
Create namespace
❯ kubectl create namespace ${WEBHOOK_NAMESPACE}
Create a secret from the key-pair
❯ kubectl create secret tls webhook-secret --cert=certs/ca.crt --key=certs/ca.key --namespace=${WEBHOOK_NAMESPACE}
Kubernetes manifest for webhook deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: webhook-dep
  namespace: ${WEBHOOK_NAMESPACE}
  labels:
    app: webhook
spec:
  replicas: 2
  selector:
    matchLabels:
      app: webhook
  template:
    metadata:
      labels:
        app: webhook
    spec:
      containers:
        - name: webhook-container
          image: ashoka007/check-labels:0.1
          volumeMounts:
            - mountPath: /etc/ssl
              name: webhook-certs
              readOnly: true
      volumes:
      - name: webhook-certs
        secret:
          secretName: webhook-secret
Expose the deployment with a service
❯ kubectl expose deployment/webhook-dep --name=${WEBHOOK_SERVICE_NAME} --namespace=${WEBHOOK_NAMESPACE} --port=443 --target-port=5000
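
For a service-backed webhook, the API server connects to the service by its in-cluster DNS name, which is why the certificate's subjectAltName has to carry the same name. The sketch below spells out that name and the resulting URL; the helper names are hypothetical, and port 443 and path /validate are taken from the commands and code in this post.

```python
def webhook_dns_name(service, namespace):
    """In-cluster DNS name used to reach a service-backed webhook."""
    return f"{service}.{namespace}.svc"


def webhook_url(service, namespace, path="/validate", port=443):
    """URL the webhook is reached at through the service (illustrative only)."""
    return f"https://{webhook_dns_name(service, namespace)}:{port}{path}"


if __name__ == "__main__":
    # Example names are assumptions; use your own service and namespace.
    print(webhook_url("check-labels-svc", "check-labels"))
```

The service forwards port 443 to the container's port 5000, where the Flask app listens.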

Test
❯ kubectl create namespace demo-ns

# normal scenario
❯ kubectl run testpod --image=nginx -n demo-ns
pod/testpod created

# start enforcing validation
❯ kubectl label namespace demo-ns ngaddons/validation-webhooks=enabled

# validation fail
❯ kubectl run testpod2 --image=nginx -n demo-ns
Error from server: admission webhook "ngaddons.check-labels.webhook" denied the request: Missing labels: ['ngaddons/ownerId', 'ngaddons/webexRoomId', 'ngaddons/appName']

# validation pass
❯ kubectl run testpod3 --image=nginx -n demo-ns -l ngaddons/ownerId=ram,ngaddons/webexRoomId=rams-room-id,ngaddons/appName=rams-test-app
pod/testpod3 created

# validation bypass
❯ kubectl run testpod4 --image=nginx -n demo-ns -l ngaddons/bypass=1
pod/testpod4 created

Delete validation webhook
❯ kubectl delete validatingwebhookconfigurations.admissionregistration.k8s.io ngaddons-check-labels

Source code
