Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: add label for the table and schema #260

Merged
merged 2 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 151 additions & 3 deletions component/subscriber/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ package subscriber

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"reflect"
"strconv"
"sync"
"time"

"github.com/pingcap/ng-monitoring/component/domain"
"github.com/pingcap/ng-monitoring/component/subscriber/model"
"github.com/pingcap/ng-monitoring/component/topology"
"github.com/pingcap/ng-monitoring/config"
"github.com/pingcap/ng-monitoring/config/pdvariable"
"github.com/pingcap/ng-monitoring/utils"
"go.uber.org/zap"

"github.com/pingcap/log"
)
Expand All @@ -26,13 +34,19 @@ type Manager struct {
topoSubscriber topology.Subscriber
cfgSubscriber config.Subscriber
varSubscriber pdvariable.Subscriber
httpCli *http.Client

do *domain.Domain
schemaCache *sync.Map
schemaVersion int64

subscribeController SubscribeController
}

func NewManager(
ctx context.Context,
wg *sync.WaitGroup,
do *domain.Domain,
varSubscriber pdvariable.Subscriber,
topoSubscriber topology.Subscriber,
cfgSubscriber config.Subscriber,
Expand All @@ -42,29 +56,35 @@ func NewManager(
ctx: ctx,
wg: wg,

scrapers: make(map[topology.Component]Scraper),
scrapers: make(map[topology.Component]Scraper),
schemaCache: &sync.Map{},

varSubscriber: varSubscriber,
topoSubscriber: topoSubscriber,
cfgSubscriber: cfgSubscriber,

do: do,
prevEnabled: subscribeController.IsEnabled(),
subscribeController: subscribeController,
}
}

func (m *Manager) Run() {
defer m.clearScrapers()

ticker := time.NewTicker(30 * time.Second)
for {
select {
case getCfg := <-m.cfgSubscriber:
m.subscribeController.UpdateConfig(getCfg())
m.httpCli = m.subscribeController.NewHTTPClient()
case getVars := <-m.varSubscriber:
m.subscribeController.UpdatePDVariable(getVars())
case getTopology := <-m.topoSubscriber:
m.components = getTopology()
m.subscribeController.UpdateTopology(getTopology())
case <-ticker.C:
m.updateSchemaCache()
continue
case <-m.ctx.Done():
return
}
Expand All @@ -87,6 +107,134 @@ func (m *Manager) Run() {
}
}

func (m *Manager) updateSchemaCache() {
if !m.subscribeController.IsEnabled() {
// clear cache
m.schemaCache.Range(func(k, v interface{}) bool {
m.schemaCache.Delete(k)
return true
})
m.schemaVersion = 0
return
}
if m.do == nil {
return
}

ectx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
etcdCli, err := m.do.GetEtcdClient()
defer cancel()
if err != nil {
log.Error("failed to get etcd client", zap.Error(err))
return
}
resp, err := etcdCli.Get(ectx, model.SchemaVersionPath)
if err != nil || len(resp.Kvs) != 1 {
if resp != nil && len(resp.Kvs) == 0 {
return
}
log.Warn("failed to get tidb schema version", zap.Error(err))
return
}
schemaVersion, err := strconv.ParseInt(string(resp.Kvs[0].Value), 10, 64)
if err != nil {
log.Warn("failed to get tidb schema version", zap.Error(err))
return
}
if schemaVersion == m.schemaVersion {
return
}
log.Info("schema version changed", zap.Int64("old", m.schemaVersion), zap.Int64("new", schemaVersion))
m.tryUpdateSchemaCache(schemaVersion)
}

type getConfig interface {
GetConfig() *config.Config
}

func (m *Manager) requestDB(path string, v interface{}) error {
schema := "http"
if sc, ok := m.subscribeController.(getConfig); ok && sc.GetConfig().Security.GetTLSConfig() != nil {
schema = "https"
}
for _, compt := range m.components {
if compt.Name != topology.ComponentTiDB {
continue
}

url := fmt.Sprintf("%s://%s:%d%s", schema, compt.IP, compt.StatusPort, path)
resp, err := m.httpCli.Get(url)
if err != nil {
log.Error("request failed", zap.Error(err))
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Error("request failed", zap.String("status", resp.Status))
continue
}
if err := json.NewDecoder(resp.Body).Decode(v); err != nil {
log.Error("decode response failed", zap.Error(err))
continue
}
return nil
}
return fmt.Errorf("all request failed")
}

func (m *Manager) tryUpdateSchemaCache(schemaVersion int64) {
// get all database info
var dbInfos []*model.DBInfo
if err := m.requestDB("/schema", &dbInfos); err != nil {
return
}

// get all table info
updateSuccess := true
for _, db := range dbInfos {
if db.State == model.StateNone {
continue
}
var tableInfos []*model.TableInfo
encodeName := url.PathEscape(db.Name.O)
if err := m.requestDB(fmt.Sprintf("/schema/%s?id_name_only=true", encodeName), &tableInfos); err != nil {
updateSuccess = false
continue
}
log.Info("update table info", zap.String("db", db.Name.O), zap.Reflect("table-info", tableInfos))
if len(tableInfos) == 0 {
continue
}
for _, table := range tableInfos {
indices := make(map[int64]string, len(table.Indices))
for _, index := range table.Indices {
indices[index.ID] = index.Name.O
}
detail := &model.TableDetail{
Name: table.Name.O,
DB: db.Name.O,
ID: table.ID,
Indices: indices,
}
m.schemaCache.Store(table.ID, detail)
if partition := table.GetPartitionInfo(); partition != nil {
for _, partitionDef := range partition.Definitions {
detail := &model.TableDetail{
Name: fmt.Sprintf("%s/%s", table.Name.O, partitionDef.Name.O),
DB: db.Name.O,
ID: partitionDef.ID,
Indices: indices,
}
m.schemaCache.Store(partitionDef.ID, detail)
}
}
}
}
if updateSuccess {
m.schemaVersion = schemaVersion
}
}

func (m *Manager) updateScrapers() {
// clean up closed scrapers
for component, scraper := range m.scrapers {
Expand All @@ -109,7 +257,7 @@ func (m *Manager) updateScrapers() {

// set up incoming scrapers
for i := range in {
scraper := m.subscribeController.NewScraper(m.ctx, in[i])
scraper := m.subscribeController.NewScraper(m.ctx, in[i], m.schemaCache)
m.scrapers[in[i]] = scraper

if !isNil(scraper) {
Expand Down
9 changes: 8 additions & 1 deletion component/subscriber/mock_sub_controller_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 101 additions & 0 deletions component/subscriber/model/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package model

// SchemaState is the state for schema elements.
type SchemaState byte

const (
// StateNone means this schema element is absent and can't be used.
StateNone SchemaState = iota
// StateDeleteOnly means we can only delete items for this schema element.
StateDeleteOnly
// StateWriteOnly means we can use any write operation on this schema element,
// but outer can't read the changed data.
StateWriteOnly
// StateWriteReorganization means we are re-organizing whole data after write only state.
StateWriteReorganization
// StateDeleteReorganization means we are re-organizing whole data after delete only state.
StateDeleteReorganization
// StatePublic means this schema element is ok for all write and read operations.
StatePublic
)

const (
SchemaVersionPath = "/tidb/ddl/global_schema_version"
)

// CIStr is case insensitive string.
type CIStr struct {
O string `json:"O"` // Original string.
L string `json:"L"` // Lower case string.
}

// DBInfo provides meta data describing a DB.
type DBInfo struct {
ID int64 `json:"id"`
Name CIStr `json:"db_name"`
State SchemaState `json:"state"`
}

// IndexInfo provides meta data describing a DB index.
// It corresponds to the statement `CREATE INDEX Name ON Table (Column);`
// See https://dev.mysql.com/doc/refman/5.7/en/create-index.html
type IndexInfo struct {
ID int64 `json:"id"`
Name CIStr `json:"idx_name"`
}

// PartitionDefinition defines a single partition.
type PartitionDefinition struct {
ID int64 `json:"id"`
Name CIStr `json:"name"`
}

// PartitionInfo provides table partition info.
type PartitionInfo struct {
// User may already creates table with partition but table partition is not
// yet supported back then. When Enable is true, write/read need use tid
// rather than pid.
Enable bool `json:"enable"`
Definitions []*PartitionDefinition `json:"definitions"`
}

// TableInfo provides meta data describing a DB table.
type TableInfo struct {
ID int64 `json:"id"`
Name CIStr `json:"name"`
Indices []*IndexInfo `json:"index_info"`
Partition *PartitionInfo `json:"partition"`
}

// GetPartitionInfo returns the partition information.
func (t *TableInfo) GetPartitionInfo() *PartitionInfo {
if t.Partition != nil && t.Partition.Enable {
return t.Partition
}
return nil
}

type DBTablesInfo struct {
DB DBInfo `json:"db"`
Tables []TableInfo `json:"tables"`
}

type DBTableInfo struct {
DB DBInfo
Table IndexedTableInfo
}

type IndexedTableInfo struct {
ID int64
Name CIStr
Indices map[int64]string
}

type TableDetail struct {
Name string
DB string
ID int64
Indices map[int64]string
}
5 changes: 4 additions & 1 deletion component/subscriber/sub_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package subscriber

import (
"context"
"net/http"
"sync"

"github.com/pingcap/ng-monitoring/component/topology"
"github.com/pingcap/ng-monitoring/config"
Expand All @@ -16,10 +18,11 @@ type SubscribeController interface {
UpdatePDVariable(pdvariable.PDVariable)
UpdateConfig(config.Config)
UpdateTopology([]topology.Component)
NewHTTPClient() *http.Client
}

type ScraperFactory interface {
NewScraper(ctx context.Context, component topology.Component) Scraper
NewScraper(ctx context.Context, component topology.Component, schemaInfo *sync.Map) Scraper
}

type Scraper interface {
Expand Down
Loading
Loading