|
#
# GGUF file reading/modification support. For API usage information,
# please see the files in scripts/ for some fairly simple examples.
#
from __future__ import annotations

import logging
import os
from collections import OrderedDict
from collections.abc import Sequence
from typing import Any, Literal, NamedTuple, TypeVar, Union

import numpy as np
import numpy.typing as npt
| 14 | + |
def quant_shape_to_byte_shape(shape: Sequence[int], quant_type: GGMLQuantizationType) -> tuple[int, ...]:
    """Convert a quantized tensor's element shape to its on-disk byte shape.

    Only the last dimension changes: each block of ``block_size`` elements is
    stored as ``type_size`` bytes, so the row length becomes
    ``shape[-1] // block_size * type_size``.

    Raises:
        ValueError: If the row size is not a multiple of the quant block size.
    """
    block_size, type_size = GGML_QUANT_SIZES[quant_type]
    if shape[-1] % block_size != 0:
        raise ValueError(f"Quantized tensor row size ({shape[-1]}) is not a multiple of {quant_type.name} block size ({block_size})")
    return (*shape[:-1], shape[-1] // block_size * type_size)
| 20 | + |
if __name__ == "__main__":
    # Allow running this file inside the package directly as a script by
    # making the package's parent directory importable.
    import sys
    from pathlib import Path

    package_root = Path(__file__).parent.parent
    sys.path.insert(0, str(package_root))
| 27 | + |
| 28 | +from gguf.constants import ( |
| 29 | + GGML_QUANT_SIZES, |
| 30 | + GGUF_DEFAULT_ALIGNMENT, |
| 31 | + GGUF_MAGIC, |
| 32 | + GGUF_VERSION, |
| 33 | + GGMLQuantizationType, |
| 34 | + GGUFValueType, |
| 35 | +) |
| 36 | + |
# Module-level logger for this file.
logger = logging.getLogger(__name__)

# GGUF container versions this reader accepts: version 2 plus the current one.
READER_SUPPORTED_VERSIONS = [2, GGUF_VERSION]
| 40 | + |
| 41 | + |
class ReaderField(NamedTuple):
    """One parsed metadata (or tensor-info) field from a GGUF file.

    NOTE(review): the list defaults below are shared objects, as with any
    Python default argument; this is harmless as long as no caller mutates a
    default-constructed field in place — confirm before changing.
    """

    # Offset to start of this field.
    offset: int

    # Name of the field (not necessarily from file data).
    name: str

    # Data parts. Some types have multiple components, such as strings
    # that consist of a length followed by the string data.
    parts: list[npt.NDArray[Any]] = []

    # Indexes into parts that we can call the actual data. For example
    # an array of strings will be populated with indexes to the actual
    # string data.
    data: list[int] = [-1]

    # GGUF value type(s) of this field; arrays append their element type(s).
    types: list[GGUFValueType] = []
| 59 | + |
| 60 | + |
class ReaderTensor(NamedTuple):
    """One tensor from a GGUF file: parsed info plus a view of its data."""

    # Tensor name, decoded from UTF-8 file data.
    name: str
    # Quantization/encoding type of the stored data.
    tensor_type: GGMLQuantizationType
    # Dimensions exactly as stored in the file (GGUF order, not numpy order).
    shape: npt.NDArray[np.uint32]
    # Total number of elements (product of shape).
    n_elements: int
    # Size of the encoded data in bytes.
    n_bytes: int
    # Absolute file offset of the tensor data.
    data_offset: int
    # Zero-copy view into the memory-mapped file.
    data: npt.NDArray[Any]
    # The raw tensor-info field this tensor was built from.
    field: ReaderField
| 70 | + |
| 71 | + |
class GGUFReader:
    """Random-access reader for GGUF files.

    The file is memory-mapped; all metadata fields and tensor infos are parsed
    eagerly in ``__init__``, while tensor data is exposed as zero-copy numpy
    views into the mapping.
    """

    # I - same as host, S - swapped
    byte_order: Literal['I', 'S'] = 'I'
    # Tensor-data alignment; may be overridden by a general.alignment field.
    alignment: int = GGUF_DEFAULT_ALIGNMENT
    # Absolute file offset where the aligned tensor data region begins.
    data_offset: int

    # Note: Internal helper, API may change.
    gguf_scalar_to_np: dict[GGUFValueType, type[np.generic]] = {
        GGUFValueType.UINT8: np.uint8,
        GGUFValueType.INT8: np.int8,
        GGUFValueType.UINT16: np.uint16,
        GGUFValueType.INT16: np.int16,
        GGUFValueType.UINT32: np.uint32,
        GGUFValueType.INT32: np.int32,
        GGUFValueType.FLOAT32: np.float32,
        GGUFValueType.UINT64: np.uint64,
        GGUFValueType.INT64: np.int64,
        GGUFValueType.FLOAT64: np.float64,
        GGUFValueType.BOOL: np.bool_,
    }

    def __init__(self, path: os.PathLike[str] | str, mode: Literal['r', 'r+', 'c'] = 'r'):
        """Open and parse the GGUF file at `path`.

        Args:
            path: Path to the GGUF file.
            mode: numpy.memmap mode — 'r' read-only, 'r+' read/write,
                'c' copy-on-write.

        Raises:
            ValueError: On bad magic, unsupported version, duplicate tensors,
                or malformed fields.
        """
        self.data = np.memmap(path, mode = mode)
        offs = 0

        # Check for GGUF magic
        if self._get(offs, np.uint32, override_order = '<')[0] != GGUF_MAGIC:
            raise ValueError('GGUF magic invalid')
        offs += 4

        # Check GGUF version
        temp_version = self._get(offs, np.uint32)
        if temp_version[0] & 65535 == 0:
            # If we get 0 here that means it's (probably) a GGUF file created for
            # the opposite byte order of the machine this script is running on.
            self.byte_order = 'S'
            # ndarray.newbyteorder() was removed in NumPy 2.0; reinterpret the
            # buffer through a byte-swapped dtype instead ('S' = swap order).
            temp_version = temp_version.view(temp_version.dtype.newbyteorder(self.byte_order))
        version = temp_version[0]
        if version not in READER_SUPPORTED_VERSIONS:
            raise ValueError(f'Sorry, file appears to be version {version} which we cannot handle')
        self.fields: OrderedDict[str, ReaderField] = OrderedDict()
        self.tensors: list[ReaderTensor] = []
        offs += self._push_field(ReaderField(offs, 'GGUF.version', [temp_version], [0], [GGUFValueType.UINT32]))

        # Check tensor count and kv count
        temp_counts = self._get(offs, np.uint64, 2)
        offs += self._push_field(ReaderField(offs, 'GGUF.tensor_count', [temp_counts[:1]], [0], [GGUFValueType.UINT64]))
        offs += self._push_field(ReaderField(offs, 'GGUF.kv_count', [temp_counts[1:]], [0], [GGUFValueType.UINT64]))
        tensor_count, kv_count = temp_counts
        offs = self._build_fields(offs, kv_count)

        # Build Tensor Info Fields
        offs, tensors_fields = self._build_tensor_info(offs, tensor_count)
        new_align = self.fields.get('general.alignment')
        if new_align is not None:
            if new_align.types != [GGUFValueType.UINT32]:
                raise ValueError('Bad type for general.alignment field')
            # Cast to a plain int so a numpy scalar doesn't leak into offset math.
            self.alignment = int(new_align.parts[-1][0])
        # Tensor data starts at the next aligned offset after the infos.
        padding = offs % self.alignment
        if padding != 0:
            offs += self.alignment - padding
        self.data_offset = offs
        self._build_tensors(offs, tensors_fields)

    _DT = TypeVar('_DT', bound = npt.DTypeLike)

    # Fetch a key/value metadata field by key.
    def get_field(self, key: str) -> Union[ReaderField, None]:
        return self.fields.get(key, None)

    # Fetch a tensor from the list by index.
    def get_tensor(self, idx: int) -> ReaderTensor:
        return self.tensors[idx]

    def _get(
        self, offset: int, dtype: npt.DTypeLike, count: int = 1, override_order: None | Literal['I', 'S', '<'] = None,
    ) -> npt.NDArray[Any]:
        """Return `count` items of `dtype` at `offset` as a zero-copy view.

        `override_order` reinterprets the view's byte order without copying.
        """
        count = int(count)
        itemsize = int(np.empty([], dtype = dtype).itemsize)
        end_offs = offset + itemsize * count
        arr = self.data[offset:end_offs].view(dtype=dtype)[:count]
        if override_order is None:
            return arr
        return arr.view(arr.dtype.newbyteorder(override_order))

    def _push_field(self, field: ReaderField, skip_sum: bool = False) -> int:
        """Register `field` and return its byte size (0 when `skip_sum`).

        Duplicate keys are kept under a `<name>_<offset>` key with a warning.
        """
        if field.name in self.fields:
            # TODO: add option to generate error on duplicate keys
            # raise KeyError(f'Duplicate {field.name} already in list at offset {field.offset}')

            logger.warning(f'Duplicate key {field.name} at offset {field.offset}')
            self.fields[field.name + '_{}'.format(field.offset)] = field
        else:
            self.fields[field.name] = field
        return 0 if skip_sum else sum(int(part.nbytes) for part in field.parts)

    def _get_str(self, offset: int) -> tuple[npt.NDArray[np.uint64], npt.NDArray[np.uint8]]:
        """Read a GGUF string at `offset`: (uint64 length, raw UTF-8 bytes)."""
        slen = self._get(offset, np.uint64)
        return slen, self._get(offset + 8, np.uint8, slen[0])

    def _get_field_parts(
        self, orig_offs: int, raw_type: int,
    ) -> tuple[int, list[npt.NDArray[Any]], list[int], list[GGUFValueType]]:
        """Parse one value of `raw_type` at `orig_offs`.

        Returns (byte size consumed, parts, data indexes into parts, types).
        Arrays recurse per element; only the first element contributes types.
        """
        offs = orig_offs
        types: list[GGUFValueType] = []
        gtype = GGUFValueType(raw_type)
        types.append(gtype)
        # Handle strings.
        if gtype == GGUFValueType.STRING:
            sparts: list[npt.NDArray[Any]] = list(self._get_str(offs))
            size = sum(int(part.nbytes) for part in sparts)
            return size, sparts, [1], types
        # Check if it's a simple scalar type.
        nptype = self.gguf_scalar_to_np.get(gtype)
        if nptype is not None:
            val = self._get(offs, nptype)
            return int(val.nbytes), [val], [0], types
        # Handle arrays.
        if gtype == GGUFValueType.ARRAY:
            raw_itype = self._get(offs, np.uint32)
            offs += int(raw_itype.nbytes)
            alen = self._get(offs, np.uint64)
            offs += int(alen.nbytes)
            aparts: list[npt.NDArray[Any]] = [raw_itype, alen]
            data_idxs: list[int] = []
            for idx in range(alen[0]):
                curr_size, curr_parts, curr_idxs, curr_types = self._get_field_parts(offs, raw_itype[0])
                if idx == 0:
                    types += curr_types
                idxs_offs = len(aparts)
                aparts += curr_parts
                data_idxs += (idx + idxs_offs for idx in curr_idxs)
                offs += curr_size
            return offs - orig_offs, aparts, data_idxs, types
        # We can't deal with this one.
        # Fix: original string was missing the f prefix, so {gtype} never interpolated.
        raise ValueError(f'Unknown/unhandled field type {gtype}')

    def _get_tensor_info_field(self, orig_offs: int) -> ReaderField:
        """Parse one tensor-info record: name, n_dims, dims, dtype, offset."""
        offs = orig_offs

        # Get Tensor Name
        name_len, name_data = self._get_str(offs)
        offs += int(name_len.nbytes + name_data.nbytes)

        # Get Tensor Dimensions Count
        n_dims = self._get(offs, np.uint32)
        offs += int(n_dims.nbytes)

        # Get Tensor Dimension Array
        dims = self._get(offs, np.uint64, n_dims[0])
        offs += int(dims.nbytes)

        # Get Tensor Encoding Scheme Type
        raw_dtype = self._get(offs, np.uint32)
        offs += int(raw_dtype.nbytes)

        # Get Tensor Offset
        offset_tensor = self._get(offs, np.uint64)
        offs += int(offset_tensor.nbytes)

        return ReaderField(
            orig_offs,
            str(bytes(name_data), encoding = 'utf-8'),
            [name_len, name_data, n_dims, dims, raw_dtype, offset_tensor],
            [1, 3, 4, 5],
        )

    def _build_fields(self, offs: int, count: int) -> int:
        """Parse `count` key/value metadata fields starting at `offs`.

        Returns the offset just past the last field.
        """
        for _ in range(count):
            orig_offs = offs
            kv_klen, kv_kdata = self._get_str(offs)
            offs += int(kv_klen.nbytes + kv_kdata.nbytes)
            raw_kv_type = self._get(offs, np.uint32)
            offs += int(raw_kv_type.nbytes)
            parts: list[npt.NDArray[Any]] = [kv_klen, kv_kdata, raw_kv_type]
            idxs_offs = len(parts)
            field_size, field_parts, field_idxs, field_types = self._get_field_parts(offs, raw_kv_type[0])
            parts += field_parts
            self._push_field(ReaderField(
                orig_offs,
                str(bytes(kv_kdata), encoding = 'utf-8'),
                parts,
                [idx + idxs_offs for idx in field_idxs],
                field_types,
            ), skip_sum = True)
            offs += field_size
        return offs

    def _build_tensor_info(self, offs: int, count: int) -> tuple[int, list[ReaderField]]:
        """Parse `count` tensor-info records; return (new offset, fields)."""
        tensor_fields = []
        for _ in range(count):
            field = self._get_tensor_info_field(offs)
            offs += sum(int(part.nbytes) for part in field.parts)
            tensor_fields.append(field)
        return offs, tensor_fields

    def _build_tensors(self, start_offs: int, fields: list[ReaderField]) -> None:
        """Materialize ReaderTensor views for every parsed tensor-info field."""
        tensors = []
        tensor_names = set()  # keep track of name to prevent duplicated tensors
        for field in fields:
            _name_len, name_data, _n_dims, dims, raw_dtype, offset_tensor = field.parts
            # check if there's any tensor having same name already in the list
            tensor_name = str(bytes(name_data), encoding = 'utf-8')
            if tensor_name in tensor_names:
                raise ValueError(f'Found duplicated tensor with name {tensor_name}')
            tensor_names.add(tensor_name)
            ggml_type = GGMLQuantizationType(raw_dtype[0])
            n_elems = int(np.prod(dims))
            # GGUF stores dims fastest-first; numpy wants them slowest-first.
            np_dims = tuple(reversed(dims.tolist()))
            block_size, type_size = GGML_QUANT_SIZES[ggml_type]
            n_bytes = n_elems * type_size // block_size
            data_offs = int(start_offs + offset_tensor[0])
            item_type: npt.DTypeLike
            if ggml_type == GGMLQuantizationType.F16:
                item_count = n_elems
                item_type = np.float16
            elif ggml_type == GGMLQuantizationType.F32:
                item_count = n_elems
                item_type = np.float32
            elif ggml_type == GGMLQuantizationType.F64:
                item_count = n_elems
                item_type = np.float64
            elif ggml_type == GGMLQuantizationType.I8:
                item_count = n_elems
                item_type = np.int8
            elif ggml_type == GGMLQuantizationType.I16:
                item_count = n_elems
                item_type = np.int16
            elif ggml_type == GGMLQuantizationType.I32:
                item_count = n_elems
                item_type = np.int32
            elif ggml_type == GGMLQuantizationType.I64:
                item_count = n_elems
                item_type = np.int64
            else:
                # Quantized types are exposed as raw bytes with a byte shape.
                item_count = n_bytes
                item_type = np.uint8
                np_dims = quant_shape_to_byte_shape(np_dims, ggml_type)
            tensors.append(ReaderTensor(
                name = tensor_name,
                tensor_type = ggml_type,
                shape = dims,
                n_elements = n_elems,
                n_bytes = n_bytes,
                data_offset = data_offs,
                data = self._get(data_offs, item_type, item_count).reshape(np_dims),
                field = field,
            ))
        self.tensors = tensors
0 commit comments