In the previous articles, we created the pod's network in code. In this article, we are going to create a ClusterIP service and, on top of it, a NodePort service.
To establish a ClusterIP service we need to create some iptables rules, so first we create some iptables handlers.
In pkg/net/iptables.go
we create three functions: one for creating a new chain, one for appending a new rule to a chain, and one for inserting a rule at a specific position in a chain.
package net
import (
"fmt"
"github.com/jonatan5524/own-kubernetes/pkg"
)
const (
// NAT_TABLE is the iptables table name that every helper in this file passes
// to the -t flag.
NAT_TABLE = "nat"
)
func NewIPTablesChain(name string) error {
return pkg.ExecuteCommand(fmt.Sprintf("/usr/sbin/iptables -t %s -N %s", NAT_TABLE, name), true)
}
func AppendNewIPTablesRule(rule string, chain string) error {
return pkg.ExecuteCommand(fmt.Sprintf("/usr/sbin/iptables -t %s -A %s %s", NAT_TABLE, chain, rule), true)
}
func InsertNewIPTablesRule(rule string, chain string, index int) error {
return pkg.ExecuteCommand(fmt.Sprintf("/usr/sbin/iptables -t %s -I %s %d %s", NAT_TABLE, chain, index, rule), true)
}
The code above is fairly simple: it runs the same iptables commands we used in the previous article.
Now we create a new file for service handling: pkg/service/service.go
, in the file we first create some constants and a new service struct:
// Names of the iptables chains, and the chain-name prefix, used by the
// service code.
const (
// KUBE_SERVICES_CHAIN is the chain that InitKubeServicesChain creates and
// makes PREROUTING and OUTPUT jump to.
KUBE_SERVICES_CHAIN = "KUBE-SERVICES"
// CLUSTER_IP_SERVICE_PREFIX is the prefix passed to pkg.GenerateNewID when
// naming a ClusterIP service chain.
CLUSTER_IP_SERVICE_PREFIX = "KUBE-SVC"
// KUBE_SERVICE_MARK is the chain whose single rule sets the 0x4000 packet
// mark that the POSTROUTING MASQUERADE rule matches on.
KUBE_SERVICE_MARK = "KUBE-MARK-MASQ"
)
// ServiceType enumerates the kinds of service this package can create.
type ServiceType int
const (
// CluserIP identifies a ClusterIP service.
// NOTE(review): the identifier is spelled "CluserIP" (missing 't'); renaming
// it to ClusterIP would require updating every caller.
CluserIP ServiceType = iota
// NodePort identifies a NodePort service.
NodePort
)
// Service describes a service that has been set up through iptables.
type Service struct {
// Type is the kind of service (ClusterIP or NodePort).
Type ServiceType
// Id is the service identifier; the constructors in this package set it to
// the name of the iptables chain that backs the service.
Id string
// IpAddr is the IP address the service is reachable on.
IpAddr string
}
The Service struct has 3 members:
-
Type
- can be ClusterIP or NodePort -
Id
- unique generated ID -
IpAddr
- IP address of the service
Next, we create a new function for initializing the new iptables chains when the node is created:
// InitKubeServicesChain installs the base nat-table plumbing for services:
// it creates the KUBE-SERVICES chain, makes PREROUTING and OUTPUT jump to it,
// creates the KUBE-MARK-MASQ chain, and appends a POSTROUTING rule that
// masquerades every packet carrying the 0x4000 mark.
//
// The first failing iptables command aborts the sequence and its error is
// returned unchanged; rules installed before the failure are left in place.
func InitKubeServicesChain() error {
	// iptables -t nat -N KUBE-SERVICES
	if err := net.NewIPTablesChain(KUBE_SERVICES_CHAIN); err != nil {
		return err
	}

	// The same jump rule is attached to both built-in chains, in this order:
	//   iptables -t nat -A PREROUTING -j KUBE-SERVICES
	//   iptables -t nat -A OUTPUT -j KUBE-SERVICES
	jumpToServices := fmt.Sprintf("-j %s", KUBE_SERVICES_CHAIN)
	for _, builtinChain := range []string{"PREROUTING", "OUTPUT"} {
		if err := net.AppendNewIPTablesRule(jumpToServices, builtinChain); err != nil {
			return err
		}
	}

	if err := initMarkChain(); err != nil {
		return err
	}

	// iptables -t nat -A POSTROUTING -m mark --mark 0x4000/0x4000 -j MASQUERADE
	return net.AppendNewIPTablesRule("-m mark --mark 0x4000/0x4000 -j MASQUERADE", "POSTROUTING")
}
// initMarkChain creates the KUBE-MARK-MASQ chain and gives it a single rule
// that sets the 0x4000 mark on every packet sent through it.
func initMarkChain() error {
	// iptables -t nat -N KUBE-MARK-MASQ
	err := net.NewIPTablesChain(KUBE_SERVICE_MARK)
	if err != nil {
		return err
	}

	// iptables -t nat -A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
	return net.AppendNewIPTablesRule("-j MARK --set-xmark 0x4000/0x4000", KUBE_SERVICE_MARK)
}
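As you can see above, the function creates a new KUBE-SERVICES
chain and jumps to it from the PREROUTING
and OUTPUT
chains. It also creates a new KUBE-MARK-MASQ
chain, which marks packets with 0x4000, and appends a rule to the POSTROUTING
chain that masquerades every packet carrying that mark.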
We call this function in pkg/agent/agent.go
:
// setupIPTablesServices installs the base service chains and panics if any
// of the iptables commands fails.
func setupIPTablesServices() {
	err := service.InitKubeServicesChain()
	if err != nil {
		panic(err)
	}
}
// main starts containerd, installs the service iptables chains, and then
// serves the agent's HTTP API on api.PORT until the server stops.
func main() {
	startContainerd()
	setupIPTablesServices()

	server := echo.New()
	initMiddlewares(server)
	initRoutes(server)

	listenAddr := fmt.Sprintf(":%s", api.PORT)
	server.Logger.Fatal(server.Start(listenAddr))
}
Next, we create a function that creates a new ClusterIP service:
// NewClusterIPService creates the iptables chain and KUBE-SERVICES rules for
// a new ClusterIP service listening on port.
//
// podsCidr is the source range that is exempt from masquerading: packets to
// the service that do NOT originate from it are sent to KUBE-MARK-MASQ.
// The service IP is currently hard-coded to 172.17.10.10.
//
// On success the returned Service has its Id set to the name of the new
// chain; pods are attached to it with AddRouteToClusterIPService.
func NewClusterIPService(port string, podsCidr string) (*Service, error) {
	const maxChainName = 29
	// Length the generated ID is truncated to before it is used as a chain name.
	const nameLen = maxChainName - len(CLUSTER_IP_SERVICE_PREFIX) - 1

	serviceName := pkg.GenerateNewID(CLUSTER_IP_SERVICE_PREFIX)[:nameLen]
	if err := net.NewIPTablesChain(serviceName); err != nil {
		return nil, err
	}

	ipAddr := "172.17.10.10"

	// iptables -t nat -I KUBE-SERVICES 1 ! -s podsCidr -d ipAddr -p tcp -m tcp --dport port -j KUBE-MARK-MASQ
	markRule := fmt.Sprintf("! -s %s -d %s -p tcp -m tcp --dport %s -j %s", podsCidr, ipAddr, port, KUBE_SERVICE_MARK)
	if err := net.InsertNewIPTablesRule(markRule, KUBE_SERVICES_CHAIN, 1); err != nil {
		return nil, err
	}

	// iptables -t nat -A KUBE-SERVICES -d ipAddr -p tcp -m tcp --dport port -j serviceName
	jumpRule := fmt.Sprintf("-d %s -p tcp -m tcp --dport %s -j %s", ipAddr, port, serviceName)
	if err := net.AppendNewIPTablesRule(jumpRule, KUBE_SERVICES_CHAIN); err != nil {
		return nil, err
	}

	created := &Service{
		Type:   CluserIP,
		Id:     serviceName,
		IpAddr: ipAddr,
	}

	return created, nil
}
The function does as follows:
- generate a new service name
- creates a new chain
- currently, there is a hardcoded IP (in the future we will save the IPs in an etcd DB)
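- insert new rules for the
KUBE-SERVICES
chain: one marks the packets that are directed to the clusterIP and port and do not come from the pods' CIDR (so they get masqueraded), and the other directs packets for the clusterIP and port to the new service chain.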
Next, we have a new function that adds a new pod to the service:
// AddRouteToClusterIPService attaches a pod as a backend of a ClusterIP
// service.
//
// ip and port are the pod's address and listening port, service is the name
// of the service chain (Service.Id), and index is the zero-based position of
// the pod among the service's backends.
//
// A per-pod chain is created, named after the service chain with its last
// three characters replaced by "-<index>". The chain marks traffic that
// originates from the pod itself for masquerading and DNATs everything to
// ip:port. The first pod (index 0) is appended to the service chain as the
// unconditional fallback; every later pod is inserted at the top with a
// "statistic --mode nth --every index+1" match so the backends share the
// traffic evenly.
//
// An error is returned if service is too short to derive a pod chain name
// from, or if any iptables command fails.
func AddRouteToClusterIPService(ip string, port string, service string, index int) error {
	// Number of trailing characters of the service chain name that are
	// replaced by the "-<index>" suffix.
	const trimmedSuffixLen = 3
	if len(service) < trimmedSuffixLen {
		return fmt.Errorf("service chain name %q is shorter than %d characters", service, trimmedSuffixLen)
	}

	// The service name is passed as an argument rather than spliced into the
	// format string, so a '%' in it cannot be misread as a formatting verb.
	podService := fmt.Sprintf("%s-%d", service[:len(service)-trimmedSuffixLen], index)
	if err := net.NewIPTablesChain(podService); err != nil {
		return err
	}

	// iptables -t nat -A podService -s podIp -j KUBE-MARK-MASQ
	if err := net.AppendNewIPTablesRule(fmt.Sprintf("-s %s -j %s", ip, KUBE_SERVICE_MARK), podService); err != nil {
		return err
	}

	// iptables -t nat -A podService -p tcp -m tcp -j DNAT --to-destination podIp:port
	if err := net.AppendNewIPTablesRule(fmt.Sprintf("-p tcp -m tcp -j DNAT --to-destination %s:%s", ip, port), podService); err != nil {
		return err
	}

	if index == 0 {
		// iptables -t nat -A service -j podService
		return net.AppendNewIPTablesRule(fmt.Sprintf("-j %s", podService), service)
	}

	// iptables -t nat -I service 1 -m statistic --mode nth --every index+1 --packet 0 -j podService
	return net.InsertNewIPTablesRule(fmt.Sprintf("-m statistic --mode nth --every %d --packet 0 -j %s", index+1, podService), service, 1)
}
This function creates a new chain for the pod, adds a rule that marks packets coming from the pod itself and a DNAT rule that routes to the pod, and then adds a jump to the pod chain from the service chain with load balancing.
To get the running pods' IPs for later, we add a new field for the RunningPod
struct:
// RunningPod bundles a pod with the state of its running containerd task.
type RunningPod struct {
// IPAddr is the IP address returned by connectToNetwork when the pod was
// attached to the pod network (set in NewPodAndRun).
IPAddr string
// Pod is the pod definition this running instance was started from.
Pod *Pod
// Task is the containerd task running the pod; its Pid is used when wiring
// the pod into the network.
Task *containerd.Task
// exitStatusC is a receive-only channel of containerd exit statuses.
// NOTE(review): presumably the channel returned by the task's Wait call —
// confirm against Pod.Run.
exitStatusC <-chan containerd.ExitStatus
}
And assign it when the pod is run:
// NewPodAndRun creates a pod from imageRegistry and name, starts it, connects
// it to the pod network, and returns the running pod with IPAddr populated.
// The first error encountered is returned as-is together with a nil pod.
func NewPodAndRun(imageRegistry string, name string) (*RunningPod, error) {
	created, err := NewPod(imageRegistry, name)
	if err != nil {
		return nil, err
	}
	log.Printf("pod created: %s\n", created.Id)

	log.Printf("starting pod\n")
	running, err := created.Run()
	if err != nil {
		return nil, err
	}

	log.Printf("setting up pod network\n")
	ipAddr, err := connectToNetwork(created.Id, (*running.Task).Pid())
	if err != nil {
		return nil, err
	}

	// Remember the address so services can route to this pod later.
	running.IPAddr = ipAddr

	return running, nil
}
Now, for testing we add in the CreatePod function in pkg/agent/api
a creation of 3 pods and a ClusterIP service:
unc CreatePod(c echo.Context) error {
podDto := new(podDTO)
if err := c.Bind(podDto); err != nil {
return err
}
runningPod, err := pod.NewPodAndRun(podDto.ImageRegistry, podDto.Name)
if err != nil {
return err
}
// ------- test ---
log.Printf("creating cluster ip\n")
clusterIPService, err := service.NewClusterIPService("3001", fmt.Sprintf("%s/24", runningPod.IPAddr))
if err != nil {
return err
}
if err := service.AddRouteToClusterIPService(runningPod.IPAddr, "8080", clusterIPService.Id, 0); err != nil {
return err
}
runningPod2, err := pod.NewPodAndRun(podDto.ImageRegistry, podDto.Name)
if err != nil {
return err
}
if err := service.AddRouteToClusterIPService(runningPod2.IPAddr, "8080", clusterIPService.Id, 1); err != nil {
return err
}
runningPod3, err := pod.NewPodAndRun(podDto.ImageRegistry, podDto.Name)
if err != nil {
return err
}
if err := service.AddRouteToClusterIPService(runningPod3.IPAddr, "8080", clusterIPService.Id, 2); err != nil {
return err
}
// -------------
return c.JSON(http.StatusCreated, podDTO{
ImageRegistry: podDto.ImageRegistry,
Name: runningPod.Pod.Id,
})
}
If we build, run and create a new pod:
# create a new pod (actually creates 3 in the test)
❯ curl -X POST localhost:49256/pods -H 'Content-Type: application/json' -d '{"name": "pod2", "image registry": "docker.io/cloudnativelabs/whats-my-ip:latest"}'
{"image registry":"docker.io/cloudnativelabs/whats-my-ip:latest","name":"pod2-10563f40-61ae-426b-b965-b468aeafe046"}
# now we call inside the node
❯ sudo docker exec -it fe4 /bin/bash
root@fe47726fb371:/agent# curl http://172.17.0.2:3001
HOSTNAME:fe47726fb371 IP:10.0.2.3
root@fe47726fb371:/agent# curl http://172.17.0.2:3001
HOSTNAME:fe47726fb371 IP:10.0.2.2
root@fe47726fb371:/agent# curl http://172.17.0.2:3001
HOSTNAME:fe47726fb371 IP:10.0.2.4
root@fe47726fb371:/agent# curl http://172.17.0.2:3001
HOSTNAME:fe47726fb371 IP:10.0.2.3
root@fe47726fb371:/agent# curl http://172.17.0.2:3001
HOSTNAME:fe47726fb371 IP:10.0.2.2
root@fe47726fb371:/agent# curl http://172.17.0.2:3001
HOSTNAME:fe47726fb371 IP:10.0.2.4
root@fe47726fb371:/agent# curl http://172.17.0.3:3001
And we got a ClusterIP generated in code!
Now the NodePort service is simple, first, let's move the NewClusterIPService
and AddRouteToClusterIPService
functions to a new file pkg/service/clusterIP
for organizing.
Now we add a new file pkg/service/nodePort.go
, and we create there a new function called NewNodePortService
:
// NewNodePortService creates a NodePort service that exposes port on the node
// and forwards it to a freshly created ClusterIP service.
//
// port is the node port to open, podsCidr is forwarded to
// NewClusterIPService, and nodeIP is recorded as the NodePort service's
// address. The backing ClusterIP service is always created on port "3001",
// independently of port.
//
// It returns the ClusterIP service first and the NodePort service second.
// On failure both services are nil and the iptables error is returned
// unchanged.
func NewNodePortService(port string, podsCidr string, nodeIP string) (*Service, *Service, error) {
	clusterIPService, err := NewClusterIPService("3001", podsCidr)
	if err != nil {
		return nil, nil, err
	}

	// iptables -t nat -N KUBE-NODEPORTS
	if err := net.NewIPTablesChain(NODE_PORT_SERVICE_CHAIN); err != nil {
		return nil, nil, err
	}

	// iptables -t nat -A KUBE-SERVICES -j KUBE-NODEPORTS
	jumpToNodePorts := fmt.Sprintf("-j %s", NODE_PORT_SERVICE_CHAIN)
	if err := net.AppendNewIPTablesRule(jumpToNodePorts, KUBE_SERVICES_CHAIN); err != nil {
		return nil, nil, err
	}

	// iptables -t nat -I KUBE-NODEPORTS 1 -p tcp -m tcp --dport port -j KUBE-MARK-MASQ
	markRule := fmt.Sprintf("-p tcp -m tcp --dport %s -j %s", port, KUBE_SERVICE_MARK)
	if err := net.InsertNewIPTablesRule(markRule, NODE_PORT_SERVICE_CHAIN, 1); err != nil {
		return nil, nil, err
	}

	// iptables -t nat -A KUBE-NODEPORTS -p tcp -m tcp --dport port -j clusterIPService
	forwardRule := fmt.Sprintf("-p tcp -m tcp --dport %s -j %s", port, clusterIPService.Id)
	if err := net.AppendNewIPTablesRule(forwardRule, NODE_PORT_SERVICE_CHAIN); err != nil {
		return nil, nil, err
	}

	nodePortService := &Service{
		Type:   NodePort,
		Id:     NODE_PORT_SERVICE_CHAIN,
		IpAddr: nodeIP,
	}

	return clusterIPService, nodePortService, nil
}
When a new NodePort service is created, a new ClusterIP service is created as well. We add the new iptables chain and rules, and the function returns both the new ClusterIP service and the new NodePort service.
We test the NodePort service in the same place we tested the ClusterIP service, pkg/agent/api.go
:
// CreatePod handles the pod-creation request: it binds the request body to a
// podDTO, creates and runs the pod, and replies with 201 and the pod's
// generated ID.
//
// The section between the "test" markers is temporary test scaffolding: it
// creates a NodePort service on node port 30001 bound to the eth0 address,
// then starts two more pods from the same request and registers all three as
// backends of the underlying ClusterIP service on port 8080.
func CreatePod(c echo.Context) error {
	podDto := new(podDTO)
	if err := c.Bind(podDto); err != nil {
		return err
	}

	runningPod, err := pod.NewPodAndRun(podDto.ImageRegistry, podDto.Name)
	if err != nil {
		return err
	}

	// ------- test ---
	log.Printf("creating cluster ip\n")
	localIPAddr, err := net.GetLocalIPAddr("eth0")
	if err != nil {
		return err
	}

	podsCidr := fmt.Sprintf("%s/24", runningPod.IPAddr)
	clusterIPService, _, err := service.NewNodePortService("30001", podsCidr, localIPAddr)
	if err != nil {
		return err
	}

	// The pod from the request is backend 0 of the service.
	err = service.AddRouteToClusterIPService(runningPod.IPAddr, "8080", clusterIPService.Id, 0)
	if err != nil {
		return err
	}

	// Two extra replicas become backends 1 and 2.
	for backend := 1; backend < 3; backend++ {
		replica, err := pod.NewPodAndRun(podDto.ImageRegistry, podDto.Name)
		if err != nil {
			return err
		}

		err = service.AddRouteToClusterIPService(replica.IPAddr, "8080", clusterIPService.Id, backend)
		if err != nil {
			return err
		}
	}
	// -------------

	response := podDTO{
		ImageRegistry: podDto.ImageRegistry,
		Name:          runningPod.Pod.Id,
	}

	return c.JSON(http.StatusCreated, response)
}
And outside of the node:
❯ curl 172.18.0.2:30001
HOSTNAME:15c79bf78092 IP:10.0.2.3
❯ curl 172.18.0.2:30001
HOSTNAME:15c79bf78092 IP:10.0.2.2
❯ curl 172.18.0.2:30001
HOSTNAME:15c79bf78092 IP:10.0.2.4
❯ curl 172.18.0.2:30001
HOSTNAME:15c79bf78092 IP:10.0.2.3
❯ curl 172.18.0.2:30001
HOSTNAME:15c79bf78092 IP:10.0.2.2
And we got a NodePort generated in code! (don't forget to delete the test)
As always, the source code can be found here, and the changes on the ClusterIP can be found in this commit, and for the NodePort can be found in this commit.
Top comments (0)