Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test ref dir #1453

Draft
wants to merge 8 commits into
base: sprint-1.16
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package challenge

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
Expand Down Expand Up @@ -221,14 +223,17 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
return err
}
allocMu.RUnlock()

postDataBytes, err := json.Marshal(postData)
logging.Logger.Info("[challenge]post: ", zap.Any("challenge_id", cr.ChallengeID), zap.Any("post_data_len", len(postDataBytes)/(1024*1024)))
validationTicketPayload := new(bytes.Buffer)
gw := gzip.NewWriter(validationTicketPayload)
err = json.NewEncoder(gw).Encode(postData)
if err != nil {
logging.Logger.Error("[db]form: " + err.Error())
logging.Logger.Error("json encoding failed: " + err.Error())
cr.CancelChallenge(ctx, err)
return err
}
gw.Close() //nolint:errcheck
logging.Logger.Info("[challenge]post: ", zap.Any("challenge_id", cr.ChallengeID), zap.Any("post_data_len", validationTicketPayload.Len()/(1024*1024)))

responses := make(map[string]ValidationTicket)
if cr.ValidationTickets == nil {
cr.ValidationTickets = make([]*ValidationTicket, len(cr.Validators))
Expand Down Expand Up @@ -264,7 +269,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
go func(url, validatorID string, i int) {
defer swg.Done()

resp, err := util.SendPostRequest(url, postDataBytes, nil)
resp, err := util.SendPostRequest(url, validationTicketPayload.Bytes(), nil)
if err != nil {
numFailed++
logging.Logger.Error("[challenge]post: ", zap.Any("error", err.Error()))
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func SetupDefaultConfig() {
viper.SetDefault("finalize_allocations_interval", time.Duration(-1))

viper.SetDefault("max_dirs_files", 50000)
viper.SetDefault("max_objects_dir", 1000)
viper.SetDefault("max_objects_dir", 10000)
}

/*SetupConfig - setup the configuration system */
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func setupHandlers(r *mux.Router) {
RateLimitByGeneralRL(common.ToJSONResponse(WithReadOnlyConnection(FileStatsHandler)))) // TODO: add swagger

r.HandleFunc("/v1/file/referencepath/{allocation}",
RateLimitByObjectRL(common.ToJSONResponse(WithReadOnlyConnection(ReferencePathHandler)))) // TODO: add handler
RateLimitByObjectRL(common.ToGzipJSONResponse(WithReadOnlyConnection(ReferencePathHandler)))) // TODO: add handler

r.HandleFunc("/v1/file/latestwritemarker/{allocation}",
RateLimitByObjectRL(common.ToJSONResponse(WithReadOnlyConnection(WriteMarkerHandler))))
Expand Down
7 changes: 5 additions & 2 deletions code/go/0chain.net/blobbercore/reference/referencepath.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
Expand Down Expand Up @@ -126,7 +127,7 @@ func GetReferencePathFromPaths(ctx context.Context, allocationID string, paths,
var refs []Ref
t := datastore.GetStore().GetTransaction(ctx)
db := t.DB

now := time.Now()
pathsAdded := make(map[string]bool)
var shouldOr bool
for _, path := range paths {
Expand Down Expand Up @@ -178,7 +179,7 @@ func GetReferencePathFromPaths(ctx context.Context, allocationID string, paths,
refMap[refs[i].Path] = &refs[i]

}

elapsedRef := time.Since(now)
for _, path := range objTreePath {
ref, err := GetObjectTree(ctx, allocationID, path)
if err != nil {
Expand All @@ -196,6 +197,8 @@ func GetReferencePathFromPaths(ctx context.Context, allocationID string, paths,
refMap[ref.Path].childrenLoaded = true
}
}
elapsedObjectTree := time.Since(now) - elapsedRef
logging.Logger.Info("getReferencePathFromPaths", zap.Duration("total_time", time.Since(now)), zap.Duration("elapsed_ref", elapsedRef), zap.Duration("elapsed_obj_tree", elapsedObjectTree))

return &refs[0], nil
}
Expand Down
65 changes: 65 additions & 0 deletions code/go/0chain.net/core/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package common

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"

"github.com/pierrec/lz4/v4"
)

const (
Expand Down Expand Up @@ -60,6 +63,50 @@ func Respond(w http.ResponseWriter, data interface{}, err error) {
}
}

func RespondGzip(w http.ResponseWriter, data any, err error) {
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS for all.
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
if err != nil {
data := make(map[string]interface{}, 2)
data["error"] = err.Error()
if cerr, ok := err.(*Error); ok {
data["code"] = cerr.Code
}
buf := bytes.NewBuffer(nil)
json.NewEncoder(buf).Encode(data) //nolint:errcheck // checked in previous step
w.WriteHeader(400)
fmt.Fprintln(w, buf.String())
} else if data != nil {
w.Header().Set("Content-Encoding", "gzip")
gw := gzip.NewWriter(w)
defer gw.Close()
json.NewEncoder(gw).Encode(data) //nolint:errcheck // checked in previous step
}
}

func RespondLz4(w http.ResponseWriter, data any, err error) {
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS for all.
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
if err != nil {
data := make(map[string]interface{}, 2)
data["error"] = err.Error()
if cerr, ok := err.(*Error); ok {
data["code"] = cerr.Code
}
buf := bytes.NewBuffer(nil)
json.NewEncoder(buf).Encode(data) //nolint:errcheck // checked in previous step
w.WriteHeader(400)
fmt.Fprintln(w, buf.String())
} else if data != nil {
w.Header().Set("Content-Encoding", "lz4")
lw := lz4.NewWriter(w)
defer lw.Close()
json.NewEncoder(lw).Encode(data) //nolint:errcheck // checked in previous step
}
}

var domainRE = regexp.MustCompile(`^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/\n]+)`) //nolint:unused,deadcode,varcheck // might be used later?

func ToByteStream(handler JSONResponderF) ReqRespHandlerf {
Expand Down Expand Up @@ -119,6 +166,24 @@ func ToJSONResponse(handler JSONResponderF) ReqRespHandlerf {
}
}

/*ToGzipJSONResponse - An adapter that takes a handler of the form
* func AHandler(r *http.Request) (interface{}, error)
* which takes a request object, processes and returns an object or an error
* and converts into a standard request/response handler
*/
func ToGzipJSONResponse(handler JSONResponderF) ReqRespHandlerf {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") // CORS for all.
if r.Method == "OPTIONS" {
SetupCORSResponse(w, r)
return
}
ctx := r.Context()
data, err := handler(ctx, r)
RespondGzip(w, data, err)
}
}

/*ToJSONReqResponse - An adapter that takes a handler of the form
* func AHandler(json map[string]interface{}) (interface{}, error)
* which takes a parsed json map from the request, processes and returns an object or an error
Expand Down
14 changes: 2 additions & 12 deletions code/go/0chain.net/core/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const SLEEP_BETWEEN_RETRIES = 5
func NewHTTPRequest(method, url string, data []byte) (*http.Request, context.Context, context.CancelFunc, error) {
requestHash := encryption.Hash(data)
req, err := http.NewRequest(method, url, bytes.NewBuffer(data))
req.Header.Set("Content-Type", "application/json; charset=utf-8")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Access-Control-Allow-Origin", "*")
req.Header.Set("X-App-Client-ID", node.Self.ID)
req.Header.Set("X-App-Client-Key", node.Self.PublicKey)
Expand All @@ -32,16 +33,6 @@ func NewHTTPRequest(method, url string, data []byte) (*http.Request, context.Con
return req, ctx, cncl, err
}

func SendMultiPostRequest(urls []string, data []byte) {
wg := sync.WaitGroup{}
wg.Add(len(urls))

for _, url := range urls {
go SendPostRequest(url, data, &wg) //nolint:errcheck // goroutines
}
wg.Wait()
}

func SendPostRequest(postURL string, data []byte, wg *sync.WaitGroup) (body []byte, err error) {
if wg != nil {
defer wg.Done()
Expand All @@ -63,7 +54,6 @@ func SendPostRequest(postURL string, data []byte, wg *sync.WaitGroup) (body []by

req, ctx, cncl, err = NewHTTPRequest(http.MethodPost, u.String(), data)
defer cncl()

resp, err = http.DefaultClient.Do(req.WithContext(ctx))
if err == nil {
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
Expand Down
10 changes: 8 additions & 2 deletions code/go/0chain.net/validatorcore/storage/challenge_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

import (
"compress/gzip"
"context"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -66,9 +67,14 @@ func NewChallengeRequest(r *http.Request) (*ChallengeRequest, string, error) {
requestHash := r.Header.Get("X-App-Request-Hash")
h := sha3.New256()
tReader := io.TeeReader(r.Body, h)
var requestBody io.Reader
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
requestBody, _ = gzip.NewReader(tReader)
} else {
requestBody = tReader
}
var challengeRequest ChallengeRequest
decoder := json.NewDecoder(tReader)
err := decoder.Decode(&challengeRequest)
err := json.NewDecoder(requestBody).Decode(&challengeRequest)
if err != nil {
logging.Logger.Error("Error decoding the input to validator")
return nil, "", common.NewError("input_decode_error", "Error in decoding the input."+err.Error())
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.26.5 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/hitenjain14/fasthttp v0.0.0-20240527123209-06019e79bff9 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.51.0 // indirect
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,8 @@ github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9 h1:6ob53CVz+ja2i7e
github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
Loading