在 NodeJS Cloud 函数中转发 ReadableStream,并在结束前添加额外的块

编程入门 行业动态 更新时间:2024-10-03 17:19:23

在 NodeJS Cloud 函数中转发 ReadableStream,<a href=https://www.elefans.com/category/jswz/34/1771370.html style=并在结束前添加额外的块"/>

在 NodeJS Cloud 函数中转发 ReadableStream,并在结束前添加额外的块

我有以下功能,应该执行以下操作:

  • 获取 ReadableStream
  • 逐块阅读
  • 在最后一个块之前(openai 用“DONE”字符串表示),应该添加一个带有我的 extensionPayload 的额外块。

问题是原始/openai流的最后一个块的数据+我的扩展数据合并为一个块。但是要在客户端处理块,我需要它们是单独的块。

export async function extendOpenAIStream(
  openaiStream: ReadableStream<Uint8Array>,
  extensionPayload: JSONValue
) {
  const encoder = new TextEncoder()
  const decoder = new TextDecoder()

  const reader = openaiStream.getReader()

  const stream = new ReadableStream({
    cancel() {
      reader.cancel()
    },
    async start(controller) {
      while (true) {
        const { done, value } = await reader.read()
        const dataString = decoder.decode(value)

        // 
        if (done || dataString.includes('[DONE]')) {
          // Enque our extension
          const extendedValue = encoder.encode(
            `data: ${JSON.stringify(extensionPayload)}\n\n`
          )
          controller.enqueue(extendedValue)

          // Enque the original chunk
          controller.enqueue(value)

          // Close the stream
          controller.close()
          break
        }
        controller.enqueue(value)
      }
    },
  })

  return stream
}

预期块(分离块):

data: {"extensionPayload": {...}}
data: {"id":"...,"object":"chatpletion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}

data: [DONE]

实际块(合并为一个块):

data: {"extensionPayload": {...}}

data: {"id":"...,"object":"chatpletion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}

data: [DONE]
回答如下:

如果有人需要做类似的事情,我终于有了解决方案。事实证明,块是否被分割并不重要,因为我使用的是 eventsource-parser。 真正的问题是块变得支离破碎。根据 Vercel,这是预期的(见here。 我现在也在云函数中使用 eventsource-parser 来确保块不碎片化:

const stream = new ReadableStream({
  cancel() {
    reader.cancel()
  },
  async start(controller) {
    // Chunks might get fragmented so we use eventsource-parse to ensure the chunks are complete
    // See: https://vercel/docs/concepts/functions/edge-functions/streaming#caveats
    const parser = createParser((e) => {
      if (e.type !== 'event') return
      controller.enqueue(encoder.encode(`data: ${e.data}\n\n`))
    })

    while (true) {
      const { done, value } = await reader.read()
      const dataString = decoder.decode(value)

      // https://beta.openai/docs/api-reference/completions/create#completions/create-stream
      if (done || dataString.includes('[DONE]')) {
        // Enque our extension
        const extendedValue = encoder.encode(
          `data: ${JSON.stringify(extensionPayload)}\n\n`
        )
        controller.enqueue(extendedValue)

        // Close the stream
        controller.close()
        break
      }
      parser.feed(dataString)
    }
  },
})

更多推荐

在 NodeJS Cloud 函数中转发 ReadableStream,并在结束前添加额外的块

本文发布于:2024-05-31 05:23:50,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1771321.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:并在   函数   结束   NodeJS   Cloud

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!