Skip to content

Commit f7c6ac5

Browse files
author
chen.rui
committed
基本完成常用的几个功能
1 parent a6a37a1 commit f7c6ac5

25 files changed

+952
-299
lines changed

.idea/inspectionProfiles/Project_Default.xml

+10
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/client.go

+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package client
2+
3+
import (
4+
"errors"
5+
"github.com/IBM/sarama"
6+
"kf/cluster"
7+
"log"
8+
"os"
9+
"sigs.k8s.io/yaml"
10+
"strings"
11+
)
12+
13+
// GetKafkaAdmin todo 后续使用channel来进行优化
14+
func GetKafkaAdmin() (sarama.ClusterAdmin, error) {
15+
// 获取当期的配置文件,创建client
16+
config, err := getCurrentClusterConfig()
17+
if err != nil {
18+
return nil, err
19+
}
20+
saramaConfig := sarama.NewConfig()
21+
admin, err := sarama.NewClusterAdmin(splitKafkaAddr(config.Addr), saramaConfig)
22+
if err != nil {
23+
return nil, err
24+
}
25+
return admin, nil
26+
}
27+
28+
// GetClient 获取client
29+
func GetClient() (sarama.Client, error) {
30+
// 获取当期的配置文件,创建client
31+
config, err := getCurrentClusterConfig()
32+
if err != nil {
33+
return nil, err
34+
}
35+
saramaConfig := sarama.NewConfig()
36+
consumer, err := sarama.NewClient(splitKafkaAddr(config.Addr), saramaConfig)
37+
if err != nil {
38+
log.Fatalf("Error creating consumer: %v", err)
39+
}
40+
return consumer, nil
41+
}
42+
43+
// 使用;来分割
44+
func splitKafkaAddr(addr string) []string {
45+
return strings.Split(addr, ";")
46+
}
47+
48+
func getCurrentClusterConfig() (*cluster.Cluster, error) {
49+
config := &cluster.Config{}
50+
file, err := os.ReadFile(cluster.KfFile)
51+
if err != nil {
52+
return nil, err
53+
}
54+
err = yaml.Unmarshal(file, config)
55+
if err != nil {
56+
return nil, err
57+
}
58+
if config.CurrentContext == "" || len(config.Clusters) == 0 {
59+
return nil, errors.New("no current cluster found,please add cluster context")
60+
}
61+
for _, _cluster := range config.Clusters {
62+
if config.CurrentContext == _cluster.Name {
63+
return _cluster, nil
64+
}
65+
}
66+
return nil, errors.New("cluster not found")
67+
}

client/interface.go

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package client
2+
3+
type kafka interface {
4+
getTopics()
5+
}

client/kafka_test.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
"github.com/IBM/sarama"
6+
"testing"
7+
)
8+
9+
func TestKafkaClient(t *testing.T) {
10+
11+
saramaConfig := sarama.NewConfig()
12+
brokers := []string{"127.0.0.1:9092"}
13+
admin, err := sarama.NewClusterAdmin(brokers, saramaConfig)
14+
if err != nil {
15+
t.Fatal(err)
16+
}
17+
topics, err := admin.ListTopics()
18+
if err != nil {
19+
t.Fatal(err)
20+
}
21+
for topic := range topics {
22+
fmt.Println(topic)
23+
}
24+
}

cluster/add.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package cluster
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
"os"
6+
"sigs.k8s.io/yaml"
7+
)
8+
9+
func NewAddCmd() *cobra.Command {
10+
// add
11+
addCmd := &cobra.Command{
12+
Use: "add ",
13+
Short: "add kafka cluster",
14+
RunE: func(cmd *cobra.Command, args []string) error {
15+
16+
var kfConfig *Config
17+
var config *Config
18+
var err error
19+
fileName := cmd.Flags().Lookup("fileName").Value.String()
20+
// judge kf file 是否存在
21+
if _, err = os.Stat(KfFile); err != nil {
22+
//如果不存在
23+
if os.IsNotExist(err) {
24+
config, err = parse2Struct(fileName)
25+
if err != nil {
26+
return err
27+
}
28+
kfConfig = config
29+
kfConfig.CurrentContext = config.Clusters[0].Name
30+
}
31+
} else { // 如果存在
32+
kfConfig, err = parse2Struct(KfFile)
33+
if err != nil {
34+
return err
35+
}
36+
config, err = parse2Struct(fileName)
37+
kfConfig.Clusters = append(kfConfig.Clusters, config.Clusters...)
38+
if kfConfig.CurrentContext == "" {
39+
kfConfig.CurrentContext = config.Clusters[0].Name
40+
}
41+
}
42+
yamlData, err := yaml.Marshal(kfConfig)
43+
if err != nil {
44+
return err
45+
}
46+
return os.WriteFile(KfFile, yamlData, 0644)
47+
},
48+
}
49+
var fileName string
50+
addCmd.Flags().StringVarP(&fileName, "fileName", "f", "localhost:9092", "specific cluster fileName")
51+
err := addCmd.MarkFlagRequired("fileName")
52+
if err != nil {
53+
panic(err)
54+
}
55+
return addCmd
56+
}

cluster/cluster.go

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package cluster
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
"os"
6+
"sigs.k8s.io/yaml"
7+
)
8+
9+
type Cluster struct {
10+
//ID string `json:"id,omitempty"`
11+
Name string `json:"name"`
12+
Addr string `json:"addr"`
13+
}
14+
15+
func NewClusterCmd() *cobra.Command {
16+
clusterCmd := &cobra.Command{
17+
Use: "cluster",
18+
Short: "kafka cluster",
19+
RunE: func(cmd *cobra.Command, args []string) error {
20+
return nil
21+
},
22+
}
23+
clusterCmd.AddCommand(NewAddCmd())
24+
clusterCmd.AddCommand(NewListCmd())
25+
clusterCmd.AddCommand(NewRemoveCmd())
26+
clusterCmd.AddCommand(NewSwitchCmd())
27+
clusterCmd.AddCommand(NewCurrentCmd())
28+
clusterCmd.AddCommand(NewRenameCmd())
29+
return clusterCmd
30+
}
31+
32+
func switchClusterFromName(fileName string, name string, config *Config) {
33+
// 将文件序列成对象
34+
file, err := os.ReadFile(fileName)
35+
if err != nil {
36+
panic(err)
37+
}
38+
err = yaml.Unmarshal(file, config)
39+
if err != nil {
40+
panic(err)
41+
}
42+
// 替换当前的上下文
43+
config.CurrentContext = name
44+
// 重新写入yaml
45+
yamlData, err := yaml.Marshal(config)
46+
err = os.WriteFile(fileName, yamlData, 0644)
47+
if err != nil {
48+
panic(err)
49+
}
50+
51+
}
52+
53+
func renameClusterFromName(oldName, newName string, config *Config) {
54+
55+
// 更新current-context
56+
if config.CurrentContext == oldName {
57+
config.CurrentContext = newName
58+
}
59+
60+
// 更新名称
61+
for _, cluster := range config.Clusters {
62+
if oldName == cluster.Name {
63+
cluster.Name = newName
64+
println(cluster.Name)
65+
}
66+
}
67+
// 回写
68+
yamlData, err := yaml.Marshal(config)
69+
err = os.WriteFile(KfFile, yamlData, 0644)
70+
if err != nil {
71+
panic(err)
72+
}
73+
74+
}
75+
76+
func removeClusterFromName(fileName, name string, config *Config) {
77+
// 将文件序列成对象
78+
file, err := os.ReadFile(fileName)
79+
if err != nil {
80+
panic(err)
81+
}
82+
err = yaml.Unmarshal(file, config)
83+
if err != nil {
84+
panic(err)
85+
}
86+
// 删除记录
87+
for i, cluster := range config.Clusters {
88+
if cluster.Name == name {
89+
config.Clusters = append(config.Clusters[:i], config.Clusters[i+1:]...)
90+
}
91+
}
92+
// 重置当前的上下文
93+
if name == config.CurrentContext {
94+
if len(config.Clusters) == 0 {
95+
config.CurrentContext = ""
96+
} else {
97+
config.CurrentContext = config.Clusters[0].Name
98+
}
99+
}
100+
101+
// 重新写入yaml
102+
yamlData, err := yaml.Marshal(config)
103+
err = os.WriteFile(fileName, yamlData, 0644)
104+
if err != nil {
105+
panic(err)
106+
}
107+
108+
}
109+
110+
func parse2Struct(fileName string) (*Config, error) {
111+
config := &Config{}
112+
file, err := os.ReadFile(fileName)
113+
if err != nil {
114+
return nil, err
115+
}
116+
err = yaml.Unmarshal(file, config)
117+
if err != nil {
118+
return nil, err
119+
}
120+
return config, nil
121+
}

cluster/config.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package cluster
2+
3+
import (
4+
"os"
5+
)
6+
7+
var KfFile = ""
8+
9+
func init() {
10+
wokDir := os.Getenv("WOK_DIR")
11+
if len(wokDir) == 0 {
12+
dir, _ := os.Getwd()
13+
wokDir = dir + "/kf.yaml"
14+
}
15+
KfFile = wokDir
16+
}
17+
18+
type Config struct {
19+
ApiVersion string `json:"apiVersion"`
20+
Kind string `json:"kind"`
21+
// 如果是值类型,无法修改cluster中的值,如果需要修改,需要定义成指针类型
22+
Clusters []*Cluster `json:"clusters"`
23+
CurrentContext string `json:"current-context"`
24+
}

cluster/current.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package cluster
2+
3+
import (
4+
"fmt"
5+
"github.com/spf13/cobra"
6+
)
7+
8+
func NewCurrentCmd() *cobra.Command {
9+
currentCmd := &cobra.Command{
10+
Use: "current ",
11+
Short: "current kafka cluster",
12+
Aliases: []string{"c"},
13+
RunE: func(cmd *cobra.Command, args []string) error {
14+
config, err := parse2Struct(KfFile)
15+
for _, currentCluster := range config.Clusters {
16+
if currentCluster.Name == config.CurrentContext {
17+
printCurrentClusterInfo(*currentCluster)
18+
}
19+
}
20+
return err
21+
},
22+
}
23+
return currentCmd
24+
}
25+
26+
func printCurrentClusterInfo(currentCluster Cluster) {
27+
fmt.Println(fmt.Sprintf("name:%s", currentCluster.Name))
28+
fmt.Println(fmt.Sprintf("addr:%s", currentCluster.Addr))
29+
}

cluster/list.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package cluster
2+
3+
import (
4+
"fmt"
5+
"github.com/spf13/cobra"
6+
"os"
7+
)
8+
9+
func NewListCmd() *cobra.Command {
10+
listCmd := &cobra.Command{
11+
Use: "list ",
12+
Short: "list kafka cluster",
13+
Aliases: []string{"ls"},
14+
RunE: func(cmd *cobra.Command, args []string) error {
15+
if _, err := os.Stat(KfFile); err != nil {
16+
//如果不存在
17+
if os.IsNotExist(err) {
18+
fmt.Println("kafka cluster does not exist")
19+
return nil
20+
}
21+
}
22+
config, err := parse2Struct(KfFile)
23+
for _, currentCluster := range config.Clusters {
24+
fmt.Println(currentCluster.Name)
25+
}
26+
return err
27+
},
28+
}
29+
return listCmd
30+
}

0 commit comments

Comments
 (0)