Skip to content

Commit 90dde9d

Browse files
committed
test: add instance test
add modul rm module
1 parent d5afbe6 commit 90dde9d

File tree

2 files changed

+122
-0
lines changed

2 files changed

+122
-0
lines changed

docs/module.rst

Whitespace-only changes.

scaleway/tests/test_instance.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import logging
2+
import sys
3+
from typing import Dict
4+
import unittest
5+
import uuid
6+
import time
7+
8+
from scaleway_core.client import Client
9+
from scaleway.instance.v1.api import InstanceV1API
10+
from scaleway.block.v1alpha1 import BlockV1Alpha1API
11+
from scaleway.instance.v1.types import Server, VolumeServerTemplate
12+
from scaleway.block.v1alpha1.types import Volume, CreateVolumeRequestFromEmpty
13+
14+
logger = logging.getLogger()
15+
logger.level = logging.DEBUG
16+
stream_handler = logging.StreamHandler(sys.stdout)
17+
logger.addHandler(stream_handler)
18+
19+
server_name = f"test-sdk-python-{uuid.uuid4().hex[:6]}"
20+
timeout = 10
21+
volume_size = 10
22+
commercial_type = "DEV1-S"
23+
class TestE2EServerCreation(unittest.TestCase):
24+
def setUp(self) -> None:
25+
self.zone = "fr-par-1"
26+
self.client = Client.from_config_file_and_env()
27+
self.instanceAPI = InstanceV1API(self.client, bypass_validation=True)
28+
self.blockAPI = BlockV1Alpha1API(self.client, bypass_validation=True)
29+
self._server = None
30+
self._volumes = []
31+
32+
def tearDown(self) -> None:
33+
for volume in self._volumes:
34+
try:
35+
self.instanceAPI.detach_server_volume(server_id=self._server.id, volume_id=volume.id)
36+
logger.info("✅ Volume {volume.id} has been detach")
37+
except Exception as e:
38+
logger.warning(f"Failed to detach volume {volume.id}: {e}")
39+
40+
41+
try:
42+
43+
self.blockAPI.delete_volume(volume_id=volume.id)
44+
logger.info("✅ Volume {volume.id} has been deleted")
45+
except Exception as e:
46+
logger.warning(f"Failed to delete volume {volume.id}: {e}")
47+
48+
if self._server:
49+
try:
50+
self.api.delete_server(zone=self.zone, server_id=self._server.id)
51+
logger.info(f"🗑️ Deleted server: {self._server.id}")
52+
except Exception as e:
53+
logger.warning(f"Failed to delete server {self._server.id}: {e}")
54+
55+
def wait_test_instance_server(self, server_id):
56+
for _ in range(10):
57+
s = self.api.get_server(zone=self.zone, server_id=server.id)
58+
if s.state == "running":
59+
logger.info(f"✅ Server {server_id} is running.")
60+
break
61+
time.sleep(timeout)
62+
else:
63+
self.fail("Server did not reach 'running' state in time.")
64+
65+
def create_test_instance_server(self) -> Server:
66+
volume = { "0": VolumeServerTemplate(volume_type= "sbs_volume",
67+
name="my-volume",
68+
size=volume_size
69+
)}
70+
server = self.instanceAPI._create_server(
71+
commercial_type=commercial_type,
72+
zone=self.zone,
73+
name=server_name,
74+
dynamic_ip_required=True,
75+
volumes=volume
76+
)
77+
logger.info(f"✅ Created server: {server.id}")
78+
self._server = server.server
79+
self.wait_test_instance_server(server_id=server.server.id)
80+
return server.server
81+
82+
def create_test_from_empty_volume(self, number) -> Dict[str, Volume]:
83+
volumes: Dict[str, Volume] = {}
84+
for i in range(number):
85+
volume = self.blockAPI.create_volume(
86+
project_id="19e2fd0b-3d53-4f8f-9338-629df9c1b1db",
87+
from_empty=CreateVolumeRequestFromEmpty(size=10)
88+
)
89+
logger.info("✅ Created server: {volume.id}")
90+
self._volumes.append(volume) # Ensure cleanup in tearDown
91+
volumes[str(i)] = volume
92+
93+
return volumes
94+
95+
96+
97+
def test_attach_aditionnal_volume(self):
98+
server = self.create_test_instance_server()
99+
additional_volumes = self.create_test_from_empty_volume(1)
100+
additional_volume = list(additional_volumes.values())[0]
101+
102+
103+
self.assertIsNotNone(server.id)
104+
self.assertEqual(server.zone, self.zone)
105+
106+
self.assertIsNotNone(additional_volume.id)
107+
self.assertEqual(additional_volume.size, 10)
108+
logger.info(f"✅ Volume created with ID: {additional_volume.id}")
109+
110+
self.instanceAPI.attach_server_volume(
111+
server_id=server.id,
112+
volume_id=additional_volume.id
113+
)
114+
logger.info(f"🔗 Attached volume {additional_volume.id} to server {server.id}")
115+
116+
time.sleep(5)
117+
118+
updated_server = self.instanceAPI.get_server(zone=self.zone, server_id=server.id)
119+
attached_volumes = updated_server.volumes or {}
120+
attached_volume_ids = [v.volume.id for v in attached_volumes.values()]
121+
self.assertIn(additional_volume.id, attached_volume_ids)
122+
logger.info(f"✅ Volume {additional_volume.id} is attached to server {server.id}")

0 commit comments

Comments
 (0)