DEV Community

Yi Zhang
Yi Zhang

Posted on

How Tendermint p2p Nodes Receive Messages

In file "tendermint/p2p/peer.go", we have following code


// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
        service.BaseService

        // raw peerConn and the multiplex connection
        peerConn
        mconn *tmconn.MConnection

        // peer's node info and the channel it knows about
        // channels = nodeInfo.Channels
        // cached to avoid copying nodeInfo in hasChannel
        nodeInfo NodeInfo
        channels []byte

        // User data
        Data *cmap.CMap

        metrics       *Metrics
        metricsTicker *time.Ticker
}

type PeerOption func(*peer)

func newPeer(
        pc peerConn,
        mConfig tmconn.MConnConfig,
        nodeInfo NodeInfo,
        reactorsByCh map[byte]Reactor,
        chDescs []*tmconn.ChannelDescriptor,
        onPeerError func(Peer, interface{}),
        options ...PeerOption,
) *peer {
        p := &peer{
                peerConn:      pc,
                nodeInfo:      nodeInfo,
                channels:      nodeInfo.(DefaultNodeInfo).Channels,
                Data:          cmap.NewCMap(),
                metricsTicker: time.NewTicker(metricsTickerDuration),
                metrics:       NopMetrics(),
        }

        p.mconn = createMConnection(
                pc.conn,
                p,
                reactorsByCh,
                chDescs,
                onPeerError,
                mConfig,
        )
        p.BaseService = *service.NewBaseService(nil, "Peer", p)
        for _, option := range options {
                option(p)
        }

        return p
}

........

func createMConnection(
        conn net.Conn,
        p *peer,
        reactorsByCh map[byte]Reactor,
        chDescs []*tmconn.ChannelDescriptor,
        onPeerError func(Peer, interface{}),
        config tmconn.MConnConfig,
) *tmconn.MConnection {

        onReceive := func(chID byte, msgBytes []byte) {
                reactor := reactorsByCh[chID]
                if reactor == nil {
                        // Note that its ok to panic here as it's caught in the conn._recover,
                        // which does onPeerError.
                        panic(fmt.Sprintf("Unknown channel %X", chID))
                }
                labels := []string{
                        "peer_id", string(p.ID()),
                        "chID", fmt.Sprintf("%#x", chID),
                }
                p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
                reactor.Receive(chID, p, msgBytes)
        }

        onError := func(r interface{}) {
                onPeerError(p, r)
        }

        return tmconn.NewMConnectionWithConfig(
                conn,
                chDescs,
                onReceive,
                onError,
                config,
        )
}

Enter fullscreen mode Exit fullscreen mode

In file "tendermint/p2p/conn/connection.go", we have code:


type MConnection struct {
        service.BaseService

        conn          net.Conn
        bufConnReader *bufio.Reader
        bufConnWriter *bufio.Writer
        sendMonitor   *flow.Monitor
        recvMonitor   *flow.Monitor
        send          chan struct{}
        pong          chan struct{}
        channels      []*Channel
        channelsIdx   map[byte]*Channel
        onReceive     receiveCbFunc
        onError       errorCbFunc
        errored       uint32
        config        MConnConfig
........
}


// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() {
        defer c._recover()

        protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)

FOR_LOOP:
        for {
........
                // Read packet type
                var packet tmp2p.Packet

                _n, err := protoReader.ReadMsg(&packet)
........
                // Read more depending on packet type.
                switch pkt := packet.Sum.(type) {
........
                case *tmp2p.Packet_PacketMsg:
                        channelID := byte(pkt.PacketMsg.ChannelID)
                        channel, ok := c.channelsIdx[channelID]
                        if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
                                err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
                                    c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
                                c.stopForError(err)
                                break FOR_LOOP
                        }

                        msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
                        if err != nil {
                                if c.IsRunning() {
                                        c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
                                        c.stopForError(err)
                                }
                                break FOR_LOOP
                        }
                        if msgBytes != nil {
                                c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
                                // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
                                c.onReceive(channelID, msgBytes)
                        }
                default:
........
}

Enter fullscreen mode Exit fullscreen mode

Conclusion:

The MConnection relentlessly receives packets from network. When it receives a packet of type Packet_PacketMsg, it go ahead to receive the whole message with this function:

msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
Enter fullscreen mode Exit fullscreen mode

Then it calls the

c.onReceive(channelID, msgBytes)
Enter fullscreen mode Exit fullscreen mode

The onReceive is a callback defined in struc MConnection. It gets initialzied in "tendermint/p2p/peer.go". The callback first finds out the reactor which the channelID corresponds to. And then call the reactor's "Receive" function with the channelID, peer, and msgBytes as parameters:

reactor.Receive(chID, p, msgBytes)
Enter fullscreen mode Exit fullscreen mode

This is how the reactor's Receive function gets message from peers.

Top comments (0)