diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 7851582a801d4..5f4f534aa56f7 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -740,7 +740,11 @@ func (is *PhysicalIndexScan) initSchema(idx *model.IndexInfo, idxExprCols []*exp // If it's double read case, the first index must return handle. So we should add extra handle column // if there isn't a handle column. if isDoubleRead && !setHandle { - indexCols = append(indexCols, &expression.Column{ID: model.ExtraHandleID, UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID()}) + indexCols = append(indexCols, &expression.Column{ + RetType: types.NewFieldType(mysql.TypeLonglong), + ID: model.ExtraHandleID, + UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID(), + }) } is.SetSchema(expression.NewSchema(indexCols...)) } diff --git a/planner/core/task.go b/planner/core/task.go index 6becdc2b2e500..a99f34fa35e17 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -19,12 +19,14 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/charset" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/plancodec" ) @@ -390,17 +392,38 @@ func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 { return outerTask.cost() + innerPlanCost + cpuCost + memoryCost } +func (p *PhysicalHashJoin) avgRowSize(inner PhysicalPlan) (size float64) { + padChar := p.ctx.GetSessionVars().StmtCtx.PadCharToFullLength + if inner.statsInfo().HistColl != nil { + size = inner.statsInfo().HistColl.GetAvgRowSizeListInDisk(inner.Schema().Columns, padChar) + } else { + // Estimate using just the type info. 
+ cols := inner.Schema().Columns + for _, col := range cols { + size += float64(chunk.EstimateTypeWidth(padChar, col.GetType())) + } + } + return +} + // GetCost computes cost of hash join operator itself. func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 { - innerCnt, outerCnt := lCnt, rCnt + buildCnt, probeCnt := lCnt, rCnt + build := p.children[0] // Taking the right as the inner for right join or using the outer to build a hash table. if (p.InnerChildIdx == 1 && !p.UseOuterToBuild) || (p.InnerChildIdx == 0 && p.UseOuterToBuild) { - innerCnt, outerCnt = rCnt, lCnt + buildCnt, probeCnt = rCnt, lCnt + build = p.children[1] } sessVars := p.ctx.GetSessionVars() + oomUseTmpStorage := config.GetGlobalConfig().OOMUseTmpStorage + memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint + rowSize := p.avgRowSize(build) + spill := oomUseTmpStorage && memQuota > 0 && rowSize*buildCnt > float64(memQuota) // Cost of building hash table. - cpuCost := innerCnt * sessVars.CPUFactor - memoryCost := innerCnt * sessVars.MemoryFactor + cpuCost := buildCnt * sessVars.CPUFactor + memoryCost := buildCnt * sessVars.MemoryFactor + diskCost := buildCnt * sessVars.DiskFactor * rowSize // Number of matched row pairs regarding the equal join conditions. helper := &fullJoinRowCountHelper{ cartesian: false, @@ -428,23 +451,38 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 { numPairs = 0 } } - // Cost of quering hash table is cheap actually, so we just compute the cost of + // Cost of querying hash table is cheap actually, so we just compute the cost of // evaluating `OtherConditions` and joining row pairs. probeCost := numPairs * sessVars.CPUFactor + probeDiskCost := numPairs * sessVars.DiskFactor * rowSize // Cost of evaluating outer filter. if len(p.LeftConditions)+len(p.RightConditions) > 0 { // Input outer count for the above compution should be adjusted by selectionFactor. 
probeCost *= selectionFactor - probeCost += outerCnt * sessVars.CPUFactor + probeDiskCost *= selectionFactor + probeCost += probeCnt * sessVars.CPUFactor } + diskCost += probeDiskCost probeCost /= float64(p.Concurrency) // Cost of additional concurrent goroutines. cpuCost += probeCost + float64(p.Concurrency+1)*sessVars.ConcurrencyFactor // Cost of traveling the hash table to resolve missing matched cases when building the hash table from the outer table if p.UseOuterToBuild { - cpuCost += innerCnt * sessVars.CPUFactor / float64(p.Concurrency) + if spill { + // It runs in sequence when build data is on disk. See handleUnmatchedRowsFromHashTableInDisk + cpuCost += buildCnt * sessVars.CPUFactor + } else { + cpuCost += buildCnt * sessVars.CPUFactor / float64(p.Concurrency) + } + diskCost += buildCnt * sessVars.DiskFactor * rowSize } - return cpuCost + memoryCost + + if spill { + memoryCost *= float64(memQuota) / (rowSize * buildCnt) + } else { + diskCost = 0 + } + return cpuCost + memoryCost + diskCost } func (p *PhysicalHashJoin) attach2Task(tasks ...task) task { diff --git a/session/session.go b/session/session.go index 6ca949996dccf..0e79e405965db 100644 --- a/session/session.go +++ b/session/session.go @@ -1836,6 +1836,7 @@ var builtinGlobalVariable = []string{ variable.TiDBOptScanFactor, variable.TiDBOptDescScanFactor, variable.TiDBOptMemoryFactor, + variable.TiDBOptDiskFactor, variable.TiDBOptConcurrencyFactor, variable.TiDBDistSQLScanConcurrency, variable.TiDBInitChunkSize, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 3ec99b0d87cd2..3f0b482ef6bec 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -308,6 +308,8 @@ type SessionVars struct { SeekFactor float64 // MemoryFactor is the memory cost of storing one tuple. MemoryFactor float64 + // DiskFactor is the IO cost of reading/writing one byte to temporary disk. 
+ DiskFactor float64 // ConcurrencyFactor is the CPU cost of additional one goroutine. ConcurrencyFactor float64 @@ -517,6 +519,7 @@ func NewSessionVars() *SessionVars { DescScanFactor: DefOptDescScanFactor, SeekFactor: DefOptSeekFactor, MemoryFactor: DefOptMemoryFactor, + DiskFactor: DefOptDiskFactor, ConcurrencyFactor: DefOptConcurrencyFactor, EnableRadixJoin: false, EnableVectorizedExpression: DefEnableVectorizedExpression, @@ -848,6 +851,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.SeekFactor = tidbOptFloat64(val, DefOptSeekFactor) case TiDBOptMemoryFactor: s.MemoryFactor = tidbOptFloat64(val, DefOptMemoryFactor) + case TiDBOptDiskFactor: + s.DiskFactor = tidbOptFloat64(val, DefOptDiskFactor) case TiDBOptConcurrencyFactor: s.ConcurrencyFactor = tidbOptFloat64(val, DefOptConcurrencyFactor) case TiDBIndexLookupConcurrency: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 22b042941948e..3f05abd598606 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -648,6 +648,7 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBOptDescScanFactor, strconv.FormatFloat(DefOptDescScanFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptSeekFactor, strconv.FormatFloat(DefOptSeekFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptMemoryFactor, strconv.FormatFloat(DefOptMemoryFactor, 'f', -1, 64)}, + {ScopeGlobal | ScopeSession, TiDBOptDiskFactor, strconv.FormatFloat(DefOptDiskFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBOptConcurrencyFactor, strconv.FormatFloat(DefOptConcurrencyFactor, 'f', -1, 64)}, {ScopeGlobal | ScopeSession, TiDBIndexJoinBatchSize, strconv.Itoa(DefIndexJoinBatchSize)}, {ScopeGlobal | ScopeSession, TiDBIndexLookupSize, strconv.Itoa(DefIndexLookupSize)}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 11ac625d62861..e50cbbd7ae5e3 100644 --- a/sessionctx/variable/tidb_vars.go +++ 
b/sessionctx/variable/tidb_vars.go @@ -200,6 +200,8 @@ const ( TiDBOptSeekFactor = "tidb_opt_seek_factor" // tidb_opt_memory_factor is the memory cost of storing one tuple. TiDBOptMemoryFactor = "tidb_opt_memory_factor" + // tidb_opt_disk_factor is the IO cost of reading/writing one byte to temporary disk. + TiDBOptDiskFactor = "tidb_opt_disk_factor" // tidb_opt_concurrency_factor is the CPU cost of additional one goroutine. TiDBOptConcurrencyFactor = "tidb_opt_concurrency_factor" @@ -382,6 +384,7 @@ const ( DefOptDescScanFactor = 3.0 DefOptSeekFactor = 20.0 DefOptMemoryFactor = 0.001 + DefOptDiskFactor = 1.5 DefOptConcurrencyFactor = 3.0 DefOptInSubqToJoinAndAgg = true DefBatchInsert = false diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 954a597a4e5fe..db0b7f8e6fb81 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -503,6 +503,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, TiDBOptDescScanFactor, TiDBOptSeekFactor, TiDBOptMemoryFactor, + TiDBOptDiskFactor, TiDBOptConcurrencyFactor: v, err := strconv.ParseFloat(value, 64) if err != nil { diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index f2451f62b39ad..66801cf3b9827 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -352,6 +352,14 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(val, Equals, "1.0") c.Assert(v.MemoryFactor, Equals, 1.0) + c.Assert(v.DiskFactor, Equals, 1.5) + err = SetSessionSystemVar(v, TiDBOptDiskFactor, types.NewStringDatum("1.1")) + c.Assert(err, IsNil) + val, err = GetSessionSystemVar(v, TiDBOptDiskFactor) + c.Assert(err, IsNil) + c.Assert(val, Equals, "1.1") + c.Assert(v.DiskFactor, Equals, 1.1) + c.Assert(v.ConcurrencyFactor, Equals, 3.0) err = SetSessionSystemVar(v, TiDBOptConcurrencyFactor, types.NewStringDatum("5.0")) c.Assert(err, IsNil) @@ -455,6 +463,8 @@ func (s 
*testVarsutilSuite) TestValidate(c *C) { {TiDBOptSeekFactor, "-2", true}, {TiDBOptMemoryFactor, "a", true}, {TiDBOptMemoryFactor, "-2", true}, + {TiDBOptDiskFactor, "a", true}, + {TiDBOptDiskFactor, "-2", true}, {TiDBOptConcurrencyFactor, "a", true}, {TiDBOptConcurrencyFactor, "-2", true}, {TxnIsolation, "READ-UNCOMMITTED", true}, diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index 2be0e6a445f3c..a72b38851c5a2 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -17,6 +17,7 @@ import ( "fmt" "testing" "time" + "unsafe" . "github.com/pingcap/check" "github.com/pingcap/errors" @@ -209,11 +210,15 @@ func (s *testStatsSuite) TestAvgColLen(c *C) { tableInfo := tbl.Meta() statsTbl := do.StatsHandle().GetTableStats(tableInfo) c.Assert(statsTbl.Columns[tableInfo.Columns[0].ID].AvgColSize(statsTbl.Count, false), Equals, 1.0) + c.Assert(statsTbl.Columns[tableInfo.Columns[0].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 8.0) // The size of varchar type is LEN + BYTE, here is 1 + 7 = 8 c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0) c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0) c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0) + c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 8.0-1) + c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(float32(12.3)))) + c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(types.Time{}))) testKit.MustExec("insert into t values(132, '123456789112', 1232.3, '2018-03-07 19:17:29')") testKit.MustExec("analyze table t") statsTbl = do.StatsHandle().GetTableStats(tableInfo) @@ -221,6 +226,10 @@ func (s *testStatsSuite) TestAvgColLen(c *C) { 
c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSize(statsTbl.Count, false), Equals, 10.5) c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0) c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0) + c.Assert(statsTbl.Columns[tableInfo.Columns[0].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 8.0) + c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 10.5-1) + c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(float32(12.3)))) + c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(types.Time{}))) } func (s *testStatsSuite) TestDurationToTS(c *C) { diff --git a/statistics/histogram.go b/statistics/histogram.go index 83497b4ed112e..346678ff6136f 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -140,6 +140,26 @@ func (c *Column) AvgColSize(count int64, isKey bool) float64 { return math.Round(float64(c.TotColSize)/float64(count)*100) / 100 } +// AvgColSizeListInDisk is the average column size of the histogram. These sizes are derived +// from `chunk.ListInDisk`, so we need to update them if `chunk.ListInDisk` is changed. +func (c *Column) AvgColSizeListInDisk(count int64) float64 { + if count == 0 { + return 0 + } + histCount := c.TotalRowCount() + notNullRatio := 1.0 + if histCount > 0 { + notNullRatio = 1.0 - float64(c.NullCount)/histCount + } + size := chunk.GetFixedLen(c.Histogram.Tp) + if size != -1 { + return float64(size) * notNullRatio + } + // Keep two decimal places. + // The size of a varchar value is LEN + BYTE, so we subtract 1 here. + return math.Round(float64(c.TotColSize)/float64(count)*100)/100 - 1 +} + // AppendBucket appends a bucket into `hg`. 
func (hg *Histogram) AppendBucket(lower *types.Datum, upper *types.Datum, count, repeat int64) { hg.Buckets = append(hg.Buckets, Bucket{Count: count, Repeat: repeat}) diff --git a/statistics/table.go b/statistics/table.go index c3d79ad621fa9..d056986afeca6 100644 --- a/statistics/table.go +++ b/statistics/table.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/ranger" "go.uber.org/atomic" @@ -694,6 +695,28 @@ func (coll *HistColl) GetAvgRowSize(cols []*expression.Column, isEncodedKey bool return size + float64(len(cols)) } +// GetAvgRowSizeListInDisk computes the average row size for the given columns. +func (coll *HistColl) GetAvgRowSizeListInDisk(cols []*expression.Column, padChar bool) (size float64) { + if coll.Pseudo || len(coll.Columns) == 0 || coll.Count == 0 { + for _, col := range cols { + size += float64(chunk.EstimateTypeWidth(padChar, col.GetType())) + } + } else { + for _, col := range cols { + colHist, ok := coll.Columns[col.UniqueID] + // Normally this would not happen; it is for compatibility with old-version stats that + // do not include TotColSize. + if !ok || (!colHist.IsHandle && colHist.TotColSize == 0 && (colHist.NullCount != coll.Count)) { + size += float64(chunk.EstimateTypeWidth(padChar, col.GetType())) + continue + } + size += colHist.AvgColSizeListInDisk(coll.Count) + } + } + // Add 8 bytes for each column's size record. See `ListInDisk` for details. + return size + float64(8*len(cols)) +} + // GetTableAvgRowSize computes average row size for a table scan, exclude the index key-value pairs. 
func (coll *HistColl) GetTableAvgRowSize(cols []*expression.Column, storeType kv.StoreType, handleInCols bool) (size float64) { size = coll.GetAvgRowSize(cols, false) diff --git a/util/chunk/codec.go b/util/chunk/codec.go index f866ac34c6efc..51ef7b6baa718 100644 --- a/util/chunk/codec.go +++ b/util/chunk/codec.go @@ -184,6 +184,50 @@ func getFixedLen(colType *types.FieldType) int { } } +// GetFixedLen gets the memory size of a fixed-length type. +// If colType is not fixed-length, it returns varElemLen, aka -1. +func GetFixedLen(colType *types.FieldType) int { + return getFixedLen(colType) +} + +// EstimateTypeWidth estimates the average width of values of the type. +// This is used by the planner, which doesn't require absolutely correct results; +// it's OK (and expected) to guess if we don't know for sure. +// +// Mostly adapted from https://github.com/postgres/postgres/blob/REL_12_STABLE/src/backend/utils/cache/lsyscache.c#L2356 +func EstimateTypeWidth(padChar bool, colType *types.FieldType) int { + colLen := getFixedLen(colType) + // Easy if it's a fixed-width type. + if colLen != varElemLen { + return colLen + } + + colLen = colType.Flen + if colLen > 0 { + /* + * If PAD_CHAR_TO_FULL_LENGTH is enabled, and type is CHAR, + * the colType.Flen is also the only width. + */ + if padChar && colType.Tp == mysql.TypeString { + return colLen + } + if colLen <= 32 { + return colLen + } + if colLen < 1000 { + return 32 + (colLen-32)/2 // assume 50% + } + /* + * Beyond 1000, assume we're looking at something like + * "varchar(10000)" where the limit isn't actually reached often, and + * use a fixed estimate. + */ + return 32 + (1000-32)/2 + } + // Oops, we have no idea ... wild guess time. 
+ return 32 +} + func init() { for i := 0; i < 128; i++ { allNotNullBitmap[i] = 0xFF diff --git a/util/chunk/codec_test.go b/util/chunk/codec_test.go index b904ed9756f85..5e22a3c8f10cb 100644 --- a/util/chunk/codec_test.go +++ b/util/chunk/codec_test.go @@ -77,6 +77,28 @@ func (s *testCodecSuite) TestCodec(c *check.C) { } } +func (s *testCodecSuite) TestEstimateTypeWidth(c *check.C) { + var colType *types.FieldType + + colType = &types.FieldType{Tp: mysql.TypeLonglong} + c.Assert(EstimateTypeWidth(false, colType), check.Equals, 8) // fixed-width type + + colType = &types.FieldType{Tp: mysql.TypeString, Flen: 100000} + c.Assert(EstimateTypeWidth(true, colType), check.Equals, 100000) // PAD_CHAR_TO_FULL_LENGTH + + colType = &types.FieldType{Tp: mysql.TypeString, Flen: 31} + c.Assert(EstimateTypeWidth(false, colType), check.Equals, 31) // colLen <= 32 + + colType = &types.FieldType{Tp: mysql.TypeString, Flen: 999} + c.Assert(EstimateTypeWidth(false, colType), check.Equals, 515) // colLen < 1000 + + colType = &types.FieldType{Tp: mysql.TypeString, Flen: 2000} + c.Assert(EstimateTypeWidth(false, colType), check.Equals, 516) // colLen >= 1000 + + colType = &types.FieldType{Tp: mysql.TypeString} + c.Assert(EstimateTypeWidth(false, colType), check.Equals, 32) // value after guessing +} + func BenchmarkEncodeChunk(b *testing.B) { numCols := 4 numRows := 1024 diff --git a/util/memory/tracker.go b/util/memory/tracker.go index a28116c85feec..400c91c8bac61 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -78,6 +78,12 @@ func (t *Tracker) SetBytesLimit(bytesLimit int64) { t.bytesLimit = bytesLimit } +// GetBytesLimit gets the bytes limit for this tracker. +// "bytesLimit <= 0" means no limit. +func (t *Tracker) GetBytesLimit() int64 { + return t.bytesLimit +} + // SetActionOnExceed sets the action when memory usage exceeds bytesLimit. func (t *Tracker) SetActionOnExceed(a ActionOnExceed) { t.actionMu.Lock()