Skip to content

Commit f4e00eb

Browse files
committed
more robust streaming
1 parent e6dcf6a commit f4e00eb

File tree

2 files changed

+39
-29
lines changed

2 files changed

+39
-29
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "oneping",
3-
"version": "1.0.6",
3+
"version": "1.0.7",
44
"description": "Query LLMs and Whisper straight from the web.",
55
"type": "module",
66
"author": "Doug Hanley",

src/curl.js

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,26 @@ function robustParse(json) {
77
return JSON.parse(json);
88
} catch (e) {
99
console.error(e);
10+
console.log(json);
1011
return null;
1112
}
1213
}
1314

15+
async function* streamSSE(stream) {
16+
let buf = ''
17+
for await (const chunk of stream) {
18+
buf += chunk
19+
let boundary;
20+
while ((boundary = buf.indexOf('\n\n')) >= 0) {
21+
yield buf.slice(0, boundary)
22+
buf = buf.slice(boundary + 2)
23+
}
24+
}
25+
if (buf.length > 0) {
26+
yield buf
27+
}
28+
}
29+
1430
//
1531
// authorization
1632
//
@@ -73,31 +89,24 @@ function extractor_anthropic(response) {
7389
return response.content[0].text;
7490
}
7591

76-
function* stream_openai(response) {
77-
for (const block of response.split('\n\n')) {
78-
if (block.length == 0) continue;
79-
const [match, data0] = /^data: (.*)$/.exec(block)
80-
if (data0 == '[DONE]') break;
81-
const data = robustParse(data0);
82-
if (data == null) continue;
83-
const text = data.choices[0].delta.content;
84-
if (text != null) yield text;
85-
}
92+
function stream_openai(chunk) {
93+
const [match, data0] = /^data: (.*)$/.exec(chunk)
94+
if (data0 == '[DONE]') return;
95+
const data = robustParse(data0);
96+
if (data == null) return;
97+
return data.choices[0].delta.content;
8698
}
8799

88-
function* stream_anthropic(chunk) {
89-
for (const block of chunk.split('\n\n')) {
90-
if (block.length == 0) continue;
91-
const [line1, line2] = block.split('\n')
92-
const [match1, event] = /^event: (.*)$/.exec(line1)
93-
const [match2, data0] = /^data: (.*)$/.exec(line2)
94-
const data = robustParse(data0);
95-
if (data == null) continue;
96-
if (event == 'content_block_start') {
97-
yield data.content_block.text;
98-
} else if (event == 'content_block_delta') {
99-
yield data.delta.text;
100-
}
100+
function stream_anthropic(chunk) {
101+
const [line1, line2] = chunk.split('\n')
102+
const [match1, event] = /^event: (.*)$/.exec(line1)
103+
const [match2, data0] = /^data: (.*)$/.exec(line2)
104+
const data = robustParse(data0);
105+
if (data == null) return;
106+
if (event == 'content_block_start') {
107+
return data.content_block.text;
108+
} else if (event == 'content_block_delta') {
109+
return data.delta.text;
101110
}
102111
}
103112

@@ -248,12 +257,13 @@ async function* stream(query, args) {
248257
}
249258

250259
// stream decode and parse
251-
const stream = response.body
252-
.pipeThrough(new TextDecoderStream())
253-
.pipeThrough(new TransformStream({ transform }));
260+
const stream = response.body.pipeThrough(new TextDecoderStream())
254261

255-
// yield chunks
256-
yield* stream;
262+
// process stream one SSE event at a time
263+
for await (const data of streamSSE(stream)) {
264+
const text = provider.stream(data);
265+
if (text != null) yield text;
266+
}
257267
}
258268

259269
class Chat {

0 commit comments

Comments
 (0)