Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions market/retrieval/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ var log = logging.Logger("retrievals")
// activeRequestCounters stores atomic counters for active requests per path+method
var activeRequestCounters sync.Map // map[string]*atomic.Int64

// Request limiters using buffered channels as semaphores
const maxParallelRequests = 256

var (
ipfsRequestLimiter = make(chan struct{}, maxParallelRequests)
ipfsHeadRequestLimiter = make(chan struct{}, maxParallelRequests/2)
pieceRequestLimiter = make(chan struct{}, maxParallelRequests)
)

type Provider struct {
db *harmonydb.DB
bs *remoteblockstore.RemoteBlockstore
Expand Down Expand Up @@ -157,6 +166,27 @@ func decrementActiveRequests(ctx context.Context, counter *atomic.Int64, pathPre
}, remoteblockstore.HttpActiveRequests.M(newValue))
}

// limiterMiddleware limits concurrent requests to the retrieval service
func limiterMiddleware(limiter chan struct{}) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Try to acquire semaphore (non-blocking)
select {
case limiter <- struct{}{}:
// Successfully acquired, ensure we release when done
defer func() { <-limiter }()
// Continue to next handler
next.ServeHTTP(w, r)
default:
// Limit reached, return 429
log.Warnw("Request limit reached", "method", r.Method, "path", r.URL.Path)
w.WriteHeader(http.StatusTooManyRequests)
_, _ = w.Write([]byte("Service temporarily unavailable: too many concurrent requests"))
}
})
}
}

// metricsMiddleware records HTTP metrics for requests
func metricsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -202,9 +232,25 @@ func Router(mux *chi.Mux, rp *Provider) {
// Group retrieval routes with metrics middleware
mux.Group(func(r chi.Router) {
r.Use(metricsMiddleware)
r.Get(piecePrefix+"{cid}", rp.handleByPieceCid)
r.Get(ipfsPrefix+"*", rp.fr.ServeHTTP)
r.Head(ipfsPrefix+"*", rp.fr.ServeHTTP)

// Piece endpoint with limiter
r.Group(func(r chi.Router) {
r.Use(limiterMiddleware(pieceRequestLimiter))
r.Get(piecePrefix+"{cid}", rp.handleByPieceCid)
})

// IPFS endpoints with limiter
r.Group(func(r chi.Router) {
r.Use(limiterMiddleware(ipfsHeadRequestLimiter))
r.Head(ipfsPrefix+"*", rp.fr.ServeHTTP)
})

r.Group(func(r chi.Router) {
r.Use(limiterMiddleware(ipfsRequestLimiter))
r.Get(ipfsPrefix+"*", rp.fr.ServeHTTP)
})

// Info endpoint without limiter
r.Get(infoPage, handleInfo)
})
}
Expand Down