DEV Community

loading...
Cover image for CHAT-SERVER using gRPC Bidirectional Streaming

CHAT-SERVER using gRPC Bidirectional Streaming

CodeFuture
Originally published at blog.codefuture.dev ・7 min read

In this post, we will cover how to create a basic chat server for one-to-one chatting using gRPC bidirectional streaming. We are going to use Go language for both gRPC server and gRPC client programs.

Protocol-buffer or protobuf is an efficient format of data transfer that is preferably used with gRPC. First, we will create a protobuf file with messages that define the format of communication between gRPC server and client and, we also define Remote Procedure Call (RPC) services in the same protobuf file.

Create a protobuf file




Let's first create a **chat.proto** file in the project root directory and copy the following lines.



```protobuf
// path: ./chat.proto

syntax = "proto3";

package chatserver;

message FromClient {
    string name = 1;
    string body = 2;
}

message FromServer {
    string name = 1;
    string body = 2;
}

service Services {

    rpc ChatService(stream FromClient) returns (stream FromServer){};

}
Enter fullscreen mode Exit fullscreen mode

The syntax = 'proto3' specifies that we are using proto3 version syntax (the default version while writing this post is proto2). We have also specified a package name as chatserver. Then, we have defined two messages, one for clients to server communication


 and one for the server to client communication

 ```FromServer```

.  The fields in the message are identical in this case though; the `name` is for the user's name and the body is for the user's message.

To generate the service interface, we need to define **RPC** methods in the `chat.proto` file. In this example, we have defined only one RPC method i.e.`ChatService`. The `ChatService` method will be called by clients to set up bidirectional gRPC streams between client and server.

### Compile protobuf file `chat.proto` 

Now that we have our `chat.proto` file is ready. Next step is to compile the `chat.proto` file to generate the compiled `*.pb.go` file with **Interfaces** that we will be using to send and receive messages.

Before you proceed, please ensure that you have installed the following in your system, and both **protoc** and **protoc-gen-go** are in system $PATH:

> - [protobuf compiler](https://github.com/protocolbuffers/protobuf/releases) 
> - Golang Protobuf plugin:

 ```go install google.golang.org/protobuf/cmd/protoc-gen-go```



Now, run the command below in your preferred shell with PWD as your project root directory to generate golang code referring to `chat.proto` file.



```bash
$ protoc --go_out=plugins=grpc:chatserver chat.proto
Enter fullscreen mode Exit fullscreen mode

The above command will generate a file chatserver.pb.go inside a directory ./chatserver/.

Create a file


 under `package main`

Let's create our project as a **Go module** for dependency management by executing the following command in our project root directory. This will create a `go.mod` file that would identify our project as a module.



Enter fullscreen mode Exit fullscreen mode

$ go mod init grpcChatServer




Now, create a file named as `server.go` in our project root directory. We define a listener associated with a port which can be referred from environment variables. In case `PORT` is not set as a system environment, then let's assign 5000 (some port) as the default Port for our gRPC server.



```go
func main() {
  // assign port
  Port := os.Getenv("PORT")
    if Port == "" {
        Port = "5000"// port default : 5000 if in env port is not set
    } 

  // initiate listener
    listen, err := net.Listen("tcp", Port)
    if err != nil {
        log.Fatalf("Could not listen @ %v :: %v", Port, err)
    }
  log.Println("Listening @ "+Port)  

    // ...
}
Enter fullscreen mode Exit fullscreen mode

Create a gRPC server instance that would listen and server through assigned Port.

grpcServer := grpc.NewServer()
err = grpcServer.Serve(listen)
    if err != nil {
        log.Fatalf("Failed to start gRPC server :: %v", err)
    }

Enter fullscreen mode Exit fullscreen mode

Though we have created and also can run the gRPC server, it will not do anything impressive. Let's create a chatserver.go file that invokes interfaces defined in chatserver.pb.go file which will help to establish bidirectional stream between server and client.

Create chatserver.go file under


package chatserver

Let's create a new file named aschatserver.go inside chatserver directory (this is the same directory that has chatserver.pb.go file generated earlier through the proto compiler).

Now, define a struct messageUnit for handling the message in the server. We also create another struct messageQue that basically a slice MQue of type messageUnit and variable mu of type sync.Muex . Go sync.Mutex implements memory Lock() and UnLock() which are used to avoid race conditions while accessing global variables asynchronously through multiple threads.

type messageUnit struct {
    ClientName        string
    MessageBody       string
    MessageUniqueCode int
    ClientUniqueCode  int
}

type messageQue struct {
    MQue []messageUnit
    mu sync.Mutex
}

var messageQueObject = messageQue{}

Enter fullscreen mode Exit fullscreen mode

In addition to the above, lets define a type struct with name ChatServer which implemets a method ChatService as defined in chat.proto file. ChatService method takes an argument of type Services_ChatServiceServer (defined in chatserver.pb.go file) and returns error.

type ChatServer struct {
}

//ChatService 
func (is *ChatServer) ChatService(csi Services_ChatServiceServer) error {

// ...

}
Enter fullscreen mode Exit fullscreen mode

Now, inside our chatservice method we need to call two methods - one for receiving messages from stream and one for sending messages to stream. With recieveFromStream method, we parse the message from the client and append it to our messageQue object.

// recieve from stream
func recieveFromStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int) {

    for {
        req, err := csi_.Recv()
        if err != nil {
            log.Printf("Error reciving request from client :: %v", err)
            break

        } else {
            messageQueObject.mu.Lock()
            messageQueObject.MQue = append(messageQueObject.MQue, messageUnit{ClientName: req.Name, MessageBody: req.Body, MessageUniqueCode: rand.Intn(1e8), ClientUniqueCode: clientUniqueCode_})
            messageQueObject.mu.Unlock()
            log.Printf("%v", messageQueObject.MQue[len(messageQueObject.MQue)-1])
        }

    }

}
Enter fullscreen mode Exit fullscreen mode

With sendToStream method, we serialize the message from messageQue and send it to the designated client.

//send to stream
func sendToStream(csi_ Services_ChatServiceServer, clientUniqueCode_ int, errch_ chan error) {

    for {

        for {
            time.Sleep(500 * time.Millisecond)
            messageQueObject.mu.Lock()
            if len(messageQueObject.MQue) == 0 {
                messageQueObject.mu.Unlock()
                break
            }
            senderUniqueCode := messageQueObject.MQue[0].ClientUniqueCode
            senderName4client := messageQueObject.MQue[0].ClientName
            message4client := messageQueObject.MQue[0].MessageBody
            messageQueObject.mu.Unlock()
            if senderUniqueCode != clientUniqueCode_ {
                err := csi_.Send(&FromServer{Name: senderName4client, Body: message4client})

                if err != nil {
                    errch_ <- err
                }
                messageQueObject.mu.Lock()
                if len(messageQueObject.MQue) >= 2 {
                    messageQueObject.MQue = messageQueObject.MQue[1:] // if send success > delete message
                } else {
                    messageQueObject.MQue = []messageUnit{}
                }
                messageQueObject.mu.Unlock()

            }

        }

        time.Sleep(1 * time.Second)

    }

}

Enter fullscreen mode Exit fullscreen mode

Let's call both recieveFromStream and sendToStream method from ChatService method through go routines. We have created one channel of type error which is used to pass error, if any, to the client during sending message to stream; the same error also would lead to termination of the connection from server-side. Every time a client connects to the server, two go routines would be spawned for recieving and sending messages. Whenever multiple Go-routines asynchronously access a variable (memory location), mutex blocking is used above to avoid Race condition.

// ChatService  
func (is *ChatServer) ChatService(csi Services_ChatServiceServer) error {

    clientUniqueCode := rand.Intn(1e3)

    // recieve request <<< client
    go recieveFromStream(csi, clientUniqueCode)
    //stream >>> client
    errch := make(chan error)
    go sendToStream(csi, clientUniqueCode, errch)

    return <-errch
}
Enter fullscreen mode Exit fullscreen mode

Now, we need to register ChatService by adding the following two lines in server.go file.

// register ChatService
    cs := chatserver.ChatServer{}
    chatserver.RegisterServicesServer(grpcServer, &cs)
Enter fullscreen mode Exit fullscreen mode

with this, we have finished our serverside code for gRPC bidirectional streaming. We can choose any language that supports gRPC for creating client stub. For simplicity, in this post let's create our client-side program in Golang. The chatserver.pb.go file also includes interfaces for the client.

Create client.go for client side stubs

let's create a client.go file in our project root directory. The main intention now is to have two clients that would first establish bidirectional streaming connections with the server and then chat with each other. In this post, we will be using the console in the terminal for posting and receiving messages.

For the sake of simplicity, in this post, we have two clients who communicate through one-to-one message. Clients shall have a name and would be able to send and receive messages. Let's define a struct clientHandle which basically has two fields one is clientName and the second one is the stream. clientHandle implements two methods one for sending message sendMessage and one for receiving message receiveMessage.

type clientHandle struct {
    stream     chatserver.Services_ChatServiceClient
    clientName string
}

// Assign name
func (ch *clientHandle) clientConfig() {

    reader := bufio.NewReader(os.Stdin)
    fmt.Printf("Your Name : ")
    msg, err := reader.ReadString('\n')
    if err != nil {
        log.Fatalf("Failed to read from console :: %v", err)

    }
    ch.clientName = strings.TrimRight(msg, "\r\n")

}

// send Message
func (ch *clientHandle) sendMessage() {

    for {
        reader := bufio.NewReader(os.Stdin)
        clientMessage, err := reader.ReadString('\n')
        clientMessage = strings.TrimRight(clientMessage, "\r\n")
        if err != nil {
            log.Printf("Failed to read from console :: %v", err)
            continue
        }

        clientMessageBox := &chatserver.FromClient{
            Name: ch.clientName,
            Body: clientMessage,
        }

        err = ch.stream.Send(clientMessageBox)

        if err != nil {
            log.Printf("Error while sending to server :: %v", err)
        }

    }

}

// receive message
func (ch *clientHandle) receiveMessage() {

    for {
        resp, err := ch.stream.Recv()
        if err != nil {
            log.Fatalf("can not receive %v", err)
        }
        log.Printf("%s : %s", resp.Name, resp.Body)
    }
}

Enter fullscreen mode Exit fullscreen mode

In the main function, the client dial gRPC server addressing the port that the server is listening to. The client.ChatService method is called with a context to initiate a bidirectional stream between the client and the gRPC server. Next, we are running sendMessage and receiveMessage loops in separate go-routines. The main method is blocked by a dummy chan.

func main() {

    const serverID = "localhost:5000"

    log.Println("Connecting : " + serverID)
    conn, err := grpc.Dial(serverID, grpc.WithInsecure())

    if err != nil {
        log.Fatalf("Failed to connect gRPC server :: %v", err)
    }
    defer conn.Close()

    client := chatserver.NewServicesClient(conn)

    stream, err := client.ChatService(context.Background())
    if err != nil {
        log.Fatalf("Failed to get response from gRPC server :: %v", err)
    }

    ch := clientHandle{stream: stream}
    ch.clientConfig()
    go ch.sendMessage()
    go ch.receiveMessage()

    // block main
    bl := make(chan bool)
    <-bl

}
Enter fullscreen mode Exit fullscreen mode

Now we are done! Let's type the following commands on the terminal to run our gRPC chatserver and clients.

# run server (Termina-1)

$ env PORT=5000 go run server.go

# Run client.go file in two separate terminals (Terminal -2 and Terminal -3)

$ go run server.go

Enter fullscreen mode Exit fullscreen mode

server.png
clients.png

I hope, you enjoyed this post. I would appreciate your feedback/suggestions/comments in the comment section below.

Thanks.

Follow Me :

YouTube CodeFuture , Twitter

Link to GitHub :

https://github.com/rrrCode9/gRPC-Bidirectional-Streaming-ChatServer.git

Useful Links :

protobuf compiler, grpc.io

** #gRPC #BidirectionalStreaming #Golang #chatServer **

Discussion (1)

Collapse
hsn profile image
hsnhrn

Thanks a lot for the article. It is quite helpful. One thing i noticed is that the server sends message only to one other client. If e.g. total of 3 clients are connected to the server and 1st client sends the message it will be only sent to the 2nd or 3rd client randomly. It would be great if this would also be capable of sending to some specific client or to all the clients.