Skip to content

Commit

Permalink
std: separate progress callback for consume
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Feb 5, 2025
1 parent 02e71db commit 30ec775
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 55 deletions.
4 changes: 0 additions & 4 deletions dv/dv/advert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ func (a *advertModule) dataFetch(nName enc.Name, bootTime uint64, seqNo uint64)
WithVersion(seqNo)

a.dv.client.Consume(advName, func(state ndn.ConsumeState) {
if !state.IsComplete() {
return
}

go func() {
fetchErr := state.Error()
if fetchErr != nil {
Expand Down
4 changes: 0 additions & 4 deletions dv/dv/prefix_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func (dv *Router) prefixDataFetch(nName enc.Name) {

// Fetch the object
dv.client.Consume(name, func(state ndn.ConsumeState) {
if !state.IsComplete() {
return
}

go func() {
fetchErr := state.Error()
if fetchErr != nil {
Expand Down
3 changes: 3 additions & 0 deletions std/ndn/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ type ConsumeExtArgs struct {
// Callback is called when data is available.
// True should be returned to continue fetching the object.
Callback func(status ConsumeState)
// OnProgress is called when progress is made (advanced usage).
// [Caution] Any data returned by Content() may not be validated.
OnProgress func(status ConsumeState)
// NoMetadata disables fetching RDR metadata (advanced usage).
NoMetadata bool
}
Expand Down
4 changes: 2 additions & 2 deletions std/object/client_consume_seg.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state *
return
}

if !state.IsComplete() {
state.args.Callback(state) // progress
if state.args.OnProgress != nil && !state.IsComplete() {
state.args.OnProgress(state)
}
}

Expand Down
6 changes: 1 addition & 5 deletions std/security/ndncert/client_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ func (c *Client) FetchProfile() (*tlv.CaProfile, error) {
name := c.caPrefix.
Append(enc.NewGenericComponent("CA")).
Append(enc.NewGenericComponent("INFO"))
c.client.Consume(name, func(status ndn.ConsumeState) {
if status.IsComplete() {
ch <- status
}
})
c.client.Consume(name, func(status ndn.ConsumeState) { ch <- status })
state := <-ch
if err := state.Error(); err != nil {
return nil, err
Expand Down
4 changes: 0 additions & 4 deletions std/sync/snapshot_node_latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ func (s *SnapshotNodeLatest) snapName(node enc.Name, boot uint64) enc.Name {
func (s *SnapshotNodeLatest) fetch(node enc.Name, boot uint64) {
// Discover the latest snapshot
s.Client.Consume(s.snapName(node, boot), func(cstate ndn.ConsumeState) {
if !cstate.IsComplete() {
return
}

s.callback(func(state SvMap[svsDataState]) (pub SvsPub, err error) {
hash := node.TlvStr()
pub.Publisher = node
Expand Down
4 changes: 0 additions & 4 deletions std/sync/svs_alo_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ func (s *SvsALO) consumeCheck(node enc.Name, hash string) {
func (s *SvsALO) consumeObject(node enc.Name, boot uint64, seq uint64) {
name := s.objectName(node, boot, seq)
s.client.Consume(name, func(status ndn.ConsumeState) {
if !status.IsComplete() {
return
}

s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down
40 changes: 18 additions & 22 deletions tools/catchunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,32 +61,21 @@ func (cc *CatChunks) run(_ *cobra.Command, args []string) {

done := make(chan ndn.ConsumeState)
t1, t2 := time.Now(), time.Now()
byteCount := 0

// calling Content() on a status object clears the buffer
// and returns the new data the next time it is called
write := func(status ndn.ConsumeState) {
for _, chunk := range status.Content() {
os.Stdout.Write(chunk)
byteCount += len(chunk)
}
}

// fetch object
progress := 0
cli.Consume(name, func(status ndn.ConsumeState) {
if status.IsComplete() {
cli.ConsumeExt(ndn.ConsumeExtArgs{
Name: name,
Callback: func(state ndn.ConsumeState) {
t2 = time.Now()
write(status)
done <- status
return
}

if status.Progress()-progress >= 1000 {
progress = status.Progress()
log.Debug(cc, "Consume progress", "progress", float64(status.Progress())/float64(status.ProgressMax())*100)
write(status)
}
done <- state
},
OnProgress: func(state ndn.ConsumeState) {
if state.Progress()-progress >= 1000 {
progress = state.Progress()
log.Debug(cc, "Consume progress", "progress", float64(state.Progress())/float64(state.ProgressMax())*100)
}
},
})
state := <-done

Expand All @@ -95,6 +84,13 @@ func (cc *CatChunks) run(_ *cobra.Command, args []string) {
return
}

// write object to stdout
byteCount := 0
for _, chunk := range state.Content() {
os.Stdout.Write(chunk)
byteCount += len(chunk)
}

// statistics
fmt.Fprintf(os.Stderr, "Object fetched %s\n", state.Name())
fmt.Fprintf(os.Stderr, "Content: %d bytes\n", byteCount)
Expand Down
14 changes: 4 additions & 10 deletions tools/nfdc/nfdc_dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@ func (t *Tool) fetchStatusDataset(suffix enc.Name) (enc.Wire, error) {
client.ConsumeExt(ndn.ConsumeExtArgs{
Name: t.Prefix().Append(suffix...),
NoMetadata: true, // NFD has no RDR metadata
Callback: func(status ndn.ConsumeState) {
if !status.IsComplete() {
return
}
ch <- status
close(ch)
},
Callback: func(status ndn.ConsumeState) { ch <- status },
})

res := <-ch
if err := res.Error(); err != nil {
state := <-ch
if err := state.Error(); err != nil {
return nil, err
}

return res.Content(), nil
return state.Content(), nil
}

0 comments on commit 30ec775

Please sign in to comment.