Hi, I am Rajat, working in Data Science and DevOps.
As a data scientist, I have a good command of Python. Generally, you will find that new deployments and upgrades are not easy jobs: they involve writing config files and creating complex pipelines.
But wait... then I remembered I know Python. Why am I not using my Python skills to automate things?
So, I started writing some scripts to automate the jobs. First, I wrote some scripts that would help me in my deployments.
When we talk about a Kubernetes deployment, what are the first things we need to do? It can be a plain deployment using YAML files or one using a Helm chart. Here's the list:
- Create a namespace
- Create image-pull secrets
- Create the YAML files (Deployment, Service, Ingress, HPA, ConfigMap, Secret), or the Helm chart files if you are using Helm
- Finally, deploy and validate
Now, let's start doing this one by one.
import argparse
import base64
import json
import logging
import os
import time
from traceback import print_exc

import yaml
from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
# Root logger setup: INFO and above, each record prefixed with timestamp and level.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
class KubernetesManager:
    """Helper around the Kubernetes Python client for common deployment chores.

    On construction the local kubeconfig is loaded, CoreV1 and AppsV1 API
    clients are created, and the names of the non-system namespaces are
    cached in ``self.namespaces``.
    """

    # Namespaces that list_namespaces() leaves out of the cached list.
    _IGNORED_NAMESPACES = frozenset({
        "default",
        "kube-public",
        "kube-system",
        "kube-node-lease",
        "kube-proxy",
        "kube-service-catalog",
        "prometheus",
        "kubernetes-dashboard",
    })

    def __init__(self):
        logging.info("Initializing Kubernetes manager...")
        config.load_kube_config()
        self.core_api = client.CoreV1Api()
        self.apps_api = client.AppsV1Api()
        self.namespaces = []
        self.list_namespaces()

    def create_namespace(self, namespace_name):
        """Create ``namespace_name`` unless it is already known to exist.

        API errors are logged rather than raised. The image pull secret is
        NOT created here: that needs registry credentials this method does
        not receive, so callers invoke ``create_image_pull_secret`` themselves.
        """
        logging.info(f"Creating namespace: {namespace_name}")
        if namespace_name in self.namespaces:
            logging.info(f"Namespace '{namespace_name}' already exists.")
            return
        namespace = client.V1Namespace(metadata=client.V1ObjectMeta(name=namespace_name))
        try:
            self.core_api.create_namespace(namespace)
        except ApiException as e:
            # 409 Conflict: the namespace exists but is not in the cached list
            # (e.g. "default", which list_namespaces() filters out).
            if e.status == 409:
                logging.info(f"Namespace '{namespace_name}' already exists.")
            else:
                logging.error(f"Error creating namespace: {e}")
            return
        # Keep the cache in step so a repeated call short-circuits above.
        self.namespaces.append(namespace_name)
        logging.info(f"Namespace '{namespace_name}' created successfully.")

    @staticmethod
    def _encode_docker_config(server, username, password):
        """Return the base64-encoded ``.dockerconfigjson`` payload for one registry."""
        auth_token = base64.b64encode(f"{username}:{password}".encode()).decode()
        docker_config_json = {
            "auths": {
                server: {
                    "username": username,
                    "password": password,
                    "auth": auth_token,
                }
            }
        }
        return base64.b64encode(json.dumps(docker_config_json).encode()).decode()

    def create_image_pull_secret(self, namespace_name, secret_name, server, username, password):
        """Create a ``kubernetes.io/dockerconfigjson`` secret in ``namespace_name``.

        Args:
            namespace_name: Namespace that will hold the secret.
            secret_name: Name of the secret object.
            server: Registry host used as the key under ``auths``.
            username: Registry user name.
            password: Registry password.

        API errors are logged rather than raised; an existing secret with the
        same name is left untouched.
        """
        logging.info(f"Creating image pull secret in namespace: {namespace_name}")
        secret = client.V1Secret(
            metadata=client.V1ObjectMeta(name=secret_name),
            type="kubernetes.io/dockerconfigjson",
            data={".dockerconfigjson": self._encode_docker_config(server, username, password)},
        )
        try:
            self.core_api.create_namespaced_secret(namespace_name, secret)
        except ApiException as e:
            if e.status == 409:
                logging.info(
                    f"ImagePull secret '{secret_name}' already exists in namespace "
                    f"'{namespace_name}'; leaving it unchanged."
                )
            else:
                logging.error(f"Error creating secret: {e}")
            return
        logging.info(f"ImagePull secret '{secret_name}' created in namespace '{namespace_name}'.")

    def list_namespaces(self):
        """Refresh ``self.namespaces`` with the cluster's non-ignored namespace names.

        On an API error the previous list is kept and the error is logged.
        """
        try:
            namespaces = self.core_api.list_namespace().items
        except ApiException as e:
            logging.error(f"Error listing namespaces: {e}")
            return
        self.namespaces = [
            ns.metadata.name
            for ns in namespaces
            if ns.metadata.name not in self._IGNORED_NAMESPACES
        ]

    def check_rollout_status(self, deployment_name, namespace, timeout_seconds=240):
        """Watch a deployment until it is fully rolled out or the timeout passes.

        Args:
            deployment_name: Name of the deployment to watch.
            namespace: Namespace containing the deployment.
            timeout_seconds: Upper bound for the watch, also passed to the API.

        Returns:
            True once updated, total and available replicas all equal the
            desired count and the controller has observed the latest
            generation; False on timeout, API error, or when the watch
            stream ends before that happens.
        """
        logging.info(f"Checking rollout status for deployment '{deployment_name}' in namespace '{namespace}'...")
        # Monotonic clock: elapsed time must not jump if the wall clock changes.
        start_time = time.monotonic()
        w = watch.Watch()
        try:
            for event in w.stream(
                self.apps_api.list_namespaced_deployment,
                namespace=namespace,
                field_selector=f"metadata.name={deployment_name}",
                timeout_seconds=timeout_seconds,
            ):
                deployment = event['object']
                status = deployment.status
                # The API omits zero-valued counters; treat missing as 0 so a
                # deployment scaled to zero can still be reported complete.
                desired = deployment.spec.replicas or 0
                rolled_out = (
                    (status.updated_replicas or 0) == desired
                    and (status.replicas or 0) == desired
                    and (status.available_replicas or 0) == desired
                    and (status.observed_generation or 0) >= (deployment.metadata.generation or 0)
                )
                if rolled_out:
                    logging.info(f"Deployment '{deployment_name}' in namespace '{namespace}' has been successfully rolled out.")
                    return True
                elapsed_time = time.monotonic() - start_time
                if elapsed_time > timeout_seconds:
                    logging.error(f"Timeout: Deployment '{deployment_name}' in namespace '{namespace}' did not roll out within {timeout_seconds} seconds.")
                    return False
                logging.info(f"Waiting for deployment '{deployment_name}' to roll out... Time elapsed: {elapsed_time:.2f} seconds")
                logging.info(f"Updated Replicas: {status.updated_replicas}, Available Replicas: {status.available_replicas}")
        except ApiException as e:
            logging.error(f"Error checking rollout status: {e}")
            return False
        finally:
            # Always release the underlying connection, whichever path exits.
            w.stop()
        # The server closed the watch (its own timeout) before the rollout
        # finished; report failure instead of falling through with None.
        logging.error(f"Timeout: Deployment '{deployment_name}' in namespace '{namespace}' did not roll out within {timeout_seconds} seconds.")
        return False
Let me explain the code above:
- We have a class `KubernetesManager`, which has methods to create a namespace and an image pull secret.
- The `create_namespace` method creates a namespace if it does not already exist.
- The `create_image_pull_secret` method creates an image pull secret in the specified namespace.
- The `list_namespaces` method lists the namespaces in the cluster (system namespaces such as `kube-system` are skipped).
- The `check_rollout_status` method monitors the rollout status of a deployment in a namespace.
Whether you are using YAML files or a Helm chart for the deployment, you can write additional functions to perform it. Add the code below to the code above.
import subprocess
import argparse
def deploy_using_yaml(yaml_file, namespace):
    """Apply a manifest to a namespace by running ``kubectl apply``.

    Args:
        yaml_file: Path to the manifest passed to ``kubectl apply -f``.
        namespace: Namespace passed to ``--namespace``.

    Raises:
        Exception: If ``kubectl`` exits non-zero or cannot be started at all
            (for example, the binary is not on PATH). The original error is
            attached as ``__cause__``.
    """
    logging.info(f"Deploying using yaml file: {yaml_file}")
    # Argument list (no shell) so paths and namespaces are never shell-parsed.
    command = [
        "kubectl",
        "apply",
        "-f",
        yaml_file,
        "--namespace",
        namespace,
    ]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable kubectl binary, which
        # would otherwise escape as an unrelated-looking FileNotFoundError.
        raise Exception(f"Error deploying using yaml file: {e}") from e
    logging.info(f"Deployment using yaml file '{yaml_file}' completed successfully.")
def final_deploy(yaml_file_path, namespace, secret_name="my-secret", server="docker.io",
                 username="username", password="password"):
    """Run the full deployment sequence: namespace, image pull secret, manifest.

    Args:
        yaml_file_path: Path to the manifest to apply.
        namespace: Target namespace; created if needed.
        secret_name: Name of the image pull secret to create.
        server: Container registry host for the pull secret.
        username: Registry user name (placeholder default; pass real values).
        password: Registry password (placeholder default; pass real values).

    Raises:
        Exception: Propagated from ``deploy_using_yaml`` when applying the
            manifest fails.
    """
    # Build the manager here; no module-level instance is defined for it.
    k8s_manager = KubernetesManager()
    # Each step logs its own outcome, so no unconditional "success" messages
    # are emitted here (they would be wrong when a step logged an error).
    k8s_manager.create_namespace(namespace)
    k8s_manager.create_image_pull_secret(namespace, secret_name, server, username, password)
    deploy_using_yaml(yaml_file_path, namespace)
# Command-line entry point: parse the task and dispatch to the matching routine.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Deploy an application to Kubernetes.")
    parser.add_argument("--task", required=True, choices=["deploy", "rollback"], help="The task to perform.")
    # Only the deploy task consumes the manifest, so it is validated per task
    # below instead of being required for every invocation.
    parser.add_argument("--yaml-file", help="The path to the yaml file (required for --task deploy).")
    parser.add_argument("--namespace", required=True, help="The namespace to deploy the application.")
    args = parser.parse_args()
    if args.task == "deploy":
        if not args.yaml_file:
            parser.error("--yaml-file is required when --task is deploy")
        final_deploy(args.yaml_file, args.namespace)
    elif args.task == "rollback":
        # NOTE(review): rollback() is not defined in the visible part of this
        # file. Look it up dynamically so a missing implementation yields a
        # clean CLI error rather than a NameError traceback.
        rollback_handler = globals().get("rollback")
        if rollback_handler is None:
            parser.error("the rollback task is not implemented in this script")
        rollback_handler(args.namespace)
If you want to deploy, you can run the following command:
python3 deploy.py --task deploy --yaml-file deployment.yaml --namespace my-namespace
I hope this helps you in automating your Kubernetes deployments using Python. If you have any questions, feel free to ask.
Top comments (0)