DEV Community

Cover image for Learning Microservices with Go(Part 2). Service Discovery
Manav Kushwaha
Manav Kushwaha

Posted on

Learning Microservices with Go(Part 2). Service Discovery

This post is the second part of the "Learning Microservices with Go". I'm writing these posts as I'm learning the concepts. If you haven't checked out the part 1, here's the link for it: Part 1.
Link for Git Repo: Github

In part 1, we created the directory structure for our movie app. We created 3 services namely metadata, rating, movie services.

The problem that we're facing was that these services were running, interacting with each other using hard-coded addresses and ports. This works fine only till we run all the services on the same machine. We also need to be able to create multiple instances of any service which is currently not possible(This is important if we want to build a real application which can scale. It'll give us high availability and more fault tolerance).

To tackle this, we will use a service discovery. You may ask what's a service discovery. Service discovery addresses multiple problems:

1. How to find instance(s) of a particular service.
2. How to dynamically add or remove an instance of any service.
3. How to handle unresponsive instances of any service.

In simplicity, service discovery can be viewed as a registry, storing the information about the available services. It has the following features:

1. Adding(Registering) an instance of a service.
2. Removing(Deregistering) an instance of a service.
3. Get a list of all instances of a service. (list of network addresses of the instances)

Service Discovery

Service Discovery Models

The service discovery can be implemented in two ways.

  • Client-side service discovery: The application directly accesses the registry directly using a registry client.

Client-side service discovery

  • Server-side service discovery: The application access the registry via a load balancer, which forwards the request to the available instances.

Server-side service discovery

Some differences:

  1. In case of the client side service discovery, the logic for calling the instance needs to be with the client and be implemented for every application. If this logic is not correctly configured, the application may end up calling only 1 instance of the service, leading to overload of the instance and under-utilization of other instances.

  2. In case of the server-side service discovery, the logic for calling the instance is with the load balancer. The load balancer gets the list of instances from the service discovery and decides which instance should the request be directed to and forwards the request to the chosen instance. This decouples the logic for service directory interaction from the application.

In this series, we'll be covering the client-side discovery models only as they're simpler to implement and don't require setting up of load balancers.

Tools available for service discovery

  1. Hashicorp Consul: This has been a very simple solution for service discovery for some years. It has a simple API, which includes:

    • PUT /catalog/register: Registers a service instance.
    • PUT /catalog/deregister: Deregisters a service instance.
    • GET /catalog/services: Gets the instances of the service that are available.
  2. Kubernetes: Kubernetes is a very widely used container orchestration tool. It's used to manage, deploy, scale containers(which often contain microservices). It has a feature to register and find services running on it. It allows for a client-side discovery model as well as pluging a load balancer for server-side discovery model.

We'll use the hashicorp consul for this tutorial. We'll learn to use Kubernetes in future parts of the series.

Coding the service discovery

We'll add the service directory to the pkg folder as it'd be accessed by all the services to discover other services.

Create a file pkg/discovery/discovery.go. Add the following code to that.

package discovery

import (
    "context"
    "errors"
    "fmt"
    "math/rand"
    "time"
)

type Registery interface {
    // Register a service with the registry.
    Register(ctx context.Context, instanceID string, serviceName string, hostPort string) error
    // Deregister a service with the registry.
    Deregister(ctx context.Context, instanceID string, serviceName string) error
    // Discover a service with the registry.
    Discover(ctx context.Context, serviceName string) ([]string, error)
    // HealthCheck a service with the registry.
    HealthCheck(instanceID string, serviceName string) error
}

// ErrNotFound is returned when no service addresses are found.
var ErrNotFound = errors.New("no service addresses found")

// GenerateInstanceID generates a psuedo-random service instance identifier, using a service name. Suffixed by dash and number
func GenerateInstanceID(serviceName string) string {
    return fmt.Sprintf("%s-%d", serviceName, rand.New(rand.NewSource(time.Now().UnixNano())).Int())
}
Enter fullscreen mode Exit fullscreen mode
  • What we added was an interface of a service discovery. We added that so as if in future if we switch our service discovery tool(eg: from Consul to some other tool), then we won't be making changes all over the code. The newer discovery could use the same interface.
  • Also similar to the way we have a memory repository in the last part, we can have a memory discovery as well. It would provide us ability to test our discovery.

In memory service discovery

Create the file pkg/discovery/memory/memory.go and add the following code.

package memory

import (
    "context"
    "errors"
    "sync"
    "time"

    "movieexample.com/pkg/discovery"
)

type serviceNameType string
type instanceIDType string

type Registery struct {
    sync.RWMutex
    serviceAddrs map[serviceNameType]map[instanceIDType]*serviceInstance
}

type serviceInstance struct {
    hostPort   string
    lastActive time.Time
}

// NewRegistry creates a new in-memory registry instance.
func NewRegistry() *Registery {
    return &Registery{
        serviceAddrs: make(map[serviceNameType]map[instanceIDType]*serviceInstance),
    }
}

// Register creates a service record in the registry
func (r *Registery) Register(ctx context.Context, instanceID string, serviceName string, hostPort string) error {
    r.Lock()
    defer r.Unlock()

    if _, ok := r.serviceAddrs[serviceNameType(serviceName)]; !ok {
        r.serviceAddrs[serviceNameType(serviceName)] = map[instanceIDType]*serviceInstance{}
    }
    r.serviceAddrs[serviceNameType(serviceName)][instanceIDType(instanceID)] = &serviceInstance{
        hostPort:   hostPort,
        lastActive: time.Now(),
    }
    return nil
}

// Deregister removes a service record from the registry
func (r *Registery) Deregister(ctx context.Context, instanceID string, serviceName string) error {
    r.Lock()
    defer r.Unlock()

    if _, ok := r.serviceAddrs[serviceNameType(serviceName)]; !ok {
        return discovery.ErrNotFound
    }

    delete(r.serviceAddrs[serviceNameType(serviceName)], instanceIDType(instanceID))
    return nil
}

// HealthCheck marks a service instance as active
func (r *Registery) HealthCheck(instanceID string, serviceName string) error {
    r.Lock()
    defer r.Unlock()

    if _, ok := r.serviceAddrs[serviceNameType(serviceName)]; !ok {
        return errors.New("service not registered yet")
    }

    if _, ok := r.serviceAddrs[serviceNameType(serviceName)][instanceIDType(instanceID)]; !ok {
        return errors.New("service instance not registered yet")
    }

    r.serviceAddrs[serviceNameType(serviceName)][instanceIDType(instanceID)].lastActive = time.Now()
    return nil
}

// Discover returns a list of service instances from the registry
func (r *Registery) Discover(ctx context.Context, serviceName string) ([]string, error) {
    r.RLock()
    defer r.RUnlock()

    if len(r.serviceAddrs[serviceNameType(serviceName)]) == 0 {
        return nil, discovery.ErrNotFound
    }
    var res []string

    for _, v := range r.serviceAddrs[serviceNameType(serviceName)] {
        if time.Since(v.lastActive) > 5*time.Second {
            continue
        }

        res = append(res, v.hostPort)
    }
    return res, nil
}

Enter fullscreen mode Exit fullscreen mode

We can see that the in-memory service discovery implements all the methods of the Registry interface we had defined earlier. We used a map of map to store the service addresses. sync.RWMutex is used to allow concurrent read, writes to the map.
Now we can use this implementation wherever we want.

Consul based service discovery

  • Create a file pkg/discovery/consul/consul.go
  • Add the following code to it.
package consul

import (
    "context"
    "errors"
    "fmt"
    "strconv"
    "strings"

    consul "github.com/hashicorp/consul/api"
)

// Registery defines a consul based service registry
type Registery struct {
    client *consul.Client
}

// NewRegistery creates a new consul registry instance.
// Addr is the address where the consul agent is running
func NewRegistery(addr string) (*Registery, error) {
    config := consul.DefaultConfig()
    config.Address = addr
    client, err := consul.NewClient(config)
    if err != nil {
        return nil, err
    }
    return &Registery{client: client}, nil
}

// Register creates a service record in the registry
func (r *Registery) Register(ctx context.Context, instanceID string, serviceName string, hostPort string) error {
    parts := strings.Split(hostPort, ":")
    if len(parts) != 2 {
        return errors.New("invalid host:port format. Eg: localhost:8081")
    }
    port, err := strconv.Atoi(parts[1])
    if err != nil {
        return err
    }
    host := parts[0]

    err = r.client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
        Address: host,
        Port:    port,
        ID:      instanceID,
        Name:    serviceName,
        Check: &consul.AgentServiceCheck{
            CheckID: instanceID,
            TTL:     "5s",
        },
    })
    return err
}

// Deregister removes a service record from the registry
func (r *Registery) Deregister(ctx context.Context, instanceID string, _ string) error {
    err := r.client.Agent().ServiceDeregister(instanceID)
    return err
}

// HealthCheck is a push mechanism to update the health status of a service instance
func (r *Registery) HealthCheck(instanceID string, _ string) error {
    err := r.client.Agent().UpdateTTL(instanceID, "", "pass")
    return err
}

// Discover returns a list of addresses of active instances of the given service
func (r *Registery) Discover(ctx context.Context, serviceName string) ([]string, error) {
    entries, _, err := r.client.Health().Service(serviceName, "", true, nil)
    if err != nil {
        return nil, err
    }
    var instances []string
    for _, entry := range entries {
        instances = append(instances, fmt.Sprintf("%s:%d", entry.Service.Address, entry.Service.Port))
    }
    return instances, nil
}
Enter fullscreen mode Exit fullscreen mode

As we're using consul which is an external library, do remember to do go mod tidy in the src directory.

Modifying files to use service discovery

Change the two gateways that we'd defined earlier for metadata, rating services

type Gateway struct {
    registry discovery.Registry
}
Enter fullscreen mode Exit fullscreen mode

Changing the New function

func New(registry discovery.Registry) *Gateway {
    return &Gateway{registry}
}
Enter fullscreen mode Exit fullscreen mode

Now the gateways require registry when being created. Change the Get function of the rating gateway as the following:

    addrs, err := g.registery.Discover(ctx, "rating")
    if err != nil {
        return 0, err
    }
    if len(addrs) == 0 {
        return 0, fmt.Errorf("no rating service instances available")
    }

    url := "http://" + addrs[rand.Intn(len(addrs))] + "/rating"
    log.Printf("Calling rating service. Request: GET %s\n", url)

    req, err := http.NewRequest(http.MethodGet, url, nil)
Enter fullscreen mode Exit fullscreen mode

Earlier we're calling a pre-configured address. Now we'll get a list of addresses from the service discovery. We choose a random instance from the list.

Update the metadata service in the same way.

Now, we'll update the main functions of our services so that they can register and deregister to the service discovery.

Update the rating service main function as:

func main() {
    var port int
    flag.IntVar(&port, "port", 8082, "API Handler port")
    flag.Parse()
    fmt.Printf("Starting the movie rating service on port %d", port)

    registery, err := consul.NewRegistery("localhost:8500")
    if err != nil {
        panic(err)
    }

    ctx := context.Background()
    instanceID := discovery.GenerateInstanceID(serviceName)

    if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
        panic(err)
    }

    go func() {
        for {
            if err := registery.HealthCheck(instanceID, serviceName); err != nil {
                log.Println("Failed to report healthy state: ", err.Error())
            }
            time.Sleep(1 * time.Second)
        }
    }()
    defer registery.Deregister(ctx, instanceID, serviceName)

    repo := memory.New()
    ctrl := rating.New(repo)
    h := httpHandler.New(ctrl)

    http.HandleFunc("/rating", http.HandlerFunc(h.Handle))
    if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
        panic(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

Do the same for the metadata service.

The final thing that we need to do is to update the main function of the movie service.

func main() {
    var port int
    flag.IntVar(&port, "port", 8083, "Port to listen on")
    flag.Parse()

    // Register with consul
    registery, err := consul.NewRegistery("localhost:8500")
    if err != nil {
        panic(err)
    }

    ctx := context.Background()
    instanceID := discovery.GenerateInstanceID(serviceName)

    if err := registery.Register(ctx, instanceID, serviceName, fmt.Sprintf("localhost:%d", port)); err != nil {
        panic(err)
    }

    go func() {
        for {
            if err := registery.HealthCheck(instanceID, serviceName); err != nil {
                log.Println("Failed to report healthy state: ", err.Error())
            }
            time.Sleep(1 * time.Second)
        }
    }()
    defer registery.Deregister(ctx, instanceID, serviceName)

    log.Printf("Starting the movie service at port: %d\n", port)
    metadataGateway := metadataGateway.New(registery)
    ratingGateway := ratingGateway.New(registery)

    ctrl := movie.New(ratingGateway, metadataGateway)
    h := httpHandler.New(ctrl)
    http.Handle("/movie", http.HandlerFunc(h.GetMovieDetails))
    if err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil); err != nil {
        panic(err)
    }
}
Enter fullscreen mode Exit fullscreen mode

We've completed the code for implementing service discovery.

Testing

  1. Now we'll need to run the Consul app to allow our services to use it. We can do this by using Docker. Run the following command. This will spin up consul in a docker container.
 docker run \                                                                                                                                                                 
    -d \
    -p 8500:8500 \
    -p 8600:8600/udp \
    --name=dev-consul \
    hashicorp/consul agent -server -ui \
-node=server-1 -bootstrap-expect=1 -client=0.0.0.0
Enter fullscreen mode Exit fullscreen mode
  1. We'll now run each microservice inside each cmd directory.
    go run *.go

  2. Now going to the consul web ui running at localhost:8500, you'll see list of services that are active.

  3. Try running multiple instances of any of the services by:
    go run *.go --port <Port>
    Remember to use ports that are not already being used.

  4. Test the API by sending the request to the movie service.
    curl -v localhost:8083/movie?id=1

You'll find the following in the shell running movie service.

2024/01/28 10:19:55 Calling metadata service. Request: GET http://localhost:8081/metadata
Enter fullscreen mode Exit fullscreen mode

This means that the movie service was able to interact with the service discovery and then direct the request to the correct address for the service.

Tadaaa. We've completed the implementation of service discovery.

In the next part we'll learn serialization(used to encode the data transferred between the services).

Do like the post if you found it helpful and learned something.
Checkout my twitter at: manavkush
Link for Git Repo: Github
Link for Reference Book: Book

See you next post.

Top comments (0)