loading...

Implement server-streaming gRPC in Go

techschoolguru profile image TECH SCHOOL Updated on ・9 min read

The complete gRPC course (15 Part Series)

1) The complete gRPC course [Protobuf + Go + Java] 2) Introduction to gRPC: why, what, how? 3 ... 13 3) HTTP/2 - The secret weapon of gRPC 4) Is gRPC better than REST? Where to use it? 5) Define a protobuf message and generate Go code 6) Protocol buffer deep-dive 7) Config Gradle to generate Java code from Protobuf 8) Generate and serialize protobuf message in Go 9) Implement unary gRPC API in Go 10) Implement server-streaming gRPC in Go 11) Upload file in chunks with client-streaming gRPC - Go 12) Implement bidirectional streaming gRPC - Go 13) gRPC reflection and Evans CLI 14) Udemy coupon 100% for the complete gRPC course 15) Use gRPC interceptor for authorization with JWT

In the previous lecture, we've learned how to implement and test unary gRPC API in Go. Today we will learn how to implement the 2nd type of gRPC, which is server-streaming.

Here's the link to the full gRPC course playlist on Youtube
Gitlab repository: pcbook-go and pcbook-java

First we will define a new server-streaming RPC in the proto file to search for laptops with some specific requirements. Then we will implement the server, the client, and write unit test for it.

Alright let’s start!

Add server-streaming RPC definition to Protobuf

Our RPC will allow us to search for laptops that satisfy some configuration requirements. So I will create a filter_message.proto file.

syntax = "proto3";

package techschool.pcbook;

option go_package = "pb";
option java_package = "com.gitlab.techschool.pcbook.pb";
option java_multiple_files = true;

import "memory_message.proto";

message Filter {
  double max_price_usd = 1;
  uint32 min_cpu_cores = 2;
  double min_cpu_ghz = 3;
  Memory min_ram = 4;
}

This message will define what kind of laptop we’re looking for, such as:

  • The maximum price that we’re willing to pay for the laptop.
  • The minimum number of cores that the laptop CPU should have.
  • The minimum frequency of the CPU.
  • And the minimum size of the RAM.

Then we will define the new server-streaming RPC in the laptop_service.proto file.

We define the SearchLaptopRequest that contains only 1 Filter field, and a SearchLaptopResponse that contains only 1 Laptop field.

message SearchLaptopRequest { 
    Filter filter = 1; 
}

message SearchLaptopResponse { 
    Laptop laptop = 1; 
}

The server-streaming RPC is defined in a similar way to the unary RPC. Start with the rpc keyword, then the RPC name is SearchLaptop.The input is SearchLaptopRequest, and the output is a stream of SearchLaptopResponse.

service LaptopService {
  rpc CreateLaptop(CreateLaptopRequest) returns (CreateLaptopResponse) {};
  rpc SearchLaptop(SearchLaptopRequest) returns (stream SearchLaptopResponse) {};
}

And that’s it! Pretty straight-forward.

Define server streaming RPC in proto file

Let’s generate the code:

make gen

In the laptop_service.pb.go file, some new codes have been generated.

Generated codes

We have the SearchLaptopRequest struct, the SearchLaptopResponse struct.

type SearchLaptopRequest struct {
    Filter               *Filter  `protobuf:"bytes,1,opt,name=filter,proto3" json:"filter,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

type SearchLaptopResponse struct {
    Laptop               *Laptop  `protobuf:"bytes,1,opt,name=laptop,proto3" json:"laptop,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

Then the LaptopServiceClient interface with a new SearchLaptop() function.

type LaptopServiceClient interface {
    CreateLaptop(ctx context.Context, in *CreateLaptopRequest, opts ...grpc.CallOption) (*CreateLaptopResponse, error)
    SearchLaptop(ctx context.Context, in *SearchLaptopRequest, opts ...grpc.CallOption) (LaptopService_SearchLaptopClient, error)
}

Similarly we also have a new SearchLaptop() function inside the LaptopServiceServer interface.

type LaptopServiceServer interface {
    CreateLaptop(context.Context, *CreateLaptopRequest) (*CreateLaptopResponse, error)
    SearchLaptop(*SearchLaptopRequest, LaptopService_SearchLaptopServer) error
}

Add search function to the in-memory store

Before implementing the server, let’s add a new Search() function to the LaptopStore interface.

It takes a filter as input, and also a callback function to report whenever a laptop is found.

type LaptopStore interface {
    Save(laptop *pb.Laptop) error
    Find(id string) (*pb.Laptop, error)
    Search(ctx context.Context, filter *pb.Filter, found func(laptop *pb.Laptop) error) error
}

The context is used to control the deadline/timeout of the request. We will see how it work in a moment.

Now we should implement this Search() function for the InMemoryLaptopStore.

Since we’re reading data, we have to acquire a read lock, and unlock it afterward. We iterate through all laptops in the store, and check which one is qualified to the filter.

// Search searches for laptops with filter, returns one by one via the found function
func (store *InMemoryLaptopStore) Search(
    ctx context.Context,
    filter *pb.Filter,
    found func(laptop *pb.Laptop) error,
) error {
    store.mutex.RLock()
    defer store.mutex.RUnlock()

    for _, laptop := range store.data {
        if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded {
            log.Print("context is cancelled")
            return nil
        }

        if isQualified(filter, laptop) {
            other, err := deepCopy(laptop)
            if err != nil {
                return err
            }

            err = found(other)
            if err != nil {
                return err
            }
        }
    }

    return nil
}

In the for loop, before checking if a laptop is qualified or not, we check if the context error is Cancelled or DeadlineExceeded or not. If it is, we should return immediately because the request is either already timed out or cancelled by client, so it's just a waste of time to continue searching.

When the laptop is qualified, we have to deep-copy it before sending it to the caller via the callback function found().

The isQualified() function takes a filter and a laptop as input, and returns true if the laptop satisfies the filter.

func isQualified(filter *pb.Filter, laptop *pb.Laptop) bool {
    if laptop.GetPriceUsd() > filter.GetMaxPriceUsd() {
        return false
    }

    if laptop.GetCpu().GetNumberCores() < filter.GetMinCpuCores() {
        return false
    }

    if laptop.GetCpu().GetMinGhz() < filter.GetMinCpuGhz() {
        return false
    }

    if toBit(laptop.GetRam()) < toBit(filter.GetMinRam()) {
        return false
    }

    return true
}

Since there are different types of memory units, to compare the RAM, we have to write a function to convert its value to the smallest unit: BIT.

func toBit(memory *pb.Memory) uint64 {
    value := memory.GetValue()

    switch memory.GetUnit() {
    case pb.Memory_BIT:
        return value
    case pb.Memory_BYTE:
        return value << 3 // 8 = 2^3
    case pb.Memory_KILOBYTE:
        return value << 13 // 1024 * 8 = 2^10 * 2^3 = 2^13
    case pb.Memory_MEGABYTE:
        return value << 23
    case pb.Memory_GIGABYTE:
        return value << 33
    case pb.Memory_TERABYTE:
        return value << 43
    default:
        return 0
    }
}

First we get the memory value. Then we do a switch-case on the memory unit:

  • If it is BIT, we simply return the value.
  • If it is BYTE, we have to multiply the value by 8 because 1 BYTE = 8 BITs. And because 8 = 2^3, we can use a bit-operator shift-left 3 here.
  • If it is KILOBYTE, we have to multiply the value by 1024 * 8 because 1 KILOBYTE = 1024 BYTEs. And because 1024 * 8 = 2^13, we can use a simple shift-left 13 here.
  • Similarly, if it is MEGABYTE, we return value shift-left 23.
  • For GIGABYTE, value shift-left 33
  • And finally for TERABYTE, value shift-left 43.
  • For the default case, just return 0.

OK, the store is done. Now let’s implement the server!

Implement the server

We will have to implement the SearchLaptop() function of the LaptopServiceServer interface. It has 2 arguments: the input request and the output stream response.

// SearchLaptop is a server-streaming RPC to search for laptops
func (server *LaptopServer) SearchLaptop(
    req *pb.SearchLaptopRequest,
    stream pb.LaptopService_SearchLaptopServer,
) error {
    filter := req.GetFilter()
    log.Printf("receive a search-laptop request with filter: %v", filter)

    err := server.laptopStore.Search(
        stream.Context(),
        filter,
        func(laptop *pb.Laptop) error {
            res := &pb.SearchLaptopResponse{Laptop: laptop}
            err := stream.Send(res)
            if err != nil {
                return err
            }

            log.Printf("sent laptop with id: %s", laptop.GetId())
            return nil
        },
    )

    if err != nil {
        return status.Errorf(codes.Internal, "unexpected error: %v", err)
    }

    return nil
}

The first thing we do is to get the filter from the request. Then we call server.Store.Search(), pass in the stream context, the filter, and the callback function.

If an error occurs, we return it with the Internal status code, else we return nil.

In the callback function, when we found a laptop, we create a new response object with that laptop and send it to the client by calling stream.Send().

If an error occurs, just return it. Otherwise, we write a simple log saying we have sent the laptop with this ID then return nil.

And we’re done with the server. Now let’s implement the client!

Implement the client

First I will split the codes to create a random laptop that we've written in the previous lecture to a separate function:

func createLaptop(laptopClient pb.LaptopServiceClient) {
    laptop := sample.NewLaptop()
    laptop.Id = ""
    req := &pb.CreateLaptopRequest{
        Laptop: laptop,
    }

    // set timeout
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    res, err := laptopClient.CreateLaptop(ctx, req)
    if err != nil {
        st, ok := status.FromError(err)
        if ok && st.Code() == codes.AlreadyExists {
            // not a big deal
            log.Print("laptop already exists")
        } else {
            log.Fatal("cannot create laptop: ", err)
        }
        return
    }

    log.Printf("created laptop with id: %s", res.Id)
}

Then in the main function, we will use a for loop to create 10 random laptops.

func main() {
    serverAddress := flag.String("address", "", "the server address")
    flag.Parse()
    log.Printf("dial server %s", *serverAddress)

    conn, err := grpc.Dial(*serverAddress, grpc.WithInsecure())
    if err != nil {
        log.Fatal("cannot dial server: ", err)
    }

    laptopClient := pb.NewLaptopServiceClient(conn)
    for i := 0; i < 10; i++ {
        createLaptop(laptopClient)
    }

    filter := &pb.Filter{
        MaxPriceUsd: 3000,
        MinCpuCores: 4,
        MinCpuGhz:   2.5,
        MinRam:      &pb.Memory{Value: 8, Unit: pb.Memory_GIGABYTE},
    }

    searchLaptop(laptopClient, filter)
}

Then we create a new search filter. Suppose I want to search for laptops with:

  • Maximum price of 3000
  • At least 4 CPU cores
  • Minimum frequency of 2.5 Ghz
  • And at least 8 gigabytes of RAM

After that, we call searchLaptop() with the this filter. Let’s implement this function!

func searchLaptop(laptopClient pb.LaptopServiceClient, filter *pb.Filter) {
    log.Print("search filter: ", filter)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    req := &pb.SearchLaptopRequest{Filter: filter}
    stream, err := laptopClient.SearchLaptop(ctx, req)
    if err != nil {
        log.Fatal("cannot search laptop: ", err)
    }

    for {
        res, err := stream.Recv()
        if err == io.EOF {
            return
        }
        if err != nil {
            log.Fatal("cannot receive response: ", err)
        }

        laptop := res.GetLaptop()
        log.Print("- found: ", laptop.GetId())
        log.Print("  + brand: ", laptop.GetBrand())
        log.Print("  + name: ", laptop.GetName())
        log.Print("  + cpu cores: ", laptop.GetCpu().GetNumberCores())
        log.Print("  + cpu min ghz: ", laptop.GetCpu().GetMinGhz())
        log.Print("  + ram: ", laptop.GetRam())
        log.Print("  + price: ", laptop.GetPriceUsd())
    }
}

We first create a context with timeout of 5 seconds. We make a SearchLaptopRequest object with the input filter. Then we call laptopClient.SearchLaptop() to get the stream.

If there’s an error, write a fatal log. Or else, we use a for loop to receive multiple responses from the stream.

If the stream.Recv() function call returns an end-of-file (EOF) error, this means it’s the end of the stream, so we just return. Otherwise, if error is not nil, we write a fatal log.

If everything goes well, we can get the laptop from the stream. Here I print out only a few properties of the laptop so that it’s easier to read.

Write unit test

Now I will show you how to write unit tests for this server-streaming RPC.

First I will create a search filter and an in-memory laptop store to insert some laptops for searching.

func TestClientSearchLaptop(t *testing.T) {
    t.Parallel()

    filter := &pb.Filter{
        MaxPriceUsd: 2000,
        MinCpuCores: 4,
        MinCpuGhz:   2.2,
        MinRam:      &pb.Memory{Value: 8, Unit: pb.Memory_GIGABYTE},
    }

    store := service.NewInMemoryLaptopStore()
        ...
}

Then I make an expectedIDs map that will contain all laptop IDs that we expect to be found by the server.

We use a for loop to create 6 laptops:

  • Case 0: unmatched laptop with a too high price.
  • Case 1: unmatched because it has only 2 cores.
  • Case 2: doesn’t match because the min frequency is too low.
  • Case 3: doesn’t match since it has only 4 GB of RAM.
  • Case 4 + 5: matched.
func TestClientSearchLaptop(t *testing.T) {
  ...

  expectedIDs := make(map[string]bool)

    for i := 0; i < 6; i++ {
        laptop := sample.NewLaptop()

        switch i {
        case 0:
            laptop.PriceUsd = 2500
        case 1:
            laptop.Cpu.NumberCores = 2
        case 2:
            laptop.Cpu.MinGhz = 2.0
        case 3:
            laptop.Ram = &pb.Memory{Value: 4096, Unit: pb.Memory_MEGABYTE}
        case 4:
            laptop.PriceUsd = 1999
            laptop.Cpu.NumberCores = 4
            laptop.Cpu.MinGhz = 2.5
            laptop.Cpu.MaxGhz = laptop.Cpu.MinGhz + 2.0
            laptop.Ram = &pb.Memory{Value: 16, Unit: pb.Memory_GIGABYTE}
            expectedIDs[laptop.Id] = true
        case 5:
            laptop.PriceUsd = 2000
            laptop.Cpu.NumberCores = 6
            laptop.Cpu.MinGhz = 2.8
            laptop.Cpu.MaxGhz = laptop.Cpu.MinGhz + 2.0
            laptop.Ram = &pb.Memory{Value: 64, Unit: pb.Memory_GIGABYTE}
            expectedIDs[laptop.Id] = true
        }

        err := store.Save(laptop)
        require.NoError(t, err)
  }

  ...
}

Then we call store.Save() to save the laptop to the store, and require that there’s no error returned.

Next we have to add this store to the test laptop server. So I will add one more store parameter to the startTestLaptopServer function that we've written in the previous lecture:

func startTestLaptopServer(t *testing.T, store service.LaptopStore) (*service.LaptopServer, string) {
    laptopServer := service.NewLaptopServer(store)

    grpcServer := grpc.NewServer()
    pb.RegisterLaptopServiceServer(grpcServer, laptopServer)

    listener, err := net.Listen("tcp", ":0") // random available port
    require.NoError(t, err)

    go grpcServer.Serve(listener)

    return laptopServer, listener.Addr().String()
}

func newTestLaptopClient(t *testing.T, serverAddress string) pb.LaptopServiceClient {
    conn, err := grpc.Dial(serverAddress, grpc.WithInsecure())
    require.NoError(t, err)
    return pb.NewLaptopServiceClient(conn)
}

Then call this function to start the test server, and create a laptop client object with that server address:

func TestClientSearchLaptop(t *testing.T) {
  ...

    _, serverAddress := startTestLaptopServer(t, store)
    laptopClient := newTestLaptopClient(t, serverAddress)

    req := &pb.SearchLaptopRequest{Filter: filter}
    stream, err := laptopClient.SearchLaptop(context.Background(), req)
    require.NoError(t, err)

  ...
}

After that, we create a new SearchLaptopRequest with the filter. Then we call laptopCient.SearchLaptop() with the created request to get back the stream. There should be no errors returned.

Next, I will use the found variable to keep track of the number of laptops found. Then use a for loop to receive multiple responses from the stream.

func TestClientSearchLaptop(t *testing.T) {
  ...

    found := 0
    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }

        require.NoError(t, err)
        require.Contains(t, expectedIDs, res.GetLaptop().GetId())

        found += 1
    }

    require.Equal(t, len(expectedIDs), found)
}

If we got an end-of-file error, then break. Else we check that there’s no error, and the laptop ID should be in the expectedIDs map.

Then we increase the number of laptops found. Finally we require that number to equal to the size of the expectedIDs.

OK now let’s run this unit test.

Run unit test

It passed.

And that’s all for today’s lecture. We have learned how to implement and test a server-streaming RPC in Go.

Thanks for reading and I will see you in the next article!

The complete gRPC course (15 Part Series)

1) The complete gRPC course [Protobuf + Go + Java] 2) Introduction to gRPC: why, what, how? 3 ... 13 3) HTTP/2 - The secret weapon of gRPC 4) Is gRPC better than REST? Where to use it? 5) Define a protobuf message and generate Go code 6) Protocol buffer deep-dive 7) Config Gradle to generate Java code from Protobuf 8) Generate and serialize protobuf message in Go 9) Implement unary gRPC API in Go 10) Implement server-streaming gRPC in Go 11) Upload file in chunks with client-streaming gRPC - Go 12) Implement bidirectional streaming gRPC - Go 13) gRPC reflection and Evans CLI 14) Udemy coupon 100% for the complete gRPC course 15) Use gRPC interceptor for authorization with JWT

Posted on May 22 by:

techschoolguru profile

TECH SCHOOL

@techschoolguru

We believe that everyone deserves a good and free education. The purpose of Tech School is to give everyone a chance to learn IT by giving free, high-quality tutorials and coding courses.

Discussion

markdown guide