Skip to content

Commit 5b71783

Browse files
committed
Refactor func_metadata() implementation
1 parent 40acbc5 commit 5b71783

File tree

3 files changed

+89
-118
lines changed

3 files changed

+89
-118
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ dependencies = [
3333
"uvicorn>=0.31.1; sys_platform != 'emscripten'",
3434
"jsonschema>=4.20.0",
3535
"pywin32>=310; sys_platform == 'win32'",
36+
"typing-extensions>=4.9.0",
37+
"typing-inspection>=0.4.1",
3638
]
3739

3840
[project.optional-dependencies]

src/mcp/server/fastmcp/utilities/func_metadata.py

Lines changed: 83 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from collections.abc import Awaitable, Callable, Sequence
44
from itertools import chain
55
from types import GenericAlias
6-
from typing import Annotated, Any, ForwardRef, cast, get_args, get_origin, get_type_hints
6+
from typing import Annotated, Any, cast, get_args, get_origin, get_type_hints
77

88
import pydantic_core
99
from pydantic import (
@@ -14,10 +14,10 @@
1414
WithJsonSchema,
1515
create_model,
1616
)
17-
from pydantic._internal._typing_extra import eval_type_backport
1817
from pydantic.fields import FieldInfo
1918
from pydantic.json_schema import GenerateJsonSchema, JsonSchemaWarningKind
20-
from pydantic_core import PydanticUndefined
19+
from typing_extensions import is_typeddict
20+
from typing_inspection.introspection import UNKNOWN, AnnotationSource, inspect_annotation
2121

2222
from mcp.server.fastmcp.exceptions import InvalidSignature
2323
from mcp.server.fastmcp.utilities.logging import get_logger
@@ -205,56 +205,47 @@ def func_metadata(
205205
- output_model: A pydantic model for the return type if output is structured
206206
- output_conversion: Records how function output should be converted before returning.
207207
"""
208-
sig = _get_typed_signature(func)
208+
try:
209+
sig = inspect.signature(func, eval_str=True)
210+
except NameError as e:
211+
# This raise could perhaps be skipped, and we (FastMCP) just call
212+
# model_rebuild right before using it 🤷
213+
raise InvalidSignature(f"Unable to evaluate type annotations for callable {func.__name__!r}") from e
209214
params = sig.parameters
210215
dynamic_pydantic_model_params: dict[str, Any] = {}
211-
globalns = getattr(func, "__globals__", {})
212216
for param in params.values():
213217
if param.name.startswith("_"):
214218
raise InvalidSignature(f"Parameter {param.name} of {func.__name__} cannot start with '_'")
215219
if param.name in skip_names:
216220
continue
217-
annotation = param.annotation
218-
219-
# `x: None` / `x: None = None`
220-
if annotation is None:
221-
annotation = Annotated[
222-
None,
223-
Field(default=param.default if param.default is not inspect.Parameter.empty else PydanticUndefined),
224-
]
225-
226-
# Untyped field
227-
if annotation is inspect.Parameter.empty:
228-
annotation = Annotated[
229-
Any,
230-
Field(),
231-
# 🤷
232-
WithJsonSchema({"title": param.name, "type": "string"}),
233-
]
234-
235-
field_info = FieldInfo.from_annotated_attribute(
236-
_get_typed_annotation(annotation, globalns),
237-
param.default if param.default is not inspect.Parameter.empty else PydanticUndefined,
238-
)
239221

222+
annotation = param.annotation if param.annotation is not inspect.Parameter.empty else Any
223+
field_name = param.name
224+
field_kwargs: dict[str, Any] = {}
225+
field_metadata: list[Any] = []
226+
227+
if param.annotation is inspect.Parameter.empty:
228+
field_metadata.append(WithJsonSchema({"title": param.name, "type": "string"}))
240229
# Check if the parameter name conflicts with BaseModel attributes
241230
# This is necessary because Pydantic warns about shadowing parent attributes
242-
if hasattr(BaseModel, param.name) and callable(getattr(BaseModel, param.name)):
231+
if hasattr(BaseModel, field_name) and callable(getattr(BaseModel, field_name)):
243232
# Use an alias to avoid the shadowing warning
244-
field_info.alias = param.name
245-
field_info.validation_alias = param.name
246-
field_info.serialization_alias = param.name
247-
# Use a prefixed internal name
248-
internal_name = f"field_{param.name}"
249-
dynamic_pydantic_model_params[internal_name] = (field_info.annotation, field_info)
233+
field_kwargs["alias"] = field_name
234+
# Use a prefixed field name
235+
field_name = f"field_{field_name}"
236+
237+
if param.default is not inspect.Parameter.empty:
238+
dynamic_pydantic_model_params[field_name] = (
239+
Annotated[(annotation, *field_metadata, Field(**field_kwargs))],
240+
param.default,
241+
)
250242
else:
251-
dynamic_pydantic_model_params[param.name] = (field_info.annotation, field_info)
252-
continue
243+
dynamic_pydantic_model_params[field_name] = Annotated[(annotation, *field_metadata, Field(**field_kwargs))]
253244

254245
arguments_model = create_model(
255246
f"{func.__name__}Arguments",
256-
**dynamic_pydantic_model_params,
257247
__base__=ArgModelBase,
248+
**dynamic_pydantic_model_params,
258249
)
259250

260251
if structured_output is False:
@@ -265,15 +256,21 @@ def func_metadata(
265256
if sig.return_annotation is inspect.Parameter.empty and structured_output is True:
266257
raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output")
267258

268-
output_info = FieldInfo.from_annotation(_get_typed_annotation(sig.return_annotation, globalns))
269-
annotation = output_info.annotation
259+
inspected_return_ann = inspect_annotation(sig.return_annotation, annotation_source=AnnotationSource.FUNCTION)
260+
return_type_expr = inspected_return_ann.type
261+
if return_type_expr is UNKNOWN and structured_output is True:
262+
# `return_type_expr` is `UNKNOWN` when a bare type qualifier is used (unlikely to happen as a return annotation
263+
# because it doesn't make any sense, but technically possible).
264+
raise InvalidSignature(f"Function {func.__name__}: return annotation required for structured output")
270265

271-
output_model, output_schema, wrap_output = _try_create_model_and_schema(annotation, func.__name__, output_info)
266+
output_model, output_schema, wrap_output = _try_create_model_and_schema(
267+
sig.return_annotation, return_type_expr, func.__name__
268+
)
272269

273270
if output_model is None and structured_output is True:
274271
# Model creation failed or produced warnings - no structured output
275272
raise InvalidSignature(
276-
f"Function {func.__name__}: return type {annotation} is not serializable for structured output"
273+
f"Function {func.__name__}: return type {return_type_expr} is not serializable for structured output"
277274
)
278275

279276
return FuncMetadata(
@@ -285,10 +282,18 @@ def func_metadata(
285282

286283

287284
def _try_create_model_and_schema(
288-
annotation: Any, func_name: str, field_info: FieldInfo
285+
annotation: Any,
286+
type_expr: Any,
287+
func_name: str,
289288
) -> tuple[type[BaseModel] | None, dict[str, Any] | None, bool]:
290289
"""Try to create a model and schema for the given annotation without warnings.
291290
291+
Args:
292+
annotation: The original return annotation (may be wrapped in `Annotated` or type qualifiers).
293+
type_expr: The underlying type expression derived from the return annotation
294+
(`Annotated` and type qualifiers were stripped).
295+
func_name: The name of the function.
296+
292297
Returns:
293298
tuple of (model or None, schema or None, wrap_output)
294299
Model and schema are None if warnings occur or creation fails.
@@ -298,58 +303,58 @@ def _try_create_model_and_schema(
298303
wrap_output = False
299304

300305
# First handle special case: None
301-
if annotation is None:
302-
model = _create_wrapped_model(func_name, annotation, field_info)
306+
if type_expr is None:
307+
model = _create_wrapped_model(func_name, type_expr)
303308
wrap_output = True
304309

305310
# Handle GenericAlias types (list[str], dict[str, int], Union[str, int], etc.)
306-
elif isinstance(annotation, GenericAlias):
307-
origin = get_origin(annotation)
311+
elif isinstance(type_expr, GenericAlias):
312+
origin = get_origin(type_expr)
308313

309314
# Special case: dict with string keys can use RootModel
310315
if origin is dict:
311-
args = get_args(annotation)
316+
args = get_args(type_expr)
312317
if len(args) == 2 and args[0] is str:
313-
model = _create_dict_model(func_name, annotation)
318+
model = _create_dict_model(func_name, type_expr)
314319
else:
315320
# dict with non-str keys needs wrapping
316-
model = _create_wrapped_model(func_name, annotation, field_info)
321+
model = _create_wrapped_model(func_name, type_expr)
317322
wrap_output = True
318323
else:
319324
# All other generic types need wrapping (list, tuple, Union, Optional, etc.)
320-
model = _create_wrapped_model(func_name, annotation, field_info)
325+
model = _create_wrapped_model(func_name, type_expr)
321326
wrap_output = True
322327

323328
# Handle regular type objects
324-
elif isinstance(annotation, type):
325-
type_annotation: type[Any] = cast(type[Any], annotation)
329+
elif isinstance(type_expr, type):
330+
type_annotation = cast(type[Any], type_expr)
326331

327332
# Case 1: BaseModel subclasses (can be used directly)
328-
if issubclass(annotation, BaseModel):
329-
model = annotation
333+
if issubclass(type_annotation, BaseModel):
334+
model = type_annotation
330335

331-
# Case 2: TypedDict (special dict subclass with __annotations__)
332-
elif hasattr(type_annotation, "__annotations__") and issubclass(annotation, dict):
336+
# Case 2: TypedDicts:
337+
elif is_typeddict(type_annotation):
333338
model = _create_model_from_typeddict(type_annotation)
334339

335340
# Case 3: Primitive types that need wrapping
336-
elif annotation in (str, int, float, bool, bytes, type(None)):
337-
model = _create_wrapped_model(func_name, annotation, field_info)
341+
elif type_annotation in (str, int, float, bool, bytes, type(None)):
342+
model = _create_wrapped_model(func_name, type_annotation)
338343
wrap_output = True
339344

340345
# Case 4: Other class types (dataclasses, regular classes with annotations)
341346
else:
342347
type_hints = get_type_hints(type_annotation)
343348
if type_hints:
344349
# Classes with type hints can be converted to Pydantic models
345-
model = _create_model_from_class(type_annotation)
350+
model = _create_model_from_class(type_annotation, type_hints)
346351
# Classes without type hints are not serializable - model remains None
347352

348353
# Handle any other types not covered above
349354
else:
350355
# This includes typing constructs that aren't GenericAlias in Python 3.10
351356
# (e.g., Union, Optional in some Python versions)
352-
model = _create_wrapped_model(func_name, annotation, field_info)
357+
model = _create_wrapped_model(func_name, type_expr)
353358
wrap_output = True
354359

355360
if model:
@@ -363,40 +368,39 @@ def _try_create_model_and_schema(
363368
# ValueError: When there are issues with the type definition (including our custom warnings)
364369
# SchemaError: When Pydantic can't build a schema
365370
# ValidationError: When validation fails
366-
logger.info(f"Cannot create schema for type {annotation} in {func_name}: {type(e).__name__}: {e}")
371+
logger.info(f"Cannot create schema for type {type_expr} in {func_name}: {type(e).__name__}: {e}")
367372
return None, None, False
368373

369374
return model, schema, wrap_output
370375

371376
return None, None, False
372377

373378

374-
def _create_model_from_class(cls: type[Any]) -> type[BaseModel]:
379+
_no_default = object()
380+
381+
382+
def _create_model_from_class(cls: type[Any], type_hints: dict[str, Any]) -> type[BaseModel]:
375383
"""Create a Pydantic model from an ordinary class.
376384
377385
The created model will:
378386
- Have the same name as the class
379387
- Have fields with the same names and types as the class's fields
380388
- Include all fields whose type does not include None in the set of required fields
381389
382-
Precondition: cls must have type hints (i.e., get_type_hints(cls) is non-empty)
390+
Precondition: cls must have type hints (i.e., `type_hints` is non-empty)
383391
"""
384-
type_hints = get_type_hints(cls)
385-
386392
model_fields: dict[str, Any] = {}
387393
for field_name, field_type in type_hints.items():
388394
if field_name.startswith("_"):
389395
continue
390396

391-
default = getattr(cls, field_name, PydanticUndefined)
392-
field_info = FieldInfo.from_annotated_attribute(field_type, default)
393-
model_fields[field_name] = (field_info.annotation, field_info)
394-
395-
# Create a base class with the config
396-
class BaseWithConfig(BaseModel):
397-
model_config = ConfigDict(from_attributes=True)
397+
default = getattr(cls, field_name, _no_default)
398+
if default is _no_default:
399+
model_fields[field_name] = field_type
400+
else:
401+
model_fields[field_name] = (field_type, default)
398402

399-
return create_model(cls.__name__, **model_fields, __base__=BaseWithConfig)
403+
return create_model(cls.__name__, __config__=ConfigDict(from_attributes=True), **model_fields)
400404

401405

402406
def _create_model_from_typeddict(td_type: type[Any]) -> type[BaseModel]:
@@ -409,20 +413,18 @@ def _create_model_from_typeddict(td_type: type[Any]) -> type[BaseModel]:
409413

410414
model_fields: dict[str, Any] = {}
411415
for field_name, field_type in type_hints.items():
412-
field_info = FieldInfo.from_annotation(field_type)
413-
414416
if field_name not in required_keys:
415417
# For optional TypedDict fields, set default=None
416418
# This makes them not required in the Pydantic model
417419
# The model should use exclude_unset=True when dumping to get TypedDict semantics
418-
field_info.default = None
419-
420-
model_fields[field_name] = (field_info.annotation, field_info)
420+
model_fields[field_name] = (field_type, None)
421+
else:
422+
model_fields[field_name] = field_type
421423

422-
return create_model(td_type.__name__, **model_fields, __base__=BaseModel)
424+
return create_model(td_type.__name__, **model_fields)
423425

424426

425-
def _create_wrapped_model(func_name: str, annotation: Any, field_info: FieldInfo) -> type[BaseModel]:
427+
def _create_wrapped_model(func_name: str, annotation: Any) -> type[BaseModel]:
426428
"""Create a model that wraps a type in a 'result' field.
427429
428430
This is used for primitive types, generic types like list/dict, etc.
@@ -433,7 +435,7 @@ def _create_wrapped_model(func_name: str, annotation: Any, field_info: FieldInfo
433435
if annotation is None:
434436
annotation = type(None)
435437

436-
return create_model(model_name, result=(annotation, field_info), __base__=BaseModel)
438+
return create_model(model_name, result=annotation)
437439

438440

439441
def _create_dict_model(func_name: str, dict_annotation: Any) -> type[BaseModel]:
@@ -449,43 +451,6 @@ class DictModel(RootModel[dict_annotation]):
449451
return DictModel
450452

451453

452-
def _get_typed_annotation(annotation: Any, globalns: dict[str, Any]) -> Any:
453-
def try_eval_type(value: Any, globalns: dict[str, Any], localns: dict[str, Any]) -> tuple[Any, bool]:
454-
try:
455-
return eval_type_backport(value, globalns, localns), True
456-
except NameError:
457-
return value, False
458-
459-
if isinstance(annotation, str):
460-
annotation = ForwardRef(annotation)
461-
annotation, status = try_eval_type(annotation, globalns, globalns)
462-
463-
# This check and raise could perhaps be skipped, and we (FastMCP) just call
464-
# model_rebuild right before using it 🤷
465-
if status is False:
466-
raise InvalidSignature(f"Unable to evaluate type annotation {annotation}")
467-
468-
return annotation
469-
470-
471-
def _get_typed_signature(call: Callable[..., Any]) -> inspect.Signature:
472-
"""Get function signature while evaluating forward references"""
473-
signature = inspect.signature(call)
474-
globalns = getattr(call, "__globals__", {})
475-
typed_params = [
476-
inspect.Parameter(
477-
name=param.name,
478-
kind=param.kind,
479-
default=param.default,
480-
annotation=_get_typed_annotation(param.annotation, globalns),
481-
)
482-
for param in signature.parameters.values()
483-
]
484-
typed_return = _get_typed_annotation(signature.return_annotation, globalns)
485-
typed_signature = inspect.Signature(typed_params, return_annotation=typed_return)
486-
return typed_signature
487-
488-
489454
def _convert_to_content(
490455
result: Any,
491456
) -> Sequence[ContentBlock]:

uv.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)