Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use fw magic (2) #163

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions fact_extractor/helperFunctions/magic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""This is a wrapper around pymagic.
It aims to provide the same API, but with the ability to load multiple magic
files through the default API.
"""

from __future__ import annotations

import os
from os import PathLike

import magic as pymagic

from helperFunctions.file_system import get_src_dir

# Default libmagic database; the path can be overridden with the ``MAGIC`` environment variable.
# On Ubuntu the fallback path is provided by the libmagic-mgc package.
_default_magic = os.getenv('MAGIC', '/usr/lib/file/magic.mgc')
# Firmware-specific magic database, expected at ``<src dir>/bin/firmware``
_fw_magic = f'{get_src_dir()}/bin/firmware'
# Both database paths joined with ':' into one string; this is the default ``magic_file`` of the public functions
_magic_file = f'{_fw_magic}:{_default_magic}'

# Cache of already constructed ``pymagic.Magic`` objects, filled by ``_get_magic_instance``
_instances = {}


def _get_magic_instance(**kwargs):
    """Return a cached ``pymagic.Magic`` instance for the given keyword arguments.

    One instance is created per distinct set of keyword arguments and stored in
    the module-level ``_instances`` cache, so repeated calls with the same
    arguments reuse the same object.

    :param kwargs: Keyword arguments that are passed on to ``pymagic.Magic``.
        All values must be hashable.
    :return: The (possibly newly created) ``pymagic.Magic`` instance.
    """
    # Dicts are not hashable, but a sorted tuple of their items is. The tuple
    # itself (not its integer hash) is used as the key: two different argument
    # sets may share a hash value, and keying by the hash would then hand out
    # an instance that was configured with the wrong arguments.
    key = tuple(sorted(kwargs.items()))
    instance = _instances.get(key)
    if instance is None:
        instance = _instances[key] = pymagic.Magic(**kwargs)
    return instance


def from_file(filename: bytes | str | PathLike, magic_file: str | None = _magic_file, **kwargs) -> str:
    """Identify the file at ``filename``.

    Works like pymagic's ``magic.from_file``, except that every keyword
    argument understood by ``magic.Magic`` may be given. The matching
    ``Magic`` object is obtained from ``_get_magic_instance``.
    """
    return _get_magic_instance(magic_file=magic_file, **kwargs).from_file(filename)


def from_buffer(buf: bytes | str, magic_file: str | None = _magic_file, **kwargs) -> str:
    """Identify the content of ``buf``.

    Works like pymagic's ``magic.from_buffer``, except that every keyword
    argument understood by ``magic.Magic`` may be given. The matching
    ``Magic`` object is obtained from ``_get_magic_instance``.
    """
    return _get_magic_instance(magic_file=magic_file, **kwargs).from_buffer(buf)
36 changes: 23 additions & 13 deletions fact_extractor/helperFunctions/statistics.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
from configparser import ConfigParser
from __future__ import annotations

from contextlib import suppress
from pathlib import Path
from typing import Dict, List
from typing import TYPE_CHECKING

from common_helper_files import safe_rglob
from common_helper_unpacking_classifier import (
avg_entropy, get_binary_size_without_padding, is_compressed
)
from fact_helper_file import get_file_type_from_path
from common_helper_unpacking_classifier import avg_entropy, get_binary_size_without_padding, is_compressed

from helperFunctions import magic
from helperFunctions.config import read_list_from_config

if TYPE_CHECKING:
from configparser import ConfigParser
from pathlib import Path


def add_unpack_statistics(extraction_dir: Path, meta_data: Dict):
def add_unpack_statistics(extraction_dir: Path, meta_data: dict):
unpacked_files, unpacked_directories = 0, 0
for extracted_item in safe_rglob(extraction_dir):
if extracted_item.is_file():
Expand All @@ -23,21 +26,28 @@ def add_unpack_statistics(extraction_dir: Path, meta_data: Dict):
meta_data['number_of_unpacked_directories'] = unpacked_directories


def get_unpack_status(file_path: str, binary: bytes, extracted_files: List[Path], meta_data: Dict, config: ConfigParser):
def get_unpack_status(
file_path: str, binary: bytes, extracted_files: list[Path], meta_data: dict, config: ConfigParser
):
meta_data['summary'] = []
meta_data['entropy'] = avg_entropy(binary)

if not extracted_files and meta_data.get('number_of_excluded_files', 0) == 0:
if get_file_type_from_path(file_path)['mime'] in read_list_from_config(config, 'ExpertSettings', 'compressed_file_types')\
or not is_compressed(binary, compress_entropy_threshold=config.getfloat('ExpertSettings', 'unpack_threshold'), classifier=avg_entropy):
if magic.from_file(file_path, mime=True) in read_list_from_config(
config, 'ExpertSettings', 'compressed_file_types'
) or not is_compressed(
binary,
compress_entropy_threshold=config.getfloat('ExpertSettings', 'unpack_threshold'),
classifier=avg_entropy,
):
meta_data['summary'] = ['unpacked']
else:
meta_data['summary'] = ['packed']
else:
_detect_unpack_loss(binary, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead'))


def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: Dict, header_overhead: int):
def _detect_unpack_loss(binary: bytes, extracted_files: list[Path], meta_data: dict, header_overhead: int):
decoding_overhead = 1 - meta_data.get('encoding_overhead', 0)
cleaned_size = get_binary_size_without_padding(binary) * decoding_overhead - header_overhead
size_of_extracted_files = _total_size_of_extracted_files(extracted_files)
Expand All @@ -46,7 +56,7 @@ def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: D
meta_data['summary'] = ['data lost'] if cleaned_size > size_of_extracted_files else ['no data lost']


def _total_size_of_extracted_files(extracted_files: List[Path]) -> int:
def _total_size_of_extracted_files(extracted_files: list[Path]) -> int:
total_size = 0
for item in extracted_files:
with suppress(OSError):
Expand Down
41 changes: 33 additions & 8 deletions fact_extractor/install/common.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import logging
import os
from contextlib import suppress
import subprocess as sp
from pathlib import Path

from helperFunctions.config import load_config
from helperFunctions.install import (
apt_install_packages, apt_update_sources, pip_install_packages, load_requirements_file
OperateInDirectory,
apt_install_packages,
apt_update_sources,
load_requirements_file,
pip_install_packages,
)

APT_DEPENDENCIES = {
Expand All @@ -30,13 +33,35 @@
],
}
PIP_DEPENDENCY_FILE = Path(__file__).parent.parent.parent / 'requirements-common.txt'
BIN_DIR = Path(__file__).parent.parent / 'bin'


def install_apt_dependencies(distribution: str):
    """Install the apt packages listed under 'common', then those listed under ``distribution``."""
    for package_group in ('common', distribution):
        apt_install_packages(*APT_DEPENDENCIES[package_group])


def _install_magic():
    """Download the firmware magic database and decompress it.

    Runs inside ``OperateInDirectory(BIN_DIR)``; the file names below are
    relative, so this presumably places the result in ``BIN_DIR`` (confirm
    against the helper). The archive is fetched with ``wget`` as
    ``firmware.xz`` and then decompressed with ``unxz``.

    :raises subprocess.CalledProcessError: If the download fails.
    """
    with OperateInDirectory(BIN_DIR):
        sp.run(
            [
                'wget',
                '--output-document',
                'firmware.xz',
                'https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.2/firmware.xz',
            ],
            check=True,
        )
        # Decompression stays non-fatal (check=False) as before, but a failure
        # is no longer swallowed silently: it is reported in the log.
        unxz_result = sp.run(
            [
                'unxz',
                '--force',
                'firmware.xz',
            ],
            check=False,
        )
        if unxz_result.returncode != 0:
            logging.error('Decompressing firmware.xz failed (unxz exit code %d)', unxz_result.returncode)


def main(distribution):
logging.info('Updating package lists')
apt_update_sources()
Expand All @@ -45,13 +70,13 @@ def main(distribution):
install_apt_dependencies(distribution)
pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE))

# make bin dir
with suppress(FileExistsError):
os.mkdir('../bin')
BIN_DIR.mkdir(exist_ok=True)

_install_magic()

config = load_config('main.cfg')
data_folder = config.get('unpack', 'data_folder')
os.makedirs(str(Path(data_folder, 'files')), exist_ok=True)
os.makedirs(str(Path(data_folder, 'reports')), exist_ok=True)
Path(data_folder, 'files').mkdir(parents=True, exist_ok=True)
Path(data_folder, 'reports').mkdir(exist_ok=True)

return 0
2 changes: 1 addition & 1 deletion fact_extractor/install/pre_install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ echo "Install Pre-Install Requirements"
(apt-get update && apt-get install sudo) || true

sudo apt-get update
sudo apt-get -y install git apt-transport-https ca-certificates curl software-properties-common wget libmagic-dev
sudo apt-get -y install git apt-transport-https ca-certificates curl software-properties-common wget libmagic-dev xz-utils

IS_VENV=$(python3 -c 'import sys; print(sys.exec_prefix!=sys.base_prefix)')
if [[ $IS_VENV == "False" ]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'''
"""
This plugin unpacks all files via carving
'''
"""

from __future__ import annotations

import logging
Expand All @@ -9,7 +10,8 @@
from pathlib import Path

from common_helper_process import execute_shell_command
from fact_helper_file import get_file_type_from_path

from helperFunctions import magic

NAME = 'generic_carver'
MIME_PATTERNS = ['generic/carver']
Expand All @@ -24,10 +26,10 @@


def unpack_function(file_path, tmp_dir):
'''
"""
file_path specifies the input file.
tmp_dir should be used to store the extracted files.
'''
"""

logging.debug(f'File Type unknown: execute binwalk on {file_path}')
output = execute_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}')
Expand All @@ -45,7 +47,7 @@ def remove_false_positive_archives(self) -> str:
for file_path in self.unpack_directory.glob('**/*'):
if not file_path.is_file():
continue
file_type = get_file_type_from_path(file_path)['mime']
file_type = magic.from_file(file_path, mime=True)

if file_type == 'application/x-tar' or self._is_possible_tar(file_type, file_path):
self._remove_invalid_archives(file_path, 'tar -tvf {}', 'does not look like a tar archive')
Expand Down Expand Up @@ -81,10 +83,7 @@ def _is_possible_tar(file_type: str, file_path: Path) -> bool:
def _remove_invalid_archives(self, file_path: Path, command, search_key=None):
output = execute_shell_command(command.format(file_path))

if search_key and search_key in output.replace('\n ', ''):
self._remove_file(file_path)

elif not search_key and _output_is_empty(output):
if search_key and search_key in output.replace('\n ', '') or not search_key and _output_is_empty(output):
self._remove_file(file_path)

def _remove_file(self, file_path):
Expand Down Expand Up @@ -115,7 +114,7 @@ def _output_is_empty(output):


def _find_trailing_data_index_zip(file_path: Path) -> int | None:
'''Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size.'''
"""Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size."""
output = execute_shell_command(f'7z l {file_path}')
if 'There are data after the end of archive' in output:
match = REAL_SIZE_REGEX.search(output)
Expand All @@ -140,7 +139,7 @@ def drop_underscore_directory(tmp_dir):
extracted_contents = list(Path(tmp_dir).iterdir())
if not extracted_contents:
return
if not len(extracted_contents) == 1 or not extracted_contents[0].name.endswith('.extracted'):
if len(extracted_contents) != 1 or not extracted_contents[0].name.endswith('.extracted'):
return
for result in extracted_contents[0].iterdir():
shutil.move(str(result), str(result.parent.parent))
Expand Down
23 changes: 16 additions & 7 deletions fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
'''
"""
This plugin mounts filesystem images and extracts their content
'''
"""

import re
from shlex import split
from subprocess import run, PIPE, STDOUT
from subprocess import PIPE, STDOUT, run
from tempfile import TemporaryDirectory
from time import sleep

from fact_helper_file import get_file_type_from_path
from helperFunctions import magic

NAME = 'genericFS'
MIME_PATTERNS = [
'filesystem/btrfs', 'filesystem/dosmbr', 'filesystem/f2fs', 'filesystem/jfs', 'filesystem/minix',
'filesystem/reiserfs', 'filesystem/romfs', 'filesystem/udf', 'filesystem/xfs', 'generic/fs',
'filesystem/btrfs',
'filesystem/dosmbr',
'filesystem/f2fs',
'filesystem/jfs',
'filesystem/minix',
'filesystem/reiserfs',
'filesystem/romfs',
'filesystem/udf',
'filesystem/xfs',
'generic/fs',
]
VERSION = '0.6.1'
TYPES = {
Expand All @@ -28,7 +37,7 @@


def unpack_function(file_path, tmp_dir):
mime_type = get_file_type_from_path(file_path)['mime']
mime_type = magic.from_file(file_path, mime=True)
if mime_type == 'filesystem/dosmbr':
output = _mount_from_boot_record(file_path, tmp_dir)
else:
Expand Down
Binary file added fact_extractor/test/data/ros_header
Binary file not shown.
14 changes: 14 additions & 0 deletions fact_extractor/test/unit/test_mime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pathlib import Path

from helperFunctions import magic
from helperFunctions.file_system import get_fact_bin_dir, get_test_data_dir


def test_magic():
    """Check that the firmware magic database is installed and used next to the default one."""
    # The firmware database file has to exist in the bin directory
    assert (Path(get_fact_bin_dir()) / 'firmware').is_file()

    # This MIME type is expected to come from the firmware database
    ros_mime = magic.from_file(f'{get_test_data_dir()}/ros_header', mime=True)
    assert ros_mime == 'firmware/ros', 'firmware-magic-database is not loaded'

    # A regular MIME type must still be detected as well
    zip_mime = magic.from_file(f'{get_test_data_dir()}/container/test.zip', mime=True)
    assert zip_mime == 'application/zip'
Loading
Loading