// next.js/packages/next/src/server/app-render/app-render-prerender-utils.test.ts
// app-render-prerender-utils.test.ts — 393 lines, 11.9 KB
import { PassThrough } from 'node:stream'
import { ReplayableNodeStream } from './app-render-prerender-utils'

/**
 * Drains a readable stream and reports every chunk it emitted.
 *
 * Each 'data' chunk is copied into a plain `number[]` of its byte values so
 * results can be compared with ordinary deep-equality assertions.
 *
 * @param stream - The readable to consume; attaching the 'data' listener
 *   here is what starts consumption.
 * @returns A promise that resolves with the chunks, in arrival order, once
 *   the stream emits 'end', and rejects with the stream's 'error' if any.
 */
function collectBytes(
  stream: import('node:stream').Readable
): Promise<number[][]> {
  const received: number[][] = []
  return new Promise<number[][]>((resolve, reject) => {
    stream
      .on('data', (piece: Buffer) => {
        received.push([...piece])
      })
      .on('end', () => {
        resolve(received)
      })
      .on('error', reject)
  })
}

/**
 * Yields to the event loop: the returned promise resolves (with no value)
 * from inside a `setImmediate` callback.
 */
function tick(): Promise<void> {
  return new Promise<void>((done) => {
    setImmediate(() => {
      done()
    })
  })
}

/**
 * Test suite for `ReplayableNodeStream`.
 *
 * Every test wraps a fresh `PassThrough` source and asserts on what the
 * streams returned by `createReplayStream()` deliver, or on the wrapper's
 * private `_chunks` / `_subscribers` fields where the behaviour under test is
 * not observable from the outside.
 */
describe('ReplayableNodeStream', () => {
  // ---------------------------------------------------------------------
  // Local fixtures shared by the tests below
  // ---------------------------------------------------------------------

  /** Creates a PassThrough source and the ReplayableNodeStream wrapping it. */
  const setup = () => {
    const source = new PassThrough()
    return { source, replayable: new ReplayableNodeStream(source) }
  }

  /** Shorthand for a Uint8Array holding the given byte values. */
  const bytes = (...values: number[]): Uint8Array => Uint8Array.from(values)

  /**
   * Ends the source and resolves once it has emitted 'end'. The listener is
   * attached before `end()` is called.
   */
  const endAndWait = (source: PassThrough): Promise<void> => {
    const sawEnd = new Promise<void>((resolve) => source.on('end', resolve))
    source.end()
    return sawEnd
  }

  /** Resolves with the value of the 'error' event emitted by the stream. */
  const firstError = (stream: import('node:stream').Readable): Promise<Error> =>
    new Promise<Error>((resolve) => {
      stream.on('error', resolve)
    })

  /** Typed view over the private fields these tests inspect. */
  const internals = (replayable: unknown) =>
    replayable as { _chunks: unknown; _subscribers: Set<unknown> }

  // ---------------------------------------------------------------------

  describe('construction and buffering', () => {
    it('buffers Uint8Array chunks from the source', async () => {
      const { source, replayable } = setup()

      source.write(bytes(1, 2, 3))
      source.write(bytes(4, 5, 6))
      await endAndWait(source)

      const replayed = await collectBytes(replayable.createReplayStream())
      expect(replayed).toEqual([
        [1, 2, 3],
        [4, 5, 6],
      ])
    })

    it('converts Buffer chunks to Uint8Array before buffering', async () => {
      const { source, replayable } = setup()

      source.write(Buffer.from([10, 20, 30]))
      await endAndWait(source)

      const replayed = await collectBytes(replayable.createReplayStream())
      expect(replayed).toEqual([[10, 20, 30]])
    })

    it('handles an immediately-ending empty stream', async () => {
      const { source, replayable } = setup()

      await endAndWait(source)

      const replayed = await collectBytes(replayable.createReplayStream())
      expect(replayed).toEqual([])
    })
  })

  describe('createReplayStream – replay after source completes', () => {
    it('replays all buffered chunks and ends', async () => {
      const { source, replayable } = setup()

      source.write(bytes(1, 2))
      source.write(bytes(3, 4))
      source.write(bytes(5, 6))
      await endAndWait(source)

      const replayed = await collectBytes(replayable.createReplayStream())
      expect(replayed).toEqual([
        [1, 2],
        [3, 4],
        [5, 6],
      ])
    })

    it('returns chunks in original order', async () => {
      const { source, replayable } = setup()

      // One single-byte chunk per value, written in ascending order.
      const values = Array.from({ length: 10 }, (_, index) => index)
      values.forEach((value) => {
        source.write(bytes(value))
      })
      await endAndWait(source)

      const replayed = await collectBytes(replayable.createReplayStream())
      expect(replayed).toEqual(values.map((value) => [value]))
    })

    it('replay from empty finished source emits no data then ends', async () => {
      const { source, replayable } = setup()

      await endAndWait(source)

      const replayed = await collectBytes(replayable.createReplayStream())
      expect(replayed).toEqual([])
    })
  })

  describe('createReplayStream – live source (still streaming)', () => {
    it('replays buffered chunks first then forwards new chunks', async () => {
      const { source, replayable } = setup()

      source.write(bytes(1, 2))

      const pending = collectBytes(replayable.createReplayStream())

      // Give the replay stream a turn to drain what is already buffered
      // before any live data is written.
      await tick()

      source.write(bytes(3, 4))
      source.end()

      expect(await pending).toEqual([
        [1, 2],
        [3, 4],
      ])
    })

    it('onEnd from source closes the replay stream', async () => {
      const { source, replayable } = setup()

      const replay = replayable.createReplayStream()
      const replayEnded = new Promise<void>((resolve) =>
        replay.on('end', resolve)
      )
      // Switch to flowing mode so 'end' can be emitted without a data listener.
      replay.resume()

      source.end()
      await replayEnded
    })

    it('onError from source destroys the replay stream', async () => {
      const { source, replayable } = setup()

      const replay = replayable.createReplayStream()
      const failure = firstError(replay)

      const testError = new Error('source failed')
      source.destroy(testError)

      expect(await failure).toBe(testError)
    })
  })

  describe('createReplayStream – error before replay', () => {
    it('immediately destroys replay if source already errored', async () => {
      const { source, replayable } = setup()

      const testError = new Error('already broken')
      source.destroy(testError)
      await tick()

      // The replay is created only after the source has already failed.
      const replay = replayable.createReplayStream()
      const failure = firstError(replay)

      expect(await failure).toBe(testError)
    })
  })

  describe('multiple independent replay streams', () => {
    it('each replay stream receives all buffered and live chunks', async () => {
      const { source, replayable } = setup()

      source.write(bytes(1))
      source.write(bytes(2))

      const first = replayable.createReplayStream()
      const second = replayable.createReplayStream()

      const firstPending = collectBytes(first)
      const secondPending = collectBytes(second)

      // Give both replays a turn to drain the buffer before live data arrives.
      await tick()

      source.write(bytes(3))
      source.end()

      const [firstResult, secondResult] = await Promise.all([
        firstPending,
        secondPending,
      ])

      expect(firstResult).toEqual([[1], [2], [3]])
      expect(secondResult).toEqual([[1], [2], [3]])
    })

    it('closing one replay stream does not affect the other', async () => {
      const { source, replayable } = setup()

      const doomed = replayable.createReplayStream()
      const survivor = replayable.createReplayStream()

      doomed.destroy()

      source.write(bytes(42))
      source.end()

      expect(await collectBytes(survivor)).toEqual([[42]])
    })
  })

  describe('subscriber cleanup', () => {
    it('removes subscriber when replay stream is closed', async () => {
      const { replayable } = setup()

      const replay = replayable.createReplayStream()

      // Captured once: the same Set instance is inspected before and after.
      const subscribers = internals(replayable)._subscribers
      expect(subscribers.size).toBe(1)

      const closed = new Promise<void>((resolve) => replay.on('close', resolve))
      replay.destroy()
      await closed

      expect(subscribers.size).toBe(0)
    })

    it('does not register a subscriber when source already ended', async () => {
      const { source, replayable } = setup()

      await endAndWait(source)

      replayable.createReplayStream()

      expect(internals(replayable)._subscribers.size).toBe(0)
    })

    it('does not register a subscriber when source already errored', async () => {
      const { source, replayable } = setup()

      source.destroy(new Error('boom'))
      await tick()

      const replay = replayable.createReplayStream()
      // Attach a no-op handler so the replay's 'error' event has a listener.
      replay.on('error', () => {})

      expect(internals(replayable)._subscribers.size).toBe(0)
    })
  })

  describe('data after dispose', () => {
    it('does not crash when source emits data after dispose', async () => {
      const { source, replayable } = setup()

      source.write(bytes(1))
      replayable.dispose()

      // The source keeps writing after dispose; the test passes as long as
      // nothing throws.
      source.write(bytes(2))
      source.end()
      await tick()
    })
  })

  describe('error after partial data', () => {
    it('replay gets error even when chunks were buffered before error', async () => {
      const { source, replayable } = setup()

      source.write(bytes(1, 2, 3))

      const testError = new Error('mid-stream failure')
      source.destroy(testError)
      await tick()

      const replay = replayable.createReplayStream()
      const failure = firstError(replay)

      expect(await failure).toBe(testError)
    })
  })

  describe('multiple replays from completed source', () => {
    it('each independently replays the full buffer', async () => {
      const { source, replayable } = setup()

      source.write(bytes(10))
      source.write(bytes(20))
      await endAndWait(source)

      // Replays are created and drained one after another, never concurrently.
      const results: number[][][] = []
      for (let round = 0; round < 3; round++) {
        results.push(await collectBytes(replayable.createReplayStream()))
      }

      for (const result of results) {
        expect(result).toEqual([[10], [20]])
      }
    })
  })

  describe('dispose', () => {
    it('throws InvariantError when creating replay after dispose', () => {
      const { replayable } = setup()

      replayable.dispose()

      const attempt = () => replayable.createReplayStream()
      expect(attempt).toThrow(
        /Cannot create a replay stream after the ReplayableNodeStream has been disposed/
      )
    })

    it('clears buffer but keeps subscribers so streams can still end', () => {
      const { replayable } = setup()

      const replay = replayable.createReplayStream()

      // Private state is re-read on every check rather than captured once.
      expect(internals(replayable)._chunks).not.toBeNull()
      expect(internals(replayable)._subscribers.size).toBe(1)

      replayable.dispose()

      expect(internals(replayable)._chunks).toBeNull()
      // The subscriber set must survive dispose: the replay stream still
      // relies on its end subscriber to receive push(null) when the source
      // ends; without it the consumer would never observe EOF.
      expect(internals(replayable)._subscribers.size).toBe(1)

      replay.destroy()
    })

    it('replay stream still ends when source ends after dispose', async () => {
      const { source, replayable } = setup()

      source.write(bytes(1, 2))

      const replay = replayable.createReplayStream()
      replayable.dispose()

      const pending = collectBytes(replay)

      // Give the replay a turn to drain buffered chunks before live data.
      await tick()

      // The source writes and ends after dispose; the replay must still
      // deliver the data and close.
      source.write(bytes(3, 4))
      source.end()

      expect(await pending).toEqual([
        [1, 2],
        [3, 4],
      ])
    })
  })
})
// Quest for Code v2.0.0 / SIGN IN