Skip to content

Commit

Permalink
Merge pull request #1383 from 0chain/sprint-1.12
Browse files Browse the repository at this point in the history
Sprint 1.12
  • Loading branch information
dabasov authored Feb 8, 2024
2 parents 06be785 + 1d81b05 commit cce361c
Show file tree
Hide file tree
Showing 20 changed files with 259 additions and 101 deletions.
27 changes: 16 additions & 11 deletions .github/workflows/build-&-publish-docker-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v4
with:
go-version: ^1.20 # The Go version to download (if necessary) and use.
go-version: ^1.21 # The Go version to download (if necessary) and use.

- name: Clone blobber
uses: actions/checkout@v3
Expand All @@ -71,7 +71,7 @@ jobs:
docker tag $BLOBBER_BUILD_BASE_REGISTRY:staging $BLOBBER_BUILDBASE
- name: Build Base image
if: contains(steps.changed-files.outputs.modified_files, 'docker.local/base.Dockerfile')
# if: contains(steps.changed-files.outputs.modified_files, 'docker.local/base.Dockerfile')
run: |
SHORT_SHA=$(echo ${{ env.SHA }} | head -c 8)
Expand All @@ -84,14 +84,17 @@ jobs:
- name: Build blobber
run: |
SHORT_SHA=$(echo ${{ env.SHA }} | head -c 8)
export DOCKER_IMAGE_BASE="${BLOBBER_REGISTRY}:base"
export DOCKER_IMAGE_BASE="$BLOBBER_BUILD_BASE_REGISTRY:$TAG"
export DOCKER_IMAGE_SWAGGER="${BLOBBER_REGISTRY}:swagger_test"
export DOCKER_BUILD="buildx build --platform linux/amd64,linux/arm64 --push"
export DOCKER_IMAGE_BLOBBER="-t ${BLOBBER_REGISTRY}:${TAG} -t ${BLOBBER_REGISTRY}:${TAG}-${SHORT_SHA}"
# export DOCKER_BUILD="buildx build --platform linux/amd64,linux/arm64 --push"
export DOCKER_BUILD="build --push"
export DOCKER_IMAGE_BLOBBER="-t ${BLOBBER_REGISTRY}:${TAG}"
export CONTEXT_NAME="$RUNNER_NAME" && (docker context inspect "$CONTEXT_NAME" >/dev/null 2>&1 || docker context create "$CONTEXT_NAME")
docker buildx inspect "blobber-$RUNNER_NAME" || docker buildx create --name "blobber-$RUNNER_NAME" --driver-opt network=host --buildkitd-flags '--allow-insecure-entitlement security.insecure' "$CONTEXT_NAME"
docker buildx use "blobber-$RUNNER_NAME"
./docker.local/bin/build.blobber.sh
docker tag ${BLOBBER_REGISTRY}:${TAG} ${BLOBBER_REGISTRY}:${TAG}-${SHORT_SHA}
docker push ${BLOBBER_REGISTRY}:${TAG}-${SHORT_SHA}
validator:
timeout-minutes: 30
Expand All @@ -118,7 +121,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v4
with:
go-version: ^1.20 # The Go version to download (if necessary) and use.
go-version: ^1.21 # The Go version to download (if necessary) and use.

- name: Clone blobber
uses: actions/checkout@v3
Expand All @@ -145,7 +148,7 @@ jobs:
docker tag $BLOBBER_BUILD_BASE_REGISTRY:staging $BLOBBER_BUILDBASE
- name: Build Base image
if: contains(steps.changed-files.outputs.modified_files, 'docker.local/base.Dockerfile')
# if: contains(steps.changed-files.outputs.modified_files, 'docker.local/base.Dockerfile')
run: |
SHORT_SHA=$(echo ${{ env.SHA }} | head -c 8)
Expand All @@ -158,14 +161,16 @@ jobs:
- name: Build validator
run: |
SHORT_SHA=$(echo ${{ env.SHA }} | head -c 8)
export DOCKER_IMAGE_BASE="${VALIDATOR_REGISTRY}:base"
export DOCKER_BUILD="buildx build --platform linux/amd64,linux/arm64 --push"
export DOCKER_IMAGE_VALIDATOR="-t ${VALIDATOR_REGISTRY}:${TAG} -t ${VALIDATOR_REGISTRY}:${TAG}-${SHORT_SHA}"
export DOCKER_IMAGE_BASE="$BLOBBER_BUILD_BASE_REGISTRY:$TAG"
# export DOCKER_BUILD="buildx build --platform linux/amd64,linux/arm64 --push"
export DOCKER_BUILD="build --push"
export DOCKER_IMAGE_VALIDATOR="-t ${VALIDATOR_REGISTRY}:${TAG}"
export CONTEXT_NAME="$RUNNER_NAME" && (docker context inspect "$CONTEXT_NAME" >/dev/null 2>&1 || docker context create "$CONTEXT_NAME")
docker buildx inspect "validator-$RUNNER_NAME" || docker buildx create --name "validator-$RUNNER_NAME" --driver-opt network=host --buildkitd-flags '--allow-insecure-entitlement security.insecure' "$CONTEXT_NAME"
docker buildx use "validator-$RUNNER_NAME"
./docker.local/bin/build.validator.sh
docker tag ${VALIDATOR_REGISTRY}:${TAG} ${VALIDATOR_REGISTRY}:${TAG}-${SHORT_SHA}
docker push ${VALIDATOR_REGISTRY}:${TAG}-${SHORT_SHA}
system-tests:
if: github.event_name != 'workflow_dispatch'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-for-conductor-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
- name: Setup go
uses: actions/setup-go@v3
with:
go-version: ^1.20 # The Go version to download (if necessary) and use.
go-version: ^1.21 # The Go version to download (if necessary) and use.

- name: Clone blobber
uses: actions/checkout@v1
Expand Down
5 changes: 5 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/copyfilechange.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (rf *CopyFileChange) ApplyChange(ctx context.Context, rootRef *reference.Re
}
}

if len(dirRef.Children) >= config.Configuration.MaxObjectsInDir {
return nil, common.NewErrorf("max_objects_in_dir_reached",
"maximum objects in directory %s reached: %v", dirRef.Path, config.Configuration.MaxObjectsInDir)
}

if !found {
newRef := reference.NewDirectoryRef()
newRef.AllocationID = rf.AllocationID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func TestBlobberCore_CopyFile(t *testing.T) {
tc.setupDbMock()

config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc
config.Configuration.MaxObjectsInDir = 1000

ctx := datastore.GetStore().CreateTransaction(context.TODO())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ func (nf *UploadFileChanger) applyChange(ctx context.Context, rootRef *reference
found = true
}
}

if len(dirRef.Children) >= config.Configuration.MaxObjectsInDir {
return nil, common.NewErrorf("max_objects_in_dir_reached",
"maximum objects in directory %s reached: %v", dirRef.Path, config.Configuration.MaxObjectsInDir)
}

if !found {
newRef := reference.NewDirectoryRef()
newRef.AllocationID = dirRef.AllocationID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
tc.setupDbMock()

config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc
config.Configuration.MaxObjectsInDir = 1000

ctx := datastore.GetStore().CreateTransaction(context.TODO())

Expand Down
6 changes: 6 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/movefilechange.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"sync"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
)
Expand Down Expand Up @@ -91,6 +92,11 @@ func (rf *MoveFileChange) ApplyChange(ctx context.Context, rootRef *reference.Re
}
}

if len(dirRef.Children) >= config.Configuration.MaxObjectsInDir {
return nil, common.NewErrorf("max_objects_in_dir_reached",
"maximum objects in directory %s reached: %v", dirRef.Path, config.Configuration.MaxObjectsInDir)
}

if !found {
newRef := reference.NewDirectoryRef()
newRef.AllocationID = rf.AllocationID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ func TestBlobberCore_MoveFile(t *testing.T) {
tc.setupDbMock()

config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc
config.Configuration.MaxObjectsInDir = 1000

ctx := datastore.GetStore().CreateTransaction(context.TODO())

Expand Down
10 changes: 10 additions & 0 deletions code/go/0chain.net/blobbercore/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func SetupDefaultConfig() {
viper.SetDefault("rate_limiters.block_limit_monthly", 31250000)
viper.SetDefault("rate_limiters.upload_limit_monthly", 31250000)
viper.SetDefault("rate_limiters.commit_limit_monthly", 30000)
viper.SetDefault("rate_limiters.commit_limit_daily", 1600)
viper.SetDefault("rate_limiters.commit_zero_limit_daily", 400)

viper.SetDefault("healthcheck.frequency", "60s")

Expand All @@ -47,6 +49,7 @@ func SetupDefaultConfig() {
viper.SetDefault("finalize_allocations_interval", time.Duration(-1))

viper.SetDefault("max_dirs_files", 50000)
viper.SetDefault("max_objects_dir", 1000)
}

/*SetupConfig - setup the configuration system */
Expand Down Expand Up @@ -109,6 +112,8 @@ type Config struct {
BlockLimitMonthly int64
UploadLimitMonthly int64
CommitLimitMonthly int64
CommitLimitDaily int64
CommitZeroLimitDaily int64
ChallengeCleanupGap int64

HealthCheckWorkerFreq time.Duration
Expand All @@ -124,6 +129,7 @@ type Config struct {
FinalizeAllocationsInterval time.Duration

MaxAllocationDirFiles int
MaxObjectsInDir int

// DelegateWallet for pool owner.
DelegateWallet string `json:"delegate_wallet"`
Expand Down Expand Up @@ -256,6 +262,8 @@ func ReadConfig(deploymentMode int) {
Configuration.MaxAllocationDirFiles =
viper.GetInt("max_dirs_files")

Configuration.MaxObjectsInDir = viper.GetInt("max_objects_dir")

Configuration.DelegateWallet = viper.GetString("delegate_wallet")
if w := Configuration.DelegateWallet; len(w) != 64 {
log.Fatal("invalid delegate wallet:", w)
Expand All @@ -279,6 +287,8 @@ func ReadConfig(deploymentMode int) {
Configuration.BlockLimitMonthly = viper.GetInt64("rate_limiters.block_limit_monthly")
Configuration.UploadLimitMonthly = viper.GetInt64("rate_limiters.upload_limit_monthly")
Configuration.CommitLimitMonthly = viper.GetInt64("rate_limiters.commit_limit_monthly")
Configuration.CommitLimitDaily = viper.GetInt64("rate_limiters.commit_limit_daily")
Configuration.CommitZeroLimitDaily = viper.GetInt64("rate_limiters.commit_zero_limit_daily")
}

// StorageSCConfiguration will include all the required sc configs to operate blobber
Expand Down
58 changes: 29 additions & 29 deletions code/go/0chain.net/blobbercore/handler/client_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ const (
type ClientStats struct {
ClientID string `gorm:"column:client_id;size:64;primaryKey" json:"client_id"`
CreatedAt common.Timestamp `gorm:"created_at;primaryKey" json:"created"`
TotalUpload uint64 `gorm:"column:total_upload" json:"total_upload"`
TotalDownload uint64 `gorm:"column:total_download" json:"total_download"`
TotalWM uint64 `gorm:"column:total_write_marker" json:"total_write_marker"`
TotalUpload int64 `gorm:"column:total_upload" json:"total_upload"`
TotalDownload int64 `gorm:"column:total_download" json:"total_download"`
TotalWM int64 `gorm:"column:total_write_marker" json:"total_write_marker"`
TotalZeroWM int64 `gorm:"-" json:"total_zero_write_marker"`
}

func (ClientStats) TableName() string {
Expand All @@ -39,7 +40,7 @@ func (cs *ClientStats) BeforeCreate(tx *gorm.DB) error {
return nil
}

func GetUploadedData(clientID string) uint64 {
func GetUploadedData(clientID string) int64 {
mpLock.RLock()
defer mpLock.RUnlock()
cs := clientMap[clientID]
Expand All @@ -57,28 +58,10 @@ func AddUploadedData(clientID string, data int64) {
cs = &ClientStats{ClientID: clientID}
clientMap[clientID] = cs
}
cs.TotalUpload += uint64(data)
cs.TotalUpload += data
}

func GetDownloadedData(clientID string) uint64 {
mpLock.RLock()
defer mpLock.RUnlock()
cs := clientMap[clientID]
return cs.TotalDownload
}

func AddDownloadedData(clientID string, data int64) {
mpLock.Lock()
defer mpLock.Unlock()
cs := clientMap[clientID]
if cs == nil {
cs = &ClientStats{ClientID: clientID}
clientMap[clientID] = cs
}
cs.TotalDownload += uint64(data)
}

func GetWriteMarkerCount(clientID string) uint64 {
func GetWriteMarkerCount(clientID string) int64 {
mpLock.RLock()
defer mpLock.RUnlock()
cs := clientMap[clientID]
Expand All @@ -88,15 +71,27 @@ func GetWriteMarkerCount(clientID string) uint64 {
return cs.TotalWM
}

func AddWriteMarkerCount(clientID string, count int64) {
func AddWriteMarkerCount(clientID string, zeroSizeWM bool) {
mpLock.Lock()
defer mpLock.Unlock()
cs := clientMap[clientID]
if cs == nil {
cs = &ClientStats{ClientID: clientID}
clientMap[clientID] = cs
}
cs.TotalWM += uint64(count)
cs.TotalWM++
if zeroSizeWM {
cs.TotalZeroWM++
}
if cs.TotalZeroWM > config.Configuration.CommitZeroLimitDaily || cs.TotalWM > config.Configuration.CommitLimitDaily {
SetBlacklist(clientID)
}
}

func SetBlacklist(clientID string) {
blMap.Lock()
blackListMap[clientID] = true
blMap.Unlock()
}

func CheckBlacklist(clientID string) bool {
Expand All @@ -109,10 +104,13 @@ func CheckBlacklist(clientID string) bool {
func saveClientStats() {
dbStats := make([]*ClientStats, 0, len(clientMap))
mpLock.Lock()
now := common.Now()
for _, cs := range clientMap {
cs.CreatedAt = now
cs.TotalDownload = getDailyBlocks(cs.ClientID)
dbStats = append(dbStats, cs)
delete(clientMap, cs.ClientID)
}
clear(clientMap)
mpLock.Unlock()
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
if len(dbStats) > 0 {
Expand All @@ -129,7 +127,7 @@ func saveClientStats() {
})
if err == nil {
blMap.Lock()
blackListMap = make(map[string]bool)
clear(blackListMap)
for _, clientID := range blackList {
blackListMap[clientID] = true
}
Expand All @@ -138,17 +136,19 @@ func saveClientStats() {
}

func startBlackListWorker(ctx context.Context) {
BlackListWorkerTime := 6 * time.Hour
BlackListWorkerTime := 24 * time.Hour
if config.Development() {
BlackListWorkerTime = 10 * time.Second
}
saveClientStats()

for {
select {
case <-ctx.Done():
return
case <-time.After(BlackListWorkerTime):
saveClientStats()
cleanupDownloadLimit()
}
}
}
17 changes: 4 additions & 13 deletions code/go/0chain.net/blobbercore/handler/download_quota.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package handler

import (
"context"
"fmt"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/core/common"
)
Expand Down Expand Up @@ -38,17 +36,10 @@ func getDailyBlocks(key string) int64 {
return downloadLimit[key]
}

func startDownloadLimitCleanup(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(24 * time.Hour):
downloadLock.Lock()
downloadLimit = make(map[string]int64)
downloadLock.Unlock()
}
}
func cleanupDownloadLimit() {
downloadLock.Lock()
defer downloadLock.Unlock()
clear(downloadLimit)
}

func getQuotaManager() *QuotaManager {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ func (fsh *StorageHandler) DownloadFile(ctx context.Context, r *http.Request) (i
reference.FileBlockDownloaded(ctx, fileref, dr.NumBlocks)
go func() {
addDailyBlocks(clientID, dr.NumBlocks)
AddDownloadedData(clientID, dr.NumBlocks)
}()
if !dr.VerifyDownload {
return fileDownloadResponse.Data, nil
Expand Down Expand Up @@ -731,7 +730,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewError("write_marker_error", "Error redeeming the write marker")
}
go allocation.DeleteConnectionObjEntry(connectionID)
go AddWriteMarkerCount(clientID, 1)
go AddWriteMarkerCount(clientID, connectionObj.Size <= 0)

Logger.Info("[commit]"+commitOperation,
zap.String("alloc_id", allocationID),
Expand Down
Loading

0 comments on commit cce361c

Please sign in to comment.