Skip to content

Commit

Permalink
Merge pull request #1416 from 0chain/sprint-1.14
Browse files Browse the repository at this point in the history
Sprint 1.14
  • Loading branch information
dabasov authored May 5, 2024
2 parents d8014c0 + ce3d2df commit 688a74b
Show file tree
Hide file tree
Showing 53 changed files with 1,044 additions and 643 deletions.
1 change: 1 addition & 0 deletions code/go/0chain.net/blobber/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func initHandlers(r *mux.Router, devMode bool) {
handler.SetupHandlers(r)
handler.SetupSwagger()
common.SetAdminCredentials(devMode)
common.Set0boxDetails()
}

func initProfHandlers(mux *http.ServeMux) {
Expand Down
16 changes: 9 additions & 7 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ type Result struct {
PrevValidationRoot string
ThumbnailHash string
PrevThumbnailHash string
FilestoreVersion int
}

// TODO: Need to speed up this function
Expand All @@ -281,7 +282,7 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {

err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
tx := datastore.GetStore().GetTransaction(ctx)
err := tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
err := tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash", "filestore_version").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error {

for _, ref := range refs {
Expand All @@ -303,27 +304,27 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) error {
}()

if count == 0 && ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot, ref.FilestoreVersion)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot)
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot, ref.FilestoreVersion)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while moving file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}

if ref.ThumbnailHash != "" && ref.ThumbnailHash != ref.PrevThumbnailHash {
if ref.PrevThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash)
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash, ref.FilestoreVersion)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash)
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash, ref.FilestoreVersion)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while moving thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
Expand Down Expand Up @@ -380,15 +381,16 @@ func deleteFromFileStore(ctx context.Context, allocationID string) error {
}()

if count == 0 {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot)
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot,
res.FilestoreVersion)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", res.ValidationRoot))
}
}

if res.ThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash)
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash, res.FilestoreVersion)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()),
zap.String("thumbnail", res.ThumbnailHash))
Expand Down
10 changes: 5 additions & 5 deletions code/go/0chain.net/blobbercore/allocation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ func (mfs *MockFileStore) CommitWrite(allocID, connID string, fileData *filestor
return true, nil
}

func (mfs *MockFileStore) MoveToFilestore(allocID, hash string) error {
func (mfs *MockFileStore) MoveToFilestore(allocID, hash string, version int) error {
return nil
}

func (mfs *MockFileStore) DeleteAllocation(allocID string) {
}

func (mfs *MockFileStore) DeleteFromFilestore(allocID, hash string) error {
func (mfs *MockFileStore) DeleteFromFilestore(allocID, hash string, version int) error {
return nil
}

func (mfs *MockFileStore) DeleteTempFile(allocID, connID string, fileData *filestore.FileInputData) error {
return nil
}

func (mfs *MockFileStore) DeleteFile(allocID, contentHash string) error {
func (mfs *MockFileStore) DeleteFile(allocID, contentHash string, version int) error {
return nil
}

Expand All @@ -64,7 +64,7 @@ func (mfs *MockFileStore) GetFileBlock(rin *filestore.ReadBlockInput) (*filestor
return nil, nil
}

func (mfs *MockFileStore) GetFilePathSize(allocID, contentHash, thumbHash string) (int64, int64, error) {
func (mfs *MockFileStore) GetFilePathSize(allocID, contentHash, thumbHash string, version int) (int64, int64, error) {
return 0, 0, nil
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func (mfs *MockFileStore) CalculateCurrentDiskCapacity() error {
return nil
}

func (mfs *MockFileStore) GetPathForFile(allocID, contentHash string) (string, error) {
func (mfs *MockFileStore) GetPathForFile(allocID, contentHash string, version int) (string, error) {
return "", nil
}

Expand Down
5 changes: 3 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
return false, common.NewError("connection_not_found", "connection not found for save file change")
}
connectionObj.lock.Lock()
connectionObj.UpdatedAt = time.Now()
change := connectionObj.changes[pathHash]
saveChange := false
if change == nil {
Expand Down Expand Up @@ -208,7 +209,7 @@ func SaveFileChange(connectionID, pathHash, fileName string, cmd FileCommand, is
change.seqPQ.Done(seqpriorityqueue.UploadData{
Offset: offset,
DataBytes: dataWritten,
})
}, contentSize)
} else {
change.seqPQ.Push(seqpriorityqueue.UploadData{
Offset: offset,
Expand Down Expand Up @@ -256,7 +257,7 @@ func cleanConnectionObj() {
connectionObj.cnclCtx()
for _, change := range connectionObj.changes {
if change.seqPQ != nil {
change.seqPQ.Done(seqpriorityqueue.UploadData{})
change.seqPQ.Done(seqpriorityqueue.UploadData{}, 1)
}
}
delete(connectionProcessor, connectionID)
Expand Down
11 changes: 6 additions & 5 deletions code/go/0chain.net/blobbercore/allocation/deletefilechange.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,18 @@ func (nf *DeleteFileChange) DeleteTempFile() error {
func (nf *DeleteFileChange) CommitToFileStore(ctx context.Context, mut *sync.Mutex) error {
db := datastore.GetStore().GetTransaction(ctx)
type Result struct {
Id string
ValidationRoot string
ThumbnailHash string
Id string
ValidationRoot string
ThumbnailHash string
FilestoreVersion int
}

limitCh := make(chan struct{}, 10)
wg := &sync.WaitGroup{}
var results []Result
mut.Lock()
err := db.Model(&reference.Ref{}).Unscoped().
Select("id", "validation_root", "thumbnail_hash").
Select("id", "validation_root", "thumbnail_hash", "filestore_version").
Where("allocation_id=? AND path LIKE ? AND type=? AND deleted_at is not NULL",
nf.AllocationID, nf.Path+"%", reference.FILE).
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
Expand All @@ -96,7 +97,7 @@ func (nf *DeleteFileChange) CommitToFileStore(ctx context.Context, mut *sync.Mut
}()

if count == 0 {
err := filestore.GetFileStore().DeleteFile(nf.AllocationID, res.ValidationRoot)
err := filestore.GetFileStore().DeleteFile(nf.AllocationID, res.ValidationRoot, res.FilestoreVersion)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", res.ValidationRoot))
Expand Down
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Allocation struct {
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
LastRedeemedSeq int64 `gorm:"column:last_redeemed_sequence;default:0"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
StartTime common.Timestamp `gorm:"column:start_time;not null"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (nf *UpdateFileChanger) ApplyChange(ctx context.Context, rootRef *reference
nf.deleteHash = make(map[string]int)

if fileRef.ValidationRoot != "" && fileRef.ValidationRoot != nf.ValidationRoot {
nf.deleteHash[fileRef.ValidationRoot] = int(CONTENT)
nf.deleteHash[fileRef.ValidationRoot] = fileRef.FilestoreVersion
}

fileRef.ActualFileHash = nf.ActualHash
Expand All @@ -99,13 +99,14 @@ func (nf *UpdateFileChanger) ApplyChange(ctx context.Context, rootRef *reference
fileRef.EncryptedKeyPoint = nf.EncryptedKeyPoint
fileRef.ChunkSize = nf.ChunkSize
fileRef.IsPrecommit = true
fileRef.FilestoreVersion = filestore.VERSION

return rootRef, nil
}

func (nf *UpdateFileChanger) CommitToFileStore(ctx context.Context, mut *sync.Mutex) error {
db := datastore.GetStore().GetTransaction(ctx)
for hash := range nf.deleteHash {
for hash, version := range nf.deleteHash {
var count int64
mut.Lock()
err := db.Table((&reference.Ref{}).TableName()).
Expand All @@ -115,7 +116,7 @@ func (nf *UpdateFileChanger) CommitToFileStore(ctx context.Context, mut *sync.Mu
mut.Unlock()
if err == nil && count == 0 {
logging.Logger.Info("Deleting content file", zap.String("validation_root", hash))
if err := filestore.GetFileStore().DeleteFile(nf.AllocationID, hash); err != nil {
if err := filestore.GetFileStore().DeleteFile(nf.AllocationID, hash, version); err != nil {
logging.Logger.Error("FileStore_DeleteFile", zap.String("allocation_id", nf.AllocationID), zap.Error(err))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/util"
Expand Down Expand Up @@ -119,6 +120,7 @@ func (nf *UploadFileChanger) applyChange(ctx context.Context, rootRef *reference
UpdatedAt: ts,
HashToBeComputed: true,
IsPrecommit: true,
FilestoreVersion: filestore.VERSION,
}

fileID, ok := fileIDMeta[newFile.Path]
Expand Down
5 changes: 4 additions & 1 deletion code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *Repository) GetAllocationIds(ctx context.Context) []Res {

}

func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation) error {
func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string, allocationObj *Allocation, redeemSeq int64) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
Expand All @@ -205,17 +205,20 @@ func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, A
allocationUpdates := make(map[string]interface{})
allocationUpdates["latest_redeemed_write_marker"] = AllocationRoot
allocationUpdates["is_redeem_required"] = false
allocationUpdates["last_redeemed_sequence"] = redeemSeq
err = tx.Model(allocationObj).Updates(allocationUpdates).Error
if err != nil {
return err
}
allocationObj.LatestRedeemedWM = AllocationRoot
allocationObj.IsRedeemRequired = false
allocationObj.LastRedeemedSeq = redeemSeq
txnCache := cache[allocationID]
txnCache.Allocation = allocationObj
updateAlloc := func(a *Allocation) {
a.LatestRedeemedWM = AllocationRoot
a.IsRedeemRequired = false
a.LastRedeemedSeq = redeemSeq
}
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc)
cache[allocationID] = txnCache
Expand Down
10 changes: 6 additions & 4 deletions code/go/0chain.net/blobbercore/blobberhttp/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ type ConnectionResult struct {

// swagger:model CommitResult
type CommitResult struct {
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarker `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarkerEntity `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
//Result []*UploadResult `json:"result"`
}

// swagger:model ReferencePathResult
type ReferencePathResult struct {
*reference.ReferencePath
LatestWM *writemarker.WriteMarker `json:"latest_write_marker"`
Version string `json:"version"`
}

// swagger:model RefResult
Expand Down Expand Up @@ -65,4 +66,5 @@ type DownloadResponse struct {
type LatestWriteMarkerResult struct {
LatestWM *writemarker.WriteMarker `json:"latest_write_marker"`
PrevWM *writemarker.WriteMarker `json:"prev_write_marker"`
Version string `json:"version"`
}
11 changes: 6 additions & 5 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
}

challengeReadInput := &filestore.ChallengeReadBlockInput{
Hash: objectPath.Meta["validation_root"].(string),
FileSize: objectPath.Meta["size"].(int64),
BlockOffset: blockoffset,
AllocationID: cr.AllocationID,
IsPrecommit: fromPreCommit,
Hash: objectPath.Meta["validation_root"].(string),
FileSize: objectPath.Meta["size"].(int64),
BlockOffset: blockoffset,
AllocationID: cr.AllocationID,
IsPrecommit: fromPreCommit,
FilestoreVersion: objectPath.FilestoreVersion,
}

t1 := time.Now()
Expand Down
9 changes: 9 additions & 0 deletions code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ func SetupDefaultConfig() {
viper.SetDefault("openconnection_cleaner.frequency", 30)
viper.SetDefault("writemarker_redeem.frequency", 10)
viper.SetDefault("writemarker_redeem.num_workers", 5)
viper.SetDefault("writemarker_redeem.max_chain_length", 32)
viper.SetDefault("writemarker_redeem.max_timestamp_gap", 1800)
viper.SetDefault("writemarker_redeem.marker_redeem_interval", time.Minute*10)
viper.SetDefault("readmarker_redeem.frequency", 10)
viper.SetDefault("readmarker_redeem.num_workers", 5)
viper.SetDefault("challenge_response.frequency", 10)
Expand Down Expand Up @@ -100,6 +103,9 @@ type Config struct {
OpenConnectionWorkerTolerance int64
WMRedeemFreq int64
WMRedeemNumWorkers int
MaxChainLength int
MaxTimestampGap int64
MarkerRedeemInterval time.Duration
RMRedeemFreq int64
RMRedeemNumWorkers int
ChallengeResolveFreq int64
Expand Down Expand Up @@ -218,6 +224,9 @@ func ReadConfig(deploymentMode int) {

Configuration.WMRedeemFreq = viper.GetInt64("writemarker_redeem.frequency")
Configuration.WMRedeemNumWorkers = viper.GetInt("writemarker_redeem.num_workers")
Configuration.MaxChainLength = viper.GetInt("writemarker_redeem.max_chain_length")
Configuration.MaxTimestampGap = viper.GetInt64("writemarker_redeem.max_timestamp_gap")
Configuration.MarkerRedeemInterval = viper.GetDuration("writemarker_redeem.marker_redeem_interval")

Configuration.RMRedeemFreq = viper.GetInt64("readmarker_redeem.frequency")
Configuration.RMRedeemNumWorkers = viper.GetInt("readmarker_redeem.num_workers")
Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func CommitWriteResponseCreator(r interface{}) *blobbergrpc.CommitResponse {

return &blobbergrpc.CommitResponse{
AllocationRoot: httpResp.AllocationRoot,
WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker),
ErrorMessage: httpResp.ErrorMessage,
Success: httpResp.Success,
// WriteMarker: WriteMarkerToWriteMarkerGRPC(httpResp.WriteMarker),
ErrorMessage: httpResp.ErrorMessage,
Success: httpResp.Success,
}
}

Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func GetObjectTreeResponseHandler(getObjectTreeResponse *blobbergrpc.GetObjectTr
func CommitWriteResponseHandler(resp *blobbergrpc.CommitResponse) *blobberhttp.CommitResult {
return &blobberhttp.CommitResult{
AllocationRoot: resp.AllocationRoot,
WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker),
Success: resp.Success,
ErrorMessage: resp.ErrorMessage,
// WriteMarker: WriteMarkerGRPCToWriteMarker(resp.WriteMarker),
Success: resp.Success,
ErrorMessage: resp.ErrorMessage,
}
}

Expand Down
8 changes: 4 additions & 4 deletions code/go/0chain.net/blobbercore/filestore/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (fs *MockStore) DeleteTempFile(allocID, connID string, fileData *FileInputD
return fs.FileStore.DeleteTempFile(allocID, connID, fileData)
}

func (fs *MockStore) DeleteFile(allocationID, contentHash string) error {
return fs.FileStore.DeleteFile(allocationID, contentHash)
func (fs *MockStore) DeleteFile(allocationID, contentHash string, version int) error {
return fs.FileStore.DeleteFile(allocationID, contentHash, version)
}

func (fs *MockStore) GetFileBlock(rbi *ReadBlockInput) (*FileDownloadResponse, error) {
Expand Down Expand Up @@ -98,8 +98,8 @@ func (fs *MockStore) CalculateCurrentDiskCapacity() error {
return fs.FileStore.CalculateCurrentDiskCapacity()
}

func (fs *MockStore) GetPathForFile(allocID, contentHash string) (string, error) {
return fs.FileStore.GetPathForFile(allocID, contentHash)
func (fs *MockStore) GetPathForFile(allocID, contentHash string, version int) (string, error) {
return fs.FileStore.GetPathForFile(allocID, contentHash, version)
}

func (fs *MockStore) UpdateAllocationMetaData(m map[string]interface{}) error {
Expand Down
Loading

0 comments on commit 688a74b

Please sign in to comment.