import type { ChildProcess } from 'child_process'
import { Worker as JestWorker } from 'next/dist/compiled/jest-worker'
import { Transform } from 'stream'
import {
formatDebugAddress,
formatNodeOptions,
getNodeDebugType,
getParsedDebugAddress,
getParsedNodeOptions,
type DebugAddress,
} from '../server/lib/utils'
type FarmOptions = NonNullable<ConstructorParameters<typeof JestWorker>[1]>
const RESTARTED = Symbol('restarted')
const cleanupWorkers = (worker: JestWorker) => {
for (const curWorker of ((worker as any)._workerPool?._workers || []) as {
_child?: ChildProcess
}[]) {
curWorker._child?.kill('SIGINT')
}
}
export function getNextBuildDebuggerPortOffset(_: {
kind: 'export-page'
}): number {
// 0: export worker
return 0
}
export class Worker {
private _worker: JestWorker | undefined
private _onActivity: (() => void) | undefined
private _onActivityAbort: (() => void) | undefined
constructor(
workerPath: string,
options: Omit<FarmOptions, 'forkOptions'> & {
forkOptions?:
| (Omit<NonNullable<FarmOptions['forkOptions']>, 'env'> & {
env?: Partial<NodeJS.ProcessEnv> | undefined
})
| undefined
/**
* `-1` if not inspectable
*/
debuggerPortOffset: number
enableSourceMaps?: boolean
/**
* True if `--max-old-space-size` should not be forwarded to the worker.
*/
isolatedMemory: boolean
timeout?: number
onActivity?: () => void
onActivityAbort?: () => void
onRestart?: (method: string, args: any[], attempts: number) => void
logger?: Pick<typeof console, 'error' | 'info' | 'warn'>
exposedMethods: ReadonlyArray<string>
enableWorkerThreads?: boolean
}
) {
let {
enableSourceMaps,
timeout,
onRestart,
logger = console,
debuggerPortOffset,
isolatedMemory,
onActivity,
onActivityAbort,
...farmOptions
} = options
this._onActivity = onActivity
this._onActivityAbort = onActivityAbort
let restartPromise: Promise<typeof RESTARTED>
let resolveRestartPromise: (arg: typeof RESTARTED) => void
let activeTasks = 0
this._worker = undefined
// ensure we end workers if they weren't before exit
process.on('exit', () => {
this.close()
})
const nodeOptions = getParsedNodeOptions()
const originalOptions = { ...nodeOptions }
delete nodeOptions.inspect
delete nodeOptions['inspect-brk']
delete nodeOptions['inspect_brk']
if (debuggerPortOffset !== -1) {
const nodeDebugType = getNodeDebugType(originalOptions)
if (nodeDebugType) {
const debuggerAddress = getParsedDebugAddress(
originalOptions[nodeDebugType]
)
const address: DebugAddress = {
host: debuggerAddress.host,
// current process runs on `address.port`
port:
debuggerAddress.port === 0
? 0
: debuggerAddress.port + 1 + debuggerPortOffset,
}
nodeOptions[nodeDebugType] = formatDebugAddress(address)
}
}
if (enableSourceMaps) {
nodeOptions['enable-source-maps'] = true
}
if (isolatedMemory) {
delete nodeOptions['max-old-space-size']
delete nodeOptions['max_old_space_size']
}
const { nodeOptions: formattedNodeOptions, execArgv } =
formatNodeOptions(nodeOptions)
const createWorker = () => {
const workerEnv: NodeJS.ProcessEnv = {
...process.env,
...((farmOptions.forkOptions?.env || {}) as any),
IS_NEXT_WORKER: 'true',
NODE_OPTIONS: formattedNodeOptions,
}
if (workerEnv.FORCE_COLOR === undefined) {
// Mirror the enablement heuristic from picocolors (see https://github.com/vercel/next.js/blob/6a40da0345939fe4f7b1ae519b296a86dd103432/packages/next/src/lib/picocolors.ts#L21-L24).
// Picocolors snapshots `process.env`/`stdout.isTTY` at module load time, so when the worker
// process bootstraps with piped stdio its own check would disable colors. Re-evaluating the
// same conditions here lets us opt the worker into color output only when the parent would
// have seen colors, while still respecting explicit opt-outs like NO_COLOR.
const supportsColors =
!workerEnv.NO_COLOR &&
!workerEnv.CI &&
workerEnv.TERM !== 'dumb' &&
(process.stdout.isTTY || process.stderr?.isTTY)
if (supportsColors) {
workerEnv.FORCE_COLOR = '1'
}
}
this._worker = new JestWorker(workerPath, {
...farmOptions,
forkOptions: {
...farmOptions.forkOptions,
execArgv: [...execArgv, ...(farmOptions.forkOptions?.execArgv || [])],
env: workerEnv,
},
maxRetries: 0,
}) as JestWorker
restartPromise = new Promise(
(resolve) => (resolveRestartPromise = resolve)
)
/**
* Jest Worker has two worker types, ChildProcessWorker (uses child_process) and NodeThreadWorker (uses worker_threads)
* Next.js uses ChildProcessWorker by default, but it can be switched to NodeThreadWorker with an experimental flag
*
* We only want to handle ChildProcessWorker's orphan process issue, so we access the private property "_child":
* https://github.com/facebook/jest/blob/b38d7d345a81d97d1dc3b68b8458b1837fbf19be/packages/jest-worker/src/workers/ChildProcessWorker.ts
*
* But this property is not available in NodeThreadWorker, so we need to check if we are using ChildProcessWorker
*/
if (!farmOptions.enableWorkerThreads) {
for (const worker of ((this._worker as any)._workerPool?._workers ||
[]) as {
_child?: ChildProcess
}[]) {
worker._child?.on('exit', (code, signal) => {
if ((code || (signal && signal !== 'SIGINT')) && this._worker) {
logger.error(
`Next.js build worker exited with code: ${code} and signal: ${signal}`
)
// if a child process doesn't exit gracefully, we want to bubble up the exit code to the parent process
process.exit(code ?? 1)
}
})
// if a child process emits a particular message, we track that as activity
// so the parent process can keep track of progress
worker._child?.on('message', ([, data]: [number, unknown]) => {
if (
data &&
typeof data === 'object' &&
'type' in data &&
data.type === 'activity'
) {
onActivityImpl()
}
})
}
}
let aborted = false
const onActivityAbortImpl = () => {
if (!aborted) {
this._onActivityAbort?.()
aborted = true
}
}
// Listen to the worker's stdout and stderr, if there's any thing logged, abort the activity first
const abortActivityStreamOnLog = new Transform({
transform(_chunk, _encoding, callback) {
onActivityAbortImpl()
callback()
},
})
// Stop the activity if there's any output from the worker
this._worker.getStdout().pipe(abortActivityStreamOnLog)
this._worker.getStderr().pipe(abortActivityStreamOnLog)
// Pipe the worker's stdout and stderr to the parent process
this._worker.getStdout().pipe(process.stdout)
this._worker.getStderr().pipe(process.stderr)
}
createWorker()
const onHanging = () => {
const worker = this._worker
if (!worker) return
const resolve = resolveRestartPromise
createWorker()
logger.warn(
`Sending SIGTERM signal to static worker due to timeout${
timeout ? ` of ${timeout / 1000} seconds` : ''
}. Subsequent errors may be a result of the worker exiting.`
)
worker.end().then(() => {
resolve(RESTARTED)
})
}
let hangingTimer: NodeJS.Timeout | false = false
const onActivityImpl = () => {
if (hangingTimer) clearTimeout(hangingTimer)
if (this._onActivity) this._onActivity()
hangingTimer = activeTasks > 0 && setTimeout(onHanging, timeout)
}
// TODO: Remove this once callers stop passing non-serializable values
// (e.g. functions) in worker method arguments. The structured clone
// algorithm used by worker_threads rejects functions, unlike
// child_process which silently drops them via JSON serialization.
const sanitizeArgs = farmOptions.enableWorkerThreads
? (args: any[]) => JSON.parse(JSON.stringify(args))
: (args: any[]) => args
for (const method of farmOptions.exposedMethods) {
if (method.startsWith('_')) continue
;(this as any)[method] = timeout
? // eslint-disable-next-line no-loop-func
async (...args: any[]) => {
activeTasks++
const sanitizedArgs = sanitizeArgs(args)
try {
let attempts = 0
for (;;) {
onActivityImpl()
const result = await Promise.race([
(this._worker as any)[method](...sanitizedArgs),
restartPromise,
])
if (result !== RESTARTED) return result
if (onRestart) onRestart(method, sanitizedArgs, ++attempts)
}
} finally {
activeTasks--
onActivityImpl()
}
}
: (...args: any[]) =>
(this._worker as any)[method](...sanitizeArgs(args))
}
}
setOnActivity(onActivity: (() => void) | undefined): void {
this._onActivity = onActivity
}
setOnActivityAbort(onActivityAbort: (() => void) | undefined): void {
this._onActivityAbort = onActivityAbort
}
end(): ReturnType<JestWorker['end']> {
const worker = this._worker
if (!worker) {
throw new Error('Farm is ended, no more calls can be done to it')
}
cleanupWorkers(worker)
this._worker = undefined
return worker.end()
}
/**
* Quietly end the worker if it exists
*/
close(): void {
if (this._worker) {
cleanupWorkers(this._worker)
this._worker.end()
}
}
}