next.js/packages/next/src/lib/worker.ts
worker.ts315 lines10.1 KB
import type { ChildProcess } from 'child_process'
import { Worker as JestWorker } from 'next/dist/compiled/jest-worker'
import { Transform } from 'stream'
import {
  formatDebugAddress,
  formatNodeOptions,
  getNodeDebugType,
  getParsedDebugAddress,
  getParsedNodeOptions,
  type DebugAddress,
} from '../server/lib/utils'

/**
 * Options object accepted by the jest-worker farm: the second constructor
 * parameter of `JestWorker`, with `undefined` excluded.
 */
type FarmOptions = NonNullable<ConstructorParameters<typeof JestWorker>[1]>

/**
 * Sentinel value used by `Worker`'s restart handling. The restart promise
 * resolves with this symbol once a timed-out farm has been ended, so a pending
 * call that raced against it can tell "the farm was restarted" apart from a
 * real result returned by the worker method.
 */
const RESTARTED = Symbol('restarted')

/**
 * Sends `SIGINT` to every child process held by the given jest-worker farm.
 *
 * The children are reached through jest-worker's private
 * `_workerPool._workers[]._child` fields. Pool entries without a `_child`
 * are skipped, and a farm without a pool (or without workers) results in
 * no signals being sent.
 */
const cleanupWorkers = (worker: JestWorker) => {
  type PooledWorker = { _child?: ChildProcess }

  const pool = (worker as any)._workerPool
  const pooledWorkers: PooledWorker[] = pool?._workers || []

  for (const { _child: child } of pooledWorkers) {
    child?.kill('SIGINT')
  }
}

/**
 * Returns the debugger port offset reserved for a given kind of build worker.
 *
 * Only the `'export-page'` kind exists, and it is assigned offset `0`.
 *
 * @param _ - Descriptor of the worker kind (currently unused, since there is
 *   only one kind).
 * @returns The port offset for that worker kind.
 */
export function getNextBuildDebuggerPortOffset(_: {
  kind: 'export-page'
}): number {
  // Offset table:
  //   0 -> export worker
  const exportWorkerOffset = 0
  return exportWorkerOffset
}

/**
 * Wrapper around a jest-worker farm.
 *
 * On top of the plain farm it:
 * - derives the worker's `NODE_OPTIONS`/`execArgv` from the parent's options
 *   (shifting the inspector port, optionally enabling source maps, optionally
 *   dropping `--max-old-space-size`),
 * - pipes the worker's stdout/stderr to the parent process,
 * - reports activity through the `onActivity` / `onActivityAbort` callbacks,
 * - when `timeout` is set, replaces the farm if no activity is seen for
 *   `timeout` milliseconds while calls are in flight, and re-issues those
 *   calls on the new farm.
 *
 * Every name in `exposedMethods` that does not start with `_` is installed as
 * a method on the instance. Private members of this class therefore keep the
 * `_` prefix so that an exposed method can never overwrite them.
 */
export class Worker {
  private _worker: JestWorker | undefined

  private _onActivity: (() => void) | undefined
  private _onActivityAbort: (() => void) | undefined

  /**
   * The `exit` listener this instance registered on `process`. The reference
   * is kept so that `end()` / `close()` can detach it again instead of leaving
   * one listener (and the instance it captures) behind per `Worker`.
   */
  private _onProcessExit: () => void

  /**
   * Creates the farm and installs the exposed methods on the instance.
   *
   * @param workerPath - Module path handed to `JestWorker`.
   * @param options - jest-worker farm options plus the Next.js specific
   *   settings documented inline below.
   */
  constructor(
    workerPath: string,
    options: Omit<FarmOptions, 'forkOptions'> & {
      forkOptions?:
        | (Omit<NonNullable<FarmOptions['forkOptions']>, 'env'> & {
            env?: Partial<NodeJS.ProcessEnv> | undefined
          })
        | undefined
      /**
       * `-1` if not inspectable
       */
      debuggerPortOffset: number
      enableSourceMaps?: boolean
      /**
       * True if `--max-old-space-size` should not be forwarded to the worker.
       */
      isolatedMemory: boolean
      timeout?: number
      onActivity?: () => void
      onActivityAbort?: () => void
      onRestart?: (method: string, args: any[], attempts: number) => void
      logger?: Pick<typeof console, 'error' | 'info' | 'warn'>
      exposedMethods: ReadonlyArray<string>
      enableWorkerThreads?: boolean
    }
  ) {
    const {
      enableSourceMaps,
      timeout,
      onRestart,
      logger = console,
      debuggerPortOffset,
      isolatedMemory,
      onActivity,
      onActivityAbort,
      ...farmOptions
    } = options

    this._onActivity = onActivity
    this._onActivityAbort = onActivityAbort

    // `restartPromise` resolves with RESTARTED once a timed-out farm has been
    // ended; a fresh promise/resolver pair is created for every farm.
    let restartPromise: Promise<typeof RESTARTED>
    let resolveRestartPromise: (arg: typeof RESTARTED) => void
    // Number of exposed-method calls currently in flight (timeout mode only).
    let activeTasks = 0

    this._worker = undefined

    // ensure we end workers if they weren't before exit
    this._onProcessExit = () => {
      this.close()
    }
    process.on('exit', this._onProcessExit)

    // Build the worker's node options from the parent's. Inspector flags are
    // always stripped first and only re-added (with a shifted port) when the
    // worker is inspectable.
    const nodeOptions = getParsedNodeOptions()
    const originalOptions = { ...nodeOptions }
    delete nodeOptions.inspect
    delete nodeOptions['inspect-brk']
    delete nodeOptions['inspect_brk']
    if (debuggerPortOffset !== -1) {
      const nodeDebugType = getNodeDebugType(originalOptions)
      if (nodeDebugType) {
        const debuggerAddress = getParsedDebugAddress(
          originalOptions[nodeDebugType]
        )
        const address: DebugAddress = {
          host: debuggerAddress.host,
          // current process runs on `address.port`
          port:
            debuggerAddress.port === 0
              ? 0
              : debuggerAddress.port + 1 + debuggerPortOffset,
        }
        nodeOptions[nodeDebugType] = formatDebugAddress(address)
      }
    }

    if (enableSourceMaps) {
      nodeOptions['enable-source-maps'] = true
    }

    if (isolatedMemory) {
      delete nodeOptions['max-old-space-size']
      delete nodeOptions['max_old_space_size']
    }

    const { nodeOptions: formattedNodeOptions, execArgv } =
      formatNodeOptions(nodeOptions)

    /**
     * Creates a new farm, stores it in `this._worker`, resets the restart
     * promise and wires up exit/activity/stdio handling for it. Called once
     * up front and again whenever a hanging farm is replaced.
     */
    const createWorker = () => {
      const workerEnv: NodeJS.ProcessEnv = {
        ...process.env,
        ...((farmOptions.forkOptions?.env || {}) as any),
        IS_NEXT_WORKER: 'true',
        NODE_OPTIONS: formattedNodeOptions,
      }

      if (workerEnv.FORCE_COLOR === undefined) {
        // Mirror the enablement heuristic from picocolors (see https://github.com/vercel/next.js/blob/6a40da0345939fe4f7b1ae519b296a86dd103432/packages/next/src/lib/picocolors.ts#L21-L24).
        // Picocolors snapshots `process.env`/`stdout.isTTY` at module load time, so when the worker
        // process bootstraps with piped stdio its own check would disable colors. Re-evaluating the
        // same conditions here lets us opt the worker into color output only when the parent would
        // have seen colors, while still respecting explicit opt-outs like NO_COLOR.
        const supportsColors =
          !workerEnv.NO_COLOR &&
          !workerEnv.CI &&
          workerEnv.TERM !== 'dumb' &&
          (process.stdout.isTTY || process.stderr?.isTTY)

        if (supportsColors) {
          workerEnv.FORCE_COLOR = '1'
        }
      }

      this._worker = new JestWorker(workerPath, {
        ...farmOptions,
        forkOptions: {
          ...farmOptions.forkOptions,
          // Options derived above come first, caller supplied execArgv last.
          execArgv: [...execArgv, ...(farmOptions.forkOptions?.execArgv || [])],
          env: workerEnv,
        },
        // Retries are driven by the restart loop below, not by jest-worker.
        maxRetries: 0,
      }) as JestWorker
      restartPromise = new Promise(
        (resolve) => (resolveRestartPromise = resolve)
      )

      /**
       * Jest Worker has two worker types, ChildProcessWorker (uses child_process) and NodeThreadWorker (uses worker_threads)
       * Next.js uses ChildProcessWorker by default, but it can be switched to NodeThreadWorker with an experimental flag
       *
       * We only want to handle ChildProcessWorker's orphan process issue, so we access the private property "_child":
       * https://github.com/facebook/jest/blob/b38d7d345a81d97d1dc3b68b8458b1837fbf19be/packages/jest-worker/src/workers/ChildProcessWorker.ts
       *
       * But this property is not available in NodeThreadWorker, so we need to check if we are using ChildProcessWorker
       */
      if (!farmOptions.enableWorkerThreads) {
        for (const worker of ((this._worker as any)._workerPool?._workers ||
          []) as {
          _child?: ChildProcess
        }[]) {
          worker._child?.on('exit', (code, signal) => {
            // `this._worker` is cleared by `end()` and `close()`, so exits
            // that follow an intentional shutdown are not treated as crashes.
            if ((code || (signal && signal !== 'SIGINT')) && this._worker) {
              logger.error(
                `Next.js build worker exited with code: ${code} and signal: ${signal}`
              )

              // if a child process doesn't exit gracefully, we want to bubble up the exit code to the parent process
              process.exit(code ?? 1)
            }
          })

          // if a child process emits a particular message, we track that as activity
          // so the parent process can keep track of progress
          worker._child?.on('message', ([, data]: [number, unknown]) => {
            if (
              data &&
              typeof data === 'object' &&
              'type' in data &&
              data.type === 'activity'
            ) {
              onActivityImpl()
            }
          })
        }
      }

      // The abort callback fires at most once per farm.
      let aborted = false
      const onActivityAbortImpl = () => {
        if (!aborted) {
          this._onActivityAbort?.()
          aborted = true
        }
      }

      // Listen to the worker's stdout and stderr, if there's any thing logged, abort the activity first
      const abortActivityStreamOnLog = new Transform({
        transform(_chunk, _encoding, callback) {
          onActivityAbortImpl()
          // The chunk is intentionally dropped; the real output is forwarded
          // by the separate pipes to process.stdout / process.stderr below.
          callback()
        },
      })
      // Stop the activity if there's any output from the worker
      this._worker.getStdout().pipe(abortActivityStreamOnLog)
      this._worker.getStderr().pipe(abortActivityStreamOnLog)

      // Pipe the worker's stdout and stderr to the parent process
      this._worker.getStdout().pipe(process.stdout)
      this._worker.getStderr().pipe(process.stderr)
    }
    createWorker()

    /**
     * Timeout handler: swaps in a fresh farm, ends the hanging one and, once
     * it has ended, resolves the old farm's restart promise so pending calls
     * are retried. Does nothing when the wrapper has already been shut down.
     */
    const onHanging = () => {
      const worker = this._worker
      if (!worker) return
      // Capture the resolver of the *current* farm; createWorker() replaces it.
      const resolve = resolveRestartPromise
      createWorker()
      logger.warn(
        `Sending SIGTERM signal to static worker due to timeout${
          timeout ? ` of ${timeout / 1000} seconds` : ''
        }. Subsequent errors may be a result of the worker exiting.`
      )
      worker.end().then(() => {
        resolve(RESTARTED)
      })
    }

    let hangingTimer: NodeJS.Timeout | false = false

    /**
     * Records activity: resets the hang timer, notifies the `onActivity`
     * callback, and re-arms the timer only while calls are in flight.
     */
    const onActivityImpl = () => {
      if (hangingTimer) clearTimeout(hangingTimer)
      if (this._onActivity) this._onActivity()

      hangingTimer = activeTasks > 0 && setTimeout(onHanging, timeout)
    }

    // TODO: Remove this once callers stop passing non-serializable values
    // (e.g. functions) in worker method arguments. The structured clone
    // algorithm used by worker_threads rejects functions, unlike
    // child_process which silently drops them via JSON serialization.
    const sanitizeArgs = farmOptions.enableWorkerThreads
      ? (args: any[]) => JSON.parse(JSON.stringify(args))
      : (args: any[]) => args

    // Install every exposed method on the instance. With a timeout the call
    // is raced against the restart promise and retried after a restart;
    // without one it is forwarded to the farm directly.
    for (const method of farmOptions.exposedMethods) {
      if (method.startsWith('_')) continue
      ;(this as any)[method] = timeout
        ? // eslint-disable-next-line no-loop-func
          async (...args: any[]) => {
            activeTasks++
            const sanitizedArgs = sanitizeArgs(args)
            try {
              let attempts = 0
              for (;;) {
                onActivityImpl()
                const result = await Promise.race([
                  (this._worker as any)[method](...sanitizedArgs),
                  restartPromise,
                ])
                if (result !== RESTARTED) return result
                if (onRestart) onRestart(method, sanitizedArgs, ++attempts)
              }
            } finally {
              activeTasks--
              onActivityImpl()
            }
          }
        : (...args: any[]) =>
            (this._worker as any)[method](...sanitizeArgs(args))
    }
  }

  /**
   * Replaces the callback invoked whenever worker activity is recorded.
   */
  setOnActivity(onActivity: (() => void) | undefined): void {
    this._onActivity = onActivity
  }

  /**
   * Replaces the callback invoked the first time a farm writes to
   * stdout/stderr.
   */
  setOnActivityAbort(onActivityAbort: (() => void) | undefined): void {
    this._onActivityAbort = onActivityAbort
  }

  /**
   * Ends the farm: sends SIGINT to its child processes, forgets the farm,
   * detaches the process `exit` hook and returns jest-worker's `end()` promise.
   *
   * @throws Error if the farm has already been ended or closed.
   */
  end(): ReturnType<JestWorker['end']> {
    const worker = this._worker
    if (!worker) {
      throw new Error('Farm is ended, no more calls can be done to it')
    }
    cleanupWorkers(worker)
    this._worker = undefined
    // Nothing is left to clean up at exit, so drop the listener instead of
    // leaking one per Worker instance.
    process.off('exit', this._onProcessExit)
    return worker.end()
  }

  /**
   * Quietly end the worker if it exists
   *
   * Safe to call repeatedly: the farm reference is cleared on the first call,
   * so later calls (including the one from the process `exit` hook) are
   * no-ops and never call `end()` on an already-ended farm.
   */
  close(): void {
    const worker = this._worker
    if (!worker) return

    // Clear the reference first so a pending hang timer cannot spawn a
    // replacement farm and child exits are no longer reported as crashes.
    this._worker = undefined
    process.off('exit', this._onProcessExit)

    cleanupWorkers(worker)
    // Best-effort shutdown: this method has no way to report a failure, so a
    // rejection is swallowed rather than left as an unhandled rejection.
    worker.end().catch(() => {})
  }
}
Quest for Codev2.0.0
/
SIGN IN