Skip to content

Commit c95774c

Browse files
committed
added unpacking plugin for tenvis pk2 container
1 parent e1cd266 commit c95774c

File tree

7 files changed

+210
-0
lines changed

7 files changed

+210
-0
lines changed

fact_extractor/plugins/unpacking/tenvis_pk2/__init__.py

Whitespace-only changes.

fact_extractor/plugins/unpacking/tenvis_pk2/code/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import struct
2+
from datetime import datetime
3+
from pathlib import Path
4+
from typing import BinaryIO
5+
6+
NAME = 'tenvis_pk2'
7+
MIME_PATTERNS = ['firmware/pk2']
8+
VERSION = '0.1.0'
9+
10+
PK2_MAGIC = b'PK2\x00'
11+
XOR_KEY = [0xA1, 0x83, 0x24, 0x78, 0xB3, 0x41, 0x43, 0x56]
12+
KEY_LEN = len(XOR_KEY)
13+
14+
15+
class Pk2FileHeader:
16+
"""
17+
Header struct:
18+
0 | 4 | uint32 magic
19+
4 | 4 | uint32 camera type
20+
8 | 4 | uint32 creation time
21+
12 | 4 | char[4] version
22+
16 | 8 | char[8] reserved
23+
24 | 4 | uint32 section count
24+
total size: 28 bytes
25+
"""
26+
27+
size = 28
28+
29+
def __init__(self, fp: BinaryIO):
30+
file_hdr_data = fp.read(self.size)
31+
self.magic = file_hdr_data[:4] # we parse the magic as bytes
32+
self.camera_type, self.creation_time = struct.unpack('<II', file_hdr_data[4:12])
33+
self.creation_time_readable = datetime.fromtimestamp(self.creation_time).isoformat()
34+
self.version = file_hdr_data[12:16]
35+
self.reserved = file_hdr_data[16:24]
36+
self.section_count = struct.unpack('<I', file_hdr_data[24:28])[0]
37+
38+
def to_dict(self):
39+
return {
40+
'magic': self.magic.decode(errors='replace'),
41+
'camera_type': self.camera_type,
42+
'creation_time': self.creation_time_readable,
43+
'version': self.version.rstrip(b'\x00').decode('ascii', errors='replace'),
44+
'reserved': self.reserved.rstrip(b'\x00').decode('ascii', errors='replace'),
45+
'section_count': self.section_count,
46+
}
47+
48+
49+
class Pk2SectionHeader:
50+
"""
51+
0 | 4 | uint32 section type
52+
4 | 16 | char[16] hash
53+
20 | 4 | uint32 payload size
54+
total size: 24 bytes
55+
"""
56+
57+
size = 24
58+
59+
def __init__(self, fp: BinaryIO, offset: int):
60+
fp.seek(offset)
61+
section_header_data = fp.read(self.size)
62+
self.offset = offset
63+
self.type = section_header_data[:4].rstrip(b'\x00').decode('ascii', errors='replace')
64+
self.hash = section_header_data[4:20].hex()
65+
self.payload_size = struct.unpack('<I', section_header_data[20:24])[0]
66+
67+
def to_dict(self):
68+
return self.__dict__
69+
70+
71+
class Pk2File:
72+
"""
73+
0 | 4 | uint32 filename size
74+
4 | x | char[x] filename
75+
4+x | 4 | uint32 data size
76+
x+8 | y | ?? data (XOR encrypted)
77+
total size: 4 + x + 4 + y bytes (== section payload size)
78+
"""
79+
80+
_BLOCK_SIZE = 1024 # we read the file block-wise to save memory
81+
82+
def __init__(self, fp: BinaryIO, offset: int, size: int):
83+
self._fp = fp
84+
self.size = size
85+
self._file_offset = offset
86+
self._fp.seek(self._file_offset)
87+
filename_size = struct.unpack('<I', self._fp.read(4)[:4])[0]
88+
self.filename = self._fp.read(filename_size).rstrip(b'\x00').decode('ascii', errors='replace')
89+
self.data_offset = self._file_offset + 4 + filename_size + 4
90+
self.data_size = struct.unpack('<I', self._fp.read(4))[0]
91+
92+
def save(self, save_dir: Path):
93+
output_path = save_dir / self.filename.lstrip('/')
94+
output_path.parent.mkdir(exist_ok=True, parents=True)
95+
self._fp.seek(self.data_offset)
96+
remaining = self.data_size
97+
with output_path.open('wb') as out_fp:
98+
while remaining > 0:
99+
chunk_size = min(self._BLOCK_SIZE, remaining)
100+
data = self._fp.read(chunk_size)
101+
if not data:
102+
break
103+
output = _decrypt(data)
104+
out_fp.write(output)
105+
remaining -= chunk_size
106+
107+
def to_dict(self):
108+
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
109+
110+
111+
class Pk2Cmd:
112+
"""
113+
0 | x | char[x] command (XOR encrypted)
114+
total size: x bytes (== section payload size)
115+
"""
116+
117+
def __init__(self, fp: BinaryIO, offset: int, size: int):
118+
fp.seek(offset)
119+
self.command = _decrypt(fp.read(size)).rstrip(b'\x00').decode('ascii', errors='replace')
120+
self.size = size
121+
122+
def __str__(self):
123+
cmd = repr(self.command)
124+
return f'CMD: {cmd}'
125+
126+
127+
def _decrypt(data: bytes) -> bytearray:
128+
output = bytearray()
129+
for index, char in enumerate(data):
130+
output.append(char ^ XOR_KEY[index % KEY_LEN])
131+
return output
132+
133+
134+
def unpack_function(file_path: str, tmp_dir: str):
135+
input_path, output_dir = Path(file_path), Path(tmp_dir)
136+
meta = {'sections': []}
137+
138+
if input_path.stat().st_size < Pk2FileHeader.size + Pk2SectionHeader.size:
139+
meta['errors'] = ['file too small']
140+
return meta
141+
142+
with input_path.open('rb') as fp:
143+
file_header = Pk2FileHeader(fp)
144+
offset = file_header.size
145+
meta['header'] = file_header.to_dict()
146+
for _ in range(file_header.section_count):
147+
try:
148+
section_header = Pk2SectionHeader(fp, offset)
149+
section_meta = section_header.to_dict()
150+
offset += section_header.size
151+
if section_header.type == 'FILE':
152+
pk2_file = Pk2File(fp, offset, section_header.payload_size)
153+
pk2_file.save(output_dir)
154+
offset += pk2_file.size
155+
section_meta['file'] = pk2_file.to_dict()
156+
elif section_header.type == 'CMD':
157+
pk2_command = Pk2Cmd(fp, offset, section_header.payload_size)
158+
offset += pk2_command.size
159+
section_meta['command'] = pk2_command.command
160+
else:
161+
meta.setdefault('errors', []).append(f'unknown section type: {section_header.type}')
162+
break
163+
meta['sections'].append(section_meta)
164+
except struct.error:
165+
meta.setdefault('errors', []).append(f'error while parsing section at offset {offset}')
166+
break
167+
168+
return {'output': meta}
169+
170+
171+
# ----> Do not edit below this line <----
172+
def setup(unpack_tool):
173+
for item in MIME_PATTERNS:
174+
unpack_tool.register_plugin(item, (unpack_function, NAME, VERSION))

fact_extractor/plugins/unpacking/tenvis_pk2/test/__init__.py

Whitespace-only changes.
Binary file not shown.
Binary file not shown.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from pathlib import Path
2+
3+
from test.unit.unpacker.test_unpacker import TestUnpackerBase
4+
5+
TEST_DATA_DIR = Path(__file__).parent / 'data'
6+
7+
8+
class TestTenvisPk2Unpacker(TestUnpackerBase):
9+
def test_unpacker_selection_generic(self):
10+
self.check_unpacker_selection('firmware/pk2', 'tenvis_pk2')
11+
12+
def test_extraction_pk2(self):
13+
in_file = TEST_DATA_DIR / 'test.pk2'
14+
assert in_file.is_file()
15+
files, meta = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name)
16+
assert meta['plugin_used'] == 'tenvis_pk2'
17+
assert len(files) == 2
18+
assert {Path(f).name for f in files} == {'bar', 'foo'}, 'not all files unpacked'
19+
output_file = Path(sorted(files)[1])
20+
assert output_file.read_bytes() == b'foobar\n', 'files not decrypted correctly'
21+
22+
assert 'sections' in meta['output']
23+
assert len(meta['output']['sections']) == 3
24+
sections_by_offset = {s['offset']: s for s in meta['output']['sections']}
25+
assert set(sections_by_offset) == {28, 67, 111}
26+
assert sections_by_offset[28]['type'] == 'CMD'
27+
assert sections_by_offset[28]['command'] == 'echo "foobar"\n'
28+
29+
def test_extraction_pk2_error(self):
30+
in_file = TEST_DATA_DIR / 'broken.pk2'
31+
assert in_file.is_file()
32+
files, meta = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name)
33+
assert meta['plugin_used'] == 'tenvis_pk2'
34+
assert len(files) == 0
35+
assert 'errors' in meta['output']
36+
assert meta['output']['errors'] == ['error while parsing section at offset 91']

0 commit comments

Comments
 (0)