9
9
from scaleway .instance .v1 .api import InstanceV1API
10
10
from scaleway .block .v1alpha1 import BlockV1Alpha1API
11
11
from scaleway .instance .v1 .types import Server , VolumeServerTemplate
12
- from scaleway .block .v1alpha1 .types import Volume , CreateVolumeRequestFromEmpty
12
+ from scaleway .block .v1alpha1 .types import Volume , CreateVolumeRequestFromEmpty
13
13
14
14
logger = logging .getLogger ()
15
15
logger .level = logging .DEBUG
20
20
timeout = 10
21
21
volume_size = 10
22
22
commercial_type = "DEV1-S"
23
+
24
+
23
25
class TestE2EServerCreation (unittest .TestCase ):
24
26
def setUp (self ) -> None :
25
27
self .zone = "fr-par-1"
@@ -32,29 +34,27 @@ def setUp(self) -> None:
32
34
def tearDown (self ) -> None :
33
35
for volume in self ._volumes :
34
36
try :
35
- self .instanceAPI .detach_server_volume (server_id = self ._server .id , volume_id = volume .id )
37
+ self .instanceAPI .detach_server_volume (
38
+ server_id = self ._server .id , volume_id = volume .id
39
+ )
36
40
logger .info ("✅ Volume {volume.id} has been detach" )
37
41
except Exception as e :
38
42
logger .warning (f"Failed to detach volume { volume .id } : { e } " )
39
-
40
-
41
43
try :
42
-
43
44
self .blockAPI .delete_volume (volume_id = volume .id )
44
45
logger .info ("✅ Volume {volume.id} has been deleted" )
45
46
except Exception as e :
46
47
logger .warning (f"Failed to delete volume { volume .id } : { e } " )
47
-
48
48
if self ._server :
49
49
try :
50
50
self .api .delete_server (zone = self .zone , server_id = self ._server .id )
51
51
logger .info (f"🗑️ Deleted server: { self ._server .id } " )
52
52
except Exception as e :
53
53
logger .warning (f"Failed to delete server { self ._server .id } : { e } " )
54
-
54
+
55
55
def wait_test_instance_server (self , server_id ):
56
56
for _ in range (10 ):
57
- s = self .api .get_server (zone = self .zone , server_id = server . id )
57
+ s = self .api .get_server (zone = self .zone , server_id = server_id )
58
58
if s .state == "running" :
59
59
logger .info (f"✅ Server { server_id } is running." )
60
60
break
@@ -63,16 +63,17 @@ def wait_test_instance_server(self, server_id):
63
63
self .fail ("Server did not reach 'running' state in time." )
64
64
65
65
def create_test_instance_server (self ) -> Server :
66
- volume = { "0" : VolumeServerTemplate (volume_type = "sbs_volume" ,
67
- name = "my-volume" ,
68
- size = volume_size
69
- )}
66
+ volume = {
67
+ "0" : VolumeServerTemplate (
68
+ volume_type = "sbs_volume" , name = "my-volume" , size = volume_size
69
+ )
70
+ }
70
71
server = self .instanceAPI ._create_server (
71
72
commercial_type = commercial_type ,
72
73
zone = self .zone ,
73
74
name = server_name ,
74
75
dynamic_ip_required = True ,
75
- volumes = volume
76
+ volumes = volume ,
76
77
)
77
78
logger .info (f"✅ Created server: { server .id } " )
78
79
self ._server = server .server
@@ -83,23 +84,20 @@ def create_test_from_empty_volume(self, number) -> Dict[str, Volume]:
83
84
volumes : Dict [str , Volume ] = {}
84
85
for i in range (number ):
85
86
volume = self .blockAPI .create_volume (
86
- project_id = "19e2fd0b-3d53-4f8f-9338-629df9c1b1db" ,
87
- from_empty = CreateVolumeRequestFromEmpty (size = 10 )
87
+ project_id = "19e2fd0b-3d53-4f8f-9338-629df9c1b1db" ,
88
+ from_empty = CreateVolumeRequestFromEmpty (size = 10 ),
88
89
)
89
90
logger .info ("✅ Created server: {volume.id}" )
90
91
self ._volumes .append (volume ) # Ensure cleanup in tearDown
91
92
volumes [str (i )] = volume
92
-
93
- return volumes
94
93
95
-
94
+ return volumes
96
95
97
96
def test_attach_aditionnal_volume (self ):
98
97
server = self .create_test_instance_server ()
99
98
additional_volumes = self .create_test_from_empty_volume (1 )
100
99
additional_volume = list (additional_volumes .values ())[0 ]
101
100
102
-
103
101
self .assertIsNotNone (server .id )
104
102
self .assertEqual (server .zone , self .zone )
105
103
@@ -108,15 +106,18 @@ def test_attach_aditionnal_volume(self):
108
106
logger .info (f"✅ Volume created with ID: { additional_volume .id } " )
109
107
110
108
self .instanceAPI .attach_server_volume (
111
- server_id = server .id ,
112
- volume_id = additional_volume .id
109
+ server_id = server .id , volume_id = additional_volume .id
113
110
)
114
111
logger .info (f"🔗 Attached volume { additional_volume .id } to server { server .id } " )
115
112
116
113
time .sleep (5 )
117
114
118
- updated_server = self .instanceAPI .get_server (zone = self .zone , server_id = server .id )
115
+ updated_server = self .instanceAPI .get_server (
116
+ zone = self .zone , server_id = server .id
117
+ )
119
118
attached_volumes = updated_server .volumes or {}
120
119
attached_volume_ids = [v .volume .id for v in attached_volumes .values ()]
121
120
self .assertIn (additional_volume .id , attached_volume_ids )
122
- logger .info (f"✅ Volume { additional_volume .id } is attached to server { server .id } " )
121
+ logger .info (
122
+ f"✅ Volume { additional_volume .id } is attached to server { server .id } "
123
+ )
0 commit comments