import type { ReactDOMServerReadableStream } from 'react-dom/server'
import { getTracer } from '../lib/trace/tracer'
import { AppRenderSpan } from '../lib/trace/constants'
import { DetachedPromise } from '../../lib/detached-promise'
import {
scheduleImmediate,
atLeastOneTask,
waitAtLeastOneReactRenderTask,
} from '../../lib/scheduler'
import { ENCODED_TAGS } from './encoded-tags'
import {
indexOfUint8Array,
isEquivalentUint8Arrays,
removeFromUint8Array,
} from './uint8array-helpers'
import { MISSING_ROOT_TAGS_ERROR } from '../../shared/lib/errors/constants'
import {
RSC_HEADER,
NEXT_ROUTER_PREFETCH_HEADER,
NEXT_ROUTER_SEGMENT_PREFETCH_HEADER,
NEXT_RSC_UNION_QUERY,
NEXT_INSTANT_PREFETCH_HEADER,
} from '../../client/components/app-router-headers'
import { computeCacheBustingSearchParam } from '../../shared/lib/router/utils/cache-busting-search-param'
import type { AnyStream } from '../app-render/stream-ops'
function voidCatch() {
// this catcher is designed to be used with pipeTo where we expect the underlying
// pipe implementation to forward errors but we don't want the pipeTo promise to reject
// and be unhandled
}
// We can share the same encoder instance everywhere
// Notably we cannot do the same for TextDecoder because it is stateful
// when handling streaming data
const encoder = new TextEncoder()
export function chainStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
// If we have no streams, return an empty stream. This behavior is
// intentional as we're now providing the `RenderResult.EMPTY` value.
if (streams.length === 0) {
return new ReadableStream<T>({
start(controller) {
controller.close()
},
})
}
// If we only have 1 stream we fast path it by returning just this stream
if (streams.length === 1) {
return streams[0]
}
const { readable, writable } = new TransformStream()
// We always initiate pipeTo immediately. We know we have at least 2 streams
// so we need to avoid closing the writable when this one finishes.
let promise = streams[0].pipeTo(writable, { preventClose: true })
let i = 1
for (; i < streams.length - 1; i++) {
const nextStream = streams[i]
promise = promise.then(() =>
nextStream.pipeTo(writable, { preventClose: true })
)
}
// We can omit the length check because we halted before the last stream and there
// is at least two streams so the lastStream here will always be defined
const lastStream = streams[i]
promise = promise.then(() => lastStream.pipeTo(writable))
// Catch any errors from the streams and ignore them, they will be handled
// by whatever is consuming the readable stream.
promise.catch(voidCatch)
return readable
}
export function streamFromString(str: string): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode(str))
controller.close()
},
})
}
export function streamFromBuffer(chunk: Buffer): ReadableStream<Uint8Array> {
return new ReadableStream({
start(controller) {
controller.enqueue(chunk)
controller.close()
},
})
}
async function streamToChunks(
stream: ReadableStream<Uint8Array>
): Promise<Array<Uint8Array>> {
const reader = stream.getReader()
const chunks: Array<Uint8Array> = []
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
chunks.push(value)
}
return chunks
}
function concatUint8Arrays(chunks: Array<Uint8Array>): Uint8Array {
const totalLength = chunks.reduce((sum, chunk) => sum + chunk.length, 0)
const result = new Uint8Array(totalLength)
let offset = 0
for (const chunk of chunks) {
result.set(chunk, offset)
offset += chunk.length
}
return result
}
export async function webstreamToUint8Array(
stream: ReadableStream<Uint8Array>
): Promise<Uint8Array> {
return concatUint8Arrays(await streamToChunks(stream))
}
function webToReadable(
stream: ReadableStream<Uint8Array> | import('node:stream').Readable
): import('node:stream').Readable {
if (process.env.NEXT_RUNTIME === 'edge') {
throw new Error('webToReadable cannot be used in the edge runtime')
} else {
let Readable: typeof import('node:stream').Readable
if (process.env.TURBOPACK) {
Readable = (require('node:stream') as typeof import('node:stream'))
.Readable
} else if (process.env.__NEXT_BUNDLER === 'Webpack') {
Readable = (
__non_webpack_require__('node:stream') as typeof import('node:stream')
).Readable
} else {
Readable = (require('node:stream') as typeof import('node:stream'))
.Readable
}
if (stream instanceof Readable) {
return stream
}
return Readable.fromWeb(stream as import('stream/web').ReadableStream)
}
}
export async function nodestreamToUint8Array(
stream: AnyStream
): Promise<Uint8Array> {
const chunks: Buffer[] = []
for await (const chunk of webToReadable(stream)) {
chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk)
}
return Buffer.concat(chunks)
}
export async function streamToUint8Array(stream: AnyStream) {
if (process.env.NEXT_RUNTIME === 'edge') {
// Edge runtime always uses web streams
return webstreamToUint8Array(stream as ReadableStream<Uint8Array>)
} else {
let Readable: typeof import('node:stream').Readable
if (process.env.TURBOPACK) {
Readable = (require('node:stream') as typeof import('node:stream'))
.Readable
} else if (process.env.__NEXT_BUNDLER === 'Webpack') {
Readable = (
__non_webpack_require__('node:stream') as typeof import('node:stream')
).Readable
} else {
Readable = (require('node:stream') as typeof import('node:stream'))
.Readable
}
if (stream instanceof Readable) {
return nodestreamToUint8Array(stream)
}
return webstreamToUint8Array(stream)
}
}
export async function streamToBuffer(
stream: ReadableStream<Uint8Array>
): Promise<Buffer> {
return Buffer.concat(await streamToChunks(stream))
}
export async function streamToString(
stream: ReadableStream<Uint8Array>,
signal?: AbortSignal
): Promise<string> {
const decoder = new TextDecoder('utf-8', { fatal: true })
let string = ''
for await (const chunk of stream) {
if (signal?.aborted) {
return string
}
string += decoder.decode(chunk, { stream: true })
}
string += decoder.decode()
return string
}
export type BufferedTransformOptions = {
/**
* Flush synchronously once the buffer reaches this many bytes.
*/
readonly maxBufferByteLength?: number
}
export function createBufferedTransformStream(
options: BufferedTransformOptions = {}
): TransformStream<Uint8Array, Uint8Array> {
const { maxBufferByteLength = Infinity } = options
let bufferedChunks: Array<Uint8Array> = []
let bufferByteLength: number = 0
let pending: DetachedPromise<void> | undefined
const flush = (controller: TransformStreamDefaultController) => {
try {
if (bufferedChunks.length === 0) {
return
}
const chunk = new Uint8Array(bufferByteLength)
let copiedBytes = 0
for (let i = 0; i < bufferedChunks.length; i++) {
const bufferedChunk = bufferedChunks[i]
chunk.set(bufferedChunk, copiedBytes)
copiedBytes += bufferedChunk.byteLength
}
// We just wrote all the buffered chunks so we need to reset the bufferedChunks array
// and our bufferByteLength to prepare for the next round of buffered chunks
bufferedChunks.length = 0
bufferByteLength = 0
controller.enqueue(chunk)
} catch {
// If an error occurs while enqueuing, it can't be due to this
// transformer. It's most likely caused by the controller having been
// errored (for example, if the stream was cancelled).
}
}
const scheduleFlush = (controller: TransformStreamDefaultController) => {
if (pending) {
return
}
const detached = new DetachedPromise<void>()
pending = detached
scheduleImmediate(() => {
try {
flush(controller)
} finally {
pending = undefined
detached.resolve()
}
})
}
return new TransformStream({
transform(chunk, controller) {
// Combine the previous buffer with the new chunk.
bufferedChunks.push(chunk)
bufferByteLength += chunk.byteLength
if (bufferByteLength >= maxBufferByteLength) {
flush(controller)
} else {
scheduleFlush(controller)
}
},
flush() {
return pending?.promise
},
})
}
// TODO this is currently unused but once we add proper output:export support, it needs to be
// revisited. See https://github.com/vercel/next.js/pull/89478 for more details
//
// function createPrefetchCommentStream(
// isBuildTimePrerendering: boolean,
// buildId: string
// ): TransformStream<Uint8Array, Uint8Array> {
// // Insert an extra comment at the beginning of the HTML document. This must
// // come after the DOCTYPE, which is inserted by React.
// //
// // The first chunk sent by React will contain the doctype. After that, we can
// // pass through the rest of the chunks as-is.
// let didTransformFirstChunk = false
// return new TransformStream({
// transform(chunk, controller) {
// if (isBuildTimePrerendering && !didTransformFirstChunk) {
// didTransformFirstChunk = true
// const decoder = new TextDecoder('utf-8', { fatal: true })
// const chunkStr = decoder.decode(chunk, {
// stream: true,
// })
// const updatedChunkStr = insertBuildIdComment(chunkStr, buildId)
// controller.enqueue(encoder.encode(updatedChunkStr))
// return
// }
// controller.enqueue(chunk)
// },
// })
// }
export function renderToInitialFizzStream({
ReactDOMServer,
element,
streamOptions,
}: {
ReactDOMServer: {
renderToReadableStream: typeof import('react-dom/server').renderToReadableStream
}
element: React.ReactElement
streamOptions?: Parameters<typeof ReactDOMServer.renderToReadableStream>[1]
}): Promise<ReactDOMServerReadableStream> {
return getTracer().trace(AppRenderSpan.renderToReadableStream, async () =>
ReactDOMServer.renderToReadableStream(element, streamOptions)
)
}
export function createMetadataTransformStream(
insert: () => Promise<string> | string
): TransformStream<Uint8Array, Uint8Array> {
let chunkIndex = -1
let isMarkRemoved = false
return new TransformStream({
async transform(chunk, controller) {
let iconMarkIndex = -1
let closedHeadIndex = -1
chunkIndex++
if (isMarkRemoved) {
controller.enqueue(chunk)
return
}
let iconMarkLength = 0
// Only search for the closed head tag once
if (iconMarkIndex === -1) {
iconMarkIndex = indexOfUint8Array(chunk, ENCODED_TAGS.META.ICON_MARK)
if (iconMarkIndex === -1) {
controller.enqueue(chunk)
return
} else {
// When we found the `<meta name="«nxt-icon»"` tag prefix, we will remove it from the chunk.
// Its close tag could either be `/>` or `>`, checking the next char to ensure we cover both cases.
iconMarkLength = ENCODED_TAGS.META.ICON_MARK.length
// Check if next char is /, this is for xml mode.
if (chunk[iconMarkIndex + iconMarkLength] === 47) {
iconMarkLength += 2
} else {
// The last char is `>`
iconMarkLength++
}
}
}
// Check if icon mark is inside <head> tag in the first chunk.
if (chunkIndex === 0) {
closedHeadIndex = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.HEAD)
if (iconMarkIndex !== -1) {
// The mark icon is located in the 1st chunk before the head tag.
// We do not need to insert the script tag in this case because it's in the head.
// Just remove the icon mark from the chunk.
if (iconMarkIndex < closedHeadIndex) {
const replaced = new Uint8Array(chunk.length - iconMarkLength)
// Remove the icon mark from the chunk.
replaced.set(chunk.subarray(0, iconMarkIndex))
replaced.set(
chunk.subarray(iconMarkIndex + iconMarkLength),
iconMarkIndex
)
chunk = replaced
} else {
// The icon mark is after the head tag, replace and insert the script tag at that position.
const insertion = await insert()
const encodedInsertion = encoder.encode(insertion)
const insertionLength = encodedInsertion.length
const replaced = new Uint8Array(
chunk.length - iconMarkLength + insertionLength
)
replaced.set(chunk.subarray(0, iconMarkIndex))
replaced.set(encodedInsertion, iconMarkIndex)
replaced.set(
chunk.subarray(iconMarkIndex + iconMarkLength),
iconMarkIndex + insertionLength
)
chunk = replaced
}
isMarkRemoved = true
}
// If there's no icon mark located, it will be handled later when if present in the following chunks.
} else {
// When it's appeared in the following chunks, we'll need to
// remove the mark and then insert the script tag at that position.
const insertion = await insert()
const encodedInsertion = encoder.encode(insertion)
const insertionLength = encodedInsertion.length
// Replace the icon mark with the hoist script or empty string.
const replaced = new Uint8Array(
chunk.length - iconMarkLength + insertionLength
)
// Set the first part of the chunk, before the icon mark.
replaced.set(chunk.subarray(0, iconMarkIndex))
// Set the insertion after the icon mark.
replaced.set(encodedInsertion, iconMarkIndex)
// Set the rest of the chunk after the icon mark.
replaced.set(
chunk.subarray(iconMarkIndex + iconMarkLength),
iconMarkIndex + insertionLength
)
chunk = replaced
isMarkRemoved = true
}
controller.enqueue(chunk)
},
})
}
export function createHeadInsertionTransformStream(
insert: () => Promise<string>
): TransformStream<Uint8Array, Uint8Array> {
let inserted = false
// We need to track if this transform saw any bytes because if it didn't
// we won't want to insert any server HTML at all
let hasBytes = false
return new TransformStream({
async transform(chunk, controller) {
hasBytes = true
const insertion = await insert()
if (inserted) {
if (insertion) {
const encodedInsertion = encoder.encode(insertion)
controller.enqueue(encodedInsertion)
}
controller.enqueue(chunk)
} else {
// TODO (@Ethan-Arrowood): Replace the generic `indexOfUint8Array` method with something finely tuned for the subset of things actually being checked for.
const index = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.HEAD)
// In fully static rendering or non PPR rendering cases:
// `/head>` will always be found in the chunk in first chunk rendering.
if (index !== -1) {
if (insertion) {
const encodedInsertion = encoder.encode(insertion)
// Get the total count of the bytes in the chunk and the insertion
// e.g.
// chunk = <head><meta charset="utf-8"></head>
// insertion = <script>...</script>
// output = <head><meta charset="utf-8"> [ <script>...</script> ] </head>
const insertedHeadContent = new Uint8Array(
chunk.length + encodedInsertion.length
)
// Append the first part of the chunk, before the head tag
insertedHeadContent.set(chunk.slice(0, index))
// Append the server inserted content
insertedHeadContent.set(encodedInsertion, index)
// Append the rest of the chunk
insertedHeadContent.set(
chunk.slice(index),
index + encodedInsertion.length
)
controller.enqueue(insertedHeadContent)
} else {
controller.enqueue(chunk)
}
inserted = true
} else {
// This will happens in PPR rendering during next start, when the page is partially rendered.
// When the page resumes, the head tag will be found in the middle of the chunk.
// Where we just need to append the insertion and chunk to the current stream.
// e.g.
// PPR-static: <head>...</head><body> [ resume content ] </body>
// PPR-resume: [ insertion ] [ rest content ]
if (insertion) {
controller.enqueue(encoder.encode(insertion))
}
controller.enqueue(chunk)
inserted = true
}
}
},
async flush(controller) {
// Check before closing if there's anything remaining to insert.
if (hasBytes) {
const insertion = await insert()
if (insertion) {
controller.enqueue(encoder.encode(insertion))
}
}
},
})
}
async function createClientResumeScriptInsertionTransformStream(): Promise<
TransformStream<Uint8Array, Uint8Array>
> {
const segmentPath = '/_full'
const cacheBustingHeader = await computeCacheBustingSearchParam(
'1', // headers[NEXT_ROUTER_PREFETCH_HEADER]
'/_full', // headers[NEXT_ROUTER_SEGMENT_PREFETCH_HEADER]
undefined, // headers[NEXT_ROUTER_STATE_TREE_HEADER]
undefined // headers[NEXT_URL]
)
const searchStr = `${NEXT_RSC_UNION_QUERY}=${cacheBustingHeader}`
const NEXT_CLIENT_RESUME_SCRIPT = `<script>__NEXT_CLIENT_RESUME=fetch(location.pathname+'?${searchStr}',{credentials:'same-origin',headers:{'${RSC_HEADER}': '1','${NEXT_ROUTER_PREFETCH_HEADER}': '1','${NEXT_ROUTER_SEGMENT_PREFETCH_HEADER}': '${segmentPath}'}})</script>`
let didAlreadyInsert = false
return new TransformStream({
transform(chunk, controller) {
if (didAlreadyInsert) {
// Already inserted the script into the head. Pass through.
controller.enqueue(chunk)
return
}
// TODO (@Ethan-Arrowood): Replace the generic `indexOfUint8Array` method with something finely tuned for the subset of things actually being checked for.
const headClosingTagIndex = indexOfUint8Array(
chunk,
ENCODED_TAGS.CLOSED.HEAD
)
if (headClosingTagIndex === -1) {
// In fully static rendering or non PPR rendering cases:
// `/head>` will always be found in the chunk in first chunk rendering.
controller.enqueue(chunk)
return
}
const encodedInsertion = encoder.encode(NEXT_CLIENT_RESUME_SCRIPT)
// Get the total count of the bytes in the chunk and the insertion
// e.g.
// chunk = <head><meta charset="utf-8"></head>
// insertion = <script>...</script>
// output = <head><meta charset="utf-8"> [ <script>...</script> ] </head>
const insertedHeadContent = new Uint8Array(
chunk.length + encodedInsertion.length
)
// Append the first part of the chunk, before the head tag
insertedHeadContent.set(chunk.slice(0, headClosingTagIndex))
// Append the server inserted content
insertedHeadContent.set(encodedInsertion, headClosingTagIndex)
// Append the rest of the chunk
insertedHeadContent.set(
chunk.slice(headClosingTagIndex),
headClosingTagIndex + encodedInsertion.length
)
controller.enqueue(insertedHeadContent)
didAlreadyInsert = true
},
})
}
/**
* Creates a transform stream that injects an inline script as the first
* element inside <head>. Used during instant navigation testing to set
* self.__next_instant_test before any async bootstrap scripts execute.
*/
export async function createInstantTestScriptInsertionTransformStream(
requestId: string | null
): Promise<TransformStream<Uint8Array, Uint8Array>> {
// Kick off a fetch for the static RSC payload. This is the hydration
// source for the locked static shell — same as the __NEXT_CLIENT_RESUME
// fetch used for fallback routes, but with NEXT_INSTANT_PREFETCH_HEADER
// so the server returns static-only data.
//
// The fetch promise is stored as self.__next_instant_test, which doubles
// as the feature flag (truthy = instant test mode). The client processes
// this as a fallback prerender payload for hydration.
const segmentPath = '/_full'
const cacheBustingHeader = await computeCacheBustingSearchParam(
'1',
segmentPath,
undefined,
undefined
)
const searchStr = `${NEXT_RSC_UNION_QUERY}=${cacheBustingHeader}`
// In dev mode, inject self.__next_r (request ID) so that HMR WebSocket
// and debug channel initialization don't crash. The static shell
// bypasses renderToFizzStream which normally injects this via
// bootstrapScriptContent.
const requestIdScript =
requestId !== null ? `self.__next_r=${JSON.stringify(requestId)};` : ''
const INSTANT_TEST_SCRIPT = `<script>${requestIdScript}self.__next_instant_test=fetch(location.pathname+'?${searchStr}',{credentials:'same-origin',headers:{'${RSC_HEADER}':'1','${NEXT_ROUTER_PREFETCH_HEADER}':'1','${NEXT_ROUTER_SEGMENT_PREFETCH_HEADER}':'${segmentPath}','${NEXT_INSTANT_PREFETCH_HEADER}':'1'}})</script>`
let didAlreadyInsert = false
return new TransformStream({
transform(chunk, controller) {
if (didAlreadyInsert) {
// Already inserted the script into the head. Pass through.
controller.enqueue(chunk)
return
}
// Find the opening <head tag (may have attributes like <head class="...">)
const headOpenIndex = indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.HEAD)
if (headOpenIndex === -1) {
controller.enqueue(chunk)
return
}
// Find the closing > of the <head ...> tag
const headCloseAngle = chunk.indexOf(
62, // '>'
headOpenIndex + ENCODED_TAGS.OPENING.HEAD.length
)
if (headCloseAngle === -1) {
controller.enqueue(chunk)
return
}
const encodedInsertion = encoder.encode(INSTANT_TEST_SCRIPT)
const insertionPoint = headCloseAngle + 1
// e.g.
// chunk = <!DOCTYPE html><html><head><meta charset="utf-8">...
// insertion = <script>self.__next_instant_test=fetch(...)</script>
// output = <!DOCTYPE html><html><head> [ <script>...</script> ] <meta charset="utf-8">...
const insertedHeadContent = new Uint8Array(
chunk.length + encodedInsertion.length
)
insertedHeadContent.set(chunk.slice(0, insertionPoint))
insertedHeadContent.set(encodedInsertion, insertionPoint)
insertedHeadContent.set(
chunk.slice(insertionPoint),
insertionPoint + encodedInsertion.length
)
controller.enqueue(insertedHeadContent)
didAlreadyInsert = true
},
flush(controller) {
// Append closing tags so the browser can parse the full document.
controller.enqueue(ENCODED_TAGS.CLOSED.BODY_AND_HTML)
},
})
}
// Suffix after main body content - scripts before </body>,
// but wait for the major chunks to be enqueued.
export function createDeferredSuffixStream(
suffix: string
): TransformStream<Uint8Array, Uint8Array> {
let flushed = false
let pending: DetachedPromise<void> | undefined
const flush = (controller: TransformStreamDefaultController) => {
const detached = new DetachedPromise<void>()
pending = detached
scheduleImmediate(() => {
try {
controller.enqueue(encoder.encode(suffix))
} catch {
// If an error occurs while enqueuing it can't be due to this
// transformers fault. It's likely due to the controller being
// errored due to the stream being cancelled.
} finally {
pending = undefined
detached.resolve()
}
})
}
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)
// If we've already flushed, we're done.
if (flushed) return
// Schedule the flush to happen.
flushed = true
flush(controller)
},
flush(controller) {
if (pending) return pending.promise
if (flushed) return
// Flush now.
controller.enqueue(encoder.encode(suffix))
},
})
}
export function createFlightDataInjectionTransformStream(
stream: ReadableStream<Uint8Array>,
delayDataUntilFirstHtmlChunk: boolean
): TransformStream<Uint8Array, Uint8Array> {
let htmlStreamFinished = false
let pull: Promise<void> | null = null
let donePulling = false
function startOrContinuePulling(
controller: TransformStreamDefaultController
) {
if (!pull) {
pull = startPulling(controller)
}
return pull
}
async function startPulling(controller: TransformStreamDefaultController) {
const reader = stream.getReader()
if (delayDataUntilFirstHtmlChunk) {
// NOTE: streaming flush
// We are buffering here for the inlined data stream because the
// "shell" stream might be chunkenized again by the underlying stream
// implementation, e.g. with a specific high-water mark. To ensure it's
// the safe timing to pipe the data stream, this extra tick is
// necessary.
// We don't start reading until we've left the current Task to ensure
// that it's inserted after flushing the shell. Note that this implementation
// might get stale if impl details of Fizz change in the future.
await atLeastOneTask()
}
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
donePulling = true
return
}
// We want to prioritize HTML over RSC data.
// The SSR render is based on the same RSC stream, so when we get a new RSC chunk,
// we're likely to produce an HTML chunk as well, so give it a chance to flush first.
if (!delayDataUntilFirstHtmlChunk && !htmlStreamFinished) {
await atLeastOneTask()
}
controller.enqueue(value)
}
} catch (err) {
controller.error(err)
}
}
return new TransformStream({
start(controller) {
if (!delayDataUntilFirstHtmlChunk) {
startOrContinuePulling(controller)
}
},
transform(chunk, controller) {
controller.enqueue(chunk)
// Start the streaming if it hasn't already been started yet.
if (delayDataUntilFirstHtmlChunk) {
startOrContinuePulling(controller)
}
},
flush(controller) {
htmlStreamFinished = true
if (donePulling) {
return
}
return startOrContinuePulling(controller)
},
})
}
export const CLOSE_TAG = '</body></html>'
/**
* This transform stream moves the suffix to the end of the stream, so results
* like `</body></html><script>...</script>` will be transformed to
* `<script>...</script></body></html>`.
*/
export function createMoveSuffixStream(): TransformStream<
Uint8Array,
Uint8Array
> {
let foundSuffix = false
return new TransformStream({
transform(chunk, controller) {
if (foundSuffix) {
return controller.enqueue(chunk)
}
const index = indexOfUint8Array(chunk, ENCODED_TAGS.CLOSED.BODY_AND_HTML)
if (index > -1) {
foundSuffix = true
// If the whole chunk is the suffix, then don't write anything, it will
// be written in the flush.
if (chunk.length === ENCODED_TAGS.CLOSED.BODY_AND_HTML.length) {
return
}
// Write out the part before the suffix.
const before = chunk.slice(0, index)
controller.enqueue(before)
// In the case where the suffix is in the middle of the chunk, we need
// to split the chunk into two parts.
if (chunk.length > ENCODED_TAGS.CLOSED.BODY_AND_HTML.length + index) {
// Write out the part after the suffix.
const after = chunk.slice(
index + ENCODED_TAGS.CLOSED.BODY_AND_HTML.length
)
controller.enqueue(after)
}
} else {
controller.enqueue(chunk)
}
},
flush(controller) {
// Even if we didn't find the suffix, the HTML is not valid if we don't
// add it, so insert it at the end.
controller.enqueue(ENCODED_TAGS.CLOSED.BODY_AND_HTML)
},
})
}
function createStripDocumentClosingTagsTransform(): TransformStream<
Uint8Array,
Uint8Array
> {
return new TransformStream({
transform(chunk, controller) {
// We rely on the assumption that chunks will never break across a code unit.
// This is reasonable because we currently concat all of React's output from a single
// flush into one chunk before streaming it forward which means the chunk will represent
// a single coherent utf-8 string. This is not safe to use if we change our streaming to no
// longer do this large buffered chunk
if (
isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.BODY_AND_HTML) ||
isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.BODY) ||
isEquivalentUint8Arrays(chunk, ENCODED_TAGS.CLOSED.HTML)
) {
// the entire chunk is the closing tags; return without enqueueing anything.
return
}
// We assume these tags will go at together at the end of the document and that
// they won't appear anywhere else in the document. This is not really a safe assumption
// but until we revamp our streaming infra this is a performant way to string the tags
chunk = removeFromUint8Array(chunk, ENCODED_TAGS.CLOSED.BODY)
chunk = removeFromUint8Array(chunk, ENCODED_TAGS.CLOSED.HTML)
controller.enqueue(chunk)
},
})
}
export function createHtmlDataDplIdTransformStream(
dplId: string
): TransformStream<Uint8Array, Uint8Array> {
let didTransform = false
return new TransformStream({
transform(chunk, controller) {
if (didTransform) {
controller.enqueue(chunk)
return
}
const htmlTagIndex = indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.HTML)
if (htmlTagIndex === -1) {
controller.enqueue(chunk)
return
}
// Insert the data-dpl-id attribute right after "<html "
const insertionPoint = htmlTagIndex + ENCODED_TAGS.OPENING.HTML.length
const attribute = ` data-dpl-id="${dplId}"`
const encodedAttribute = encoder.encode(attribute)
const modifiedChunk = new Uint8Array(
chunk.length + encodedAttribute.length
)
// Copy everything before the insertion point
modifiedChunk.set(chunk.subarray(0, insertionPoint))
// Insert the attribute
modifiedChunk.set(encodedAttribute, insertionPoint)
// Copy everything after
modifiedChunk.set(
chunk.subarray(insertionPoint),
insertionPoint + encodedAttribute.length
)
controller.enqueue(modifiedChunk)
didTransform = true
},
})
}
/*
* Checks if the root layout is missing the html or body tags
* and if so, it will inject a script tag to throw an error in the browser, showing the user
* the error message in the error overlay.
*/
export function createRootLayoutValidatorStream(): TransformStream<
Uint8Array,
Uint8Array
> {
let foundHtml = false
let foundBody = false
return new TransformStream({
async transform(chunk, controller) {
// Peek into the streamed chunk to see if the tags are present.
if (
!foundHtml &&
indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.HTML) > -1
) {
foundHtml = true
}
if (
!foundBody &&
indexOfUint8Array(chunk, ENCODED_TAGS.OPENING.BODY) > -1
) {
foundBody = true
}
controller.enqueue(chunk)
},
flush(controller) {
const missingTags: ('html' | 'body')[] = []
if (!foundHtml) missingTags.push('html')
if (!foundBody) missingTags.push('body')
if (!missingTags.length) return
controller.enqueue(
encoder.encode(
`<html id="__next_error__">
<template
data-next-error-message="Missing ${missingTags
.map((c) => `<${c}>`)
.join(
missingTags.length > 1 ? ' and ' : ''
)} tags in the root layout.\nRead more at https://nextjs.org/docs/messages/missing-root-layout-tags"
data-next-error-digest="${MISSING_ROOT_TAGS_ERROR}"
data-next-error-stack=""
></template>
`
)
)
},
})
}
export function chainTransformers<T>(
readable: ReadableStream<T>,
transformers: ReadonlyArray<TransformStream<T, T> | null>
): ReadableStream<T> {
let stream = readable
for (const transformer of transformers) {
if (!transformer) continue
stream = stream.pipeThrough(transformer)
}
return stream
}
export type ContinueStreamOptions = {
inlinedDataStream: ReadableStream<Uint8Array> | undefined
isStaticGeneration: boolean
deploymentId: string | undefined
getServerInsertedHTML: () => Promise<string>
getServerInsertedMetadata: () => Promise<string>
validateRootLayout?: boolean
/**
* Suffix to inject after the buffered data, but before the close tags.
*/
suffix?: string | undefined
}
export async function continueFizzStream(
renderStream: ReactDOMServerReadableStream,
{
suffix,
inlinedDataStream,
isStaticGeneration,
deploymentId,
getServerInsertedHTML,
getServerInsertedMetadata,
validateRootLayout,
}: ContinueStreamOptions
): Promise<ReadableStream<Uint8Array>> {
// Suffix itself might contain close tags at the end, so we need to split it.
const suffixUnclosed = suffix ? suffix.split(CLOSE_TAG, 1)[0] : null
if (isStaticGeneration) {
// If we're generating static HTML we need to wait for it to resolve before continuing.
await renderStream.allReady
} else {
// Otherwise, we want to make sure Fizz is done with all microtasky work
// before we start pulling the stream and cause a flush.
await waitAtLeastOneReactRenderTask()
}
return chainTransformers(renderStream, [
// Buffer everything to avoid flushing too frequently
createBufferedTransformStream(),
// Insert data-dpl-id attribute on the html tag
deploymentId ? createHtmlDataDplIdTransformStream(deploymentId) : null,
// Transform metadata
createMetadataTransformStream(getServerInsertedMetadata),
// Insert suffix content
suffixUnclosed != null && suffixUnclosed.length > 0
? createDeferredSuffixStream(suffixUnclosed)
: null,
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
inlinedDataStream
? createFlightDataInjectionTransformStream(inlinedDataStream, true)
: null,
// Validate the root layout for missing html or body tags
validateRootLayout ? createRootLayoutValidatorStream() : null,
// Close tags should always be deferred to the end
createMoveSuffixStream(),
// Special head insertions
// TODO-APP: Insert server side html to end of head in app layout rendering, to avoid
// hydration errors. Remove this once it's ready to be handled by react itself.
createHeadInsertionTransformStream(getServerInsertedHTML),
])
}
type ContinueDynamicPrerenderOptions = {
getServerInsertedHTML: () => Promise<string>
getServerInsertedMetadata: () => Promise<string>
deploymentId: string | undefined
}
export async function continueDynamicPrerender(
prerenderStream: ReadableStream<Uint8Array>,
{
getServerInsertedHTML,
getServerInsertedMetadata,
deploymentId,
}: ContinueDynamicPrerenderOptions
) {
return chainTransformers(prerenderStream, [
// Buffer everything to avoid flushing too frequently
createBufferedTransformStream(),
createStripDocumentClosingTagsTransform(),
// Insert data-dpl-id attribute on the html tag
deploymentId ? createHtmlDataDplIdTransformStream(deploymentId) : null,
// Insert generated tags to head
createHeadInsertionTransformStream(getServerInsertedHTML),
// Transform metadata
createMetadataTransformStream(getServerInsertedMetadata),
])
}
type ContinueStaticPrerenderOptions = {
inlinedDataStream: ReadableStream<Uint8Array>
getServerInsertedHTML: () => Promise<string>
getServerInsertedMetadata: () => Promise<string>
deploymentId: string | undefined
}
export async function continueStaticPrerender(
prerenderStream: ReadableStream<Uint8Array>,
{
inlinedDataStream,
getServerInsertedHTML,
getServerInsertedMetadata,
deploymentId,
}: ContinueStaticPrerenderOptions
) {
return chainTransformers(prerenderStream, [
// Buffer everything to avoid flushing too frequently
createBufferedTransformStream(),
// Add build id comment to start of the HTML document (in export mode)
// Insert data-dpl-id attribute on the html tag
deploymentId ? createHtmlDataDplIdTransformStream(deploymentId) : null,
// Insert generated tags to head
createHeadInsertionTransformStream(getServerInsertedHTML),
// Transform metadata
createMetadataTransformStream(getServerInsertedMetadata),
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
createFlightDataInjectionTransformStream(inlinedDataStream, true),
// Close tags should always be deferred to the end
createMoveSuffixStream(),
])
}
export async function continueStaticFallbackPrerender(
prerenderStream: ReadableStream<Uint8Array>,
{
inlinedDataStream,
getServerInsertedHTML,
getServerInsertedMetadata,
deploymentId,
}: ContinueStaticPrerenderOptions
) {
// Same as `continueStaticPrerender`, but also inserts an additional script
// to instruct the client to start fetching the hydration data as early
// as possible.
return chainTransformers(prerenderStream, [
// Buffer everything to avoid flushing too frequently
createBufferedTransformStream(),
// Insert data-dpl-id attribute on the html tag
deploymentId ? createHtmlDataDplIdTransformStream(deploymentId) : null,
// Insert generated tags to head
createHeadInsertionTransformStream(getServerInsertedHTML),
// Insert the client resume script into the head
await createClientResumeScriptInsertionTransformStream(),
// Transform metadata
createMetadataTransformStream(getServerInsertedMetadata),
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
createFlightDataInjectionTransformStream(inlinedDataStream, true),
// Close tags should always be deferred to the end
createMoveSuffixStream(),
])
}
type ContinueResumeOptions = {
inlinedDataStream: ReadableStream<Uint8Array>
getServerInsertedHTML: () => Promise<string>
getServerInsertedMetadata: () => Promise<string>
delayDataUntilFirstHtmlChunk: boolean
deploymentId: string | undefined
}
export async function continueDynamicHTMLResume(
renderStream: ReadableStream<Uint8Array>,
{
delayDataUntilFirstHtmlChunk,
inlinedDataStream,
getServerInsertedHTML,
getServerInsertedMetadata,
deploymentId,
}: ContinueResumeOptions
) {
return chainTransformers(renderStream, [
// Buffer everything to avoid flushing too frequently
createBufferedTransformStream(),
// Insert data-dpl-id attribute on the html tag
deploymentId ? createHtmlDataDplIdTransformStream(deploymentId) : null,
// Insert generated tags to head
createHeadInsertionTransformStream(getServerInsertedHTML),
// Transform metadata
createMetadataTransformStream(getServerInsertedMetadata),
// Insert the inlined data (Flight data, form state, etc.) stream into the HTML
createFlightDataInjectionTransformStream(
inlinedDataStream,
delayDataUntilFirstHtmlChunk
),
// Close tags should always be deferred to the end
createMoveSuffixStream(),
])
}
export function createDocumentClosingStream(): ReadableStream<Uint8Array> {
return streamFromString(CLOSE_TAG)
}
// ---------------------------------------------------------------------------
// Runtime prefetch transform (Web streams)
// ---------------------------------------------------------------------------
/**
* Web TransformStream that replaces the runtime prefetch sentinel in an RSC
* payload stream: `[<sentinel>]` -> `[<isPartial>,<staleTime>]`.
*
* This is the web equivalent of createRuntimePrefetchNodeTransform
* in node-stream-helpers.ts.
*/
export function createRuntimePrefetchTransformStream(
sentinel: number,
isPartial: boolean,
staleTime: number
): TransformStream<Uint8Array, Uint8Array> {
const enc = new TextEncoder()
// Search for: [<sentinel>]
// Replace with: [<isPartial>,<staleTime>]
const search = enc.encode(`[${sentinel}]`)
const first = search[0]
const replace = enc.encode(`[${isPartial},${staleTime}]`)
const searchLen = search.length
let currentChunk: Uint8Array | null = null
let found = false
function processChunk(
controller: TransformStreamDefaultController<Uint8Array>,
nextChunk: null | Uint8Array
) {
if (found) {
if (nextChunk) {
controller.enqueue(nextChunk)
}
return
}
if (currentChunk) {
// We can't search past the index that can contain a full match
let exclusiveUpperBound = currentChunk.length - (searchLen - 1)
if (nextChunk) {
// If we have any overflow bytes we can search up to the chunk's final byte
exclusiveUpperBound += Math.min(nextChunk.length, searchLen - 1)
}
if (exclusiveUpperBound < 1) {
// we can't match the current chunk.
controller.enqueue(currentChunk)
currentChunk = nextChunk // advance so we don't process this chunk again
return
}
let currentIndex = currentChunk.indexOf(first)
// check the current candidate match if it is within the bounds of our search space for the currentChunk
candidateLoop: while (
-1 < currentIndex &&
currentIndex < exclusiveUpperBound
) {
// We already know index 0 matches because we used indexOf to find the candidateIndex so we start at index 1
let matchIndex = 1
while (matchIndex < searchLen) {
const candidateIndex = currentIndex + matchIndex
const candidateValue =
candidateIndex < currentChunk.length
? currentChunk[candidateIndex]
: // if we ever hit this condition it is because there is a nextChunk we can read from
nextChunk![candidateIndex - currentChunk.length]
if (candidateValue !== search[matchIndex]) {
// No match, reset and continue the search from the next position
currentIndex = currentChunk.indexOf(first, currentIndex + 1)
continue candidateLoop
}
matchIndex++
}
// We found a complete match. currentIndex is our starting point to replace the value.
found = true
// enqueue everything up to the match
controller.enqueue(currentChunk.subarray(0, currentIndex))
// enqueue the replacement value
controller.enqueue(replace)
// If there are bytes in the currentChunk after the match enqueue them
if (currentIndex + searchLen < currentChunk.length) {
controller.enqueue(currentChunk.slice(currentIndex + searchLen))
}
// If we have a next chunk we enqueue it now
if (nextChunk) {
// if replacement spills over to the next chunk we first exclude the replaced bytes
const overflowBytes = currentIndex + searchLen - currentChunk.length
const truncatedChunk =
overflowBytes > 0 ? nextChunk!.subarray(overflowBytes) : nextChunk
controller.enqueue(truncatedChunk)
}
// We are now in found mode and don't need to track currentChunk anymore
currentChunk = null
return
}
// No match found in this chunk, emit it and wait for the next one
controller.enqueue(currentChunk)
}
// Advance to the next chunk
currentChunk = nextChunk
}
return new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
processChunk(controller, chunk)
},
flush(controller) {
processChunk(controller, null)
},
})
}