diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index f85f8050..8cc2ed74 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -23,29 +23,35 @@ jobs: with: python-version-file: "python/selfie-lib/pyproject.toml" cache: "poetry" - - run: poetry install + - name: selfie-lib - poetry install + run: poetry install working-directory: python/selfie-lib - - run: poetry run pytest -vv + - name: selfie-lib - pytest + run: poetry run pytest -vv working-directory: python/selfie-lib - - run: poetry run pyright + - name: selfie-lib - pyright + run: poetry run pyright working-directory: python/selfie-lib - - run: poetry run ruff format --check + - name: selfie-lib - ruff + run: poetry run ruff format --check working-directory: python/selfie-lib - - run: poetry install + - name: pytest-selfie - poetry install + run: poetry install working-directory: python/pytest-selfie - - run: poetry run pytest -vv + - name: pytest-selfie - pyright + run: poetry run pyright working-directory: python/pytest-selfie - - run: poetry run tox -e py + - name: pytest-selfie - ruff + run: poetry run ruff format --check working-directory: python/pytest-selfie - - run: poetry run pyright - working-directory: python/pytest-selfie - - run: poetry run ruff format --check - working-directory: python/pytest-selfie - - run: poetry install + - name: example-pytest-selfie - poetry install + run: poetry install working-directory: python/example-pytest-selfie # - run: poetry run pytest -vv # working-directory: python/example-pytest-selfie - - run: poetry run pyright + - name: example-pytest-selfie - pyright + run: poetry run pyright working-directory: python/example-pytest-selfie - - run: poetry run ruff format --check + - name: example-pytest-selfie - ruff + run: poetry run ruff format --check working-directory: python/example-pytest-selfie diff --git a/python/example-pytest-selfie/poetry.lock b/python/example-pytest-selfie/poetry.lock index 1ad63a37..5fa3ee12 100644 --- a/python/example-pytest-selfie/poetry.lock +++ b/python/example-pytest-selfie/poetry.lock @@ -256,13 +256,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4.3)", "pytest- [[package]] name = "pluggy" -version = "1.4.0" +version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" files = [ - {file = "pluggy-1.4.0-py3-none-any.whl", hash = "sha256:7db9f7b503d67d1c5b95f59773ebb58a8c1c288129a88665838012cfb07b8981"}, - {file = "pluggy-1.4.0.tar.gz", hash = "sha256:8c85c2876142a764e5b7548e7d9a0e0ddb46f5185161049a79b7e974454223be"}, + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, ] [package.extras] @@ -578,13 +578,13 @@ files = [ [[package]] name = "virtualenv" -version = "20.25.2" +version = "20.25.3" description = "Virtual Python Environment builder" optional = false python-versions = ">=3.7" files = [ - {file = "virtualenv-20.25.2-py3-none-any.whl", hash = "sha256:6e1281a57849c8a54da89ba82e5eb7c8937b9d057ff01aaf5bc9afaa3552e90f"}, - {file = "virtualenv-20.25.2.tar.gz", hash = "sha256:fa7edb8428620518010928242ec17aa7132ae435319c29c1651d1cf4c4173aad"}, + {file = "virtualenv-20.25.3-py3-none-any.whl", hash = "sha256:8aac4332f2ea6ef519c648d0bc48a5b1d324994753519919bddbb1aff25a104e"}, + {file = "virtualenv-20.25.3.tar.gz", hash = "sha256:7bb554bbdfeaacc3349fa614ea5bff6ac300fc7c335e9facf3a3bcfc703f45be"}, ] [package.dependencies] diff --git a/python/example-pytest-selfie/tests/Simple_test.py b/python/example-pytest-selfie/tests/Simple_test.py deleted file mode 100644 index eb3da02b..00000000 --- a/python/example-pytest-selfie/tests/Simple_test.py +++ /dev/null @@ -1,25 +0,0 @@ -# from selfie_lib.ArrayMap import ArrayMap -from selfie_lib.Selfie import expectSelfie - - -# def test_simple(): -# test = ArrayMap.empty().plus("key", "value") -# assert test.__len__() == 1 - - -def test_comment_removal(): # selfieonce - expectSelfie("nothing happens").toBe_TODO() - - -def test_inline(): - expectSelfie("A").toBe_TODO() - - expectSelfie("testing123").toBe_TODO() - - -def test_disk(): - expectSelfie("A").toMatchDisk_TODO() - - expectSelfie( - "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus. Aenean leo ligula, porttitor eu, consequat vitae, eleifend ac, enim. Aliquam lorem ante, dapibus in, viverra quis, feugiat a," - ).toMatchDisk_TODO() diff --git a/python/example-pytest-selfie/tests/simple_comment_removal_test.py b/python/example-pytest-selfie/tests/simple_comment_removal_test.py new file mode 100644 index 00000000..72b62cfc --- /dev/null +++ b/python/example-pytest-selfie/tests/simple_comment_removal_test.py @@ -0,0 +1,5 @@ +from selfie_lib.Selfie import expect_selfie + + +def test_comment_removal(): # selfieonce + expect_selfie("no op").to_be("no op") diff --git a/python/example-pytest-selfie/tests/simple_inline_test.py b/python/example-pytest-selfie/tests/simple_inline_test.py new file mode 100644 index 00000000..6f99490f --- /dev/null +++ b/python/example-pytest-selfie/tests/simple_inline_test.py @@ -0,0 +1,13 @@ +from selfie_lib.Selfie import expect_selfie + + +# def test_read_pass(): +# expect_selfie("A").to_be("A") + + +# def test_read_fail(): +# expect_selfie("A").to_be("B") + + +def test_write(): + expect_selfie("B").to_be_TODO() diff --git a/python/example-pytest-selfie/tests/simple_ondisk_test.py b/python/example-pytest-selfie/tests/simple_ondisk_test.py new file mode 100644 index 00000000..0bd25d02 --- /dev/null +++ b/python/example-pytest-selfie/tests/simple_ondisk_test.py @@ -0,0 +1,9 @@ +from selfie_lib.Selfie import expect_selfie + + +def test_write(): + expect_selfie("A").to_match_disk() + + +def test_read(): + expect_selfie("B").to_match_disk_TODO() diff --git a/python/pytest-selfie/poetry.lock b/python/pytest-selfie/poetry.lock index c14f9a0e..ea4259db 100644 --- a/python/pytest-selfie/poetry.lock +++ b/python/pytest-selfie/poetry.lock @@ -113,13 +113,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4.3)", "pytest- [[package]] name = "pluggy" -version = "1.4.0" +version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" files = [ - {file = "pluggy-1.4.0-py3-none-any.whl", hash = "sha256:7db9f7b503d67d1c5b95f59773ebb58a8c1c288129a88665838012cfb07b8981"}, - {file = "pluggy-1.4.0.tar.gz", hash = "sha256:8c85c2876142a764e5b7548e7d9a0e0ddb46f5185161049a79b7e974454223be"}, + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, ] [package.extras] @@ -265,13 +265,13 @@ testing = ["build[virtualenv] (>=1.0.3)", "covdefaults (>=2.3)", "detect-test-po [[package]] name = "virtualenv" -version = "20.25.2" +version = "20.25.3" description = "Virtual Python Environment builder" optional = false python-versions = ">=3.7" files = [ - {file = "virtualenv-20.25.2-py3-none-any.whl", hash = "sha256:6e1281a57849c8a54da89ba82e5eb7c8937b9d057ff01aaf5bc9afaa3552e90f"}, - {file = "virtualenv-20.25.2.tar.gz", hash = "sha256:fa7edb8428620518010928242ec17aa7132ae435319c29c1651d1cf4c4173aad"}, + {file = "virtualenv-20.25.3-py3-none-any.whl", hash = "sha256:8aac4332f2ea6ef519c648d0bc48a5b1d324994753519919bddbb1aff25a104e"}, + {file = "virtualenv-20.25.3.tar.gz", hash = "sha256:7bb554bbdfeaacc3349fa614ea5bff6ac300fc7c335e9facf3a3bcfc703f45be"}, ] [package.dependencies] diff --git a/python/pytest-selfie/pytest_selfie/SelfieSettingsAPI.py b/python/pytest-selfie/pytest_selfie/SelfieSettingsAPI.py new file mode 100644 index 00000000..16138c14 --- /dev/null +++ b/python/pytest-selfie/pytest_selfie/SelfieSettingsAPI.py @@ -0,0 +1,48 @@ +import os +from pathlib import Path +import re +from typing import Optional +from selfie_lib import Mode +import pytest + + +class SelfieSettingsAPI: + """API for configuring the selfie plugin, you can set its values like this https://docs.pytest.org/en/7.1.x/reference/customize.html#configuration-file-formats""" + + def __init__(self, config: pytest.Config): + self.root_dir = config.rootpath + + @property + def allow_multiple_equivalent_writes_to_one_location(self) -> bool: + """Allow multiple equivalent writes to one location.""" + return True + + @property + def snapshot_folder_name(self) -> Optional[str]: + """Defaults to None, which means that snapshots are stored right next to the test that created them.""" + return None + + @property + def root_folder(self) -> Path: + """Returns the root folder for storing snapshots. Set by https://docs.pytest.org/en/7.1.x/reference/customize.html#finding-the-rootdir""" + return self.root_dir + + def calc_mode(self) -> Mode: + override = os.getenv("selfie") or os.getenv("SELFIE") + if override: + # Convert the mode to lowercase and match it with the Mode enum + try: + return Mode[override.lower()] + except KeyError: + raise ValueError(f"No such mode: {override}") + + ci = os.getenv("ci") or os.getenv("CI") + if ci and ci.lower() == "true": + return Mode.readonly + else: + return Mode.interactive + + +class SelfieSettingsSmuggleError(SelfieSettingsAPI): + def __init__(self, error: BaseException): + self.error = error diff --git a/python/pytest-selfie/pytest_selfie/__init__.py b/python/pytest-selfie/pytest_selfie/__init__.py index e69de29b..3d2f2183 100644 --- a/python/pytest-selfie/pytest_selfie/__init__.py +++ b/python/pytest-selfie/pytest_selfie/__init__.py @@ -0,0 +1 @@ +from .SelfieSettingsAPI import SelfieSettingsAPI as SelfieSettingsAPI diff --git a/python/pytest-selfie/pytest_selfie/plugin.py b/python/pytest-selfie/pytest_selfie/plugin.py index b664afcf..ee37ac3e 100644 --- a/python/pytest-selfie/pytest_selfie/plugin.py +++ b/python/pytest-selfie/pytest_selfie/plugin.py @@ -1,215 +1,434 @@ -from typing import Optional +from cgi import test +import os +from collections import defaultdict +import re +from typing import ByteString, DefaultDict, List, Optional, Iterator, Tuple + +from selfie_lib.Atomic import AtomicReference +from .SelfieSettingsAPI import SelfieSettingsAPI from selfie_lib import ( - Snapshot, + _clearSelfieSystem, _initSelfieSystem, - SnapshotSystem, - TypedPath, - recordCall, - FS, - SnapshotFile, - SnapshotFileLayout, - DiskStorage, + ArrayMap, + ArraySet, CallStack, + CommentTracker, + DiskStorage, + DiskWriteTracker, + FS, + InlineWriteTracker, LiteralValue, Mode, + Snapshot, + SnapshotFile, + SnapshotFileLayout, + SnapshotSystem, + SnapshotValueReader, + SourceFile, + TypedPath, + WithinTestGC, ) -from selfie_lib.CommentTracker import CommentTracker -from pathlib import Path import pytest -import re class FSImplementation(FS): - def file_walk(self, typed_path, walk): - pass + def assert_failed(self, message: str, expected=None, actual=None) -> Exception: + raise Exception(message) - def file_read_binary(self, typed_path) -> bytes: - return b"" - def file_write_binary(self, typed_path, content: bytes): - pass +class PytestSnapshotFileLayout(SnapshotFileLayout): + def __init__(self, fs: FSImplementation, settings: SelfieSettingsAPI): + super().__init__(fs) + self.__settings = settings + self.__root_folder = TypedPath.of_folder(os.path.abspath(settings.root_dir)) + self.unix_newlines = self.__infer_default_line_ending_is_unix() - def assert_failed(self, message: str, expected=None, actual=None) -> Exception: - raise Exception(message) + def snapshotfile_for_testfile(self, testfile: TypedPath) -> TypedPath: + if testfile.name.endswith(".py"): + return testfile.parent_folder().resolve_file(f"{testfile.name[:-3]}.ss") + else: + raise ValueError(f"Unknown file extension, expected .py: {testfile.name}") + def __infer_default_line_ending_is_unix(self) -> bool: + def walk_callback(walk: Iterator[TypedPath]) -> bool: + for file_path in walk: + try: + txt = self.fs.file_read(file_path) + # look for a file that has a newline somewhere in it + if "\n" in txt: + return "\r" not in txt + except Exception: + # might be a binary file that throws an encoding exception + pass + return True # if we didn't find any files, assume unix -class DiskStorageImplementation(DiskStorage): - def read_disk(self, sub: str, call: CallStack) -> Optional[Snapshot]: - print(f"Reading from disk: sub={sub}") - return None + return self.fs.file_walk(self.__root_folder, walk_callback) - def write_disk(self, actual: Snapshot, sub: str, call: CallStack): - print(f"Writing to disk: {actual} at {sub}") - def keep(self, sub_or_keep_all: Optional[str]): - print(f"Keeping snapshot for: {sub_or_keep_all}") +@pytest.hookimpl +def pytest_collection_modifyitems( + session: pytest.Session, config: pytest.Config, items: List[pytest.Item] +) -> None: + settings = SelfieSettingsAPI(config) + system = PytestSnapshotSystem(settings) + session.selfie_system = system # type: ignore + _initSelfieSystem(system) + for item in items: + (file, _, testname) = item.reportinfo() + system.planning_to_run(TypedPath.of_file(os.path.abspath(file)), testname) -class PytestSnapshotSystem(SnapshotSystem): - def __init__(self): - self._mode = Mode(can_write=True) - self._comment_tracker = CommentTracker() +@pytest.hookimpl +def pytest_sessionfinish(session: pytest.Session, exitstatus): + system: PytestSnapshotSystem = session.selfie_system # type: ignore + system.finished_all_tests() + _clearSelfieSystem(system) - @property - def mode(self) -> Mode: - return self._mode - @property - def fs(self) -> FS: - return FSImplementation() +@pytest.hookimpl(hookwrapper=True) +def pytest_runtest_protocol(item: pytest.Item, nextitem: Optional[pytest.Item]): + (file, _, testname) = item.reportinfo() + testfile = TypedPath.of_file(os.path.abspath(file)) - @property - def layout(self) -> SnapshotFileLayout: - return SnapshotFileLayout(self.fs) + system: PytestSnapshotSystem = item.session.selfie_system # type: ignore + system.test_start(testfile, testname) + yield + system.test_finish(testfile, testname) - def diskThreadLocal(self) -> DiskStorage: - return DiskStorageImplementation() - def source_file_has_writable_comment(self, call: CallStack) -> bool: - return self._comment_tracker.hasWritableComment(call, self.layout) +@pytest.hookimpl +def pytest_runtest_makereport(call: pytest.CallInfo[None], item: pytest.Item): + if call.excinfo is not None and call.when in ( + "call", + "teardown", + ): + system: PytestSnapshotSystem = item.session.selfie_system # type: ignore + (file, _, testname) = item.reportinfo() + system.test_failed(TypedPath.of_file(os.path.abspath(file)), testname) - def write_inline(self, literal_value: LiteralValue, call: CallStack): - pass - def finishedAllTests(self): - pass +class _keydefaultdict(defaultdict): + """A special defaultdict that passes the key to the default_factory.""" + def __missing__(self, key): + if self.default_factory is None: + raise KeyError(key) + else: + ret = self[key] = self.default_factory(key) # type: ignore + return ret -pytestSystem = PytestSnapshotSystem() +class PytestSnapshotSystem(SnapshotSystem): + def __init__(self, settings: SelfieSettingsAPI): + self.__fs = FSImplementation() + self.__mode = settings.calc_mode() + self._layout = PytestSnapshotFileLayout(self.__fs, settings) + self.__comment_tracker = CommentTracker() + self.__inline_write_tracker = InlineWriteTracker() + # self.__toBeFileWriteTracker = ToBeFileWriteTracker() #TODO + + self.__progress_per_file: DefaultDict[TypedPath, SnapshotFileProgress] = ( + _keydefaultdict(lambda key: SnapshotFileProgress(self, key)) # type: ignore + ) # type: ignore + # the test which is running right now, if any + self.__in_progress: Optional[SnapshotFileProgress] = None + # double-checks that we don't have any tests in progress + self.check_for_invalid_state: AtomicReference[Optional[ArraySet[TypedPath]]] = ( + AtomicReference(ArraySet.empty()) + ) -def pytest_addoption(parser): - group = parser.getgroup("selfie") - group.addoption( - "--foo", - action="store", - dest="dest_foo", - default="2024", - help='Set the value for the fixture "bar".', - ) + def planning_to_run(self, testfile: TypedPath, testname: str): + progress = self.__progress_per_file[testfile] + progress.finishes_expected += 1 - parser.addini("HELLO", "Dummy pytest.ini setting") + def mark_path_as_written(self, path: TypedPath): + def update_fun(arg: Optional[ArraySet[TypedPath]]): + if arg is None: + raise RuntimeError( + "Snapshot file is being written after all tests were finished." + ) + return arg.plusOrThis(path) + self.check_for_invalid_state.update_and_get(update_fun) -@pytest.fixture -def bar(request): - return request.config.option.dest_foo + def test_start(self, testfile: TypedPath, testname: str): + if self.__in_progress: + raise RuntimeError( + f"Test already in progress. {self.__in_progress.test_file} is running, can't start {testfile}" + ) + self.__in_progress = self.__progress_per_file[testfile] + self.__in_progress.test_start(testname) + + def test_failed(self, testfile: TypedPath, testname: str): + self.__assert_inprogress(testfile) + self.__in_progress.test_failed(testname) # type: ignore + + def test_finish(self, testfile: TypedPath, testname: str): + self.__assert_inprogress(testfile) + self.__in_progress.test_finish(testname) # type: ignore + self.__in_progress = None + + def __assert_inprogress(self, testfile: TypedPath): + if self.__in_progress is None: + raise RuntimeError("No test in progress") + if self.__in_progress.test_file != testfile: + raise RuntimeError( + f"{self.__in_progress.test_file} is in progress, can't accept data for {testfile}." + ) + def finished_all_tests(self): + snapshotsFilesWrittenToDisk = self.check_for_invalid_state.get_and_update( + lambda _: None + ) + if snapshotsFilesWrittenToDisk is None: + raise RuntimeError("finished_all_tests() was called more than once.") + if self.mode != Mode.readonly: + for path in self.__comment_tracker.paths_with_once(): + source = SourceFile(path.name, self.fs.file_read(path)) + source.remove_selfie_once_comments() + self.fs.file_write(path, source.as_string) + if self.__inline_write_tracker.hasWrites(): + self.__inline_write_tracker.persist_writes(self.layout) -@pytest.hookimpl -def pytest_sessionstart(session: pytest.Session): - print("SELFIE SESSION STARTED") - replace_todo_in_test_file("tests/Simple_test.py::test_inline") - global pytestSystem - _initSelfieSystem(pytestSystem) + @property + def mode(self) -> Mode: + return self.__mode + @property + def fs(self) -> FS: + return self.__fs -@pytest.hookimpl -def pytest_sessionfinish(session: pytest.Session, exitstatus): - print("SELFIE SESSION FINISHED") - update_test_files(session) - pytestSystem.finishedAllTests() + @property + def layout(self) -> SnapshotFileLayout: + return self._layout + + def disk_thread_local(self) -> DiskStorage: + if ( + self.__in_progress is None + or self.__in_progress.testname_in_progress is None + ): + raise RuntimeError("No test in progress") + return DiskStoragePytest( + self.__in_progress, self.__in_progress.testname_in_progress + ) + def source_file_has_writable_comment(self, call: CallStack) -> bool: + return self.__comment_tracker.hasWritableComment(call, self.layout) -def update_test_files(session): - for test in session.items: - if getattr(test, "todo_replace", None): - replace_todo_in_test_file(test.nodeid, test.todo_replace["expected"]) + def write_inline(self, literal_value: LiteralValue, call: CallStack): + self.__inline_write_tracker.record(literal_value, call, self.layout) + def write_to_be_file( + self, path: TypedPath, data: "ByteString", call: CallStack + ) -> None: + raise NotImplementedError -def replace_todo_in_test_file(test_id, replacement_text=None): - file_path, test_name = test_id.split("::") - full_file_path = Path(file_path).resolve() - if not full_file_path.exists(): - print(f"File not found: {full_file_path}") - return +class DiskStoragePytest(DiskStorage): + def __init__(self, progress: "SnapshotFileProgress", testname: str): + self.__progress = progress + self._testname = testname - # Read and split file content into lines - test_code = full_file_path.read_text() - new_test_code = test_code.splitlines() + def read_disk(self, sub: str, call: CallStack) -> Optional[Snapshot]: + raise NotImplementedError() - # Using CommentTracker to check for writable comments - if pytestSystem._comment_tracker.hasWritableComment( - recordCall(), pytestSystem.layout - ): - print(f"Checking for writable comment in file: {full_file_path}") - typed_path = TypedPath.of_file(full_file_path.absolute().__str__()) - comment_str, line_number = CommentTracker.commentString(typed_path) - print(f"Found '#selfieonce' comment at line {line_number}") + def write_disk(self, actual: Snapshot, sub: str, call: CallStack): + raise NotImplementedError() - # Remove the selfieonce comment - line_content = new_test_code[line_number - 1] - new_test_code[line_number - 1] = line_content.split("#", 1)[0].rstrip() + def keep(self, sub_or_keep_all: Optional[str]): + self.__progress.keep(self._testname, sub_or_keep_all) - # Rejoin lines into a single string - new_test_code = "\n".join(new_test_code) - # Handling toBe_TODO() replacements - pattern_to_be = re.compile( - r"expectSelfie\(\s*\"(.*?)\"\s*\)\.toBe_TODO\(\)", re.DOTALL - ) - new_test_code = pattern_to_be.sub( - lambda m: f"expectSelfie(\"{m.group(1)}\").toBe('{m.group(1)}')", new_test_code - ) +class SnapshotFileProgress: + TERMINATED = ArrayMap.empty().plus(" ~ / f!n1shed / ~ ", WithinTestGC()) - # Handling toMatchDisk_TODO() replacements - test_disk_start = new_test_code.find("def test_disk():") - test_disk_end = new_test_code.find("def ", test_disk_start + 1) - test_disk_code = ( - new_test_code[test_disk_start:test_disk_end] - if test_disk_end != -1 - else new_test_code[test_disk_start:] - ) + def __init__(self, system: PytestSnapshotSystem, test_file: TypedPath): + self.system = system + # the test file which holds the test case which we are the snapshot file for + self.test_file = test_file - pattern_to_match_disk = re.compile( - r"expectSelfie\(\s*\"(.*?)\"\s*\)\.toMatchDisk_TODO\(\)", re.DOTALL - ) - snapshot_file_path = full_file_path.parent / "SomethingOrOther.ss" + # before the tests run, we find out how many we expect to happen + self.finishes_expected = 0 + # while the tests run, we count up until they have all run, and then we can cleanup + self.finishes_so_far = 0 + # have any tests failed? + self.has_failed = False - # Extract and write the matched content to file - with snapshot_file_path.open("w") as snapshot_file: + # lazy-loaded snapshot file + self.file: Optional[SnapshotFile] = None + self.tests: AtomicReference[ArrayMap[str, WithinTestGC]] = AtomicReference( + ArrayMap.empty() + ) + self.disk_write_tracker: Optional[DiskWriteTracker] = DiskWriteTracker() + # the test name which is currently in progress, if any + self.testname_in_progress: Optional[str] = None + self.testname_in_progress_failed = False + + def assert_not_terminated(self): + if self.tests.get() == SnapshotFileProgress.TERMINATED: + raise RuntimeError( + "Cannot call methods on a terminated SnapshotFileProgress" + ) - def write_snapshot(match): - selfie_value = match.group(1) - snapshot_content = ( - f'expectSelfie("{selfie_value}").toMatchDisk("{selfie_value}")' + def test_start(self, testname: str): + if "/" in testname: + raise ValueError(f"Test name cannot contain '/', was {test}") + self.assert_not_terminated() + if self.testname_in_progress is not None: + raise RuntimeError( + f"Cannot start a new test {testname}, {self.testname_in_progress} is already in progress" + ) + self.testname_in_progress = testname + self.tests.update_and_get(lambda it: it.plus_or_noop(testname, WithinTestGC())) + + def test_failed(self, testname: str): + self.__assert_in_progress(testname) + self.has_failed = True + self.tests.get()[testname].keep_all() + + def test_finish(self, testname: str): + self.__assert_in_progress(testname) + self.finishes_so_far += 1 + self.testname_in_progress = None + if self.finishes_so_far == self.finishes_expected: + self.__all_tests_finished() + + def __assert_in_progress(self, testname: str): + self.assert_not_terminated() + if self.testname_in_progress is None: + raise RuntimeError("Can't finish, no test was in progress!") + if self.testname_in_progress != testname: + raise RuntimeError( + f"Can't finish {testname}, {self.testname_in_progress} was in progress" ) - snapshot_file.write(snapshot_content + "\n") - return f'expectSelfie("{selfie_value}").toMatchDisk()' - test_disk_code = pattern_to_match_disk.sub(write_snapshot, test_disk_code) + def __all_tests_finished(self): + self.assert_not_terminated() + self.disk_write_tracker = None # don't need this anymore + tests = self.tests.get_and_update(lambda _: SnapshotFileProgress.TERMINATED) + if tests == SnapshotFileProgress.TERMINATED: + raise ValueError(f"Snapshot for {self.test_file} already terminated!") + if self.file is not None: + stale_snapshot_indices = WithinTestGC.find_stale_snapshots_within( + self.file.snapshots, + tests, + find_test_methods_that_didnt_run(self.test_file, tests), + ) + if stale_snapshot_indices or self.file.was_set_at_test_time: + self.file.remove_all_indices(stale_snapshot_indices) + snapshot_path = self.system._layout.snapshotfile_for_testfile( + self.test_file + ) + if not self.file.snapshots: + delete_file_and_parent_dir_if_empty(snapshot_path) + else: + self.system.mark_path_as_written( + self.system._layout.snapshotfile_for_testfile(self.test_file) + ) + os.makedirs( + os.path.dirname(snapshot_path.absolute_path), exist_ok=True + ) + with open( + snapshot_path.absolute_path, "w", encoding="utf-8" + ) as writer: + list = [] + self.file.serialize(list) + for e in list: + writer.write(e) + else: + # we never read or wrote to the file + every_test_in_class_ran = not any( + find_test_methods_that_didnt_run(self.test_file, tests) + ) + is_stale = ( + every_test_in_class_ran + and not self.has_failed + and all(it.succeeded_and_used_no_snapshots() for it in tests.values()) + ) + if is_stale: + snapshot_file = self.system._layout.snapshotfile_for_testfile( + self.test_file + ) + delete_file_and_parent_dir_if_empty(snapshot_file) + # now that we are done, allow our contents to be GC'ed + self.file = None + + def keep(self, test: str, suffix_or_all: Optional[str]): + self.assert_not_terminated() + if suffix_or_all is None: + self.tests.get()[test].keep_all() + else: + self.tests.get()[test].keep_suffix(suffix_or_all) + + def write( + self, + test: str, + suffix: str, + snapshot: Snapshot, + call_stack: CallStack, + layout: SnapshotFileLayout, + ): + self.assert_not_terminated() + key = f"{test}{suffix}" + self.disk_write_tracker.record(key, snapshot, call_stack, layout) # type: ignore + self.tests.get()[test].keep_suffix(suffix) + self.read_file().set_at_test_time(key, snapshot) + + def read(self, test: str, suffix: str) -> Optional[Snapshot]: + self.assert_not_terminated() + snapshot = self.read_file().snapshots.get(f"{test}{suffix}") + if snapshot is not None: + self.tests.get()[test].keep_suffix(suffix) + return snapshot + + def read_file(self) -> SnapshotFile: + if self.file is None: + snapshot_path = self.system._layout.snapshotfile_for_testfile( + self.test_file + ) + if os.path.exists(snapshot_path.absolute_path) and os.path.isfile( + snapshot_path.absolute_path + ): + with open(snapshot_path.absolute_path, "rb") as f: + content = f.read() + self.file = SnapshotFile.parse(SnapshotValueReader.of_binary(content)) + else: + self.file = SnapshotFile.create_empty_with_unix_newlines( + self.system._layout.unix_newlines + ) + return self.file + + +def delete_file_and_parent_dir_if_empty(snapshot_file: TypedPath): + if os.path.isfile(snapshot_file.absolute_path): + os.remove(snapshot_file.absolute_path) + # if the parent folder is now empty, delete it + parent = os.path.dirname(snapshot_file.absolute_path) + if not os.listdir(parent): + os.rmdir(parent) + + +def find_test_methods_that_didnt_run( + testfile: TypedPath, tests: ArrayMap[str, WithinTestGC] +) -> ArrayMap[str, WithinTestGC]: + # Implementation of finding test methods that didn't run + # You can replace this with your own logic based on the class_name and tests dictionary + return ArrayMap.empty() - # Update the test code for the 'test_disk' method - if test_disk_end != -1: - new_test_code = ( - new_test_code[:test_disk_start] - + test_disk_code - + new_test_code[test_disk_end:] - ) - else: - new_test_code = new_test_code[:test_disk_start] + test_disk_code - if test_code != new_test_code: - full_file_path.write_text(new_test_code) - print(f"Updated test code in {full_file_path}") - else: - print("No changes made to the test code.") +def pytest_addoption(parser): + group = parser.getgroup("selfie") + group.addoption( + "--foo", + action="store", + dest="dest_foo", + default="2024", + help='Set the value for the fixture "bar".', + ) + parser.addini("HELLO", "Dummy pytest.ini setting") -@pytest.hookimpl(hookwrapper=True) -def pytest_pyfunc_call(pyfuncitem): - outcome = yield - try: - result = outcome.get_result() - except Exception as e: - result = str(e) - print(f"Test error: {pyfuncitem.nodeid} with {e}") - - # Store expected result if TODO was used and test passed - if "TODO" in pyfuncitem.name and outcome.excinfo is None: - expected_result = result - pyfuncitem.todo_replace = {"expected": expected_result} - replace_todo_in_test_file(pyfuncitem.nodeid, expected_result) - - print(f"SELFIE end test {pyfuncitem.nodeid} with {result}") + +@pytest.fixture +def bar(request): + return request.config.option.dest_foo diff --git a/python/pytest-selfie/pytest_selfie/replace_todo.py b/python/pytest-selfie/pytest_selfie/replace_todo.py new file mode 100644 index 00000000..74dcf49c --- /dev/null +++ b/python/pytest-selfie/pytest_selfie/replace_todo.py @@ -0,0 +1,95 @@ +import re +from pathlib import Path +from selfie_lib import ( + CommentTracker, + recordCall, + TypedPath, +) + + +def update_test_files(pytestSystem, session): + for test in session.items: + if getattr(test, "todo_replace", None): + replace_todo_in_test_file( + pytestSystem, test.nodeid, test.todo_replace["expected"] + ) + + +def replace_todo_in_test_file(pytestSystem, test_id, replacement_text=None): + file_path, test_name = test_id.split("::") + full_file_path = Path(file_path).resolve() + + if not full_file_path.exists(): + print(f"File not found: {full_file_path}") + return + + # Read and split file content into lines + test_code = full_file_path.read_text() + new_test_code = test_code.splitlines() + + # Using CommentTracker to check for writable comments + if pytestSystem.__comment_tracker.hasWritableComment( + recordCall(False), pytestSystem.layout + ): + print(f"Checking for writable comment in file: {full_file_path}") + typed_path = TypedPath.of_file(full_file_path.absolute().__str__()) + comment_str, line_number = CommentTracker.commentString(typed_path) + print(f"Found '#selfieonce' comment at line {line_number}") + + # Remove the selfieonce comment + line_content = new_test_code[line_number - 1] + new_test_code[line_number - 1] = line_content.split("#", 1)[0].rstrip() + + # Rejoin lines into a single string + new_test_code = "\n".join(new_test_code) + + # Handling toBe_TODO() replacements + pattern_to_be = re.compile( + r"expectSelfie\(\s*\"(.*?)\"\s*\)\.toBe_TODO\(\)", re.DOTALL + ) + new_test_code = pattern_to_be.sub( + lambda m: f"expectSelfie(\"{m.group(1)}\").toBe('{m.group(1)}')", new_test_code + ) + + # Handling toMatchDisk_TODO() replacements + test_disk_start = new_test_code.find("def test_disk():") + test_disk_end = new_test_code.find("def ", test_disk_start + 1) + test_disk_code = ( + new_test_code[test_disk_start:test_disk_end] + if test_disk_end != -1 + else new_test_code[test_disk_start:] + ) + + pattern_to_match_disk = re.compile( + r"expectSelfie\(\s*\"(.*?)\"\s*\)\.toMatchDisk_TODO\(\)", re.DOTALL + ) + snapshot_file_path = full_file_path.parent / "SomethingOrOther.ss" + + # Extract and write the matched content to file + with snapshot_file_path.open("w") as snapshot_file: + + def write_snapshot(match): + selfie_value = match.group(1) + snapshot_content = ( + f'expectSelfie("{selfie_value}").toMatchDisk("{selfie_value}")' + ) + snapshot_file.write(snapshot_content + "\n") + return f'expectSelfie("{selfie_value}").toMatchDisk()' + + test_disk_code = pattern_to_match_disk.sub(write_snapshot, test_disk_code) + + # Update the test code for the 'test_disk' method + if test_disk_end != -1: + new_test_code = ( + new_test_code[:test_disk_start] + + test_disk_code + + new_test_code[test_disk_end:] + ) + else: + new_test_code = new_test_code[:test_disk_start] + test_disk_code + + if test_code != new_test_code: + full_file_path.write_text(new_test_code) + print(f"Updated test code in {full_file_path}") + else: + print("No changes made to the test code.") diff --git a/python/pytest-selfie/tests/conftest.py b/python/pytest-selfie/tests/conftest.py deleted file mode 100644 index 694d7d58..00000000 --- a/python/pytest-selfie/tests/conftest.py +++ /dev/null @@ -1 +0,0 @@ -pytest_plugins = "pytester" diff --git a/python/pytest-selfie/tests/test_pytest_selfie.py b/python/pytest-selfie/tests/test_pytest_selfie.py deleted file mode 100644 index 50ca3f9e..00000000 --- a/python/pytest-selfie/tests/test_pytest_selfie.py +++ /dev/null @@ -1,64 +0,0 @@ -def test_bar_fixture(pytester): - """Make sure that pytest accepts our fixture.""" - - # create a temporary pytest test module - pytester.makepyfile(""" - def test_sth(bar): - assert bar == "europython2015" - """) - - # run pytest with the following cmd args - result = pytester.runpytest("--foo=europython2015", "-v") - - # fnmatch_lines does an assertion internally - result.stdout.fnmatch_lines( - [ - "*::test_sth PASSED*", - ] - ) - - # make sure that we get a '0' exit code for the testsuite - assert result.ret == 0 - - -def test_help_message(pytester): - result = pytester.runpytest( - "--help", - ) - # fnmatch_lines does an assertion internally - result.stdout.fnmatch_lines( - [ - "selfie:", - '*--foo=DEST_FOO*Set the value for the fixture "bar".', - ] - ) - - -def test_hello_ini_setting(pytester): - pytester.makeini(""" - [pytest] - HELLO = world - """) - - pytester.makepyfile(""" - import pytest - - @pytest.fixture - def hello(request): - return request.config.getini('HELLO') - - def test_hello_world(hello): - assert hello == 'world' - """) - - result = pytester.runpytest("-v") - - # fnmatch_lines does an assertion internally - result.stdout.fnmatch_lines( - [ - "*::test_hello_world PASSED*", - ] - ) - - # make sure that we get a '0' exit code for the testsuite - assert result.ret == 0 diff --git a/python/selfie-lib/poetry.lock b/python/selfie-lib/poetry.lock index 8ce387f3..533eebdd 100644 --- a/python/selfie-lib/poetry.lock +++ b/python/selfie-lib/poetry.lock @@ -49,13 +49,13 @@ files = [ [[package]] name = "pluggy" -version = "1.4.0" +version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" files = [ - {file = "pluggy-1.4.0-py3-none-any.whl", hash = "sha256:7db9f7b503d67d1c5b95f59773ebb58a8c1c288129a88665838012cfb07b8981"}, - {file = "pluggy-1.4.0.tar.gz", hash = "sha256:8c85c2876142a764e5b7548e7d9a0e0ddb46f5185161049a79b7e974454223be"}, + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, ] [package.extras] diff --git a/python/selfie-lib/selfie_lib/ArrayMap.py b/python/selfie-lib/selfie_lib/ArrayMap.py index 98083f89..5d76cb7d 100644 --- a/python/selfie-lib/selfie_lib/ArrayMap.py +++ b/python/selfie-lib/selfie_lib/ArrayMap.py @@ -1,5 +1,6 @@ -from collections.abc import Set, Iterator, Mapping -from typing import List, TypeVar, Union, Any +from collections.abc import Set, Iterator, Mapping, ItemsView +import re +from typing import List, Tuple, TypeVar, Union, Any from abc import abstractmethod, ABC T = TypeVar("T") @@ -94,8 +95,59 @@ def plusOrThis(self, element: K) -> "ArraySet[K]": return ArraySet.__create(new_data) +class _ArrayMapKeys(ListBackedSet[K]): + def __init__(self, data: List[Union[K, V]]): + self.__data = data + + def __len__(self) -> int: + return len(self.__data) // 2 + + def __getitem__(self, index: Union[int, slice]): # type: ignore + if isinstance(index, slice): + return [ + self.__data[i] + for i in range( + index.start * 2 if index.start else 0, + index.stop * 2 if index.stop else len(self.__data), + index.step * 2 if index.step else 2, + ) + ] + else: + return self.__data[2 * index] + + def __iter__(self) -> Iterator[K]: + return (self.__data[i] for i in range(0, len(self.__data), 2)) # type: ignore + + +class _ArrayMapEntries(ListBackedSet[Tuple[K, V]], ItemsView[K, V]): + def __init__(self, data: List[Union[K, V]]): + self.__data = data + + def __len__(self) -> int: + return len(self.__data) // 2 + + def __getitem__(self, index: Union[int, slice]): # type: ignore + if isinstance(index, slice): + return [ + (self.__data[i], self.__data[i + 1]) + for i in range( + index.start * 2 if index.start else 0, + index.stop * 2 if index.stop else len(self.__data), + index.step * 2 if index.step else 2, + ) + ] + else: + return (self.__data[2 * index], self.__data[2 * index + 1]) + + def __iter__(self) -> Iterator[Tuple[K, V]]: + return ( + (self.__data[i], self.__data[i + 1]) for i in range(0, len(self.__data), 2) + ) # type: ignore + + class ArrayMap(Mapping[K, V]): __data: List[Union[K, V]] + __keys: ListBackedSet[K] def __init__(self): raise NotImplementedError("Use ArrayMap.empty() or other class methods instead") @@ -104,6 +156,7 @@ def __init__(self): def __create(cls, data: List[Union[K, V]]) -> "ArrayMap[K, V]": instance = cls.__new__(cls) instance.__data = data + instance.__keys = _ArrayMapKeys(data) return instance @classmethod @@ -112,6 +165,12 @@ def empty(cls) -> "ArrayMap[K, V]": cls.__EMPTY = cls.__create([]) return cls.__EMPTY + def keys(self) -> ListBackedSet[K]: # type: ignore + return self.__keys + + def items(self) -> _ArrayMapEntries[K, V]: # type: ignore + return _ArrayMapEntries(self.__data) + def __getitem__(self, key: K) -> V: index = self._binary_search_key(key) if index >= 0: @@ -125,8 +184,7 @@ def __len__(self) -> int: return len(self.__data) // 2 def _binary_search_key(self, key: K) -> int: - keys = [self.__data[i] for i in range(0, len(self.__data), 2)] - return _binary_search(keys, key) + return _binary_search(self.__keys, key) def plus(self, key: K, value: V) -> "ArrayMap[K, V]": index = self._binary_search_key(key) @@ -146,6 +204,18 @@ def minus_sorted_indices(self, indices: List[int]) -> "ArrayMap[K, V]": del new_data[index] return ArrayMap.__create(new_data) + def plus_or_noop(self, key: K, value: V) -> "ArrayMap[K, V]": + index = self._binary_search_key(key) + if index >= 0: + return self + else: + # Insert new key-value pair + insert_at = -(index + 1) + new_data = self.__data[:] + new_data.insert(insert_at * 2, key) + new_data.insert(insert_at * 2 + 1, value) + return ArrayMap.__create(new_data) + def plus_or_noop_or_replace(self, key: K, value: V) -> "ArrayMap[K, V]": index = self._binary_search_key(key) if index >= 0: diff --git a/python/selfie-lib/selfie_lib/Atomic.py b/python/selfie-lib/selfie_lib/Atomic.py new file mode 100644 index 00000000..50b74c05 --- /dev/null +++ b/python/selfie-lib/selfie_lib/Atomic.py @@ -0,0 +1,46 @@ +from typing import Callable, Optional + + +class AtomicReference[T]: + """ + This has the same API as Java's AtomicReference, but it doesn't make any sense in the Python runtime. + The point of keeping it is that it makes the port from Kotlin more 1:1 + """ + + def __init__(self, initial_value: T): + self.value: T = initial_value + + def get(self) -> T: + return self.value + + def set(self, new_value: T) -> None: + self.value = new_value + + def get_and_set(self, new_value: T) -> T: + old_value = self.value + self.value = new_value + return old_value + + def compare_and_set(self, expected_value: T, new_value: T) -> bool: + if self.value == expected_value: + self.value = new_value + return True + return False + + def get_and_update(self, update_function: Callable[[T], T]) -> T: + old_value = self.value + self.value = update_function(self.value) + return old_value + + def update_and_get(self, update_function: Callable[[T], T]) -> T: + self.value = update_function(self.value) + return self.value + + def get_and_accumulate(self, x: T, accumulator: Callable[[T, T], T]) -> T: + old_value = self.value + self.value = accumulator(self.value, x) + return old_value + + def accumulate_and_get(self, x: T, accumulator: Callable[[T, T], T]) -> T: + self.value = accumulator(self.value, x) + return self.value diff --git a/python/selfie-lib/selfie_lib/CommentTracker.py b/python/selfie-lib/selfie_lib/CommentTracker.py index b33f9abf..a770511b 100644 --- a/python/selfie-lib/selfie_lib/CommentTracker.py +++ b/python/selfie-lib/selfie_lib/CommentTracker.py @@ -23,7 +23,7 @@ def __init__(self): self.cache: Dict[TypedPath, WritableComment] = {} self.lock = threading.Lock() - def pathsWithOnce(self) -> Iterable[TypedPath]: + def paths_with_once(self) -> Iterable[TypedPath]: with self.lock: return [ path @@ -32,7 +32,7 @@ def pathsWithOnce(self) -> Iterable[TypedPath]: ] def hasWritableComment(self, call: CallStack, layout: SnapshotFileLayout) -> bool: - path = layout.sourcePathForCall(call) + path = layout.sourcefile_for_call(call) with self.lock: if path in self.cache: comment = self.cache[path] diff --git a/python/selfie-lib/selfie_lib/FS.py b/python/selfie-lib/selfie_lib/FS.py new file mode 100644 index 00000000..9e1daea3 --- /dev/null +++ b/python/selfie-lib/selfie_lib/FS.py @@ -0,0 +1,37 @@ +from selfie_lib.TypedPath import TypedPath +from pathlib import Path + +from abc import ABC, abstractmethod +from typing import Callable, Iterator +from itertools import chain + + +class FS(ABC): + def file_exists(self, typed_path: TypedPath) -> bool: + return Path(typed_path.absolute_path).is_file() + + def file_walk[T]( + self, typed_path: TypedPath, walk: Callable[[Iterator[TypedPath]], T] + ) -> T: + def walk_generator(path: TypedPath) -> Iterator[TypedPath]: + for file_path in Path(path.absolute_path).rglob("*"): + if file_path.is_file(): + yield TypedPath(file_path.absolute().as_posix()) + + return walk(walk_generator(typed_path)) + + def file_read(self, typed_path) -> str: + return self.file_read_binary(typed_path).decode() + + def file_write(self, typed_path, content: str): + self.file_write_binary(typed_path, content.encode()) + + def file_read_binary(self, typed_path: TypedPath) -> bytes: + return Path(typed_path.absolute_path).read_bytes() + + def file_write_binary(self, typed_path: TypedPath, content: bytes): + Path(typed_path.absolute_path).write_bytes(content) + + @abstractmethod + def assert_failed(self, message: str, expected=None, actual=None) -> Exception: + pass diff --git a/python/selfie-lib/selfie_lib/Literals.py b/python/selfie-lib/selfie_lib/Literals.py index 46005ff5..08259a70 100644 --- a/python/selfie-lib/selfie_lib/Literals.py +++ b/python/selfie-lib/selfie_lib/Literals.py @@ -1,3 +1,4 @@ +from calendar import c from enum import Enum, auto from typing import Protocol, TypeVar from abc import abstractmethod @@ -262,3 +263,24 @@ def __to_boolean_strict(self, string: str) -> bool: def parse(self, string: str, language: Language) -> bool: return self.__to_boolean_strict(string) + + +class TodoStub(Enum): + toMatchDisk = auto() + toBeFile = auto() + + def create_literal(self): + return LiteralValue(None, self, LiteralTodoStub()) + + +class LiteralTodoStub(LiteralFormat[TodoStub]): + def encode( + self, + value: TodoStub, + language: Language, + encoding_policy: EscapeLeadingWhitespace, + ) -> str: + raise NotImplementedError() + + def parse(self, string: str, language: Language) -> TodoStub: + raise NotImplementedError() diff --git a/python/selfie-lib/selfie_lib/Selfie.py b/python/selfie-lib/selfie_lib/Selfie.py index bc569556..d44c3d99 100644 --- a/python/selfie-lib/selfie_lib/Selfie.py +++ b/python/selfie-lib/selfie_lib/Selfie.py @@ -1,10 +1,16 @@ -from tracemalloc import Snapshot from .SelfieImplementations import StringSelfie from .Snapshot import Snapshot from .SnapshotSystem import _selfieSystem +from typing import Union -def expectSelfie(actual: str) -> "StringSelfie": - snapshot = Snapshot.of(actual) - diskStorage = _selfieSystem().diskThreadLocal() - return StringSelfie(snapshot, diskStorage, actual) + +def expect_selfie(actual: Union[str, int]) -> "StringSelfie": + if isinstance(actual, int): + raise NotImplementedError() + elif isinstance(actual, str): + snapshot = Snapshot.of(actual) + diskStorage = _selfieSystem().disk_thread_local() + return StringSelfie(snapshot, diskStorage) + else: + raise NotImplementedError() diff --git a/python/selfie-lib/selfie_lib/SelfieImplementations.py b/python/selfie-lib/selfie_lib/SelfieImplementations.py index 6e65ea34..f01d8932 100644 --- a/python/selfie-lib/selfie_lib/SelfieImplementations.py +++ b/python/selfie-lib/selfie_lib/SelfieImplementations.py @@ -1,102 +1,220 @@ +import base64 + from .Snapshot import Snapshot -from .SnapshotSystem import DiskStorage, _selfieSystem +from .SnapshotFile import SnapshotFile +from .SnapshotSystem import DiskStorage, SnapshotSystem, _selfieSystem, Mode from .WriteTracker import recordCall as recordCall -from .Literals import LiteralValue, LiteralString as LiteralString +from .Literals import LiteralValue, LiteralString, LiteralFormat, TodoStub + + +from abc import ABC, abstractmethod +from typing import Any, List, Optional +from itertools import chain + + +class FluentFacet(ABC): + @abstractmethod + def facet(self, facet: str) -> "StringFacet": + """Extract a single facet from a snapshot in order to do an inline snapshot.""" + pass + + @abstractmethod + def facets(self, *facets: str) -> "StringFacet": + """Extract multiple facets from a snapshot in order to do an inline snapshot.""" + pass + + @abstractmethod + def facet_binary(self, facet: str) -> "BinaryFacet": + pass + + +class StringFacet(FluentFacet, ABC): + @abstractmethod + def to_be(self, expected: str) -> str: + pass + + def to_be_TODO(self, unused_arg: Any = None) -> str: + return self.to_be_TODO() + + +class BinaryFacet(FluentFacet, ABC): + @abstractmethod + def to_be_base64(self, expected: str) -> bytes: + pass + + def to_be_base64_TODO(self, unused_arg: Any = None) -> bytes: + return self.to_be_base64_TODO() + @abstractmethod + def to_be_file(self, subpath: str) -> bytes: + pass -class FluentFacet: + @abstractmethod + def to_be_file_TODO(self, subpath: str) -> bytes: + pass + + +class DiskSelfie(FluentFacet): def __init__(self, actual: Snapshot, disk: DiskStorage): - self._actual = actual - self._disk = disk + self.actual = actual + self.disk = disk - def facet(self, facet: str): - return StringSelfie(self._actual, self._disk, facet) + def to_match_disk(self, sub: str = "") -> "DiskSelfie": + call = recordCall(False) + if _selfieSystem().mode.can_write(False, call, _selfieSystem()): + self.disk.write_disk(self.actual, sub, call) + else: + _assertEqual(self.disk.read_disk(sub, call), self.actual, _selfieSystem()) + return self - def facets(self, *facets: str): - return StringSelfie(self._actual, self._disk, " ".join(facets)) + def to_match_disk_TODO(self, sub: str = "") -> "DiskSelfie": + call = recordCall(False) + if _selfieSystem().mode.can_write(True, call, _selfieSystem()): + self.disk.write_disk(self.actual, sub, call) + _selfieSystem().write_inline(TodoStub.toMatchDisk.create_literal(), call) + return self + else: + raise _selfieSystem().fs.assert_failed( + "Can't call `toMatchDisk_TODO` in {} mode!".format(Mode.readonly) + ) - def facet_binary(self, facet: str): + def facet(self, facet: str) -> "StringFacet": raise NotImplementedError() + def facets(self, *facets: str) -> "StringFacet": + raise NotImplementedError() -class DiskSelfie(FluentFacet): - def __init__(self, actual: Snapshot, disk: DiskStorage, expected: str = ""): + def facet_binary(self, facet: str) -> "BinaryFacet": + raise NotImplementedError() + + +class StringSelfie(DiskSelfie, StringFacet): + def __init__( + self, + actual: Snapshot, + disk: DiskStorage, + only_facets: Optional[List[str]] = None, + ): super().__init__(actual, disk) - self._expected = expected + self.only_facets = only_facets - def toMatchDisk(self, sub="") -> "DiskSelfie": - call = recordCall() - snapshot_system = _selfieSystem() - if snapshot_system.mode.can_write(False, call): - snapshot_system.diskThreadLocal().write_disk(self._actual, sub, call) - else: - expected = snapshot_system.diskThreadLocal().read_disk(sub, call) - if expected != self._actual: - raise snapshot_system.fs.assert_failed( - "Snapshot mismatch!", expected, self._actual - ) + if self.only_facets is not None: + assert all( + facet == "" or facet in actual.facets for facet in self.only_facets + ), f"The following facets were not found in the snapshot: {[facet for facet in self.only_facets if actual.subject_or_facet_maybe(facet) is None]}\navailable facets are: {list(actual.facets.keys())}" + assert ( + len(self.only_facets) > 0 + ), "Must have at least one facet to display, this was empty." + if "" in self.only_facets: + assert ( + self.only_facets.index("") == 0 + ), f'If you\'re going to specify the subject facet (""), you have to list it first, this was {self.only_facets}' + + def to_match_disk(self, sub: str = "") -> "StringSelfie": + super().to_match_disk(sub) return self - def toMatchDisk_TODO(self, sub="") -> "DiskSelfie": - call = recordCall() - snapshot_system = _selfieSystem() - if snapshot_system.mode.can_write(True, call): - self._disk.write_disk(self._actual, sub, call) - actual_snapshot_value = self._actual.subject_or_facet_maybe(sub) - if actual_snapshot_value is None: - actual_value = "None" - else: - actual_value = ( - actual_snapshot_value.value_string() - if not actual_snapshot_value.is_binary - else "binary data" - ) - literal_value = LiteralValue( - expected=None, - actual=f"TODO: Expected '{self._expected}', got '{actual_value}'", - format=LiteralString(), + def to_match_disk_TODO(self, sub: str = "") -> "StringSelfie": + super().to_match_disk_TODO(sub) + return self + + def __actual(self) -> str: + if not self.actual.facets or (self.only_facets and len(self.only_facets) == 1): + # single value doesn't have to worry about escaping at all + only_value = self.actual.subject_or_facet( + self.only_facets[0] if self.only_facets else "" ) - snapshot_system.write_inline(literal_value, call) + if only_value.is_binary: + return ( + base64.b64encode(only_value.value_binary()) + .decode() + .replace("\r", "") + ) + else: + return only_value.value_string() else: - raise snapshot_system.fs.assert_failed( - "Can't call `toMatchDisk_TODO` in readonly mode!" + return _serializeOnlyFacets( + self.actual, self.only_facets or [""] + list(self.actual.facets.keys()) ) - return self + + def to_be_TODO(self, unused_arg: Any = None) -> str: + return _toBeDidntMatch(None, self.__actual(), LiteralString()) + + def to_be(self, expected: str) -> str: + actual_string = self.__actual() + if actual_string == expected: + return _checkSrc(actual_string) + else: + return _toBeDidntMatch(expected, actual_string, LiteralString()) -class StringSelfie(DiskSelfie): - def __init__(self, actual: Snapshot, disk: DiskStorage, expected: str): - super().__init__(actual, disk, expected) +def _checkSrc[T](value: T) -> T: + _selfieSystem().mode.can_write(False, recordCall(True), _selfieSystem()) + return value - def toBe(self, expected: str) -> str: - result = self._expected - if result != expected: + +def _toBeDidntMatch[T](expected: Optional[T], actual: T, format: LiteralFormat[T]) -> T: + call = recordCall(False) + writable = _selfieSystem().mode.can_write(expected is None, call, _selfieSystem()) + if writable: + _selfieSystem().write_inline(LiteralValue(expected, actual, format), call) + return actual + else: + if expected is None: raise _selfieSystem().fs.assert_failed( - "Expected value does not match!", expected, result - ) - return result - - def toBe_TODO(self) -> str: - call = recordCall() - snapshot_system = _selfieSystem() - if snapshot_system.mode.can_write(True, call): - actual_snapshot_value = self._actual.subject_or_facet_maybe("") - if actual_snapshot_value is None: - actual_value = "None" - else: - actual_value = ( - actual_snapshot_value.value_string() - if not actual_snapshot_value.is_binary - else "binary data" - ) - literal_value = LiteralValue( - expected=None, - actual=f"TODO: Expected '{self._expected}', got '{actual_value}'", - format=LiteralString(), + f"Can't call `toBe_TODO` in {Mode.readonly} mode!" ) - snapshot_system.write_inline(literal_value, call) else: - raise snapshot_system.fs.assert_failed( - "Can't call `toBe_TODO` in readonly mode!" + raise _selfieSystem().fs.assert_failed( + _selfieSystem().mode.msg_snapshot_mismatch(), expected, actual ) - return self._expected + + +def _assertEqual( + expected: Optional[Snapshot], actual: Snapshot, storage: SnapshotSystem +): + if expected is None: + raise storage.fs.assert_failed(storage.mode.msg_snapshot_not_found()) + elif expected == actual: + return + else: + mismatched_keys = sorted( + filter( + lambda facet: expected.subject_or_facet_maybe(facet) + != actual.subject_or_facet_maybe(facet), + chain( + [""], + expected.facets.keys(), + ( + facet + for facet in actual.facets.keys() + if facet not in expected.facets + ), + ), + ) + ) + raise storage.fs.assert_failed( + storage.mode.msg_snapshot_mismatch(), + _serializeOnlyFacets(expected, mismatched_keys), + _serializeOnlyFacets(actual, mismatched_keys), + ) + + +def _serializeOnlyFacets(snapshot: Snapshot, keys: List[str]) -> str: + writer = [] + for key in keys: + if not key: + SnapshotFile.writeEntry(writer, "", None, snapshot.subject_or_facet(key)) + else: + value = snapshot.subject_or_facet(key) + if value is not None: + SnapshotFile.writeEntry(writer, "", key, value) + + EMPTY_KEY_AND_FACET = "╔═ ═╗\n" + writer_str = "".join(writer) + + if writer_str.startswith(EMPTY_KEY_AND_FACET): + # this codepath is triggered by the `key.isEmpty()` line above + return writer_str[len(EMPTY_KEY_AND_FACET) : -1] + else: + return writer_str[:-1] diff --git a/python/selfie-lib/selfie_lib/SnapshotFile.py b/python/selfie-lib/selfie_lib/SnapshotFile.py index 801f5957..9b3e189a 100644 --- a/python/selfie-lib/selfie_lib/SnapshotFile.py +++ b/python/selfie-lib/selfie_lib/SnapshotFile.py @@ -1,7 +1,8 @@ +import base64 from threading import Lock -from typing import List, Optional, Dict +from typing import Tuple, List, Optional, Dict -from .Snapshot import Snapshot +from .Snapshot import Snapshot, SnapshotValue from .SnapshotReader import SnapshotReader from .SnapshotValueReader import SnapshotValueReader from .ArrayMap import ArrayMap @@ -13,31 +14,56 @@ class SnapshotFile: def __init__(self): self.unix_newlines: bool = True - self.metadata: Optional[Dict[str, str]] = None - self._snapshots: ArrayMap[str, Snapshot] = ArrayMap.empty() + self.metadata: Optional[Tuple[str, str]] = None + self.snapshots: ArrayMap[str, Snapshot] = ArrayMap.empty() self._lock: Lock = Lock() self.was_set_at_test_time: bool = False - def serialize(self, value_writer: List[str]) -> None: - if self.metadata: - for key, value in self.metadata.items(): - value_writer.append(f"╔═ 📷 {key} ═╗\n{value}\n") + def serialize(self, valueWriter: List[str]): + if self.metadata is not None: + self.writeEntry( + valueWriter, + f"📷 {self.metadata[0]}", + None, + SnapshotValue.of(self.metadata[1]), + ) + + for entry_key, entry_value in self.snapshots.items(): + self.writeEntry(valueWriter, entry_key, None, entry_value._subject) + for facet_key, facet_value in entry_value.facets.items(): + self.writeEntry(valueWriter, entry_key, facet_key, facet_value) + + self.writeEntry(valueWriter, "", "end of file", SnapshotValue.of("")) + + @staticmethod + def writeEntry( + valueWriter: List[str], key: str, facet: Optional[str], value: SnapshotValue + ): + valueWriter.append("╔═ ") + valueWriter.append(SnapshotValueReader.name_esc.escape(key)) + if facet is not None: + valueWriter.append("[") + valueWriter.append(SnapshotValueReader.name_esc.escape(facet)) + valueWriter.append("]") + valueWriter.append(" ═╗") + if value.is_binary: + valueWriter.append(" base64 length ") + valueWriter.append(str(len(value.value_binary()))) + valueWriter.append(" bytes") + valueWriter.append("\n") + + if not key and facet == "end of file": + return - for key, snapshot in self._snapshots.items(): - subject_str = ( - snapshot._subject.value_string() - if hasattr(snapshot._subject, "value_string") - else str(snapshot._subject) + if value.is_binary: + escaped = base64.b64encode(value.value_binary()).decode("utf-8") + valueWriter.append(escaped.replace("\r", "")) + else: + escaped = SnapshotValueReader.body_esc.escape(value.value_string()).replace( + "\n╔", "\n\ud801\udf41" ) - value_writer.append(f"╔═ {key} ═╗\n{subject_str}\n") - for facet_key, facet_value in snapshot.facets.items(): - facet_value_str = ( - facet_value.value_string() - if hasattr(facet_value, "value_string") - else str(facet_value) - ) - value_writer.append(f"╔═ {key}[{facet_key}] ═╗\n{facet_value_str}\n") - value_writer.append("╔═ [end of file] ═╗\n") + valueWriter.append(escaped) + valueWriter.append("\n") def set_at_test_time(self, key: str, snapshot: Snapshot) -> None: with self._lock: @@ -61,7 +87,7 @@ def parse(cls, value_reader: SnapshotValueReader) -> "SnapshotFile": if peek_key and peek_key.startswith(cls.HEADER_PREFIX): metadata_name = peek_key[len(cls.HEADER_PREFIX) :] metadata_value = value_reader.next_value().value_string() - result.metadata = {metadata_name: metadata_value} + result.metadata = (metadata_name, metadata_value) reader.next_snapshot() while True: @@ -71,7 +97,7 @@ def parse(cls, value_reader: SnapshotValueReader) -> "SnapshotFile": if peek_key.startswith(cls.HEADER_PREFIX): continue next_snapshot = reader.next_snapshot() - result._snapshots = result._snapshots.plus(peek_key, next_snapshot) + result.snapshots = result.snapshots.plus(peek_key, next_snapshot) return result diff --git a/python/selfie-lib/selfie_lib/SnapshotSystem.py b/python/selfie-lib/selfie_lib/SnapshotSystem.py index 081baeeb..48a0174b 100644 --- a/python/selfie-lib/selfie_lib/SnapshotSystem.py +++ b/python/selfie-lib/selfie_lib/SnapshotSystem.py @@ -1,43 +1,15 @@ from abc import ABC, abstractmethod -from typing import Callable, Sequence, Optional -from .TypedPath import TypedPath -from .Snapshot import Snapshot -from .SnapshotFile import SnapshotFile -from .Literals import LiteralValue -from .SnapshotReader import SnapshotReader -from .SnapshotValueReader import SnapshotValueReader - - -class Mode: - def __init__(self, can_write: bool): - self._can_write = can_write - - def can_write(self, write: bool, call): - return self._can_write - - -class FS(ABC): - @abstractmethod - def file_walk[T](self, typed_path, walk: Callable[[Sequence[TypedPath]], T]) -> T: - pass - - def file_read(self, typed_path) -> str: - return self.file_read_binary(typed_path).decode() - - def file_write(self, typed_path, content: str): - self.file_write_binary(typed_path, content.encode()) - - @abstractmethod - def file_read_binary(self, typed_path) -> bytes: - pass +from enum import Enum, auto +import glob +from typing import ByteString, Optional - @abstractmethod - def file_write_binary(self, typed_path, content: bytes): - pass +from .FS import FS - @abstractmethod - def assert_failed(self, message: str, expected=None, actual=None) -> Exception: - pass +from .Literals import LiteralValue +from .Snapshot import Snapshot +from .TypedPath import TypedPath +from .WriteTracker import CallStack, SnapshotFileLayout +from .CommentTracker import CommentTracker class DiskStorage(ABC): @@ -57,89 +29,116 @@ def keep(self, sub_or_keep_all: Optional[str]): class SnapshotSystem(ABC): - from .WriteTracker import CallStack, SnapshotFileLayout - - def __init__(self): - from .CommentTracker import CommentTracker - from .WriteTracker import InlineWriteTracker - - self._comment_tracker = CommentTracker() - self._inline_write_tracker = InlineWriteTracker() - @property @abstractmethod - def fs(self) -> FS: - pass + def fs(self) -> FS: ... @property @abstractmethod - def mode(self) -> "Mode": - pass + def mode(self) -> "Mode": ... @property @abstractmethod - def layout(self) -> SnapshotFileLayout: - pass + def layout(self) -> SnapshotFileLayout: ... + @abstractmethod def source_file_has_writable_comment(self, call: CallStack) -> bool: - return self._comment_tracker.hasWritableComment(call, self.layout) - - def write_inline(self, literal_value: LiteralValue, call: CallStack): - from .WriteTracker import CallLocation - - call_location = CallLocation(call.location.file_name, call.location.line) - self._inline_write_tracker.record( - call_location, literal_value, call, self.layout - ) + """ + Returns true if the sourcecode for the given call has a writable annotation. + """ + ... @abstractmethod - def diskThreadLocal(self) -> DiskStorage: - pass + def write_inline(self, literal_value: LiteralValue, call: CallStack) -> None: + """ + Indicates that the following value should be written into test sourcecode. + """ + ... - def read_snapshot_from_disk(self, file_path: TypedPath) -> Optional[Snapshot]: - try: - content = self.fs.file_read_binary(file_path) - value_reader = SnapshotValueReader.of_binary(content) - snapshot_reader = SnapshotReader(value_reader) - return snapshot_reader.next_snapshot() - except Exception as e: - raise self.fs.assert_failed( - f"Failed to read snapshot from {file_path.absolute_path}: {str(e)}" - ) + @abstractmethod + def write_to_be_file( + self, path: TypedPath, data: ByteString, call: CallStack + ) -> None: + """ + Writes the given bytes to the given file, checking for duplicate writes. + """ + ... - def write_snapshot_to_disk(self, snapshot: Snapshot, file_path: TypedPath): - try: - # Create a SnapshotFile object that will contain the snapshot - snapshot_file = SnapshotFile.create_empty_with_unix_newlines(True) - snapshot_file.set_at_test_time("default", snapshot) - - # Serialize the SnapshotFile object - serialized_data = [] - snapshot_file.serialize(serialized_data) - - # Write serialized data to disk as binary - serialized_str = "\n".join(serialized_data) - self.fs.file_write_binary(file_path, serialized_str.encode()) - except Exception as e: - raise self.fs.assert_failed( - f"Failed to write snapshot to {file_path.absolute_path}: {str(e)}" - ) + @abstractmethod + def disk_thread_local(self) -> DiskStorage: + """ + Returns the DiskStorage for the test associated with this thread, else error. + """ + ... selfieSystem = None -def _initSelfieSystem(system: SnapshotSystem): - global selfieSystem - # TODO: Figure out how to wipe this state in unit tests - # if selfieSystem is not None: - # raise Exception("Selfie system already initialized") - selfieSystem = system - - def _selfieSystem() -> "SnapshotSystem": + global selfieSystem if selfieSystem is None: raise Exception( "Selfie system not initialized, make sure that `pytest-selfie` is installed and that you are running tests with `pytest`." ) return selfieSystem + + +def _initSelfieSystem(system: SnapshotSystem): + global selfieSystem + if selfieSystem is not None: + raise Exception("Selfie system already initialized") + selfieSystem = system + + +def _clearSelfieSystem(system: SnapshotSystem): + global selfieSystem + if selfieSystem is not system: + raise Exception("This was not the running system!") + selfieSystem = None + + +class Mode(Enum): + interactive = auto() + readonly = auto() + overwrite = auto() + + def can_write(self, is_todo: bool, call: CallStack, system: SnapshotSystem) -> bool: + if self == Mode.interactive: + return is_todo or system.source_file_has_writable_comment(call) + elif self == Mode.readonly: + if system.source_file_has_writable_comment(call): + layout = system.layout + path = layout.sourcefile_for_call(call) + comment, line = CommentTracker.commentString(path) + raise system.fs.assert_failed( + f"Selfie is in readonly mode, so `{comment}` is illegal at {call.location.with_line(line).ide_link(layout)}" + ) + return False + elif self == Mode.overwrite: + return True + else: + raise ValueError(f"Unknown mode: {self}") + + def msg_snapshot_not_found(self) -> str: + return self.msg("Snapshot not found") + + def msg_snapshot_not_found_no_such_file(self, file) -> str: + return self.msg(f"Snapshot not found: no such file {file}") + + def msg_snapshot_mismatch(self) -> str: + return self.msg("Snapshot mismatch") + + def msg(self, headline: str) -> str: + if self == Mode.interactive: + return ( + f"{headline}\n" + "- update this snapshot by adding '_TODO' to the function name\n" + "- update all snapshots in this file by adding '//selfieonce' or '//SELFIEWRITE'" + ) + elif self == Mode.readonly: + return headline + elif self == Mode.overwrite: + return f"{headline}\n(didn't expect this to ever happen in overwrite mode)" + else: + raise ValueError(f"Unknown mode: {self}") diff --git a/python/selfie-lib/selfie_lib/SourceFile.py b/python/selfie-lib/selfie_lib/SourceFile.py index 6ab05fc9..f75be6e2 100644 --- a/python/selfie-lib/selfie_lib/SourceFile.py +++ b/python/selfie-lib/selfie_lib/SourceFile.py @@ -15,6 +15,9 @@ def __init__(self, filename: str, content: str) -> None: self.__content_slice.__str__() ) + def remove_selfie_once_comments(self): + raise NotImplementedError("remove_selfie_once_comments is not implemented") + @property def as_string(self) -> str: return ( diff --git a/python/selfie-lib/selfie_lib/TypedPath.py b/python/selfie-lib/selfie_lib/TypedPath.py index 7165db0f..74625a7c 100644 --- a/python/selfie-lib/selfie_lib/TypedPath.py +++ b/python/selfie-lib/selfie_lib/TypedPath.py @@ -9,6 +9,9 @@ def __init__(self, absolute_path: str): def __hash__(self): return hash(self.absolute_path) + def __str__(self) -> str: + return self.absolute_path + @property def name(self) -> str: if self.absolute_path.endswith("/"): diff --git a/python/selfie-lib/selfie_lib/WithinTestGC.py b/python/selfie-lib/selfie_lib/WithinTestGC.py new file mode 100644 index 00000000..6f84ea14 --- /dev/null +++ b/python/selfie-lib/selfie_lib/WithinTestGC.py @@ -0,0 +1,92 @@ +from re import S, T +from typing import Iterable, Tuple, List +from threading import Lock + +from .ArrayMap import ArrayMap, ArraySet +from .Snapshot import Snapshot + + +class WithinTestGC: + def __init__(self): + self.suffixes_to_keep = ArraySet.empty() + self.lock = Lock() + + def keep_suffix(self, suffix: str): + with self.lock: + if self.suffixes_to_keep: + self.suffixes_to_keep = self.suffixes_to_keep.plusOrThis(suffix) + + def keep_all(self) -> "WithinTestGC": + with self.lock: + self.suffixes_to_keep = None + return self + + def __str__(self) -> str: + with self.lock: + return ( + str(self.suffixes_to_keep) + if self.suffixes_to_keep is not None + else "(null)" + ) + + def succeeded_and_used_no_snapshots(self) -> bool: + with self.lock: + return self.suffixes_to_keep == ArraySet.empty() + + def keeps(self, s: str) -> bool: + with self.lock: + return True if self.suffixes_to_keep is None else s in self.suffixes_to_keep + + @staticmethod + def find_stale_snapshots_within( + snapshots: ArrayMap[str, Snapshot], + tests_that_ran: ArrayMap[str, "WithinTestGC"], + tests_that_didnt_run: Iterable[str], + ) -> List[int]: + stale_indices = [] + + # combine what we know about methods that did run with what we know about the tests that didn't + total_gc = tests_that_ran + for method in tests_that_didnt_run: + total_gc = total_gc.plus(method, WithinTestGC().keep_all()) + + gc_roots = total_gc.items() + keys = snapshots.keys() + # we'll start with the lowest gc, and the lowest key + gc_idx = 0 + key_idx = 0 + while key_idx < len(keys) and gc_idx < len(gc_roots): + key: str = keys[key_idx] # type: ignore + gc: Tuple[str, WithinTestGC] = gc_roots[gc_idx] # type: ignore + if key.startswith(gc[0]): + if len(key) == len(gc[0]): + # startWith + same length = exact match, no suffix + if not gc[1].keeps(""): + stale_indices.append(key_idx) + key_idx += 1 + continue + elif key[len(gc[0])] == "/": + # startWith + not same length = can safely query the `/` + suffix = key[len(gc[0]) :] + if not gc[1].keeps(suffix): + stale_indices.append(key_idx) + key_idx += 1 + continue + else: + # key is longer than gc.key, but doesn't start with gc.key, so we must increment gc + gc_idx += 1 + continue + else: + # we don't start with the key, so we must increment + if gc[0] < key: + gc_idx += 1 + else: + # we never found a gc that started with this key, so it's stale + stale_indices.append(key_idx) + key_idx += 1 + + while key_idx < len(keys): + stale_indices.append(key_idx) + key_idx += 1 + + return stale_indices diff --git a/python/selfie-lib/selfie_lib/WriteTracker.py b/python/selfie-lib/selfie_lib/WriteTracker.py index 6e2c8981..4d2c10f4 100644 --- a/python/selfie-lib/selfie_lib/WriteTracker.py +++ b/python/selfie-lib/selfie_lib/WriteTracker.py @@ -3,12 +3,13 @@ from abc import ABC, abstractmethod import inspect import threading +import os from functools import total_ordering from .SourceFile import SourceFile from .Literals import LiteralValue from .TypedPath import TypedPath -from .SnapshotSystem import FS +from .FS import FS T = TypeVar("T") @@ -55,6 +56,9 @@ def __eq__(self, other) -> bool: return NotImplemented return (self._file_name, self._line) == (other.file_name, other.line) + def __hash__(self): + return hash((self._file_name, self._line)) + class CallStack: def __init__(self, location: CallLocation, rest_of_stack: List[CallLocation]): @@ -83,15 +87,25 @@ class SnapshotFileLayout: def __init__(self, fs: FS): self.fs = fs - def sourcePathForCall(self, call: CallStack) -> TypedPath: + def sourcefile_for_call(self, call: CallStack) -> TypedPath: file_path = call.location.file_name if not file_path: raise ValueError("No file path available in CallLocation.") - return TypedPath(str(Path(file_path))) - - -def recordCall(callerFileOnly: bool = False) -> CallStack: - stack_frames = inspect.stack()[1:] + return TypedPath(os.path.abspath(Path(file_path))) + + +def recordCall(callerFileOnly: bool) -> CallStack: + stack_frames_raw = inspect.stack() + first_real_frame = next( + ( + i + for i, x in enumerate(stack_frames_raw) + if x.frame.f_globals.get("__package__") != __package__ + ), + None, + ) + # filter to only the stack after the selfie-lib package + stack_frames = stack_frames_raw[first_real_frame:] if callerFileOnly: caller_file = stack_frames[0].filename @@ -120,10 +134,6 @@ def __init__(self): self.lock = threading.Lock() self.writes: Dict[T, FirstWrite[U]] = {} - @abstractmethod - def record(self, key: T, snapshot: U, call: CallStack, layout: SnapshotFileLayout): - pass - def recordInternal( self, key: T, @@ -153,33 +163,38 @@ def record(self, key: T, snapshot: U, call: CallStack, layout: SnapshotFileLayou class InlineWriteTracker(WriteTracker[CallLocation, LiteralValue]): + def hasWrites(self) -> bool: + return len(self.writes) > 0 + def record( self, - key: CallLocation, snapshot: LiteralValue, call: CallStack, layout: SnapshotFileLayout, ): - super().recordInternal(key, snapshot, call, layout) + super().recordInternal(call.location, snapshot, call, layout) - call_stack_from_location = CallStack(key, []) - file = layout.sourcePathForCall(call_stack_from_location) + call_stack_from_location = CallStack(call.location, []) + file = layout.sourcefile_for_call(call_stack_from_location) if snapshot.expected is not None: content = SourceFile(file.name, layout.fs.file_read(file)) try: snapshot = cast(LiteralValue, snapshot) - parsed_value = content.parse_to_be_like(key.line).parse_literal( - snapshot.format - ) + parsed_value = content.parse_to_be_like( + call.location.line + ).parse_literal(snapshot.format) except Exception as e: raise AssertionError( - f"Error while parsing the literal at {key.ide_link(layout)}. Please report this error at https://github.com/diffplug/selfie", + f"Error while parsing the literal at {call.location.ide_link(layout)}. Please report this error at https://github.com/diffplug/selfie", e, ) if parsed_value != snapshot.expected: raise layout.fs.assert_failed( - f"Selfie cannot modify the literal at {key.ide_link(layout)} because Selfie has a parsing bug. Please report this error at https://github.com/diffplug/selfie", + f"Selfie cannot modify the literal at {call.location.ide_link(layout)} because Selfie has a parsing bug. Please report this error at https://github.com/diffplug/selfie", snapshot.expected, parsed_value, ) + + def persist_writes(self, layout: SnapshotFileLayout): + raise NotImplementedError("InlineWriteTracker does not support persist_writes") diff --git a/python/selfie-lib/selfie_lib/__init__.py b/python/selfie-lib/selfie_lib/__init__.py index 4909263f..a8013a5c 100644 --- a/python/selfie-lib/selfie_lib/__init__.py +++ b/python/selfie-lib/selfie_lib/__init__.py @@ -1,22 +1,31 @@ +# maintain alphabetical order +from .ArrayMap import ArrayMap as ArrayMap +from .ArrayMap import ArraySet as ArraySet +from .Atomic import AtomicReference as AtomicReference +from .CommentTracker import CommentTracker as CommentTracker +from .FS import FS as FS from .LineReader import LineReader as LineReader -from .Slice import Slice as Slice -from .SourceFile import SourceFile as SourceFile -from .PerCharacterEscaper import PerCharacterEscaper as PerCharacterEscaper -from .SnapshotValueReader import SnapshotValueReader as SnapshotValueReader +from .Literals import LiteralValue as LiteralValue from .ParseException import ParseException as ParseException -from .SnapshotReader import SnapshotReader as SnapshotReader +from .PerCharacterEscaper import PerCharacterEscaper as PerCharacterEscaper +from .Slice import Slice as Slice from .Snapshot import Snapshot as Snapshot -from .SnapshotValue import SnapshotValue as SnapshotValue -from .SnapshotSystem import SnapshotSystem as SnapshotSystem +from .SnapshotFile import SnapshotFile as SnapshotFile +from .SnapshotReader import SnapshotReader as SnapshotReader from .SnapshotSystem import _initSelfieSystem as _initSelfieSystem -from .SnapshotSystem import FS as FS -from .SnapshotSystem import DiskStorage as DiskStorage -from .SnapshotSystem import SnapshotFile as SnapshotFile +from .SnapshotSystem import _clearSelfieSystem as _clearSelfieSystem from .SnapshotSystem import _selfieSystem as _selfieSystem +from .SnapshotSystem import DiskStorage as DiskStorage from .SnapshotSystem import Mode as Mode +from .SnapshotSystem import SnapshotSystem as SnapshotSystem +from .SnapshotValue import SnapshotValue as SnapshotValue +from .SnapshotValueReader import SnapshotValueReader as SnapshotValueReader +from .SourceFile import SourceFile as SourceFile from .TypedPath import TypedPath as TypedPath -from .WriteTracker import CallStack as CallStack +from .WithinTestGC import WithinTestGC as WithinTestGC from .WriteTracker import CallLocation as CallLocation +from .WriteTracker import CallStack as CallStack +from .WriteTracker import DiskWriteTracker as DiskWriteTracker +from .WriteTracker import InlineWriteTracker as InlineWriteTracker from .WriteTracker import recordCall as recordCall from .WriteTracker import SnapshotFileLayout as SnapshotFileLayout -from .Literals import LiteralValue as LiteralValue diff --git a/python/selfie-lib/tests/ArrayMap_test.py b/python/selfie-lib/tests/ArrayMap_test.py index 0af17e5d..21b3100e 100644 --- a/python/selfie-lib/tests/ArrayMap_test.py +++ b/python/selfie-lib/tests/ArrayMap_test.py @@ -145,3 +145,17 @@ def test_map_length(): assert len(map) == 1, "Length should be 1 after removing another item" map = map.minus_sorted_indices([0]) assert len(map) == 0, "Length should be 0 after removing all items" + + +def test_keys(): + assert ArrayMap.empty().keys().__len__() == 0 + map = ArrayMap.empty().plus("a", "alpha").plus("b", "beta") + assert map.keys()[0] == "a" + assert map.keys()[1] == "b" + + +def test_items(): + assert ArrayMap.empty().items().__len__() == 0 + map = ArrayMap.empty().plus("a", "alpha").plus("b", "beta") + assert map.items()[0] == ("a", "alpha") + assert map.items()[1] == ("b", "beta") diff --git a/python/selfie-lib/tests/SnapshotFile_test.py b/python/selfie-lib/tests/SnapshotFile_test.py index a40c6a45..a2b14c1a 100644 --- a/python/selfie-lib/tests/SnapshotFile_test.py +++ b/python/selfie-lib/tests/SnapshotFile_test.py @@ -16,7 +16,7 @@ def test_read_with_metadata(): ╔═ [end of file] ═╗ """.strip() file = SnapshotFile.parse(SnapshotValueReader.of(file_content)) - assert file.metadata == {"com.acme.AcmeTest": """{"header":"data"}"""} + assert file.metadata == ("com.acme.AcmeTest", """{"header":"data"}""") def test_read_without_metadata(): @@ -33,19 +33,19 @@ def test_read_without_metadata(): """.strip() file = SnapshotFile.parse(SnapshotValueReader.of(file_content)) assert file.metadata is None - assert set(file._snapshots.keys()) == {"Apple", "Orange"} + assert set(file.snapshots.keys()) == {"Apple", "Orange"} def test_write(): underTest = SnapshotFile() - underTest.metadata = {"com.acme.AcmeTest": """{"header":"data"}"""} + underTest.metadata = ("com.acme.AcmeTest", """{"header":"data"}""") apple_snapshot = Snapshot.of("Granny Smith") apple_snapshot = apple_snapshot.plus_facet("color", "green") apple_snapshot = apple_snapshot.plus_facet("crisp", "yes") - underTest._snapshots = underTest._snapshots.plus("Apple", apple_snapshot) - underTest._snapshots = underTest._snapshots.plus("Orange", Snapshot.of("Orange")) + underTest.snapshots = underTest.snapshots.plus("Apple", apple_snapshot) + underTest.snapshots = underTest.snapshots.plus("Orange", Snapshot.of("Orange")) buffer = [] underTest.serialize(buffer)