diff --git a/.gitignore b/.gitignore
index 7bbc71c..0bc649a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# IDE files
+.vscode/
+
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]
diff --git a/conftest.py b/conftest.py
index 164fb76..6b4211e 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,5 +1,6 @@
 import argparse
 import datetime
+from typing import Any
 
 import pytest
 
@@ -10,6 +11,7 @@
 from gcp.client import GCPClient
 from gsuite.client import GsuiteClient
 from heroku.client import HerokuAdminClient
+from github.client import GitHubClient
 
 import custom_config
 
@@ -18,6 +20,15 @@
 gcp_client = None
 gsuite_client = None
 heroku_client = None
+github_client = None
+
+# globals in conftest.py are hard to import from several levels down, so provide an access function
+def get_client(client_name: str) -> Any:
+    # accept the client name with or without the "_client" suffix
+    suffix = "_client"
+    if client_name.endswith(suffix):
+        client_name = client_name[: -len(suffix)]
+    return globals()[f"{client_name}_client"]
 
 
 def pytest_addoption(parser):
@@ -69,6 +80,7 @@ def pytest_configure(config):
     global gcp_client
     global gsuite_client
     global heroku_client
+    global github_client
 
     # monkeypatch cache.set to serialize datetime.datetime's
     patch_cache_set(config)
@@ -102,6 +114,11 @@ def pytest_configure(config):
         offline=config.getoption("--offline"),
     )
 
+    github_client = GitHubClient(
+        debug_calls=config.getoption("--debug-calls"),
+        offline=config.getoption("--offline"),
+    )
+
     config.custom_config = custom_config.CustomConfig(config.getoption("--config"))
 
     try:
diff --git a/github/branches/conftest.py b/github/branches/conftest.py
index 8f8d110..572ec16 100755
--- a/github/branches/conftest.py
+++ b/github/branches/conftest.py
@@ -5,37 +5,38 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-# Fixtures to fetch data for the various GitHub branch checks
+# PyTest support for the various GitHub branch checks
 
-# TODO:
-#   - convert to logger output
-#   - add sleep_* for 'core' functionality
+# TODO convert to logger output
+# TODO add sleep_* for 'core' functionality
 
-# from datetime import datetime
 from functools import lru_cache
 import os
 import pathlib
-
-# import time
 from typing import List
-
-# import sys
 import subprocess
 
 import pytest
 
-# from sgqlc.operation import Operation  # noqa: I900
 from sgqlc.endpoint.http import HTTPEndpoint  # noqa: I900
 
-# from github_schema import github_schema as schema  # noqa: I900
-
-# import branch_check.retrieve_github_data as retrieve_github_data
 from . import retrieve_github_data
 
+# Needed to dynamically grab globals
+import conftest
+
 
-# @pytest.fixture(scope="session")
 def repos_to_check() -> List[str]:
     # just shell out for now
+    # TODO: fix ickiness
+    # While there is no network operation done here, we don't want to go
+    # poking around the file system if we're in "--offline" mode
+    # (e.g. doctest mode)
+    if conftest.get_client("github").is_offline():
+        return []
+
+    # real work
     path_to_metadata = os.environ["PATH_TO_METADATA"]
     meta_dir = pathlib.Path(os.path.expanduser(path_to_metadata)).resolve()
     in_files = list(meta_dir.glob("*.json"))
@@ -64,8 +65,10 @@ def repos_to_check() -> List[str]:
     ]
 
 
+# we expect to (eventually) make multiple tests against the same branch data
 @lru_cache(maxsize=32)
 def get_branch_info(gql_connection, repo_full_name: str) -> str:
+    assert False  # TODO: not wired into any test yet; remove this guard when it is
     repo_info = retrieve_github_data.get_repo_branch_protections(
         gql_connection, repo_full_name
     )
@@ -73,8 +76,5 @@ def repos_to_check() -> List[str]:
 
 
 if __name__ == "__main__":
-    if os.environ.get("DEBUG"):
-        print(repos_to_check())
-        # for name in sys.argv[1:]:
-        #     data = get_branch_info(gql_connection(), name)
-        #     print(data)
+    # TODO add doctests
+    pass
diff --git a/github/branches/retrieve_github_data.py b/github/branches/retrieve_github_data.py
index 279edf5..7c8e4b8 100644
--- a/github/branches/retrieve_github_data.py
+++ b/github/branches/retrieve_github_data.py
@@ -1,12 +1,14 @@
 #!/usr/bin/env python3
-
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
 """
 Collect Information about branches sufficient to check for all branch
 protection guideline compliance.
 """
+# TODO add doctests
 
-# import datetime
 import csv
 from functools import lru_cache
 import logging
@@ -15,12 +17,6 @@
 import sys
 from typing import Any, List
 
-# import re
-
-# import sys
-
-# from collections import OrderedDict
-
 from sgqlc.operation import Operation  # noqa: I900
 from sgqlc.endpoint.http import HTTPEndpoint  # noqa: I900
 
@@ -30,6 +26,7 @@
 DEFAULT_GRAPHQL_ENDPOINT = "https://api.github.com/graphql"
 EXTENSION_TO_STRIP = ".git"
 
+# TODO use logger
 logger = logging.getLogger(__name__)
 
 
@@ -140,6 +137,7 @@ def create_operation(owner, name):
     # we only get one item at a time to
     # simplify getting all.
     # N.B. anything we can get multiple of, we need to gather the 'id'
+    #      as well, in case pagination is needed
     branch_protection = repo.branch_protection_rules(first=10)
     branch_protection.total_count()
     branch_protection.page_info.__fields__(end_cursor=True, has_next_page=True)
@@ -170,12 +168,11 @@ def create_rule_query():
 
     op = Operation(schema.Query)
 
     node = op.branch_protection_rules.nodes(cursor="$LAST_CURSOR")
-    # node.__fields__(is_admin_enforced=True, id=True, pattern=True)
     _add_protection_fields(node)
 
     return op
 
 
-# Should be able to produce iterator for lowest level data
+# TODO Should be able to produce iterator for lowest level data
 def get_nested_branch_data(endpoint, reponame):
@@ -183,7 +180,6 @@ def get_nested_branch_data(endpoint, reponame):
 
     op = create_operation(owner, name)
     logger.info("Downloading base information from %s", endpoint)
-    # logger.debug("Operation:\n%s", op)
 
     d = endpoint(op)
     errors = d.get("errors")
@@ -206,8 +202,10 @@ def _more_to_do(cur_result, fake_new_page=False):
         )
         return has_more_rules or has_more_refs
 
+    # TODO determine better way to test
     fake_next_page = False
+    # TODO ensure errors are reported out to pytest when invoked from there
    while _more_to_do(repodata, fake_next_page):
         fake_next_page = False
         # Need to work from inside out.
@@ -283,7 +281,7 @@ def csv_output(data, csv_writer) -> None:
         csv_writer.writerow(line)
 
 
-def parse_args(**kwargs):
+def parse_args():
     import argparse
 
     ap = argparse.ArgumentParser(description="GitHub Agile Dashboard")
diff --git a/github/branches/test_branch_protection.py b/github/branches/test_branch_protection.py
index 2463458..5fa1ec8 100644
--- a/github/branches/test_branch_protection.py
+++ b/github/branches/test_branch_protection.py
@@ -5,7 +5,8 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-from typing import Any
+from github.branches.validate_compliance import Criteria
+from typing import Any, Optional
 
 from .retrieve_github_data import get_repo_branch_protections
 from . import validate_compliance
@@ -15,10 +16,20 @@
 import pytest
 
 
-@pytest.mark.parametrize("repo_to_check", repos_to_check())
-@pytest.mark.parametrize("criteria", validate_compliance.required_criteria)
+def idfn(val: Any) -> Optional[str]:
+    string = None
+    if isinstance(val, (str,)):
+        if val.startswith("https://"):
+            string = "/".join(val.split("/")[3:5])
+    return string
+
+
+@pytest.mark.parametrize("repo_to_check", repos_to_check(), ids=idfn)
+@pytest.mark.parametrize(
+    "criteria", validate_compliance.required_criteria, ids=Criteria.idfn
+)
 def test_required_protections(
-    gql_connection: Any, repo_to_check: str, criteria: str
+    gql_connection: Any, repo_to_check: str, criteria: Criteria
 ) -> None:
     line = repo_to_check
     # for line in repos_to_check:
diff --git a/github/branches/validate_compliance.py b/github/branches/validate_compliance.py
index d90fe34..ee40034 100644
--- a/github/branches/validate_compliance.py
+++ b/github/branches/validate_compliance.py
@@ -5,21 +5,41 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-from typing import List
+from typing import Any, List, Optional, Tuple
+from dataclasses import dataclass
 
 from .retrieve_github_data import RepoBranchProtections, BranchProtectionRule
 
+
+@dataclass
+class Criteria:
+    standard_number: str  # as defined in messages file. alpha-numeric
+    slug: str  # id to match. alpha-numeric
+    description: str  # whatever you want
+
+    @staticmethod
+    def idfn(val: Any) -> Optional[str]:
+        """ provide ID for pytest Parametrization
+        """
+        if isinstance(val, (Criteria,)):
+            return f"{val.standard_number}-{val.slug}"
+        return None
+
+    def __str__(self: Any) -> str:
+        return f"{self.standard_number} {self.description}"
+
+
 # define the criteria we care about. Identify each critera with a string that will
 # appear in the results.
 required_criteria = [
-    "admins restricted",
+    Criteria("SOGH001b", "admins", "admins not restricted"),
 ]
 optional_criteria = [
-    "limited commiters",
+    Criteria("SOGH001c", "committers", "allowed committers not configured"),
     # "commit signing",  # may not be knowable
 ]
 warning_criteria = [
-    "rule conflicts",
+    Criteria("SOGH001d", "conflicts", "Conflict in Protection Rules"),
 ]
@@ -35,14 +55,14 @@ def find_applicable_rules(
     return result
 
 
-def meets_criteria(protections: List[BranchProtectionRule], criteria: str) -> bool:
+def meets_criteria(protections: List[BranchProtectionRule], criteria: Criteria) -> bool:
     met = True
     # ugly implementation for now
-    if criteria == "admins restricted":
+    if criteria.slug == "admins":
         met = all(r.is_admin_enforced for r in protections)
-    elif criteria == "limited commiters":
+    elif criteria.slug == "committers":
         met = all(r.push_actor_count > 0 for r in protections)
-    elif criteria == "rule conflicts":
+    elif criteria.slug == "conflicts":
         met = all(r.rule_conflict_count == 0 for r in protections)
     else:
         met = False
@@ -50,7 +70,7 @@ def meets_criteria(protections: List[BranchProtectionRule], criteria: str) -> bool:
 
 
 def validate_branch_protections(
-    data: RepoBranchProtections, branch: str, criteria: str
+    data: RepoBranchProtections, branch: str, criteria: Criteria,
 ) -> List[str]:
     """
     Validate the protections
diff --git a/github/client.py b/github/client.py
new file mode 100644
index 0000000..675275f
--- /dev/null
+++ b/github/client.py
@@ -0,0 +1,32 @@
+# Classes required by Frost
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
+from typing import Optional
+
+
+class GitHubClient:
+
+    _debug_calls: bool = False
+    _offline: bool = False
+
+    @classmethod
+    def debug_calls(cls) -> bool:
+        return cls._debug_calls
+
+    @classmethod
+    def is_offline(cls) -> bool:
+        return cls._offline
+
+    @classmethod
+    def update(cls, debug_calls: Optional[bool] = None, offline: Optional[bool] = None):
+        # allow updates, but only when a value was actually supplied
+        if debug_calls is not None:
+            cls._debug_calls = debug_calls
+        if offline is not None:
+            cls._offline = offline
+
+    def __init__(
+        self, debug_calls: Optional[bool] = None, offline: Optional[bool] = None
+    ):
+        self.update(debug_calls, offline)
diff --git a/github/conftest.py b/github/conftest.py
index 2ab30f7..78c5749 100755
--- a/github/conftest.py
+++ b/github/conftest.py
@@ -7,42 +7,44 @@
 
 # Fixtures to fetch data for the various GitHub branch checks
 
-# TODO:
-#   - convert to logger output
-#   - add sleep_* for 'core' functionality
+# TODO: add doctests
+# TODO: convert to logger output
+# TODO: add sleep_* for 'core' functionality
 
 from functools import lru_cache
 import logging
 import os
-import pathlib
-
 from typing import List
 
-import subprocess
-
 import pytest
 
-# from sgqlc.operation import Operation  # noqa: I900
 from sgqlc.endpoint.http import HTTPEndpoint  # noqa: I900
 
+import conftest
 
 logger = logging.getLogger(__name__)
 
 # Data to move to config
 DEFAULT_GRAPHQL_ENDPOINT = "https://api.github.com/graphql"
-EXTENSION_TO_STRIP = ".git"
-
-# check for all required environment variables so we can fail fast
-os.environ["PATH_TO_METADATA"]
-os.environ["GH_TOKEN"]
-
 
 # Data collection routines -- these likely should be a separate python
 # package, as they are useful outside of frost as written
 @pytest.fixture(scope="session", autouse=True)
 def gql_connection():
-    token = os.environ["GH_TOKEN"]
-    endpoint = HTTPEndpoint(
-        DEFAULT_GRAPHQL_ENDPOINT, {"Authorization": "bearer " + token,},
-    )
+    # Frost integration -- this routine controls all of our real system access,
+    # so we must honor the --offline option to support doctests
+    if not conftest.github_client.is_offline():
+        # check for all required environment variables so we can fail fast
+        # however, we only check once inside a session. This allows import
+        # of this module in other contexts, such as running doctests,
+        # without irrelevant configuration
+        os.environ["PATH_TO_METADATA"]
+
+        token = os.environ["GH_TOKEN"]
+        endpoint = HTTPEndpoint(
+            DEFAULT_GRAPHQL_ENDPOINT, {"Authorization": "bearer " + token,},
+        )
+    else:
+        endpoint = type("OfflineEndpoint", (), {})()  # stub that accepts attribute assignment
     # tack on error reporting so it's available everywhere needed
     endpoint.report_download_errors = _report_download_errors
     return endpoint
@@ -80,6 +82,3 @@ def _report_download_errors(errors):
 if __name__ == "__main__":
     if os.environ.get("DEBUG"):
         print(repos_to_check())
-        # for name in sys.argv[1:]:
-        #     data = get_branch_info(gql_connection(), name)
-        #     print(data)
diff --git a/github/manage_issues.py b/github/manage_issues.py
index 13a634b..aecae17 100755
--- a/github/manage_issues.py
+++ b/github/manage_issues.py
@@ -12,26 +12,29 @@
 import re
 from dataclasses import dataclass
 from pprint import pprint
-from typing import List
+from typing import List, Optional, Tuple
 
 import argcomplete
 
 _epilog = ""
 
 # constants
-# Sample line
+# Sample line:
+# github/branches/test_branch_protection.py::test_required_protections[SOGH001b-admins-firefox-devtools/profiler-server.git,]
 SPEC_DECODER_RE = re.compile(
     r"""
     (?P<path>[^:]+)::       # path
     (?P<method>\w+)\[       # method
+    (?P<standard>[^-]+)-    # assumes no hyphen in standard
     (?P<test_name>[^-]+)-   # assumes no hyphen in test_name
     (?P<param_id>[^]]+)\]
     """,
     re.VERBOSE,
 )
 
-# Sample line:
+# Sample lines:
 # E       AssertionError: ERROR:SOGH003:firefox-devtools doesn't meet two factor required - required\n    assert False
+# E       AssertionError: ERROR:SOGH001:firefox-devtools/profiler-server:master has no SOGH001b admins not restricted rule\n    assert False
 ASSERT_DECODER_RE = re.compile(
     r"""
     (E\s+AssertionError:)?\s*    # preamble
@@ -42,12 +45,30 @@
     re.VERBOSE | re.MULTILINE,
 )
 
+# Work on "info" section of branch.  Example:
+# firefox-devtools/profiler-server:master
+BRANCH_INFO_DECODER_RE = re.compile(
+    r"""
+    ^
+    (?P<owner>[^/]+)/   # repo owner
+    (?P<repo>[^:]+):    # repo name
+    (?P<branch>\S+)     # branch name
+    $
+    """,
+    re.VERBOSE,
+)
+
 
 @dataclass
 class Action:
-    owner: str
-    repo: str
-    branch: str
+    final_status: str = ""  # after frost exemption processing
+    base_status: str = ""  # native pytest status
+    owner: str = ""
+    repo: str = ""
+    branch: str = ""
+    standard: str = ""
+    summary: str = ""
+    messages: Optional[List[str]] = None
 
 
 def parse_action_string(name: str) -> List[str]:
@@ -72,44 +93,78 @@ def infer_resource_type(path: str) -> str:
     return resource_type
 
 
-def extract_standard(assert_msg: str) -> str:
-    """
-    pull standard(s) out of the assert string
-
-    TODO:
-        - support more than one result in string
-    """
-    for item in ASSERT_DECODER_RE.finditer(assert_msg):
-        pprint(item.groupdict()["standard"])
-
-
 def create_branch_action(action_spec: dict) -> Action:
     """Parse pytest info into information needed to open an issue against a specific branch"""
-    path, method, test_name, param_id = parse_action_string(action_spec["full_name"])
-    standard = extract_standard(action_spec["longrepr"])
-    url, branch = param_id.split(",")
-    owner, repo = url.split("/")[3:5]
+    # most information comes from the (parametrized) name of the test
+    test_info = action_spec["name"]
+    *_, standard, test_name, param_id = parse_action_string(test_info)
+    owner, repo = param_id.split("/")
+
+    # details for branches come from the assertion text
+    branch_details = action_spec["call"]["longrepr"]
+    errors = []
+    branch = "BoGuS"
+    for item in ASSERT_DECODER_RE.finditer(branch_details):
+        info = item.groupdict()["info"]
+        # further parse info
+        branch = "BoGuS"
+        matches = BRANCH_INFO_DECODER_RE.match(info)
+        if matches:
+            owner, repo, branch = matches.groups()
+        errors.append(
+            f"Branch {branch} of {owner}/{repo} failed {standard} {test_name}"
+        )
+
+    summary = f"{len(errors)} branch protection failure(s) for {owner}/{repo}:{branch}"
+    final_status, base_status = get_status(action_spec)
+    action = Action(
+        final_status=final_status,
+        base_status=base_status,
+        owner=owner,
+        repo=repo,
+        branch=branch,
+        standard=standard,
+        summary=summary,
+        messages=errors,
+    )
+    return action
+
+
+def get_status(action_spec: dict) -> Tuple[str, str]:
+    final_status = action_spec["call"]["outcome"]
+    base_status = action_spec["metadata"][0]["outcome"]
+    return final_status, base_status
 
 
 def create_org_action(action_spec: dict) -> Action:
     """ Break out the org info from the json """
-    found = False
-    for item in ASSERT_DECODER_RE.finditer(action_spec["longrepr"]):
-        pprint(item.groupdict())
-        found = True
-    if not found:
-        raise KeyError(f"Malformed json {repr(action_spec)}")
+    # TODO check for outcome of xfailed (means exemption no longer needed)
+    # most information comes from the (parametrized) name of the test
+    test_info = action_spec["name"]
+    path, method, standard, test_name, param_id = parse_action_string(test_info)
+    org_full_name = param_id
+    summary = f"Org {org_full_name} failed {standard} {test_name}"
+    final_status, base_status = get_status(action_spec)
+    action = Action(
+        final_status=final_status,
+        base_status=base_status,
+        owner=org_full_name,
+        standard=standard,
+        summary=summary,
+    )
+    return action
 
 
 def create_action_spec(action_spec: dict) -> Action:
     # for now, just return Action -- later decode may involve inferring what to
-    # do ("xpass" detection)
-    name = action_spec["full_name"]
-    path, _, _, _ = parse_action_string(name)
+    # do ("xpass" detection -- see GH-325)
+    # full name is file_path::method[test_name-parametrize_id]
+    name = action_spec["name"]
+    path, *_ = parse_action_string(name)
     resource_type = infer_resource_type(path)
     if resource_type == "orgs":
         action = create_org_action(action_spec)
@@ -118,28 +173,11 @@ def create_action_spec(action_spec: dict) -> Action:
     else:
         raise TypeError(f"unknown resource type '{resource_type}' from '{name}")
 
-    # full name is file_path::method[test_name-parametrize_id]
-    pprint(action_spec)
     return action
 
 
 def parse_args():
     parser = argparse.ArgumentParser(description=__doc__, epilog=_epilog)
-    # parser.add_argument(
-    #     "--debug", action="store_true", help="include dump of all data returned"
-    # )
-    # parser.add_argument("--owners", action="store_true", help="Also show owners")
-    # parser.add_argument("--email", action="store_true", help="include owner email")
-    # parser.add_argument(
-    #     "--all-my-orgs",
-    #     action="store_true",
-    #     help="act on all orgs for which you're an owner",
-    # )
-    # parser.add_argument(
-    #     "--names-only",
-    #     action="store_true",
-    #     help="Only output your org names for which you're an owner",
-    # )
     parser.add_argument("json_file", help="frost json output")
     argcomplete.autocomplete(parser)
     args = parser.parse_args()
@@ -150,8 +188,13 @@ def parse_args():
     args = parse_args()
 
     with open(args.json_file, "r") as jf:
-        issue_actions = json.loads(jf.read())
+        pytest_report = json.loads(jf.read())
 
-    print(f"Processing {len(issue_actions)}")
+    issue_actions = pytest_report["report"]["tests"]
+    print(f"Processing {len(issue_actions)} test results")
     for action_spec in issue_actions:
+        if action_spec["call"]["outcome"] == "passed":
+            continue
         action = create_action_spec(action_spec)
+        # TODO integrate actual issue handling
+        print(action)
diff --git a/github/messages-github.yaml b/github/messages-github.yaml
index 5d677de..bd45307 100644
--- a/github/messages-github.yaml
+++ b/github/messages-github.yaml
@@ -23,6 +23,8 @@ Standards:
     reopen message id: SOGH001a-1
     still open message id: SOGH001a-2
     admin set: SOGH001a-3
+  - standard number: 1b
+    standard description: Admins exempted from branch protections
   - standard number: 2  # no longer used, as we don't keep history
     standard description: Regression in Production branch protection
     new message id: SOGH002-0
diff --git a/github/orgs/conftest.py b/github/orgs/conftest.py
index 113f4a0..4116d4a 100755
--- a/github/orgs/conftest.py
+++ b/github/orgs/conftest.py
@@ -5,36 +5,33 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-# Fixtures to fetch data for the various GitHub branch checks
+# PyTest support for the various GitHub organization checks
 
-# TODO:
-#   - convert to logger output
-#   - add sleep_* for 'core' functionality
+# TODO: convert to logger output
+# TODO: add sleep_* for 'core' functionality
 
-# from datetime import datetime
 from functools import lru_cache
 import os
 import pathlib
 
-# import time
 from typing import List, Set
 
-# import sys
 import subprocess
 
-import pytest
-
-# from sgqlc.operation import Operation  # noqa: I900
 from sgqlc.endpoint.http import HTTPEndpoint  # noqa: I900
 
-# from github_schema import github_schema as schema  # noqa: I900
-# import branch_check.retrieve_github_data as retrieve_github_data
 from . import retrieve_github_data
+import conftest
 
 
 def orgs_to_check() -> Set[str]:
     # just shell out for now
+    # While there is no network operation done here, we don't want to go
+    # poking around the file system if we're in "--offline" mode
+    # (aka doctest mode)
+    if conftest.get_client("github_client").is_offline():
+        return []
     path_to_metadata = os.environ["PATH_TO_METADATA"]
     meta_dir = pathlib.Path(os.path.expanduser(path_to_metadata)).resolve()
     in_files = list(meta_dir.glob("*.json"))
@@ -73,6 +70,3 @@ def get_branch_info(gql_connection, repo_full_name: str) -> str:
 if __name__ == "__main__":
     if os.environ.get("DEBUG"):
         print(repos_to_check())
-        # for name in sys.argv[1:]:
-        #     data = get_branch_info(gql_connection(), name)
-        #     print(data)
diff --git a/github/orgs/retrieve_github_data.py b/github/orgs/retrieve_github_data.py
index ff875e5..7177915 100755
--- a/github/orgs/retrieve_github_data.py
+++ b/github/orgs/retrieve_github_data.py
@@ -1,12 +1,13 @@
 #!/usr/bin/env python3
-
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at https://mozilla.org/MPL/2.0/.
 """
 Collect Information about branches sufficient to check for all branch
 protection guideline compliance.
 """
 
-# import datetime
 import csv
 from functools import lru_cache
 import logging
@@ -15,20 +16,12 @@
 import sys
 from typing import Any, List
 
-# import re
-
-# import sys
-
-# from collections import OrderedDict
-
 from sgqlc.operation import Operation  # noqa: I900
 from sgqlc.endpoint.http import HTTPEndpoint  # noqa: I900
 
-# from branch_check.github_schema import github_schema as schema  # noqa: I900
 from ..github_schema import github_schema as schema  # noqa: I900
 
 DEFAULT_GRAPHQL_ENDPOINT = "https://api.github.com/graphql"
-EXTENSION_TO_STRIP = ".git"
 
 logger = logging.getLogger(__name__)
 
@@ -56,12 +49,10 @@ def create_operation(owner):
     """ Create the default Query operation
 
     We build the structure for:
-        repository:
-            0-n branch protections rules
-                flags
-                0-n conflicts with other rules (we only count)
-                0-n actors who can push (we only count)
-                0-n branches with this protection
+        organization:
+            name (may contain spaces)
+            login (no spaces)
+            requires 2fa
     """
 
     op = Operation(schema.Query)
@@ -77,7 +68,6 @@ def create_operation(owner):
 def get_org_info(endpoint: Any, org: str) -> OrgInfo:
     op = create_operation(org)
     logger.info("Downloading base information from %s", endpoint)
-    # logger.debug("Operation:\n%s", op)
 
     d = endpoint(op)
     errors = d.get("errors")
@@ -160,10 +150,10 @@ def main():
     csv_out = csv.writer(sys.stdout)
     raise SystemExit("Not ready for CLI usage")
     # endpoint = get_gql_session(args.graphql_endpoint, args.token,)
-    csv_out.writerow(OrgInfo.csv_header())
-    for org in args.orgs:
-        row_data = get_org_info(endpoint, org)
-        csv_output(row_data, csv_writer=csv_out)
+    # csv_out.writerow(OrgInfo.csv_header())
+    # for org in args.orgs:
+    #     row_data = get_org_info(endpoint, org)
+    #     csv_output(row_data, csv_writer=csv_out)
 
 
 if __name__ == "__main__":
diff --git a/github/orgs/test_two_factor_required.py b/github/orgs/test_two_factor_required.py
index a18c27a..6c18747 100755
--- a/github/orgs/test_two_factor_required.py
+++ b/github/orgs/test_two_factor_required.py
@@ -5,7 +5,8 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-from typing import Any
+from github.orgs.validate_compliance import Criteria
+from typing import Any, Optional
 
 from .retrieve_github_data import get_org_info
 from . import validate_compliance
@@ -16,12 +17,13 @@
 
 
 @pytest.mark.parametrize("org_to_check", orgs_to_check())
-@pytest.mark.parametrize("criteria", validate_compliance.required_criteria)
-def test_require_2fa(gql_connection: Any, org_to_check: str, criteria: str) -> None:
+@pytest.mark.parametrize(
+    "criteria", validate_compliance.required_criteria, ids=Criteria.idfn
+)
+def test_require_2fa(
+    gql_connection: Any, org_to_check: str, criteria: validate_compliance.Criteria
+) -> None:
     info = get_org_info(gql_connection, f"{org_to_check}")
-    if not info:
-        assert False, f"ERROR: organization '{orgs_to_check}' not accessible"
-        f"ERROR:SOGH001:{data.name_with_owner}:{branch} has no {criteria} rule"
-    else:
+    if info:
         met, message = validate_compliance.validate_org_info(info, criteria)
         assert met, message
diff --git a/github/orgs/validate_compliance.py b/github/orgs/validate_compliance.py
index 1ad85be..66973c8 100644
--- a/github/orgs/validate_compliance.py
+++ b/github/orgs/validate_compliance.py
@@ -5,32 +5,52 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-from typing import List
+from typing import Any, List, Optional, Tuple
+from dataclasses import dataclass
 
 from .retrieve_github_data import OrgInfo
 
+
+@dataclass
+class Criteria:
+    standard_number: str  # as defined in messages file. alpha-numeric
+    slug: str  # id to match. alpha-numeric
+    description: str  # whatever you want
+
+    @staticmethod
+    def idfn(val: Any) -> Optional[str]:
+        """ provide ID for pytest Parametrization
+        """
+        if isinstance(val, (Criteria,)):
+            return f"{val.standard_number}-{val.slug}"
+        return None
+
+    def __str__(self: Any) -> str:
+        return f"{self.standard_number} {self.description}"
+
+
 # define the criteria we care about. Identify each critera with a string that will
 # appear in the results.
-required_criteria = [
-    "two factor required",  # SOGH003
+required_criteria: List[Criteria] = [
+    Criteria("SOGH003", "2fa", "two factor required"),  # SOGH003
 ]
-optional_criteria = [
+optional_criteria: List[Criteria] = [
     # "commit signing",  # may not be knowable
 ]
-warning_criteria = []
+warning_criteria: List[Criteria] = []
 
 
-def meets_criteria(org_info: OrgInfo, criteria: str) -> bool:
+def meets_criteria(org_info: OrgInfo, criteria: Criteria) -> bool:
     met = True
     # ugly implementation for now
-    if criteria == "two factor required":
+    if criteria.slug == "2fa":
         met = org_info.requires_two_factor_authentication
     else:
         met = False
     return met
 
 
-def validate_org_info(data: OrgInfo, criteria: str) -> List[str]:
+def validate_org_info(data: OrgInfo, criteria: Criteria) -> Tuple[bool, str]:
     """
     Validate the protections
diff --git a/github/run.sh b/github/run.sh
index 70394d9..eaf2168 100755
--- a/github/run.sh
+++ b/github/run.sh
@@ -7,6 +7,7 @@ set -eu
 export GH_TOKEN=${GH_TOKEN:-$(pass show Mozilla/moz-hwine-PAT)}
 export PATH_TO_METADATA=${PATH_TO_METADATA:-~/repos/foxsec/master/services/metadata}
 export TODAY=${TODAY:-$(date --utc --iso=date)}
+export PATH_TO_SCRIPTS=${PATH_TO_SCRIPTS:-$PWD/github}
 
 PATH_TO_EXEMPTIONS=${PATH_TO_EXEMPTIONS:-$PWD/github/exemptions-github.yaml}
 
@@ -15,29 +16,27 @@ PROFILE="github"
 
 pytest_json=results-$PROFILE-$TODAY.json
 
-function create_issue() {
-    local -i i=0
-    while read l; do
-        ((i++))
-        printf "%3d: %s" $i "$l"
-    done
-}
-
-
 pytest --continue-on-collection-errors \
     --quiet --tb=no \
     $PROFILE \
     --json=$pytest_json \
     --config "${PATH_TO_EXEMPTIONS}" || true
 
-# filter for errors we want to open an issue on
-jq '.report.tests[] | select(.call.outcome != "passed")
-    | { full_name: .name,
-        modified_status: .outcome,
-        reason:(.call.xfail_reason // "")
-      }' \
-    $pytest_json \
-    | create_issue
+
+# post processing works directly with the output from pytest
+$PATH_TO_SCRIPTS/manage_issues.py $pytest_json
+$PATH_TO_SCRIPTS/create_metrics.py $pytest_json
+
+
+
+# # filter for errors we want to open an issue on
+# jq '.report.tests[] | select(.call.outcome != "passed")
+#     | { full_name: .name,
+#         modified_status: .outcome,
+#         reason:(.call.xfail_reason // "")
+#       }' \
+#     $pytest_json \
+#     | create_issue
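
Usage note (a hypothetical sketch, not part of the patch): the Criteria dataclasses added above feed pytest's ids= hook, and manage_issues.py later parses the resulting test names with SPEC_DECODER_RE. A quick sanity check of that round trip, assuming a frost checkout is on PYTHONPATH:

    from github.orgs.validate_compliance import Criteria

    crit = Criteria("SOGH003", "2fa", "two factor required")
    # the ids= hook turns each Criteria into the "<standard>-<slug>" piece of the test ID
    assert Criteria.idfn(crit) == "SOGH003-2fa"
    # non-Criteria values fall back to pytest's default ID generation
    assert Criteria.idfn("something else") is None
    # the str() form is what shows up in assertion messages
    assert str(crit) == "SOGH003 two factor required"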