next.js/turbopack/crates/turbo-persistence/src/write_batch.rs
write_batch.rs588 lines24.7 KB
use std::{
    cell::SyncUnsafeCell,
    fs::File,
    io::Write,
    mem::{replace, take},
    path::PathBuf,
    sync::atomic::{AtomicU32, AtomicU64, Ordering},
};

use anyhow::{Context, Result};
use byteorder::{BE, WriteBytesExt};
use either::Either;
use parking_lot::Mutex;
use smallvec::SmallVec;
use thread_local::ThreadLocal;

use crate::{
    FamilyConfig, ValueBuffer,
    collector::Collector,
    collector_entry::CollectorEntry,
    compression::{checksum_block, compress_into_buffer},
    constants::{MAX_MEDIUM_VALUE_SIZE, THREAD_LOCAL_SIZE_SHIFT},
    db::WriteOperationGuard,
    key::StoreKey,
    meta_file::MetaEntryFlags,
    meta_file_builder::MetaFileBuilder,
    parallel_scheduler::ParallelScheduler,
    static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
};

/// The thread local state of a `WriteBatch`. `FAMILIES` should fit within a `u32`.
//
// NOTE: This type *must* use `usize`, even though the real type used in storage is `u32` because
// there's no way to cast a `u32` to `usize` when declaring an array without the nightly
// `min_generic_const_args` feature.
struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
    /// The collectors for each family.
    collectors: [Option<Collector<K, THREAD_LOCAL_SIZE_SHIFT>>; FAMILIES],
    /// The list of new blob files that have been created.
    /// Tuple of (sequence number, file).
    new_blob_files: Vec<(u32, File)>,
}

const COLLECTOR_SHARDS: usize = 4;
const COLLECTOR_SHARD_SHIFT: usize =
    u64::BITS as usize - COLLECTOR_SHARDS.trailing_zeros() as usize;

/// The result of a `WriteBatch::finish` operation.
pub(crate) struct FinishResult {
    pub(crate) sequence_number: u32,
    /// Tuple of (sequence number, file).
    pub(crate) new_meta_files: Vec<(u32, File)>,
    /// Tuple of (sequence number, file).
    pub(crate) new_sst_files: Vec<(u32, File)>,
    /// Tuple of (sequence number, file).
    pub(crate) new_blob_files: Vec<(u32, File)>,
    /// Number of keys written in this batch.
    pub(crate) keys_written: u64,
}

enum GlobalCollectorState<K: StoreKey + Send> {
    /// Initial state. Single collector. Once the collector is full, we switch to sharded mode.
    Unsharded(Collector<K>),
    /// Sharded mode.
    /// We use multiple collectors, and select one based on the first bits of the key hash.
    Sharded([Collector<K>; COLLECTOR_SHARDS]),
}

/// A write batch.
pub struct WriteBatch<'db, K: StoreKey + Send, S: ParallelScheduler, const FAMILIES: usize> {
    /// RAII guard that releases the write-operation slot (and rolls back on failure) on drop.
    _write_guard: WriteOperationGuard<'db>,
    /// Parallel scheduler
    parallel_scheduler: S,
    /// The database path
    db_path: PathBuf,
    /// Per-family configuration (kind: SingleValue/MultiValue).
    #[cfg_attr(not(feature = "verify_sst_content"), allow(dead_code))]
    family_configs: [FamilyConfig; FAMILIES],
    /// The current sequence number counter. Increased for every new SST file or blob file.
    current_sequence_number: AtomicU32,
    /// The thread local state.
    thread_locals: ThreadLocal<SyncUnsafeCell<ThreadLocalState<K, FAMILIES>>>,
    /// Collectors in use. The thread local collectors flush into these when they are full.
    collectors: [Mutex<GlobalCollectorState<K>>; FAMILIES],
    /// Meta file builders for each family.
    meta_collectors: [Mutex<Vec<(u32, StaticSortedFileBuilderMeta<'static>)>>; FAMILIES],
    /// The list of new SST files that have been created.
    /// Tuple of (sequence number, file).
    new_sst_files: Mutex<Vec<(u32, File)>>,
}

impl<'db, K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
    WriteBatch<'db, K, S, FAMILIES>
{
    /// Creates a new write batch for a database with per-family configuration.
    pub(crate) fn new(
        write_guard: WriteOperationGuard<'db>,
        path: PathBuf,
        current: u32,
        parallel_scheduler: S,
        family_configs: [FamilyConfig; FAMILIES],
    ) -> Self {
        const {
            assert!(FAMILIES <= usize_from_u32(u32::MAX));
        };
        Self {
            _write_guard: write_guard,
            parallel_scheduler,
            db_path: path,
            family_configs,
            current_sequence_number: AtomicU32::new(current),
            thread_locals: ThreadLocal::new(),
            collectors: [(); FAMILIES]
                .map(|_| Mutex::new(GlobalCollectorState::Unsharded(Collector::new()))),
            meta_collectors: [(); FAMILIES].map(|_| Mutex::new(Vec::new())),
            new_sst_files: Mutex::new(Vec::new()),
        }
    }

    /// Marks the write operation as successfully completed.
    ///
    /// Must be called before dropping the `WriteBatch` to skip the rollback in the guard's `Drop`
    /// impl. Typically called by `TurboPersistence::commit_write_batch` after a successful commit.
    pub(crate) fn mark_succeeded(&mut self) {
        self._write_guard.success();
    }

    /// Returns the thread local state for the current thread.
    #[allow(clippy::mut_from_ref)]
    fn thread_local_state(&self) -> &mut ThreadLocalState<K, FAMILIES> {
        let cell = self.thread_locals.get_or(|| {
            SyncUnsafeCell::new(ThreadLocalState {
                collectors: [const { None }; FAMILIES],
                new_blob_files: Vec::new(),
            })
        });
        // Safety: We know that the cell is only accessed from the current thread.
        unsafe { &mut *cell.get() }
    }

    /// Returns the collector for a family for the current thread.
    fn thread_local_collector_mut<'l>(
        &self,
        state: &'l mut ThreadLocalState<K, FAMILIES>,
        family: u32,
    ) -> Result<&'l mut Collector<K, THREAD_LOCAL_SIZE_SHIFT>> {
        debug_assert!(usize_from_u32(family) < FAMILIES);
        let collector =
            state.collectors[usize_from_u32(family)].get_or_insert_with(|| Collector::new());
        if collector.is_full() {
            self.flush_thread_local_collector(family, collector)?;
        }
        Ok(collector)
    }

    #[tracing::instrument(level = "trace", skip(self, collector), fields(family_name = self.family_configs[usize_from_u32(family)].name))]
    fn flush_thread_local_collector(
        &self,
        family: u32,
        collector: &mut Collector<K, THREAD_LOCAL_SIZE_SHIFT>,
    ) -> Result<()> {
        let mut full_collectors = SmallVec::<[_; 2]>::new();
        {
            let mut global_collector_state = self.collectors[usize_from_u32(family)].lock();
            for entry in collector.drain() {
                match &mut *global_collector_state {
                    GlobalCollectorState::Unsharded(collector) => {
                        collector.add_entry(entry);
                        if collector.is_full() {
                            // When full, split the entries into shards.
                            let mut shards: [Collector<K>; 4] =
                                [(); COLLECTOR_SHARDS].map(|_| Collector::new());
                            for entry in collector.drain() {
                                let shard = (entry.key.hash >> COLLECTOR_SHARD_SHIFT) as usize;
                                shards[shard].add_entry(entry);
                            }
                            // There is a rare edge case where all entries are in the same shard,
                            // and the collector is full after the split.
                            for collector in shards.iter_mut() {
                                if collector.is_full() {
                                    full_collectors
                                        .push(replace(&mut *collector, Collector::new()));
                                }
                            }
                            *global_collector_state = GlobalCollectorState::Sharded(shards);
                        }
                    }
                    GlobalCollectorState::Sharded(shards) => {
                        let shard = (entry.key.hash >> COLLECTOR_SHARD_SHIFT) as usize;
                        let collector = &mut shards[shard];
                        collector.add_entry(entry);
                        if collector.is_full() {
                            full_collectors.push(replace(&mut *collector, Collector::new()));
                        }
                    }
                }
            }
        }
        // After flushing write all the full global collectors to disk.
        // TODO: This can distribute work unfairly
        // * a thread could fill up multiple global collectors and then get stuck writing them all
        //   out, if multiple threads could work on it we could take care of spare IO parallism
        // * we can also have too much IO parallism with many threads concurrently writing files.
        //
        // Ideally we would limit the amount of data buffered in memory and control the amount of IO
        // parallism.  Consider:
        // * store full-buffers as a field on WireBatch (queued writes)
        // * each thread will attempt to poll and flush a full buffer after flushing its local
        //   buffer.
        // This will distribute the writing work more fairly, but now we have the problem of to
        // many concurrent writes contending for filesystem locks.  So we could also use a semaphore
        // to restrict how many concurrent writes occur.  But then we would accumulate 'fullBuffers'
        // leading to too much memory consumption.  So really we also need to slow down the threads
        // submitting work data.  To do this we could simply use a tokio semaphore and make all
        // these operations async, or we could integrate with the parallel::map operation that is
        // driving the work to slow down task submission in this case.
        for mut global_collector in full_collectors {
            // When the global collector is full, we create a new SST file.
            let sst = self.create_sst_file(
                family,
                global_collector.sorted(self.family_configs[usize_from_u32(family)].kind),
            )?;
            self.new_sst_files.lock().push(sst);
            drop(global_collector);
        }
        Ok(())
    }

    /// Puts a key-value pair into the write batch.
    pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> {
        let state = self.thread_local_state();
        let collector = self.thread_local_collector_mut(state, family)?;
        if value.len() <= MAX_MEDIUM_VALUE_SIZE {
            collector.put(key, value);
        } else {
            let (blob, file) = self.create_blob(&value)?;
            collector.put_blob(key, blob);
            state.new_blob_files.push((blob, file));
        }
        Ok(())
    }

    /// Puts a delete operation into the write batch.
    pub fn delete(&self, family: u32, key: K) -> Result<()> {
        let state = self.thread_local_state();
        let collector = self.thread_local_collector_mut(state, family)?;
        collector.delete(key);
        Ok(())
    }

    /// Flushes a family of the write batch, reducing the amount of buffered memory used.
    /// Does not commit any data persistently.
    ///
    /// # Safety
    ///
    /// Caller must ensure that no concurrent put or delete operation is happening on the flushed
    /// family.
    #[tracing::instrument(level = "trace", skip(self), fields(family_name = self.family_configs[usize_from_u32(family)].name))]
    pub unsafe fn flush(&self, family: u32) -> Result<()> {
        // Flush the thread local collectors to the global collector.
        let mut collectors = Vec::new();
        for cell in self.thread_locals.iter() {
            let state = unsafe { &mut *cell.get() };
            if let Some(collector) = state.collectors[usize_from_u32(family)].take()
                && !collector.is_empty()
            {
                collectors.push(collector);
            }
        }

        self.parallel_scheduler
            .try_parallel_for_each_owned(collectors, |mut collector| {
                self.flush_thread_local_collector(family, &mut collector)?;
                drop(collector);
                anyhow::Ok(())
            })?;

        // Now we flush the global collector(s).
        let mut collector_state = self.collectors[usize_from_u32(family)].lock();
        match &mut *collector_state {
            GlobalCollectorState::Unsharded(collector) => {
                if !collector.is_empty() {
                    let sst = self.create_sst_file(
                        family,
                        collector.sorted(self.family_configs[usize_from_u32(family)].kind),
                    )?;
                    collector.clear();
                    self.new_sst_files.lock().push(sst);
                }
            }
            GlobalCollectorState::Sharded(_) => {
                let GlobalCollectorState::Sharded(mut shards) = replace(
                    &mut *collector_state,
                    GlobalCollectorState::Unsharded(Collector::new()),
                ) else {
                    unreachable!();
                };
                self.parallel_scheduler
                    .try_parallel_for_each_mut(&mut shards, |collector| {
                        if !collector.is_empty() {
                            let sst = self.create_sst_file(
                                family,
                                collector.sorted(self.family_configs[usize_from_u32(family)].kind),
                            )?;
                            collector.clear();
                            self.new_sst_files.lock().push(sst);
                            collector.drop_contents();
                        }
                        anyhow::Ok(())
                    })?;
            }
        }

        Ok(())
    }

    /// Finishes the write batch by returning the new sequence number and the new SST files. This
    /// writes all outstanding thread local data to disk.
    #[tracing::instrument(level = "trace", skip_all)]
    pub(crate) fn finish(
        &mut self,
        get_accessed_key_hashes: impl Fn(u32) -> qfilter::Filter + Send + Sync,
    ) -> Result<FinishResult> {
        let mut new_blob_files = Vec::new();

        // First, we flush all thread local collectors to the global collectors.
        {
            let _span = tracing::trace_span!("flush thread local collectors").entered();
            let mut collectors = [const { Vec::new() }; FAMILIES];
            for cell in self.thread_locals.iter_mut() {
                let state = cell.get_mut();
                new_blob_files.append(&mut state.new_blob_files);
                for (family, thread_local_collector) in state.collectors.iter_mut().enumerate() {
                    if let Some(collector) = thread_local_collector.take()
                        && !collector.is_empty()
                    {
                        collectors[family].push(collector);
                    }
                }
            }
            let to_flush = collectors
                .into_iter()
                .enumerate()
                .flat_map(|(family, collector)| {
                    collector
                        .into_iter()
                        .map(move |collector| (family as u32, collector))
                })
                .collect::<Vec<_>>();
            self.parallel_scheduler.try_parallel_for_each_owned(
                to_flush,
                |(family, mut collector)| {
                    self.flush_thread_local_collector(family, &mut collector)?;
                    drop(collector);
                    anyhow::Ok(())
                },
            )?;
        }

        let _span = tracing::trace_span!("flush collectors").entered();

        // Now we reduce the global collectors in parallel
        let mut new_sst_files = take(self.new_sst_files.get_mut());
        let shared_new_sst_files = Mutex::new(&mut new_sst_files);

        let new_collectors =
            [(); FAMILIES].map(|_| Mutex::new(GlobalCollectorState::Unsharded(Collector::new())));
        let collectors = replace(&mut self.collectors, new_collectors);
        let collectors = collectors
            .into_iter()
            .enumerate()
            .flat_map(|(family, state)| {
                let collector = state.into_inner();
                match collector {
                    GlobalCollectorState::Unsharded(collector) => {
                        Either::Left([(family, collector)].into_iter())
                    }
                    GlobalCollectorState::Sharded(shards) => {
                        Either::Right(shards.into_iter().map(move |collector| (family, collector)))
                    }
                }
            })
            .collect::<Vec<_>>();
        self.parallel_scheduler.try_parallel_for_each_owned(
            collectors,
            |(family, mut collector)| {
                let family = family as u32;
                if !collector.is_empty() {
                    let sst = self.create_sst_file(
                        family,
                        collector.sorted(self.family_configs[usize_from_u32(family)].kind),
                    )?;
                    collector.clear();
                    drop(collector);
                    shared_new_sst_files.lock().push(sst);
                }
                anyhow::Ok(())
            },
        )?;

        // Now we need to write the new meta files.
        let new_meta_collectors = [(); FAMILIES].map(|_| Mutex::new(Vec::new()));
        let meta_collectors = replace(&mut self.meta_collectors, new_meta_collectors);
        let keys_written = AtomicU64::new(0);
        let file_to_write = meta_collectors
            .into_iter()
            .map(|mutex| mutex.into_inner())
            .enumerate()
            .filter(|(_, sst_files)| !sst_files.is_empty())
            .collect::<Vec<_>>();
        let new_meta_files = self
            .parallel_scheduler
            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
                file_to_write,
                |(family, sst_files)| {
                    let family = family as u32;
                    let mut entries = 0;
                    let mut builder = MetaFileBuilder::new(family);
                    for (seq, sst) in sst_files {
                        entries += sst.entries;
                        builder.add(seq, sst);
                    }
                    keys_written.fetch_add(entries, Ordering::Relaxed);
                    let accessed_key_hashes = get_accessed_key_hashes(family);
                    builder.set_used_key_hashes_amqf(accessed_key_hashes);
                    let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                    let file = builder.write(&self.db_path, seq)?;
                    Ok((seq, file))
                },
            )?;

        // Finally we return the new files and sequence number.
        let seq = self.current_sequence_number.load(Ordering::SeqCst);
        Ok(FinishResult {
            sequence_number: seq,
            new_meta_files,
            new_sst_files,
            new_blob_files,
            keys_written: keys_written.into_inner(),
        })
    }

    /// Creates a new blob file with the given value.
    /// Returns a tuple of (sequence number, file).
    #[tracing::instrument(level = "trace", skip(self, value), fields(value_len = value.len()))]
    fn create_blob(&self, value: &[u8]) -> Result<(u32, File)> {
        let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
        let mut compressed = Vec::new();
        compress_into_buffer(value, &mut compressed)
            .context("Compression of value for blob file failed")?;

        let mut buffer = Vec::with_capacity(8 + compressed.len());
        buffer.write_u32::<BE>(value.len() as u32)?;
        buffer.write_u32::<BE>(checksum_block(&compressed))?;
        buffer.extend_from_slice(&compressed);

        let file = self.db_path.join(format!("{seq:08}.blob"));
        let mut file = File::create(&file).context("Unable to create blob file")?;
        file.write_all(&buffer)
            .context("Unable to write blob file")?;
        file.flush().context("Unable to flush blob file")?;
        Ok((seq, file))
    }

    /// Creates a new SST file with the given collector data.
    /// Returns a tuple of (sequence number, file).
    #[tracing::instrument(level = "trace", skip(self, collector_data), fields(family_name = self.family_configs[usize_from_u32(family)].name))]
    fn create_sst_file(
        &self,
        family: u32,
        collector_data: (&[CollectorEntry<K>], usize),
    ) -> Result<(u32, File)> {
        let (entries, _total_key_size) = collector_data;
        let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1;

        let path = self.db_path.join(format!("{seq:08}.sst"));
        let (meta, file) = self
            .parallel_scheduler
            .block_in_place(|| write_static_stored_file(entries, &path, MetaEntryFlags::FRESH))
            .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;

        #[cfg(feature = "verify_sst_content")]
        {
            use core::panic;

            use crate::{
                collector_entry::CollectorEntryValue,
                key::hash_key,
                lookup_entry::LookupValue,
                static_sorted_file::{
                    BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData,
                },
                static_sorted_file_builder::Entry,
            };

            file.sync_all()?;
            let sst = StaticSortedFile::open(
                &self.db_path,
                StaticSortedFileMetaData {
                    sequence_number: seq,
                    block_count: meta.block_count,
                },
            )?;
            let cache2 = BlockCache::with(
                10,
                u64::MAX,
                Default::default(),
                Default::default(),
                Default::default(),
            );
            let cache3 = BlockCache::with(
                10,
                u64::MAX,
                Default::default(),
                Default::default(),
                Default::default(),
            );
            let mut key_buf = Vec::new();
            let family_config = self.family_configs[usize_from_u32(family)].kind;
            for entry in entries {
                entry.write_key_to(&mut key_buf);
                let result = sst
                    .lookup::<_, true>(hash_key(&key_buf), &key_buf, &cache2, &cache3)
                    .expect("key found");
                key_buf.clear();
                match result {
                    SstLookupResult::Found(values) => {
                        if values.len() > 1 {
                            use crate::FamilyKind;

                            assert!(
                                values.len() == 1 || family_config == FamilyKind::MultiValue,
                                "only multi-value tables can have more than one value, got {} \
                                 values",
                                values.len()
                            )
                        }
                        match &entry.value {
                            CollectorEntryValue::Large { blob } => {
                                assert!(
                                    values.contains(&LookupValue::Blob {
                                        sequence_number: *blob
                                    }),
                                    "we wrote a blob but did not read it"
                                );
                            }
                            CollectorEntryValue::Deleted => assert!(
                                values.first() == Some(&LookupValue::Deleted),
                                "we wrote a deleted tombstone but it was not first in results"
                            ),
                            v => {
                                assert!(
                                    values.into_iter().any(|lv| {
                                        if let LookupValue::Slice { value } = lv {
                                            &*value == v.as_bytes().unwrap()
                                        } else {
                                            false
                                        }
                                    }),
                                    "we wrote a slice of bytes but did not read it"
                                )
                            }
                        }
                    }
                    SstLookupResult::NotFound => panic!("All keys must exist"),
                }
            }
        }

        self.meta_collectors[usize_from_u32(family)]
            .lock()
            .push((seq, meta));

        Ok((seq, file))
    }
}

#[inline(always)]
const fn usize_from_u32(value: u32) -> usize {
    // This should always be true, as we assume at least a 32-bit width architecture for Turbopack.
    // Since this is a const expression, we expect it to be compiled away.
    const {
        assert!(u32::BITS < usize::BITS);
    };
    value as usize
}
Quest for Codev2.0.0
/
SIGN IN