Skip to content

Commit

Permalink
Add a view that audits all collaborative analysis workspaces
Browse files Browse the repository at this point in the history
  • Loading branch information
amstilp committed Jan 25, 2024
1 parent 404c099 commit ecbfe25
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 2 deletions.
299 changes: 298 additions & 1 deletion primed/collaborative_analysis/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def test_creates_workspace(self):
self.assertEqual(new_workspace_data.workspace, new_workspace)


class CollaborativeAnalysisWorkspaceAuditTest(TestCase):
class WorkspaceAuditTest(TestCase):
"""Tests for the CollaborativeAnalysisWorkspaceAuditTest view."""

def setUp(self):
Expand Down Expand Up @@ -684,3 +684,300 @@ def test_context_error_table_group_in_auth_domain(self):
audit.CollaborativeAnalysisWorkspaceAccessAudit.UNEXPECTED_GROUP_ACCESS,
)
self.assertIsNotNone(table.rows[0].get_cell_value("action"))


class WorkspaceAuditAllTest(TestCase):
"""Tests for the CollaborativeAnalysisWorkspaceAuditAllTest view."""

def setUp(self):
"""Set up test class."""
self.factory = RequestFactory()
# Create a user with both view and edit permission.
self.user = User.objects.create_user(username="test", password="test")
self.user.user_permissions.add(
Permission.objects.get(
codename=AnVILProjectManagerAccess.STAFF_VIEW_PERMISSION_CODENAME
)
)

def get_url(self, *args):
"""Get the url for the view being tested."""
return reverse(
"collaborative_analysis:workspaces:audit_all",
args=args,
)

def get_view(self):
"""Return the view being tested."""
return views.WorkspaceAuditAll.as_view()

def test_view_redirect_not_logged_in(self):
"View redirects to login view when user is not logged in."
# Need a client for redirects.
response = self.client.get(self.get_url())
self.assertRedirects(
response,
resolve_url(settings.LOGIN_URL) + "?next=" + self.get_url(),
)

def test_status_code_with_user_permission_view(self):
"""Returns successful response code if the user has view permission."""
request = self.factory.get(self.get_url())
request.user = self.user
response = self.get_view()(request)
self.assertEqual(response.status_code, 200)

def test_access_without_user_permission(self):
"""Raises permission denied if user has no permissions."""
user_no_perms = User.objects.create_user(
username="test-none", password="test-none"
)
request = self.factory.get(self.get_url())
request.user = user_no_perms
with self.assertRaises(PermissionDenied):
self.get_view()(request)

def test_context_data_access_audit_no_workspaces(self):
"""The data_access_audit exists in the context."""
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("data_access_audit", response.context_data)
self.assertIsInstance(
response.context_data["data_access_audit"],
audit.CollaborativeAnalysisWorkspaceAccessAudit,
)
self.assertTrue(response.context_data["data_access_audit"].completed)
qs = response.context_data["data_access_audit"].queryset
self.assertEqual(len(qs), 0)

def test_context_data_access_audit_one_workspace(self):
"""The data_access_audit exists in the context."""
instance = factories.CollaborativeAnalysisWorkspaceFactory.create()
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("data_access_audit", response.context_data)
self.assertIsInstance(
response.context_data["data_access_audit"],
audit.CollaborativeAnalysisWorkspaceAccessAudit,
)
self.assertTrue(response.context_data["data_access_audit"].completed)
qs = response.context_data["data_access_audit"].queryset
self.assertEqual(len(qs), 1)
self.assertIn(instance, qs)

def test_context_data_access_audit_two_workspaces(self):
"""The data_access_audit exists in the context."""
instance_1 = factories.CollaborativeAnalysisWorkspaceFactory.create()
instance_2 = factories.CollaborativeAnalysisWorkspaceFactory.create()
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("data_access_audit", response.context_data)
self.assertIsInstance(
response.context_data["data_access_audit"],
audit.CollaborativeAnalysisWorkspaceAccessAudit,
)
self.assertTrue(response.context_data["data_access_audit"].completed)
qs = response.context_data["data_access_audit"].queryset
self.assertEqual(len(qs), 2)
self.assertIn(instance_1, qs)
self.assertIn(instance_2, qs)

def test_context_verified_table_access(self):
"""verified_table shows a record when audit has one account with verified access."""
instance = factories.CollaborativeAnalysisWorkspaceFactory.create()
# Create accounts.
account = AccountFactory.create()
# Set up source workspaces.
source_workspace = dbGaPWorkspaceFactory.create()
instance.source_workspaces.add(source_workspace.workspace)
# Analyst group membership.
GroupAccountMembershipFactory.create(
group=instance.analyst_group, account=account
)
# Source workspace auth domains membership.
GroupAccountMembershipFactory.create(
group=source_workspace.workspace.authorization_domains.first(),
account=account,
)
# CollaborativeAnalysisWorkspace auth domain membership.
GroupAccountMembershipFactory.create(
group=instance.workspace.authorization_domains.first(),
account=account,
)
# Check the table in the context.
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("verified_table", response.context_data)
table = response.context_data["verified_table"]
self.assertIsInstance(
table,
audit.AccessAuditResultsTable,
)
self.assertEqual(len(table.rows), 1)
self.assertEqual(
table.rows[0].get_cell_value("workspace"),
instance,
)
self.assertEqual(table.rows[0].get_cell_value("member"), account)
self.assertEqual(
table.rows[0].get_cell_value("note"),
audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS,
)
self.assertIsNone(table.rows[0].get_cell_value("action"))

def test_context_verified_table_no_access(self):
"""verified_table shows a record when audit has one account with verified no access."""
instance = factories.CollaborativeAnalysisWorkspaceFactory.create()
# Create accounts.
account = AccountFactory.create()
# Set up source workspaces.
source_workspace = dbGaPWorkspaceFactory.create()
instance.source_workspaces.add(source_workspace.workspace)
# Analyst group membership.
GroupAccountMembershipFactory.create(
group=instance.analyst_group, account=account
)
# Source workspace auth domains membership.
# GroupAccountMembershipFactory.create(
# group=source_workspace.workspace.authorization_domains.first(),
# account=account,
# )
# CollaborativeAnalysisWorkspace auth domain membership.
# GroupAccountMembershipFactory.create(
# group=instance.workspace.authorization_domains.first(), account=account
# )
# Check the table in the context.
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("verified_table", response.context_data)
table = response.context_data["verified_table"]
self.assertIsInstance(
table,
audit.AccessAuditResultsTable,
)
self.assertEqual(len(table.rows), 1)
self.assertEqual(
table.rows[0].get_cell_value("workspace"),
instance,
)
self.assertEqual(table.rows[0].get_cell_value("member"), account)
self.assertEqual(
table.rows[0].get_cell_value("note"),
audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_SOURCE_AUTH_DOMAINS,
)
self.assertIsNone(table.rows[0].get_cell_value("action"))

def test_context_needs_action_table_grant(self):
"""needs_action_table shows a record when audit finds that access needs to be granted."""
instance = factories.CollaborativeAnalysisWorkspaceFactory.create()
# Create accounts.
account = AccountFactory.create()
# Set up source workspaces.
source_workspace = dbGaPWorkspaceFactory.create()
instance.source_workspaces.add(source_workspace.workspace)
# Analyst group membership.
GroupAccountMembershipFactory.create(
group=instance.analyst_group, account=account
)
# Source workspace auth domains membership.
GroupAccountMembershipFactory.create(
group=source_workspace.workspace.authorization_domains.first(),
account=account,
)
# CollaborativeAnalysisWorkspace auth domain membership.
# GroupAccountMembershipFactory.create(
# group=self.collaborative_analysis_workspace.workspace.authorization_domains.first(), account=account
# )
# Check the table in the context.
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("needs_action_table", response.context_data)
table = response.context_data["needs_action_table"]
self.assertIsInstance(
table,
audit.AccessAuditResultsTable,
)
self.assertEqual(len(table.rows), 1)
self.assertEqual(
table.rows[0].get_cell_value("workspace"),
instance,
)
self.assertEqual(table.rows[0].get_cell_value("member"), account)
self.assertEqual(
table.rows[0].get_cell_value("note"),
audit.CollaborativeAnalysisWorkspaceAccessAudit.IN_SOURCE_AUTH_DOMAINS,
)
self.assertIsNotNone(table.rows[0].get_cell_value("action"))

def test_context_needs_action_table_remove(self):
"""needs_action_table shows a record when audit finds that access needs to be removed."""
instance = factories.CollaborativeAnalysisWorkspaceFactory.create()
# Create accounts.
account = AccountFactory.create()
# Set up source workspaces.
source_workspace = dbGaPWorkspaceFactory.create()
instance.source_workspaces.add(source_workspace.workspace)
# Analyst group membership.
GroupAccountMembershipFactory.create(
group=instance.analyst_group, account=account
)
# Source workspace auth domains membership.
# GroupAccountMembershipFactory.create(
# group=source_workspace.workspace.authorization_domains.first(),
# account=account,
# )
# CollaborativeAnalysisWorkspace auth domain membership.
GroupAccountMembershipFactory.create(
group=instance.workspace.authorization_domains.first(),
account=account,
)
# Check the table in the context.
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("needs_action_table", response.context_data)
table = response.context_data["needs_action_table"]
self.assertIsInstance(
table,
audit.AccessAuditResultsTable,
)
self.assertEqual(len(table.rows), 1)
self.assertEqual(
table.rows[0].get_cell_value("workspace"),
instance,
)
self.assertEqual(table.rows[0].get_cell_value("member"), account)
self.assertEqual(
table.rows[0].get_cell_value("note"),
audit.CollaborativeAnalysisWorkspaceAccessAudit.NOT_IN_SOURCE_AUTH_DOMAINS,
)
self.assertIsNotNone(table.rows[0].get_cell_value("action"))

def test_context_error_table_group_in_auth_domain(self):
"""error shows a record when audit finds a group in the auth domain."""
instance = factories.CollaborativeAnalysisWorkspaceFactory.create()
# Create accounts.
group = ManagedGroupFactory.create()
GroupGroupMembershipFactory.create(
parent_group=instance.workspace.authorization_domains.first(),
child_group=group,
)
# Check the table in the context.
self.client.force_login(self.user)
response = self.client.get(self.get_url())
self.assertIn("errors_table", response.context_data)
table = response.context_data["errors_table"]
self.assertIsInstance(
table,
audit.AccessAuditResultsTable,
)
self.assertEqual(len(table.rows), 1)
self.assertEqual(
table.rows[0].get_cell_value("workspace"),
instance,
)
self.assertEqual(table.rows[0].get_cell_value("member"), group)
self.assertEqual(
table.rows[0].get_cell_value("note"),
audit.CollaborativeAnalysisWorkspaceAccessAudit.UNEXPECTED_GROUP_ACCESS,
)
self.assertIsNotNone(table.rows[0].get_cell_value("action"))
5 changes: 5 additions & 0 deletions primed/collaborative_analysis/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

collaborative_analysis_workspace_patterns = (
[
path(
"audit/",
views.WorkspaceAuditAll.as_view(),
name="audit_all",
),
path(
"<slug:billing_project_slug>/<slug:workspace_slug>/audit/",
views.WorkspaceAudit.as_view(),
Expand Down
23 changes: 22 additions & 1 deletion primed/collaborative_analysis/views.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from anvil_consortium_manager.auth import AnVILConsortiumManagerStaffViewRequired
from django.http import Http404
from django.utils.translation import gettext_lazy as _
from django.views.generic import DetailView
from django.views.generic import DetailView, TemplateView

from . import audit, models

Expand Down Expand Up @@ -46,3 +46,24 @@ def get_context_data(self, **kwargs):
context["needs_action_table"] = data_access_audit.get_needs_action_table()
context["data_access_audit"] = data_access_audit
return context


class WorkspaceAuditAll(AnVILConsortiumManagerStaffViewRequired, TemplateView):
"""View to show audit results for all `CollaborativeAnalysisWorkspace` objects."""

template_name = (
"collaborative_analysis/collaborativeanalysisworkspace_audit_all.html"
)

def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
# Run the audit
data_access_audit = audit.CollaborativeAnalysisWorkspaceAccessAudit(
queryset=models.CollaborativeAnalysisWorkspace.objects.all()
)
data_access_audit.run_audit()
context["verified_table"] = data_access_audit.get_verified_table()
context["errors_table"] = data_access_audit.get_errors_table()
context["needs_action_table"] = data_access_audit.get_needs_action_table()
context["data_access_audit"] = data_access_audit
return context
Loading

0 comments on commit ecbfe25

Please sign in to comment.