Skip to content

Commit e555e2b

Browse files
committed
fix
1 parent f5c20c6 commit e555e2b

18 files changed

+93
-93
lines changed

Diff for: agent/agent.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func NewAgent(masterAddress string, address, name string) *Agent {
4343
Name: name,
4444
Topology: NewTopology(),
4545
Tasks: NewTaskMap(),
46-
MaxExecutorNumber: Config.Conf.Runtime.MaxExecutorNumber,
46+
MaxExecutorNumber: config.Conf.Runtime.MaxExecutorNumber,
4747
IsStatusChanged: true,
4848
}
4949
return res
@@ -81,7 +81,7 @@ func (self *Agent) LanchExecutor(ename string) error {
8181
"--name",
8282
ename,
8383
"--config",
84-
fmt.Sprintf("%v", Config.Conf.File),
84+
fmt.Sprintf("%v", config.Conf.File),
8585
)
8686
command.Stdout = os.Stdout
8787
command.Stdin = os.Stdin
@@ -100,7 +100,7 @@ func (self *Agent) Duplicate(ctx context.Context, em *pb.Empty) (*pb.Empty, erro
100100
"--address",
101101
fmt.Sprintf("%v", strings.Split(self.Address, ":")[0]+":0"),
102102
"--config",
103-
fmt.Sprintf("%v", Config.Conf.File),
103+
fmt.Sprintf("%v", config.Conf.File),
104104
)
105105

106106
command.Stdout = os.Stdout
@@ -134,7 +134,7 @@ func RunAgent(masterAddress string, address, name string) {
134134
}
135135
defer listener.Close()
136136
agentServer.Address = listener.Addr().String()
137-
Logger.Infof("Agent: %v", agentServer.Address)
137+
logger.Infof("Agent: %v", agentServer.Address)
138138

139139
go agentServer.Heartbeat()
140140

Diff for: agent/heartbeat.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,20 @@ func (self *Agent) SendHeartbeat(stream pb.GueryAgent_SendHeartbeatServer) error
1818
if err == nil {
1919
if location == nil {
2020
location = hb.Location
21-
Logger.Infof("Add executor %v", location)
21+
logger.Infof("Add executor %v", location)
2222
}
2323

2424
} else {
2525
if location != nil {
2626
self.Topology.DropExecutorInfo(location)
27-
Logger.Infof("Lost executor %v: %v", location, err)
27+
logger.Infof("Lost executor %v: %v", location, err)
2828
}
2929
if err == io.EOF {
30-
Logger.Infof("Lost executor %v: %v", location, err)
30+
logger.Infof("Lost executor %v: %v", location, err)
3131
return nil
3232
}
3333
if err != nil {
34-
Logger.Infof("Lost executor %v: %v", location, err)
34+
logger.Infof("Lost executor %v: %v", location, err)
3535
return err
3636
}
3737
}
@@ -52,7 +52,7 @@ func (self *Agent) Heartbeat() {
5252
func (self *Agent) DoHeartbeat() error {
5353
grpcConn, err := grpc.Dial(self.MasterAddress, grpc.WithInsecure())
5454
if err != nil {
55-
Logger.Errorf("DoHeartBeat failed: %v", err)
55+
logger.Errorf("DoHeartBeat failed: %v", err)
5656
return err
5757
}
5858
defer grpcConn.Close()
@@ -85,7 +85,7 @@ func (self *Agent) DoHeartbeat() error {
8585
func (self *Agent) SendOneHeartbeat(stream pb.GueryMaster_SendHeartbeatClient) error {
8686
hb := self.GetInfo()
8787
if err := stream.Send(hb); err != nil {
88-
Logger.Errorf("failed to SendOneHeartbeat: %v, %v", err, hb)
88+
logger.Errorf("failed to SendOneHeartbeat: %v, %v", err, hb)
8989
return err
9090
}
9191
return nil

Diff for: agent/info.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ func (self *Agent) GetInfo() *pb.AgentHeartbeat {
1616
res := &pb.AgentHeartbeat{
1717
Location: &pb.Location{
1818
Name: self.Name,
19-
Address: Util.GetHostFromAddress(self.Address),
20-
Port: Util.GetPortFromAddress(self.Address),
19+
Address: util.GetHostFromAddress(self.Address),
20+
Port: util.GetPortFromAddress(self.Address),
2121
},
2222
TotalMemory: int64(m.Total),
2323
FreeMemory: int64(m.Free),

Diff for: filereader/csv/csv_file_reader.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,38 @@ const (
1818

1919
type CsvFileReader struct {
2020
Closer io.Closer
21-
Metadata *Metadata.Metadata
21+
Metadata *metadata.Metadata
2222
Reader *csv.Reader
2323

2424
Indexes []int
25-
OutMetadata *Metadata.Metadata
25+
OutMetadata *metadata.Metadata
2626
}
2727

2828
func New(reader io.Reader, md *metadata.Metadata) *CsvFileReader {
2929
return &CsvFileReader{
3030
Metadata: md,
3131
Reader: csv.NewReader(reader),
32-
Closer: io.Closer(reader.(FileSystem.VirtualFile)),
32+
Closer: io.Closer(reader.(filesystem.VirtualFile)),
3333
}
3434
}
3535

3636
func (self *CsvFileReader) TypeConvert(rg *row.RowsGroup) (*row.RowsGroup, error) {
3737
jobs := make(chan int)
3838
done := make(chan bool)
3939
cn := len(self.Indexes)
40-
colTypes := make([]Type.Type, cn)
40+
colTypes := make([]gtype.Type, cn)
4141
for i := 0; i < cn; i++ {
4242
colTypes[i], _ = self.Metadata.GetTypeByIndex(self.Indexes[i])
4343
}
4444

45-
for i := 0; i < int(Config.Conf.Runtime.ParallelNumber); i++ {
45+
for i := 0; i < int(config.Conf.Runtime.ParallelNumber); i++ {
4646
go func() {
4747
for {
4848
j, ok := <-jobs
4949
if ok {
5050
for k := 0; k < cn; k++ {
5151
v := rg.Vals[k][j]
52-
cv := Type.ToType(v, colTypes[k])
52+
cv := gtype.ToType(v, colTypes[k])
5353
rg.Vals[k][j] = cv
5454
}
5555
} else {
@@ -103,7 +103,7 @@ func (self *CsvFileReader) Read(indexes []int) (*row.RowsGroup, error) {
103103
return nil, err
104104
}
105105

106-
rg := Row.NewRowsGroup(self.OutMetadata)
106+
rg := row.NewRowsGroup(self.OutMetadata)
107107
for i := 0; i < READ_ROWS_NUMBER; i++ {
108108
if record, err = self.Reader.Read(); err != nil {
109109
break

Diff for: filereader/file_reader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func NewReader(file *filesystem.FileLocation, md *metadata.Metadata) (FileReader
1919

2020
switch file.FileType {
2121
case filesystem.CSV:
22-
vf, err := FileSystem.Open(file.Location)
22+
vf, err := filesystem.Open(file.Location)
2323
if err != nil {
2424
return nil, err
2525
}

Diff for: filereader/orc/orc_file_reader.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,22 @@ type OrcFileReader struct {
1919
Closer io.Closer
2020
Reader *orc.Reader
2121
Cursor *orc.Cursor
22-
Metadata *Metadata.Metadata
23-
OutMetadata *Metadata.Metadata
22+
Metadata *metadata.Metadata
23+
OutMetadata *metadata.Metadata
2424
ReadColumnNames []string
2525
ReadColumnTypes []proto.Type_Kind
2626
ReadColumnIndexes []int
2727
}
2828

29-
func New(reader orc.SizedReaderAt, md *Metadata.Metadata) (*OrcFileReader, error) {
29+
func New(reader orc.SizedReaderAt, md *metadata.Metadata) (*OrcFileReader, error) {
3030
t, err := orc.NewReader(reader)
3131
if err != nil {
3232
return nil, err
3333
}
3434
return &OrcFileReader{
3535
Reader: t,
3636
Metadata: md,
37-
Closer: io.Closer(reader.(FileSystem.VirtualFile)),
37+
Closer: io.Closer(reader.(filesystem.VirtualFile)),
3838
}, nil
3939
}
4040

@@ -63,7 +63,7 @@ func (self *OrcFileReader) Read(indexes []int) (*row.RowsGroup, error) {
6363
self.Cursor = self.Reader.Select(self.ReadColumnNames...)
6464
}
6565

66-
rg := Row.NewRowsGroup(self.OutMetadata)
66+
rg := row.NewRowsGroup(self.OutMetadata)
6767
for i := 0; i < READ_ROWS_NUMBER; i++ {
6868
if self.Cursor.Next() || self.Cursor.Stripes() && self.Cursor.Next() {
6969
if err = self.Cursor.Err(); err != nil {

Diff for: filereader/orc/orc_type.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func OrcTypeToGueryType(src interface{}, oT proto.Type_Kind) interface{} {
2727
case proto.Type_BINARY:
2828
return string(src.([]byte))
2929
case proto.Type_TIMESTAMP:
30-
return Type.Timestamp{Sec: src.(time.Time).Unix()}
30+
return gtype.Timestamp{Sec: src.(time.Time).Unix()}
3131
case proto.Type_LIST:
3232
return nil
3333
case proto.Type_MAP:
@@ -39,7 +39,7 @@ func OrcTypeToGueryType(src interface{}, oT proto.Type_Kind) interface{} {
3939
case proto.Type_DECIMAL:
4040
return nil
4141
case proto.Type_DATE:
42-
return Type.Date{Sec: src.(orc.Date).Unix()}
42+
return gtype.Date{Sec: src.(orc.Date).Unix()}
4343
}
4444
return nil
4545
}

Diff for: filereader/parquet/parquet_file_reader.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ type Pair struct {
2424

2525
type PqFile struct {
2626
FileName string
27-
VF FileSystem.VirtualFile
27+
VF filesystem.VirtualFile
2828
}
2929

3030
func (self *PqFile) Create(name string) (ParquetFile, error) {
@@ -35,7 +35,7 @@ func (self *PqFile) Open(name string) (ParquetFile, error) {
3535
if name == "" {
3636
name = self.FileName
3737
}
38-
vf, err := FileSystem.Open(name)
38+
vf, err := filesystem.Open(name)
3939
if err != nil {
4040
return nil, err
4141
}
@@ -68,21 +68,21 @@ func (self *PqFile) Close() error { return nil }
6868
type ParquetFileReader struct {
6969
ParquetFile ParquetFile
7070
pqReader *ParquetReader
71-
Metadata *Metadata.Metadata
71+
Metadata *metadata.Metadata
7272
NumRows int
7373
Cursor int
7474

7575
ReadColumnIndexes []int
7676
ReadColumnTypes []*parquet.Type
7777
ReadColumnConvertedTypes []*parquet.ConvertedType
78-
OutMetadata *Metadata.Metadata
78+
OutMetadata *metadata.Metadata
7979
}
8080

8181
func New(fileName string, md *metadata.Metadata) *ParquetFileReader {
8282
parquetFileReader := new(ParquetFileReader)
8383
var pqFile ParquetFile = &PqFile{}
8484
pqFile, _ = pqFile.Open(fileName)
85-
parquetFileReader.pqReader, _ = NewParquetColumnReader(pqFile, int64(Config.Conf.Runtime.ParallelNumber))
85+
parquetFileReader.pqReader, _ = NewParquetColumnReader(pqFile, int64(config.Conf.Runtime.ParallelNumber))
8686
parquetFileReader.NumRows = int(parquetFileReader.pqReader.GetNumRows())
8787
parquetFileReader.Metadata = md
8888
parquetFileReader.ParquetFile = pqFile
@@ -137,13 +137,13 @@ func (self *ParquetFileReader) Read(indexes []int) (*row.RowsGroup, error) {
137137
}
138138

139139
var err error
140-
rg := Row.NewRowsGroup(self.OutMetadata)
140+
rg := row.NewRowsGroup(self.OutMetadata)
141141
readRowsNumber := 0
142142

143143
jobs := make(chan Pair)
144144
var wg sync.WaitGroup
145145

146-
for i := 0; i < int(Config.Conf.Runtime.ParallelNumber); i++ {
146+
for i := 0; i < int(config.Conf.Runtime.ParallelNumber); i++ {
147147
wg.Add(1)
148148
go func() {
149149
defer func() {

Diff for: filereader/parquet/parquet_type.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import (
55
"github.com/xitongsys/parquet-go/parquet"
66
)
77

8-
func ParquetTypeToGueryType(src interface{}, pT *parquet.Type, cT *parquet.ConvertedType, gt Type.Type) interface{} {
8+
func ParquetTypeToGueryType(src interface{}, pT *parquet.Type, cT *parquet.ConvertedType, gt gtype.Type) interface{} {
99
if src == nil {
1010
return nil
1111
}
12-
if gt == Type.TIMESTAMP {
12+
if gt == gtype.TIMESTAMP {
1313
if *pT == parquet.Type_INT96 {
1414
s := src.(string)
1515
//first 8 byte is a int64 value for nanoseconds of the day

Diff for: filesystem/partition/partition.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import (
77
)
88

99
type Partition struct {
10-
Type Type.Type
10+
Type gtype.Type
1111
Vals []interface{}
1212
Buffer []byte
1313
}
1414

15-
func NewPartition(t Type.Type) *Partition {
15+
func NewPartition(t gtype.Type) *Partition {
1616
return &Partition{
1717
Type: t,
1818
Vals: []interface{}{},
@@ -21,12 +21,12 @@ func NewPartition(t Type.Type) *Partition {
2121
}
2222

2323
func (self *Partition) Encode() {
24-
self.Buffer = Type.EncodeValues(self.Vals, self.Type)
24+
self.Buffer = gtype.EncodeValues(self.Vals, self.Type)
2525
}
2626

2727
func (self *Partition) Decode() (err error) {
2828
reader := bytes.NewReader(self.Buffer)
29-
self.Vals, err = Type.DecodeValue(reader, self.Type)
29+
self.Vals, err = gtype.DecodeValue(reader, self.Type)
3030
return err
3131
}
3232

Diff for: filesystem/partition/partition_info.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type PartitionInfo struct {
1717
FileList []*filesystem.FileLocation
1818
}
1919

20-
func NewPartitionInfo(md *Metadata.Metadata) *PartitionInfo {
20+
func NewPartitionInfo(md *metadata.Metadata) *PartitionInfo {
2121
res := &PartitionInfo{
2222
Metadata: md,
2323
Locations: []string{},
@@ -46,20 +46,20 @@ func (self *PartitionInfo) GetPartitionNum() int {
4646
}
4747

4848
func (self *PartitionInfo) GetPartitionRowGroup(i int) *row.RowsGroup {
49-
row := self.GetPartitionRow(i)
50-
if row == nil {
49+
r := self.GetPartitionRow(i)
50+
if r == nil {
5151
return nil
5252
}
53-
rb := Row.NewRowsGroup(self.Metadata)
54-
rb.Write(row)
53+
rb := row.NewRowsGroup(self.Metadata)
54+
rb.Write(r)
5555
return rb
5656
}
5757

5858
func (self *PartitionInfo) GetPartitionRow(i int) *row.Row {
5959
if i >= self.GetPartitionNum() {
6060
return nil
6161
}
62-
row := new(Row.Row)
62+
row := new(row.Row)
6363
for j := 0; j < len(self.Partitions); j++ {
6464
row.AppendVals(self.Partitions[j].Vals[i])
6565
}
@@ -68,7 +68,7 @@ func (self *PartitionInfo) GetPartitionRow(i int) *row.Row {
6868

6969
func (self *PartitionInfo) GetPartitionFiles(i int) []*filesystem.FileLocation {
7070
if i >= len(self.FileLists) {
71-
return []*FileSystem.FileLocation{}
71+
return []*filesystem.FileLocation{}
7272
}
7373
return self.FileLists[i]
7474
}
@@ -86,7 +86,7 @@ func (self *PartitionInfo) GetLocation(i int) string {
8686

8787
func (self *PartitionInfo) GetFileType(i int) filesystem.FileType {
8888
if i >= len(self.FileTypes) {
89-
return FileSystem.UNKNOWNFILETYPE
89+
return filesystem.UNKNOWNFILETYPE
9090
}
9191
return self.FileTypes[i]
9292
}

Diff for: filesystem/s3.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (self *S3FileSystem) Accept(fl *FileLocation) bool {
4141

4242
func (self *S3FileSystem) Open(fl *FileLocation) (VirtualFile, error) {
4343
var S3Conf = &aws.Config{
44-
Region: aws.String(Config.Conf.Runtime.S3Region),
44+
Region: aws.String(config.Conf.Runtime.S3Region),
4545
}
4646
svc := s3.New(session.New(), S3Conf)
4747
_, bucket, key, err := SplitS3URL(fl.Location)
@@ -73,7 +73,7 @@ func (self *S3FileSystem) Open(fl *FileLocation) (VirtualFile, error) {
7373
func (self *S3FileSystem) List(fl *FileLocation) (fileLocations []*FileLocation, err error) {
7474
res := []*FileLocation{}
7575
var S3Conf = &aws.Config{
76-
Region: aws.String(Config.Conf.Runtime.S3Region),
76+
Region: aws.String(config.Conf.Runtime.S3Region),
7777
}
7878
svc := s3.New(session.New(), S3Conf)
7979
prefix, bucket, key, err := SplitS3URL(fl.Location)

0 commit comments

Comments
 (0)