Skip to content

Commit

Permalink
executor: check inconsistent index in IndexLookupExecutor (pingcap#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored and zz-jason committed Apr 28, 2019
1 parent 168776d commit 45c0e51
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
26 changes: 19 additions & 7 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
e.tblWorkerWg.Add(lookupConcurrencyLimit)
for i := 0; i < lookupConcurrencyLimit; i++ {
worker := &tableWorker{
idxLookup: e,
workCh: workCh,
finished: e.finished,
buildTblReader: e.buildTableReader,
Expand Down Expand Up @@ -732,6 +733,7 @@ func (w *indexWorker) buildTableTask(handles []int64) *lookupTableTask {

// tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines.
type tableWorker struct {
idxLookup *IndexLookUpExecutor
workCh <-chan *lookupTableTask
finished <-chan struct{}
buildTblReader func(ctx context.Context, handles []int64) (Executor, error)
Expand Down Expand Up @@ -809,6 +811,7 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
task.rows = append(task.rows, row)
}
}

memUsage = int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{}))
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
Expand All @@ -824,14 +827,23 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
sort.Sort(task)
}

if w.isCheckOp && handleCnt != len(task.rows) {
obtainedHandlesMap := make(map[int64]struct{}, len(task.rows))
for _, row := range task.rows {
handle := row.GetInt64(w.handleIdx)
obtainedHandlesMap[handle] = struct{}{}
if handleCnt != len(task.rows) {
if w.isCheckOp {
obtainedHandlesMap := make(map[int64]struct{}, len(task.rows))
for _, row := range task.rows {
handle := row.GetInt64(w.handleIdx)
obtainedHandlesMap[handle] = struct{}{}
}
return errors.Errorf("inconsistent index %s handle count %d isn't equal to value count %d, missing handles %v in a batch",
w.idxLookup.index.Name.O, handleCnt, len(task.rows), GetLackHandles(task.handles, obtainedHandlesMap))
}

if len(w.idxLookup.tblPlans) == 1 {
// table scan in double read can never has conditions according to convertToIndexScan.
// if this table scan has no condition, the number of rows it returns must equal to the length of handles.
return errors.Errorf("inconsistent index %s handle count %d isn't equal to value count %d",
w.idxLookup.index.Name.O, handleCnt, len(task.rows))
}
return errors.Errorf("handle count %d isn't equal to value count %d, missing handles %v in a batch",
handleCnt, len(task.rows), GetLackHandles(task.handles, obtainedHandlesMap))
}

return nil
Expand Down
53 changes: 53 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -184,3 +187,53 @@ func (s *testSuite3) TestIssue10178(c *C) {
tk.MustQuery("select * from t where a > 9223372036854775807").Check(testkit.Rows("18446744073709551615"))
tk.MustQuery("select * from t where a < 9223372036854775808").Check(testkit.Rows("9223372036854775807"))
}

func (s *testSuite3) TestInconsistentIndex(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx_a(a))")
is := s.domain.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
idx := tbl.Meta().FindIndexByName("idx_a")
idxOp := tables.NewIndex(tbl.Meta().ID, tbl.Meta(), idx)
ctx := mock.NewContext()
ctx.Store = s.store

for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i+10, i))
c.Assert(tk.QueryToErr("select * from t where a>=0"), IsNil)
}

for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("update t set a=%d where a=%d", i, i+10))
c.Assert(tk.QueryToErr("select * from t where a>=0"), IsNil)
}

for i := 0; i < 10; i++ {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
_, err = idxOp.Create(ctx, txn, types.MakeDatums(i+10), int64(100+i))
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)

err = tk.QueryToErr("select * from t use index(idx_a) where a >= 0")
c.Assert(err.Error(), Equals, fmt.Sprintf("inconsistent index idx_a handle count %d isn't equal to value count 10", i+11))

// if has other conditions, the inconsistent index check doesn't work.
err = tk.QueryToErr("select * from t where a>=0 and b<10")
c.Assert(err, IsNil)
}

// fix inconsistent problem to pass CI
for i := 0; i < 10; i++ {
txn, err := s.store.Begin()
c.Assert(err, IsNil)
err = idxOp.Delete(ctx.GetSessionVars().StmtCtx, txn, types.MakeDatums(i+10), int64(100+i), nil)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
}
}

0 comments on commit 45c0e51

Please sign in to comment.