Skip to content

Commit

Permalink
generic_carver: replaced external call to unblob with use of internal…
Browse files Browse the repository at this point in the history
… methods
  • Loading branch information
jstucke committed Jul 25, 2024
1 parent 53a507a commit 9ffb3e7
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 224 deletions.
216 changes: 68 additions & 148 deletions fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,159 +4,79 @@
from __future__ import annotations

import logging
import re
import shutil
import traceback
from itertools import chain
from pathlib import Path
from typing import Iterable

from common_helper_process import execute_shell_command
from fact_helper_file import get_file_type_from_path
import structlog
from common_helper_unpacking_classifier import avg_entropy
from unblob.extractor import carve_unknown_chunk, carve_valid_chunk
from unblob.file_utils import File
from unblob.finder import search_chunks
from unblob.handlers import BUILTIN_HANDLERS
from unblob.models import TaskResult, PaddingChunk, UnknownChunk, Chunk
from unblob.processing import Task, remove_inner_chunks, calculate_unknown_chunks

NAME = 'generic_carver'
MIME_PATTERNS = ['generic/carver']
VERSION = '0.9'

TAR_MAGIC = b'ustar'
BZ2_EOF_MAGIC = [ # the magic string is only aligned to half bytes -> two possible strings
b'\x17\x72\x45\x38\x50\x90',
b'\x77\x24\x53\x85\x09',
]
REAL_SIZE_REGEX = re.compile(r'Physical Size = (\d+)')


def unpack_function(file_path, tmp_dir):
    """
    Carve embedded files out of an unidentified binary by running the external
    ``unblob`` tool and post-processing its output directory.

    :param file_path: path of the input file to carve.
    :param tmp_dir: directory where the extracted files are stored.
    :return: dict with the unblob console output, unblob's JSON report and the
             log of the false-positive archive filter.
    """
    import tempfile

    logging.debug(f'File type unknown: Execute unblob on {file_path}')

    # Use a unique per-call report file instead of the fixed path
    # /tmp/unblob_report.json: a fixed path is a race condition when several
    # extractions run concurrently (they would clobber each other's report).
    with tempfile.NamedTemporaryFile(prefix='unblob_report_', suffix='.json', delete=False) as tmp:
        report_file = Path(tmp.name)
    # unblob creates the report itself, so remove the empty placeholder first
    report_file.unlink(missing_ok=True)
    try:
        output = execute_shell_command(
            f'unblob -sk --report {report_file.absolute()} --entropy-depth 0 --depth 1 --extract-dir {tmp_dir} {file_path}'
        )
        meta = report_file.read_text(encoding='utf-8')
    finally:
        # make sure the report file is cleaned up even if unblob fails
        report_file.unlink(missing_ok=True)

    drop_underscore_directory(tmp_dir)
    filter_log = ArchivesFilter(tmp_dir).remove_false_positive_archives()

    return {
        'output': output,
        'unblob_meta': meta,
        'filter_log': filter_log,
    }


class ArchivesFilter:
    """
    Screens a directory of carved files for false positives: invalid archives
    are deleted and trailing garbage after valid archives is cut off. All
    actions are collected in a textual screening log.
    """

    def __init__(self, unpack_directory):
        # directory containing the carved files that should be screened
        self.unpack_directory = Path(unpack_directory)
        # human-readable log entries of all removals / modifications
        self.screening_logs = []

    def remove_false_positive_archives(self) -> str:
        """Check every file in the unpack directory and return the screening log."""
        for file_path in self.unpack_directory.glob('**/*'):
            if not file_path.is_file():
                continue
            file_type = get_file_type_from_path(file_path)['mime']

            if file_type == 'application/x-tar' or self._is_possible_tar(file_type, file_path):
                self._remove_invalid_archives(file_path, 'tar -tvf {}', 'does not look like a tar archive')

            elif file_type == 'application/x-xz':
                self._remove_invalid_archives(file_path, 'xz -c -d {} | wc -c')

            elif file_type == 'application/gzip':
                self._remove_invalid_archives(file_path, 'gzip -c -d {} | wc -c')

            # FIX: Path.suffix includes the leading dot ('.7z'), so the former
            # comparison with '7z' could never be true
            elif file_path.suffix == '.7z' or file_type in [
                'application/x-7z-compressed',
                'application/x-lzma',
                'application/zip',
                'application/zlib',
            ]:
                self._remove_invalid_archives(file_path, '7z l {}', 'ERROR')

            if file_path.is_file():  # the file may have been removed above
                self._remove_trailing_data(file_type, file_path)

        return '\n'.join(self.screening_logs)

    @staticmethod
    def _is_possible_tar(file_type: str, file_path: Path) -> bool:
        """Check for the ustar magic bytes at offset 0x101 of the file."""
        # broken tar archives may be identified as octet-stream by newer versions of libmagic
        if file_type == 'application/octet-stream':
            with file_path.open(mode='rb') as fp:
                fp.seek(0x101)
                return fp.read(5) == TAR_MAGIC
        return False

    def _remove_invalid_archives(self, file_path: Path, command, search_key=None):
        """
        Run *command* (with the file path substituted) and delete the file if
        *search_key* occurs in the output, or — when no search key is given —
        if the command output ends in a 0 byte count.
        """
        output = execute_shell_command(command.format(file_path))

        if search_key and search_key in output.replace('\n ', ''):
            self._remove_file(file_path)

        elif not search_key and _output_is_empty(output):
            self._remove_file(file_path)

    def _remove_file(self, file_path):
        """Delete the file and record the removal in the screening log."""
        file_path.unlink()
        self.screening_logs.append(f'{file_path.name} was removed (invalid archive)')

    def _remove_trailing_data(self, file_type: str, file_path: Path):
        """Truncate trailing garbage after the actual archive data (zip/zlib/bz2)."""
        trailing_data_index = None

        if file_type in ['application/zip', 'application/zlib']:
            trailing_data_index = _find_trailing_data_index_zip(file_path)

        elif file_type == 'application/x-bzip2':
            trailing_data_index = _find_trailing_data_index_bz2(file_path)

        # a falsy (None or 0) index means "nothing to cut off"
        if trailing_data_index:
            self._resize_file(trailing_data_index, file_path)

    def _resize_file(self, actual_size: int, file_path: Path):
        """Rewrite the file with only its first *actual_size* bytes."""
        with file_path.open('rb') as fp:
            actual_content = fp.read(actual_size)
        file_path.write_bytes(actual_content)
        self.screening_logs.append(f'Removed trailing data at the end of {file_path.name}')


def _output_is_empty(output):
return int((output.split())[-1]) == 0


def _find_trailing_data_index_zip(file_path: Path) -> int | None:
    '''Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size.'''
    listing = execute_shell_command(f'7z l {file_path}')
    if 'There are data after the end of archive' not in listing:
        return None
    size_match = REAL_SIZE_REGEX.search(listing)
    return int(size_match.group(1)) if size_match else None


def _find_trailing_data_index_bz2(file_path: Path) -> int | None:
    """Locate the true end of a bz2 stream if `bzip2 -t` reports trailing garbage."""
    test_output = execute_shell_command(f'bzip2 -t {file_path}')
    if 'trailing garbage' not in test_output:
        return None
    content = file_path.read_bytes()
    offsets = [content.find(magic) for magic in BZ2_EOF_MAGIC]
    hits = [offset for offset in offsets if offset != -1]
    if not hits:
        return None
    # there may be two matches, but we want the first one
    # 10 is magic string + CRC 32 checksum + padding (see https://en.wikipedia.org/wiki/Bzip2#File_format)
    return min(hits) + 10


def drop_underscore_directory(tmp_dir):
    """
    If *tmp_dir* contains exactly one entry whose name ends in '_extract'
    (the wrapper directory unblob creates), move its contents up one level
    and remove the now-empty wrapper.
    """
    entries = list(Path(tmp_dir).iterdir())
    if len(entries) != 1:
        return
    wrapper = entries[0]
    if not wrapper.name.endswith('_extract'):
        return
    for child in wrapper.iterdir():
        shutil.move(str(child), str(child.parent.parent))
    shutil.rmtree(str(wrapper))
VERSION = '1.0.0'

# deactivate internal logger of unblob because it can slow down searching chunks
structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(logging.CRITICAL))


def unpack_function(file_path: str, tmp_dir: str) -> dict:
    """
    Carve chunks out of an unidentified binary using the unblob API and write
    them as individual files into ``tmp_dir``.

    :param file_path: path of the input file to carve.
    :param tmp_dir: directory where the carved chunk files are stored.
    :return: dict with a single 'output' key containing a textual report
             (extracted chunks, filtered chunks, or the error that occurred).
    """
    extraction_dir = Path(tmp_dir)
    chunks: list[dict] = []
    filter_report = ''
    path = Path(file_path)

    try:
        with File.from_path(path) as file:
            for chunk in _find_chunks(path, file):
                if isinstance(chunk, PaddingChunk):
                    # padding carries no data worth extracting -> skip entirely
                    continue
                if isinstance(chunk, UnknownChunk):
                    # drop unknown chunks with very low entropy (e.g. constant
                    # byte runs) instead of carving them out as files
                    if _has_low_entropy(file, chunk):
                        filter_report += (
                            f'removed chunk {chunk.start_offset}-{chunk.end_offset} (reason: low entropy)\n'
                        )
                        continue
                    carve_unknown_chunk(extraction_dir, file, chunk)
                else:
                    carve_valid_chunk(extraction_dir, file, chunk)
                # collect metadata of every carved chunk for the final report
                chunks.append(chunk.as_report(None).asdict())

        report = _create_report(chunks) if chunks else 'No valid chunks found.'
        if filter_report:
            report += f'\nFiltered chunks:\n{filter_report}'
    except Exception as error:
        # deliberate catch-all: an unpacker plugin should report failures
        # in its metadata instead of raising into the extraction framework
        report = f"Error {error} during unblob extraction:\n{traceback.format_exc()}"
    return {'output': report}


def _find_chunks(file_path: Path, file: File) -> Iterable[Chunk]:
    """
    Yield all chunks unblob finds in *file*: first the recognized (outermost)
    chunks, then the unknown gaps between them.
    """
    task_result = TaskResult(Task(path=file_path, depth=0, blob_id=''))
    file_size = file.size()
    known_chunks = remove_inner_chunks(search_chunks(file, file_size, BUILTIN_HANDLERS, task_result))
    for chunk in known_chunks:
        yield chunk
    for chunk in calculate_unknown_chunks(known_chunks, file_size):
        yield chunk


def _create_report(chunk_list: list[dict]) -> str:
report = ['Extracted chunks:']
for chunk in sorted(chunk_list, key=lambda c: c['start_offset']):
chunk_type = chunk.get('handler_name', 'unknown')
report.append(
f'start: {chunk["start_offset"]}, end: {chunk["end_offset"]}, size: {chunk["size"]}, type: {chunk_type}'
)
return '\n'.join(report)


def _has_low_entropy(file: File, chunk: UnknownChunk) -> bool:
    """Return True if the average entropy of the chunk's bytes is below the threshold."""
    entropy_threshold = 0.01
    file.seek(chunk.start_offset)
    chunk_data = file.read(chunk.size)
    return avg_entropy(chunk_data) < entropy_threshold


# ----> Do not edit below this line <----
Expand Down
5 changes: 1 addition & 4 deletions fact_extractor/plugins/unpacking/generic_carver/install.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
#!/usr/bin/env bash

# This setup is largely ripped off from emba @ https://github.com/e-m-b-a/emba/blob/master/installer/IP61_unblob.sh
# Thanks to m-1-k-3 and the emba team!

cd "$( dirname "${BASH_SOURCE[0]}" )" || exit 1

echo "------------------------------------"
echo " install unblob via poetry "
echo " install unblob dependencies "
echo "------------------------------------"

sudo -EH apt-get install -y e2fsprogs img2simg lziprecover xz-utils libmagic1 libhyperscan5
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pathlib import Path

from helperFunctions.file_system import get_test_data_dir
from test.unit.unpacker.test_unpacker import TestUnpackerBase

# pylint: disable=protected-access

TEST_DATA_DIR = Path(__file__).parent / 'data'


class TestGenericCarver(TestUnpackerBase):
    """Unit tests for the generic_carver unpacking plugin."""

    def test_unpacker_selection_generic(self):
        self.check_unpacker_selection('generic/carver', 'generic_carver')

    def test_extraction(self):
        in_file = f'{get_test_data_dir()}/generic_carver_test'
        plugin = self.unpacker.unpacker_plugins['generic/carver']
        extracted, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
            in_file, self.tmp_dir.name, plugin
        )
        extracted = set(extracted)
        assert len(extracted) == 3, 'file number incorrect'
        assert f'{self.tmp_dir.name}/100-887.zip' in extracted, 'hidden zip not identified correctly'
        assert 'output' in meta_data

    def test_filter(self):
        in_file = TEST_DATA_DIR / 'carving_test_file'
        assert Path(in_file).is_file()
        plugin = self.unpacker.unpacker_plugins['generic/carver']
        extracted, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(
            str(in_file), self.tmp_dir.name, plugin
        )
        extracted = set(extracted)
        assert len(extracted) == 4, 'file number incorrect'
        assert 'removed chunk 300-428' in meta_data['output']
        expected_names = ('0-128.unknown', '128-300.zip', '428-562.sevenzip', '562-626.unknown')
        for name in expected_names:
            assert f'{self.tmp_dir.name}/{name}' in extracted

This file was deleted.

3 changes: 2 additions & 1 deletion requirements-unpackers.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ extract-dtb~=1.2.3
# uefi
uefi-firmware~=1.11
# unblob
unblob
# FixMe: pin to next stable version; the latest release is missing a bug fix related to zip64
git+https://github.com/onekey-sec/unblob.git@e0d9805

0 comments on commit 9ffb3e7

Please sign in to comment.