Skip to content
This repository was archived by the owner on Jul 22, 2025. It is now read-only.

Commit 4db7773

Browse files
committed
Switch to JsonStreamingTracker for partial JSON parsing
1 parent 2ad1d65 commit 4db7773

16 files changed

+113
-110
lines changed

lib/completions/anthropic_message_processor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ def initialize(name, id, partial_tool_calls: false)
1010
@raw_json = +""
1111
@tool_call = DiscourseAi::Completions::ToolCall.new(id: id, name: name, parameters: {})
1212
@streaming_parser =
13-
DiscourseAi::Completions::ToolCallProgressTracker.new(self) if partial_tool_calls
13+
DiscourseAi::Completions::JsonStreamingTracker.new(self) if partial_tool_calls
1414
end
1515

1616
def append(json)

lib/completions/endpoints/anthropic.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ def prepare_payload(prompt, model_params, dialect)
117117
if model_params[:response_format].present?
118118
prefilled_message << " " if !prefilled_message.empty?
119119
prefilled_message << "{"
120+
@forced_json_through_prefill = true
120121
end
121122

122123
if !prefilled_message.empty?

lib/completions/endpoints/aws_bedrock.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ def prepare_payload(prompt, model_params, dialect)
144144
if model_params[:response_format].present?
145145
prefilled_message << " " if !prefilled_message.empty?
146146
prefilled_message << "{"
147+
@forced_json_through_prefill = true
147148
end
148149

149150
if !prefilled_message.empty?

lib/completions/endpoints/base.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def perform_completion!(
7373
LlmQuota.check_quotas!(@llm_model, user)
7474
start_time = Time.now
7575

76+
@forced_json_through_prefill = false
7677
@partial_tool_calls = partial_tool_calls
7778
@output_thinking = output_thinking
7879

@@ -109,12 +110,12 @@ def perform_completion!(
109110
structured_output = nil
110111

111112
if model_params[:response_format].present?
112-
response_structure =
113-
model_params[:response_format].dig(:json_schema, :schema, :required)
113+
schema_properties =
114+
model_params[:response_format].dig(:json_schema, :schema, :properties)
114115

115-
if response_structure.present?
116+
if schema_properties.present?
116117
structured_output =
117-
DiscourseAi::Completions::StructuredOutput.new(response_structure.map(&:to_sym))
118+
DiscourseAi::Completions::StructuredOutput.new(schema_properties)
118119
end
119120
end
120121

@@ -135,6 +136,10 @@ def perform_completion!(
135136

136137
request = prepare_request(request_body)
137138

139+
# Some providers rely on prefill to return structured outputs, so the start
140+
# of the JSON won't be included in the response. Supply it to keep JSON valid.
141+
structured_output << +"{" if structured_output && @forced_json_through_prefill
142+
138143
http.request(request) do |response|
139144
if response.code.to_i != 200
140145
Rails.logger.error(

lib/completions/endpoints/canned_response.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ def is_structured_output?(response)
9090
end
9191

9292
def as_structured_output(response)
93-
keys = model_params[:response_format].dig(:json_schema, :schema, :properties)&.keys
94-
return response if keys.blank?
93+
schema_properties = model_params[:response_format].dig(:json_schema, :schema, :properties)
94+
return response if schema_properties.blank?
9595

96-
output = DiscourseAi::Completions::StructuredOutput.new(keys)
97-
output << { keys.first => response }.to_json
96+
output = DiscourseAi::Completions::StructuredOutput.new(schema_properties)
97+
output << { schema_properties.keys.first => response }.to_json
9898

9999
output
100100
end

lib/completions/tool_call_progress_tracker.rb renamed to lib/completions/json_streaming_tracker.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
module DiscourseAi
44
module Completions
5-
class ToolCallProgressTracker
6-
attr_reader :current_key, :current_value, :tool_call
5+
class JsonStreamingTracker
6+
attr_reader :current_key, :current_value, :stream_consumer
77

8-
def initialize(tool_call)
9-
@tool_call = tool_call
8+
def initialize(stream_consumer)
9+
@stream_consumer = stream_consumer
1010
@current_key = nil
1111
@current_value = nil
1212
@parser = DiscourseAi::Completions::JsonStreamingParser.new
@@ -18,7 +18,7 @@ def initialize(tool_call)
1818

1919
@parser.value do |v|
2020
if @current_key
21-
tool_call.notify_progress(@current_key, v)
21+
stream_consumer.notify_progress(@current_key, v)
2222
@current_key = nil
2323
end
2424
end
@@ -39,7 +39,7 @@ def <<(json)
3939

4040
if @parser.state == :start_string && @current_key
4141
# this is is worth notifying
42-
tool_call.notify_progress(@current_key, @parser.buf)
42+
stream_consumer.notify_progress(@current_key, @parser.buf)
4343
end
4444

4545
@current_key = nil if @parser.state == :end_value

lib/completions/nova_message_processor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ def initialize(name, id, partial_tool_calls: false)
1010
@raw_json = +""
1111
@tool_call = DiscourseAi::Completions::ToolCall.new(id: id, name: name, parameters: {})
1212
@streaming_parser =
13-
DiscourseAi::Completions::ToolCallProgressTracker.new(self) if partial_tool_calls
13+
DiscourseAi::Completions::JsonStreamingTracker.new(self) if partial_tool_calls
1414
end
1515

1616
def append(json)

lib/completions/open_ai_message_processor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def process_streamed_message(json)
5959
if id.present? && name.present?
6060
@tool_arguments = +""
6161
@tool = ToolCall.new(id: id, name: name)
62-
@streaming_parser = ToolCallProgressTracker.new(self) if @partial_tool_calls
62+
@streaming_parser = JsonStreamingTracker.new(self) if @partial_tool_calls
6363
end
6464

6565
@tool_arguments << arguments.to_s

lib/completions/structured_output.rb

Lines changed: 28 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,91 +3,49 @@
33
module DiscourseAi
44
module Completions
55
class StructuredOutput
6-
def initialize(property_names)
7-
@raw_response = +""
8-
@state = :awaiting_key
9-
@current_key = +""
10-
@escape = false
11-
12-
@full_output =
13-
property_names.reduce({}) do |memo, pn|
14-
memo[pn.to_sym] = +""
15-
memo
6+
def initialize(json_schema_properties)
7+
@property_names = json_schema_properties.keys.map(&:to_sym)
8+
@property_cursors =
9+
json_schema_properties.reduce({}) do |m, (k, prop)|
10+
m[k.to_sym] = 0 if prop[:type] == "string"
11+
m
1612
end
1713

18-
# Partial output is what we processed in the last chunk.
19-
@partial_output_proto = @full_output.deep_dup
20-
@last_chunk_output = @full_output.deep_dup
14+
@tracked = {}
15+
16+
@partial_json_tracker = JsonStreamingTracker.new(self)
2117
end
2218

23-
attr_reader :full_output, :last_chunk_output
19+
attr_reader :last_chunk_buffer
2420

2521
def <<(raw)
26-
@raw_response << raw
27-
28-
@last_chunk_output = @partial_output_proto.deep_dup
22+
@partial_json_tracker << raw
23+
end
2924

30-
raw.each_char do |char|
31-
case @state
32-
when :awaiting_key
33-
if char == "\""
34-
@current_key = +""
35-
@state = :parsing_key
36-
@escape = false
37-
end
38-
when :parsing_key
39-
if char == "\""
40-
@state = :awaiting_colon
41-
else
42-
@current_key << char
43-
end
44-
when :awaiting_colon
45-
@state = :awaiting_value if char == ":"
46-
when :awaiting_value
47-
if char == '"'
48-
@escape = false
49-
@state = :parsing_value
50-
end
51-
when :parsing_value
52-
if @escape
53-
# Don't add escape sequence until we know what it is
54-
unescaped = unescape_char(char)
55-
@full_output[@current_key.to_sym] << unescaped
56-
@last_chunk_output[@current_key.to_sym] << unescaped
25+
def read_latest_buffered_chunk
26+
@property_names.reduce({}) do |memo, pn|
27+
if @tracked[pn].present?
28+
# This means this property is a string and we want to return unread chunks.
29+
if @property_cursors[pn].present?
30+
unread = @tracked[pn][@property_cursors[pn]..]
5731

58-
@escape = false
59-
elsif char == "\\"
60-
@escape = true
61-
elsif char == "\""
62-
@state = :awaiting_key_or_end
32+
memo[pn] = unread if unread.present?
33+
@property_cursors[pn] = @tracked[pn].length
6334
else
64-
@full_output[@current_key.to_sym] << char
65-
@last_chunk_output[@current_key.to_sym] << char
35+
# Ints and bools are always returned as is.
36+
memo[pn] = @tracked[pn]
6637
end
67-
when :awaiting_key_or_end
68-
@state = :awaiting_key if char == ","
69-
# End of object or whitespace ignored here
70-
else
71-
next
7238
end
39+
40+
memo
7341
end
7442
end
7543

76-
private
77-
78-
def unescape_char(char)
79-
chars = {
80-
'"' => '"',
81-
'\\' => '\\',
82-
"/" => "/",
83-
"b" => "\b",
84-
"f" => "\f",
85-
"n" => "\n",
86-
"r" => "\r",
87-
"t" => "\t",
88-
}
44+
def notify_progress(key, value)
45+
key_sym = key.to_sym
46+
return if !@property_names.include?(key_sym)
8947

90-
chars[char] || char
48+
@tracked[key_sym] = value
9149
end
9250
end
9351
end

lib/personas/bot.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def reply(context, llm_args: {}, &update_blk)
152152
current_thinking << partial
153153
end
154154
elsif partial.is_a?(DiscourseAi::Completions::StructuredOutput)
155-
update_blk.call(partial.last_chunk_output, cancel, nil, :structured_output)
155+
update_blk.call(partial, cancel, nil, :structured_output)
156156
else
157157
update_blk.call(partial, cancel)
158158
end

0 commit comments

Comments
 (0)