Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 104 additions & 75 deletions fw/face/ndnlp-link-service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/named-data/ndnd/std/utils"
)

const lpPacketOverhead = 1 + 3
const lpPacketOverhead = 1 + 3 + 1 + 3 // LpPacket+Fragment
const pitTokenOverhead = 1 + 1 + 6
const congestionMarkOverhead = 3 + 1 + 8

Expand Down Expand Up @@ -63,14 +63,17 @@ type NDNLPLinkService struct {
options NDNLPLinkServiceOptions
headerOverhead int

// Receive
partialMessageStore map[uint64][][]byte
// Fragment reassembly ring buffer
reassemblyIndex int
reassemblyBuffers [16]struct {
sequence uint64
buffer enc.Wire
}

// Send
// Outgoing packet state
nextSequence uint64
nextTxSequence uint64
lastTimeCongestionMarked time.Time
BufferReader enc.BufferReader
congestionCheck uint64
outFrame []byte
}
Expand All @@ -84,11 +87,12 @@ func MakeNDNLPLinkService(transport transport, options NDNLPLinkServiceOptions)
l.options = options
l.computeHeaderOverhead()

l.partialMessageStore = make(map[uint64][][]byte)
// Initialize outgoing packet state
l.nextSequence = 0
l.nextTxSequence = 0
l.congestionCheck = 0
l.outFrame = make([]byte, defn.MaxNDNPacketSize)

return l
}

Expand Down Expand Up @@ -116,10 +120,8 @@ func (l *NDNLPLinkService) computeHeaderOverhead() {

if l.options.IsFragmentationEnabled {
l.headerOverhead += 1 + 1 + 8 // Sequence
}

if l.options.IsFragmentationEnabled {
l.headerOverhead += 1 + 1 + 2 + 1 + 1 + 2 // FragIndex/FragCount (Type + Length + up to 2^16 fragments)
l.headerOverhead += 1 + 1 + 2 // FragIndex (max 2^16 fragments)
l.headerOverhead += 1 + 1 + 2 // FragCount
}

if l.options.IsIncomingFaceIndicationEnabled {
Expand Down Expand Up @@ -183,17 +185,23 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) {
l.nOutData++
}

now := time.Now()
// Congestion marking
congestionMark := pkt.CongestionMark // from upstream
if l.checkCongestion(wire) && congestionMark == nil {
core.LogWarn(l, "Marking congestion")
congestionMark = utils.IdPtr(uint64(1)) // ours
}

// Calculate effective MTU after accounting for packet-specific overhead
effectiveMtu := l.transport.MTU() - l.headerOverhead
if pkt.PitToken != nil {
effectiveMtu -= pitTokenOverhead
}
if pkt.CongestionMark != nil {
if congestionMark != nil {
effectiveMtu -= congestionMarkOverhead
}

// Fragmentation
// Fragment packet if necessary
var fragments []*spec.LpPacket
if len(wire) > effectiveMtu {
if !l.options.IsFragmentationEnabled {
Expand All @@ -202,50 +210,35 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) {
}

// Split up fragment
nFragments := int((len(wire) + effectiveMtu - 1) / effectiveMtu)
fragments = make([]*spec.LpPacket, nFragments)
fragCount := (len(wire) + effectiveMtu - 1) / effectiveMtu
fragCountPtr := utils.IdPtr(uint64(fragCount))
fragments = make([]*spec.LpPacket, fragCount)

reader := enc.NewBufferReader(wire)
for i := 0; i < nFragments; i++ {
for i := range fragments {
// Read till effective mtu or end of wire
readSize := effectiveMtu
if i == nFragments-1 {
readSize = len(wire) - effectiveMtu*(nFragments-1)
if i == fragCount-1 {
readSize = len(wire) - effectiveMtu*(fragCount-1)
}

frag, err := reader.ReadWire(readSize)
if err != nil {
core.LogFatal(l, "Unexpected Wire reading error")
}
fragments[i] = &spec.LpPacket{Fragment: frag}
}
} else {
fragments = []*spec.LpPacket{{Fragment: enc.Wire{wire}}}
}

// Sequence
if len(fragments) > 1 {
for _, fragment := range fragments {
fragment.Sequence = utils.IdPtr(l.nextSequence)
// Create fragment with sequence and index
l.nextSequence++
}
}

// Congestion marking
congestionMark := pkt.CongestionMark // from upstream
if congestionMarking {
// GetSendQueueSize is expensive, so only check every 1/2 of the threshold
// and only if we can mark congestion for this particular packet
if l.congestionCheck > l.options.DefaultCongestionThresholdBytes {
if now.After(l.lastTimeCongestionMarked.Add(l.options.BaseCongestionMarkingInterval)) &&
l.transport.GetSendQueueSize() > l.options.DefaultCongestionThresholdBytes {
core.LogWarn(l, "Marking congestion")
congestionMark = utils.IdPtr[uint64](1) // ours
l.lastTimeCongestionMarked = now
fragments[i] = &spec.LpPacket{
Fragment: frag,
Sequence: utils.IdPtr(l.nextSequence),
FragIndex: utils.IdPtr(uint64(i)),
FragCount: fragCountPtr,
}

l.congestionCheck = 0
}

l.congestionCheck += uint64(len(wire)) // approx
} else {
// No fragmentation necessary
fragments = []*spec.LpPacket{{Fragment: enc.Wire{wire}}}
}

// Send fragment(s)
Expand All @@ -265,6 +258,7 @@ func sendPacket(l *NDNLPLinkService, out dispatch.OutPkt) {
fragment.CongestionMark = congestionMark
}

// Encode final LP frame
pkt := &spec.Packet{
LpPacket: fragment,
}
Expand Down Expand Up @@ -296,7 +290,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) {
IncomingFaceID: l.faceID,
}

L2, err := ReadPacketUnverified(enc.NewBufferReader(wire))
L2, err := readPacketUnverified(enc.NewBufferReader(wire))
if err != nil {
core.LogError(l, err)
return
Expand Down Expand Up @@ -333,7 +327,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) {
if fragIndex == 0 && fragCount == 1 {
// Bypass reassembly since only one fragment
} else {
fragment = l.reassemblePacket(LP, baseSequence, fragIndex, fragCount)
fragment = l.reassemble(LP, baseSequence, fragIndex, fragCount)
if fragment == nil {
// Nothing more to be done, so return
return
Expand Down Expand Up @@ -370,7 +364,7 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) {
}

// Parse inner packet in place
L3, err := ReadPacketUnverified(enc.NewBufferReader(wire))
L3, err := readPacketUnverified(enc.NewBufferReader(wire))
if err != nil {
return
}
Expand All @@ -390,47 +384,82 @@ func (l *NDNLPLinkService) handleIncomingFrame(frame []byte) {
}
}

func (l *NDNLPLinkService) reassemblePacket(
func (l *NDNLPLinkService) reassemble(
frame *spec.LpPacket,
baseSequence uint64,
fragIndex uint64,
fragCount uint64,
) enc.Wire {
_, hasSequence := l.partialMessageStore[baseSequence]
if !hasSequence {
// Create map entry
l.partialMessageStore[baseSequence] = make([][]byte, fragCount)
var buffer enc.Wire = nil
var bufIndex int = 0

// Check if reassembly buffer already exists
for i := range l.reassemblyBuffers {
if l.reassemblyBuffers[i].sequence == baseSequence {
bufIndex = i
buffer = l.reassemblyBuffers[bufIndex].buffer
break
}
}

// Use the next available buffer if this is a new sequence
if buffer == nil {
bufIndex = (l.reassemblyIndex + 1) % len(l.reassemblyBuffers)
l.reassemblyIndex = bufIndex
l.reassemblyBuffers[bufIndex].sequence = baseSequence
l.reassemblyBuffers[bufIndex].buffer = make(enc.Wire, fragCount)
buffer = l.reassemblyBuffers[bufIndex].buffer
}

// Insert into PartialMessageStore
// Safe to call Join since there is only one fragment
if len(frame.Fragment) > 1 {
core.LogError("LpPacket should only have one fragment.")
// Validate fragCount has not changed
if fragCount != uint64(len(buffer)) {
core.LogWarn(l, "Received fragment count ", fragCount, " does not match expected count ", len(buffer), " for base sequence ", baseSequence, " - DROP")
return nil
}
l.partialMessageStore[baseSequence][fragIndex] = frame.Fragment.Join()

// Determine whether it is time to reassemble
receivedCount := 0
receivedTotalLen := 0
for _, fragment := range l.partialMessageStore[baseSequence] {
if len(fragment) != 0 {
receivedCount++
receivedTotalLen += len(fragment)

// Validate fragIndex is valid
if fragIndex >= uint64(len(buffer)) {
core.LogWarn(l, "Received fragment index ", fragIndex, " out of range for base sequence ", baseSequence, " - DROP")
return nil
}

// Store fragment in buffer
buffer[fragIndex] = frame.Fragment.Join() // should be only one fragment

// Check if all fragments have been received
for _, frag := range buffer {
if len(frag) == 0 {
return nil // not all fragments received
}
}

if receivedCount == len(l.partialMessageStore[baseSequence]) {
// Time to reassemble!
reassembled := make(enc.Wire, len(l.partialMessageStore[baseSequence]))
for i, fragment := range l.partialMessageStore[baseSequence] {
reassembled[i] = fragment
// All fragments received, free up buffer
l.reassemblyBuffers[bufIndex].sequence = 0
l.reassemblyBuffers[bufIndex].buffer = nil

return buffer
}

func (l *NDNLPLinkService) checkCongestion(wire []byte) bool {
if !congestionMarking {
return false
}

// GetSendQueueSize is expensive, so only check every 1/2 of the threshold
// and only if we can mark congestion for this particular packet
if l.congestionCheck > l.options.DefaultCongestionThresholdBytes {
now := time.Now()
if now.After(l.lastTimeCongestionMarked.Add(l.options.BaseCongestionMarkingInterval)) &&
l.transport.GetSendQueueSize() > l.options.DefaultCongestionThresholdBytes {
l.lastTimeCongestionMarked = now
return true
}

delete(l.partialMessageStore, baseSequence)
return reassembled
l.congestionCheck = 0 // reset
}

return nil
l.congestionCheck += uint64(len(wire)) // approx
return false
}

func (op *NDNLPLinkServiceOptions) Flags() (ret uint64) {
Expand All @@ -444,7 +473,7 @@ func (op *NDNLPLinkServiceOptions) Flags() (ret uint64) {
}

// Reads a packet without validating the internal fields
func ReadPacketUnverified(reader enc.ParseReader) (*spec.Packet, error) {
func readPacketUnverified(reader enc.ParseReader) (*spec.Packet, error) {
context := spec.PacketParsingContext{}
context.Init()
return context.Parse(reader, false)
Expand Down
Loading