Skip to content

Commit 97d134c

Browse files
author
Mikhail Podtserkovskiy
committed
QA-7710: change service address to pod ip in kubernetes strategy
1 parent 399bdcb commit 97d134c

File tree

3 files changed

+69
-34
lines changed

3 files changed

+69
-34
lines changed

pool/strategy/kubernetes/provider.go

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ import (
1010
"strconv"
1111
"time"
1212
"strings"
13+
"fmt"
1314
)
1415

1516
type kubernetesProviderInterface interface {
16-
Create(podName string, nodeParams nodeParams) error
17+
Create(podName string, nodeParams nodeParams) (nodeAddress string, err error)
1718
// idempotent operation
1819
Destroy(podName string) error
1920
}
@@ -24,7 +25,7 @@ type kubDnsProvider struct {
2425
clientFactory jsonwire.ClientFactoryInterface
2526
}
2627

27-
func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) error {
28+
func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) (nodeAddress string, err error) {
2829
pod := &apiV1.Pod{}
2930
pod.ObjectMeta.Name = podName
3031
pod.ObjectMeta.Labels = map[string]string{"name": podName}
@@ -34,50 +35,64 @@ func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) error {
3435
container.Image = nodeParams.Image
3536
port, err := strconv.Atoi(nodeParams.Port)
3637
if err != nil {
37-
return errors.New("convert to int nodeParams.Port, " + err.Error())
38+
return "", errors.New("convert to int nodeParams.Port, " + err.Error())
3839
}
3940
container.Ports = []apiV1.ContainerPort{{ContainerPort: int32(port)}}
4041
pod.Spec.Containers = append(pod.Spec.Containers, container)
4142
_, err = p.clientset.CoreV1Client.Pods(p.namespace).Create(pod)
4243
if err != nil {
43-
return errors.New("send command pod/create to k8s, " + err.Error())
44+
return "", errors.New("send command pod/create to k8s, " + err.Error())
4445
}
4546

46-
service := &apiV1.Service{}
47-
service.ObjectMeta.Name = podName
48-
service.Spec.ClusterIP = "None"
49-
service.Spec.Ports = []apiV1.ServicePort{{Port: int32(port)}}
50-
service.Spec.Selector = map[string]string{"name": podName}
51-
_, err = p.clientset.CoreV1Client.Services(p.namespace).Create(service)
52-
if err != nil {
53-
return errors.New("send command service/create to k8s, " + err.Error())
47+
stopWaitIP := time.After(40 * time.Second)
48+
log.Debugf("start waiting pod ip")
49+
var createdPodIP string
50+
LoopWaitIP:
51+
for {
52+
select {
53+
case <-stopWaitIP:
54+
return "", fmt.Errorf("wait podIP stopped by timeout, %v", podName)
55+
default:
56+
time.Sleep(time.Second)
57+
createdPod, err := p.clientset.CoreV1Client.Pods(p.namespace).Get(podName)
58+
if err != nil {
59+
log.Debugf("fail get created pod, %v, %v",podName, err)
60+
continue
61+
}
62+
if createdPod.Status.PodIP == "" {
63+
log.Debugf("empty pod ip, %v", podName)
64+
continue
65+
}
66+
createdPodIP = createdPod.Status.PodIP
67+
break LoopWaitIP
68+
}
5469
}
5570

5671
// todo: пока так ожидаем поднятие ноды, так как не понятно что конкретно означают статусы возвращаемые через апи
57-
client := p.clientFactory.Create(net.JoinHostPort(podName, nodeParams.Port))
72+
nodeAddress = net.JoinHostPort(createdPodIP, nodeParams.Port)
73+
client := p.clientFactory.Create(nodeAddress)
5874
stop := time.After(40 * time.Second)
59-
log.Debugln("start waiting")
60-
Loop:
75+
log.Debugln("start waiting selenium")
76+
LoopWaitSelenium:
6177
for {
6278
select {
6379
case <-stop:
64-
return errors.New("wait stopped by timeout")
80+
return "", fmt.Errorf("wait selenium stopped by timeout, %v", podName)
6581
default:
6682
time.Sleep(time.Second)
67-
log.Debugln("start request")
6883
message, err := client.Health()
6984
if err != nil {
7085
log.Debugf("fail request, %v", err)
7186
continue
7287
}
7388
log.Debugf("done request, status: %v", message.Status)
7489
if message.Status == 0 {
75-
break Loop
90+
break LoopWaitSelenium
7691
}
7792
}
7893
}
7994

80-
return nil
95+
return nodeAddress, nil
8196
}
8297

8398
//Destroy - destroy all pod data (idempotent operation)
@@ -90,12 +105,5 @@ func (p *kubDnsProvider) Destroy(podName string) error {
90105
err = errors.New("send command pod/delete to k8s, " + err.Error())
91106
return err
92107
}
93-
err = p.clientset.CoreV1Client.Services(p.namespace).Delete(podName, &apiV1.DeleteOptions{})
94-
switch {
95-
case err != nil && strings.Contains(err.Error(), "not found"):
96-
// service already deleted
97-
case err != nil:
98-
return errors.New("send command service/delete to k8s, " + err.Error())
99-
}
100108
return nil
101109
}

pool/strategy/kubernetes/strategy.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"github.com/qa-dev/jsonwire-grid/pool/capabilities"
77
"github.com/qa-dev/jsonwire-grid/pool/strategy"
88
"github.com/satori/go.uuid"
9-
"net"
109
"time"
1110
"fmt"
1211
)
@@ -25,20 +24,25 @@ func (s *Strategy) Reserve(desiredCaps capabilities.Capabilities) (pool.Node, er
2524
}
2625
podName := "wd-node-" + uuid.NewV4().String()
2726
ts := time.Now().Unix()
28-
address := net.JoinHostPort(podName, nodeConfig.Params.Port)
29-
node := pool.NewNode(podName, pool.NodeTypeKubernetes, address, pool.NodeStatusReserved, "", ts, ts, []capabilities.Capabilities{})
27+
node := pool.NewNode(podName, pool.NodeTypeKubernetes, "temp-value-replace-me", pool.NodeStatusReserved, "", ts, ts, []capabilities.Capabilities{})
3028
err := s.storage.Add(*node, s.config.Limit)
3129
if err != nil {
3230
return pool.Node{}, errors.New("add node to storage, " + err.Error())
3331
}
34-
err = s.provider.Create(podName, nodeConfig.Params)
32+
nodeAddress, err := s.provider.Create(podName, nodeConfig.Params)
3533
if err != nil {
3634
go func(podName string) {
3735
time.Sleep(time.Minute * 2)
3836
_ = s.provider.Destroy(podName) // на случай если что то криво создалось
3937
}(podName)
4038
return pool.Node{}, errors.New("create node by provider, " + err.Error())
4139
}
40+
41+
err = s.storage.UpdateAddress(*node, nodeAddress)
42+
if err != nil {
43+
return pool.Node{}, errors.New("update node address in storage, " + err.Error())
44+
}
45+
node.Address = nodeAddress
4246
return *node, nil
4347

4448
}

pool/strategy/kubernetes/strategy_test.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ type providerMock struct {
1414
mock.Mock
1515
}
1616

17-
func (p *providerMock) Create(podName string, nodeParams nodeParams) error {
17+
func (p *providerMock) Create(podName string, nodeParams nodeParams) (nodeAddress string, err error) {
1818
args := p.Called(podName, nodeParams)
19-
return args.Error(0)
19+
return args.String(0), args.Error(1)
2020
}
2121

2222
func (p *providerMock) Destroy(podName string) error {
@@ -32,14 +32,17 @@ func TestStrategy_Reserve_Positive(t *testing.T) {
3232
}
3333
sm := new(pool.StorageMock)
3434
sm.On("Add", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("int")).Return(nil)
35+
sm.On("UpdateAddress", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("string")).Return(nil)
3536
cm := new(capabilities.ComparatorMock)
3637
cm.On("Compare", mock.AnythingOfType("capabilities.Capabilities"), mock.AnythingOfType("capabilities.Capabilities")).Return(true)
3738
pm := new(providerMock)
38-
pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return(nil)
39+
expectedAddress := "addr"
40+
pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return(expectedAddress, nil)
3941
str := Strategy{storage: sm, provider: pm, config: strategyConfig, capsComparator: cm}
4042
node, err := str.Reserve(capabilities.Capabilities{})
4143
assert.Nil(t, err)
4244
assert.NotNil(t, node)
45+
assert.Equal(t, expectedAddress, node.Address)
4346
}
4447

4548
func TestStrategy_Reserve_Negative_NotMatchCapabilities(t *testing.T) {
@@ -61,7 +64,27 @@ func TestStrategy_Reserve_Negative_ReserveAvailable(t *testing.T) {
6164
cm.On("Compare", mock.AnythingOfType("capabilities.Capabilities"), mock.AnythingOfType("capabilities.Capabilities")).Return(true)
6265
pm := new(providerMock)
6366
eError := errors.New("Error")
64-
pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return(eError)
67+
pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return("", eError)
68+
pm.On("Destroy", mock.AnythingOfType("string")).Return(nil)
69+
str := Strategy{storage: sm, provider: pm, config: strategyConfig, capsComparator: cm}
70+
_, err := str.Reserve(capabilities.Capabilities{})
71+
assert.NotNil(t, err)
72+
}
73+
74+
func TestStrategy_Reserve_Negative_UpdateAddress(t *testing.T) {
75+
nodeCfg := nodeConfig{}
76+
nodeCfg.CapabilitiesList = []map[string]interface{}{{"cap1": "cal1"}}
77+
strategyConfig := strategyConfig{
78+
NodeList: []nodeConfig{nodeCfg},
79+
}
80+
sm := new(pool.StorageMock)
81+
sm.On("Add", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("int")).Return(nil)
82+
sm.On("UpdateAddress", mock.AnythingOfType("pool.Node"), mock.AnythingOfType("string")).Return(errors.New("muhaha-error"))
83+
cm := new(capabilities.ComparatorMock)
84+
cm.On("Compare", mock.AnythingOfType("capabilities.Capabilities"), mock.AnythingOfType("capabilities.Capabilities")).Return(true)
85+
pm := new(providerMock)
86+
eError := errors.New("Error")
87+
pm.On("Create", mock.AnythingOfType("string"), mock.AnythingOfType("nodeParams")).Return("", eError)
6588
pm.On("Destroy", mock.AnythingOfType("string")).Return(nil)
6689
str := Strategy{storage: sm, provider: pm, config: strategyConfig, capsComparator: cm}
6790
_, err := str.Reserve(capabilities.Capabilities{})

0 commit comments

Comments
 (0)