-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrewriter.go
120 lines (104 loc) · 2.42 KB
/
rewriter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"sync"
"github.com/hashicorp/hcl/v2"
"github.com/heimdalr/dag"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/format"
)
type RuleVisitor struct {
Table *Table
}
func (v *RuleVisitor) Enter(in ast.Node) (ast.Node, bool) {
if stmt, ok := in.(*ast.InsertStmt); ok {
valuesExpr := stmt.Lists[0]
row := &Row{
Table: v.Table,
Values: &valuesExpr,
}
for _, rule := range v.Table.Rules {
err := rule.Apply(row)
if err != nil {
// @TODO continue and collect errors?
log.Fatal(err.Error())
}
}
}
return in, true
}
func (v *RuleVisitor) Leave(in ast.Node) (ast.Node, bool) {
return in, true
}
type Rewriter struct {
Database *Database
Dumper *Dumper
Diagnostics hcl.Diagnostics
Parser *parser.Parser
Wg *sync.WaitGroup
}
func (r *Rewriter) PrintStatus() {
}
func (r *Rewriter) Visit(wrapper dag.Vertexer) {
id, value := wrapper.Vertex()
table, ok := value.(*Table)
if !ok {
return
}
allDependenciesMet := true
ancestors, err := r.Database.DAG.GetAncestors(id)
if err != nil {
log.Fatalf(err.Error())
}
for _, ancestor := range ancestors {
dependency, ok := ancestor.(*Table)
if !ok {
return
}
allDependenciesMet = allDependenciesMet && dependency.Dumped
}
if !allDependenciesMet || table.Dumped {
return
}
r.Dumper.Reset()
r.Dumper.AddTables(r.Database.Name, table.Name)
r.Dumper.SetExtraOptions(r.Database.Config.Options.ExtraArgs)
r.Dumper.SetWhere(table.Where())
r.Dumper.SetDestinationDatabase(r.Database.Destination)
visitor := &RuleVisitor{Table: table}
readPipe, writePipe := io.Pipe()
go func() {
defer writePipe.Close()
r.Dumper.Dump(writePipe)
}()
scanner := bufio.NewScanner(readPipe)
for scanner.Scan() {
line := scanner.Text()
if len(line) > 5 && line[0:6] == "INSERT" {
stmtNode, err := r.Parser.ParseOneStmt(line, "", "")
if err != nil {
log.Fatal(err.Error())
}
stmtNode.Accept(visitor)
err = stmtNode.Restore(format.NewRestoreCtx(format.DefaultRestoreFlags, table.OutFile))
if err != nil {
log.Fatalf(err.Error())
}
table.OutFile.Write([]byte(";\n"))
} else {
table.OutFile.WriteString(fmt.Sprintf("%s\n", line))
}
}
if err := scanner.Err(); err != nil {
log.Fatal(err.Error())
}
table.OutFile.Seek(0, io.SeekStart)
io.Copy(os.Stdout, table.OutFile)
table.Dumped = true
return
}