From e0a42230cabde619307665255f68e95546b1824c Mon Sep 17 00:00:00 2001 From: jctanner Date: Tue, 5 Dec 2023 13:35:43 -0500 Subject: [PATCH] Raise exceptions on invalid tarball paths. (#250) Issue: AAH-2992 Signed-off-by: James Tanner --- CHANGES/2992.bugfix | 1 + galaxy_importer/collection.py | 10 +- tests/unit/test_collection_archive_extract.py | 121 ++++++++++++++++++ 3 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 CHANGES/2992.bugfix create mode 100644 tests/unit/test_collection_archive_extract.py diff --git a/CHANGES/2992.bugfix b/CHANGES/2992.bugfix new file mode 100644 index 00000000..ab6c7432 --- /dev/null +++ b/CHANGES/2992.bugfix @@ -0,0 +1 @@ +Raise exceptions with invalid paths in collection tarballs. diff --git a/galaxy_importer/collection.py b/galaxy_importer/collection.py index e1772b6a..4605a2a0 100644 --- a/galaxy_importer/collection.py +++ b/galaxy_importer/collection.py @@ -160,6 +160,12 @@ def _import_collection(file, filename, file_url, logger, cfg): def _extract_archive(fileobj, extract_dir): fileobj.seek(0) with tarfile.open(fileobj=fileobj, mode="r") as tf: - if any((item.startswith("/") or item.startswith("../")) for item in tf.getnames()): - raise exc.ImporterError("Invalid file paths detected.") + for item in tf.getmembers(): + if item.name.startswith("/") or "../" in item.name: + raise exc.ImporterError("Invalid file paths detected.") + if item.linkname: + # Ensure the link target is within the extraction root + link_target = os.path.normpath(os.path.join(extract_dir, item.linkname)) + if not link_target.startswith(os.path.abspath(extract_dir)): + raise exc.ImporterError("Invalid link target detected.") tf.extractall(extract_dir) diff --git a/tests/unit/test_collection_archive_extract.py b/tests/unit/test_collection_archive_extract.py new file mode 100644 index 00000000..47e997d3 --- /dev/null +++ b/tests/unit/test_collection_archive_extract.py @@ -0,0 +1,121 @@ +# (c) 2012-2023, Ansible by Red Hat +# +# This file is part of Ansible Galaxy +# +# Ansible Galaxy is free software: you can redistribute it and/or modify +# it under the terms of the Apache License as published by +# the Apache Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# Ansible Galaxy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Apache License for more details. +# +# You should have received a copy of the Apache License +# along with Galaxy. If not, see . + +import os +import shutil +import tarfile +import tempfile +import unittest +from io import BytesIO + +from galaxy_importer.collection import _extract_archive +from galaxy_importer.exceptions import ImporterError + + +class TestCollectionExtractArchive(unittest.TestCase): + def test_valid_archive(self): + # Create a valid tar archive with no invalid paths + archive_data = b"testfile content" + archive_file = BytesIO() + with tarfile.open(fileobj=archive_file, mode="w") as tf: + tarinfo = tarfile.TarInfo("testfile") + tarinfo.size = len(archive_data) + tf.addfile(tarinfo, BytesIO(archive_data)) + archive_file.seek(0) + + # Create a temporary extraction directory + extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-") + os.makedirs(extract_dir, exist_ok=True) + + try: + # Test extracting the archive + _extract_archive(archive_file, extract_dir) + finally: + pass + + # Assert that the extracted file exists + extracted_file_path = os.path.join(extract_dir, "testfile") + self.assertTrue(os.path.isfile(extracted_file_path)) + + # Clean up the temporary extraction directory + shutil.rmtree(extract_dir) + + def test_invalid_path_with_absolute_path(self): + # Create an archive with invalid paths + archive_data = b"testfile content" + archive_file = BytesIO() + with tarfile.open(fileobj=archive_file, mode="w") as tf: + tarinfo = tarfile.TarInfo("/invalid_path") + tarinfo.size = len(archive_data) + tf.addfile(tarinfo, BytesIO(archive_data)) + archive_file.seek(0) + + # Create a temporary extraction directory + extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-") + os.makedirs(extract_dir, exist_ok=True) + + # Test extracting the archive with invalid paths + with self.assertRaises(ImporterError): + _extract_archive(archive_file, extract_dir) + + # Clean up the temporary extraction directory + shutil.rmtree(extract_dir) + + def test_invalid_parent_reference(self): + # Create an archive with invalid paths + archive_data = b"testfile content" + archive_file = BytesIO() + with tarfile.open(fileobj=archive_file, mode="w") as tf: + tarinfo = tarfile.TarInfo("../invalid_path") + tarinfo.size = len(archive_data) + tf.addfile(tarinfo, BytesIO(archive_data)) + archive_file.seek(0) + + # Create a temporary extraction directory + extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-") + os.makedirs(extract_dir, exist_ok=True) + + # Test extracting the archive with invalid paths + with self.assertRaises(ImporterError): + _extract_archive(archive_file, extract_dir) + + # Clean up the temporary extraction directory + shutil.rmtree(extract_dir) + + def test_invalid_link_destination(self): + # Create an archive with a link whose destination is "/" + archive_data = b"testfile content" + archive_file = BytesIO() + with tarfile.open(fileobj=archive_file, mode="w") as tf: + tarinfo = tarfile.TarInfo("testfile") + tf.addfile(tarinfo, BytesIO(archive_data)) + tarinfo = tarfile.TarInfo("invalid_link") + tarinfo.type = tarfile.SYMTYPE + tarinfo.linkname = "/" + tf.addfile(tarinfo) + archive_file.seek(0) + + # Create a temporary extraction directory + extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-") + os.makedirs(extract_dir, exist_ok=True) + + # Test extracting the archive with an invalid link destination + with self.assertRaises(ImporterError): + _extract_archive(archive_file, extract_dir) + + # Clean up the temporary extraction directory + os.rmdir(extract_dir)