Skip to content

Commit

Permalink
Adding UTs for snapshot module (#147)
Browse files Browse the repository at this point in the history
* Adding UTs for snapshot module

* Adding UTs for Snapshot module

* Adding UTs for syncIQ job

* Adding UTs for syncIQ job

* Adding UTs for SyncIQ Job module
  • Loading branch information
trisha-dell authored Feb 3, 2025
1 parent ed16885 commit 7c09d52
Show file tree
Hide file tree
Showing 6 changed files with 889 additions and 53 deletions.
59 changes: 18 additions & 41 deletions plugins/modules/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/python
# Copyright: (c) 2019-2024, Dell Technologies
# Copyright: (c) 2019-2025, Dell Technologies

# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

Expand Down Expand Up @@ -336,7 +336,9 @@ def validate_expiration_timestamp(self, expiration_timestamp):
def get_filesystem_snapshot_details(self, snapshot_name):
"""Returns details of a filesystem Snapshot"""
try:
return self.snapshot_api.get_snapshot_snapshot(snapshot_name)
snapshot_details = self.snapshot_api.get_snapshot_snapshot(snapshot_name)
if snapshot_details:
return snapshot_details.to_dict()
except utils.ApiException as e:
if str(e.status) == "404":
log_msg = "Snapshot {0} status is " \
Expand All @@ -352,12 +354,6 @@ def get_filesystem_snapshot_details(self, snapshot_name):
LOG.error(error_message)
self.module.fail_json(msg=error_message)

except Exception as e:
error_message = "Failed to get details of Snapshot {0} with" \
" error {1} ".format(snapshot_name, str(e))
LOG.error(error_message)
self.module.fail_json(msg=error_message)

def get_zone_base_path(self, access_zone):
"""Returns the base path of the Access Zone."""
try:
Expand Down Expand Up @@ -399,7 +395,7 @@ def create_filesystem_snapshot(self, snapshot_name,
"snapshot creation")

if desired_retention and desired_retention.lower() != 'none':
if retention_unit is None:
if retention_unit is None or retention_unit == 'hours':
expiration_timestamp = (datetime.utcnow() +
timedelta(
hours=int(desired_retention))
Expand All @@ -414,12 +410,6 @@ def create_filesystem_snapshot(self, snapshot_name,
epoch_expiry_time = calendar.timegm(
time.strptime(str(expiration_timestamp),
'%Y-%m-%d %H:%M:%S.%f'))
elif retention_unit == 'hours':
expiration_timestamp = (datetime.utcnow() + timedelta(
hours=int(desired_retention)))
epoch_expiry_time = calendar.timegm(
time.strptime(str(expiration_timestamp),
'%Y-%m-%d %H:%M:%S.%f'))

elif desired_retention and \
desired_retention.lower() == 'none':
Expand Down Expand Up @@ -456,10 +446,6 @@ def delete_filesystem_snapshot(self, snapshot_name):

def rename_filesystem_snapshot(self, snapshot, new_name):
"""Renames a filesystem snapshot"""
if snapshot is None:
self.module.fail_json(msg="Snapshot not found.")

snapshot = snapshot.to_dict()

if snapshot['snapshots'][0]['name'] == new_name:
return False
Expand Down Expand Up @@ -517,7 +503,7 @@ def check_snapshot_modified(self, snapshot, alias,
snapshot_modification_details['is_timestamp_modified'] = False
snapshot_modification_details['new_expiration_timestamp_value'] = None

snap_details = snapshot.to_dict()
snap_details = snapshot

if effective_path is not None:
if self.module.params['path'] and \
Expand Down Expand Up @@ -549,7 +535,7 @@ def check_snapshot_modified(self, snapshot, alias,
# creation timestamp of the snapshot to the desired retention
# specified in the Playbook.
if desired_retention and desired_retention.lower() != 'none':
if retention_unit is None:
if retention_unit is None or retention_unit == 'hours':
expiration_timestamp = \
datetime.fromtimestamp(snap_creation_timestamp) + \
timedelta(hours=int(desired_retention))
Expand All @@ -562,12 +548,6 @@ def check_snapshot_modified(self, snapshot, alias,
timedelta(days=int(desired_retention))
expiration_timestamp = \
time.mktime(expiration_timestamp.timetuple())
elif retention_unit == 'hours':
expiration_timestamp = \
datetime.fromtimestamp(snap_creation_timestamp) + \
timedelta(hours=int(desired_retention))
expiration_timestamp = \
time.mktime(expiration_timestamp.timetuple())
elif desired_retention and desired_retention.lower() == 'none':
expiration_timestamp = None
info_message = "The new expiration " \
Expand Down Expand Up @@ -610,8 +590,11 @@ def check_snapshot_modified(self, snapshot, alias,
# This is the case when expiration timestamp may not be present
# in the snapshot details.
# Expiration timestamp specified in the playbook is not None.
elif 'expires' not in snap_details['snapshots'][0] \
and expiration_timestamp is not None:
elif ('expires' not in snap_details['snapshots'][0]
and expiration_timestamp is not None) or \
('expires' in snap_details['snapshots'][0] and
snap_details['snapshots'][0]['expires'] is
None and expiration_timestamp is not None):
snapshot_modification_details['is_timestamp_modified'] = True
snapshot_modification_details[
'new_expiration_timestamp_value'] = expiration_timestamp
Expand All @@ -627,13 +610,6 @@ def check_snapshot_modified(self, snapshot, alias,
snapshot_modification_details[
'new_expiration_timestamp_value'] = expiration_timestamp
modified = True
elif 'expires' in snap_details['snapshots'][0] and \
snap_details['snapshots'][0]['expires'] is \
None and expiration_timestamp is not None:
snapshot_modification_details['is_timestamp_modified'] = True
snapshot_modification_details[
'new_expiration_timestamp_value'] = expiration_timestamp
modified = True

snapshot_alias = self.get_snapshot_alias(snapshot_name)

Expand All @@ -660,10 +636,11 @@ def modify_filesystem_snapshot(self, snapshot_name,
new_timestamp = \
snapshot_modification_details[
'new_expiration_timestamp_value']
snapshot_update_param = self.isi_sdk.SnapshotSnapshot(
expires=int(new_timestamp))
self.snapshot_api.update_snapshot_snapshot(
snapshot_update_param, snapshot_name)
if new_timestamp is not None:
snapshot_update_param = self.isi_sdk.SnapshotSnapshot(
expires=int(new_timestamp))
self.snapshot_api.update_snapshot_snapshot(
snapshot_update_param, snapshot_name)
changed = True
if snapshot_modification_details['is_alias_modified']:
new_alias = \
Expand Down Expand Up @@ -820,7 +797,7 @@ def perform_module_operation(self):
'{0} details'.format(snapshot_name)
LOG.info(info_message)
result['snapshot_details'] = \
self.get_filesystem_snapshot_details(snapshot_name).to_dict()
self.get_filesystem_snapshot_details(snapshot_name)

# Finally update the module result!
self.module.exit_json(**result)
Expand Down
12 changes: 0 additions & 12 deletions plugins/modules/synciqjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,6 @@ def get_job_details(self, job_id):
(error_obj=e))
LOG.error(error_message)
self.module.fail_json(msg=error_message)
except Exception as e:
error_message = 'Get details of SyncIQ job %s failed with ' \
'error: %s' % (job_id, utils.determine_error
(error_obj=e))
LOG.error(error_message)
self.module.fail_json(msg=error_message)

def modify_job_state(self, job_id, job_state):
"""
Expand All @@ -334,12 +328,6 @@ def modify_job_state(self, job_id, job_state):
(error_obj=e))
LOG.error(error_message)
self.module.fail_json(msg=error_message)
except Exception as e:
error_message = 'Modify state of SyncIQ job %s failed with ' \
'error: %s' % (job_id, utils.determine_error
(error_obj=e))
LOG.error(error_message)
self.module.fail_json(msg=error_message)

def validate_module(self, job_id, job_state, state):
""" Validates the SyncIQ jobs module """
Expand Down
133 changes: 133 additions & 0 deletions tests/unit/plugins/module_utils/mock_snapshot_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright: (c) 2025, Dell Technologies

# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

"""Mock API responses for PowerScale Snapshot module"""

from __future__ import (absolute_import, division, print_function)

__metaclass__ = type

MODULE_UTILS_PATH = 'ansible_collections.dellemc.powerscale.plugins.modules.snapshot.utils'

SNAPSHOT = {
"snapshots": [
{
"alias": "alias_name_1",
"created": 1628155527,
"expires": 10,
"has_locks": False,
"id": 936,
"name": "ansible_test_snapshot",
"path": "/ifs/ansible_test_snapshot",
"pct_filesystem": 2.435778242215747e-06,
"pct_reserve": 0.0,
"schedule": None,
"shadow_bytes": 0,
"size": 4096,
"state": "active",
"target_id": None,
"target_name": None
}
]
}

SNAPSHOT_WO_EXPIRES = {
"snapshots": [
{
"alias": "alias_name_1",
"created": 1628155527,
"has_locks": False,
"id": 936,
"name": "ansible_test_snapshot",
"path": "/ifs/ansible_test_snapshot",
"pct_filesystem": 2.435778242215747e-06,
"pct_reserve": 0.0,
"schedule": None,
"shadow_bytes": 0,
"size": 4096,
"state": "active",
"target_id": None,
"target_name": None
}
]
}

ALIAS = {
"snapshots": [
{
"target_name": "ansible_test_snapshot",
"name": "alias_name_1"
}
]
}

CREATE_SNAPSHOT_PARAMS = {
"name": "ansible_test_snapshot",
"path": "/ifs/ansible_test_snapshot",
"alias": "snap_alias_1",
"expires": 60}

MODIFY_SNAPSHOT_PARAMS = {"expires": 60}

RENAME_SNAPSHOT_PARAMS = {"name": "renamed_snapshot_name_1"}


def create_snapshot_failed_msg():
return 'Failed to create snapshot'


def modify_snapshot_failed_msg():
return 'Failed to modify snapshot'


def rename_snapshot_failed_msg():
return 'Failed to rename snapshot'


def invalid_access_zone_failed_msg():
return 'Unable to fetch base path of Access Zone invalid_zone ,failed with error: SDK Error message'


def get_snapshot_wo_name_failed_msg():
return 'Please provide a valid snapshot name'


def modify_snapshot_wo_desired_retention_failed_msg():
return 'Specify desired retention along with retention unit.'


def delete_snapshot_exception_failed_msg():
return 'Failed to delete snapshot'


def get_snapshot_alias_failed_msg():
return 'Failed to get alias for snapshot'


def create_snapshot_wo_retention_failed_msg():
return 'Please provide either desired_retention or expiration_timestamp for creating a snapshot'


def create_snapshot_with_new_name_failed_msg():
return 'Invalid param: new_name while creating a new snapshot.'


def create_snapshot_without_path_failed_msg():
return 'Please provide a valid path for snapshot creation'


def create_snapshot_wo_desired_retention_failed_msg():
return 'Desired retention is set to'


def create_snapshot_invalid_desired_retention_failed_msg():
return 'Please provide a valid integer as the desired retention.'


def modify_non_existing_path_failed_msg():
return 'specified in the playbook does not match the path of the snapshot'


def get_snapshot_failed_msg():
return 'Failed to get details of Snapshot'
Loading

0 comments on commit 7c09d52

Please sign in to comment.