Skip to content

Commit 73d7b7f

Browse files
committed
adds downloading archives in parallel
1 parent 23e9a70 commit 73d7b7f

File tree

2 files changed

+80
-81
lines changed

2 files changed

+80
-81
lines changed

protocol/communities/codex_archive_downloader.go

Lines changed: 73 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"fmt"
1010
"sync"
11+
"time"
1112

1213
"github.com/status-im/status-go/crypto/types"
1314
"github.com/status-im/status-go/protocol/protobuf"
@@ -34,6 +35,7 @@ type CodexArchiveDownloader struct {
3435
totalDownloadedArchivesCount int
3536
currentArchiveHash string
3637
archiveDownloadProgress map[string]int64 // hash -> bytes downloaded
38+
archiveDownloadCancel map[string]chan struct{}
3739
mu sync.RWMutex
3840

3941
// Download control
@@ -56,6 +58,7 @@ func NewCodexArchiveDownloader(codexClient *CodexClient, index *protobuf.CodexWa
5658
totalArchivesCount: len(index.Archives),
5759
totalDownloadedArchivesCount: len(existingArchiveIDs),
5860
archiveDownloadProgress: make(map[string]int64),
61+
archiveDownloadCancel: make(map[string]chan struct{}),
5962
}
6063
}
6164

@@ -76,6 +79,12 @@ func (d *CodexArchiveDownloader) GetTotalDownloadedArchivesCount() int {
7679
return d.totalDownloadedArchivesCount
7780
}
7881

82+
func (d *CodexArchiveDownloader) GetPendingArchivesCount() int {
83+
d.mu.RLock()
84+
defer d.mu.RUnlock()
85+
return len(d.archiveDownloadCancel)
86+
}
87+
7988
// GetCurrentArchiveHash returns the hash of the currently downloading archive
8089
func (d *CodexArchiveDownloader) GetCurrentArchiveHash() string {
8190
d.mu.RLock()
@@ -113,6 +122,9 @@ func (d *CodexArchiveDownloader) IsCancelled() bool {
113122

114123
// StartDownload begins downloading all missing archives
115124
func (d *CodexArchiveDownloader) StartDownload() error {
125+
if d.totalArchivesCount == 0 {
126+
return fmt.Errorf("no archives to download")
127+
}
116128
go func() {
117129
err := d.downloadAllArchives()
118130
d.mu.Lock()
@@ -150,18 +162,42 @@ func (d *CodexArchiveDownloader) downloadAllArchives() error {
150162
}
151163
}
152164

153-
// Download each missing archive
154-
for _, archive := range archivesList {
155-
// Check for cancellation
156-
select {
157-
case <-d.cancelChan:
158-
d.mu.Lock()
159-
d.cancelled = true
160-
d.mu.Unlock()
161-
return nil // Return nil to indicate graceful cancellation
162-
default:
165+
// Monitor for cancellation in a separate goroutine
166+
go func() {
167+
ticker := time.NewTicker(100 * time.Millisecond)
168+
defer ticker.Stop()
169+
170+
for {
171+
select {
172+
case <-d.cancelChan:
173+
d.mu.Lock()
174+
for hash, cancelChan := range d.archiveDownloadCancel {
175+
select {
176+
case <-cancelChan:
177+
// Already closed
178+
default:
179+
close(cancelChan) // Safe to close
180+
}
181+
delete(d.archiveDownloadCancel, hash)
182+
}
183+
d.cancelled = true
184+
d.mu.Unlock()
185+
return // Exit goroutine after cancellation
186+
case <-ticker.C:
187+
// Check if downloads are complete
188+
d.mu.RLock()
189+
complete := d.downloadComplete
190+
d.mu.RUnlock()
191+
192+
if complete {
193+
return // Exit goroutine when downloads complete
194+
}
195+
}
163196
}
197+
}()
164198

199+
// Download each missing archive
200+
for _, archive := range archivesList {
165201
// Check if we already have this archive
166202
hasArchive := false
167203
for _, existingHash := range d.existingArchiveIDs {
@@ -174,37 +210,51 @@ func (d *CodexArchiveDownloader) downloadAllArchives() error {
174210
continue
175211
}
176212

177-
// Set current archive
213+
archiveCancelChan := make(chan struct{})
214+
178215
d.mu.Lock()
179216
d.currentArchiveHash = archive.hash
180217
d.archiveDownloadProgress[archive.hash] = 0
218+
d.archiveDownloadCancel[archive.hash] = archiveCancelChan
181219
d.mu.Unlock()
182220

183-
// Download this archive using its CID
184-
err := d.downloadSingleArchive(archive.hash, archive.cid)
185-
if err != nil {
186-
return fmt.Errorf("failed to download archive %s: %w", archive.hash, err)
187-
}
188-
189-
// Update progress (callback is handled in downloadSingleArchive)
190-
d.mu.Lock()
191-
d.totalDownloadedArchivesCount++
192-
d.mu.Unlock()
221+
// Download this archive in parallel
222+
go func(archiveHash, archiveCid string, archiveCancel chan struct{}) {
223+
err := d.downloadSingleArchive(archiveHash, archiveCid, archiveCancel)
224+
d.mu.Lock()
225+
defer d.mu.Unlock()
226+
227+
if err != nil {
228+
// Store the last error encountered
229+
d.downloadError = err
230+
} else {
231+
// Only increment on successful download
232+
d.totalDownloadedArchivesCount++
233+
}
234+
235+
// Remove from active downloads
236+
delete(d.archiveDownloadCancel, archiveHash)
237+
238+
// Check if all downloads are complete
239+
if len(d.archiveDownloadCancel) == 0 {
240+
d.downloadComplete = true
241+
}
242+
}(archive.hash, archive.cid, archiveCancelChan)
193243
}
194244

195245
return nil
196246
}
197247

198248
// downloadSingleArchive downloads a single archive by its CID
199-
func (d *CodexArchiveDownloader) downloadSingleArchive(hash, cid string) error {
249+
func (d *CodexArchiveDownloader) downloadSingleArchive(hash, cid string, cancelChan <-chan struct{}) error {
200250
// Create a context that can be cancelled via our cancel channel
201251
ctx, cancel := context.WithCancel(context.Background())
202252
defer cancel()
203253

204254
// Monitor for cancellation in a separate goroutine
205255
go func() {
206256
select {
207-
case <-d.cancelChan:
257+
case <-cancelChan:
208258
cancel() // Cancel the download immediately
209259
case <-ctx.Done():
210260
// Context already cancelled, nothing to do

protocol/communities/manager_archive.go

Lines changed: 7 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -766,44 +766,6 @@ func (m *ArchiveManager) DownloadHistoryArchivesByIndexCid(communityID types.Hex
766766
downloadTaskInfo.TotalDownloadedArchivesCount = len(existingArchiveIDs)
767767
downloadTaskInfo.TotalArchivesCount = len(index.Archives)
768768

769-
// Create message handler function that processes messages through the same pipeline as torrents
770-
// messageHandler := func(messages []*protobuf.WakuMessage) error {
771-
// // Process messages in chunks like torrent archives (importMessagesChunkSize = 10)
772-
// chunkSize := 10
773-
// for i := 0; i < len(messages); i += chunkSize {
774-
// end := i + chunkSize
775-
// if end > len(messages) {
776-
// end = len(messages)
777-
// }
778-
// messagesChunk := messages[i:end]
779-
780-
// // This would normally call m.handleArchiveMessages(messagesChunk)
781-
// // For now, we just log the processing (TODO: integrate with messenger)
782-
// m.logger.Debug("processing message chunk", zap.Int("count", len(messagesChunk)))
783-
// }
784-
// return nil
785-
// }
786-
787-
// Create the archive processor
788-
// processor := NewCodexArchiveMessageProcessor(m.identity, m.messaging, m.persistence, m.logger, messageHandler)
789-
790-
// Set up callback for when archives are processed
791-
// processor.SetOnArchiveProcessed(func(hash string, from, to uint64) {
792-
// // Publish download signal (signaling that the archive was processed)
793-
// m.publisher.publish(&Subscription{
794-
// HistoryArchiveDownloadedSignal: &signal.HistoryArchiveDownloadedSignal{
795-
// CommunityID: communityID.String(),
796-
// From: int(from),
797-
// To: int(to),
798-
// },
799-
// })
800-
801-
// m.logger.Debug("archive processed successfully",
802-
// zap.String("hash", hash),
803-
// zap.Uint64("from", from),
804-
// zap.Uint64("to", to))
805-
// })
806-
807769
// Create separate cancel channel for the archive downloader to avoid channel competition
808770
archiveDownloaderCancel := make(chan struct{})
809771

@@ -824,14 +786,6 @@ func (m *ArchiveManager) DownloadHistoryArchivesByIndexCid(communityID types.Hex
824786
zap.String("hash", hash),
825787
zap.Uint64("from", from),
826788
zap.Uint64("to", to))
827-
// Process the downloaded archive data
828-
// err := processor.ProcessArchiveData(communityID, hash, archiveData, from, to)
829-
// if err != nil {
830-
// m.logger.Error("failed to process downloaded archive",
831-
// zap.String("hash", hash),
832-
// zap.Error(err))
833-
// return
834-
// }
835789
})
836790

837791
err = archiveDownloader.StartDownload()
@@ -865,8 +819,7 @@ func (m *ArchiveManager) DownloadHistoryArchivesByIndexCid(communityID types.Hex
865819
return downloadTaskInfo, nil
866820
}
867821
if downloadError := archiveDownloader.GetDownloadError(); downloadError != nil {
868-
m.logger.Error("archive download failed", zap.Error(downloadError))
869-
return nil, fmt.Errorf("archive download failed: %w", downloadError)
822+
m.logger.Warn("at least one archive download failed", zap.Error(downloadError))
870823
}
871824

872825
// Update final progress
@@ -883,20 +836,16 @@ func (m *ArchiveManager) DownloadHistoryArchivesByIndexCid(communityID types.Hex
883836
IndexCid: true, // Downloaded via Codex CID
884837
},
885838
})
886-
m.logger.Debug("finished downloading all archives from Codex")
839+
m.logger.Debug("finished downloading archives from Codex")
887840
return downloadTaskInfo, nil
888841
} else {
889842
// Update progress
890843
downloadTaskInfo.TotalDownloadedArchivesCount = archiveDownloader.GetTotalDownloadedArchivesCount()
891-
currentArchive := archiveDownloader.GetCurrentArchiveHash()
892-
if currentArchive != "" {
893-
progress := archiveDownloader.GetArchiveDownloadProgress(currentArchive)
894-
m.logger.Debug("downloading archive",
895-
zap.String("hash", currentArchive),
896-
zap.Int64("bytes", progress),
897-
zap.Int("completed", downloadTaskInfo.TotalDownloadedArchivesCount),
898-
zap.Int("total", downloadTaskInfo.TotalArchivesCount))
899-
}
844+
m.logger.Debug("downloading archives",
845+
zap.Int("completed", downloadTaskInfo.TotalDownloadedArchivesCount),
846+
zap.Int("total", downloadTaskInfo.TotalArchivesCount),
847+
zap.Int("inProgress", archiveDownloader.GetPendingArchivesCount()),
848+
)
900849
}
901850
}
902851
}

0 commit comments

Comments
 (0)