Skip to content

Commit 5d6b5a8

Browse files
authored
feat(blob-uploader): blob_upload table add batch hash (#1677)
1 parent 4ee459a commit 5d6b5a8

File tree

4 files changed

+142
-65
lines changed

4 files changed

+142
-65
lines changed

database/migrate/migrations/00027_ blob_upload.sql

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
CREATE TABLE blob_upload (
55
batch_index BIGINT NOT NULL,
6+
batch_hash VARCHAR NOT NULL,
67

78
platform SMALLINT NOT NULL,
89
status SMALLINT NOT NULL,
@@ -13,20 +14,15 @@ CREATE TABLE blob_upload (
1314
deleted_at TIMESTAMP(0) DEFAULT NULL
1415
);
1516

16-
CREATE UNIQUE INDEX IF NOT EXISTS batch_index_platform_uindex
17-
ON blob_upload(batch_index, platform) WHERE deleted_at IS NULL;
17+
CREATE UNIQUE INDEX IF NOT EXISTS batch_index_batch_hash_platform_uindex
18+
ON blob_upload(batch_index, batch_hash, platform) WHERE deleted_at IS NULL;
1819

1920
COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed';
2021

21-
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL;
22-
23-
CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL;
24-
25-
CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL;
26-
2722
CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL;
2823

29-
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL;
24+
CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_batch_hash_status_platform
25+
ON blob_upload(batch_index, batch_hash, status, platform) WHERE deleted_at IS NULL;
3026

3127
-- +goose StatementEnd
3228

rollup/internal/controller/blob_uploader/blob_uploader.go

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package blob_uploader
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

78
"github.com/prometheus/client_golang/prometheus"
@@ -67,7 +68,7 @@ func (b *BlobUploader) UploadBlobToS3() {
6768
}
6869

6970
// get un-uploaded batches from database in ascending order by their index.
70-
dbBatch, err := b.batchOrm.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
71+
dbBatch, err := b.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3)
7172
if err != nil {
7273
log.Error("Failed to fetch unuploaded batch", "err", err)
7374
return
@@ -85,7 +86,7 @@ func (b *BlobUploader) UploadBlobToS3() {
8586
if err != nil {
8687
log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err)
8788
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
88-
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
89+
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
8990
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
9091
}
9192
return
@@ -97,7 +98,7 @@ func (b *BlobUploader) UploadBlobToS3() {
9798
log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err)
9899
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
99100
// update status to failed
100-
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
101+
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
101102
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
102103
}
103104
return
@@ -110,14 +111,14 @@ func (b *BlobUploader) UploadBlobToS3() {
110111
log.Error("failed to upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err)
111112
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
112113
// update status to failed
113-
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
114+
if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil {
114115
log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr)
115116
}
116117
return
117118
}
118119

119120
// update status to uploaded
120-
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
121+
if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil {
121122
log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err)
122123
b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc()
123124
return
@@ -195,3 +196,56 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er
195196

196197
return daBatch.Blob(), nil
197198
}
199+
200+
// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
201+
// The batch must have a commit_tx_hash (committed).
202+
func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) {
203+
batchIndex, err := b.blobUploadOrm.GetNextBatchIndexToUploadByPlatform(ctx, startBatch, platform)
204+
if err != nil {
205+
return nil, err
206+
}
207+
208+
var batch *orm.Batch
209+
for {
210+
var err error
211+
batch, err = b.batchOrm.GetBatchByIndex(ctx, batchIndex)
212+
if err != nil {
213+
if errors.Is(err, gorm.ErrRecordNotFound) {
214+
log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
215+
return nil, nil
216+
}
217+
return nil, err
218+
}
219+
220+
// to check if the parent batch uploaded
221+
// if no, there is a batch revert happened, we need to fallback to upload previous batch
222+
// skip the check if the parent batch is genesis batch
223+
if batchIndex <= 1 || batchIndex == startBatch {
224+
break
225+
}
226+
fields := map[string]interface{}{
227+
"batch_index = ?": batchIndex - 1,
228+
"batch_hash = ?": batch.ParentBatchHash,
229+
"platform = ?": platform,
230+
"status = ?": types.BlobUploadStatusUploaded,
231+
}
232+
blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1)
233+
if err != nil {
234+
return nil, err
235+
}
236+
237+
if len(blobUpload) == 0 {
238+
batchIndex--
239+
continue
240+
}
241+
242+
break
243+
}
244+
245+
if len(batch.CommitTxHash) == 0 {
246+
log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
247+
return nil, nil
248+
}
249+
250+
return batch, nil
251+
}

rollup/internal/orm/batch.go

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -263,44 +263,6 @@ func (o *Batch) GetBatchByIndex(ctx context.Context, index uint64) (*Batch, erro
263263
return &batch, nil
264264
}
265265

266-
// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service
267-
// The batch must have a commit_tx_hash (committed).
268-
func (o *Batch) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*Batch, error) {
269-
db := o.db.WithContext(ctx)
270-
db = db.Model(&BlobUpload{})
271-
db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded)
272-
db = db.Order("batch_index DESC")
273-
db = db.Limit(1)
274-
275-
var blobUpload BlobUpload
276-
var batchIndex uint64
277-
if err := db.First(&blobUpload).Error; err != nil {
278-
if errors.Is(err, gorm.ErrRecordNotFound) {
279-
batchIndex = startBatch
280-
} else {
281-
return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
282-
}
283-
} else {
284-
batchIndex = blobUpload.BatchIndex + 1
285-
}
286-
287-
batch, err := o.GetBatchByIndex(ctx, batchIndex)
288-
if err != nil {
289-
if errors.Is(err, gorm.ErrRecordNotFound) {
290-
log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
291-
return nil, nil
292-
}
293-
return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err)
294-
}
295-
296-
if len(batch.CommitTxHash) == 0 {
297-
log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String())
298-
return nil, nil
299-
}
300-
301-
return batch, nil
302-
}
303-
304266
// InsertBatch inserts a new batch into the database.
305267
func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics rutils.BatchMetrics, dbTX ...*gorm.DB) (*Batch, error) {
306268
if batch == nil {

rollup/internal/orm/blob_upload.go

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package orm
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

89
"gorm.io/gorm"
9-
"gorm.io/gorm/clause"
1010

1111
"scroll-tech/common/types"
1212
)
@@ -16,8 +16,9 @@ type BlobUpload struct {
1616
db *gorm.DB `gorm:"-"`
1717

1818
// blob upload
19-
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"`
20-
Platform int16 `json:"platform" gorm:"column:platform;primaryKey"`
19+
BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"`
20+
BatchHash string `json:"batch_hash" gorm:"column:batch_hash"`
21+
Platform int16 `json:"platform" gorm:"column:platform"`
2122
Status int16 `json:"status" gorm:"column:status"`
2223

2324
// metadata
@@ -36,23 +37,87 @@ func (*BlobUpload) TableName() string {
3637
return "blob_upload"
3738
}
3839

40+
// GetNextBatchIndexToUploadByPlatform retrieves the next batch index that hasn't been uploaded to corresponding blob storage service
41+
func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) {
42+
db := o.db.WithContext(ctx)
43+
db = db.Model(&BlobUpload{})
44+
db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded)
45+
db = db.Order("batch_index DESC")
46+
db = db.Limit(1)
47+
48+
var blobUpload BlobUpload
49+
var batchIndex uint64
50+
if err := db.First(&blobUpload).Error; err != nil {
51+
if errors.Is(err, gorm.ErrRecordNotFound) {
52+
batchIndex = startBatch
53+
} else {
54+
return 0, fmt.Errorf("BlobUpload.GetNextBatchIndexToUploadByPlatform error: %w", err)
55+
}
56+
} else {
57+
batchIndex = blobUpload.BatchIndex + 1
58+
}
59+
60+
return batchIndex, nil
61+
}
62+
63+
// GetBlobUpload retrieves the selected blob uploads from the database.
64+
func (o *BlobUpload) GetBlobUploads(ctx context.Context, fields map[string]interface{}, orderByList []string, limit int) ([]*BlobUpload, error) {
65+
db := o.db.WithContext(ctx)
66+
db = db.Model(&BlobUpload{})
67+
68+
for key, value := range fields {
69+
db = db.Where(key, value)
70+
}
71+
72+
for _, orderBy := range orderByList {
73+
db = db.Order(orderBy)
74+
}
75+
76+
if limit > 0 {
77+
db = db.Limit(limit)
78+
}
79+
80+
db = db.Order("batch_index ASC")
81+
82+
var blobUploads []*BlobUpload
83+
if err := db.Find(&blobUploads).Error; err != nil {
84+
return nil, fmt.Errorf("BlobUpload.GetBlobUploads error: %w", err)
85+
}
86+
87+
return blobUploads, nil
88+
}
89+
3990
// InsertOrUpdateBlobUpload inserts a new blob upload record or updates the existing one.
40-
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
91+
func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, batchHash string, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error {
4192
db := o.db
4293
if len(dbTX) > 0 && dbTX[0] != nil {
4394
db = dbTX[0]
4495
}
4596
db = db.WithContext(ctx)
46-
blobUpload := &BlobUpload{
47-
BatchIndex: batchIndex,
48-
Platform: int16(platform),
49-
Status: int16(status),
97+
98+
var existing BlobUpload
99+
err := db.Where("batch_index = ? AND batch_hash = ? AND platform = ? AND deleted_at IS NULL",
100+
batchIndex, batchHash, int16(platform),
101+
).First(&existing).Error
102+
103+
if errors.Is(err, gorm.ErrRecordNotFound) {
104+
newRecord := BlobUpload{
105+
BatchIndex: batchIndex,
106+
BatchHash: batchHash,
107+
Platform: int16(platform),
108+
Status: int16(status),
109+
}
110+
if err := db.Create(&newRecord).Error; err != nil {
111+
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload insert error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
112+
}
113+
return nil
114+
} else if err != nil {
115+
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload query error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
50116
}
51-
if err := db.Clauses(clause.OnConflict{
52-
Columns: []clause.Column{{Name: "batch_index"}, {Name: "platform"}},
53-
DoUpdates: clause.AssignmentColumns([]string{"status"}),
54-
}).Create(blobUpload).Error; err != nil {
55-
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform)
117+
118+
if err := db.Model(&existing).Update("status", int16(status)).Error; err != nil {
119+
return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload update error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform)
56120
}
121+
57122
return nil
58123
}

0 commit comments

Comments
 (0)