Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p/discover: implement send of v5 call without waiting for response #31150

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 73 additions & 7 deletions p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,14 @@ type UDPv5 struct {
callDoneCh chan *callV5
respTimeoutCh chan *callTimeout
sendCh chan sendRequest
sendNoRespCh chan *sendNoRespRequest
unhandled chan<- ReadPacket

// state of dispatch
codec codecV5
activeCallByNode map[enode.ID]*callV5
activeCallByAuth map[v5wire.Nonce]*callV5
noRespCallByAuth map[v5wire.Nonce]*callV5
callQueue map[enode.ID][]*callV5

// shutdown stuff
Expand All @@ -117,6 +119,12 @@ type sendRequest struct {
msg v5wire.Packet
}

type sendNoRespRequest struct {
destNode *enode.Node
destAddr netip.AddrPort
msg v5wire.Packet
}

// callV5 represents a remote procedure call against another node.
type callV5 struct {
id enode.ID
Expand Down Expand Up @@ -176,12 +184,14 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
callCh: make(chan *callV5),
callDoneCh: make(chan *callV5),
sendCh: make(chan sendRequest),
sendNoRespCh: make(chan *sendNoRespRequest),
respTimeoutCh: make(chan *callTimeout),
unhandled: cfg.Unhandled,
// state of dispatch
codec: v5wire.NewCodec(ln, cfg.PrivateKey, cfg.Clock, cfg.V5ProtocolID),
activeCallByNode: make(map[enode.ID]*callV5),
activeCallByAuth: make(map[v5wire.Nonce]*callV5),
noRespCallByAuth: make(map[v5wire.Nonce]*callV5),
callQueue: make(map[enode.ID][]*callV5),
// shutdown
closeCtx: closeCtx,
Expand Down Expand Up @@ -512,14 +522,14 @@ func (t *UDPv5) verifyResponseNode(c *callV5, r *enr.Record, distances []uint, s
func (t *UDPv5) callToNode(n *enode.Node, responseType byte, req v5wire.Packet) *callV5 {
addr, _ := n.UDPEndpoint()
c := &callV5{id: n.ID(), addr: addr, node: n}
t.initCall(c, responseType, req)
t.initCallAndDelivery(c, responseType, req)
return c
}

// callToID is like callToNode, but for cases where the node record is not available.
func (t *UDPv5) callToID(id enode.ID, addr netip.AddrPort, responseType byte, req v5wire.Packet) *callV5 {
c := &callV5{id: id, addr: addr}
t.initCall(c, responseType, req)
t.initCallAndDelivery(c, responseType, req)
return c
}

Expand All @@ -532,6 +542,10 @@ func (t *UDPv5) initCall(c *callV5, responseType byte, packet v5wire.Packet) {
// Assign request ID.
crand.Read(c.reqid)
packet.SetRequestID(c.reqid)
}

func (t *UDPv5) initCallAndDelivery(c *callV5, responseType byte, packet v5wire.Packet) {
t.initCall(c, responseType, packet)
// Send call to dispatch.
select {
case t.callCh <- c:
Expand Down Expand Up @@ -581,12 +595,17 @@ func (t *UDPv5) dispatch() {
case c := <-t.callCh:
t.callQueue[c.id] = append(t.callQueue[c.id], c)
t.sendNextCall(c.id)
case cnr := <-t.sendNoRespCh:
// send a TalkReq call but not waiting for response
t.sendCallNotWaitResp(cnr)

case ct := <-t.respTimeoutCh:
active := t.activeCallByNode[ct.c.id]
if ct.c == active && ct.timer == active.timeout {
ct.c.err <- errTimeout
}
delete(t.noRespCallByAuth, ct.c.nonce)
ct.c.timeout.Stop()

case c := <-t.callDoneCh:
active := t.activeCallByNode[c.id]
Expand All @@ -600,7 +619,6 @@ func (t *UDPv5) dispatch() {

case r := <-t.sendCh:
t.send(r.destID, r.destAddr, r.msg, nil)

case p := <-t.packetInCh:
t.handlePacket(p.Data, p.Addr)
// Arm next read.
Expand All @@ -619,6 +637,9 @@ func (t *UDPv5) dispatch() {
delete(t.activeCallByNode, id)
delete(t.activeCallByAuth, c.nonce)
}
for nonce := range t.noRespCallByAuth {
delete(t.activeCallByAuth, nonce)
}
return
}
}
Expand All @@ -644,6 +665,35 @@ func (t *UDPv5) startResponseTimeout(c *callV5) {
close(done)
}

// sendCallNotWaitResp send a talk request contains utp packet by call, call will not insert into queue.
// And during handshaking with the target node, new packets may be lost.
func (t *UDPv5) sendCallNotWaitResp(r *sendNoRespRequest) {
// send out a TalkRequest that is a UTP packet for portal network
id, addr := r.destNode.ID(), r.destAddr.String()
if n := t.codec.SessionNode(id, addr); n != nil {
// already handshake success
t.send(id, r.destAddr, r.msg, nil)
return
}

c := &callV5{id: id, addr: r.destAddr, node: r.destNode}
t.initCall(c, v5wire.TalkResponseMsg, r.msg)

nonce, _ := t.send(c.id, c.addr, c.packet, nil)
c.nonce = nonce
t.noRespCallByAuth[nonce] = c
t.startResponseTimeout(c)
}

// sendNoRespData send a data from a call of no wait resp.
// The handshake just has been successful.
func (t *UDPv5) sendNoRespData(c *callV5) {
// Just resend the data and not use call again
delete(t.noRespCallByAuth, c.nonce)
c.timeout.Stop()
t.send(c.node.ID(), c.addr, c.packet, c.challenge)
}

// sendNextCall sends the next call in the call queue if there is no active call.
func (t *UDPv5) sendNextCall(id enode.ID) {
queue := t.callQueue[id]
Expand Down Expand Up @@ -684,7 +734,15 @@ func (t *UDPv5) sendResponse(toID enode.ID, toAddr netip.AddrPort, packet v5wire

func (t *UDPv5) sendFromAnotherThread(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet) {
select {
case t.sendCh <- sendRequest{toID, toAddr, packet}:
case t.sendCh <- sendRequest{destID: toID, destAddr: toAddr, msg: packet}:
case <-t.closeCtx.Done():
}
}

// SendNoResp Send a packet but will not wait for a response
func (t *UDPv5) SendNoResp(n *enode.Node, toAddr netip.AddrPort, packet v5wire.Packet) {
select {
case t.sendNoRespCh <- &sendNoRespRequest{n, toAddr, packet}:
case <-t.closeCtx.Done():
}
}
Expand Down Expand Up @@ -877,17 +935,25 @@ func (t *UDPv5) handleWhoareyou(p *v5wire.Whoareyou, fromID enode.ID, fromAddr n
c.err <- errors.New("remote wants handshake, but call has no ENR")
return
}
// Resend the call that was answered by WHOAREYOU.

t.log.Trace("<< "+p.Name(), "id", c.node.ID(), "addr", fromAddr)
c.handshakeCount++
c.challenge = p
p.Node = c.node
t.sendCall(c)
if _, ok := t.noRespCallByAuth[p.Nonce]; !ok {
// Resend the call that was answered by WHOAREYOU.
p.Node = c.node
t.sendCall(c)
} else {
t.sendNoRespData(c)
}
}

// matchWithCall checks whether a handshake attempt matches the active call.
func (t *UDPv5) matchWithCall(fromID enode.ID, nonce v5wire.Nonce) (*callV5, error) {
c := t.activeCallByAuth[nonce]
if c == nil {
c = t.noRespCallByAuth[nonce]
}
if c == nil {
return nil, errChallengeNoCall
}
Expand Down