Skip to content

Commit

Permalink
Raise exceptions on invalid tarball paths. (#250)
Browse files Browse the repository at this point in the history
Issue: AAH-2992

Signed-off-by: James Tanner <[email protected]>
  • Loading branch information
jctanner authored Dec 5, 2023
1 parent 2225280 commit e0a4223
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES/2992.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Raise exceptions with invalid paths in collection tarballs.
10 changes: 8 additions & 2 deletions galaxy_importer/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ def _import_collection(file, filename, file_url, logger, cfg):
def _extract_archive(fileobj, extract_dir):
fileobj.seek(0)
with tarfile.open(fileobj=fileobj, mode="r") as tf:
if any((item.startswith("/") or item.startswith("../")) for item in tf.getnames()):
raise exc.ImporterError("Invalid file paths detected.")
for item in tf.getmembers():
if item.name.startswith("/") or "../" in item.name:
raise exc.ImporterError("Invalid file paths detected.")
if item.linkname:
# Ensure the link target is within the extraction root
link_target = os.path.normpath(os.path.join(extract_dir, item.linkname))
if not link_target.startswith(os.path.abspath(extract_dir)):
raise exc.ImporterError("Invalid link target detected.")
tf.extractall(extract_dir)
121 changes: 121 additions & 0 deletions tests/unit/test_collection_archive_extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# (c) 2012-2023, Ansible by Red Hat
#
# This file is part of Ansible Galaxy
#
# Ansible Galaxy is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by
# the Apache Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Ansible Galaxy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License
# along with Galaxy. If not, see <http://www.apache.org/licenses/>.

import os
import shutil
import tarfile
import tempfile
import unittest
from io import BytesIO

from galaxy_importer.collection import _extract_archive
from galaxy_importer.exceptions import ImporterError


class TestCollectionExtractArchive(unittest.TestCase):
def test_valid_archive(self):
# Create a valid tar archive with no invalid paths
archive_data = b"testfile content"
archive_file = BytesIO()
with tarfile.open(fileobj=archive_file, mode="w") as tf:
tarinfo = tarfile.TarInfo("testfile")
tarinfo.size = len(archive_data)
tf.addfile(tarinfo, BytesIO(archive_data))
archive_file.seek(0)

# Create a temporary extraction directory
extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-")
os.makedirs(extract_dir, exist_ok=True)

try:
# Test extracting the archive
_extract_archive(archive_file, extract_dir)
finally:
pass

# Assert that the extracted file exists
extracted_file_path = os.path.join(extract_dir, "testfile")
self.assertTrue(os.path.isfile(extracted_file_path))

# Clean up the temporary extraction directory
shutil.rmtree(extract_dir)

def test_invalid_path_with_absolute_path(self):
# Create an archive with invalid paths
archive_data = b"testfile content"
archive_file = BytesIO()
with tarfile.open(fileobj=archive_file, mode="w") as tf:
tarinfo = tarfile.TarInfo("/invalid_path")
tarinfo.size = len(archive_data)
tf.addfile(tarinfo, BytesIO(archive_data))
archive_file.seek(0)

# Create a temporary extraction directory
extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-")
os.makedirs(extract_dir, exist_ok=True)

# Test extracting the archive with invalid paths
with self.assertRaises(ImporterError):
_extract_archive(archive_file, extract_dir)

# Clean up the temporary extraction directory
shutil.rmtree(extract_dir)

def test_invalid_parent_reference(self):
# Create an archive with invalid paths
archive_data = b"testfile content"
archive_file = BytesIO()
with tarfile.open(fileobj=archive_file, mode="w") as tf:
tarinfo = tarfile.TarInfo("../invalid_path")
tarinfo.size = len(archive_data)
tf.addfile(tarinfo, BytesIO(archive_data))
archive_file.seek(0)

# Create a temporary extraction directory
extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-")
os.makedirs(extract_dir, exist_ok=True)

# Test extracting the archive with invalid paths
with self.assertRaises(ImporterError):
_extract_archive(archive_file, extract_dir)

# Clean up the temporary extraction directory
shutil.rmtree(extract_dir)

def test_invalid_link_destination(self):
# Create an archive with a link whose destination is "/"
archive_data = b"testfile content"
archive_file = BytesIO()
with tarfile.open(fileobj=archive_file, mode="w") as tf:
tarinfo = tarfile.TarInfo("testfile")
tf.addfile(tarinfo, BytesIO(archive_data))
tarinfo = tarfile.TarInfo("invalid_link")
tarinfo.type = tarfile.SYMTYPE
tarinfo.linkname = "/"
tf.addfile(tarinfo)
archive_file.seek(0)

# Create a temporary extraction directory
extract_dir = tempfile.mkdtemp(prefix="collection-archive-extract-test-")
os.makedirs(extract_dir, exist_ok=True)

# Test extracting the archive with an invalid link destination
with self.assertRaises(ImporterError):
_extract_archive(archive_file, extract_dir)

# Clean up the temporary extraction directory
os.rmdir(extract_dir)

0 comments on commit e0a4223

Please sign in to comment.