Skip to content
157 changes: 140 additions & 17 deletions src/openai/_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import io
import os
import pathlib
from typing import overload
import mimetypes
import asyncio
from typing import Optional, overload
from typing_extensions import TypeGuard

import anyio

from ._types import (
FileTypes,
FileContent,
Expand Down Expand Up @@ -61,15 +61,59 @@ def to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles | None:


def _transform_file(file: FileTypes) -> HttpxFileTypes:
    """Convert one user-supplied file value into the form handed to httpx.

    Tuple inputs ``(filename, content[, content_type[, headers]])`` have their
    content read via ``read_file_content``. An explicit, non-``None`` content
    type is always kept. Otherwise one is inferred, in order, from:

    1. the tuple's filename;
    2. the file name of a path-like content value;
    3. the leading bytes of in-memory content, falling back to
       ``application/octet-stream``.

    Bare file content is expanded as follows:

    * path-like: read into memory and returned with the path's file name and,
      when one can be determined, a content type;
    * ``bytes`` / ``bytearray``: returned with a sniffed content type and a
      matching placeholder filename;
    * ``io.IOBase`` exposing a non-empty string ``name``: returned with that
      base name and, when guessable, a content type;
    * any other value accepted by ``is_file_content``: returned unchanged.

    Raises:
        TypeError: if ``file`` is neither a tuple nor recognised file content.
    """
    if is_tuple_t(file):
        name = file[0]
        content = read_file_content(file[1])

        # A caller-provided content type always wins over inference.
        if len(file) >= 3 and file[2] is not None:
            if len(file) >= 4:
                return (name, content, file[2], file[3])
            return (name, content, file[2])

        inferred: Optional[str] = _guess_content_type_from_filename(name)

        if inferred is None and isinstance(file[1], os.PathLike):
            # Consult the path's own name *before* sniffing: assuming the
            # reader returns bytes for path-like input (TODO confirm against
            # read_file_content), a check placed after the bytes test could
            # never run.
            try:
                inferred = _guess_content_type_from_filename(pathlib.Path(file[1]).name)  # type: ignore[arg-type]
            except Exception:
                # Best-effort only: a failed name lookup must not block the upload.
                inferred = None

        if inferred is None and isinstance(content, (bytes, bytearray)):
            inferred = _sniff_content_type_from_bytes(bytes(content)) or "application/octet-stream"

        if len(file) >= 4:
            # Keep the headers slot in position even when no type was inferred.
            return (name, content, inferred, file[3])
        if inferred is not None:
            return (name, content, inferred)
        return (name, content)

    if is_file_content(file):
        if isinstance(file, os.PathLike):
            path = pathlib.Path(file)
            data = path.read_bytes()
            ctype = _guess_content_type_from_filename(path.name) or _sniff_content_type_from_bytes(data)
            if ctype is not None:
                return (path.name, data, ctype)
            return (path.name, data)

        if isinstance(file, (bytes, bytearray)):
            data = bytes(file)
            ctype = _sniff_content_type_from_bytes(data) or "application/octet-stream"
            return (_default_filename_for_content_type(ctype), data, ctype)

        if isinstance(file, io.IOBase):
            file_name: Optional[str] = None
            try:
                name_attr = getattr(file, "name", None)
                if isinstance(name_attr, str):
                    file_name = os.path.basename(name_attr)
            except Exception:
                # ``name`` may be an arbitrary property; treat any failure as "unnamed".
                file_name = None

            if file_name:
                ctype = _guess_content_type_from_filename(file_name)
                if ctype is not None:
                    return (file_name, file, ctype)
                return (file_name, file)

        # Unnamed streams and any other accepted file content are handed back
        # untouched so httpx applies its own filename/content-type defaults
        # rather than receiving an explicit filename of ``None``.
        return file

    raise TypeError("Expected file types input to be a FileContent type or to be a tuple")

Expand Down Expand Up @@ -103,21 +147,100 @@ async def async_to_httpx_files(files: RequestFiles | None) -> HttpxRequestFiles


async def _async_transform_file(file: FileTypes) -> HttpxFileTypes:
    """Asynchronously convert one user-supplied file value into the form handed to httpx.

    Tuple inputs ``(filename, content[, content_type[, headers]])`` have their
    content read via ``async_read_file_content``. An explicit, non-``None``
    content type is always kept. Otherwise one is inferred, in order, from:

    1. the tuple's filename;
    2. the file name of a path-like content value;
    3. the leading bytes of in-memory content, falling back to
       ``application/octet-stream``.

    Bare file content is expanded as follows:

    * path-like: read into memory through ``anyio.Path`` and returned with the
      path's file name and, when one can be determined, a content type;
    * ``bytes`` / ``bytearray``: returned with a sniffed content type and a
      matching placeholder filename;
    * ``io.IOBase`` exposing a non-empty string ``name``: returned with that
      base name and, when guessable, a content type;
    * any other value accepted by ``is_file_content``: returned unchanged.

    Raises:
        TypeError: if ``file`` is neither a tuple nor recognised file content.
    """
    if is_tuple_t(file):
        name = file[0]
        content = await async_read_file_content(file[1])

        # A caller-provided content type always wins over inference.
        if len(file) >= 3 and file[2] is not None:
            if len(file) >= 4:
                return (name, content, file[2], file[3])
            return (name, content, file[2])

        inferred: Optional[str] = _guess_content_type_from_filename(name)

        if inferred is None and isinstance(file[1], os.PathLike):
            # Consult the path's own name *before* sniffing: path-like content
            # has already been read into bytes above, so a check placed after
            # the bytes test could never run.
            try:
                inferred = _guess_content_type_from_filename(pathlib.Path(file[1]).name)  # type: ignore[arg-type]
            except Exception:
                # Best-effort only: a failed name lookup must not block the upload.
                inferred = None

        if inferred is None and isinstance(content, (bytes, bytearray)):
            inferred = _sniff_content_type_from_bytes(bytes(content)) or "application/octet-stream"

        if len(file) >= 4:
            # Keep the headers slot in position even when no type was inferred.
            return (name, content, inferred, file[3])
        if inferred is not None:
            return (name, content, inferred)
        return (name, content)

    if is_file_content(file):
        if isinstance(file, os.PathLike):
            # anyio.Path performs the read without tying this coroutine to the
            # asyncio backend (asyncio.to_thread would, and needs Python 3.9+).
            path = anyio.Path(file)
            data = await path.read_bytes()
            ctype = _guess_content_type_from_filename(path.name) or _sniff_content_type_from_bytes(data)
            if ctype is not None:
                return (path.name, data, ctype)
            return (path.name, data)

        if isinstance(file, (bytes, bytearray)):
            data = bytes(file)
            ctype = _sniff_content_type_from_bytes(data) or "application/octet-stream"
            return (_default_filename_for_content_type(ctype), data, ctype)

        if isinstance(file, io.IOBase):
            file_name: Optional[str] = None
            try:
                name_attr = getattr(file, "name", None)
                if isinstance(name_attr, str):
                    file_name = os.path.basename(name_attr)
            except Exception:
                # ``name`` may be an arbitrary property; treat any failure as "unnamed".
                file_name = None

            if file_name:
                ctype = _guess_content_type_from_filename(file_name)
                if ctype is not None:
                    return (file_name, file, ctype)
                return (file_name, file)

        # Unnamed streams and any other accepted file content are handed back
        # untouched so httpx applies its own filename/content-type defaults
        # rather than receiving an explicit filename of ``None``.
        return file

    raise TypeError("Expected file types input to be a FileContent type or to be a tuple")


async def async_read_file_content(file: FileContent) -> HttpxFileContent:
    """Return ``file`` ready for httpx, loading path-like inputs into memory.

    Args:
        file: Either a path-like object or content that is already usable
            (anything that is not an ``os.PathLike`` is returned as-is).

    Returns:
        The bytes read from the path for path-like input, otherwise ``file``
        itself, unchanged.
    """
    if isinstance(file, os.PathLike):
        # Single read path via anyio.Path; the former second `return`
        # (asyncio.to_thread) was unreachable and has been dropped.
        return await anyio.Path(file).read_bytes()

    return file


def _guess_content_type_from_filename(filename: Optional[str]) -> Optional[str]:
    """Look up a MIME type for ``filename`` using the ``mimetypes`` registry.

    Returns ``None`` when ``filename`` is ``None`` or empty, or when
    ``mimetypes.guess_type`` has no type for it.
    """
    if filename:
        # guess_type returns (type, encoding); only the type is of interest.
        return mimetypes.guess_type(filename)[0]
    return None


def _sniff_content_type_from_bytes(data: bytes) -> Optional[str]:
    """Identify a few common formats from the leading bytes of ``data``.

    Recognised prefixes:

    * ``%PDF``                     -> ``application/pdf``
    * ``89 50 4E 47 0D 0A 1A 0A``  -> ``image/png``
    * ``FF D8 FF``                 -> ``image/jpeg``
    * ``GIF87a`` / ``GIF89a``      -> ``image/gif``

    Returns ``None`` when none of the prefixes match (including empty or
    too-short input).
    """
    known_prefixes = (
        (b"%PDF", "application/pdf"),
        (b"\x89PNG\r\n\x1a\n", "image/png"),
        (b"\xff\xd8\xff", "image/jpeg"),
        (b"GIF87a", "image/gif"),
        (b"GIF89a", "image/gif"),
    )
    for magic, content_type in known_prefixes:
        # startswith is False for input shorter than the prefix, so no
        # separate length check is needed.
        if data.startswith(magic):
            return content_type
    return None


def _default_filename_for_content_type(content_type: str) -> str:
    """Pick a placeholder upload filename whose extension suits ``content_type``.

    PDF, PNG, JPEG and GIF map to ``upload.pdf`` / ``upload.png`` /
    ``upload.jpg`` / ``upload.gif``; every other content type gets
    ``upload.bin``.
    """
    placeholder_names = {
        "application/pdf": "upload.pdf",
        "image/png": "upload.png",
        "image/jpeg": "upload.jpg",
        "image/gif": "upload.gif",
    }
    return placeholder_names.get(content_type, "upload.bin")