Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Add schema update time verification for insert and upsert to use cache #39096

Merged
merged 2 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 h1:IUHMoxffuI2xbtMJJE/XK7kKzavZnN5sMiheIKTDVt8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
5 changes: 5 additions & 0 deletions internal/metastore/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Collection struct {
Properties []*commonpb.KeyValuePair
State pb.CollectionState
EnableDynamicField bool
UpdateTimestamp uint64
}

func (c *Collection) Available() bool {
Expand Down Expand Up @@ -74,6 +75,7 @@ func (c *Collection) ShallowClone() *Collection {
State: c.State,
EnableDynamicField: c.EnableDynamicField,
Functions: c.Functions,
UpdateTimestamp: c.UpdateTimestamp,
}
}

Expand All @@ -99,6 +101,7 @@ func (c *Collection) Clone() *Collection {
State: c.State,
EnableDynamicField: c.EnableDynamicField,
Functions: CloneFunctions(c.Functions),
UpdateTimestamp: c.UpdateTimestamp,
}
}

Expand Down Expand Up @@ -156,6 +159,7 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
State: coll.State,
Properties: coll.Properties,
EnableDynamicField: coll.Schema.EnableDynamicField,
UpdateTimestamp: coll.UpdateTimestamp,
}
}

Expand Down Expand Up @@ -218,6 +222,7 @@ func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.Collectio
StartPositions: coll.StartPositions,
State: coll.State,
Properties: coll.Properties,
UpdateTimestamp: coll.UpdateTimestamp,
}

if c.withPartitions {
Expand Down
18 changes: 10 additions & 8 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2629,10 +2629,11 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}
var enqueuedTask task = it
if streamingutil.IsStreamingServiceEnabled() {
Expand Down Expand Up @@ -2872,10 +2873,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
},
},

idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}
var enqueuedTask task = it
if streamingutil.IsStreamingServiceEnabled() {
Expand Down
3 changes: 3 additions & 0 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type collectionInfo struct {
consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
replicateID string
updateTimestamp uint64
}

type databaseInfo struct {
Expand Down Expand Up @@ -473,6 +474,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
updateTimestamp: collection.UpdateTimestamp,
}, nil
}
_, dbOk := m.collInfo[database]
Expand All @@ -490,6 +492,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
replicateID: replicateID,
updateTimestamp: collection.UpdateTimestamp,
}

log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
Expand Down
1 change: 1 addition & 0 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
t.result.Properties = result.Properties
t.result.DbName = result.GetDbName()
t.result.NumPartitions = result.NumPartitions
t.result.UpdateTimestamp = result.UpdateTimestamp
for _, field := range result.Schema.Fields {
if field.IsDynamic {
continue
Expand Down
37 changes: 28 additions & 9 deletions internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@
insertMsg *BaseInsertTask
ctx context.Context

result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
schemaTimestamp uint64
}

// TraceCtx returns insertTask context
Expand Down Expand Up @@ -135,6 +136,24 @@
return merr.WrapErrCollectionReplicateMode("insert")
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err))
return err
}

Check warning on line 143 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L141-L143

Added lines #L141 - L143 were not covered by tests
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.insertMsg.GetDbName(), collectionName, collID)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection info", zap.Error(err))
return err
}

Check warning on line 148 in internal/proxy/task_insert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_insert.go#L146-L148

Added lines #L146 - L148 were not covered by tests
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Ctx(ctx).Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Ctx(ctx).Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
Expand Down
36 changes: 36 additions & 0 deletions internal/proxy/task_insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ func TestInsertTask_Function(t *testing.T) {

info := newSchemaInfo(schema)
cache := NewMockCache(t)
collectionID := UniqueID(0)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
mock.AnythingOfType("string"),
).Return(collectionID, nil)

cache.On("GetCollectionSchema",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
Expand Down Expand Up @@ -437,3 +444,32 @@ func TestInsertTask_Function(t *testing.T) {
err = task.PreExecute(ctx)
assert.NoError(t, err)
}

// TestInsertTaskForSchemaMismatch checks that insertTask.PreExecute fails with
// merr.ErrCollectionSchemaMismatch when the task's schemaTimestamp differs from
// the updateTimestamp reported by the meta cache.
func TestInsertTaskForSchemaMismatch(t *testing.T) {
	// Replace the package-level meta cache with a mock for the duration of the
	// test and put the previous one back when the test returns.
	previousCache := globalMetaCache
	defer func() { globalMetaCache = previousCache }()
	mockCache := NewMockCache(t)
	globalMetaCache = mockCache
	ctx := context.Background()

	t.Run("schema ts mismatch", func(t *testing.T) {
		// The mocked collection info carries update timestamp 100 ...
		cachedInfo := &collectionInfo{
			updateTimestamp: 100,
		}
		mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
		mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(cachedInfo, nil)
		mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)

		// ... while the task claims schema timestamp 99, so the two disagree.
		request := &msgpb.InsertRequest{
			DbName:         "hooooooo",
			CollectionName: "fooooo",
		}
		it := insertTask{
			ctx:             context.Background(),
			insertMsg:       &msgstream.InsertMsg{InsertRequest: request},
			schemaTimestamp: 99,
		}

		err := it.PreExecute(ctx)
		assert.Error(t, err)
		assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch)
	})
}
21 changes: 20 additions & 1 deletion internal/proxy/task_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
partitionKeys *schemapb.FieldData
// automatically generate pk as new pk when autoID == true
// delete task needs to use the oldIds
oldIds *schemapb.IDs
oldIds *schemapb.IDs
schemaTimestamp uint64
}

// TraceCtx returns upsertTask context
Expand Down Expand Up @@ -312,6 +313,24 @@
return merr.WrapErrCollectionReplicateMode("upsert")
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("fail to get collection id", zap.Error(err))
return err
}

Check warning on line 320 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L318-L320

Added lines #L318 - L320 were not covered by tests
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.req.GetDbName(), collectionName, collID)
if err != nil {
log.Warn("fail to get collection info", zap.Error(err))
return err
}

Check warning on line 325 in internal/proxy/task_upsert.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_upsert.go#L323-L325

Added lines #L323 - L325 were not covered by tests
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Info("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

schema, err := globalMetaCache.GetCollectionSchema(ctx, it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("Failed to get collection schema",
Expand Down
26 changes: 26 additions & 0 deletions internal/proxy/task_upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,29 @@ func TestUpsertTask_Function(t *testing.T) {
task.upsertMsg.InsertMsg.InsertRequest.NumRows = oldRows
}
}

// TestUpsertTaskForSchemaMismatch checks that upsertTask.PreExecute fails with
// merr.ErrCollectionSchemaMismatch when the task's schemaTimestamp differs from
// the updateTimestamp reported by the meta cache.
func TestUpsertTaskForSchemaMismatch(t *testing.T) {
	// Replace the package-level meta cache with a mock for the duration of the
	// test and put the previous one back when the test returns.
	previousCache := globalMetaCache
	defer func() { globalMetaCache = previousCache }()
	mockCache := NewMockCache(t)
	globalMetaCache = mockCache
	ctx := context.Background()

	t.Run("schema ts mismatch", func(t *testing.T) {
		// The mocked collection info carries update timestamp 100 ...
		cachedInfo := &collectionInfo{
			updateTimestamp: 100,
		}
		mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(0, nil)
		mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(cachedInfo, nil)
		mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)

		// ... while the task claims schema timestamp 99, so the two disagree.
		request := &milvuspb.UpsertRequest{
			CollectionName: "col-0",
		}
		ut := upsertTask{
			ctx:             ctx,
			req:             request,
			schemaTimestamp: 99,
		}

		err := ut.PreExecute(ctx)
		assert.Error(t, err)
		assert.ErrorIs(t, err, merr.ErrCollectionSchemaMismatch)
	})
}
10 changes: 10 additions & 0 deletions internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@
oldColl.Properties = oldProperties
newColl := col.Clone()
newColl.Properties = newProperties
tso, err := core.tsoAllocator.GenerateTSO(1)
if err == nil {
newColl.UpdateTimestamp = tso
}
redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: core},
Expand Down Expand Up @@ -280,6 +284,12 @@
if err != nil {
return err
}

tso, err := core.tsoAllocator.GenerateTSO(1)
if err == nil {
newColl.UpdateTimestamp = tso
}

Check warning on line 291 in internal/rootcoord/alter_collection_task.go

View check run for this annotation

Codecov / codecov/patch

internal/rootcoord/alter_collection_task.go#L288-L291

Added lines #L288 - L291 were not covered by tests

redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: core},
Expand Down
10 changes: 5 additions & 5 deletions internal/rootcoord/alter_collection_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
).Return(errors.New("err"))
meta.On("ListAliasesByID", mock.Anything, mock.Anything).Return([]string{})

core := newTestCore(withValidProxyManager(), withMeta(meta))
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -131,7 +131,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return errors.New("err")
}

core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -166,7 +166,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
return errors.New("err")
}

core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker))
core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -200,7 +200,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
},
},
}, nil)
core := newTestCore(withValidProxyManager(), withMeta(meta))
core := newTestCore(withValidProxyManager(), withMeta(meta), withInvalidTsoAllocator())
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Expand Down Expand Up @@ -254,7 +254,7 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
ticker := newChanTimeTickSync(packChan)
ticker.addDmlChannels("by-dev-rootcoord-dml_1")

core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker))
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker), withInvalidTsoAllocator())
newPros := append(properties, &commonpb.KeyValuePair{
Key: common.ReplicateEndTSKey,
Value: "10000",
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,6 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
State: pb.PartitionState_PartitionCreated,
}
}

collInfo := model.Collection{
CollectionID: collID,
DBID: t.dbID,
Expand All @@ -592,6 +591,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
Partitions: partitions,
Properties: t.Req.Properties,
EnableDynamicField: t.schema.EnableDynamicField,
UpdateTimestamp: ts,
}

// We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps
Expand Down
1 change: 1 addition & 0 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str
resp.Properties = collInfo.Properties
resp.NumPartitions = int64(len(collInfo.Partitions))
resp.DbId = collInfo.DBID
resp.UpdateTimestamp = collInfo.UpdateTimestamp
return resp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2
Expand Down
4 changes: 2 additions & 2 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 h1:IUHMoxffuI2xbtMJJE/XK7kKzavZnN5sMiheIKTDVt8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand Down
Loading
Loading