next.js/packages/next/src/server/pipe-readable.ts
pipe-readable.ts242 lines6.4 KB
import type { ServerResponse } from 'node:http'
import type { Readable } from 'node:stream'

import {
  ResponseAbortedName,
  createAbortController,
} from './web/spec-extension/adapters/next-request'
import { DetachedPromise } from '../lib/detached-promise'
import { getTracer } from './lib/trace/tracer'
import { NextNodeServerSpan } from './lib/trace/constants'
import { getClientComponentLoaderMetrics } from './client-component-renderer-logger'

export function isAbortError(e: any): e is Error & { name: 'AbortError' } {
  return e?.name === 'AbortError' || e?.name === ResponseAbortedName
}

const HAS_CLIENT_COMPONENT_METRICS_ENABLED =
  'performance' in globalThis && process.env.NEXT_OTEL_PERFORMANCE_PREFIX

function createWriterFromResponse(
  res: ServerResponse,
  waitUntilForEnd?: Promise<unknown>
): WritableStream<Uint8Array> {
  let started = false

  // Create a promise that will resolve once the response has drained. See
  // https://nodejs.org/api/stream.html#stream_event_drain
  let drained = new DetachedPromise<void>()
  function onDrain() {
    drained.resolve()
  }
  res.on('drain', onDrain)

  // If the finish event fires, it means we shouldn't block and wait for the
  // drain event.
  res.once('close', () => {
    res.off('drain', onDrain)
    drained.resolve()
  })

  // Create a promise that will resolve once the response has finished. See
  // https://nodejs.org/api/http.html#event-finish_1
  const finished = new DetachedPromise<void>()
  res.once('finish', () => {
    finished.resolve()
  })

  // Create a writable stream that will write to the response.
  return new WritableStream<Uint8Array>({
    write: async (chunk) => {
      // You'd think we'd want to use `start` instead of placing this in `write`
      // but this ensures that we don't actually flush the headers until we've
      // started writing chunks.
      if (!started) {
        started = true

        if (HAS_CLIENT_COMPONENT_METRICS_ENABLED) {
          const metrics = getClientComponentLoaderMetrics()
          if (metrics) {
            performance.measure(
              `${process.env.NEXT_OTEL_PERFORMANCE_PREFIX}:next-client-component-loading`,
              {
                start: metrics.clientComponentLoadStart,
                end:
                  metrics.clientComponentLoadStart +
                  metrics.clientComponentLoadTimes,
              }
            )
          }
        }

        res.flushHeaders()
        getTracer().trace(
          NextNodeServerSpan.startResponse,
          {
            spanName: 'start response',
          },
          () => undefined
        )
      }

      try {
        const ok = res.write(chunk)

        // Added by the `compression` middleware, this is a function that will
        // flush the partially-compressed response to the client.
        if ('flush' in res && typeof res.flush === 'function') {
          res.flush()
        }

        // If the write returns false, it means there's some backpressure, so
        // wait until it's streamed before continuing.
        if (!ok) {
          await drained.promise

          // Reset the drained promise so that we can wait for the next drain event.
          drained = new DetachedPromise<void>()
        }
      } catch (err) {
        res.end()
        throw new Error('failed to write chunk to response', { cause: err })
      }
    },
    abort: (err) => {
      if (res.writableFinished) return

      res.destroy(err)
    },
    close: async () => {
      // if a waitUntil promise was passed, wait for it to resolve before
      // ending the response.
      if (waitUntilForEnd) {
        await waitUntilForEnd
      }

      if (res.writableFinished) return

      res.end()
      return finished.promise
    },
  })
}

export async function pipeToNodeResponse(
  readable: ReadableStream<Uint8Array>,
  res: ServerResponse,
  waitUntilForEnd?: Promise<unknown>
) {
  try {
    // If the response has already errored, then just return now.
    const { errored, destroyed } = res
    if (errored || destroyed) return

    // Create a new AbortController so that we can abort the readable if the
    // client disconnects.
    const controller = createAbortController(res)

    const writer = createWriterFromResponse(res, waitUntilForEnd)

    await readable.pipeTo(writer, { signal: controller.signal })
  } catch (err: any) {
    // If this isn't related to an abort error, re-throw it.
    if (isAbortError(err)) return

    throw new Error('failed to pipe response', { cause: err })
  }
}

export async function pipeNodeReadableToNodeResponse(
  readable: Readable,
  res: ServerResponse,
  waitUntilForEnd?: Promise<unknown>
) {
  try {
    const { errored, destroyed } = res
    if (errored || destroyed) return

    let started = false

    const finished = new DetachedPromise<void>()

    res.once('close', () => {
      readable.destroy()
      finished.resolve()
    })

    readable.on('data', (chunk: Buffer) => {
      if (!started) {
        started = true

        if (
          'performance' in globalThis &&
          process.env.NEXT_OTEL_PERFORMANCE_PREFIX
        ) {
          const metrics = getClientComponentLoaderMetrics()
          if (metrics) {
            performance.measure(
              `${process.env.NEXT_OTEL_PERFORMANCE_PREFIX}:next-client-component-loading`,
              {
                start: metrics.clientComponentLoadStart,
                end:
                  metrics.clientComponentLoadStart +
                  metrics.clientComponentLoadTimes,
              }
            )
          }
        }

        res.flushHeaders()
        getTracer().trace(
          NextNodeServerSpan.startResponse,
          {
            spanName: 'start response',
          },
          () => undefined
        )
      }

      const ok = res.write(chunk)

      if ('flush' in res && typeof res.flush === 'function') {
        res.flush()
      }

      if (!ok) {
        readable.pause()
        res.once('drain', () => {
          readable.resume()
        })
      }
    })

    readable.on('end', async () => {
      if (waitUntilForEnd) {
        await waitUntilForEnd
      }

      if (!res.writableFinished) {
        res.end()
      }

      finished.resolve()
    })

    readable.on('error', (err) => {
      if (isAbortError(err)) {
        finished.resolve()
        return
      }

      res.destroy(err)
      finished.resolve()
    })

    await finished.promise
  } catch (err: any) {
    if (isAbortError(err)) return

    throw new Error('failed to pipe response', { cause: err })
  }
}
Quest for Codev2.0.0
/
SIGN IN