In file "tendermint/p2p/peer.go", we have following code
// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
service.BaseService
// raw peerConn and the multiplex connection
peerConn
mconn *tmconn.MConnection
// peer's node info and the channel it knows about
// channels = nodeInfo.Channels
// cached to avoid copying nodeInfo in hasChannel
nodeInfo NodeInfo
channels []byte
// User data
Data *cmap.CMap
metrics *Metrics
metricsTicker *time.Ticker
}
type PeerOption func(*peer)
func newPeer(
pc peerConn,
mConfig tmconn.MConnConfig,
nodeInfo NodeInfo,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
}
p.mconn = createMConnection(
pc.conn,
p,
reactorsByCh,
chDescs,
onPeerError,
mConfig,
)
p.BaseService = *service.NewBaseService(nil, "Peer", p)
for _, option := range options {
option(p)
}
return p
}
........
func createMConnection(
conn net.Conn,
p *peer,
reactorsByCh map[byte]Reactor,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
config tmconn.MConnConfig,
) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
if reactor == nil {
// Note that its ok to panic here as it's caught in the conn._recover,
// which does onPeerError.
panic(fmt.Sprintf("Unknown channel %X", chID))
}
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
reactor.Receive(chID, p, msgBytes)
}
onError := func(r interface{}) {
onPeerError(p, r)
}
return tmconn.NewMConnectionWithConfig(
conn,
chDescs,
onReceive,
onError,
config,
)
}
In file "tendermint/p2p/conn/connection.go", we have code:
type MConnection struct {
service.BaseService
conn net.Conn
bufConnReader *bufio.Reader
bufConnWriter *bufio.Writer
sendMonitor *flow.Monitor
recvMonitor *flow.Monitor
send chan struct{}
pong chan struct{}
channels []*Channel
channelsIdx map[byte]*Channel
onReceive receiveCbFunc
onError errorCbFunc
errored uint32
config MConnConfig
........
}
// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() {
defer c._recover()
protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)
FOR_LOOP:
for {
........
// Read packet type
var packet tmp2p.Packet
_n, err := protoReader.ReadMsg(&packet)
........
// Read more depending on packet type.
switch pkt := packet.Sum.(type) {
........
case *tmp2p.Packet_PacketMsg:
channelID := byte(pkt.PacketMsg.ChannelID)
channel, ok := c.channelsIdx[channelID]
if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
c.stopForError(err)
break FOR_LOOP
}
msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
if err != nil {
if c.IsRunning() {
c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
c.stopForError(err)
}
break FOR_LOOP
}
if msgBytes != nil {
c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
// NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
c.onReceive(channelID, msgBytes)
}
default:
........
}
Conclusion:
The MConnection relentlessly receives packets from network. When it receives a packet of type Packet_PacketMsg, it go ahead to receive the whole message with this function:
msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
Then it calls the
c.onReceive(channelID, msgBytes)
The onReceive is a callback defined in struc MConnection. It gets initialzied in "tendermint/p2p/peer.go". The callback first finds out the reactor which the channelID corresponds to. And then call the reactor's "Receive" function with the channelID, peer, and msgBytes as parameters:
reactor.Receive(chID, p, msgBytes)
This is how the reactor's Receive function gets message from peers.
Top comments (0)