1
1
import logging
2
2
import sys
3
- from typing import Dict
3
+ from typing import List
4
4
import unittest
5
5
import uuid
6
6
import time
17
17
logger .addHandler (stream_handler )
18
18
19
19
server_name = f"test-sdk-python-{ uuid .uuid4 ().hex [:6 ]} "
20
- timeout = 10
20
+ max_retry = 10
21
+ interval = 10
21
22
volume_size = 10
22
23
commercial_type = "DEV1-S"
24
+ zone = "fr-par-1"
25
+ timeout_attach = 10
23
26
24
27
25
28
class TestE2EServerCreation (unittest .TestCase ):
26
29
def setUp (self ) -> None :
27
- self .zone = "fr-par-1"
30
+ self .zone = zone
28
31
self .client = Client .from_config_file_and_env ()
29
32
self .instanceAPI = InstanceV1API (self .client , bypass_validation = True )
30
33
self .blockAPI = BlockV1Alpha1API (self .client , bypass_validation = True )
@@ -33,32 +36,24 @@ def setUp(self) -> None:
33
36
34
37
def tearDown (self ) -> None :
35
38
for volume in self ._volumes :
36
- try :
37
- self .instanceAPI .detach_server_volume (
38
- server_id = self ._server .id , volume_id = volume .id
39
- )
40
- logger .info ("✅ Volume {volume.id} has been detach" )
41
- except Exception as e :
42
- logger .warning (f"Failed to detach volume { volume .id } : { e } " )
43
- try :
44
- self .blockAPI .delete_volume (volume_id = volume .id )
45
- logger .info ("✅ Volume {volume.id} has been deleted" )
46
- except Exception as e :
47
- logger .warning (f"Failed to delete volume { volume .id } : { e } " )
39
+ self .instanceAPI .detach_server_volume (
40
+ server_id = self ._server .id , volume_id = volume .id
41
+ )
42
+ logger .info ("✅ Volume {volume.id} has been detach" )
43
+
44
+ self .blockAPI .delete_volume (volume_id = volume .id )
45
+ logger .info ("✅ Volume {volume.id} has been deleted" )
48
46
if self ._server :
49
- try :
50
- self .api .delete_server (zone = self .zone , server_id = self ._server .id )
51
- logger .info (f"🗑️ Deleted server: { self ._server .id } " )
52
- except Exception as e :
53
- logger .warning (f"Failed to delete server { self ._server .id } : { e } " )
47
+ self .api .delete_server (zone = self .zone , server_id = self ._server .id )
48
+ logger .info (f"🗑️ Deleted server: { self ._server .id } " )
54
49
55
50
def wait_test_instance_server (self , server_id ):
56
- for _ in range (10 ):
51
+ for i in range (1 , max_retry ):
52
+ interval *= i
57
53
s = self .api .get_server (zone = self .zone , server_id = server_id )
58
54
if s .state == "running" :
59
- logger .info (f"✅ Server { server_id } is running." )
60
55
break
61
- time .sleep (timeout )
56
+ time .sleep (interval )
62
57
else :
63
58
self .fail ("Server did not reach 'running' state in time." )
64
59
@@ -80,16 +75,15 @@ def create_test_instance_server(self) -> Server:
80
75
self .wait_test_instance_server (server_id = server .server .id )
81
76
return server .server
82
77
83
- def create_test_from_empty_volume (self , number ) -> Dict [ str , Volume ]:
84
- volumes : Dict [ str , Volume ] = {}
78
+ def create_test_from_empty_volume (self , number ) -> List [ Volume ]:
79
+ volumes : List [ Volume ] = {}
85
80
for i in range (number ):
86
81
volume = self .blockAPI .create_volume (
87
- project_id = "19e2fd0b-3d53-4f8f-9338-629df9c1b1db" ,
88
82
from_empty = CreateVolumeRequestFromEmpty (size = 10 ),
89
83
)
90
84
logger .info ("✅ Created server: {volume.id}" )
91
85
self ._volumes .append (volume ) # Ensure cleanup in tearDown
92
- volumes [ str ( i )] = volume
86
+ volumes . append ( volume )
93
87
94
88
return volumes
95
89
@@ -110,7 +104,7 @@ def test_attach_aditionnal_volume(self):
110
104
)
111
105
logger .info (f"🔗 Attached volume { additional_volume .id } to server { server .id } " )
112
106
113
- time .sleep (5 )
107
+ time .sleep (timeout_attach )
114
108
115
109
updated_server = self .instanceAPI .get_server (
116
110
zone = self .zone , server_id = server .id
0 commit comments