并在结束前添加额外的块"/>
在 NodeJS Cloud 函数中转发 ReadableStream,并在结束前添加额外的块
我有以下功能,应该执行以下操作:
- 获取 ReadableStream
- 逐块阅读
- 在最后一个块之前(openai 用“DONE”字符串表示),应该添加一个带有我的 extensionPayload 的额外块。
问题是原始/openai流的最后一个块的数据+我的扩展数据合并为一个块。但是要在客户端处理块,我需要它们是单独的块。
export async function extendOpenAIStream(
openaiStream: ReadableStream<Uint8Array>,
extensionPayload: JSONValue
) {
const encoder = new TextEncoder()
const decoder = new TextDecoder()
const reader = openaiStream.getReader()
const stream = new ReadableStream({
cancel() {
reader.cancel()
},
async start(controller) {
while (true) {
const { done, value } = await reader.read()
const dataString = decoder.decode(value)
//
if (done || dataString.includes('[DONE]')) {
// Enque our extension
const extendedValue = encoder.encode(
`data: ${JSON.stringify(extensionPayload)}\n\n`
)
controller.enqueue(extendedValue)
// Enque the original chunk
controller.enqueue(value)
// Close the stream
controller.close()
break
}
controller.enqueue(value)
}
},
})
return stream
}
预期块(分离块):
data: {"extensionPayload": {...}}
data: {"id":"...,"object":"chatpletion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}
data: [DONE]
实际块(合并为一个块):
data: {"extensionPayload": {...}}
data: {"id":"...,"object":"chatpletion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}
data: [DONE]
回答如下:
如果有人需要做类似的事情,我终于有了解决方案。事实证明,块是否被分割并不重要,因为我使用的是 eventsource-parser。 真正的问题是块变得支离破碎。根据 Vercel,这是预期的(见here。 我现在也在云函数中使用 eventsource-parser 来确保块不碎片化:
const stream = new ReadableStream({
cancel() {
reader.cancel()
},
async start(controller) {
// Chunks might get fragmented so we use eventsource-parse to ensure the chunks are complete
// See: https://vercel/docs/concepts/functions/edge-functions/streaming#caveats
const parser = createParser((e) => {
if (e.type !== 'event') return
controller.enqueue(encoder.encode(`data: ${e.data}\n\n`))
})
while (true) {
const { done, value } = await reader.read()
const dataString = decoder.decode(value)
// https://beta.openai/docs/api-reference/completions/create#completions/create-stream
if (done || dataString.includes('[DONE]')) {
// Enque our extension
const extendedValue = encoder.encode(
`data: ${JSON.stringify(extensionPayload)}\n\n`
)
controller.enqueue(extendedValue)
// Close the stream
controller.close()
break
}
parser.feed(dataString)
}
},
})
更多推荐
在 NodeJS Cloud 函数中转发 ReadableStream,并在结束前添加额外的块
发布评论