next.js/turbopack/crates/turbo-persistence/src/meta_file_builder.rs
meta_file_builder.rs99 lines3.1 KB
use std::{
    fs::File,
    io::{self, BufWriter, Write},
    path::Path,
};

use anyhow::{Context, Result};
use byteorder::{BE, WriteBytesExt};
use qfilter::Filter;
use zerocopy::IntoBytes;

use crate::{meta_file::EntryHeader, static_sorted_file_builder::StaticSortedFileBuilderMeta};

pub struct MetaFileBuilder<'a> {
    family: u32,
    /// Entries in the meta file, tuples of (sequence_number, StaticSortedFileBuilderMetaResult)
    entries: Vec<(u32, StaticSortedFileBuilderMeta<'a>)>,
    /// Obsolete SST files, represented by their sequence numbers
    obsolete_sst_files: Vec<u32>,
    /// Optional AMQF for used key hashes
    used_key_hashes_amqf: Option<Filter>,
}

impl<'a> MetaFileBuilder<'a> {
    pub fn new(family: u32) -> Self {
        Self {
            family,
            entries: Vec::new(),
            obsolete_sst_files: Vec::new(),
            used_key_hashes_amqf: None,
        }
    }

    pub fn add(&mut self, sequence_number: u32, sst: StaticSortedFileBuilderMeta<'a>) {
        self.entries.push((sequence_number, sst));
    }

    pub fn add_obsolete_sst_file(&mut self, sequence_number: u32) {
        self.obsolete_sst_files.push(sequence_number);
    }

    pub fn set_used_key_hashes_amqf(&mut self, amqf: Filter) {
        self.used_key_hashes_amqf = Some(amqf);
    }

    #[tracing::instrument(level = "trace", skip_all)]
    pub fn write(self, db_path: &Path, seq: u32) -> Result<File> {
        let file = db_path.join(format!("{seq:08}.meta"));
        self.write_internal(&file)
            .with_context(|| format!("Unable to write meta file {seq:08}.meta"))
    }

    fn write_internal(mut self, file: &Path) -> io::Result<File> {
        let mut file = BufWriter::new(File::create(file)?);
        file.write_u32::<BE>(0xFE4ADA4A)?; // Magic number
        file.write_u32::<BE>(self.family)?;

        self.obsolete_sst_files.sort();
        file.write_u32::<BE>(self.obsolete_sst_files.len() as u32)?;
        for obsolete_sst in &self.obsolete_sst_files {
            file.write_u32::<BE>(*obsolete_sst)?;
        }

        file.write_u32::<BE>(self.entries.len() as u32)?;

        let mut amqf_offset = 0;
        for (sequence_number, sst) in &self.entries {
            amqf_offset += sst.amqf.len();
            let header = EntryHeader::new(
                *sequence_number,
                sst.block_count,
                sst.min_hash,
                sst.max_hash,
                sst.size,
                sst.flags,
                amqf_offset as u32,
            );
            file.write_all(header.as_bytes())?;
        }
        let serialized_used_key_hashes = self
            .used_key_hashes_amqf
            .as_ref()
            .map(|f| postcard::to_allocvec(f).expect("AMQF serialization failed"));
        amqf_offset += serialized_used_key_hashes
            .as_ref()
            .map(|bytes| bytes.len())
            .unwrap_or(0);
        file.write_u32::<BE>(amqf_offset as u32)?;

        for (_, sst) in &self.entries {
            file.write_all(&sst.amqf)?;
        }
        if let Some(bytes) = &serialized_used_key_hashes {
            file.write_all(bytes)?;
        }
        Ok(file.into_inner()?)
    }
}
Quest for Codev2.0.0
/
SIGN IN