Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace JSON parsing with pydantic from_json #422

Open
jackmpcollins opened this issue Feb 2, 2025 · 0 comments
Open

Replace JSON parsing with pydantic from_json #422

jackmpcollins opened this issue Feb 2, 2025 · 0 comments

Comments

@jackmpcollins
Copy link
Owner

Use from_json from pydantic to replace the existing streamed JSON parsing:

@dataclass
class JsonArrayParserState:
"""State of the parser for a streamed JSON array."""
array_level: int = 0
object_level: int = 0
in_string: bool = False
is_escaped: bool = False
is_element_separator: bool = False
def update(self, char: str) -> None:
if self.in_string:
if char == '"' and not self.is_escaped:
self.in_string = False
elif char == '"':
self.in_string = True
elif char == ",":
if self.array_level == 1 and self.object_level == 0:
self.is_element_separator = True
return
elif char == "[":
self.array_level += 1
elif char == "]":
self.array_level -= 1
if self.array_level == 0:
self.is_element_separator = True
return
elif char == "{":
self.object_level += 1
elif char == "}":
self.object_level -= 1
elif char == "\\":
self.is_escaped = not self.is_escaped
else:
self.is_escaped = False
self.is_element_separator = False
def iter_streamed_json_array(chunks: Iterable[str]) -> Iterable[str]:
    """Convert a streamed JSON array into an iterable of JSON element strings.

    All characters before the start of the array (the first "[") are ignored.
    Each top-level element of the array is yielded as a whitespace-stripped
    string; nothing is yielded if no "[" ever appears in the input.
    """
    iter_chars: Iterator[str] = chain.from_iterable(chunks)
    parser_state = JsonArrayParserState()
    iter_chars = dropwhile(lambda x: x != "[", iter_chars)
    # Consume the opening "[". A default of None avoids StopIteration leaking
    # out of this generator (a RuntimeError under PEP 479) on "["-less input.
    opening_char = next(iter_chars, None)
    if opening_char is None:
        return
    parser_state.update(opening_char)
    item_chars: list[str] = []
    for char in iter_chars:
        parser_state.update(char)
        if parser_state.is_element_separator:
            # Separator reached: emit the accumulated element, if any.
            if item_chars:
                yield "".join(item_chars).strip()
                item_chars = []
        else:
            item_chars.append(char)
async def aiter_streamed_json_array(chunks: AsyncIterable[str]) -> AsyncIterable[str]:
    """Async version of `iter_streamed_json_array`."""

    async def _char_stream() -> AsyncIterable[str]:
        # Flatten the stream of chunks into a stream of single characters.
        async for chunk in chunks:
            for character in chunk:
                yield character

    char_stream = _char_stream()
    state = JsonArrayParserState()
    # Discard everything up to and including the opening "[" of the array.
    async for character in char_stream:
        if character == "[":
            break
    state.update("[")
    element_chars: list[str] = []
    async for character in char_stream:
        state.update(character)
        if not state.is_element_separator:
            element_chars.append(character)
        elif element_chars:
            # Separator reached: emit the accumulated element.
            yield "".join(element_chars).strip()
            element_chars = []

https://docs.pydantic.dev/latest/concepts/json/#partial-json-parsing

from pydantic_core import from_json

partial_json_data = '["aa", "bb", "c'  

result = from_json(partial_json_data, allow_partial=True)
print(result)  
#> ['aa', 'bb']

Something like

current_object = 0
accumulated_chunks = ""
for chunk in stream:
    accumulated_chunks += chunk
    objects = from_json(accumulated_chunks, allow_partial=True)
    # Object at index `current_object` is complete once the next object
    # has started being generated
    if len(objects) > current_object + 1:
        yield objects[current_object]
        current_object += 1
yield objects[current_object]

Performance is probably not a concern as the LLM generation will most likely be the bottleneck.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant