next.js/packages/next/src/trace/report/to-json.ts
to-json.ts170 lines4.6 KB
import { traceGlobals, traceId } from '../shared'
import fs from 'fs'
import path from 'path'
import { PHASE_DEVELOPMENT_SERVER } from '../../shared/lib/constants'
import type { TraceEvent } from '../types'
import type { Reporter } from './types'

/**
 * Batch events, as zipkin allows for multiple events to be sent in one go.
 *
 * @param reportEvents Async sink that receives one batch of events.
 * @returns An object with `report`, which buffers one event and dispatches the
 * buffer as soon as it holds more than 100 events, and `flushAll`, which waits
 * for every in-flight batch and then sends whatever is still buffered.
 */
export function batcher(reportEvents: (evts: TraceEvent[]) => Promise<void>) {
  const events: TraceEvent[] = []
  // Promise queue to ensure events are always sent on flushAll
  const queue = new Set<Promise<void>>()
  return {
    flushAll: async () => {
      await Promise.all(queue)
      if (events.length > 0) {
        // Detach the buffer before awaiting the sink. Events reported while the
        // sink is still running must stay buffered for the next flush instead
        // of being wiped once it settles.
        const pending = events.splice(0, events.length)
        try {
          await reportEvents(pending)
        } catch (err) {
          // Keep the failed batch (ahead of anything reported meanwhile) so a
          // later flush can retry it.
          events.unshift(...pending)
          throw err
        }
      }
    },
    report: (event: TraceEvent) => {
      events.push(event)

      if (events.length > 100) {
        const evts = events.slice()
        events.length = 0
        const report = reportEvents(evts)
        queue.add(report)
        // Only a fulfilled batch leaves the queue; a rejected one stays there,
        // so the next flushAll rejects with its error.
        report.then(() => queue.delete(report))
      }
    },
  }
}

// Base options for the trace file stream. `flags` is overridden per phase in
// RotatingWriteStream.createWriteStream().
const writeStreamOptions = {
  flags: 'a',
  encoding: 'utf8' as const,
}

/**
 * File write stream that deletes and recreates its file once more than
 * `sizeLimit` bytes have been written through this instance.
 */
class RotatingWriteStream {
  file: string
  writeStream!: fs.WriteStream
  // UTF-8 bytes written through this instance since the file was last (re)created
  size: number
  sizeLimit: number
  // Set while a rotation is in progress so other writers can wait for it
  private rotatePromise: Promise<void> | undefined
  // Shared by all writers waiting for the current stream to drain
  private drainPromise: Promise<void> | undefined
  constructor(file: string, sizeLimit: number) {
    this.file = file
    this.size = 0
    this.sizeLimit = sizeLimit
    this.createWriteStream()
  }
  private createWriteStream() {
    const phase = traceGlobals.get('phase')
    this.writeStream = fs.createWriteStream(this.file, {
      ...writeStreamOptions,
      // In dev, append so traces accumulate across sessions. In production,
      // truncate so each build starts with a fresh trace file.
      flags: phase === PHASE_DEVELOPMENT_SERVER ? 'a' : 'w',
    })
  }
  // Recreate the file
  private async rotate() {
    await this.end()
    try {
      fs.unlinkSync(this.file)
    } catch (err: any) {
      // It's fine if the file does not exist yet
      if (err.code !== 'ENOENT') {
        throw err
      }
    }
    this.size = 0
    this.createWriteStream()
    this.rotatePromise = undefined
  }

  /**
   * Writes `data` to the file, rotating first when the write would push the
   * byte count past `sizeLimit`. Resolves once the data has been handed to the
   * stream and, if the stream signalled backpressure, after it has drained.
   */
  async write(data: string): Promise<void> {
    if (this.rotatePromise) await this.rotatePromise

    // The file is UTF-8 encoded, so count encoded bytes rather than UTF-16
    // code units (`data.length`), which under-count non-ASCII content.
    const bytes = Buffer.byteLength(data, 'utf8')
    this.size += bytes
    if (this.size > this.sizeLimit) {
      await (this.rotatePromise = this.rotate())
      // rotate() zeroed the counter; this chunk becomes content of the new
      // file, so it has to be counted against it.
      this.size += bytes
    }

    if (!this.writeStream.write(data, 'utf8')) {
      if (this.drainPromise === undefined) {
        this.drainPromise = new Promise<void>((resolve, _reject) => {
          this.writeStream.once('drain', () => {
            this.drainPromise = undefined
            resolve()
          })
        })
      }
      await this.drainPromise
    }
  }

  /** Ends the current underlying stream; resolves when its end callback fires. */
  end(): Promise<void> {
    return new Promise((resolve) => {
      this.writeStream.end(resolve)
    })
  }
}

/**
 * Creates a reporter that writes trace events to `<distDir>/<filename>`, one
 * JSON array of events per line.
 *
 * @param options.filename Name of the trace file inside `distDir`.
 * @param options.sizeLimit Size limit passed to the rotating stream, either a
 * fixed number or a function of the current phase.
 * @param options.filter Optional predicate; events it rejects are not reported.
 * @returns A reporter. `report` is a no-op until both `distDir` and `phase`
 * are set in `traceGlobals`. `flushAll` flushes buffered events and ends the
 * stream when `opts.end` is set or the phase is not the development server.
 */
export function createJsonReporter(options: {
  filename: string
  sizeLimit: number | ((phase: string) => number)
  filter?: (event: TraceEvent) => boolean
}): Reporter {
  let writeStream: RotatingWriteStream | undefined
  // In-flight stream creation, shared by batches that arrive before it settles
  let writeStreamReady: Promise<RotatingWriteStream> | undefined
  let batch: ReturnType<typeof batcher> | undefined

  function report(event: TraceEvent) {
    if (options.filter && !options.filter(event)) {
      return
    }

    const distDir = traceGlobals.get('distDir')
    const phase = traceGlobals.get('phase')
    if (!distDir || !phase) {
      return
    }

    if (!batch) {
      // Opens the trace file once. Batches dispatched before `mkdir` resolves
      // share the same promise instead of each creating their own stream on
      // the same file.
      const openWriteStream = (): Promise<RotatingWriteStream> => {
        if (!writeStreamReady) {
          const opening = (async () => {
            await fs.promises.mkdir(distDir, { recursive: true })
            const file = path.join(distDir, options.filename)
            const limit =
              typeof options.sizeLimit === 'function'
                ? options.sizeLimit(phase)
                : options.sizeLimit
            const stream = new RotatingWriteStream(file, limit)
            writeStream = stream
            return stream
          })()
          writeStreamReady = opening
          // A failed attempt must not be cached: let a later batch retry.
          opening.catch(() => {
            if (writeStreamReady === opening) {
              writeStreamReady = undefined
            }
          })
        }
        return writeStreamReady
      }

      batch = batcher(async (events: TraceEvent[]) => {
        // Skip the await once the stream exists so the batch is serialized
        // synchronously, before the caller can touch the array again.
        const stream = writeStream ?? (await openWriteStream())
        const eventsJson = JSON.stringify(events)
        try {
          await stream.write(eventsJson + '\n')
        } catch (err) {
          console.log(err)
        }
      })
    }

    batch.report({
      ...event,
      traceId,
    })
  }

  return {
    flushAll: (opts?: { end: boolean }) =>
      batch
        ? batch.flushAll().then(() => {
            const phase = traceGlobals.get('phase')
            // Only end writeStream when manually flushing in production
            if (opts?.end || phase !== PHASE_DEVELOPMENT_SERVER) {
              return writeStream?.end()
            }
          })
        : undefined,
    report,
  }
}

// Default reporter: writes to the `trace` file. The development server is
// capped at 50MB; every other phase has no size limit.
export default createJsonReporter({
  filename: 'trace',
  sizeLimit(phase) {
    if (phase === PHASE_DEVELOPMENT_SERVER) {
      return 50 * 1024 * 1024
    }
    return Infinity
  },
})
Quest for Codev2.0.0
/
SIGN IN