Skip to content

Commit c31d169

Browse files
authored
feat(ollama): streaming (#279)
1 parent 910c196 commit c31d169

File tree

7 files changed

+507
-54
lines changed

7 files changed

+507
-54
lines changed
+219
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Prism\Prism\Providers\Ollama\Handlers;
6+
7+
use Generator;
8+
use Illuminate\Http\Client\PendingRequest;
9+
use Illuminate\Http\Client\RequestException;
10+
use Illuminate\Http\Client\Response;
11+
use Prism\Prism\Concerns\CallsTools;
12+
use Prism\Prism\Enums\FinishReason;
13+
use Prism\Prism\Exceptions\PrismChunkDecodeException;
14+
use Prism\Prism\Exceptions\PrismException;
15+
use Prism\Prism\Exceptions\PrismRateLimitedException;
16+
use Prism\Prism\Providers\Ollama\Concerns\MapsFinishReason;
17+
use Prism\Prism\Providers\Ollama\Concerns\MapsToolCalls;
18+
use Prism\Prism\Providers\Ollama\Maps\MessageMap;
19+
use Prism\Prism\Providers\Ollama\Maps\ToolMap;
20+
use Prism\Prism\Text\Chunk;
21+
use Prism\Prism\Text\Request;
22+
use Prism\Prism\ValueObjects\Messages\AssistantMessage;
23+
use Prism\Prism\ValueObjects\Messages\ToolResultMessage;
24+
use Psr\Http\Message\StreamInterface;
25+
use Throwable;
26+
27+
class Stream
28+
{
29+
use CallsTools, MapsFinishReason, MapsToolCalls;
30+
31+
public function __construct(protected PendingRequest $client) {}
32+
33+
/**
34+
* @return Generator<Chunk>
35+
*/
36+
public function handle(Request $request): Generator
37+
{
38+
$response = $this->sendRequest($request);
39+
40+
yield from $this->processStream($response, $request);
41+
}
42+
43+
/**
44+
* @return Generator<Chunk>
45+
*/
46+
protected function processStream(Response $response, Request $request, int $depth = 0): Generator
47+
{
48+
49+
if ($depth >= $request->maxSteps()) {
50+
throw new PrismException('Maximum tool call chain depth exceeded');
51+
}
52+
53+
$text = '';
54+
$toolCalls = [];
55+
56+
while (! $response->getBody()->eof()) {
57+
$data = $this->parseNextDataLine($response->getBody());
58+
59+
if ($data === null) {
60+
continue;
61+
}
62+
63+
if ($this->hasToolCalls($data)) {
64+
$toolCalls = $this->extractToolCalls($data, $toolCalls);
65+
66+
continue;
67+
}
68+
69+
if ($this->mapFinishReason($data) === FinishReason::ToolCalls) {
70+
yield from $this->handleToolCalls($request, $text, $toolCalls, $depth);
71+
72+
return;
73+
}
74+
75+
$content = data_get($data, 'message.content', '') ?? '';
76+
$text .= $content;
77+
78+
$finishReason = (bool) data_get($data, 'done', false)
79+
? FinishReason::Stop
80+
: FinishReason::Unknown;
81+
82+
yield new Chunk(
83+
text: $content,
84+
finishReason: $finishReason !== FinishReason::Unknown ? $finishReason : null
85+
);
86+
}
87+
}
88+
89+
/**
90+
* @return array<string, mixed>|null Parsed JSON data or null if line should be skipped
91+
*/
92+
protected function parseNextDataLine(StreamInterface $stream): ?array
93+
{
94+
$line = $this->readLine($stream);
95+
96+
if (in_array(trim($line), ['', '0'], true)) {
97+
return null;
98+
}
99+
100+
try {
101+
return json_decode($line, true, flags: JSON_THROW_ON_ERROR);
102+
} catch (Throwable $e) {
103+
throw new PrismChunkDecodeException('Ollama', $e);
104+
}
105+
}
106+
107+
/**
108+
* @param array<string, mixed> $data
109+
* @param array<int, array<string, mixed>> $toolCalls
110+
* @return array<int, array<string, mixed>>
111+
*/
112+
protected function extractToolCalls(array $data, array $toolCalls): array
113+
{
114+
foreach (data_get($data, 'message.tool_calls', []) as $index => $toolCall) {
115+
if ($name = data_get($toolCall, 'function.name')) {
116+
$toolCalls[$index]['name'] = $name;
117+
$toolCalls[$index]['arguments'] = '';
118+
$toolCalls[$index]['id'] = data_get($toolCall, 'id');
119+
}
120+
121+
if ($arguments = data_get($toolCall, 'function.arguments')) {
122+
123+
$argumentValue = is_array($arguments) ? json_encode($arguments) : $arguments;
124+
$toolCalls[$index]['arguments'] .= $argumentValue;
125+
}
126+
}
127+
128+
return $toolCalls;
129+
}
130+
131+
/**
132+
* @param array<int, array<string, mixed>> $toolCalls
133+
* @return Generator<Chunk>
134+
*/
135+
protected function handleToolCalls(
136+
Request $request,
137+
string $text,
138+
array $toolCalls,
139+
int $depth
140+
): Generator {
141+
142+
$toolCalls = $this->mapToolCalls($toolCalls);
143+
144+
$toolResults = $this->callTools($request->tools(), $toolCalls);
145+
146+
$request->addMessage(new AssistantMessage($text, $toolCalls));
147+
$request->addMessage(new ToolResultMessage($toolResults));
148+
149+
yield new Chunk(
150+
text: '',
151+
toolCalls: $toolCalls,
152+
toolResults: $toolResults,
153+
);
154+
155+
$nextResponse = $this->sendRequest($request);
156+
yield from $this->processStream($nextResponse, $request, $depth + 1);
157+
}
158+
159+
/**
160+
* @param array<string, mixed> $data
161+
*/
162+
protected function hasToolCalls(array $data): bool
163+
{
164+
return (bool) data_get($data, 'message.tool_calls');
165+
}
166+
167+
protected function sendRequest(Request $request): Response
168+
{
169+
if (count($request->systemPrompts()) > 1) {
170+
throw new PrismException('Ollama does not support multiple system prompts using withSystemPrompt / withSystemPrompts. However, you can provide additional system prompts by including SystemMessages in with withMessages.');
171+
}
172+
173+
try {
174+
return $this
175+
->client
176+
->withOptions(['stream' => true])
177+
->throw()
178+
->post('api/chat', [
179+
'model' => $request->model(),
180+
'system' => data_get($request->systemPrompts(), '0.content', ''),
181+
'messages' => (new MessageMap($request->messages()))->map(),
182+
'tools' => ToolMap::map($request->tools()),
183+
'stream' => true,
184+
'options' => array_filter([
185+
'temperature' => $request->temperature(),
186+
'num_predict' => $request->maxTokens() ?? 2048,
187+
'top_p' => $request->topP(),
188+
]),
189+
]);
190+
} catch (Throwable $e) {
191+
if ($e instanceof RequestException && $e->response->getStatusCode() === 429) {
192+
throw new PrismRateLimitedException([]);
193+
}
194+
195+
throw PrismException::providerRequestError($request->model(), $e);
196+
}
197+
}
198+
199+
protected function readLine(StreamInterface $stream): string
200+
{
201+
$buffer = '';
202+
203+
while (! $stream->eof()) {
204+
$byte = $stream->read(1);
205+
206+
if ($byte === '') {
207+
return $buffer;
208+
}
209+
210+
$buffer .= $byte;
211+
212+
if ($byte === "\n") {
213+
break;
214+
}
215+
}
216+
217+
return $buffer;
218+
}
219+
}

src/Providers/Ollama/Ollama.php

+7-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
use Prism\Prism\Contracts\Provider;
1111
use Prism\Prism\Embeddings\Request as EmbeddingsRequest;
1212
use Prism\Prism\Embeddings\Response as EmbeddingsResponse;
13-
use Prism\Prism\Exceptions\PrismException;
1413
use Prism\Prism\Providers\Ollama\Handlers\Embeddings;
14+
use Prism\Prism\Providers\Ollama\Handlers\Stream;
1515
use Prism\Prism\Providers\Ollama\Handlers\Structured;
1616
use Prism\Prism\Providers\Ollama\Handlers\Text;
1717
use Prism\Prism\Structured\Request as StructuredRequest;
@@ -62,7 +62,12 @@ public function embeddings(EmbeddingsRequest $request): EmbeddingsResponse
6262
#[\Override]
6363
public function stream(TextRequest $request): Generator
6464
{
65-
throw PrismException::unsupportedProviderAction(__METHOD__, class_basename($this));
65+
$handler = new Stream($this->client(
66+
$request->clientOptions(),
67+
$request->clientRetry()
68+
));
69+
70+
return $handler->handle($request);
6671
}
6772

6873
/**

tests/Fixtures/FixtureResponse.php

+52-52
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,58 @@ public static function fakeResponseSequence(
8888
])->preventStrayRequests();
8989
}
9090

91+
public static function fakeStreamResponses(string $requestPath, string $name): void
92+
{
93+
$basePath = dirname(static::filePath("{$name}-1.sse"));
94+
95+
// Find all recorded .sse files for this test
96+
$files = collect(is_dir($basePath) ? scandir($basePath) : [])
97+
->filter(fn ($file): int|false => preg_match('/^'.preg_quote(basename($name), '/').'-\d+\.sse$/', $file))
98+
->map(fn ($file): string => $basePath.'/'.$file)
99+
->values()
100+
->toArray();
101+
102+
// If no files exist, automatically record the streaming responses
103+
if (empty($files)) {
104+
static::recordStreamResponses($requestPath, $name);
105+
106+
return;
107+
}
108+
109+
// Sort files numerically
110+
usort($files, function ($a, $b): int {
111+
preg_match('/-(\d+)\.sse$/', $a, $matchesA);
112+
preg_match('/-(\d+)\.sse$/', $b, $matchesB);
113+
114+
return (int) $matchesA[1] <=> (int) $matchesB[1];
115+
});
116+
117+
// Create response sequence from the files
118+
$responses = array_map(fn ($file) => Http::response(
119+
file_get_contents($file),
120+
200,
121+
[
122+
'Content-Type' => 'text/event-stream',
123+
'Cache-Control' => 'no-cache',
124+
'Connection' => 'keep-alive',
125+
'Transfer-Encoding' => 'chunked',
126+
]
127+
), $files);
128+
129+
if ($responses === []) {
130+
$responses[] = Http::response(
131+
"data: {\"error\":\"No recorded stream responses found\"}\n\ndata: [DONE]\n\n",
132+
200,
133+
['Content-Type' => 'text/event-stream']
134+
);
135+
}
136+
137+
// Register the fake responses
138+
Http::fake([
139+
$requestPath => Http::sequence($responses),
140+
])->preventStrayRequests();
141+
}
142+
91143
protected static function filePath(string $filePath): string
92144
{
93145
return sprintf('%s/%s', __DIR__, $filePath);
@@ -172,56 +224,4 @@ protected static function recordStreamResponses(string $requestPath, string $nam
172224
return Http::response('{"error":"Not mocked"}', 404);
173225
});
174226
}
175-
176-
protected static function fakeStreamResponses(string $requestPath, string $name): void
177-
{
178-
$basePath = dirname(static::filePath("{$name}-1.sse"));
179-
180-
// Find all recorded .sse files for this test
181-
$files = collect(is_dir($basePath) ? scandir($basePath) : [])
182-
->filter(fn ($file): int|false => preg_match('/^'.preg_quote(basename($name), '/').'-\d+\.sse$/', $file))
183-
->map(fn ($file): string => $basePath.'/'.$file)
184-
->values()
185-
->toArray();
186-
187-
// If no files exist, automatically record the streaming responses
188-
if (empty($files)) {
189-
static::recordStreamResponses($requestPath, $name);
190-
191-
return;
192-
}
193-
194-
// Sort files numerically
195-
usort($files, function ($a, $b): int {
196-
preg_match('/-(\d+)\.sse$/', $a, $matchesA);
197-
preg_match('/-(\d+)\.sse$/', $b, $matchesB);
198-
199-
return (int) $matchesA[1] <=> (int) $matchesB[1];
200-
});
201-
202-
// Create response sequence from the files
203-
$responses = array_map(fn ($file) => Http::response(
204-
file_get_contents($file),
205-
200,
206-
[
207-
'Content-Type' => 'text/event-stream',
208-
'Cache-Control' => 'no-cache',
209-
'Connection' => 'keep-alive',
210-
'Transfer-Encoding' => 'chunked',
211-
]
212-
), $files);
213-
214-
if ($responses === []) {
215-
$responses[] = Http::response(
216-
"data: {\"error\":\"No recorded stream responses found\"}\n\ndata: [DONE]\n\n",
217-
200,
218-
['Content-Type' => 'text/event-stream']
219-
);
220-
}
221-
222-
// Register the fake responses
223-
Http::fake([
224-
$requestPath => Http::sequence($responses),
225-
])->preventStrayRequests();
226-
}
227227
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.255986Z","message":{"role":"assistant","content":"I"},"done":false}
2+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.298061Z","message":{"role":"assistant","content":" am"},"done":false}
3+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.338913Z","message":{"role":"assistant","content":" Gr"},"done":false}
4+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.380156Z","message":{"role":"assistant","content":"an"},"done":false}
5+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.422782Z","message":{"role":"assistant","content":"ite"},"done":false}
6+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.465241Z","message":{"role":"assistant","content":","},"done":false}
7+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.506745Z","message":{"role":"assistant","content":" a"},"done":false}
8+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.550387Z","message":{"role":"assistant","content":" language"},"done":false}
9+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.592188Z","message":{"role":"assistant","content":" model"},"done":false}
10+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.634329Z","message":{"role":"assistant","content":" developed"},"done":false}
11+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.676125Z","message":{"role":"assistant","content":" by"},"done":false}
12+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.717757Z","message":{"role":"assistant","content":" IBM"},"done":false}
13+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.760111Z","message":{"role":"assistant","content":" in"},"done":false}
14+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.801811Z","message":{"role":"assistant","content":" "},"done":false}
15+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.843153Z","message":{"role":"assistant","content":"2"},"done":false}
16+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.884556Z","message":{"role":"assistant","content":"0"},"done":false}
17+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.925817Z","message":{"role":"assistant","content":"2"},"done":false}
18+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:37.967355Z","message":{"role":"assistant","content":"4"},"done":false}
19+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.00989Z","message":{"role":"assistant","content":"."},"done":false}
20+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.051238Z","message":{"role":"assistant","content":" I"},"done":false}
21+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.092541Z","message":{"role":"assistant","content":" am"},"done":false}
22+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.135168Z","message":{"role":"assistant","content":" designed"},"done":false}
23+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.178044Z","message":{"role":"assistant","content":" to"},"done":false}
24+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.219314Z","message":{"role":"assistant","content":" understand"},"done":false}
25+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.262311Z","message":{"role":"assistant","content":" and"},"done":false}
26+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.303619Z","message":{"role":"assistant","content":" respond"},"done":false}
27+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.34506Z","message":{"role":"assistant","content":" to"},"done":false}
28+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.387669Z","message":{"role":"assistant","content":" a"},"done":false}
29+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.43038Z","message":{"role":"assistant","content":" wide"},"done":false}
30+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.471961Z","message":{"role":"assistant","content":" range"},"done":false}
31+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.514682Z","message":{"role":"assistant","content":" of"},"done":false}
32+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.556376Z","message":{"role":"assistant","content":" questions"},"done":false}
33+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.598044Z","message":{"role":"assistant","content":" and"},"done":false}
34+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.639513Z","message":{"role":"assistant","content":" promp"},"done":false}
35+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.681122Z","message":{"role":"assistant","content":"ts"},"done":false}
36+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.724076Z","message":{"role":"assistant","content":"."},"done":false}
37+
{"model":"granite3-dense:8b","created_at":"2025-03-20T18:32:38.766786Z","message":{"role":"assistant","content":""},"done_reason":"stop","done":true,"total_duration":2203237375,"load_duration":336223209,"prompt_eval_count":37,"prompt_eval_duration":355000000,"eval_count":37,"eval_duration":1511000000}

0 commit comments

Comments
 (0)