// next.js/turbopack/crates/turbo-persistence/src/tests.rs
// (tests.rs: 2195 lines, 62.6 KB)
use std::{fs, path::Path, time::Instant};

use anyhow::Result;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::{
    DbConfig, FamilyConfig, FamilyKind,
    constants::{MAX_MEDIUM_VALUE_SIZE, MAX_SMALL_VALUE_SIZE},
    db::{CompactConfig, TurboPersistence},
    parallel_scheduler::ParallelScheduler,
    write_batch::WriteBatch,
};

/// Zero-sized test scheduler whose parallel operations are implemented with
/// rayon parallel iterators.
#[derive(Copy, Clone)]
struct RayonParallelScheduler;

impl ParallelScheduler for RayonParallelScheduler {
    /// Runs `f` directly on the calling thread; this test scheduler does no
    /// special bookkeeping for blocking work.
    fn block_in_place<R>(&self, f: impl FnOnce() -> R + Send) -> R
    where
        R: Send,
    {
        f()
    }

    /// Invokes `f` on every element of `items` via a rayon parallel iterator.
    fn parallel_for_each<T>(&self, items: &[T], f: impl Fn(&T) + Send + Sync)
    where
        T: Sync,
    {
        let par_items = items.into_par_iter();
        par_items.for_each(f);
    }

    /// Fallible parallel for-each over shared references; returns the error
    /// produced by rayon's `try_for_each` if any call fails.
    fn try_parallel_for_each<'l, T, E>(
        &self,
        items: &'l [T],
        f: impl (Fn(&'l T) -> Result<(), E>) + Send + Sync,
    ) -> Result<(), E>
    where
        T: Sync,
        E: Send,
    {
        let par_items = items.into_par_iter();
        par_items.try_for_each(f)
    }

    /// Fallible parallel for-each over mutable references.
    fn try_parallel_for_each_mut<'l, T, E>(
        &self,
        items: &'l mut [T],
        f: impl (Fn(&'l mut T) -> Result<(), E>) + Send + Sync,
    ) -> Result<(), E>
    where
        T: Send + Sync,
        E: Send,
    {
        let par_items = items.into_par_iter();
        par_items.try_for_each(f)
    }

    /// Fallible parallel for-each that takes ownership of every element.
    fn try_parallel_for_each_owned<T, E>(
        &self,
        items: Vec<T>,
        f: impl (Fn(T) -> Result<(), E>) + Send + Sync,
    ) -> Result<(), E>
    where
        T: Send + Sync,
        E: Send,
    {
        let par_items = items.into_par_iter();
        par_items.try_for_each(f)
    }

    /// Maps `items` in parallel, then flattens the per-chunk vectors gathered by
    /// `collect_vec_list` sequentially into any `FromIterator` collection.
    fn parallel_map_collect<'l, Item, PerItemResult, Collection>(
        &self,
        items: &'l [Item],
        f: impl Fn(&'l Item) -> PerItemResult + Send + Sync,
    ) -> Collection
    where
        Item: Sync,
        PerItemResult: Send + Sync,
        Collection: FromIterator<PerItemResult>,
    {
        // `Collection` only needs `FromIterator`, so the parallel phase collects
        // into rayon's linked list of vectors first.
        let chunks = items.into_par_iter().map(f).collect_vec_list();
        chunks.into_iter().flatten().collect()
    }

    /// Owned-input counterpart of `parallel_map_collect`.
    fn parallel_map_collect_owned<Item, PerItemResult, Collection>(
        &self,
        items: Vec<Item>,
        f: impl Fn(Item) -> PerItemResult + Send + Sync,
    ) -> Collection
    where
        Item: Send + Sync,
        PerItemResult: Send + Sync,
        Collection: FromIterator<PerItemResult>,
    {
        let chunks = items.into_par_iter().map(f).collect_vec_list();
        chunks.into_iter().flatten().collect()
    }
}

/// End-to-end write / read / restore / compact cycle over a set of test cases.
///
/// Each test case is a `(name, write, read)` triple. Every case is first run in
/// a fresh database of its own. Afterwards, all cases are written into one
/// shared database. The `read` check is repeated after every phase (restore,
/// re-read, full compaction, restore after compaction), and each step prints its
/// wall-clock duration, so the test also serves as a coarse benchmark.
#[test]
fn full_cycle() -> Result<()> {
    type Batch = WriteBatch<Vec<u8>, RayonParallelScheduler, 16>;
    type Db = TurboPersistence<RayonParallelScheduler, 16>;
    type WriteFn = Box<dyn Fn(&mut Batch) -> Result<()>>;
    type ReadFn = Box<dyn Fn(&Db) -> Result<()>>;
    type TestCases = Vec<(&'static str, WriteFn, ReadFn)>;

    /// Registers a named test case made of a `write` step and a `read` check.
    fn test_case(
        test_cases: &mut TestCases,
        name: &'static str,
        write: impl Fn(&mut Batch) -> Result<()> + 'static,
        read: impl Fn(&Db) -> Result<()> + 'static,
    ) {
        test_cases.push((name, Box::new(write) as WriteFn, Box::new(read) as ReadFn));
    }

    /// Runs `f`, prints `"{label}: {elapsed:?}"`, and returns `f`'s result.
    fn timed<T>(label: &str, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let result = f();
        println!("{label}: {:?}", start.elapsed());
        result
    }

    /// Opens (or restores) the database stored at `path`.
    fn open(path: &Path) -> Result<Db> {
        TurboPersistence::open_with_parallel_scheduler(path.to_path_buf(), RayonParallelScheduler)
    }

    /// Runs every test case's `read` check against `db`, timing each one as
    /// `"{name} read time{suffix}"`.
    fn read_all(test_cases: &TestCases, db: &Db, suffix: &str) -> Result<()> {
        for (name, _, read) in test_cases {
            timed(&format!("{name} read time{suffix}"), || read(db))?;
        }
        Ok(())
    }

    let mut test_cases = TestCases::new();

    test_case(
        &mut test_cases,
        "Simple",
        |batch| {
            for i in 10..100u8 {
                batch.put(0, vec![1, i], vec![i].into())?;
            }
            Ok(())
        },
        |db| {
            let Some(value) = db.get(0, &[1, 42u8])? else {
                panic!("Value not found");
            };
            assert_eq!(&*value, &[42]);
            assert_eq!(db.get(0, &[1, 42u8, 42])?, None);
            assert_eq!(db.get(0, &[1, 1u8])?, None);
            assert_eq!(db.get(0, &[1, 255u8])?, None);
            Ok(())
        },
    );

    test_case(
        &mut test_cases,
        "Many SST files",
        |batch| {
            for i in 10..100u8 {
                batch.put(0, vec![2, i], vec![i].into())?;
                unsafe { batch.flush(0)? };
            }
            Ok(())
        },
        |db| {
            let Some(value) = db.get(0, &[2, 42u8])? else {
                panic!("Value not found");
            };
            assert_eq!(&*value, &[42]);
            assert_eq!(db.get(0, &[2, 42u8, 42])?, None);
            assert_eq!(db.get(0, &[2, 1u8])?, None);
            assert_eq!(db.get(0, &[2, 255u8])?, None);
            Ok(())
        },
    );

    test_case(
        &mut test_cases,
        "Families",
        |batch| {
            for i in 0..16u8 {
                batch.put(u32::from(i), vec![i], vec![i].into())?;
            }
            Ok(())
        },
        |db| {
            let Some(value) = db.get(8, &[8u8])? else {
                panic!("Value not found");
            };
            assert_eq!(&*value, &[8]);
            assert!(db.get(8, &[8u8, 8])?.is_none());
            assert!(db.get(8, &[0u8])?.is_none());
            assert!(db.get(8, &[255u8])?.is_none());
            Ok(())
        },
    );

    test_case(
        &mut test_cases,
        "Medium keys and values",
        |batch| {
            for i in 0..200u8 {
                let mut key = vec![3u8];
                key.extend(vec![i; 10 * 1024]);
                batch.put(0, key, vec![i; 100 * 1024].into())?;
            }
            Ok(())
        },
        |db| {
            for i in 0..200u8 {
                let mut key = vec![3u8];
                key.extend(vec![i; 10 * 1024]);
                let Some(value) = db.get(0, &key)? else {
                    panic!("Value not found");
                };
                assert_eq!(&*value, &vec![i; 100 * 1024]);
            }
            Ok(())
        },
    );

    const BLOB_SIZE: usize = 65 * 1024 * 1024;
    // Checked at compile time: the values must exceed the medium size limit so
    // that this case exercises blob files.
    const _: () = assert!(BLOB_SIZE > MAX_MEDIUM_VALUE_SIZE);
    test_case(
        &mut test_cases,
        "Large keys and values (blob files)",
        |batch| {
            for i in 0..2u8 {
                let mut key = vec![4u8];
                key.extend(vec![i; BLOB_SIZE]);
                batch.put(0, key, vec![i; BLOB_SIZE].into())?;
            }
            Ok(())
        },
        |db| {
            for i in 0..2u8 {
                let mut key = vec![4u8];
                key.extend(vec![i; BLOB_SIZE]);
                let value_expected = vec![i; BLOB_SIZE];
                let Some(value) = db.get(0, &key)? else {
                    panic!("Value not found");
                };
                assert_eq!(&*value, &value_expected);
            }
            Ok(())
        },
    );

    /// Key/value lengths 100, 110, ..., 190 (each byte equals the length).
    fn different_sizes_range() -> impl Iterator<Item = u8> {
        (10..20).map(|value| value * 10)
    }
    test_case(
        &mut test_cases,
        "Different sizes keys and values",
        |batch| {
            for i in different_sizes_range() {
                let mut key = vec![5u8];
                key.extend(vec![i; i as usize]);
                batch.put(0, key, vec![i; i as usize].into())?;
            }
            Ok(())
        },
        |db| {
            for i in different_sizes_range() {
                let mut key = vec![5u8];
                key.extend(vec![i; i as usize]);
                let Some(value) = db.get(0, &key)? else {
                    panic!("Value not found");
                };
                assert_eq!(&*value, &vec![i; i as usize]);
            }
            Ok(())
        },
    );

    test_case(
        &mut test_cases,
        "Many items (1% read)",
        |batch| {
            for i in 0..1000 * 1024u32 {
                let key = [&[6u8], &i.to_be_bytes()[..]].concat();
                batch.put(0, key, i.to_be_bytes().to_vec().into())?;
            }
            Ok(())
        },
        |db| {
            for i in 0..10 * 1024u32 {
                let i = i * 100;
                let key = [&[6u8], &i.to_be_bytes()[..]].concat();
                let Some(value) = db.get(0, &key)? else {
                    panic!("Value not found");
                };
                assert_eq!(&*value, &i.to_be_bytes());
            }
            Ok(())
        },
    );

    test_case(
        &mut test_cases,
        "Many items (1% read, multi-threaded)",
        |batch| {
            (0..10 * 1024 * 1024u32).into_par_iter().for_each(|i| {
                let key = [&[7u8], &i.to_be_bytes()[..]].concat();
                batch.put(0, key, i.to_be_bytes().to_vec().into()).unwrap();
            });
            Ok(())
        },
        |db| {
            (0..100 * 1024u32).into_par_iter().for_each(|i| {
                let i = i * 100;
                let key = [&[7u8], &i.to_be_bytes()[..]].concat();
                let Some(value) = db.get(0, &key).unwrap() else {
                    panic!("Value not found");
                };
                assert_eq!(&*value, &i.to_be_bytes());
            });
            Ok(())
        },
    );

    // Run each test case standalone
    for (name, write, read) in &test_cases {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        {
            let db = timed(&format!("{name} write time"), || -> Result<Db> {
                let db = open(path)?;
                let mut batch = db.write_batch()?;
                write(&mut batch)?;
                db.commit_write_batch(batch)?;
                Ok(db)
            })?;
            timed(&format!("{name} read time"), || read(&db))?;
            timed(&format!("{name} drop time"), || drop(db));
        }
        {
            let db = timed(&format!("{name} restore time"), || open(path))?;
            timed(&format!("{name} read time after restore"), || read(&db))?;
            timed(&format!("{name} read time after read"), || read(&db))?;

            #[cfg(feature = "stats")]
            println!("{name} stats: {:#?}", db.statistics());

            timed(&format!("{name} compact time"), || db.full_compact())?;
            timed(&format!("{name} read time after compact"), || read(&db))?;
            timed(&format!("{name} drop time after compact"), || drop(db));
        }
        {
            let db = timed(&format!("{name} restore time after compact"), || {
                open(path)
            })?;
            timed(&format!("{name} read time after compact + restore"), || {
                read(&db)
            })?;
            timed(
                &format!("{name} read time after compact + restore + read"),
                || read(&db),
            )?;

            #[cfg(feature = "stats")]
            println!("{name} stats (compacted): {:#?}", db.statistics());

            timed(&format!("{name} drop time after compact + restore"), || {
                drop(db)
            });
        }
    }

    // Run all test cases in a single db
    {
        let tempdir = tempfile::tempdir()?;
        let path = tempdir.path();

        {
            let db = timed("All write time", || -> Result<Db> {
                let db = open(path)?;
                let mut batch = db.write_batch()?;
                for (_, write, _) in &test_cases {
                    write(&mut batch)?;
                }
                db.commit_write_batch(batch)?;
                Ok(db)
            })?;
            read_all(&test_cases, &db, "")?;
            timed("All drop time", || drop(db));
        }
        {
            let db = timed("All restore time", || open(path))?;
            read_all(&test_cases, &db, " after restore")?;
            read_all(&test_cases, &db, " after read")?;

            #[cfg(feature = "stats")]
            println!("All stats: {:#?}", db.statistics());

            timed("All compact time", || db.full_compact())?;
            read_all(&test_cases, &db, " after compact")?;
            timed("All drop time after compact", || drop(db));
        }
        {
            let db = timed("All restore time after compact", || open(path))?;
            read_all(&test_cases, &db, " after compact + restore")?;
            read_all(&test_cases, &db, " after compact + restore + read")?;

            #[cfg(feature = "stats")]
            println!("All stats (compacted): {:#?}", db.statistics());

            timed("All drop time after compact + restore", || drop(db));
        }
    }
    Ok(())
}

/// Checks that committed values persist across shutdown/reopen cycles, partial
/// overwrites and compaction.
///
/// Keys are `(prefix, index)` pairs. Prefixes 1, 2 and 3 are written, then 1 and
/// 2 are overwritten, then 1 again. Each later phase (plain reopen, compaction,
/// reopen after compaction) verifies that every prefix returns its most recent
/// value.
#[test]
fn persist_changes() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value
    type Db = TurboPersistence<RayonParallelScheduler, 1>;

    /// Opens (or restores) the single-family database stored at `path`.
    fn open(path: &Path) -> Result<Db> {
        TurboPersistence::open_with_parallel_scheduler(path.to_path_buf(), RayonParallelScheduler)
    }
    /// Writes `value` under `(key, i)` for every `i` in `0..READ_COUNT * 10`.
    fn put(
        b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>,
        key: u8,
        value: u8,
    ) -> Result<()> {
        for i in 0..(READ_COUNT * 10) {
            b.put(0, (key, i.to_be_bytes()), vec![value].into())?;
        }
        Ok(())
    }
    /// Asserts that every 10th entry under prefix `key` holds `value`.
    fn check(db: &Db, key: u8, value: u8) -> Result<()> {
        for i in 0..READ_COUNT {
            // read every 10th item
            let i = i * 10;
            assert_eq!(
                db.get(0, &(key, i.to_be_bytes()))?.as_deref(),
                Some(&[value][..]),
                "Key {key} {i} expected {value}"
            );
        }
        Ok(())
    }
    /// Asserts the current values of prefixes 1, 2 and 3, in that order.
    fn check_all(db: &Db, expected: [u8; 3]) -> Result<()> {
        for (key, value) in [1u8, 2, 3].into_iter().zip(expected) {
            check(db, key, value)?;
        }
        Ok(())
    }

    // Initial write of all three prefixes.
    {
        let db = open(path)?;
        let b = db.write_batch()?;
        put(&b, 1, 11)?;
        put(&b, 2, 21)?;
        put(&b, 3, 31)?;
        db.commit_write_batch(b)?;

        check_all(&db, [11, 21, 31])?;

        db.shutdown()?;
    }

    println!("---");
    // Overwrite prefixes 1 and 2; prefix 3 must keep its old value.
    {
        let db = open(path)?;
        let b = db.write_batch()?;
        put(&b, 1, 12)?;
        put(&b, 2, 22)?;
        db.commit_write_batch(b)?;

        check_all(&db, [12, 22, 31])?;

        db.shutdown()?;
    }

    println!("---");
    // Overwrite prefix 1 only.
    {
        let db = open(path)?;
        let b = db.write_batch()?;
        put(&b, 1, 13)?;
        db.commit_write_batch(b)?;

        check_all(&db, [13, 22, 31])?;

        db.shutdown()?;
    }

    println!("---");
    // Plain reopen.
    {
        let db = open(path)?;
        check_all(&db, [13, 22, 31])?;
        db.shutdown()?;
    }

    println!("---");
    // Compaction must not resurrect overwritten values.
    {
        let db = open(path)?;

        db.compact(&CompactConfig {
            optimal_merge_count: 4,
            min_merge_duplication_bytes: 1,
            optimal_merge_duplication_bytes: 1,
            ..Default::default()
        })?;

        check_all(&db, [13, 22, 31])?;

        db.shutdown()?;
    }

    println!("---");
    // Reopen after compaction.
    {
        let db = open(path)?;
        check_all(&db, [13, 22, 31])?;
        db.shutdown()?;
    }

    Ok(())
}

/// Writes overlapping key prefixes over 50 rounds. After each round the
/// database is reopened and compacted, then reopened again, and every prefix
/// written so far is checked.
///
/// Round `i` stores value `i` under prefixes `i`, `i + 1` and `i + 2`. After that
/// round, prefix `j < i` must still hold `j`, and the three newest prefixes must
/// hold `i`.
#[test]
fn partial_compaction() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value
    type Db = TurboPersistence<RayonParallelScheduler, 1>;

    /// Opens (or restores) the single-family database stored at `path`.
    fn open(path: &Path) -> Result<Db> {
        TurboPersistence::open_with_parallel_scheduler(path.to_path_buf(), RayonParallelScheduler)
    }

    /// Writes `value` under `(key, i)` for every `i` in `0..READ_COUNT * 10`.
    fn put(
        b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>,
        key: u8,
        value: u8,
    ) -> Result<()> {
        for index in 0..(READ_COUNT * 10) {
            b.put(0, (key, index.to_be_bytes()), vec![value].into())?;
        }
        Ok(())
    }

    /// Asserts that every 10th entry under prefix `key` holds `value`.
    fn check(db: &Db, key: u8, value: u8) -> Result<()> {
        for i in (0..READ_COUNT * 10).step_by(10) {
            assert_eq!(
                db.get(0, &(key, i.to_be_bytes()))?.as_deref(),
                Some(&[value][..]),
                "Key {key} {i} expected {value}"
            );
        }
        Ok(())
    }

    /// Checks every prefix touched up to and including round `round`.
    fn check_round(db: &Db, round: u8) -> Result<()> {
        for j in 0..round {
            check(db, j, j)?;
        }
        for key in round..=round + 2 {
            check(db, key, round)?;
        }
        Ok(())
    }

    for i in 0..50u8 {
        println!("--- Iteration {i} ---");

        println!("Add more entries");
        {
            let db = open(path)?;
            let b = db.write_batch()?;
            for key in i..=i + 2 {
                put(&b, key, i)?;
            }
            db.commit_write_batch(b)?;
            check_round(&db, i)?;
            db.shutdown()?;
        }

        println!("Compaction");
        {
            let db = open(path)?;
            db.compact(&CompactConfig {
                optimal_merge_count: 4,
                min_merge_duplication_bytes: 1,
                optimal_merge_duplication_bytes: 1,
                ..Default::default()
            })?;
            check_round(&db, i)?;
            db.shutdown()?;
        }

        println!("Restore check");
        {
            let db = open(path)?;
            check_round(&db, i)?;
            db.shutdown()?;
        }
    }

    Ok(())
}

/// Seeds 256 key prefixes with value 0. Then, for 48 rounds, overwrites the
/// prefixes selected by the set bits of `i * 37` (for `i` in 1..49). After each
/// overwrite the database is reopened and compacted, then reopened again.
/// Prefixes `0..32` are checked after every step against the values written
/// last.
#[test]
fn merge_file_removal() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    // NOTE(review): this deletes the directory `tempdir()` just created, so the
    // test relies on opening the database to recreate it — confirm it is still needed.
    let _ = fs::remove_dir_all(path);

    const READ_COUNT: u32 = 2_000; // we'll read every 10th value, so writes are 10x this value
    type Db = TurboPersistence<RayonParallelScheduler, 1>;

    /// Opens (or restores) the single-family database stored at `path`.
    fn open(path: &Path) -> Result<Db> {
        TurboPersistence::open_with_parallel_scheduler(path.to_path_buf(), RayonParallelScheduler)
    }

    /// Writes the big-endian bytes of `value` under `(key, i)` for every `i`
    /// in `0..READ_COUNT * 10`.
    fn put(
        b: &WriteBatch<(u8, [u8; 4]), RayonParallelScheduler, 1>,
        key: u8,
        value: u32,
    ) -> Result<()> {
        let value_bytes = value.to_be_bytes();
        for index in 0..(READ_COUNT * 10) {
            b.put(0, (key, index.to_be_bytes()), value_bytes.to_vec().into())?;
        }
        Ok(())
    }

    /// Asserts that every 10th entry under prefix `key` holds `value`.
    fn check(db: &Db, key: u8, value: u32) -> Result<()> {
        let expected = value.to_be_bytes();
        for i in (0..READ_COUNT * 10).step_by(10) {
            assert_eq!(
                db.get(0, &(key, i.to_be_bytes()))?.as_deref(),
                Some(&expected[..]),
                "Key {key} {i} expected {value}"
            );
        }
        Ok(())
    }

    /// Yields the indices of the set bits of `v`, lowest first.
    fn iter_bits(v: u32) -> impl Iterator<Item = u8> {
        (0..32u8).filter(move |&bit| (v >> bit) & 1 == 1)
    }

    /// Checks prefixes `0..32` (the only ones `iter_bits` can select) against
    /// the tracked expected values.
    fn check_tracked(db: &Db, expected: &[u32; 256]) -> Result<()> {
        for (key, &value) in (0..32u8).zip(expected.iter()) {
            check(db, key, value)?;
        }
        Ok(())
    }

    {
        println!("--- Init ---");
        let db = open(path)?;
        let b = db.write_batch()?;
        for key in 0..=u8::MAX {
            put(&b, key, 0)?;
        }
        db.commit_write_batch(b)?;
        db.shutdown()?;
    }

    let mut expected_values = [0u32; 256];

    for i in 1..50u32 {
        println!("--- Iteration {i} ---");
        let i = i * 37;

        println!("Add more entries");
        {
            let db = open(path)?;
            let b = db.write_batch()?;
            for j in iter_bits(i) {
                println!("Put {j} = {i}");
                expected_values[usize::from(j)] = i;
                put(&b, j, i)?;
            }
            db.commit_write_batch(b)?;
            check_tracked(&db, &expected_values)?;
            db.shutdown()?;
        }

        println!("Compaction");
        {
            let db = open(path)?;
            db.compact(&CompactConfig {
                optimal_merge_count: 4,
                min_merge_duplication_bytes: 1,
                optimal_merge_duplication_bytes: 1,
                ..Default::default()
            })?;
            check_tracked(&db, &expected_values)?;
            db.shutdown()?;
        }

        println!("Restore check");
        {
            let db = open(path)?;
            check_tracked(&db, &expected_values)?;
            db.shutdown()?;
        }
    }

    Ok(())
}

/// `batch_get` answers a mix of present and absent keys with one result per
/// requested key, in request order.
#[test]
fn batch_get_basic() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Key `[b]` maps to value `[b]` for every b in 0..100.
    let batch = db.write_batch()?;
    for byte in 0..100u8 {
        batch.put(0, vec![byte], vec![byte].into())?;
    }
    db.commit_write_batch(batch)?;

    // 200 and 255 were never written.
    let keys_to_fetch: Vec<Vec<u8>> = [10u8, 20, 200, 50, 255].iter().map(|&k| vec![k]).collect();
    let results = db.batch_get(0, &keys_to_fetch)?;

    let values: Vec<Option<&[u8]>> = results.iter().map(|r| r.as_deref()).collect();
    let expected: [Option<&[u8]>; 5] = [
        Some(&[10u8][..]),
        Some(&[20u8][..]),
        None,
        Some(&[50u8][..]),
        None,
    ];
    assert_eq!(values, expected);

    db.shutdown()?;
    Ok(())
}

/// When every requested key exists, `batch_get` returns a hit for each one.
#[test]
fn batch_get_all_existing() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Key `[k]` stores `[2 * k]`.
    let batch = db.write_batch()?;
    for k in 0..50u8 {
        batch.put(0, vec![k], vec![k * 2].into())?;
    }
    db.commit_write_batch(batch)?;

    let keys_to_fetch: Vec<Vec<u8>> = (0..50u8).map(|k| vec![k]).collect();
    let results = db.batch_get(0, &keys_to_fetch)?;

    assert_eq!(results.len(), 50);
    for (result, k) in results.iter().zip(0u8..) {
        assert_eq!(result.as_deref(), Some(&[k * 2][..]));
    }

    db.shutdown()?;
    Ok(())
}

/// Querying only keys that were never written yields `None` for each of them.
#[test]
fn batch_get_none_existing() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Populate keys 0..10 only.
    let batch = db.write_batch()?;
    for byte in 0..10u8 {
        batch.put(0, vec![byte], vec![byte].into())?;
    }
    db.commit_write_batch(batch)?;

    // None of 100..110 was written.
    let keys_to_fetch: Vec<Vec<u8>> = (100..110u8).map(|k| vec![k]).collect();
    let results = db.batch_get(0, &keys_to_fetch)?;

    assert_eq!(results.len(), keys_to_fetch.len());
    assert!(results.iter().all(Option::is_none));

    db.shutdown()?;
    Ok(())
}

/// An empty key list produces an empty result list.
#[test]
fn batch_get_empty() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // A single entry so the database is not empty.
    let batch = db.write_batch()?;
    batch.put(0, vec![1u8], vec![1u8].into())?;
    db.commit_write_batch(batch)?;

    let no_keys: Vec<Vec<u8>> = Vec::new();
    let results = db.batch_get(0, &no_keys)?;
    assert!(results.is_empty());

    db.shutdown()?;
    Ok(())
}

/// Repeated keys in a request are each answered independently, and results keep
/// the request order.
#[test]
fn batch_get_duplicate_keys() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    let batch = db.write_batch()?;
    batch.put(0, vec![42u8], vec![100u8].into())?;
    batch.put(0, vec![43u8], vec![101u8].into())?;
    db.commit_write_batch(batch)?;

    // 42 appears three times; 99 was never written.
    let keys_to_fetch: Vec<Vec<u8>> = [42u8, 43, 42, 99, 42].iter().map(|&k| vec![k]).collect();
    let results = db.batch_get(0, &keys_to_fetch)?;

    let values: Vec<Option<&[u8]>> = results.iter().map(|r| r.as_deref()).collect();
    let expected: [Option<&[u8]>; 5] = [
        Some(&[100u8][..]),
        Some(&[101u8][..]),
        Some(&[100u8][..]),
        None,
        Some(&[100u8][..]),
    ];
    assert_eq!(values, expected);

    db.shutdown()?;
    Ok(())
}

/// Fetches a 100-key sample (every 10th key) out of 1000 entries in one request.
#[test]
fn batch_get_large_batch() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Key is the big-endian `u32` n; value is the big-endian `2 * n`.
    let batch = db.write_batch()?;
    for n in 0..1000u32 {
        batch.put(
            0,
            n.to_be_bytes().to_vec(),
            (n * 2).to_be_bytes().to_vec().into(),
        )?;
    }
    db.commit_write_batch(batch)?;

    let sampled: Vec<u32> = (0..1000u32).step_by(10).collect();
    let keys_to_fetch: Vec<Vec<u8>> = sampled.iter().map(|n| n.to_be_bytes().to_vec()).collect();
    let results = db.batch_get(0, &keys_to_fetch)?;

    assert_eq!(results.len(), 100);
    for (idx, (i, result)) in sampled.iter().zip(results.iter()).enumerate() {
        assert_eq!(
            result.as_deref(),
            Some(&(i * 2).to_be_bytes()[..]),
            "Failed at index {idx} for key {i}"
        );
    }

    db.shutdown()?;
    Ok(())
}

/// `batch_get` returns correct values across every value-size class written
/// here, plus `None` for a missing key.
#[test]
fn batch_get_different_sizes() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // (key, value) pairs; the trailing comment names the size class each value targets.
    let entries: Vec<(u8, Vec<u8>)> = vec![
        (0, vec![]),                                // empty
        (1, vec![1u8; 4]),                          // inline
        (2, vec![2u8; 10]),                         // small
        (3, vec![3u8; MAX_SMALL_VALUE_SIZE + 1]),   // medium
        (4, vec![4u8; 10 * MAX_SMALL_VALUE_SIZE]),  // larger
        (5, vec![5u8; MAX_MEDIUM_VALUE_SIZE + 1]),  // blob
    ];

    let batch = db.write_batch()?;
    for (key, value) in &entries {
        batch.put(0, vec![*key], value.clone().into())?;
    }
    db.commit_write_batch(batch)?;

    // Every written key, followed by one that does not exist.
    let mut keys_to_fetch: Vec<Vec<u8>> = entries.iter().map(|(key, _)| vec![*key]).collect();
    keys_to_fetch.push(vec![6u8]); // non-existing
    let results = db.batch_get(0, &keys_to_fetch)?;

    assert_eq!(results.len(), 7);
    for ((_, expected), result) in entries.iter().zip(&results) {
        assert_eq!(result.as_deref(), Some(&expected[..]));
    }
    assert_eq!(results[6], None);

    db.shutdown()?;
    Ok(())
}

/// `batch_get` is scoped to a single family. The same key written to several
/// families returns each family's own value, and a key written to only one
/// family is absent from the others.
#[test]
fn batch_get_across_families() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        path.to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Key `[i]` in family `f` holds `[f, i]`: every family has the same keys but
    // stores distinct values under them.
    let batch = db.write_batch()?;
    for family in 0..4u8 {
        for i in 0..20u8 {
            batch.put(u32::from(family), vec![i], vec![family, i].into())?;
        }
    }
    // Written to family 0 only, so other families must not see it.
    batch.put(0, vec![100u8], vec![0u8, 100].into())?;
    db.commit_write_batch(batch)?;

    let keys_to_fetch: Vec<Vec<u8>> = (0..20u8).map(|i| vec![i]).collect();

    // Fetch from each family separately
    for family in 0..4u8 {
        let results = db.batch_get(usize::from(family), &keys_to_fetch)?;

        assert_eq!(results.len(), 20);
        for (i, result) in (0..20u8).zip(results.iter()) {
            assert_eq!(
                result.as_deref(),
                Some(&[family, i][..]),
                "Failed at family {family}, index {i}"
            );
        }
    }

    // Family isolation: identical keys in families 0 and 1 must resolve to
    // different values for every key, not just the first one.
    let results_f0 = db.batch_get(0, &keys_to_fetch)?;
    let results_f1 = db.batch_get(1, &keys_to_fetch)?;
    assert_eq!(results_f0.len(), results_f1.len());
    for (i, (f0, f1)) in results_f0.iter().zip(results_f1.iter()).enumerate() {
        assert_ne!(
            f0.as_deref(),
            f1.as_deref(),
            "Families 0 and 1 return the same value for key {i}"
        );
    }

    // A key present only in family 0 must not be visible through family 1.
    let only_in_family_0 = vec![vec![100u8]];
    let f0_only = db.batch_get(0, &only_in_family_0)?;
    let f1_only = db.batch_get(1, &only_in_family_0)?;
    assert_eq!(f0_only[0].as_deref(), Some(&[0u8, 100][..]));
    assert_eq!(f1_only[0], None);

    db.shutdown()?;
    Ok(())
}

/// `batch_get` must return the same (correct) values before and after a full
/// compaction that merges several SST files.
#[test]
fn batch_get_after_compaction() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        path.to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Write data across multiple batches to create multiple SST files
    for batch_num in 0..5u8 {
        let batch = db.write_batch()?;
        for i in 0..20u8 {
            let key = batch_num * 20 + i;
            batch.put(0, vec![key], vec![key].into())?;
        }
        db.commit_write_batch(batch)?;
    }

    // Fetch before compaction
    let keys_to_fetch: Vec<Vec<u8>> = (0..100u8).map(|i| vec![i]).collect();
    let results_before = db.batch_get(0, &keys_to_fetch)?;

    // Compact database
    db.full_compact()?;

    // Fetch after compaction
    let results_after = db.batch_get(0, &keys_to_fetch)?;

    // Pin both lengths to the number of requested keys. Comparing the two lengths
    // only with each other would let a short or long result slip through.
    assert_eq!(results_before.len(), keys_to_fetch.len());
    assert_eq!(results_after.len(), keys_to_fetch.len());

    // Results must be identical, and each key `i` must map to the value `[i]`.
    for (i, (before, after)) in results_before.iter().zip(&results_after).enumerate() {
        assert_eq!(before.as_deref(), after.as_deref(), "Mismatch at index {i}");
        assert_eq!(after.as_deref(), Some(&[i as u8][..]));
    }

    db.shutdown()?;
    Ok(())
}

/// A newer commit must win over an older one for overwritten keys, while
/// untouched keys keep their original value.
#[test]
fn batch_get_with_overwrites() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Commit 1: keys 0..50 map to themselves.
    let initial = db.write_batch()?;
    for key in 0..50u8 {
        initial.put(0, vec![key], vec![key].into())?;
    }
    db.commit_write_batch(initial)?;

    // Commit 2: keys 0..25 are overwritten with `key + 100`.
    let overwrite = db.write_batch()?;
    for key in 0..25u8 {
        overwrite.put(0, vec![key], vec![key + 100].into())?;
    }
    db.commit_write_batch(overwrite)?;

    let keys_to_fetch: Vec<Vec<u8>> = (0..50u8).map(|key| vec![key]).collect();
    let results = db.batch_get(0, &keys_to_fetch)?;

    assert_eq!(results.len(), 50);
    for (i, result) in results.iter().enumerate() {
        let key = i as u8;
        let expected = if key < 25 { key + 100 } else { key };
        assert_eq!(
            result.as_deref(),
            Some(&[expected][..]),
            "Failed at index {i}"
        );
    }

    db.shutdown()?;
    Ok(())
}

/// `batch_get` must agree element-by-element with issuing one `get` per key,
/// for both present and missing keys.
#[test]
fn batch_get_comparison_with_get() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
        tempdir.path().to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Big-endian u32 keys 0..100 map to `key * 3`.
    let batch = db.write_batch()?;
    for key in 0..100u32 {
        let value = (key * 3).to_be_bytes().to_vec();
        batch.put(0, key.to_be_bytes().to_vec(), value.into())?;
    }
    db.commit_write_batch(batch)?;

    // Every third key in 0..150. Keys >= 100 were never written, so both
    // lookup paths see hits as well as misses.
    let keys_to_fetch: Vec<Vec<u8>> = (0..150u32)
        .step_by(3)
        .map(|key| key.to_be_bytes().to_vec())
        .collect();

    let batch_results = db.batch_get(0, &keys_to_fetch)?;
    let individual_results = keys_to_fetch
        .iter()
        .map(|key| db.get(0, key))
        .collect::<Result<Vec<_>, _>>()?;

    assert_eq!(batch_results.len(), individual_results.len());
    let paired = batch_results.iter().zip(&individual_results);
    for (i, (from_batch, from_get)) in paired.enumerate() {
        assert_eq!(
            from_batch.as_deref(),
            from_get.as_deref(),
            "Mismatch at index {i}"
        );
    }

    db.shutdown()?;
    Ok(())
}

/// Data written in one session must be readable via `batch_get` after the
/// database is shut down and reopened from the same directory.
#[test]
fn batch_get_after_restore() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    // Session 1: key `k` stores `[k, k + 1]`; shut down cleanly afterwards.
    {
        let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
            path.to_path_buf(),
            RayonParallelScheduler,
        )?;

        let batch = db.write_batch()?;
        for key in 0..100u8 {
            batch.put(0, vec![key], vec![key, key + 1].into())?;
        }
        db.commit_write_batch(batch)?;
        db.shutdown()?;
    }

    // Session 2: reopen and read back every fifth key.
    {
        let db = TurboPersistence::<_, 16>::open_with_parallel_scheduler(
            path.to_path_buf(),
            RayonParallelScheduler,
        )?;

        let sampled: Vec<u8> = (0..100u8).step_by(5).collect();
        let keys_to_fetch: Vec<Vec<u8>> = sampled.iter().map(|&key| vec![key]).collect();
        let results = db.batch_get(0, &keys_to_fetch)?;

        assert_eq!(results.len(), 20);
        for (idx, (result, &i)) in results.iter().zip(&sampled).enumerate() {
            assert_eq!(
                result.as_deref(),
                Some(&[i, i + 1][..]),
                "Failed at index {idx} for key {i}"
            );
        }

        db.shutdown()?;
    }

    Ok(())
}

/// Test that compaction works with many small values without overflowing block indices.
/// Reproduces a CI benchmark failure with key_4/value_512/entries_1.98Mi/compacted.
#[test]
fn many_small_values_compaction() -> Result<()> {
    use rand::{RngExt, SeedableRng, rngs::SmallRng};

    use crate::parallel_scheduler::SerialScheduler;

    // Keys are big-endian `u32`s, so the key size follows from that type.
    const KEY_SIZE: usize = std::mem::size_of::<u32>();
    // Value size used by the benchmark configuration (`value_512`).
    const VALUE_SIZE: usize = 512;
    // Total payload written by the benchmark (1 GiB).
    const TOTAL_BYTES: usize = 1024 * 1024 * 1024;

    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    let db = TurboPersistence::<SerialScheduler, 1>::open(path.to_path_buf())?;

    let mut rng = SmallRng::seed_from_u64(42);

    // Mimic the benchmark: a single commit of ~2M entries, then compact.
    // Checked conversion instead of a silently truncating `as` cast.
    let entry_count = u32::try_from(TOTAL_BYTES / (KEY_SIZE + VALUE_SIZE))?;
    let batch = db.write_batch()?;
    for i in 0..entry_count {
        let key = i.to_be_bytes().to_vec();
        let mut value = vec![0u8; VALUE_SIZE];
        rng.fill(&mut value[..]);
        batch.put(0, key, value.into())?;
    }
    db.commit_write_batch(batch)?;

    // This is what panics in CI with "Block index overflow"
    for _ in 0..3 {
        db.full_compact()?;
    }

    // Quick sanity check that data survived the compactions.
    let value = db
        .get(0, &0u32.to_be_bytes())?
        .expect("Entry 0 not found after compaction");
    assert_eq!(value.len(), VALUE_SIZE);

    db.shutdown()?;
    Ok(())
}

/// Test compaction with values of exactly `MAX_SMALL_VALUE_SIZE` bytes, the largest size
/// that is not yet a medium value. Worst case for small value blocks: fewest entries per block.
#[test]
fn many_max_small_values_compaction() -> Result<()> {
    use rand::{RngExt, SeedableRng, rngs::SmallRng};

    // `MAX_SMALL_VALUE_SIZE` is already imported at module scope; only the scheduler is local.
    use crate::parallel_scheduler::SerialScheduler;

    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    let db = TurboPersistence::<SerialScheduler, 1>::open(path.to_path_buf())?;

    let mut rng = SmallRng::seed_from_u64(43);

    // Two commits of disjoint key ranges, so compaction merges them into large SSTs.
    // Typed as `u32` so keys can be encoded without an `as` cast.
    let entry_count: u32 = 512 * 1024;
    for batch_start in [0, entry_count] {
        let batch = db.write_batch()?;
        for i in batch_start..batch_start + entry_count {
            let key = i.to_be_bytes().to_vec();
            let mut value = vec![0u8; MAX_SMALL_VALUE_SIZE];
            rng.fill(&mut value[..]);
            batch.put(0, key, value.into())?;
        }
        db.commit_write_batch(batch)?;
    }

    for _ in 0..3 {
        db.full_compact()?;
    }

    let value = db
        .get(0, &0u32.to_be_bytes())?
        .expect("Entry 0 not found after compaction");
    assert_eq!(value.len(), MAX_SMALL_VALUE_SIZE);

    db.shutdown()?;
    Ok(())
}

/// Test compaction with values of `MAX_SMALL_VALUE_SIZE + 1` bytes (the minimum medium size).
/// Each medium value gets its own dedicated block, so this is the worst case for block count.
#[test]
fn many_medium_values_compaction() -> Result<()> {
    use rand::{RngExt, SeedableRng, rngs::SmallRng};

    // `MAX_SMALL_VALUE_SIZE` is already imported at module scope; only the scheduler is local.
    use crate::parallel_scheduler::SerialScheduler;

    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    let db = TurboPersistence::<SerialScheduler, 1>::open(path.to_path_buf())?;

    let mut rng = SmallRng::seed_from_u64(44);

    // One byte over the small limit = minimum medium size.
    let value_size = MAX_SMALL_VALUE_SIZE + 1;
    // Two commits of disjoint key ranges, so compaction merges them.
    // Typed as `u32` so keys can be encoded without an `as` cast.
    let entry_count: u32 = 128 * 1024;
    for batch_start in [0, entry_count] {
        let batch = db.write_batch()?;
        for i in batch_start..batch_start + entry_count {
            let key = i.to_be_bytes().to_vec();
            let mut value = vec![0u8; value_size];
            rng.fill(&mut value[..]);
            batch.put(0, key, value.into())?;
        }
        db.commit_write_batch(batch)?;
    }

    for _ in 0..3 {
        db.full_compact()?;
    }

    let value = db
        .get(0, &0u32.to_be_bytes())?
        .expect("Entry 0 not found after compaction");
    assert_eq!(value.len(), value_size);

    db.shutdown()?;
    Ok(())
}

/// Full compaction of a MultiValue family must keep every distinct value that
/// was written for a key.
#[test]
fn compaction_multi_value_preserves_different_values() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let key = vec![42u8];

    let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
        tempdir.path().to_path_buf(),
        multi_value_config(),
        RayonParallelScheduler,
    )?;

    // Three commits, each adding one distinct value under the same key.
    for value in [1u8, 2, 3] {
        let batch = db.write_batch()?;
        batch.put(0, key.clone(), vec![value].into())?;
        db.commit_write_batch(batch)?;
    }

    let results = db.get_multiple(0, &key.as_slice())?;
    assert_eq!(results.len(), 3, "Should have 3 values before compaction");

    db.full_compact()?;

    let results = db.get_multiple(0, &key.as_slice())?;
    assert_eq!(
        results.len(),
        3,
        "MultiValue should preserve all different values after compaction"
    );

    let mut values: Vec<u8> = results.iter().map(|r| r[0]).collect();
    values.sort_unstable();
    assert_eq!(values, vec![1, 2, 3], "All values should be preserved");

    db.shutdown()?;
    Ok(())
}

fn multi_value_config() -> DbConfig<1> {
    let mut config = DbConfig::<1>::default();
    config.family_configs[0] = FamilyConfig {
        name: "test",
        kind: FamilyKind::MultiValue,
    };
    config
}

/// Repeated compactions of a MultiValue family keep every written value,
/// including duplicates, and the result survives a reopen.
#[test]
fn compaction_multi_value_multiple_compactions() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();
    let config = multi_value_config();
    let key = vec![42u8];

    // Session 1: two rounds of writes, each followed by a full compaction.
    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config.clone(),
            RayonParallelScheduler,
        )?;

        // Round 1: three distinct values, one commit each.
        for value in [1u8, 2, 3] {
            let batch = db.write_batch()?;
            batch.put(0, key.clone(), vec![value].into())?;
            db.commit_write_batch(batch)?;
        }
        db.full_compact()?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 3);

        // Round 2: 2 and 1 repeat values already stored; nothing is deduplicated.
        for value in [2u8, 4, 1] {
            let batch = db.write_batch()?;
            batch.put(0, key.clone(), vec![value].into())?;
            db.commit_write_batch(batch)?;
        }

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 6);

        db.full_compact()?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(
            results.len(),
            6,
            "Should have all 6 values after second compaction (no dedup)"
        );

        let mut values: Vec<u8> = results.iter().map(|r| r[0]).collect();
        values.sort_unstable();
        assert_eq!(
            values,
            vec![1, 1, 2, 2, 3, 4],
            "Should have values 1, 1, 2, 2, 3, 4"
        );

        db.shutdown()?;
    }

    // Session 2: all six values must be persisted.
    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config,
            RayonParallelScheduler,
        )?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 6, "Should still have 6 values after reopen");

        db.shutdown()?;
    }

    Ok(())
}

/// Deleting a MultiValue key hides all of its values, and the deletion holds
/// across compaction and reopen.
#[test]
fn multi_value_delete_key() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();
    let config = multi_value_config();
    let key = vec![42u8];

    // Session 1: three values, one tombstone, one compaction.
    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config.clone(),
            RayonParallelScheduler,
        )?;

        for value in [1u8, 2, 3] {
            let batch = db.write_batch()?;
            batch.put(0, key.clone(), vec![value].into())?;
            db.commit_write_batch(batch)?;
        }

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 3, "Should have 3 values before deletion");

        // A single tombstone commit hides every value stored under the key.
        let tombstone = db.write_batch()?;
        tombstone.delete(0, key.clone())?;
        db.commit_write_batch(tombstone)?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert!(
            results.is_empty(),
            "get_multiple should return empty after delete"
        );

        // Compaction must not bring the deleted values back.
        db.full_compact()?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert!(
            results.is_empty(),
            "get_multiple should return empty after compaction"
        );

        db.shutdown()?;
    }

    // Session 2: the deletion must be durable.
    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config,
            RayonParallelScheduler,
        )?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert!(
            results.is_empty(),
            "get_multiple should return empty after reopen"
        );

        db.shutdown()?;
    }

    Ok(())
}

/// After a MultiValue key is deleted and rewritten, only the rewritten values
/// are visible, both before and after compaction and after reopen.
#[test]
fn multi_value_delete_then_rewrite() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();
    let config = multi_value_config();
    let key = vec![42u8];

    // Session 1: write, delete, rewrite, compact.
    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config.clone(),
            RayonParallelScheduler,
        )?;

        // Original values, one commit each.
        for value in [1u8, 2, 3] {
            let batch = db.write_batch()?;
            batch.put(0, key.clone(), vec![value].into())?;
            db.commit_write_batch(batch)?;
        }

        let tombstone = db.write_batch()?;
        tombstone.delete(0, key.clone())?;
        db.commit_write_batch(tombstone)?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert!(results.is_empty(), "Should be deleted");

        // Replacement values, each committed after the tombstone.
        for value in [10u8, 20] {
            let batch = db.write_batch()?;
            batch.put(0, key.clone(), vec![value].into())?;
            db.commit_write_batch(batch)?;
        }

        // Old values must stay hidden; only the replacements are visible.
        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 2, "Should have only the 2 new values");
        let mut values: Vec<u8> = results.iter().map(|r| r[0]).collect();
        values.sort_unstable();
        assert_eq!(values, vec![10, 20], "Should have only new values 10, 20");

        // Compaction lets the tombstone prune the old values; the new ones remain.
        db.full_compact()?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(
            results.len(),
            2,
            "Should still have 2 values after compaction"
        );
        let mut values: Vec<u8> = results.iter().map(|r| r[0]).collect();
        values.sort_unstable();
        assert_eq!(
            values,
            vec![10, 20],
            "Should still have values 10, 20 after compaction"
        );

        db.shutdown()?;
    }

    // Session 2: the same two values after reopening.
    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config,
            RayonParallelScheduler,
        )?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 2, "Should have 2 values after reopen");
        let mut values: Vec<u8> = results.iter().map(|r| r[0]).collect();
        values.sort_unstable();
        assert_eq!(
            values,
            vec![10, 20],
            "Should have values 10, 20 after reopen"
        );

        db.shutdown()?;
    }

    Ok(())
}

/// Interleaves writes, a delete and compactions on a MultiValue key. A value
/// written after the tombstone has been compacted must be the only survivor.
#[test]
fn multi_value_delete_with_compaction_interleaved() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();
    let config = multi_value_config();
    let key = vec![42u8];

    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config.clone(),
            RayonParallelScheduler,
        )?;

        // Values 1 and 2, then compaction moves them into a compacted SST.
        for value in [1u8, 2] {
            let batch = db.write_batch()?;
            batch.put(0, key.clone(), vec![value].into())?;
            db.commit_write_batch(batch)?;
        }
        db.full_compact()?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(
            results.len(),
            2,
            "Should have 2 values after first compaction"
        );

        // Value 3 lands in a fresh SST on top of the compacted data...
        let write = db.write_batch()?;
        write.put(0, key.clone(), vec![3u8].into())?;
        db.commit_write_batch(write)?;

        // ...and a later tombstone hides all three values.
        let tombstone = db.write_batch()?;
        tombstone.delete(0, key.clone())?;
        db.commit_write_batch(tombstone)?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert!(results.is_empty(), "Should be deleted");

        // Merging everything lets the tombstone prune all values.
        db.full_compact()?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert!(
            results.is_empty(),
            "get_multiple should return empty after compaction"
        );

        // Value 4 goes into an SST newer than the tombstone, so it is visible.
        let rewrite = db.write_batch()?;
        rewrite.put(0, key.clone(), vec![4u8].into())?;
        db.commit_write_batch(rewrite)?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 1, "Should have only value 4");
        assert_eq!(results[0].as_ref(), &[4u8]);

        db.shutdown()?;
    }

    // Reopen: value 4 is still the only one.
    {
        let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
            path.to_path_buf(),
            config,
            RayonParallelScheduler,
        )?;

        let results = db.get_multiple(0, &key.as_slice())?;
        assert_eq!(results.len(), 1, "Should have only value 4 after reopen");
        assert_eq!(results[0].as_ref(), &[4u8]);

        db.shutdown()?;
    }

    Ok(())
}

// Tests for WriteBatch semantics around repeated keys and tombstones:
// - SingleValue: duplicate keys in the same batch are a user error (panics in debug builds)
// - MultiValue: a tombstone only shadows entries from older SSTs, not entries in the same batch

/// Writing the same key twice into one batch of a SingleValue family violates the
/// WriteBatch contract; debug builds must detect it and panic.
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "WriteBatch invariant violation: SingleValue family has duplicate key")]
fn single_value_duplicate_key_panics() {
    // `#[should_panic]` tests must return `()`, so setup uses `unwrap`. A setup failure
    // panics with a different message and therefore still fails the test.
    let tempdir = tempfile::tempdir().unwrap();
    let path = tempdir.path();

    let key = vec![1u8];

    let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler(
        path.to_path_buf(),
        RayonParallelScheduler,
    )
    .unwrap();

    let batch = db.write_batch().unwrap();
    batch.put(0, key.clone(), vec![10u8].into()).unwrap();
    // Second write of the same key into the same batch. The invariant panic is
    // expected here or, at the latest, when the batch is committed below.
    batch.put(0, key, vec![20u8].into()).unwrap();
    db.commit_write_batch(batch).unwrap();
}

/// MultiValue `put(A), delete, put(B)` in one batch: the tombstone shadows the
/// older SST's value, while both same-batch values survive.
#[test]
fn multi_value_tombstone_only_shadows_older_ssts() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    // For MultiValue, a tombstone only shadows entries from older SSTs.
    // Entries in the same batch are NOT shadowed by the tombstone.
    let key = vec![3u8];

    let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
        path.to_path_buf(),
        multi_value_config(),
        RayonParallelScheduler,
    )?;

    // First write an older value in a separate batch (separate SST)
    let batch = db.write_batch()?;
    batch.put(0, key.clone(), vec![99u8].into())?;
    db.commit_write_batch(batch)?;

    // Now in a single batch: put(A), delete, put(B)
    // - The tombstone shadows the older SST (99)
    // - Entries in the same batch (A=10 and B=20) are preserved
    let batch = db.write_batch()?;
    batch.put(0, key.clone(), vec![10u8].into())?;
    batch.delete(0, key.clone())?;
    batch.put(0, key.clone(), vec![20u8].into())?;
    db.commit_write_batch(batch)?;

    let results = db.get_multiple(0, &key.as_slice())?;
    assert_eq!(
        results.len(),
        2,
        "Should have 2 values (both from same batch), got {:?}",
        results
    );
    // The count alone would also accept e.g. [99, 20]; pin the exact survivors.
    let mut values: Vec<u8> = results.iter().map(|r| r[0]).collect();
    values.sort_unstable();
    assert_eq!(
        values,
        vec![10, 20],
        "Only the same-batch values 10 and 20 should survive, not the older 99"
    );

    db.shutdown()?;
    Ok(())
}

/// MultiValue `put(A), put(B), delete` in one batch: a trailing tombstone still
/// only shadows the older SST's value, and both same-batch values survive.
#[test]
fn multi_value_tombstone_shadows_older_sst_only() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    // For MultiValue, tombstone in a batch shadows only entries from older SSTs.
    let key = vec![4u8];

    let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
        path.to_path_buf(),
        multi_value_config(),
        RayonParallelScheduler,
    )?;

    // Write an older value in a separate batch
    let batch = db.write_batch()?;
    batch.put(0, key.clone(), vec![99u8].into())?;
    db.commit_write_batch(batch)?;

    // Now in a single batch: put(A), put(B), delete
    // The tombstone shadows the older SST (99), but A (10) and B (20) are preserved.
    let batch = db.write_batch()?;
    batch.put(0, key.clone(), vec![10u8].into())?;
    batch.put(0, key.clone(), vec![20u8].into())?;
    batch.delete(0, key.clone())?;
    db.commit_write_batch(batch)?;

    let results = db.get_multiple(0, &key.as_slice())?;
    assert_eq!(
        results.len(),
        2,
        "Should have 2 values (both from same batch, tombstone shadows older SST), got {:?}",
        results
    );
    // The count alone would also accept e.g. [99, 20]; pin the exact survivors.
    let mut values: Vec<u8> = results.iter().map(|r| r[0]).collect();
    values.sort_unstable();
    assert_eq!(
        values,
        vec![10, 20],
        "Only the same-batch values 10 and 20 should survive, not the older 99"
    );

    db.shutdown()?;
    Ok(())
}

/// Returns the number of entries directly inside `dir` whose extension is `blob`.
///
/// # Panics
///
/// Panics (with the directory in the message) if `dir` or any of its entries
/// cannot be read. Unreadable entries are deliberately not skipped: skipping
/// would undercount and could let a "blob file was deleted" assertion pass
/// for the wrong reason.
fn count_blob_files(dir: &Path) -> usize {
    fs::read_dir(dir)
        .unwrap_or_else(|err| panic!("failed to read directory {}: {err}", dir.display()))
        .map(|entry| {
            entry.unwrap_or_else(|err| {
                panic!("failed to read entry in directory {}: {err}", dir.display())
            })
        })
        .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "blob"))
        .count()
}

/// Test that compaction deletes blob files when their entries are superseded
/// by newer values (SingleValue family).
#[test]
fn compaction_deletes_superseded_blob() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();
    let key = vec![1u8];

    let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler(
        path.to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Commit 1: a value just over the medium limit, stored in its own blob file.
    let blob_value = vec![42u8; MAX_MEDIUM_VALUE_SIZE + 1];
    let first = db.write_batch()?;
    first.put(0, key.clone(), blob_value.clone().into())?;
    db.commit_write_batch(first)?;

    assert_eq!(
        count_blob_files(path),
        1,
        "Should have 1 blob file after first write"
    );

    let result = db.get(0, &key)?;
    assert_eq!(result.as_deref(), Some(&blob_value[..]));

    // Commit 2: overwrite with a tiny value. The older SST still references the
    // blob, so the file has to stay until compaction.
    let second = db.write_batch()?;
    second.put(0, key.clone(), vec![99u8].into())?;
    db.commit_write_batch(second)?;

    assert_eq!(
        count_blob_files(path),
        1,
        "Blob file should still exist before compaction"
    );

    // Compaction drops the superseded blob entry.
    db.full_compact()?;

    let result = db.get(0, &key)?;
    assert_eq!(result.as_deref(), Some(&[99u8][..]));

    assert_eq!(
        count_blob_files(path),
        0,
        "Old blob file should be deleted after compaction"
    );

    db.shutdown()?;

    Ok(())
}

/// Test that compaction deletes blob files when a key is deleted via tombstone
/// (SingleValue family).
#[test]
fn compaction_deletes_blob_on_tombstone() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();
    let key = vec![1u8];

    let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler(
        path.to_path_buf(),
        RayonParallelScheduler,
    )?;

    // Commit 1: a value just over the medium limit, stored in its own blob file.
    let blob_value = vec![42u8; MAX_MEDIUM_VALUE_SIZE + 1];
    let write = db.write_batch()?;
    write.put(0, key.clone(), blob_value.into())?;
    db.commit_write_batch(write)?;

    assert_eq!(count_blob_files(path), 1);

    // Commit 2: tombstone for the key. The older SST still references the blob.
    let tombstone = db.write_batch()?;
    tombstone.delete(0, key.clone())?;
    db.commit_write_batch(tombstone)?;

    assert_eq!(
        count_blob_files(path),
        1,
        "Blob file should still exist before compaction"
    );

    // Compaction lets the tombstone supersede the blob entry.
    db.full_compact()?;

    let result = db.get(0, &key)?;
    assert!(result.is_none());

    assert_eq!(
        count_blob_files(path),
        0,
        "Blob file should be deleted after compaction"
    );

    db.shutdown()?;

    Ok(())
}

/// Test that compaction deletes blob files for MultiValue families when a
/// tombstone prunes older blob entries.
#[test]
fn compaction_deletes_blob_multi_value_tombstone() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();
    let key = vec![1u8];

    // Use the shared single-family MultiValue config rather than re-declaring it inline.
    let db = TurboPersistence::<_, 1>::open_with_config_and_parallel_scheduler(
        path.to_path_buf(),
        multi_value_config(),
        RayonParallelScheduler,
    )?;

    let blob_value = vec![42u8; MAX_MEDIUM_VALUE_SIZE + 1];

    // Write a blob-sized value
    let batch = db.write_batch()?;
    batch.put(0, key.clone(), blob_value.into())?;
    db.commit_write_batch(batch)?;

    assert_eq!(count_blob_files(path), 1);

    // In a newer batch: a tombstone plus a small value. The tombstone shadows the
    // older blob entry; the same-batch put stays visible.
    let batch = db.write_batch()?;
    batch.delete(0, key.clone())?;
    batch.put(0, key.clone(), vec![99u8].into())?;
    db.commit_write_batch(batch)?;

    // Blob file still exists before compaction
    assert_eq!(count_blob_files(path), 1);

    // Compact — tombstone prunes the old blob entry
    db.full_compact()?;

    // The new value should still be readable
    let results = db.get_multiple(0, &key.as_slice())?;
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].as_ref(), &[99u8]);

    // After compaction, the old blob file should be deleted immediately.
    assert_eq!(
        count_blob_files(path),
        0,
        "Blob file should be deleted after compaction"
    );

    db.shutdown()?;

    Ok(())
}

/// Test that compaction preserves blob files that are still referenced
/// (not superseded).
#[test]
fn compaction_preserves_active_blob() -> Result<()> {
    let tempdir = tempfile::tempdir()?;
    let path = tempdir.path();

    let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler(
        path.to_path_buf(),
        RayonParallelScheduler,
    )?;

    let blob_value = vec![42u8; MAX_MEDIUM_VALUE_SIZE + 1];

    // Two commits with disjoint keys: key 1 holds a blob-sized value, key 2 a tiny
    // one. The second SST only exists so that compaction has something to merge.
    for (key, value) in [(1u8, blob_value.clone()), (2u8, vec![1u8])] {
        let batch = db.write_batch()?;
        batch.put(0, vec![key], value.into())?;
        db.commit_write_batch(batch)?;
    }

    assert_eq!(count_blob_files(path), 1);

    // Key 1 is still the newest entry, so its blob must survive the merge.
    db.full_compact()?;

    assert_eq!(
        count_blob_files(path),
        1,
        "Active blob file should be preserved after compaction"
    );

    let result = db.get(0, &vec![1u8])?;
    assert_eq!(result.as_deref(), Some(&blob_value[..]));

    db.shutdown()?;
    Ok(())
}
Quest for Codev2.0.0
/
SIGN IN