Skip to content

Commit ca41b20

Browse files
author
zhangming03
committed
postgresql-init
1 parent e0de6dc commit ca41b20

37 files changed

+2659
-1723
lines changed

store/postgresql/admin.go

Lines changed: 48 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,19 @@
1-
/**
2-
* Tencent is pleased to support the open source community by making Polaris available.
3-
*
4-
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5-
*
6-
* Licensed under the BSD 3-Clause License (the "License");
7-
* you may not use this file except in compliance with the License.
8-
* You may obtain a copy of the License at
9-
*
10-
* https://opensource.org/licenses/BSD-3-Clause
11-
*
12-
* Unless required by applicable law or agreed to in writing, software distributed
13-
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14-
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15-
* specific language governing permissions and limitations under the License.
16-
*/
17-
181
package postgresql
192

203
import (
214
"context"
225
"database/sql"
236
"fmt"
24-
"sync"
25-
"sync/atomic"
26-
"time"
27-
28-
_ "github.com/lib/pq"
297
"github.com/polarismesh/polaris/common/eventhub"
308
"github.com/polarismesh/polaris/common/log"
319
"github.com/polarismesh/polaris/common/model"
3210
"github.com/polarismesh/polaris/common/utils"
3311
"github.com/polarismesh/polaris/store"
12+
"sync"
13+
"sync/atomic"
14+
"time"
15+
16+
_ "github.com/lib/pq"
3417
)
3518

3619
const (
@@ -73,13 +56,16 @@ type leaderElectionStore struct {
7356
}
7457

7558
// CreateLeaderElection insert election key into leader table
76-
func (l leaderElectionStore) CreateLeaderElection(key string) error {
59+
func (l *leaderElectionStore) CreateLeaderElection(key string) error {
7760
log.Debugf("[Store][database] create leader election (%s)", key)
7861

7962
return l.master.processWithTransaction("createLeaderElection", func(tx *BaseTx) error {
80-
mainStr := "insert ignore into leader_election (elect_key, leader) values (?, ?)"
63+
stmt, err := tx.Prepare("INSERT INTO leader_election(elect_key,leader) SELECT $1,$2 WHERE NOT EXISTS (SELECT 1 FROM leader_election WHERE elect_key=$3 AND leader=$4)")
64+
if err != nil {
65+
return err
66+
}
8167

82-
_, err := tx.Exec(mainStr, key, "")
68+
_, err = stmt.Exec(key, "", key, "")
8369
if err != nil {
8470
log.Errorf("[Store][database] create leader election (%s), err: %s", key, err.Error())
8571
}
@@ -94,10 +80,10 @@ func (l leaderElectionStore) CreateLeaderElection(key string) error {
9480
}
9581

9682
// GetVersion get the version from election
97-
func (l leaderElectionStore) GetVersion(key string) (int64, error) {
83+
func (l *leaderElectionStore) GetVersion(key string) (int64, error) {
9884
log.Debugf("[Store][database] get version (%s)", key)
9985

100-
mainStr := "select version from leader_election where elect_key = ?"
86+
mainStr := "select version from leader_election where elect_key = $1"
10187

10288
var count int64
10389
err := l.master.DB.QueryRow(mainStr, key).Scan(&count)
@@ -109,13 +95,17 @@ func (l leaderElectionStore) GetVersion(key string) (int64, error) {
10995
}
11096

11197
// CompareAndSwapVersion compare key version and update
112-
func (l leaderElectionStore) CompareAndSwapVersion(key string, curVersion int64, newVersion int64, leader string) (bool, error) {
98+
func (l *leaderElectionStore) CompareAndSwapVersion(key string, curVersion int64, newVersion int64, leader string) (bool, error) {
11399
var rows int64
114100

115101
err := l.master.processWithTransaction("compareAndSwapVersion", func(tx *BaseTx) error {
116102
log.Debugf("[Store][database] compare and swap version (%s, %d, %d, %s)", key, curVersion, newVersion, leader)
117-
mainStr := "update leader_election set leader = ?, version = ? where elect_key = ? and version = ?"
118-
result, err := tx.Exec(mainStr, leader, newVersion, key, curVersion)
103+
104+
stmt, err := tx.Prepare("update leader_election set leader = $1, version = $2 where elect_key = $3 and version = $4")
105+
if err != nil {
106+
return store.Error(err)
107+
}
108+
result, err := stmt.Exec(leader, newVersion, key, curVersion)
119109
if err != nil {
120110
log.Errorf("[Store][database] compare and swap version (%s), err: %s", key, err.Error())
121111
return store.Error(err)
@@ -140,28 +130,30 @@ func (l leaderElectionStore) CompareAndSwapVersion(key string, curVersion int64,
140130
}
141131

142132
// CheckMtimeExpired check last modify time expired
143-
func (l leaderElectionStore) CheckMtimeExpired(key string, leaseTime int32) (string, bool, error) {
133+
func (l *leaderElectionStore) CheckMtimeExpired(key string, leaseTime int32) (string, bool, error) {
144134
log.Debugf("[Store][database] check mtime expired (%s, %d)", key, leaseTime)
145135

146-
mainStr := "select leader, FROM_UNIXTIME(UNIX_TIMESTAMP(SYSDATE())) - mtime from leader_election where elect_key = ?"
136+
mainStr := "select leader, mtime from leader_election where elect_key = $1"
147137

148138
var (
149-
leader string
150-
diffTime int32
139+
leader string
140+
mtime time.Time
151141
)
152142

153-
err := l.master.DB.QueryRow(mainStr, key).Scan(&leader, &diffTime)
143+
err := l.master.DB.QueryRow(mainStr, key).Scan(&leader, &mtime)
154144
if err != nil {
155145
log.Errorf("[Store][database] check mtime expired (%s), err: %s", key, err.Error())
156146
}
157147

158-
return leader, diffTime > leaseTime, store.Error(err)
148+
diffTime := CurrDiffTimeSecond(mtime)
149+
150+
return leader, int32(diffTime) > leaseTime, store.Error(err)
159151
}
160152

161-
func (l leaderElectionStore) ListLeaderElections() ([]*model.LeaderElection, error) {
153+
func (l *leaderElectionStore) ListLeaderElections() ([]*model.LeaderElection, error) {
162154
log.Info("[Store][database] list leader election")
163155

164-
mainStr := "select elect_key, leader, UNIX_TIMESTAMP(ctime), UNIX_TIMESTAMP(mtime) from leader_election"
156+
mainStr := "select elect_key, leader, ctime, mtime from leader_election"
165157
rows, err := l.master.Query(mainStr)
166158
if err != nil {
167159
log.Errorf("[Store][database] list leader election query err: %s", err.Error())
@@ -251,7 +243,7 @@ func (le *leaderElectionStateMachine) mainLoop() {
251243

252244
// tick 定时校验主健康状况
253245
func (le *leaderElectionStateMachine) tick() {
254-
// 校验校验次数
246+
// 校验次数
255247
if le.checkReleaseTickLimit() {
256248
log.Infof("[Store][database] abandon leader election in this tick (%s)", le.electKey)
257249
return
@@ -471,9 +463,13 @@ func (m *adminStore) BatchCleanDeletedInstances(timeout time.Duration, batchSize
471463

472464
var rows int64
473465
err := m.master.processWithTransaction("batchCleanDeletedInstances", func(tx *BaseTx) error {
474-
mainStr := "delete from instance where flag = 1 and " +
475-
"mtime <= FROM_UNIXTIME(UNIX_TIMESTAMP(SYSDATE()) - ?) limit ?"
476-
result, err := tx.Exec(mainStr, int32(timeout.Seconds()), batchSize)
466+
stmt, err := tx.Prepare("delete from instance where id in (select id from instance where flag = 1 and mtime <= $1 limit $2)")
467+
if err != nil {
468+
return store.Error(err)
469+
}
470+
471+
diffTime := GetCurrentSsTimestamp() - int64(timeout.Seconds())
472+
result, err := stmt.Exec(UnixSecondToTime(diffTime), batchSize)
477473
if err != nil {
478474
log.Errorf("[Store][database] batch clean soft deleted instances(%d), err: %s", batchSize, err.Error())
479475
return store.Error(err)
@@ -502,9 +498,9 @@ func (m *adminStore) BatchCleanDeletedInstances(timeout time.Duration, batchSize
502498
func (m *adminStore) GetUnHealthyInstances(timeout time.Duration, limit uint32) ([]string, error) {
503499
log.Infof("[Store][database] get unhealthy instances which mtime timeout %s (%d)", timeout, limit)
504500

505-
queryStr := "select id from instance where flag=0 and enable_health_check=1 and health_status=0 " +
506-
"and mtime < FROM_UNIXTIME(UNIX_TIMESTAMP(SYSDATE()) - ?) limit ?"
507-
rows, err := m.master.Query(queryStr, int32(timeout.Seconds()), limit)
501+
diffTime := GetCurrentSsTimestamp() - int64(timeout.Seconds())
502+
queryStr := "select id from instance where flag=0 and enable_health_check=1 and health_status=0 and mtime < $1 limit $2"
503+
rows, err := m.master.Query(queryStr, UnixSecondToTime(diffTime), limit)
508504
if err != nil {
509505
log.Errorf("[Store][database] get unhealthy instances, err: %s", err.Error())
510506
return nil, store.Error(err)
@@ -535,9 +531,13 @@ func (m *adminStore) BatchCleanDeletedClients(timeout time.Duration, batchSize u
535531

536532
var rows int64
537533
err := m.master.processWithTransaction("batchCleanDeletedClients", func(tx *BaseTx) error {
538-
mainStr := "delete from client where flag = 1 and " +
539-
"mtime <= FROM_UNIXTIME(UNIX_TIMESTAMP(SYSDATE()) - ?) limit ?"
540-
result, err := tx.Exec(mainStr, int32(timeout.Seconds()), batchSize)
534+
stmt, err := tx.Prepare("delete from client where id in (select id from client where flag = 1 and mtime <= $1 limit $2)")
535+
if err != nil {
536+
return store.Error(err)
537+
}
538+
539+
diffTime := GetCurrentSsTimestamp() - int64(timeout.Seconds())
540+
result, err := stmt.Exec(UnixSecondToTime(diffTime), batchSize)
541541
if err != nil {
542542
log.Errorf("[Store][database] batch clean soft deleted clients(%d), err: %s", batchSize, err.Error())
543543
return store.Error(err)

store/postgresql/admin_test.go

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,54 @@
1-
/**
2-
* Tencent is pleased to support the open source community by making Polaris available.
3-
*
4-
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5-
*
6-
* Licensed under the BSD 3-Clause License (the "License");
7-
* you may not use this file except in compliance with the License.
8-
* You may obtain a copy of the License at
9-
*
10-
* https://opensource.org/licenses/BSD-3-Clause
11-
*
12-
* Unless required by applicable law or agreed to in writing, software distributed
13-
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14-
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15-
* specific language governing permissions and limitations under the License.
16-
*/
17-
181
package postgresql
192

203
import (
21-
"database/sql"
224
"fmt"
235
"testing"
24-
25-
_ "github.com/lib/pq"
6+
"time"
267
)
278

28-
func TestDemo(t *testing.T) {
29-
// 创建连接
30-
db, err := sql.Open("postgres", fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "192.168.31.19", 5432, "postgres", "aaaaaa", "postgres"))
31-
//db, err := sql.Open("postgres", fmt.Sprintf("%s:%s@tcp(%s)/%s", "postgres", "aaaaaa", "192.168.31.19:5432", "postgres"))
32-
if err != nil {
33-
fmt.Printf("sql.Open, err: %+v\n", err)
34-
return
35-
}
36-
defer db.Close()
9+
func TestCreateLeaderElection(t *testing.T) {
10+
obj := initConf()
3711

38-
if pingErr := db.Ping(); pingErr != nil {
39-
fmt.Printf("ping.err: %+v\n", pingErr)
40-
return
12+
for i := 0; i < 2; i++ {
13+
//go func() {
14+
key := fmt.Sprintf("test%d", i)
15+
err := obj.adminStore.StartLeaderElection(key)
16+
fmt.Printf("err: %+v\n", err)
17+
//}()
4118
}
19+
}
4220

43-
rows := db.QueryRow("SELECT id, user_name FROM demo;")
44-
var id *sql.NullInt16
45-
var userName *sql.NullString
46-
err = rows.Scan(&id, &userName)
47-
if err != nil {
48-
fmt.Printf("Scan, err: %+v\n", err)
49-
return
50-
}
51-
if id.Int16 == 0 || userName.String == "" {
52-
fmt.Printf("Scan.data.null\n")
53-
return
54-
}
21+
func TestCheckMtimeExpired(t *testing.T) {
22+
obj := initConf()
23+
24+
key := fmt.Sprintf("test%d", 1)
25+
err := obj.adminStore.StartLeaderElection(key)
26+
fmt.Printf("err: %+v\n", err)
27+
28+
select {}
29+
}
30+
31+
func TestBatchCleanDeletedInstances(t *testing.T) {
32+
obj := initConf()
33+
34+
resp, err := obj.adminStore.BatchCleanDeletedInstances(10*time.Minute, 5)
35+
fmt.Printf("resp: %+v, err: %+v\n", resp, err)
36+
37+
select {}
38+
}
39+
40+
func TestGetUnHealthyInstances(t *testing.T) {
41+
obj := initConf()
42+
43+
resp, err := obj.adminStore.GetUnHealthyInstances(10*time.Minute, 5)
44+
fmt.Printf("resp: %+v, err: %+v\n", resp, err)
45+
46+
select {}
47+
}
48+
49+
func TestBatchCleanDeletedClients(t *testing.T) {
50+
obj := initConf()
5551

56-
fmt.Println("id:", id.Int16)
57-
fmt.Println("user_name:", userName.String)
52+
resp, err := obj.adminStore.BatchCleanDeletedClients(10*time.Minute, 5)
53+
fmt.Printf("resp: %+v, err: %+v\n", resp, err)
5854
}

store/postgresql/base_db.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,15 @@
1-
/**
2-
* Tencent is pleased to support the open source community by making Polaris available.
3-
*
4-
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5-
*
6-
* Licensed under the BSD 3-Clause License (the "License");
7-
* you may not use this file except in compliance with the License.
8-
* You may obtain a copy of the License at
9-
*
10-
* https://opensource.org/licenses/BSD-3-Clause
11-
*
12-
* Unless required by applicable law or agreed to in writing, software distributed
13-
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14-
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15-
* specific language governing permissions and limitations under the License.
16-
*/
17-
181
package postgresql
192

203
import (
214
"context"
225
"database/sql"
236
"fmt"
7+
"github.com/polarismesh/polaris/common/log"
8+
"github.com/polarismesh/polaris/plugin"
249
"strings"
2510
"time"
2611

2712
_ "github.com/lib/pq"
28-
"github.com/polarismesh/polaris/common/log"
29-
"github.com/polarismesh/polaris/plugin"
3013
)
3114

3215
// db抛出的异常,需要重试的字符串组

store/postgresql/base_db_test.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,11 @@
1-
/**
2-
* Tencent is pleased to support the open source community by making Polaris available.
3-
*
4-
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
5-
*
6-
* Licensed under the BSD 3-Clause License (the "License");
7-
* you may not use this file except in compliance with the License.
8-
* You may obtain a copy of the License at
9-
*
10-
* https://opensource.org/licenses/BSD-3-Clause
11-
*
12-
* Unless required by applicable law or agreed to in writing, software distributed
13-
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
14-
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
15-
* specific language governing permissions and limitations under the License.
16-
*/
17-
181
package postgresql
192

203
import (
214
"errors"
225
"fmt"
6+
. "github.com/smartystreets/goconvey/convey"
237
"testing"
248
"time"
25-
26-
. "github.com/smartystreets/goconvey/convey"
279
)
2810

2911
func TestRetry(t *testing.T) {

0 commit comments

Comments
 (0)