Skip to content

Commit

Permalink
planner: consider disk cost in hashJoin (pingcap#13246)
Browse files Browse the repository at this point in the history
  • Loading branch information
SunRunAway authored Dec 3, 2019
1 parent 744c5c1 commit 5a589c9
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 9 deletions.
6 changes: 5 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,11 @@ func (is *PhysicalIndexScan) initSchema(idx *model.IndexInfo, idxExprCols []*exp
// If it's double read case, the first index must return handle. So we should add extra handle column
// if there isn't a handle column.
if isDoubleRead && !setHandle {
indexCols = append(indexCols, &expression.Column{ID: model.ExtraHandleID, UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID()})
indexCols = append(indexCols, &expression.Column{
RetType: types.NewFieldType(mysql.TypeLonglong),
ID: model.ExtraHandleID,
UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID(),
})
}
is.SetSchema(expression.NewSchema(indexCols...))
}
Expand Down
54 changes: 46 additions & 8 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/plancodec"
)

Expand Down Expand Up @@ -390,17 +392,38 @@ func (p *PhysicalIndexJoin) GetCost(outerTask, innerTask task) float64 {
return outerTask.cost() + innerPlanCost + cpuCost + memoryCost
}

// avgRowSize estimates the average size of one row produced by inner, using the
// "ListInDisk" size estimators. When the plan carries a histogram collection the
// estimate is delegated to it; otherwise the per-column widths are guessed from
// the column types alone.
func (p *PhysicalHashJoin) avgRowSize(inner PhysicalPlan) (size float64) {
	padChar := p.ctx.GetSessionVars().StmtCtx.PadCharToFullLength
	columns := inner.Schema().Columns
	if histColl := inner.statsInfo().HistColl; histColl != nil {
		return histColl.GetAvgRowSizeListInDisk(columns, padChar)
	}
	// No statistics available: sum the type-based width estimate of every column.
	for i := range columns {
		size += float64(chunk.EstimateTypeWidth(padChar, columns[i].GetType()))
	}
	return size
}

// GetCost computes cost of hash join operator itself.
func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
innerCnt, outerCnt := lCnt, rCnt
buildCnt, probeCnt := lCnt, rCnt
build := p.children[0]
// Taking the right as the inner for right join or using the outer to build a hash table.
if (p.InnerChildIdx == 1 && !p.UseOuterToBuild) || (p.InnerChildIdx == 0 && p.UseOuterToBuild) {
innerCnt, outerCnt = rCnt, lCnt
buildCnt, probeCnt = rCnt, lCnt
build = p.children[1]
}
sessVars := p.ctx.GetSessionVars()
oomUseTmpStorage := config.GetGlobalConfig().OOMUseTmpStorage
memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint
rowSize := p.avgRowSize(build)
spill := oomUseTmpStorage && memQuota > 0 && rowSize*buildCnt > float64(memQuota)
// Cost of building hash table.
cpuCost := innerCnt * sessVars.CPUFactor
memoryCost := innerCnt * sessVars.MemoryFactor
cpuCost := buildCnt * sessVars.CPUFactor
memoryCost := buildCnt * sessVars.MemoryFactor
diskCost := buildCnt * sessVars.DiskFactor * rowSize
// Number of matched row pairs regarding the equal join conditions.
helper := &fullJoinRowCountHelper{
cartesian: false,
Expand Down Expand Up @@ -428,23 +451,38 @@ func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64) float64 {
numPairs = 0
}
}
// Cost of quering hash table is cheap actually, so we just compute the cost of
// Cost of querying hash table is cheap actually, so we just compute the cost of
// evaluating `OtherConditions` and joining row pairs.
probeCost := numPairs * sessVars.CPUFactor
probeDiskCost := numPairs * sessVars.DiskFactor * rowSize
// Cost of evaluating outer filter.
if len(p.LeftConditions)+len(p.RightConditions) > 0 {
// Input outer count for the above computation should be adjusted by selectionFactor.
probeCost *= selectionFactor
probeCost += outerCnt * sessVars.CPUFactor
probeDiskCost *= selectionFactor
probeCost += probeCnt * sessVars.CPUFactor
}
diskCost += probeDiskCost
probeCost /= float64(p.Concurrency)
// Cost of additional concurrent goroutines.
cpuCost += probeCost + float64(p.Concurrency+1)*sessVars.ConcurrencyFactor
// Cost of traveling the hash table to resolve missing matched cases when building the hash table from the outer table
if p.UseOuterToBuild {
cpuCost += innerCnt * sessVars.CPUFactor / float64(p.Concurrency)
if spill {
// It runs in sequence when build data is on disk. See handleUnmatchedRowsFromHashTableInDisk
cpuCost += buildCnt * sessVars.CPUFactor
} else {
cpuCost += buildCnt * sessVars.CPUFactor / float64(p.Concurrency)
}
diskCost += buildCnt * sessVars.DiskFactor * rowSize
}
return cpuCost + memoryCost

if spill {
memoryCost *= float64(memQuota) / (rowSize * buildCnt)
} else {
diskCost = 0
}
return cpuCost + memoryCost + diskCost
}

func (p *PhysicalHashJoin) attach2Task(tasks ...task) task {
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1836,6 +1836,7 @@ var builtinGlobalVariable = []string{
variable.TiDBOptScanFactor,
variable.TiDBOptDescScanFactor,
variable.TiDBOptMemoryFactor,
variable.TiDBOptDiskFactor,
variable.TiDBOptConcurrencyFactor,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBInitChunkSize,
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ type SessionVars struct {
SeekFactor float64
// MemoryFactor is the memory cost of storing one tuple.
MemoryFactor float64
// DiskFactor is the IO cost of reading/writing one byte to temporary disk.
DiskFactor float64
// ConcurrencyFactor is the CPU cost of additional one goroutine.
ConcurrencyFactor float64

Expand Down Expand Up @@ -517,6 +519,7 @@ func NewSessionVars() *SessionVars {
DescScanFactor: DefOptDescScanFactor,
SeekFactor: DefOptSeekFactor,
MemoryFactor: DefOptMemoryFactor,
DiskFactor: DefOptDiskFactor,
ConcurrencyFactor: DefOptConcurrencyFactor,
EnableRadixJoin: false,
EnableVectorizedExpression: DefEnableVectorizedExpression,
Expand Down Expand Up @@ -848,6 +851,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.SeekFactor = tidbOptFloat64(val, DefOptSeekFactor)
case TiDBOptMemoryFactor:
s.MemoryFactor = tidbOptFloat64(val, DefOptMemoryFactor)
case TiDBOptDiskFactor:
s.DiskFactor = tidbOptFloat64(val, DefOptDiskFactor)
case TiDBOptConcurrencyFactor:
s.ConcurrencyFactor = tidbOptFloat64(val, DefOptConcurrencyFactor)
case TiDBIndexLookupConcurrency:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBOptDescScanFactor, strconv.FormatFloat(DefOptDescScanFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptSeekFactor, strconv.FormatFloat(DefOptSeekFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptMemoryFactor, strconv.FormatFloat(DefOptMemoryFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptDiskFactor, strconv.FormatFloat(DefOptDiskFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBOptConcurrencyFactor, strconv.FormatFloat(DefOptConcurrencyFactor, 'f', -1, 64)},
{ScopeGlobal | ScopeSession, TiDBIndexJoinBatchSize, strconv.Itoa(DefIndexJoinBatchSize)},
{ScopeGlobal | ScopeSession, TiDBIndexLookupSize, strconv.Itoa(DefIndexLookupSize)},
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ const (
TiDBOptSeekFactor = "tidb_opt_seek_factor"
// tidb_opt_memory_factor is the memory cost of storing one tuple.
TiDBOptMemoryFactor = "tidb_opt_memory_factor"
// tidb_opt_disk_factor is the IO cost of reading/writing one byte to temporary disk.
TiDBOptDiskFactor = "tidb_opt_disk_factor"
// tidb_opt_concurrency_factor is the CPU cost of additional one goroutine.
TiDBOptConcurrencyFactor = "tidb_opt_concurrency_factor"

Expand Down Expand Up @@ -382,6 +384,7 @@ const (
DefOptDescScanFactor = 3.0
DefOptSeekFactor = 20.0
DefOptMemoryFactor = 0.001
DefOptDiskFactor = 1.5
DefOptConcurrencyFactor = 3.0
DefOptInSubqToJoinAndAgg = true
DefBatchInsert = false
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBOptDescScanFactor,
TiDBOptSeekFactor,
TiDBOptMemoryFactor,
TiDBOptDiskFactor,
TiDBOptConcurrencyFactor:
v, err := strconv.ParseFloat(value, 64)
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,14 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(val, Equals, "1.0")
c.Assert(v.MemoryFactor, Equals, 1.0)

c.Assert(v.DiskFactor, Equals, 1.5)
err = SetSessionSystemVar(v, TiDBOptDiskFactor, types.NewStringDatum("1.1"))
c.Assert(err, IsNil)
val, err = GetSessionSystemVar(v, TiDBOptDiskFactor)
c.Assert(err, IsNil)
c.Assert(val, Equals, "1.1")
c.Assert(v.DiskFactor, Equals, 1.1)

c.Assert(v.ConcurrencyFactor, Equals, 3.0)
err = SetSessionSystemVar(v, TiDBOptConcurrencyFactor, types.NewStringDatum("5.0"))
c.Assert(err, IsNil)
Expand Down Expand Up @@ -455,6 +463,8 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBOptSeekFactor, "-2", true},
{TiDBOptMemoryFactor, "a", true},
{TiDBOptMemoryFactor, "-2", true},
{TiDBOptDiskFactor, "a", true},
{TiDBOptDiskFactor, "-2", true},
{TiDBOptConcurrencyFactor, "a", true},
{TiDBOptConcurrencyFactor, "-2", true},
{TxnIsolation, "READ-UNCOMMITTED", true},
Expand Down
9 changes: 9 additions & 0 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"testing"
"time"
"unsafe"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand Down Expand Up @@ -209,18 +210,26 @@ func (s *testStatsSuite) TestAvgColLen(c *C) {
tableInfo := tbl.Meta()
statsTbl := do.StatsHandle().GetTableStats(tableInfo)
c.Assert(statsTbl.Columns[tableInfo.Columns[0].ID].AvgColSize(statsTbl.Count, false), Equals, 1.0)
c.Assert(statsTbl.Columns[tableInfo.Columns[0].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 8.0)

// The size of varchar type is LEN + BYTE, here is 1 + 7 = 8
c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0)
c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0)
c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0)
c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 8.0-1)
c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(float32(12.3))))
c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(types.Time{})))
testKit.MustExec("insert into t values(132, '123456789112', 1232.3, '2018-03-07 19:17:29')")
testKit.MustExec("analyze table t")
statsTbl = do.StatsHandle().GetTableStats(tableInfo)
c.Assert(statsTbl.Columns[tableInfo.Columns[0].ID].AvgColSize(statsTbl.Count, false), Equals, 1.5)
c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSize(statsTbl.Count, false), Equals, 10.5)
c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0)
c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSize(statsTbl.Count, false), Equals, 8.0)
c.Assert(statsTbl.Columns[tableInfo.Columns[0].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 8.0)
c.Assert(statsTbl.Columns[tableInfo.Columns[1].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, 10.5-1)
c.Assert(statsTbl.Columns[tableInfo.Columns[2].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(float32(12.3))))
c.Assert(statsTbl.Columns[tableInfo.Columns[3].ID].AvgColSizeListInDisk(statsTbl.Count), Equals, float64(unsafe.Sizeof(types.Time{})))
}

func (s *testStatsSuite) TestDurationToTS(c *C) {
Expand Down
20 changes: 20 additions & 0 deletions statistics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@ func (c *Column) AvgColSize(count int64, isKey bool) float64 {
return math.Round(float64(c.TotColSize)/float64(count)*100) / 100
}

// AvgColSizeListInDisk returns the estimated average size of one value of this
// column. The sizes are derived from `chunk.ListInDisk`, so this function has to
// be revisited whenever that representation changes.
//
// count is the number of rows TotColSize is averaged over; a count of 0 yields 0.
// The result is never negative.
func (c *Column) AvgColSizeListInDisk(count int64) float64 {
	if count == 0 {
		return 0
	}
	histCount := c.TotalRowCount()
	notNullRatio := 1.0
	if histCount > 0 {
		notNullRatio = 1.0 - float64(c.NullCount)/histCount
	}
	// Fixed-length types: scale the fixed width by the ratio of non-NULL values.
	if size := chunk.GetFixedLen(c.Histogram.Tp); size != -1 {
		return float64(size) * notNullRatio
	}
	// Variable-length types: keep two decimal places.
	// The recorded size of a varchar value is LEN + BYTE, so we subtract 1 here.
	avg := math.Round(float64(c.TotColSize)/float64(count)*100)/100 - 1
	if avg < 0 {
		// When the recorded average is below one byte per row (e.g. TotColSize is 0
		// because every value is NULL) the subtraction would produce a negative
		// size and silently lower the caller's cost estimate; clamp it to zero.
		return 0
	}
	return avg
}

// AppendBucket appends a bucket into `hg`.
func (hg *Histogram) AppendBucket(lower *types.Datum, upper *types.Datum, count, repeat int64) {
hg.Buckets = append(hg.Buckets, Bucket{Count: count, Repeat: repeat})
Expand Down
23 changes: 23 additions & 0 deletions statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/atomic"
Expand Down Expand Up @@ -694,6 +695,28 @@ func (coll *HistColl) GetAvgRowSize(cols []*expression.Column, isEncodedKey bool
return size + float64(len(cols))
}

// GetAvgRowSizeListInDisk computes the average row size for the given columns,
// using the `ListInDisk` size estimate of every column plus a fixed 8 bytes per
// column.
//
// padChar is forwarded to chunk.EstimateTypeWidth whenever a column's width has
// to be guessed from its type instead of being read from the statistics.
func (coll *HistColl) GetAvgRowSizeListInDisk(cols []*expression.Column, padChar bool) (size float64) {
	// Without usable statistics every column falls back to the type-based guess.
	noStats := coll.Pseudo || len(coll.Columns) == 0 || coll.Count == 0
	for _, col := range cols {
		if !noStats {
			hist, found := coll.Columns[col.UniqueID]
			// A histogram with TotColSize == 0 is only trusted for handle columns
			// or when every row is NULL. Normally the untrusted case would not
			// happen; it exists for compatibility with old-version stats that do
			// not include TotColSize.
			if found && (hist.IsHandle || hist.TotColSize != 0 || hist.NullCount == coll.Count) {
				size += hist.AvgColSizeListInDisk(coll.Count)
				continue
			}
		}
		size += float64(chunk.EstimateTypeWidth(padChar, col.GetType()))
	}
	// Add 8 bytes for each column's size record. See `ListInDisk` for details.
	return size + float64(8*len(cols))
}

// GetTableAvgRowSize computes average row size for a table scan, exclude the index key-value pairs.
func (coll *HistColl) GetTableAvgRowSize(cols []*expression.Column, storeType kv.StoreType, handleInCols bool) (size float64) {
size = coll.GetAvgRowSize(cols, false)
Expand Down
44 changes: 44 additions & 0 deletions util/chunk/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,50 @@ func getFixedLen(colType *types.FieldType) int {
}
}

// GetFixedLen gets the memory size of a fixed-length type.
// If colType is not fixed-length, it returns varElemLen, aka -1.
// It is the exported entry point for getFixedLen and forwards colType unchanged.
func GetFixedLen(colType *types.FieldType) int {
return getFixedLen(colType)
}

// EstimateTypeWidth estimates the average width of values of the type.
// This is used by the planner, which doesn't require absolutely correct results;
// it's OK (and expected) to guess if we don't know for sure.
//
// Fixed-length types report their exact length. For the remaining types the
// guess is based on colType.Flen:
//   - Flen <= 0: nothing is known, so 32 is returned;
//   - padChar is set and the type is CHAR (mysql.TypeString): Flen itself;
//   - Flen <= 32: Flen itself;
//   - Flen < 1000: 32 plus half of what exceeds 32;
//   - otherwise: the fixed estimate 32 + (1000-32)/2.
//
// mostly study from https://github.com/postgres/postgres/blob/REL_12_STABLE/src/backend/utils/cache/lsyscache.c#L2356
func EstimateTypeWidth(padChar bool, colType *types.FieldType) int {
	// Easy if it's a fixed-width type.
	if fixedLen := getFixedLen(colType); fixedLen != varElemLen {
		return fixedLen
	}

	flen := colType.Flen
	switch {
	case flen <= 0:
		// Oops, we have no idea ... wild guess time.
		return 32
	case padChar && colType.Tp == mysql.TypeString:
		// If PAD_CHAR_TO_FULL_LENGTH is enabled, and type is CHAR,
		// the colType.Flen is also the only width.
		return flen
	case flen <= 32:
		return flen
	case flen < 1000:
		return 32 + (flen-32)/2 // assume 50%
	default:
		// Beyond 1000, assume we're looking at something like
		// "varchar(10000)" where the limit isn't actually reached often, and
		// use a fixed estimate.
		return 32 + (1000-32)/2
	}
}

func init() {
for i := 0; i < 128; i++ {
allNotNullBitmap[i] = 0xFF
Expand Down
22 changes: 22 additions & 0 deletions util/chunk/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,28 @@ func (s *testCodecSuite) TestCodec(c *check.C) {
}
}

// TestEstimateTypeWidth checks one representative input for every branch of
// EstimateTypeWidth.
func (s *testCodecSuite) TestEstimateTypeWidth(c *check.C) {
	cases := []struct {
		padChar bool
		colType *types.FieldType
		want    int
	}{
		// fixed-width type
		{false, &types.FieldType{Tp: mysql.TypeLonglong}, 8},
		// PAD_CHAR_TO_FULL_LENGTH
		{true, &types.FieldType{Tp: mysql.TypeString, Flen: 100000}, 100000},
		// colLen <= 32
		{false, &types.FieldType{Tp: mysql.TypeString, Flen: 31}, 31},
		// colLen < 1000
		{false, &types.FieldType{Tp: mysql.TypeString, Flen: 999}, 515},
		// colLen >= 1000
		{false, &types.FieldType{Tp: mysql.TypeString, Flen: 2000}, 516},
		// value after guessing
		{false, &types.FieldType{Tp: mysql.TypeString}, 32},
	}
	for _, tc := range cases {
		c.Assert(EstimateTypeWidth(tc.padChar, tc.colType), check.Equals, tc.want)
	}
}

func BenchmarkEncodeChunk(b *testing.B) {
numCols := 4
numRows := 1024
Expand Down
6 changes: 6 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ func (t *Tracker) SetBytesLimit(bytesLimit int64) {
t.bytesLimit = bytesLimit
}

// GetBytesLimit gets the bytes limit for this tracker.
// "bytesLimit <= 0" means no limit.
// The stored bytesLimit field is returned as-is; interpreting a non-positive
// value as "unlimited" is left to the caller.
func (t *Tracker) GetBytesLimit() int64 {
return t.bytesLimit
}

// SetActionOnExceed sets the action when memory usage exceeds bytesLimit.
func (t *Tracker) SetActionOnExceed(a ActionOnExceed) {
t.actionMu.Lock()
Expand Down

0 comments on commit 5a589c9

Please sign in to comment.