Skip to content

Commit f3b3799

Browse files
Merge pull request #25 from qa-dev/QA-7710
- change service address to pod ip in kubernetes strategy - move hardcode in on-demand strategy to config
2 parents 9962a10 + 2a3a5e7 commit f3b3799

File tree

19 files changed

+361
-161
lines changed

19 files changed

+361
-161
lines changed

README.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,9 @@ Configurations are stored in json files. Example:
112112
| node_list | - | Omit this property. |
113113

114114
##### `kubernetes` - on-demand nodes in kubernetes cluster.
115-
| Strategy option | Possible values | Description |
116-
|-------------------------- | --------------- | --------------------------- |
117-
| params | - | Omit this property. |
118-
| node_list.[].params.image | string | Docker image with selenium. |
119-
| node_list.[].params.port | string | Port of selenium. |
115+
| Strategy option | Possible values | Description |
116+
|-------------------------- | ---------------------- | ------------------------------------- |
117+
| params.namespace | string | Namespace in k8s for on-demand nodes. |
118+
| params.pod_creation_timeout | string as `12m`, `60s` | Max waiting time for creating a pod. |
119+
| node_list.[].params.image | string | Docker image with selenium. |
120+
| node_list.[].params.port | string | Port of selenium. |

handlers/registerNode.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@ func (h *RegisterNode) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
5050
for i, value := range capabilitiesList {
5151
poolCapabilitiesList[i] = capabilities.Capabilities(value)
5252
}
53+
hostPort := register.Configuration.Host + ":" + strconv.Itoa(register.Configuration.Port)
5354
err = h.Pool.Add(
55+
hostPort,
5456
pool.NodeTypePersistent,
55-
register.Configuration.Host+":"+strconv.Itoa(register.Configuration.Port),
57+
hostPort,
5658
poolCapabilitiesList,
5759
)
5860
if err != nil {

pool/mocks.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,8 @@ func (s *StrategyListMock) FixNodeStatus(node Node) error {
7272
args := s.Called(node)
7373
return args.Error(0)
7474
}
75+
76+
func (s *StorageMock) UpdateAddress(node Node, newAddress string) error {
77+
args := s.Called(node, newAddress)
78+
return args.Error(0)
79+
}

pool/node.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ const (
1818
)
1919

2020
type Node struct {
21+
// A unique key, by which we understand how to find this object in the outer world + for not adding the second time the same thing.
22+
// The value may depend on the strategy:
23+
// - for constant nodes ip: port
24+
// - for temporary pod.name
25+
Key string
2126
Type NodeType
2227
Address string
2328
Status NodeStatus
@@ -28,10 +33,11 @@ type Node struct {
2833
}
2934

3035
func (n *Node) String() string {
31-
return "Node [" + n.Address + "]"
36+
return "Node [" + n.Key + "]"
3237
}
3338

3439
func NewNode(
40+
key string,
3541
t NodeType,
3642
address string,
3743
status NodeStatus,
@@ -41,6 +47,7 @@ func NewNode(
4147
capabilitiesList []capabilities.Capabilities,
4248
) *Node {
4349
return &Node{
50+
key,
4451
t,
4552
address,
4653
status,

pool/pool.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type StorageInterface interface {
2525
GetByAddress(string) (Node, error)
2626
GetAll() ([]Node, error)
2727
Remove(Node) error
28+
UpdateAddress(node Node, newAddress string) error
2829
}
2930

3031
type StrategyInterface interface {
@@ -68,12 +69,12 @@ func (p *Pool) ReserveAvailableNode(caps capabilities.Capabilities) (*Node, erro
6869
return &node, err
6970
}
7071

71-
func (p *Pool) Add(t NodeType, address string, capabilitiesList []capabilities.Capabilities) error {
72+
func (p *Pool) Add(key string, t NodeType, address string, capabilitiesList []capabilities.Capabilities) error {
7273
if len(capabilitiesList) == 0 {
7374
return errors.New("[Pool/Add] Capabilities must contains more one element")
7475
}
7576
ts := time.Now().Unix()
76-
return p.storage.Add(*NewNode(t, address, NodeStatusAvailable, "", ts, ts, capabilitiesList), 0)
77+
return p.storage.Add(*NewNode(key, t, address, NodeStatusAvailable, "", ts, ts, capabilitiesList), 0)
7778
}
7879

7980
func (p *Pool) RegisterSession(node *Node, sessionID string) error {
@@ -148,7 +149,7 @@ func (p *Pool) FixNodeStatuses() {
148149
continue
149150
}
150151
if isFixed {
151-
log.Infof("Node [%s] status fixed", node.Address)
152+
log.Infof("Node [%s] status fixed", node.Key)
152153
}
153154
}
154155
}
@@ -169,7 +170,7 @@ func (p *Pool) fixNodeStatus(node *Node) (bool, error) {
169170
}
170171
err := p.strategyList.FixNodeStatus(*node)
171172
if err != nil {
172-
return false, fmt.Errorf("Can't fix node [%s] status, %s", node.Address, err.Error())
173+
return false, fmt.Errorf("Can't fix node [%s] status, %s", node.Key, err.Error())
173174
}
174175
return true, nil
175176
}

pool/pool_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func TestPool_Add_Positive(t *testing.T) {
4747
p := NewPool(s, new(StrategyListMock))
4848
eAddress := "127.0.0.1"
4949
eNodeType := NodeTypePersistent
50-
err := p.Add(eNodeType, eAddress, []capabilities.Capabilities{{"browserName": "ololo"}})
50+
err := p.Add("1234", eNodeType, eAddress, []capabilities.Capabilities{{"browserName": "ololo"}})
5151
a.Nil(err)
5252
}
5353

@@ -59,7 +59,7 @@ func TestPool_Add_Negative(t *testing.T) {
5959
p := NewPool(s, new(StrategyListMock))
6060
eAddress := "127.0.0.1"
6161
eNodeType := NodeTypePersistent
62-
err := p.Add(eNodeType, eAddress, []capabilities.Capabilities{})
62+
err := p.Add("1234", eNodeType, eAddress, []capabilities.Capabilities{})
6363
a.Error(err)
6464
}
6565

@@ -225,7 +225,7 @@ func TestPool_fixNodeStatus_Positive_BusyExpired(t *testing.T) {
225225
slm := new(StrategyListMock)
226226
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
227227
p := NewPool(new(StorageMock), slm)
228-
node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
228+
node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
229229
isFixed, err := p.fixNodeStatus(node)
230230
a.True(isFixed)
231231
a.Nil(err)
@@ -236,7 +236,7 @@ func TestPool_fixNodeStatus_Positive_ReservedExpired(t *testing.T) {
236236
slm := new(StrategyListMock)
237237
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
238238
p := NewPool(new(StorageMock), slm)
239-
node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
239+
node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
240240
isFixed, err := p.fixNodeStatus(node)
241241
a.True(isFixed)
242242
a.Nil(err)
@@ -247,7 +247,7 @@ func TestPool_fixNodeStatus_Positive_BusyNotNotExpired(t *testing.T) {
247247
slm := new(StrategyListMock)
248248
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
249249
p := NewPool(new(StorageMock), slm)
250-
node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
250+
node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
251251
isFixed, err := p.fixNodeStatus(node)
252252
a.False(isFixed)
253253
a.Nil(err)
@@ -258,7 +258,7 @@ func TestPool_fixNodeStatus_Positive_ReservedNotNotExpired(t *testing.T) {
258258
slm := new(StrategyListMock)
259259
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
260260
p := NewPool(new(StorageMock), slm)
261-
node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
261+
node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", time.Now().Unix(), 0, []capabilities.Capabilities{})
262262
isFixed, err := p.fixNodeStatus(node)
263263
a.False(isFixed)
264264
a.Nil(err)
@@ -269,7 +269,7 @@ func TestPool_fixNodeStatus_Positive_AvailableExpired(t *testing.T) {
269269
slm := new(StrategyListMock)
270270
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(nil)
271271
p := NewPool(new(StorageMock), slm)
272-
node := NewNode(NodeTypePersistent, "", NodeStatusAvailable, "", 0, 0, []capabilities.Capabilities{})
272+
node := NewNode("123", NodeTypePersistent, "", NodeStatusAvailable, "", 0, 0, []capabilities.Capabilities{})
273273
isFixed, err := p.fixNodeStatus(node)
274274
a.False(isFixed)
275275
a.Nil(err)
@@ -281,7 +281,7 @@ func TestPool_fixNodeStatus_NegativeBusy(t *testing.T) {
281281
slm := new(StrategyListMock)
282282
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(eError)
283283
p := NewPool(new(StorageMock), slm)
284-
node := NewNode(NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
284+
node := NewNode("123", NodeTypePersistent, "", NodeStatusBusy, "", 0, 0, []capabilities.Capabilities{})
285285
isFixed, err := p.fixNodeStatus(node)
286286
a.False(isFixed)
287287
a.Error(err)
@@ -293,7 +293,7 @@ func TestPool_fixNodeStatus_NegativeReserved(t *testing.T) {
293293
slm := new(StrategyListMock)
294294
slm.On("FixNodeStatus", mock.AnythingOfType("pool.Node")).Return(eError)
295295
p := NewPool(new(StorageMock), slm)
296-
node := NewNode(NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
296+
node := NewNode("123", NodeTypePersistent, "", NodeStatusReserved, "", 0, 0, []capabilities.Capabilities{})
297297
isFixed, err := p.fixNodeStatus(node)
298298
a.False(isFixed)
299299
a.Error(err)

pool/strategy/kubernetes/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,33 @@ import (
44
"encoding/json"
55
"errors"
66
"github.com/qa-dev/jsonwire-grid/config"
7+
"time"
8+
"fmt"
79
)
810

911
type strategyParams struct {
12+
Namespace string
13+
PodCreationTimeout time.Duration
14+
}
15+
16+
func (sp *strategyParams) UnmarshalJSON(b []byte) error {
17+
tempStruct := struct{
18+
Namespace string `json:"namespace"`
19+
PodCreationTimeout string `json:"pod_creation_timeout"`
20+
} {
21+
"default",
22+
"1m",
23+
}
24+
if err := json.Unmarshal(b, &tempStruct); err != nil {
25+
return err
26+
}
27+
podCreationTimeout, err := time.ParseDuration(tempStruct.PodCreationTimeout)
28+
if err != nil {
29+
return fmt.Errorf("invalid value strategy.pod_creation_timeout in config, given: %v", tempStruct.PodCreationTimeout)
30+
}
31+
sp.Namespace = tempStruct.Namespace
32+
sp.PodCreationTimeout = podCreationTimeout
33+
return nil
1034
}
1135

1236
type strategyConfig struct {

pool/strategy/kubernetes/factory.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/qa-dev/jsonwire-grid/pool/capabilities"
99
"k8s.io/client-go/kubernetes"
1010
"k8s.io/client-go/rest"
11+
log "github.com/Sirupsen/logrus"
1112
)
1213

1314
type StrategyFactory struct {
@@ -31,6 +32,8 @@ func (f *StrategyFactory) Create(
3132
}
3233
}
3334

35+
log.Debugf("strategy kubernetes config, %+v", strategyConfig)
36+
3437
//todo: выпилить этот говноклиент, когда будет работать нормальный
3538
kubConfig, err := rest.InClusterConfig()
3639
if err != nil {
@@ -42,9 +45,10 @@ func (f *StrategyFactory) Create(
4245
return nil, errors.New("create k8s clientset, " + err.Error())
4346
}
4447

45-
provider := &kubernetesProvider{
48+
provider := &kubDnsProvider{
4649
clientset: clientset,
47-
namespace: "default", //todo: брать из конфига !!!
50+
namespace: strategyConfig.Params.Namespace,
51+
podCreationTimeout: strategyConfig.Params.PodCreationTimeout,
4852
clientFactory: clientFactory,
4953
}
5054

pool/strategy/kubernetes/provider.go

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,24 @@ import (
99
"net"
1010
"strconv"
1111
"time"
12+
"strings"
13+
"fmt"
1214
)
1315

1416
type kubernetesProviderInterface interface {
15-
Create(podName string, nodeParams nodeParams) error
17+
Create(podName string, nodeParams nodeParams) (nodeAddress string, err error)
18+
// idempotent operation
1619
Destroy(podName string) error
1720
}
1821

19-
type kubernetesProvider struct {
22+
type kubDnsProvider struct {
2023
clientset *kubernetes.Clientset
2124
namespace string
25+
podCreationTimeout time.Duration
2226
clientFactory jsonwire.ClientFactoryInterface
2327
}
2428

25-
func (p *kubernetesProvider) Create(podName string, nodeParams nodeParams) error {
29+
func (p *kubDnsProvider) Create(podName string, nodeParams nodeParams) (nodeAddress string, err error) {
2630
pod := &apiV1.Pod{}
2731
pod.ObjectMeta.Name = podName
2832
pod.ObjectMeta.Labels = map[string]string{"name": podName}
@@ -32,62 +36,70 @@ func (p *kubernetesProvider) Create(podName string, nodeParams nodeParams) error
3236
container.Image = nodeParams.Image
3337
port, err := strconv.Atoi(nodeParams.Port)
3438
if err != nil {
35-
return errors.New("convert to int nodeParams.Port, " + err.Error())
39+
return "", errors.New("convert to int nodeParams.Port, " + err.Error())
3640
}
3741
container.Ports = []apiV1.ContainerPort{{ContainerPort: int32(port)}}
3842
pod.Spec.Containers = append(pod.Spec.Containers, container)
3943
_, err = p.clientset.CoreV1Client.Pods(p.namespace).Create(pod)
4044
if err != nil {
41-
return errors.New("send command pod/create to k8s, " + err.Error())
45+
return "", errors.New("send command pod/create to k8s, " + err.Error())
4246
}
4347

44-
service := &apiV1.Service{}
45-
service.ObjectMeta.Name = podName
46-
service.Spec.ClusterIP = "None"
47-
service.Spec.Ports = []apiV1.ServicePort{{Port: int32(port)}}
48-
service.Spec.Selector = map[string]string{"name": podName}
49-
_, err = p.clientset.CoreV1Client.Services(p.namespace).Create(service)
50-
if err != nil {
51-
return errors.New("send command service/create to k8s, " + err.Error())
48+
stop := time.After(p.podCreationTimeout)
49+
log.Debugf("start waiting pod ip")
50+
var createdPodIP string
51+
LoopWaitIP:
52+
for {
53+
select {
54+
case <-stop:
55+
return "", fmt.Errorf("wait podIP stopped by timeout, %v", podName)
56+
default:
57+
time.Sleep(time.Second)
58+
createdPod, err := p.clientset.CoreV1Client.Pods(p.namespace).Get(podName)
59+
if err != nil {
60+
log.Debugf("fail get created pod, %v, %v",podName, err)
61+
continue
62+
}
63+
if createdPod.Status.PodIP == "" {
64+
log.Debugf("empty pod ip, %v", podName)
65+
continue
66+
}
67+
createdPodIP = createdPod.Status.PodIP
68+
break LoopWaitIP
69+
}
5270
}
5371

5472
// todo: пока так ожидаем поднятие ноды, так как не понятно что конкретно означают статусы возвращаемые через апи
55-
client := p.clientFactory.Create(net.JoinHostPort(podName, nodeParams.Port))
56-
stop := time.After(40 * time.Second)
57-
log.Debugln("start waiting")
58-
Loop:
73+
nodeAddress = net.JoinHostPort(createdPodIP, nodeParams.Port)
74+
client := p.clientFactory.Create(nodeAddress)
75+
log.Debugln("start waiting selenium")
76+
LoopWaitSelenium:
5977
for {
6078
select {
6179
case <-stop:
62-
return errors.New("wait stopped by timeout")
80+
return "", fmt.Errorf("wait selenium stopped by timeout, %v", podName)
6381
default:
6482
time.Sleep(time.Second)
65-
log.Debugln("start request")
6683
message, err := client.Health()
6784
if err != nil {
6885
log.Debugf("fail request, %v", err)
6986
continue
7087
}
7188
log.Debugf("done request, status: %v", message.Status)
7289
if message.Status == 0 {
73-
break Loop
90+
break LoopWaitSelenium
7491
}
7592
}
7693
}
7794

78-
return nil
95+
return nodeAddress, nil
7996
}
8097

81-
func (p *kubernetesProvider) Destroy(podName string) error {
98+
//Destroy - destroy all pod data (idempotent operation)
99+
func (p *kubDnsProvider) Destroy(podName string) error {
82100
err := p.clientset.CoreV1Client.Pods(p.namespace).Delete(podName, &apiV1.DeleteOptions{})
83-
if err != nil {
84-
err = errors.New("send command pod/delete to k8s, " + err.Error())
85-
return err
86-
}
87-
err = p.clientset.CoreV1Client.Services(p.namespace).Delete(podName, &apiV1.DeleteOptions{})
88-
if err != nil {
89-
err = errors.New("send command service/delete to k8s, " + err.Error())
90-
return err
101+
if err != nil && !strings.Contains(err.Error(), "not found") {
102+
return errors.New("send command pod/delete to k8s, " + err.Error())
91103
}
92104
return nil
93105
}

0 commit comments

Comments
 (0)