// Source: next.js/turbopack/crates/turbo-tasks-fs/src/lib.rs
// (lib.rs — 3746 lines, 136.5 KB)
#![feature(arbitrary_self_types)]
#![feature(arbitrary_self_types_pointers)]
#![feature(btree_cursors)] // needed for the `InvalidatorMap` and watcher, reduces time complexity
#![feature(io_error_more)]
#![feature(min_specialization)]
// if `normalize_lexically` isn't eventually stabilized, we can copy the implementation from the
// stdlib into our source tree
#![feature(normalize_lexically)]
#![feature(trivial_bounds)]
// Junction points are used on Windows. We could use a third-party crate for this if the junction
// API isn't eventually stabilized.
#![cfg_attr(windows, feature(junction_point))]
#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
#![allow(clippy::mutable_key_type)]

pub mod attach;
pub mod embed;
pub mod glob;
mod globset;
pub mod invalidation;
mod invalidator_map;
pub mod json;
mod mutex_map;
mod path_map;
mod read_glob;
mod retry;
pub mod rope;
pub mod source_context;
pub mod util;
pub(crate) mod virtual_fs;
mod watcher;

use std::{
    borrow::Cow,
    cmp::{Ordering, min},
    env,
    error::Error as StdError,
    fmt::{self, Debug, Formatter},
    fs::FileType,
    future::Future,
    io::{self, BufRead, BufReader, ErrorKind, Read, Write as _},
    mem::take,
    path::{MAIN_SEPARATOR, Path, PathBuf},
    sync::{Arc, LazyLock, Weak},
    time::Duration,
};

use anyhow::{Context, Result, anyhow, bail};
use auto_hash_map::{AutoMap, AutoSet};
use bincode::{Decode, Encode};
use bitflags::bitflags;
use dunce::simplified;
use indexmap::IndexSet;
use jsonc_parser::{ParseOptions, parse_to_serde_value};
use mime::Mime;
use rustc_hash::FxHashSet;
use serde_json::Value;
use tokio::{
    runtime::Handle,
    sync::{RwLock, RwLockReadGuard},
};
use tracing::Instrument;
use turbo_rcstr::{RcStr, rcstr};
use turbo_tasks::{
    Completion, Effect, EffectStateStorage, InvalidationReason, NonLocalValue, ReadRef, ResolvedVc,
    TaskInput, TurboTasksApi, ValueToString, ValueToStringRef, Vc, debug::ValueDebugFormat,
    emit_effect, mark_session_dependent, parallel, trace::TraceRawVcs, turbo_tasks_weak, turbobail,
    turbofmt,
};
use turbo_tasks_hash::{
    DeterministicHash, DeterministicHasher, HashAlgorithm, deterministic_hash, hash_xxh3_hash64,
    hash_xxh3_hash128,
};
use turbo_unix_path::{
    get_parent_path, get_relative_path_to, join_path, normalize_path, sys_to_unix, unix_to_sys,
};

use crate::{
    attach::AttachedFileSystem,
    glob::Glob,
    invalidation::Write,
    invalidator_map::InvalidatorMap,
    json::UnparsableJson,
    mutex_map::MutexMap,
    path_map::OrderedPathMapExt,
    read_glob::{read_glob, track_glob},
    retry::{can_retry, retry_blocking, retry_blocking_custom},
    rope::{Rope, RopeReader},
    util::extract_disk_access,
    watcher::DiskWatcher,
};
pub use crate::{read_glob::ReadGlobResult, virtual_fs::VirtualFileSystem};

/// Validate the path, returning the valid path, a modified-but-now-valid path, or bailing with an
/// error.
///
/// The behaviour of the file system changes depending on the OS, and indeed sometimes the FS
/// implementation of the OS itself.
///
/// - On Windows the limit for normal file paths is 260 characters, a holdover from the DOS days,
///   but Rust will opportunistically rewrite paths to 'UNC' paths for supported path operations
///   which can be up to 32767 characters long.
/// - On macOS, the limit is traditionally 255 characters for the file name and a second limit of
///   1024 for the entire path (verified by running `getconf PATH_MAX /`).
/// - On Linux, the limit differs between kernel (and by extension, distro) and filesystem. On most
///   common file systems (e.g. ext4, btrfs, and xfs), individual file names can be up to 255 bytes
///   with no hard limit on total path length. [Some legacy POSIX APIs are restricted to the
///   `PATH_MAX` value of 4096 bytes in `limits.h`, but most applications support longer
///   paths][PATH_MAX].
///
/// For more details, refer to <https://en.wikipedia.org/wiki/Comparison_of_file_systems#Limits>.
///
/// Realistically, the output path lengths will be the same across all platforms, so we need to set
/// a conservative limit and be particular about when we decide to bump it. Here we have opted for
/// 255 characters, because it is the shortest of the three options.
///
/// [PATH_MAX]: https://eklitzke.org/path-max-is-tricky
pub fn validate_path_length(path: &Path) -> Result<Cow<'_, Path>> {
    /// Windows: paths beyond the legacy 260-character limit can often be salvaged by
    /// canonicalizing them into `\\?\`-prefixed (UNC) form, which supports up to 32767 characters.
    fn check_windows(path: &Path) -> Result<Cow<'_, Path>> {
        const MAX_PATH_LENGTH_WINDOWS: usize = 260;
        const UNC_PREFIX: &str = "\\\\?\\";

        // Already UNC-prefixed: no length concerns.
        if path.starts_with(UNC_PREFIX) {
            return Ok(Cow::Borrowed(path));
        }

        // Short enough for a classic path: pass through unchanged.
        if path.as_os_str().len() <= MAX_PATH_LENGTH_WINDOWS {
            return Ok(Cow::Borrowed(path));
        }

        // Too long: canonicalize, which produces a UNC path on Windows.
        let unc_path = std::fs::canonicalize(path)
            .map_err(|err| anyhow!(err).context("file is too long, and could not be normalized"))?;
        Ok(Cow::Owned(unc_path))
    }

    /// Non-Windows: a cheap heuristic that only checks the file name and the total length.
    /// This is primarily meant to reject overly long file names without touching the disk.
    fn check_unix(path: &Path) -> Result<Cow<'_, Path>> {
        const MAX_FILE_NAME_LENGTH_UNIX: usize = 255;
        // macOS reports a limit of 1024, but I (@arlyon) have had issues with paths above 1016
        // so we subtract a bit to be safe. on most linux distros this is likely a lot larger
        // than 1024, but macOS is *special*
        const MAX_PATH_LENGTH: usize = 1024 - 8;

        // Check the last segment (file name) first.
        let file_name_len = path
            .file_name()
            .map_or(0, |name| name.as_encoded_bytes().len());
        if file_name_len > MAX_FILE_NAME_LENGTH_UNIX {
            anyhow::bail!("file name is too long (exceeds {MAX_FILE_NAME_LENGTH_UNIX} bytes)");
        }

        if path.as_os_str().len() > MAX_PATH_LENGTH {
            anyhow::bail!("path is too long (exceeds {MAX_PATH_LENGTH} bytes)");
        }

        Ok(Cow::Borrowed(path))
    }

    let validated = if cfg!(windows) {
        check_windows(path)
    } else {
        check_unix(path)
    };
    validated
        .with_context(|| format!("path length for file {path:?} exceeds max length of filesystem"))
}

/// Extension trait that gates a future behind a [`tokio::sync::Semaphore`],
/// bounding how many such futures run concurrently.
trait ConcurrencyLimitedExt {
    type Output;
    /// Waits for a semaphore permit, then drives `self` to completion while
    /// holding the permit.
    async fn concurrency_limited(self, semaphore: &tokio::sync::Semaphore) -> Self::Output;
}

impl<Fut, Out> ConcurrencyLimitedExt for Fut
where
    Fut: Future<Output = Out>,
{
    type Output = Out;
    async fn concurrency_limited(self, semaphore: &tokio::sync::Semaphore) -> Self::Output {
        // Hold the permit for the full duration of the wrapped future, then
        // release it before returning the result.
        let permit = semaphore.acquire().await;
        let result = self.await;
        drop(permit);
        result
    }
}

/// Reads a numeric configuration value from the environment variable `name`.
///
/// Returns `None` when the variable is unset, empty, or set to `0`.
/// Panics when the value is set but not a valid integer — a misconfiguration
/// we want to surface loudly rather than silently fall back on a default.
fn number_env_var(name: &'static str) -> Option<usize> {
    let raw = env::var(name).ok()?;
    if raw.is_empty() {
        return None;
    }
    let parsed: usize = raw
        .parse()
        .unwrap_or_else(|err| panic!("{name} must be a valid integer: {err}"));
    // A value of 0 is treated the same as "unset" (callers supply the default).
    (parsed != 0).then_some(parsed)
}

/// Builds the semaphore that throttles concurrent read operations.
///
/// The limit comes from `TURBO_ENGINE_READ_CONCURRENCY` (default 64). The
/// semaphore isn't serialized, and we assume the environment variable doesn't
/// change during runtime, so it's okay to access it in this untracked way.
fn create_read_semaphore() -> tokio::sync::Semaphore {
    static LIMIT: LazyLock<usize> =
        LazyLock::new(|| number_env_var("TURBO_ENGINE_READ_CONCURRENCY").unwrap_or(64));
    tokio::sync::Semaphore::new(*LIMIT)
}

/// Builds the semaphore that throttles concurrent write operations.
///
/// The limit comes from `TURBO_ENGINE_WRITE_CONCURRENCY`. The semaphore isn't
/// serialized, and we assume the environment variable doesn't change during
/// runtime, so it's okay to access it in this untracked way.
fn create_write_semaphore() -> tokio::sync::Semaphore {
    // We write a lot of smallish files where high concurrency will cause metadata
    // thrashing. So 4 threads is a safe cross platform suitable value.
    const DEFAULT_WRITE_CONCURRENCY: usize = 4;
    static LIMIT: LazyLock<usize> = LazyLock::new(|| {
        number_env_var("TURBO_ENGINE_WRITE_CONCURRENCY").unwrap_or(DEFAULT_WRITE_CONCURRENCY)
    });
    tokio::sync::Semaphore::new(*LIMIT)
}

/// Trait implemented by file systems such as [`DiskFileSystem`].
///
/// All operations take a [`FileSystemPath`] relative to this filesystem's root.
#[turbo_tasks::value_trait]
pub trait FileSystem: ValueToString {
    /// Returns the path to the root of the file system.
    #[turbo_tasks::function]
    fn root(self: ResolvedVc<Self>) -> Vc<FileSystemPath> {
        FileSystemPath::new_normalized(self, RcStr::default()).cell()
    }
    /// Returns the content of the file at `fs_path`.
    #[turbo_tasks::function]
    fn read(self: Vc<Self>, fs_path: FileSystemPath) -> Vc<FileContent>;
    /// Returns the [`LinkContent`] for the entry at `fs_path`.
    #[turbo_tasks::function]
    fn read_link(self: Vc<Self>, fs_path: FileSystemPath) -> Vc<LinkContent>;
    /// Returns the raw (untyped) directory listing at `fs_path`.
    #[turbo_tasks::function]
    fn raw_read_dir(self: Vc<Self>, fs_path: FileSystemPath) -> Vc<RawDirectoryContent>;
    /// Writes `content` to the file at `fs_path`.
    #[turbo_tasks::function]
    fn write(self: Vc<Self>, fs_path: FileSystemPath, content: Vc<FileContent>) -> Vc<()>;
    /// See [`FileSystemPath::write_symbolic_link_dir`].
    #[turbo_tasks::function]
    fn write_link(self: Vc<Self>, fs_path: FileSystemPath, target: Vc<LinkContent>) -> Vc<()>;
    /// Returns the [`FileMeta`] metadata for the entry at `fs_path`.
    #[turbo_tasks::function]
    fn metadata(self: Vc<Self>, fs_path: FileSystemPath) -> Vc<FileMeta>;
}

/// Internal state of a [`DiskFileSystem`], shared behind an `Arc` so the file
/// watcher and invalidators can reference it from other threads.
#[derive(TraceRawVcs, ValueDebugFormat, NonLocalValue, Encode, Decode)]
struct DiskFileSystemInner {
    /// Name of the filesystem (also used as its `ValueToString` output).
    pub name: RcStr,
    /// Path of the filesystem root on disk (expected to be canonicalized by the caller).
    pub root: RcStr,
    /// Per-path mutexes used by `lock_path` to serialize operations on the same file.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip)]
    mutex_map: MutexMap<PathBuf>,
    /// Tracks which tasks read each file, so they can be invalidated when the file changes.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip)]
    invalidator_map: InvalidatorMap,
    /// Tracks which tasks read each directory listing.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip)]
    dir_invalidator_map: InvalidatorMap,
    /// Lock that makes invalidation atomic. It will keep a write lock during
    /// watcher invalidation and a read lock during other operations.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip)]
    invalidation_lock: RwLock<()>,
    /// Semaphore to limit the maximum number of concurrent read operations.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip, default = "create_read_semaphore")]
    read_semaphore: tokio::sync::Semaphore,
    /// Semaphore to limit the maximum number of concurrent write operations.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip, default = "create_write_semaphore")]
    write_semaphore: tokio::sync::Semaphore,

    /// Watches files/directories on disk so tracked reads can be invalidated on change.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    watcher: DiskWatcher,
    /// Root paths that we do not allow access to from this filesystem.
    /// Useful for things like output directories to prevent accidental ouroboros situations.
    denied_paths: Vec<RcStr>,
    /// Used by invalidators when called from a non-turbo-tasks thread, specifically in the fs
    /// watcher.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip, default = "turbo_tasks_weak")]
    turbo_tasks: Weak<dyn TurboTasksApi>,
    /// Used by invalidators when called from a non-tokio thread, specifically in the fs watcher.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip, default = "Handle::current")]
    tokio_handle: Handle,
    // Per-filesystem effect state storage; presumably backs `emit_effect`-driven
    // writes — confirm against the effect-handling code further down the file.
    #[turbo_tasks(debug_ignore, trace_ignore)]
    #[bincode(skip)]
    effect_state_storage: EffectStateStorage,
}

impl DiskFileSystemInner {
    /// Returns the configured root as a `&Path`, stripping any Windows UNC
    /// prefix via `dunce::simplified`.
    fn root_path(&self) -> &Path {
        let raw = Path::new(self.root.as_str());
        simplified(raw)
    }

    /// Returns `true` when `fs_path` falls under one of the configured denied
    /// roots and should therefore be treated as non-existent.
    ///
    /// Since `denied_paths` entries are guaranteed to be:
    /// - normalized (no ../ traversals)
    /// - using unix separators (/)
    /// - relative to the fs root
    ///
    /// a plain string-prefix comparison plus a `/`-boundary check is
    /// sufficient (and avoids false positives like `foo` matching `foobar`).
    fn is_path_denied(&self, fs_path: &FileSystemPath) -> bool {
        let candidate = &*fs_path.path;
        for denied in &self.denied_paths {
            if !candidate.starts_with(denied.as_str()) {
                continue;
            }
            // Either an exact match, or the character right after the prefix
            // must be a path separator.
            let boundary_byte = candidate.as_bytes().get(denied.len());
            if candidate.len() == denied.len() || boundary_byte == Some(&b'/') {
                return true;
            }
        }
        false
    }

    /// Registers the current task's invalidator for the file at `path` and
    /// ensures the watcher covers it; has to be called within a turbo-tasks
    /// function.
    async fn register_read_invalidator(&self, path: &Path) -> Result<()> {
        let Some(invalidator) = turbo_tasks::get_invalidator() else {
            // Not running inside an invalidatable task: nothing to register.
            return Ok(());
        };
        self.invalidator_map.insert(path.to_owned(), invalidator);
        self.watcher
            .ensure_watched_file(path, self.root_path())
            .await?;
        Ok(())
    }

    /// After an effect writes to a path, invalidate any read tasks tracking that path so they
    /// re-read the updated content. This is necessary because the file watcher may not be active
    /// (e.g., in tests or build-only scenarios).
    fn invalidate_from_write(&self, full_path: &Path) {
        let mut tracked = self.invalidator_map.lock().unwrap();
        let Some(invalidators) = tracked.remove(full_path) else {
            // Nobody read this path; nothing to invalidate.
            return;
        };
        let Some(turbo_tasks) = self.turbo_tasks.upgrade() else {
            return;
        };
        // Invalidation may be triggered from outside a tokio context.
        let _guard = self.tokio_handle.enter();
        let reason = Write {
            path: full_path.to_string_lossy().into_owned(),
        };
        for invalidator in invalidators {
            invalidator.invalidate_with_reason(&*turbo_tasks, reason.clone());
        }
    }

    /// Registers the current task's invalidator for the directory at `path`
    /// and ensures the watcher covers it; has to be called within a
    /// turbo-tasks function.
    async fn register_dir_invalidator(&self, path: &Path) -> Result<()> {
        let Some(invalidator) = turbo_tasks::get_invalidator() else {
            // Not running inside an invalidatable task: nothing to register.
            return Ok(());
        };
        self.dir_invalidator_map
            .insert(path.to_owned(), invalidator);
        self.watcher
            .ensure_watched_dir(path, self.root_path())
            .await?;
        Ok(())
    }

    /// Acquires the filesystem-wide invalidation read lock together with the
    /// per-path mutex for `full_path`; both are held by the returned guard.
    async fn lock_path(&self, full_path: &Path) -> PathLockGuard<'_> {
        // The invalidation lock is taken first, then the per-path mutex.
        let invalidation_guard = self.invalidation_lock.read().await;
        let path_guard = self.mutex_map.lock(full_path.to_path_buf()).await;
        PathLockGuard(invalidation_guard, path_guard)
    }

    /// Invalidates every tracked file and directory read in this filesystem,
    /// forcing all dependent tasks to re-execute.
    fn invalidate(&self) {
        let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered();
        let Some(turbo_tasks) = self.turbo_tasks.upgrade() else {
            return;
        };
        let _guard = self.tokio_handle.enter();

        // Drain both maps first so their locks are released before invalidating.
        let file_map = take(&mut *self.invalidator_map.lock().unwrap());
        let dir_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
        let mut all_invalidators = Vec::new();
        for (_, invalidators) in file_map.into_iter().chain(dir_map) {
            all_invalidators.extend(invalidators);
        }
        parallel::for_each_owned(all_invalidators, |invalidator| {
            invalidator.invalidate(&*turbo_tasks)
        });
    }

    /// Invalidates every tracked file and directory read in the filesystem,
    /// attaching the reason produced by `reason` for each invalidated path.
    fn invalidate_with_reason<R: InvalidationReason + Clone>(
        &self,
        reason: impl Fn(&Path) -> R + Sync,
    ) {
        let _span = tracing::info_span!("invalidate filesystem", name = &*self.root).entered();
        let Some(turbo_tasks) = self.turbo_tasks.upgrade() else {
            return;
        };
        let _guard = self.tokio_handle.enter();

        // Drain both maps first so their locks are released before invalidating.
        let file_map = take(&mut *self.invalidator_map.lock().unwrap());
        let dir_map = take(&mut *self.dir_invalidator_map.lock().unwrap());
        let mut pairs = Vec::new();
        for (path, invalidators) in file_map.into_iter().chain(dir_map) {
            // Compute the reason once per path; clone it per invalidator.
            let reason_for_path = reason(&path);
            pairs.extend(
                invalidators
                    .into_iter()
                    .map(|invalidator| (reason_for_path.clone(), invalidator)),
            );
        }
        parallel::for_each_owned(pairs, |(reason, invalidator)| {
            invalidator.invalidate_with_reason(&*turbo_tasks, reason)
        });
    }

    /// Invalidates tracked files/directories for `paths` and their children.
    /// Also invalidates tracked directory reads for all parent directories to
    /// account for file creations/deletions under the deferred subtree.
    fn invalidate_path_and_children_with_reason<R: InvalidationReason + Clone>(
        &self,
        paths: impl IntoIterator<Item = PathBuf>,
        reason: impl Fn(&Path) -> R + Sync,
    ) {
        let _span =
            tracing::info_span!("invalidate filesystem paths", name = &*self.root).entered();
        let Some(turbo_tasks) = self.turbo_tasks.upgrade() else {
            return;
        };
        let _guard = self.tokio_handle.enter();

        let mut invalidator_map = self.invalidator_map.lock().unwrap();
        let mut dir_invalidator_map = self.dir_invalidator_map.lock().unwrap();
        let mut invalidators = Vec::new();
        let mut parent_dirs_to_invalidate = FxHashSet::default();

        // Pairs every invalidator with the reason computed for its path and queues
        // them into `out`. Hoisted into a closure because the same pattern is needed
        // for file reads, directory reads, and parent-directory reads below.
        let mut queue_with_reason = |out: &mut Vec<_>, invalidated_path: &Path, path_invalidators| {
            let reason_for_path = reason(invalidated_path);
            out.extend(
                path_invalidators
                    .into_iter()
                    .map(|invalidator| (reason_for_path.clone(), invalidator)),
            );
        };

        for path in paths {
            // Every ancestor directory may have a tracked listing that must be
            // refreshed to observe creations/deletions under `path`.
            let mut current_parent = path.parent();
            while let Some(parent) = current_parent {
                parent_dirs_to_invalidate.insert(parent.to_path_buf());
                current_parent = parent.parent();
            }

            for (invalidated_path, path_invalidators) in
                invalidator_map.extract_path_with_children(&path)
            {
                queue_with_reason(&mut invalidators, &invalidated_path, path_invalidators);
            }

            for (invalidated_path, path_invalidators) in
                dir_invalidator_map.extract_path_with_children(&path)
            {
                queue_with_reason(&mut invalidators, &invalidated_path, path_invalidators);
            }
        }

        for path in parent_dirs_to_invalidate {
            if let Some(path_invalidators) = dir_invalidator_map.remove(&path) {
                queue_with_reason(&mut invalidators, &path, path_invalidators);
            }
        }

        // Release the map locks before actually running the invalidations.
        drop(invalidator_map);
        drop(dir_invalidator_map);

        parallel::for_each_owned(invalidators, |(reason, invalidator)| {
            invalidator.invalidate_with_reason(&*turbo_tasks, reason)
        });
    }

    /// Creates the root directory on disk if necessary and starts the file
    /// watcher, optionally reporting detailed invalidation reasons.
    #[tracing::instrument(level = "info", name = "start filesystem watching", skip_all, fields(path = %self.root))]
    async fn start_watching_internal(
        self: &Arc<Self>,
        report_invalidation_reason: bool,
        poll_interval: Option<Duration>,
    ) -> Result<()> {
        let root = self.root_path().to_path_buf();

        // Make sure the directory exists before the watcher attaches to it.
        retry_blocking(|| std::fs::create_dir_all(&root))
            .instrument(tracing::info_span!("create root directory", name = ?root))
            .concurrency_limited(&self.write_semaphore)
            .await?;

        self.watcher
            .start_watching(self.clone(), report_invalidation_reason, poll_interval)
            .await?;

        Ok(())
    }
}

/// A [`FileSystem`] backed by the real disk.
///
/// Cheap to clone: all state lives in a shared [`Arc`]'d inner struct.
#[derive(Clone, ValueToString)]
#[value_to_string(self.inner.name)]
#[turbo_tasks::value(cell = "new", eq = "manual")]
pub struct DiskFileSystem {
    inner: Arc<DiskFileSystemInner>,
}

impl DiskFileSystem {
    /// Returns the name of this filesystem.
    pub fn name(&self) -> &RcStr {
        &self.inner.name
    }

    /// Returns the root path of this filesystem as configured at construction.
    pub fn root(&self) -> &RcStr {
        &self.inner.root
    }

    /// Invalidates every tracked file and directory read in this filesystem.
    pub fn invalidate(&self) {
        self.inner.invalidate();
    }

    /// Invalidates every tracked file and directory read in this filesystem,
    /// attaching the reason computed by `reason` for each invalidated path.
    pub fn invalidate_with_reason<R: InvalidationReason + Clone>(
        &self,
        reason: impl Fn(&Path) -> R + Sync,
    ) {
        self.inner.invalidate_with_reason(reason);
    }

    /// Invalidates tracked reads for each path in `paths` and everything below
    /// it, plus tracked directory reads of all ancestor directories.
    pub fn invalidate_path_and_children_with_reason<R: InvalidationReason + Clone>(
        &self,
        paths: impl IntoIterator<Item = PathBuf>,
        reason: impl Fn(&Path) -> R + Sync,
    ) {
        self.inner
            .invalidate_path_and_children_with_reason(paths, reason);
    }

    /// Starts watching the filesystem root for changes, without reporting
    /// detailed invalidation reasons.
    pub async fn start_watching(&self, poll_interval: Option<Duration>) -> Result<()> {
        self.inner
            .start_watching_internal(false, poll_interval)
            .await
    }

    /// Starts watching the filesystem root for changes, reporting detailed
    /// invalidation reasons for every change.
    pub async fn start_watching_with_invalidation_reason(
        &self,
        poll_interval: Option<Duration>,
    ) -> Result<()> {
        self.inner
            .start_watching_internal(true, poll_interval)
            .await
    }

    /// Stops the disk watcher started by one of the `start_watching*` methods.
    pub async fn stop_watching(&self) {
        self.inner.watcher.stop_watching().await;
    }

    /// Try to convert [`Path`] to [`FileSystemPath`]. Return `None` if the file path leaves the
    /// filesystem root. If no `relative_to` argument is given, it is assumed that the `sys_path` is
    /// relative to the [`DiskFileSystem`] root.
    ///
    /// Attempts to convert absolute paths to paths relative to the filesystem root, though we only
    /// attempt to do so lexically.
    ///
    /// Assumes `self` is the `DiskFileSystem` contained in `vc_self`. This API is a bit awkward
    /// because:
    /// - [`Path`]/[`PathBuf`] should not be stored in the filesystem cache, so the function cannot
    ///   be a [`turbo_tasks::function`].
    /// - It's a little convenient for this function to be sync.
    pub fn try_from_sys_path(
        &self,
        vc_self: ResolvedVc<DiskFileSystem>,
        sys_path: &Path,
        relative_to: Option<&FileSystemPath>,
    ) -> Option<FileSystemPath> {
        let fs = ResolvedVc::upcast(vc_self);
        let sys_path = simplified(sys_path);

        let relative_sys_path = if sys_path.is_absolute() {
            // Absolute: normalize lexically (fails if the path escapes the root after
            // resolving `..` segments), then strip the filesystem root prefix.
            sys_path
                .normalize_lexically()
                .ok()?
                .strip_prefix(self.inner.root_path())
                .ok()?
                .to_owned()
        } else if let Some(base) = relative_to {
            debug_assert_eq!(
                base.fs, fs,
                "`relative_to.fs` must match the current `ResolvedVc<DiskFileSystem>`"
            );
            // Relative with a base: join onto the base path (converted to system
            // separators) and normalize; normalization fails if the result leaves the root.
            let mut joined = PathBuf::from(unix_to_sys(&base.path).into_owned());
            joined.push(sys_path);
            joined.normalize_lexically().ok()?
        } else {
            // Relative to the filesystem root itself.
            sys_path.normalize_lexically().ok()?
        };

        Some(FileSystemPath {
            fs,
            path: RcStr::from(sys_to_unix(relative_sys_path.to_str()?)),
        })
    }

    /// Converts `fs_path` (root-relative, unix-separated) back into an absolute
    /// on-disk [`PathBuf`].
    pub fn to_sys_path(&self, fs_path: &FileSystemPath) -> PathBuf {
        let root = self.inner.root_path();
        if fs_path.path.is_empty() {
            // The empty relative path denotes the filesystem root itself.
            return root.to_path_buf();
        }
        root.join(&*unix_to_sys(&fs_path.path))
    }
}

/// Guard bundling the filesystem-wide invalidation read lock with the
/// per-path mutex; both are released together when the guard is dropped.
#[allow(dead_code, reason = "we need to hold onto the locks")]
struct PathLockGuard<'a>(
    #[allow(dead_code)] RwLockReadGuard<'a, ()>,
    #[allow(dead_code)] mutex_map::MutexMapGuard<'a, PathBuf>,
);

/// Formats `path` for display as `[name]/relative/path` with unix separators,
/// where the relative part is `path` relative to `root_path`.
///
/// Returns `None` when `path` is not located under `root_path`.
fn format_absolute_fs_path(path: &Path, name: &str, root_path: &Path) -> Option<String> {
    // Idiomatic early-exit: `strip_prefix` fails when `path` is outside the root.
    let rel_path = path.strip_prefix(root_path).ok()?;
    let formatted = if MAIN_SEPARATOR == '/' {
        format!("[{name}]/{}", rel_path.display())
    } else {
        // Normalize platform separators (e.g. `\` on Windows) to `/`.
        let unix_rel = rel_path.to_string_lossy().replace(MAIN_SEPARATOR, "/");
        format!("[{name}]/{unix_rel}")
    };
    Some(formatted)
}

impl DiskFileSystem {
    /// Creates a new `DiskFileSystem` with no denied paths.
    /// # Arguments
    ///
    /// * `name` - Name of the filesystem.
    /// * `root` - Path to the given filesystem's root. Should be
    ///   [canonicalized][std::fs::canonicalize].
    pub fn new(name: RcStr, root: Vc<RcStr>) -> Vc<Self> {
        Self::new_internal(name, root, Vec::new())
    }

    /// Creates a new `DiskFileSystem` that hides the given paths.
    /// # Arguments
    ///
    /// * `name` - Name of the filesystem.
    /// * `root` - Path to the given filesystem's root. Should be
    ///   [canonicalized][std::fs::canonicalize].
    /// * `denied_paths` - Paths within this filesystem that are not allowed to be accessed or
    ///   navigated into. These must be normalized, non-empty and relative to the fs root.
    pub fn new_with_denied_paths(
        name: RcStr,
        root: Vc<RcStr>,
        denied_paths: Vec<RcStr>,
    ) -> Vc<Self> {
        // Debug-only sanity checks; the denial check relies on these invariants.
        denied_paths.iter().for_each(|denied_path| {
            debug_assert!(!denied_path.is_empty(), "denied_path must not be empty");
            debug_assert!(
                normalize_path(denied_path).as_deref() == Some(&**denied_path),
                "denied_path must be normalized: {denied_path:?}"
            );
        });
        Self::new_internal(name, root, denied_paths)
    }
}

#[turbo_tasks::value_impl]
impl DiskFileSystem {
    /// Shared constructor backing [`DiskFileSystem::new`] and
    /// [`DiskFileSystem::new_with_denied_paths`].
    ///
    /// Captures the current turbo-tasks handle (weakly) and the current tokio
    /// runtime handle so the file watcher can later trigger invalidations from
    /// non-turbo-tasks / non-tokio threads.
    #[turbo_tasks::function]
    async fn new_internal(
        name: RcStr,
        root: Vc<RcStr>,
        denied_paths: Vec<RcStr>,
    ) -> Result<Vc<Self>> {
        let root = root.owned().await?;
        let instance = DiskFileSystem {
            inner: Arc::new(DiskFileSystemInner {
                name,
                root,
                mutex_map: Default::default(),
                invalidation_lock: Default::default(),
                invalidator_map: InvalidatorMap::new(),
                dir_invalidator_map: InvalidatorMap::new(),
                read_semaphore: create_read_semaphore(),
                write_semaphore: create_write_semaphore(),
                watcher: DiskWatcher::new(),
                denied_paths,
                turbo_tasks: turbo_tasks_weak(),
                tokio_handle: Handle::current(),
                effect_state_storage: EffectStateStorage::default(),
            }),
        };

        Ok(Self::cell(instance))
    }
}

/// Concise `Debug` output showing only the identifying fields rather than the
/// full inner state (most of which is `debug_ignore`d anyway).
impl Debug for DiskFileSystem {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(
            f,
            "name: {name}, root: {root}",
            name = self.inner.name,
            root = self.inner.root
        )
    }
}

#[turbo_tasks::value_impl]
impl FileSystem for DiskFileSystem {
    /// Reads the file at `fs_path` from disk.
    ///
    /// Registers a read invalidator so the task re-runs when the file changes.
    /// Denied paths, missing files, and invalid filenames all yield
    /// [`FileContent::NotFound`] rather than an error.
    #[turbo_tasks::function(fs)]
    async fn read(&self, fs_path: FileSystemPath) -> Result<Vc<FileContent>> {
        // On-disk state can change between sessions, so this task must be
        // re-executed in a new session.
        mark_session_dependent();

        // Check if path is denied - if so, treat as NotFound
        if self.inner.is_path_denied(&fs_path) {
            return Ok(FileContent::NotFound.cell());
        }
        let full_path = self.to_sys_path(&fs_path);

        self.inner.register_read_invalidator(&full_path).await?;

        // Per-path lock: keeps this read from racing a concurrent write to the
        // same path.
        let _lock = self.inner.lock_path(&full_path).await;
        let content = match retry_blocking(|| File::from_path(&full_path))
            .instrument(tracing::info_span!("read file", name = ?full_path))
            .concurrency_limited(&self.inner.read_semaphore)
            .await
        {
            Ok(file) => FileContent::new(file),
            Err(e) if e.kind() == ErrorKind::NotFound || e.kind() == ErrorKind::InvalidFilename => {
                FileContent::NotFound
            }
            // ast-grep-ignore: no-context-format
            Err(e) => return Err(anyhow!(e).context(format!("reading file {full_path:?}"))),
        };
        Ok(content.cell())
    }

    /// Lists the entries of the directory at `fs_path`.
    ///
    /// Registers a directory invalidator so the task re-runs when the directory
    /// changes. A denied or missing directory is reported as `not_found`.
    /// Entries with non-unicode names and entries that are themselves denied
    /// paths are silently filtered out of the result.
    #[turbo_tasks::function(fs)]
    async fn raw_read_dir(&self, fs_path: FileSystemPath) -> Result<Vc<RawDirectoryContent>> {
        mark_session_dependent();

        // Check if directory itself is denied - if so, treat as NotFound
        if self.inner.is_path_denied(&fs_path) {
            return Ok(RawDirectoryContent::not_found());
        }
        let full_path = self.to_sys_path(&fs_path);

        self.inner.register_dir_invalidator(&full_path).await?;

        // we use the sync std function here as it's a lot faster (600%) in node-file-trace
        let read_dir = match retry_blocking(|| std::fs::read_dir(&full_path))
            .instrument(tracing::info_span!("read directory", name = ?full_path))
            .concurrency_limited(&self.inner.read_semaphore)
            .await
        {
            Ok(dir) => dir,
            Err(e)
                if e.kind() == ErrorKind::NotFound
                    || e.kind() == ErrorKind::NotADirectory
                    || e.kind() == ErrorKind::InvalidFilename =>
            {
                return Ok(RawDirectoryContent::not_found());
            }
            Err(e) => {
                // ast-grep-ignore: no-context-format
                return Err(anyhow!(e).context(format!("reading dir {full_path:?}")));
            }
        };
        let dir_path = fs_path.path.as_str();
        // Precompute the set of entry *names* in this directory that are denied,
        // so the per-entry loop below only does a set lookup.
        let denied_entries: FxHashSet<&str> = self
            .inner
            .denied_paths
            .iter()
            .filter_map(|denied_path| {
                // If we have a denied path, we need to see if the current directory is a prefix of
                // the denied path meaning that it is possible that some directory entry needs to be
                // filtered. we do this first to avoid string manipulation on every
                // iteration of the directory entries. So while expanding `foo/bar`,
                // if `foo/bar/baz` is denied, we filter out `baz`.
                // But if foo/bar/baz/qux is denied we don't filter anything from this level.
                if denied_path.starts_with(dir_path) {
                    // The byte check guards against `foo/barbaz` matching the
                    // prefix `foo/bar`: a real child must continue with `/`
                    // (or the current directory is the fs root, i.e. empty).
                    let denied_path_suffix =
                        if denied_path.as_bytes().get(dir_path.len()) == Some(&b'/') {
                            Some(&denied_path[dir_path.len() + 1..])
                        } else if dir_path.is_empty() {
                            Some(denied_path.as_str())
                        } else {
                            None
                        };
                    // if the suffix is `foo/bar` we cannot filter foo from this level
                    denied_path_suffix.filter(|s| !s.contains('/'))
                } else {
                    None
                }
            })
            .collect();

        let entries = read_dir
            .filter_map(|r| {
                let e = match r {
                    Ok(e) => e,
                    Err(err) => return Some(Err(err.into())),
                };

                // we filter out any non unicode names
                let file_name = RcStr::from(e.file_name().to_str()?);
                // Filter out denied entries
                if denied_entries.contains(file_name.as_str()) {
                    return None;
                }

                let entry = match e.file_type() {
                    Ok(t) if t.is_file() => RawDirectoryEntry::File,
                    Ok(t) if t.is_dir() => RawDirectoryEntry::Directory,
                    Ok(t) if t.is_symlink() => RawDirectoryEntry::Symlink,
                    Ok(_) => RawDirectoryEntry::Other,
                    Err(err) => return Some(Err(err.into())),
                };

                Some(anyhow::Ok((file_name, entry)))
            })
            .collect::<Result<_>>()
            .with_context(|| format!("reading directory item in {full_path:?}"))?;

        Ok(RawDirectoryContent::new(entries))
    }

    /// Reads the symlink at `fs_path` and resolves its target against this
    /// filesystem's root.
    ///
    /// Returns `NotFound` for denied paths or when the link cannot be read,
    /// and `Invalid` when the target cannot be normalized or does not stay
    /// under the filesystem root.
    #[turbo_tasks::function(fs)]
    async fn read_link(&self, fs_path: FileSystemPath) -> Result<Vc<LinkContent>> {
        mark_session_dependent();

        // Check if path is denied - if so, treat as NotFound
        if self.inner.is_path_denied(&fs_path) {
            return Ok(LinkContent::NotFound.cell());
        }
        let full_path = self.to_sys_path(&fs_path);

        self.inner.register_read_invalidator(&full_path).await?;

        let _lock = self.inner.lock_path(&full_path).await;
        let link_path = match retry_blocking(|| std::fs::read_link(&full_path))
            .instrument(tracing::info_span!("read symlink", name = ?full_path))
            .concurrency_limited(&self.inner.read_semaphore)
            .await
        {
            Ok(res) => res,
            // Any failure to read the link is reported as NotFound.
            Err(_) => return Ok(LinkContent::NotFound.cell()),
        };
        let is_link_absolute = link_path.is_absolute();

        // For relative targets, resolve against the symlink's parent directory
        // and normalize away `.`/`..` segments before comparing with the root.
        let mut file = link_path.clone();
        if !is_link_absolute {
            if let Some(normalized_linked_path) = full_path.parent().and_then(|p| {
                normalize_path(&sys_to_unix(p.join(&file).to_string_lossy().as_ref()))
            }) {
                #[cfg(windows)]
                {
                    file = PathBuf::from(normalized_linked_path);
                }
                // `normalize_path` stripped the leading `/` of the path
                // add it back here or the `strip_prefix` will return `Err`
                #[cfg(not(windows))]
                {
                    file = PathBuf::from(format!("/{normalized_linked_path}"));
                }
            } else {
                return Ok(LinkContent::Invalid.cell());
            }
        }

        // strip the root from the path, it serves two purpose
        // 1. ensure the linked path is under the root
        // 2. strip the root path if the linked path is absolute
        //
        // we use `dunce::simplify` to strip a potential UNC prefix on windows, on any
        // other OS this gets compiled away
        let result = simplified(&file).strip_prefix(simplified(Path::new(&self.inner.root)));

        let relative_to_root_path = match result {
            Ok(file) => PathBuf::from(sys_to_unix(&file.to_string_lossy()).as_ref()),
            Err(_) => return Ok(LinkContent::Invalid.cell()),
        };

        // `target` keeps the original form of the link: root-relative for
        // absolute links, the raw (unix-separated) target for relative links.
        let (target, file_type) = if is_link_absolute {
            let target_string = RcStr::from(relative_to_root_path.to_string_lossy());
            (
                target_string.clone(),
                FileSystemPath::new_normalized(fs_path.fs().to_resolved().await?, target_string)
                    .get_type()
                    .await?,
            )
        } else {
            let link_path_string_cow = link_path.to_string_lossy();
            let link_path_unix = RcStr::from(sys_to_unix(&link_path_string_cow));
            (
                link_path_unix.clone(),
                fs_path.parent().join(&link_path_unix)?.get_type().await?,
            )
        };

        Ok(LinkContent::Link {
            target,
            link_type: {
                let mut link_type = Default::default();
                // Same value as `is_link_absolute` above.
                if link_path.is_absolute() {
                    link_type |= LinkType::ABSOLUTE;
                }
                if matches!(&*file_type, FileSystemEntryType::Directory) {
                    link_type |= LinkType::DIRECTORY;
                }
                link_type
            },
        }
        .cell())
    }

    /// Declares a write of `content` to `fs_path`.
    ///
    /// This function does not touch the disk itself: it emits a [`WriteEffect`]
    /// whose `apply` performs the actual write later, keyed by the target path
    /// and carrying a content hash so unchanged writes can be skipped. Writing
    /// to a denied path is an error.
    #[turbo_tasks::function(fs)]
    async fn write(&self, fs_path: FileSystemPath, content: Vc<FileContent>) -> Result<()> {
        // You might be tempted to use `mark_session_dependent` here, but
        // `write` purely declares a side effect and does not need to be reexecuted in the next
        // session. All side effects are reexecuted in general.

        // Check if path is denied - if so, return an error
        if self.inner.is_path_denied(&fs_path) {
            turbobail!("Cannot write to denied path: {fs_path}");
        }
        let full_path = self.to_sys_path(&fs_path);

        // Persist the file content so it is stored in the persistent cache.
        // Since FileContent uses serialization = "hash", persisting it here ensures the full
        // content is available in the persistent cache (via PersistedFileContent) and does not
        // require recomputing the content on cache restore — avoiding unnecessary downstream
        // recomputation.
        let content = content.persist().await?;

        let inner = self.inner.clone();

        // Deferred disk write, executed when the emitted effect is applied.
        #[derive(TraceRawVcs, NonLocalValue)]
        struct WriteEffect {
            full_path: PathBuf,
            inner: Arc<DiskFileSystemInner>,
            content: ReadRef<PersistedFileContent>,
            // Hash of `content`; exposed via `Effect::value` below.
            content_hash: u128,
        }

        impl Effect for WriteEffect {
            type Error = AnyhowWrapper;
            type Value = u128;

            fn key(&self) -> Vec<u8> {
                self.full_path.as_os_str().as_encoded_bytes().to_vec()
            }

            fn value(&self) -> &u128 {
                &self.content_hash
            }

            fn state_storage(&self) -> &EffectStateStorage {
                &self.inner.effect_state_storage
            }

            async fn apply(&self) -> Result<(), AnyhowWrapper> {
                self.apply_inner().await.map_err(AnyhowWrapper::from)
            }
        }

        impl WriteEffect {
            /// Performs the write (or removal for `NotFound` content),
            /// skipping the disk entirely when the on-disk bytes already match.
            async fn apply_inner(&self) -> anyhow::Result<()> {
                let full_path = validate_path_length(&self.full_path)?;

                let _lock = self.inner.lock_path(&full_path).await;

                // We perform an untracked comparison here, so that this write is not dependent
                // on a read's Vc<FileContent> (and the memory it holds). Our untracked read can
                // be freed immediately. Given this is an output file, it's unlikely any Turbo
                // code will need to read the file from disk into a Vc<FileContent>, so we're
                // not wasting cycles.
                let compare = self
                    .content
                    .streaming_compare(&full_path)
                    .instrument(tracing::info_span!("read file before write", name = ?full_path))
                    .concurrency_limited(&self.inner.read_semaphore)
                    .await?;
                if compare == FileComparison::Equal {
                    return Ok(());
                }

                match &*self.content {
                    PersistedFileContent::Content(..) => {
                        let content = self.content.clone();
                        let full_path = full_path.into_owned();
                        async {
                            let do_write = || {
                                let content = content.clone();
                                let full_path = full_path.clone();
                                let span = tracing::info_span!("write file", name = ?full_path);
                                retry_blocking(move || {
                                    let mut f = std::fs::File::create(&full_path)?;
                                    let PersistedFileContent::Content(file) = &*content else {
                                        unreachable!()
                                    };
                                    std::io::copy(&mut file.read(), &mut f)?;
                                    #[cfg(unix)]
                                    f.set_permissions(file.meta.permissions.into())?;
                                    f.flush()?;

                                    // With TURBO_ENGINE_WRITE_VERSION set, additionally write a
                                    // copy whose file name embeds the content hash before the
                                    // original extension.
                                    static WRITE_VERSION: LazyLock<bool> = LazyLock::new(|| {
                                        std::env::var_os("TURBO_ENGINE_WRITE_VERSION")
                                            .is_some_and(|v| v == "1" || v == "true")
                                    });
                                    if *WRITE_VERSION {
                                        let mut full_path = full_path.clone();
                                        let hash = hash_xxh3_hash64(file);
                                        let ext = full_path.extension();
                                        let ext = if let Some(ext) = ext {
                                            format!("{:016x}.{}", hash, ext.to_string_lossy())
                                        } else {
                                            format!("{hash:016x}")
                                        };
                                        full_path.set_extension(ext);
                                        let mut f = std::fs::File::create(&full_path)?;
                                        std::io::copy(&mut file.read(), &mut f)?;
                                        #[cfg(unix)]
                                        f.set_permissions(file.meta.permissions.into())?;
                                        f.flush()?;
                                    }
                                    Ok::<(), io::Error>(())
                                })
                                .instrument(span)
                            };

                            match do_write().await {
                                Err(e) if e.kind() == ErrorKind::NotFound => {
                                    // The parent directory doesn't exist. Create it and retry once.
                                    if let Some(parent) = full_path.parent() {
                                        retry_blocking(|| std::fs::create_dir_all(parent))
                                            .instrument(tracing::info_span!(
                                                "create directory",
                                                name = ?parent
                                            ))
                                            .await
                                            .with_context(|| {
                                                format!(
                                                    "failed to create directory {parent:?} for \
                                                     write to {full_path:?}",
                                                )
                                            })?;
                                    }
                                    do_write().await.with_context(|| {
                                        format!("failed to write to {full_path:?}")
                                    })?;
                                }
                                result => {
                                    result.with_context(|| {
                                        format!("failed to write to {full_path:?}")
                                    })?;
                                }
                            }
                            anyhow::Ok(())
                        }
                        .concurrency_limited(&self.inner.write_semaphore)
                        .await?;
                    }
                    PersistedFileContent::NotFound => {
                        // Writing "NotFound" means deleting the file; a file
                        // that is already gone counts as success.
                        retry_blocking(|| std::fs::remove_file(&full_path))
                            .instrument(tracing::info_span!("remove file", name = ?full_path))
                            .concurrency_limited(&self.inner.write_semaphore)
                            .await
                            .or_else(|err| {
                                if err.kind() == ErrorKind::NotFound {
                                    Ok(())
                                } else {
                                    Err(err)
                                }
                            })
                            .with_context(|| format!("removing {full_path:?} failed"))?;
                    }
                }

                // Invalidate any read tasks tracking this path so they re-read the new content
                self.inner.invalidate_from_write(&self.full_path);

                Ok(())
            }
        }

        let content_hash = u128::from_le_bytes(hash_xxh3_hash128(&*content));
        emit_effect(WriteEffect {
            full_path,
            inner,
            content,
            content_hash,
        });

        Ok(())
    }

    /// Declares a write of a symlink (or its removal) at `fs_path`.
    ///
    /// Like `write`, this only emits a [`WriteLinkEffect`]; the actual symlink
    /// creation/removal happens when the effect is applied. On Windows,
    /// directory links are created as junction points and file links as
    /// symlinks. Writing to a denied path is an error.
    #[turbo_tasks::function(fs)]
    async fn write_link(&self, fs_path: FileSystemPath, target: Vc<LinkContent>) -> Result<()> {
        // You might be tempted to use `mark_session_dependent` here, but we purely declare a side
        // effect and does not need to be re-executed in the next session. All side effects are
        // re-executed in general.

        // Check if path is denied - if so, return an error
        if self.inner.is_path_denied(&fs_path) {
            turbobail!("Cannot write link to denied path: {fs_path}");
        }

        let content = target.await?;

        let full_path = self.to_sys_path(&fs_path);
        let inner = self.inner.clone();

        // Deferred symlink write, executed when the emitted effect is applied.
        #[derive(TraceRawVcs, NonLocalValue)]
        struct WriteLinkEffect {
            full_path: PathBuf,
            inner: Arc<DiskFileSystemInner>,
            content: ReadRef<LinkContent>,
            // Hash of `content`; exposed via `Effect::value` below.
            content_hash: u128,
        }

        impl Effect for WriteLinkEffect {
            type Error = AnyhowWrapper;
            type Value = u128;

            fn key(&self) -> Vec<u8> {
                self.full_path.as_os_str().as_encoded_bytes().to_vec()
            }

            fn value(&self) -> &u128 {
                &self.content_hash
            }

            fn state_storage(&self) -> &EffectStateStorage {
                &self.inner.effect_state_storage
            }

            async fn apply(&self) -> Result<(), AnyhowWrapper> {
                self.apply_inner().await.map_err(AnyhowWrapper::from)
            }
        }

        impl WriteLinkEffect {
            /// Creates, replaces, or removes the link on disk, skipping the
            /// write when the existing link already matches the target.
            async fn apply_inner(&self) -> anyhow::Result<()> {
                let full_path = validate_path_length(&self.full_path)?;

                let _lock = self.inner.lock_path(&full_path).await;

                // Requested link content translated into an OS-specific form
                // (absolute targets resolved against the fs root, and junction
                // targets made absolute on Windows).
                enum OsSpecificLinkContent {
                    Link {
                        #[cfg(windows)]
                        is_directory: bool,
                        target: PathBuf,
                    },
                    NotFound,
                    Invalid,
                }

                let os_specific_link_content = match &*self.content {
                    LinkContent::Link { target, link_type } => {
                        let is_directory = link_type.contains(LinkType::DIRECTORY);
                        let target_path = if link_type.contains(LinkType::ABSOLUTE) {
                            Path::new(&self.inner.root).join(unix_to_sys(target).as_ref())
                        } else {
                            let relative_target = PathBuf::from(unix_to_sys(target).as_ref());
                            if cfg!(windows) && is_directory {
                                // Windows junction points must always be stored as absolute
                                full_path
                                    .parent()
                                    .unwrap_or(&full_path)
                                    .join(relative_target)
                            } else {
                                relative_target
                            }
                        };
                        OsSpecificLinkContent::Link {
                            #[cfg(windows)]
                            is_directory,
                            target: target_path,
                        }
                    }
                    LinkContent::Invalid => OsSpecificLinkContent::Invalid,
                    LinkContent::NotFound => OsSpecificLinkContent::NotFound,
                };

                // Read the current link (if any) so an unchanged link is not
                // rewritten.
                let old_content = match retry_blocking(|| std::fs::read_link(&full_path))
                    .instrument(tracing::info_span!("read symlink before write", name = ?full_path))
                    .concurrency_limited(&self.inner.read_semaphore)
                    .await
                {
                    Ok(res) => Some((res.is_absolute(), res)),
                    Err(_) => None,
                };
                let is_equal = match (&os_specific_link_content, &old_content) {
                    (
                        OsSpecificLinkContent::Link { target, .. },
                        Some((old_is_absolute, old_target)),
                    ) => target == old_target && target.is_absolute() == *old_is_absolute,
                    (OsSpecificLinkContent::NotFound, None) => true,
                    _ => false,
                };
                if is_equal {
                    return Ok(());
                }

                match os_specific_link_content {
                    OsSpecificLinkContent::Link {
                        target,
                        #[cfg(windows)]
                        is_directory,
                        ..
                    } => {
                        let full_path = full_path.into_owned();

                        #[derive(thiserror::Error, Debug)]
                        #[error("{msg}: {source}")]
                        struct SymlinkCreationError {
                            msg: &'static str,
                            #[source]
                            source: io::Error,
                        }

                        let mut has_old_content = old_content.is_some();
                        let try_create_link = || {
                            if has_old_content {
                                // Remove existing symlink before creating a new one. On Unix,
                                // symlink(2) fails with EEXIST if the link already exists instead
                                // of overwriting it. Windows has similar behavior with junction
                                // points.
                                remove_symbolic_link_dir_helper(&full_path).map_err(|err| {
                                    SymlinkCreationError {
                                        msg: "removal of existing symbolic link or junction point \
                                              failed",
                                        source: err,
                                    }
                                })?;
                                has_old_content = false;
                            }
                            #[cfg(not(windows))]
                            let io_result = std::os::unix::fs::symlink(&target, &full_path);
                            #[cfg(windows)]
                            let io_result = if is_directory {
                                std::os::windows::fs::junction_point(&target, &full_path)
                            } else {
                                std::os::windows::fs::symlink_file(&target, &full_path)
                            };
                            io_result.map_err(|err| {
                                if err.kind() == ErrorKind::AlreadyExists {
                                    // try to remove the symlink on the next iteration of the loop
                                    has_old_content = true;
                                }
                                SymlinkCreationError {
                                    msg: "creation of a new symbolic link or junction point failed",
                                    source: err,
                                }
                            })
                        };
                        fn can_retry_link(err: &SymlinkCreationError) -> bool {
                            err.source.kind() == ErrorKind::AlreadyExists || can_retry(&err.source)
                        }
                        let err_context = || {
                            #[cfg(not(windows))]
                            let message = format!(
                                "failed to create symlink at {full_path:?} pointing to {target:?}"
                            );
                            #[cfg(windows)]
                            let message = if is_directory {
                                format!(
                                    "failed to create junction point at {full_path:?} pointing to \
                                     {target:?}"
                                )
                            } else {
                                format!(
                                    "failed to create symlink at {full_path:?} pointing to \
                                     {target:?}\n\
                                    (Note: creating file symlinks on Windows require developer \
                                     mode or admin permissions: \
                                     https://learn.microsoft.com/en-us/windows/advanced-settings/developer-mode)",
                                )
                            };
                            message
                        };
                        async {
                            let write_result =
                                retry_blocking_custom(try_create_link, can_retry_link)
                                    .instrument(tracing::info_span!(
                                        "write symlink",
                                        name = ?full_path,
                                        target = ?target,
                                    ))
                                    .await;

                            match write_result {
                                Err(ref e) if e.source.kind() == ErrorKind::NotFound => {
                                    // Parent directory doesn't exist. Create it and retry once.
                                    if let Some(parent) = full_path.parent() {
                                        retry_blocking(|| std::fs::create_dir_all(parent))
                                            .instrument(tracing::info_span!(
                                                "create directory",
                                                name = ?parent
                                            ))
                                            .await
                                            .with_context(|| {
                                                format!(
                                                    "failed to create directory {parent:?} for \
                                                     write link to {full_path:?}",
                                                )
                                            })?;
                                    }
                                    // After the first attempt, any pre-existing link was already
                                    // removed (has_old_content is now false), so just create.
                                    retry_blocking_custom(
                                        || {
                                            #[cfg(not(windows))]
                                            let io_result =
                                                std::os::unix::fs::symlink(&target, &full_path);
                                            #[cfg(windows)]
                                            let io_result = if is_directory {
                                                std::os::windows::fs::junction_point(
                                                    &target, &full_path,
                                                )
                                            } else {
                                                std::os::windows::fs::symlink_file(
                                                    &target, &full_path,
                                                )
                                            };
                                            io_result.map_err(|err| SymlinkCreationError {
                                                msg: "creation of a new symbolic link or junction \
                                                      point failed",
                                                source: err,
                                            })
                                        },
                                        |e: &SymlinkCreationError| can_retry(&e.source),
                                    )
                                    .instrument(tracing::info_span!(
                                        "write symlink",
                                        name = ?full_path,
                                        target = ?target,
                                    ))
                                    .await
                                    .with_context(err_context)?;
                                }
                                result => result.with_context(err_context)?,
                            }
                            anyhow::Ok(())
                        }
                        .concurrency_limited(&self.inner.write_semaphore)
                        .await?;
                    }
                    OsSpecificLinkContent::Invalid => {
                        bail!("invalid symlink target: {full_path:?}");
                    }
                    OsSpecificLinkContent::NotFound => {
                        retry_blocking(|| remove_symbolic_link_dir_helper(&full_path))
                            .instrument(tracing::info_span!("remove symlink", name = ?full_path))
                            .concurrency_limited(&self.inner.write_semaphore)
                            .await
                            .with_context(|| format!("removing {full_path:?} failed"))?;
                    }
                }

                // Invalidate any read tasks tracking this path so they re-read the new content
                self.inner.invalidate_from_write(&self.full_path);

                Ok(())
            }
        }

        let content_hash = u128::from_le_bytes(hash_xxh3_hash128(&*content));
        emit_effect(WriteLinkEffect {
            full_path,
            inner,
            content,
            content_hash,
        });
        Ok(())
    }

    /// Reads the filesystem metadata for `fs_path`.
    ///
    /// Unlike `read`/`read_link`, a denied path is an error here rather than a
    /// "not found" value. Registers a read invalidator so the task re-runs
    /// when the underlying file changes.
    #[turbo_tasks::function(fs)]
    async fn metadata(&self, fs_path: FileSystemPath) -> Result<Vc<FileMeta>> {
        mark_session_dependent();

        // Check if path is denied - if so, return an error (metadata shouldn't be readable).
        // Checked before any path conversion, consistent with the other methods in this impl.
        if self.inner.is_path_denied(&fs_path) {
            turbobail!("Cannot read metadata from denied path: {fs_path}");
        }
        let full_path = self.to_sys_path(&fs_path);

        self.inner.register_read_invalidator(&full_path).await?;

        let _lock = self.inner.lock_path(&full_path).await;
        let meta = retry_blocking(|| std::fs::metadata(&full_path))
            .instrument(tracing::info_span!("read metadata", name = ?full_path))
            .concurrency_limited(&self.inner.read_semaphore)
            .await
            .with_context(|| format!("reading metadata for {full_path:?}"))?;

        Ok(FileMeta::cell(meta.into()))
    }
}

/// Removes the symbolic link (or, on Windows, junction point) at `path`.
///
/// A missing path is treated as success: the goal is "the link is gone", so `NotFound`
/// errors are swallowed.
fn remove_symbolic_link_dir_helper(path: &Path) -> io::Result<()> {
    let removal = if cfg!(windows) {
        // Junction points on Windows are treated as directories, and therefore need
        // `remove_dir`:
        //
        // > `RemoveDirectory` can be used to remove a directory junction. Since the target
        // > directory and its contents will remain accessible through its canonical path, the
        // > target directory itself is not affected by removing a junction which targets it.
        //
        // -- https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-removedirectoryw
        //
        // However, Next 16.1.0 shipped with symlinks, before we switched to junction links on
        // Windows, and `remove_dir` won't work on symlinks. So try to remove it as a directory
        // (junction) first, and then fall back to removing it as a file (symlink).
        match std::fs::remove_dir(path) {
            Err(err) if err.kind() == ErrorKind::NotADirectory => std::fs::remove_file(path),
            other => other,
        }
    } else {
        std::fs::remove_file(path)
    };
    match removal {
        Err(err) if err.kind() != ErrorKind::NotFound => Err(err),
        _ => Ok(()),
    }
}

/// A path inside a [`FileSystem`], stored as a normalized, `/`-separated string relative to
/// the filesystem root.
#[derive(Debug, Clone, Hash, TaskInput)]
#[turbo_tasks::value(shared)]
pub struct FileSystemPath {
    /// The filesystem this path belongs to.
    pub fs: ResolvedVc<Box<dyn FileSystem>>,
    /// The normalized path relative to the filesystem root; empty for the root itself.
    pub path: RcStr,
}

impl ValueToStringRef for FileSystemPath {
    /// Formats the path as `[<fs>]/<path>` for display purposes.
    async fn to_string_ref(&self) -> Result<RcStr> {
        turbofmt!("[{}]/{}", self.fs, self.path).await
    }
}

#[turbo_tasks::value_impl]
impl ValueToString for FileSystemPath {
    /// `Vc`-returning wrapper around [`ValueToStringRef::to_string_ref`].
    #[turbo_tasks::function]
    async fn to_string(&self) -> Result<Vc<RcStr>> {
        Ok(Vc::cell(self.to_string_ref().await?))
    }
}

impl FileSystemPath {
    /// Returns true when `self` is strictly inside the directory `other` (same filesystem,
    /// with `other` being a proper ancestor ending on a `/` component boundary).
    pub fn is_inside_ref(&self, other: &FileSystemPath) -> bool {
        if self.fs != other.fs || !self.path.starts_with(&*other.path) {
            return false;
        }
        // The root ("") contains everything else; otherwise the string-prefix match must
        // end exactly on a path-component boundary.
        other.path.is_empty() || self.path.as_bytes().get(other.path.len()) == Some(&b'/')
    }

    /// Like [`FileSystemPath::is_inside_ref`], but also returns true when both paths are
    /// identical.
    pub fn is_inside_or_equal_ref(&self, other: &FileSystemPath) -> bool {
        if self.fs != other.fs || !self.path.starts_with(&*other.path) {
            return false;
        }
        other.path.is_empty()
            || matches!(
                self.path.as_bytes().get(other.path.len()),
                Some(&b'/') | None
            )
    }

    /// Returns true if this is the filesystem root (the empty path).
    pub fn is_root(&self) -> bool {
        self.path.is_empty()
    }

    /// Returns true if this path lies inside a `node_modules` directory.
    pub fn is_in_node_modules(&self) -> bool {
        self.path.starts_with("node_modules/") || self.path.contains("/node_modules/")
    }

    /// Returns the path of `inner` relative to `self`.
    ///
    /// Note: this method always strips the leading `/` from the result.
    pub fn get_path_to<'a>(&self, inner: &'a FileSystemPath) -> Option<&'a str> {
        if self.fs != inner.fs {
            return None;
        }
        let rest = inner.path.strip_prefix(&*self.path)?;
        if self.path.is_empty() {
            return Some(rest);
        }
        // The remainder must begin at a component boundary; otherwise `self` was only a
        // string prefix of `inner` (e.g. "foo" vs "foo-bar").
        rest.strip_prefix('/')
    }

    /// Computes the relative path from `self` to `other`. Returns `None` when the paths live
    /// on different filesystems.
    pub fn get_relative_path_to(&self, other: &FileSystemPath) -> Option<RcStr> {
        (self.fs == other.fs).then(|| get_relative_path_to(&self.path, &other.path).into())
    }

    /// Returns the final component of the FileSystemPath, or an empty string
    /// for the root path.
    pub fn file_name(&self) -> &str {
        self.split_file_name().1
    }

    /// Returns true if this path ends with the given extension.
    ///
    /// `extension` must include the leading `.` (checked by a debug assertion). This is a
    /// cheap suffix match compared to computing [`FileSystemPath::extension`].
    pub fn has_extension(&self, extension: &str) -> bool {
        debug_assert!(!extension.contains('/') && extension.starts_with('.'));
        self.path.ends_with(extension)
    }

    /// Returns the extension (without a leading `.`)
    pub fn extension(&self) -> Option<&str> {
        self.split_extension().1
    }

    /// Splits the path into two components:
    /// 1. The path without the extension;
    /// 2. The extension, if any.
    fn split_extension(&self) -> (&str, Option<&str>) {
        match self.path.rsplit_once('.') {
            // A `.` only counts as an extension separator when it is inside the final
            // component and not its first character (dotfiles like `.gitignore` have no
            // extension).
            Some((before, ext))
                if !ext.contains('/') && !before.ends_with('/') && !before.is_empty() =>
            {
                (before, Some(ext))
            }
            _ => (self.path.as_str(), None),
        }
    }

    /// Splits the path into two components:
    /// 1. The parent directory, if any;
    /// 2. The file name;
    fn split_file_name(&self) -> (Option<&str>, &str) {
        // Since the path is normalized, we know `parent`, if any, must not be empty.
        match self.path.rsplit_once('/') {
            Some((parent, name)) => (Some(parent), name),
            None => (None, self.path.as_str()),
        }
    }

    /// Splits the path into three components:
    /// 1. The parent directory, if any;
    /// 2. The file stem;
    /// 3. The extension, if any.
    fn split_file_stem_extension(&self) -> (Option<&str>, &str, Option<&str>) {
        let (stem_path, extension) = self.split_extension();
        match stem_path.rsplit_once('/') {
            Some((parent, stem)) => (Some(parent), stem, extension),
            None => (None, stem_path, extension),
        }
    }
}

/// A cell-wrappable `Option<FileSystemPath>`.
#[turbo_tasks::value(transparent)]
pub struct FileSystemPathOption(Option<FileSystemPath>);

#[turbo_tasks::value_impl]
impl FileSystemPathOption {
    /// Returns a `None` value wrapped in a `Vc`.
    #[turbo_tasks::function]
    pub fn none() -> Vc<Self> {
        Vc::cell(None)
    }
}

impl FileSystemPath {
    /// Create a new FileSystemPath from a path within a FileSystem. The
    /// /-separated path is expected to be already normalized (this is asserted
    /// in dev mode).
    fn new_normalized(fs: ResolvedVc<Box<dyn FileSystem>>, path: RcStr) -> Self {
        // On Windows, the path must be converted to a unix path before creating. But on
        // Unix, backslashes are a valid char in file names, and the path can be
        // provided by the user, so we allow it.
        debug_assert!(
            MAIN_SEPARATOR != '\\' || !path.contains('\\'),
            "path {path} must not contain a Windows directory '\\', it must be normalized to Unix \
             '/'",
        );
        debug_assert!(
            normalize_path(&path).as_deref() == Some(&*path),
            "path {path} must be normalized",
        );
        FileSystemPath { fs, path }
    }

    /// Adds a subpath to the current path. The /-separated `path` argument might contain ".." or
    /// "." segments, but it must not leave the root of the filesystem.
    pub fn join(&self, path: &str) -> Result<Self> {
        if let Some(path) = join_path(&self.path, path) {
            Ok(Self::new_normalized(self.fs, path.into()))
        } else {
            bail!(
                "FileSystemPath(\"{}\").join(\"{}\") leaves the filesystem root",
                self.path,
                path,
            );
        }
    }

    /// Adds a suffix to the filename. `path` must not contain `/`.
    pub fn append(&self, path: &str) -> Result<Self> {
        if path.contains('/') {
            bail!(
                "FileSystemPath(\"{}\").append(\"{}\") must not append '/'",
                self.path,
                path,
            )
        }
        Ok(Self::new_normalized(
            self.fs,
            format!("{}{}", self.path, path).into(),
        ))
    }

    /// Adds a suffix to the basename of the file path. `appending` must not contain `/`. The [file
    /// extension][FileSystemPath::extension] will stay intact.
    pub fn append_to_stem(&self, appending: &str) -> Result<Self> {
        if appending.contains('/') {
            bail!(
                "FileSystemPath({:?}).append_to_stem({:?}) must not append '/'",
                self.path,
                appending,
            )
        }
        if let (path, Some(ext)) = self.split_extension() {
            return Ok(Self::new_normalized(
                self.fs,
                format!("{path}{appending}.{ext}").into(),
            ));
        }
        Ok(Self::new_normalized(
            self.fs,
            format!("{}{}", self.path, appending).into(),
        ))
    }

    /// Similar to [FileSystemPath::join], but returns an [`Option`] that will be [`None`] when the
    /// joined path would leave the filesystem root.
    #[allow(clippy::needless_borrow)] // for windows build
    pub fn try_join(&self, path: &str) -> Option<FileSystemPath> {
        // TODO(PACK-3279): Remove this once we do not produce invalid paths at the first place.
        #[cfg(target_os = "windows")]
        let path = path.replace('\\', "/");

        join_path(&self.path, &path).map(|p| Self::new_normalized(self.fs, RcStr::from(p)))
    }

    /// Similar to [FileSystemPath::try_join], but returns [`None`] when the new path would leave
    /// the current path (not just the filesystem root). This is useful for preventing access
    /// outside of a directory.
    pub fn try_join_inside(&self, path: &str) -> Option<FileSystemPath> {
        if let Some(p) = join_path(&self.path, path)
            && p.starts_with(&*self.path)
        {
            return Some(Self::new_normalized(self.fs, RcStr::from(p)));
        }
        None
    }

    /// DETERMINISM: Result is in random order. Either sort the result or do not depend on the
    /// order.
    pub fn read_glob(&self, glob: Vc<Glob>) -> Vc<ReadGlobResult> {
        read_glob(self.clone(), glob)
    }

    // Tracks all files and directories matching the glob using the filesystem watcher. Follows
    // symlinks as though they were part of the original hierarchy. The returned [`Vc`] will be
    // invalidated if a file or directory changes.
    pub fn track_glob(&self, glob: Vc<Glob>, include_dot_files: bool) -> Vc<Completion> {
        track_glob(self.clone(), glob, include_dot_files)
    }

    pub fn root(&self) -> Vc<Self> {
        self.fs().root()
    }
}

impl FileSystemPath {
    pub fn fs(&self) -> Vc<Box<dyn FileSystem>> {
        *self.fs
    }

    pub fn is_inside(&self, other: &FileSystemPath) -> bool {
        self.is_inside_ref(other)
    }

    pub fn is_inside_or_equal(&self, other: &FileSystemPath) -> bool {
        self.is_inside_or_equal_ref(other)
    }

    /// Creates a new [`FileSystemPath`] like `self` but with the given
    /// extension.
    pub fn with_extension(&self, extension: &str) -> FileSystemPath {
        let (path_without_extension, _) = self.split_extension();
        Self::new_normalized(
            self.fs,
            // Like `Path::with_extension` and `PathBuf::set_extension`, if the extension is empty,
            // we remove the extension altogether.
            match extension.is_empty() {
                true => path_without_extension.into(),
                false => format!("{path_without_extension}.{extension}").into(),
            },
        )
    }

    /// Extracts the stem (non-extension) portion of self.file_name.
    ///
    /// The stem is:
    ///
    /// * [`None`], if there is no file name;
    /// * The entire file name if there is no embedded `.`;
    /// * The entire file name if the file name begins with `.` and has no other `.`s within;
    /// * Otherwise, the portion of the file name before the final `.`
    pub fn file_stem(&self) -> Option<&str> {
        let (_, file_stem, _) = self.split_file_stem_extension();
        if file_stem.is_empty() {
            return None;
        }
        Some(file_stem)
    }
}

impl std::fmt::Display for FileSystemPath {
    /// Displays only the path within the filesystem (no filesystem name prefix).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.path)
    }
}

#[turbo_tasks::function]
pub async fn rebase(
    fs_path: FileSystemPath,
    old_base: FileSystemPath,
    new_base: FileSystemPath,
) -> Result<Vc<FileSystemPath>> {
    let new_path;
    if old_base.path.is_empty() {
        if new_base.path.is_empty() {
            new_path = fs_path.path.clone();
        } else {
            new_path = [new_base.path.as_str(), "/", &fs_path.path].concat().into();
        }
    } else {
        let base_path = [&old_base.path, "/"].concat();
        if !fs_path.path.starts_with(&base_path) {
            turbobail!(
                "rebasing {fs_path} from {old_base} onto {new_base} doesn't work because it's not \
                 part of the source path",
            );
        }
        if new_base.path.is_empty() {
            new_path = [&fs_path.path[base_path.len()..]].concat().into();
        } else {
            new_path = [new_base.path.as_str(), &fs_path.path[old_base.path.len()..]]
                .concat()
                .into();
        }
    }
    Ok(new_base.fs.root().await?.join(&new_path)?.cell())
}

// Not turbo-tasks functions, only delegating
impl FileSystemPath {
    /// Reads the file content at this path via the owning filesystem.
    pub fn read(&self) -> Vc<FileContent> {
        self.fs().read(self.clone())
    }

    /// Reads the symbolic link (or Windows junction point) target at this path.
    pub fn read_link(&self) -> Vc<LinkContent> {
        self.fs().read_link(self.clone())
    }

    /// Reads and parses the file content as strict JSON.
    pub fn read_json(&self) -> Vc<FileJsonContent> {
        self.fs().read(self.clone()).parse_json()
    }

    /// Reads and parses the file content as JSON5.
    pub fn read_json5(&self) -> Vc<FileJsonContent> {
        self.fs().read(self.clone()).parse_json5()
    }

    /// Reads content of a directory.
    ///
    /// DETERMINISM: Result is in random order. Either sort result or do not
    /// depend on the order.
    pub fn raw_read_dir(&self) -> Vc<RawDirectoryContent> {
        self.fs().raw_read_dir(self.clone())
    }

    /// Writes the given content to this path via the owning filesystem.
    pub fn write(&self, content: Vc<FileContent>) -> Vc<()> {
        self.fs().write(self.clone(), content)
    }

    /// Creates a symbolic link to a directory on *nix platforms, or a directory junction point on
    /// Windows.
    ///
    /// [Windows supports symbolic links][windows-symlink], but they [can require elevated
    /// privileges][windows-privileges] if "developer mode" is not enabled, so we can't safely use
    /// them. Using junction points [matches the behavior of pnpm][pnpm-windows].
    ///
    /// This only supports directories because Windows junction points are incompatible with files.
    /// To ensure compatibility, this will return an error if the target is a file, even on
    /// platforms with full symlink support.
    ///
    /// **We intentionally do not provide an API for symlinking a file**, as we cannot support that
    /// on all Windows configurations.
    ///
    /// [windows-symlink]: https://blogs.windows.com/windowsdeveloper/2016/12/02/symlinks-windows-10/
    /// [windows-privileges]: https://learn.microsoft.com/en-us/previous-versions/windows/it-pro/windows-10/security/threat-protection/security-policy-settings/create-symbolic-links
    /// [pnpm-windows]: https://pnpm.io/faq#does-it-work-on-windows
    pub fn write_symbolic_link_dir(&self, target: Vc<LinkContent>) -> Vc<()> {
        self.fs().write_link(self.clone(), target)
    }

    /// Reads OS metadata for this path via the owning filesystem.
    pub fn metadata(&self) -> Vc<FileMeta> {
        self.fs().metadata(self.clone())
    }

    /// Returns the realpath to the file, resolving all symlinks and reporting an error if the
    /// path is invalid (see [`RealPathResultError`] for the failure cases).
    pub async fn realpath(&self) -> Result<FileSystemPath> {
        let result = &(*self.realpath_with_links().await?);
        match &result.path_result {
            Ok(path) => Ok(path.clone()),
            Err(error) => bail!("{}", error.as_error_message(self, result).await?),
        }
    }

    /// Delegates to the free [`rebase`] function.
    pub fn rebase(
        fs_path: FileSystemPath,
        old_base: FileSystemPath,
        new_base: FileSystemPath,
    ) -> Vc<FileSystemPath> {
        rebase(fs_path, old_base, new_base)
    }
}

impl FileSystemPath {
    /// Reads content of a directory.
    ///
    /// DETERMINISM: Result is in random order. Either sort result or do not
    /// depend on the order.
    pub fn read_dir(&self) -> Vc<DirectoryContent> {
        read_dir(self.clone())
    }

    /// Returns the parent directory, or `self` when this is already the filesystem root.
    pub fn parent(&self) -> FileSystemPath {
        match self.path.as_str() {
            "" => self.clone(),
            path => FileSystemPath::new_normalized(self.fs, RcStr::from(get_parent_path(path))),
        }
    }

    // It is important that get_type uses read_dir and not stat/metadata.
    // - `get_type` is called very very often during resolving and stat would
    // make it 1 syscall per call, whereas read_dir would make it 1 syscall per
    // directory.
    // - `metadata` allows you to use the "wrong" casing on
    // case-insensitive filesystems, while read_dir gives you the "correct"
    // casing. We want to enforce "correct" casing to avoid broken builds on
    // Vercel deployments (case-sensitive).
    pub fn get_type(&self) -> Vc<FileSystemEntryType> {
        get_type(self.clone())
    }

    /// Resolves symlinks along this path, returning both the final path and the links visited.
    pub fn realpath_with_links(&self) -> Vc<RealPathResult> {
        realpath_with_links(self.clone())
    }
}

/// The outcome of resolving a path's symlinks (see [`FileSystemPath::realpath_with_links`]).
#[derive(Clone, Debug)]
#[turbo_tasks::value(shared)]
pub struct RealPathResult {
    /// The fully resolved path, or the error encountered while following links.
    pub path_result: Result<FileSystemPath, RealPathResultError>,
    /// The symlinks encountered during resolution.
    pub symlinks: Vec<FileSystemPath>,
}

/// Errors that can occur when resolving a path with symlinks.
/// Many of these can be transient conditions that might happen when package managers are running.
#[derive(Debug, Clone, Hash, Eq, PartialEq, NonLocalValue, TraceRawVcs, Encode, Decode)]
pub enum RealPathResultError {
    /// Resolution gave up after following too many chained symlinks.
    TooManySymlinks,
    /// The symlinks form a loop.
    CycleDetected,
    /// A link points out of the filesystem root.
    Invalid,
    /// A link points at a file that doesn't exist.
    NotFound,
}

impl RealPathResultError {
    /// Formats the error message
    ///
    /// `orig` is the path resolution started from; `result` supplies the visited symlink
    /// chain used by some messages for context.
    pub async fn as_error_message(
        &self,
        orig: &FileSystemPath,
        result: &RealPathResult,
    ) -> Result<RcStr> {
        Ok(match self {
            RealPathResultError::TooManySymlinks => {
                let len = result.symlinks.len();
                turbofmt!("Symlink {orig} leads to too many other symlinks ({len} links)").await?
            }
            RealPathResultError::CycleDetected => {
                // symlinks is Vec<FileSystemPath> — format with Debug since
                // turbofmt can't resolve a whole Vec asynchronously.
                let symlinks_dbg = format!(
                    "{:?}",
                    result.symlinks.iter().map(|s| &s.path).collect::<Vec<_>>()
                );
                turbofmt!("Symlink {orig} is in a symlink loop: {symlinks_dbg}").await?
            }
            RealPathResultError::Invalid => {
                turbofmt!("Symlink {orig} is invalid, it points out of the filesystem root").await?
            }
            RealPathResultError::NotFound => {
                turbofmt!("Symlink {orig} is invalid, it points at a file that doesn't exist")
                    .await?
            }
        })
    }
}

/// A coarse-grained permission level for a file.
#[derive(Clone, Copy, Debug, Default, DeterministicHash, PartialOrd, Ord)]
#[turbo_tasks::value(shared)]
pub enum Permissions {
    /// Read-only (mapped to unix mode `0o444`).
    Readable,
    /// Readable and writable (mapped to unix mode `0o664`); the default.
    #[default]
    Writable,
    /// Executable (mapped to unix mode `0o755`).
    Executable,
}

// Only handle the permissions on unix platform for now

#[cfg(unix)]
impl From<Permissions> for std::fs::Permissions {
    /// Maps the coarse-grained permission level onto a concrete unix mode.
    fn from(perm: Permissions) -> Self {
        use std::os::unix::fs::PermissionsExt;
        let mode = match perm {
            Permissions::Readable => 0o444,
            Permissions::Writable => 0o664,
            Permissions::Executable => 0o755,
        };
        std::fs::Permissions::from_mode(mode)
    }
}

#[cfg(unix)]
impl From<std::fs::Permissions> for Permissions {
    /// Collapses unix permission bits into the coarse-grained [`Permissions`] levels:
    /// read-only wins, then any executable bit, otherwise writable.
    fn from(perm: std::fs::Permissions) -> Self {
        use std::os::unix::fs::PermissionsExt;
        if perm.readonly() {
            return Permissions::Readable;
        }
        // https://github.com/fitzgen/is_executable/blob/master/src/lib.rs#L96
        if perm.mode() & 0o111 != 0 {
            Permissions::Executable
        } else {
            Permissions::Writable
        }
    }
}

#[cfg(not(unix))]
impl From<std::fs::Permissions> for Permissions {
    /// Non-unix platforms don't expose unix mode bits here, so fall back to the default
    /// (`Writable`) level.
    fn from(_: std::fs::Permissions) -> Self {
        Permissions::default()
    }
}

/// The content of a file, or `NotFound` when it does not exist.
///
/// Uses `serialization = "hash"`: only a hash is stored in the persistent task cache. See
/// [`PersistedFileContent`] for the variant that persists the full data.
#[turbo_tasks::value(shared, serialization = "hash")]
#[derive(Clone, Debug, PartialOrd, Ord)]
pub enum FileContent {
    Content(File),
    NotFound,
}

impl From<File> for FileContent {
    fn from(file: File) -> Self {
        FileContent::Content(file)
    }
}

/// A persisted version of [`FileContent`] that stores the full file content in the task cache.
///
/// [`FileContent`] uses `serialization = "hash"`, so only a hash is kept in the persistent cache.
/// When reading the file content back from the cache, the hash is compared to detect changes, but
/// the actual data is not available. `PersistedFileContent` provides the full data so that
/// [`DiskFileSystem::write`] can retrieve it without re-reading from disk.
#[turbo_tasks::value(shared)]
#[derive(Clone, Debug, DeterministicHash, PartialOrd, Ord)]
pub enum PersistedFileContent {
    /// The file exists with the given content.
    Content(File),
    /// The file does not exist.
    NotFound,
}

impl PersistedFileContent {
    /// Performs a comparison of self's data against a disk file's streamed read.
    ///
    /// Returns `Create` when the disk file is missing but `self` has content, `Equal` when
    /// both sides agree (including both being absent), and `NotEqual` otherwise. The on-disk
    /// file is streamed rather than read fully into memory.
    async fn streaming_compare(&self, path: &Path) -> Result<FileComparison> {
        let old_file =
            extract_disk_access(retry_blocking(|| std::fs::File::open(path)).await, path)?;
        // No file on disk: equal only if we also expect no file.
        let Some(old_file) = old_file else {
            return Ok(match self {
                PersistedFileContent::NotFound => FileComparison::Equal,
                _ => FileComparison::Create,
            });
        };
        // We know old file exists, does the new file?
        let PersistedFileContent::Content(new_file) = self else {
            return Ok(FileComparison::NotEqual);
        };

        let old_meta = extract_disk_access(retry_blocking(|| old_file.metadata()).await, path)?;
        let Some(old_meta) = old_meta else {
            // If we failed to get meta, then the old file has been deleted between the
            // handle open. In which case, we just pretend the file never existed.
            return Ok(FileComparison::Create);
        };
        // If the meta is different, we need to rewrite the file to update it.
        if new_file.meta != old_meta.into() {
            return Ok(FileComparison::NotEqual);
        }

        // So meta matches, and we have a file handle. Let's stream the contents to see
        // if they match.
        let mut new_contents = new_file.read();
        let mut old_contents = BufReader::new(old_file);
        Ok(loop {
            let new_chunk = new_contents.fill_buf()?;
            // A read error on the disk file counts as a difference.
            let Ok(old_chunk) = old_contents.fill_buf() else {
                break FileComparison::NotEqual;
            };

            let len = min(new_chunk.len(), old_chunk.len());
            if len == 0 {
                // One side is exhausted; they are equal only if both are.
                if new_chunk.len() == old_chunk.len() {
                    break FileComparison::Equal;
                } else {
                    break FileComparison::NotEqual;
                }
            }

            if new_chunk[0..len] != old_chunk[0..len] {
                break FileComparison::NotEqual;
            }

            new_contents.consume(len);
            old_contents.consume(len);
        })
    }
}

/// Result of comparing in-memory content against the file currently on disk.
#[derive(Clone, Debug, Eq, PartialEq)]
enum FileComparison {
    /// The file does not exist on disk (it would need to be created).
    Create,
    /// On-disk content and metadata match the in-memory content.
    Equal,
    /// On-disk content or metadata differ.
    NotEqual,
}

bitflags! {
  /// Properties of a symbolic link's target.
  #[derive(
    Default,
    TraceRawVcs,
    NonLocalValue,
    DeterministicHash,
    Encode,
    Decode,
  )]
  pub struct LinkType: u8 {
      /// The link target is a directory.
      const DIRECTORY = 0b00000001;
      /// The link target was specified as an absolute path.
      const ABSOLUTE = 0b00000010;
  }
}

/// The contents of a symbolic link. On Windows, this may be a junction point.
///
/// When reading, we treat symbolic links and junction points on Windows as equivalent. When
/// creating a new link, we always create junction points, because symlink creation may fail if
/// Windows "developer mode" is not enabled and we're running in an unprivileged environment.
#[turbo_tasks::value(shared)]
#[derive(Debug, DeterministicHash)]
pub enum LinkContent {
    /// A valid symbolic link pointing to `target`.
    ///
    /// When reading a relative link, the target is raw value read from the link.
    ///
    /// When reading an absolute link, the target is stripped of the root path while reading. This
    /// ensures we don't store absolute paths inside of the persistent cache.
    ///
    /// We don't use the [`FileSystemPath`] to store the target, because the [`FileSystemPath`] is
    /// always normalized. In [`FileSystemPath::write_symbolic_link_dir`] we need to compare
    /// `target` with the value returned by [`std::fs::read_link`].
    Link {
        target: RcStr,
        link_type: LinkType,
    },
    /// The link is invalid: it points out of the filesystem root.
    Invalid,
    /// The target was not found.
    NotFound,
}

/// An in-memory file: its byte content (as a [`Rope`]) plus associated [`FileMeta`].
#[turbo_tasks::value(shared)]
#[derive(Clone, DeterministicHash, PartialOrd, Ord)]
pub struct File {
    /// The file bytes. Excluded from `Debug` output (a hash is shown instead).
    #[turbo_tasks(debug_ignore)]
    content: Rope,
    /// Permissions and optional content type.
    meta: FileMeta,
}

impl File {
    /// Reads a [File] from the given path
    fn from_path(p: &Path) -> io::Result<Self> {
        let mut handle = std::fs::File::open(p)?;
        let metadata = handle.metadata()?;

        // Pre-size the buffer from the reported file size to avoid reallocation.
        let mut bytes = Vec::with_capacity(metadata.len() as usize);
        handle.read_to_end(&mut bytes)?;

        Ok(File {
            content: Rope::from(bytes),
            meta: metadata.into(),
        })
    }

    /// Creates a [File] from raw bytes with default metadata.
    fn from_bytes(content: Vec<u8>) -> Self {
        Self::from_rope(Rope::from(content))
    }

    /// Creates a [File] from a rope with default metadata.
    fn from_rope(content: Rope) -> Self {
        File {
            meta: FileMeta::default(),
            content,
        }
    }

    /// Returns the content type associated with this file.
    pub fn content_type(&self) -> Option<&Mime> {
        self.meta.content_type.as_ref()
    }

    /// Sets the content type associated with this file.
    pub fn with_content_type(mut self, content_type: Mime) -> Self {
        self.meta.content_type = Some(content_type);
        self
    }

    /// Returns a Read/AsyncRead/Stream/Iterator to access the File's contents.
    pub fn read(&self) -> RopeReader<'_> {
        self.content.read()
    }
}

impl Debug for File {
    /// Debug-formats the file showing a content hash instead of the raw bytes.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("File")
            .field("meta", &self.meta)
            .field("content (hash)", &hash_xxh3_hash64(&self.content))
            .finish()
    }
}

// Conversions into `File` from common content types. All of these go through
// `File::from_bytes` / `File::from_rope` and therefore produce a file with default metadata.

impl From<RcStr> for File {
    fn from(s: RcStr) -> Self {
        s.into_owned().into()
    }
}

impl From<String> for File {
    fn from(s: String) -> Self {
        File::from_bytes(s.into_bytes())
    }
}

impl From<ReadRef<RcStr>> for File {
    fn from(s: ReadRef<RcStr>) -> Self {
        File::from_bytes(s.as_bytes().to_vec())
    }
}

impl From<&str> for File {
    fn from(s: &str) -> Self {
        File::from_bytes(s.as_bytes().to_vec())
    }
}

impl From<Vec<u8>> for File {
    fn from(bytes: Vec<u8>) -> Self {
        File::from_bytes(bytes)
    }
}

impl From<&[u8]> for File {
    fn from(bytes: &[u8]) -> Self {
        File::from_bytes(bytes.to_vec())
    }
}

impl From<ReadRef<Rope>> for File {
    fn from(rope: ReadRef<Rope>) -> Self {
        File::from_rope(ReadRef::into_owned(rope))
    }
}

impl From<Rope> for File {
    fn from(rope: Rope) -> Self {
        File::from_rope(rope)
    }
}

impl File {
    /// Creates a [File] from the given metadata and raw bytes.
    pub fn new(meta: FileMeta, content: Vec<u8>) -> Self {
        Self {
            meta,
            content: Rope::from(content),
        }
    }

    /// Returns the associated [FileMeta] of this file.
    pub fn meta(&self) -> &FileMeta {
        &self.meta
    }

    /// Returns the immutable contents of this file.
    pub fn content(&self) -> &Rope {
        &self.content
    }
}

/// Metadata associated with a [`File`]: its permissions and an optional MIME content type.
#[turbo_tasks::value(shared)]
#[derive(Debug, Clone, Default)]
pub struct FileMeta {
    // Size of the file
    // len: u64,
    permissions: Permissions,
    // Serialized via a custom bincode adapter since `Mime` isn't Encode/Decode itself.
    #[bincode(with = "turbo_bincode::mime_option")]
    #[turbo_tasks(trace_ignore)]
    content_type: Option<Mime>,
}

impl Ord for FileMeta {
    /// Orders by permissions first, then by content type.
    fn cmp(&self, other: &Self) -> Ordering {
        self.permissions
            .cmp(&other.permissions)
            .then_with(|| self.content_type.as_ref().cmp(&other.content_type.as_ref()))
    }
}

impl PartialOrd for FileMeta {
    /// Delegates to [`Ord::cmp`]; the ordering is total.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl From<std::fs::Metadata> for FileMeta {
    /// Extracts the permission level from OS metadata. The content type cannot be derived
    /// from filesystem metadata and is left unset.
    fn from(meta: std::fs::Metadata) -> Self {
        Self {
            permissions: meta.permissions().into(),
            content_type: None,
        }
    }
}

impl DeterministicHash for FileMeta {
    fn deterministic_hash<H: DeterministicHasher>(&self, state: &mut H) {
        self.permissions.deterministic_hash(state);
        // The content type is hashed via its string form; `None` contributes nothing.
        if let Some(content_type) = &self.content_type {
            content_type.to_string().deterministic_hash(state);
        }
    }
}

impl FileContent {
    pub fn new(file: File) -> Self {
        FileContent::Content(file)
    }

    pub fn is_content(&self) -> bool {
        matches!(self, FileContent::Content(_))
    }

    pub fn as_content(&self) -> Option<&File> {
        match self {
            FileContent::Content(file) => Some(file),
            FileContent::NotFound => None,
        }
    }

    pub fn parse_json_ref(&self) -> FileJsonContent {
        match self {
            FileContent::Content(file) => {
                let content = file.content.clone().into_bytes();
                let de = &mut serde_json::Deserializer::from_slice(&content);
                match serde_path_to_error::deserialize(de) {
                    Ok(data) => FileJsonContent::Content(data),
                    Err(e) => FileJsonContent::Unparsable(Box::new(
                        UnparsableJson::from_serde_path_to_error(e),
                    )),
                }
            }
            FileContent::NotFound => FileJsonContent::NotFound,
        }
    }

    pub fn parse_json_with_comments_ref(&self) -> FileJsonContent {
        match self {
            FileContent::Content(file) => match file.content.to_str() {
                Ok(string) => match parse_to_serde_value(
                    &string,
                    &ParseOptions {
                        allow_comments: true,
                        allow_trailing_commas: true,
                        allow_loose_object_property_names: false,
                    },
                ) {
                    Ok(data) => match data {
                        Some(value) => FileJsonContent::Content(value),
                        None => FileJsonContent::unparsable(rcstr!(
                            "text content doesn't contain any json data"
                        )),
                    },
                    Err(e) => FileJsonContent::Unparsable(Box::new(
                        UnparsableJson::from_jsonc_error(e, string.as_ref()),
                    )),
                },
                Err(_) => FileJsonContent::unparsable(rcstr!("binary is not valid utf-8 text")),
            },
            FileContent::NotFound => FileJsonContent::NotFound,
        }
    }

    pub fn parse_json5_ref(&self) -> FileJsonContent {
        match self {
            FileContent::Content(file) => match file.content.to_str() {
                Ok(string) => match parse_to_serde_value(
                    &string,
                    &ParseOptions {
                        allow_comments: true,
                        allow_trailing_commas: true,
                        allow_loose_object_property_names: true,
                    },
                ) {
                    Ok(data) => match data {
                        Some(value) => FileJsonContent::Content(value),
                        None => FileJsonContent::unparsable(rcstr!(
                            "text content doesn't contain any json data"
                        )),
                    },
                    Err(e) => FileJsonContent::Unparsable(Box::new(
                        UnparsableJson::from_jsonc_error(e, string.as_ref()),
                    )),
                },
                Err(_) => FileJsonContent::unparsable(rcstr!("binary is not valid utf-8 text")),
            },
            FileContent::NotFound => FileJsonContent::NotFound,
        }
    }

    pub fn lines_ref(&self) -> FileLinesContent {
        match self {
            FileContent::Content(file) => match file.content.to_str() {
                Ok(string) => {
                    let mut bytes_offset = 0;
                    FileLinesContent::Lines(
                        string
                            .split('\n')
                            .map(|l| {
                                let line = FileLine {
                                    content: l.to_string(),
                                    bytes_offset,
                                };
                                bytes_offset += (l.len() + 1) as u32;
                                line
                            })
                            .collect(),
                    )
                }
                Err(_) => FileLinesContent::Unparsable,
            },
            FileContent::NotFound => FileLinesContent::NotFound,
        }
    }
}

#[turbo_tasks::value_impl]
impl FileContent {
    /// Returns the content length in bytes, or `None` if the file does not
    /// exist.
    #[turbo_tasks::function]
    pub fn len(&self) -> Result<Vc<Option<u64>>> {
        Ok(Vc::cell(match self {
            FileContent::Content(file) => Some(file.content.len() as u64),
            FileContent::NotFound => None,
        }))
    }

    /// Parses the content as strict JSON (no comments or trailing commas).
    #[turbo_tasks::function]
    pub fn parse_json(&self) -> Result<Vc<FileJsonContent>> {
        Ok(self.parse_json_ref().cell())
    }

    /// Parses the content as JSON, additionally allowing comments and
    /// trailing commas (JSONC).
    #[turbo_tasks::function]
    pub fn parse_json_with_comments(&self) -> Vc<FileJsonContent> {
        self.parse_json_with_comments_ref().cell()
    }

    /// Parses the content as JSON5.
    #[turbo_tasks::function]
    pub fn parse_json5(&self) -> Vc<FileJsonContent> {
        self.parse_json5_ref().cell()
    }

    /// Splits the content into lines with byte offsets; see
    /// [`FileContent::lines_ref`].
    #[turbo_tasks::function]
    pub fn lines(&self) -> Vc<FileLinesContent> {
        self.lines_ref().cell()
    }

    /// Hashes the whole `FileContent` value (including metadata) with
    /// xxh3-64.
    #[turbo_tasks::function]
    pub async fn hash(&self) -> Result<Vc<u64>> {
        Ok(Vc::cell(hash_xxh3_hash64(self)))
    }

    /// Converts this [`FileContent`] into a [`PersistedFileContent`] by cloning.
    ///
    /// Use this in contexts where the full file content must be serialized to the persistent
    /// task cache (e.g., in [`DiskFileSystem::write`]).
    #[turbo_tasks::function]
    pub fn persist(&self) -> Vc<PersistedFileContent> {
        match self {
            FileContent::Content(file) => PersistedFileContent::Content(file.clone()).cell(),
            FileContent::NotFound => PersistedFileContent::NotFound.cell(),
        }
    }

    /// Compared to [FileContent::hash], this hashes only the bytes of the file content and
    /// nothing else, returning `None` if the file does not exist.
    ///
    /// If `salt` is non-empty it is written into the hasher before the file bytes in a single
    /// pass. An empty salt produces the same result as hashing without a prefix.
    #[turbo_tasks::function]
    pub async fn content_hash(
        &self,
        salt: Vc<RcStr>,
        algorithm: HashAlgorithm,
    ) -> Result<Vc<Option<RcStr>>> {
        match self {
            FileContent::Content(file) => Ok(Vc::cell(Some(
                deterministic_hash(&salt.await?, file.content().content_hash(), algorithm).into(),
            ))),
            FileContent::NotFound => Ok(Vc::cell(None)),
        }
    }
}

/// A file's content interpreted as a JSON value.
#[turbo_tasks::value(shared, serialization = "skip")]
pub enum FileJsonContent {
    /// The file exists and parsed to this JSON value.
    Content(Value),
    /// The file exists but could not be parsed; carries details about the
    /// failure (boxed to keep the enum small).
    Unparsable(Box<UnparsableJson>),
    /// The file does not exist.
    NotFound,
}

#[turbo_tasks::value_impl]
impl ValueToString for FileJsonContent {
    /// Returns the JSON file content as a UTF-8 string.
    ///
    /// This operation will only succeed if the file contents are a valid JSON
    /// value.
    #[turbo_tasks::function]
    fn to_string(&self) -> Result<Vc<RcStr>> {
        match self {
            // Re-serializes the parsed value; formatting may differ from the
            // on-disk source.
            FileJsonContent::Content(json) => Ok(Vc::cell(json.to_string().into())),
            FileJsonContent::Unparsable(e) => bail!("File is not valid JSON: {}", e),
            FileJsonContent::NotFound => bail!("File not found"),
        }
    }
}

#[turbo_tasks::value_impl]
impl FileJsonContent {
    /// Returns the parsed JSON value in its own cell.
    ///
    /// Fails with an error if the file was unparsable or not found.
    #[turbo_tasks::function]
    pub async fn content(self: Vc<Self>) -> Result<Vc<Value>> {
        match &*self.await? {
            FileJsonContent::Content(json) => Ok(Vc::cell(json.clone())),
            FileJsonContent::Unparsable(e) => bail!("File is not valid JSON: {}", e),
            FileJsonContent::NotFound => bail!("File not found"),
        }
    }
}
impl FileJsonContent {
    /// Creates an [`FileJsonContent::Unparsable`] value that carries only a
    /// message, without path or source-location information.
    pub fn unparsable(message: RcStr) -> Self {
        FileJsonContent::Unparsable(Box::new(UnparsableJson {
            message,
            path: None,
            start_location: None,
            end_location: None,
        }))
    }

    /// Alias of [`FileJsonContent::unparsable`], kept so existing callers of
    /// either name continue to work.
    // Previously this duplicated the construction logic verbatim; delegate
    // instead so the two can't drift apart.
    pub fn unparsable_with_message(message: RcStr) -> Self {
        Self::unparsable(message)
    }
}

/// A single line of a text file, as produced by [`FileContent::lines_ref`].
#[derive(Debug, PartialEq, Eq)]
pub struct FileLine {
    /// The line's text, without the trailing `'\n'`.
    pub content: String,
    /// Byte offset of the start of this line from the start of the file.
    pub bytes_offset: u32,
}

impl FileLine {
    /// Returns the length of the line's content in bytes.
    // `#[must_use]` added for consistency with `is_empty` below.
    #[must_use]
    pub fn len(&self) -> usize {
        self.content.len()
    }

    /// Returns `true` if the line has no content.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// A file's content split into lines; see [`FileContent::lines_ref`].
#[turbo_tasks::value(shared, serialization = "skip")]
pub enum FileLinesContent {
    /// One entry per `'\n'`-separated line of the file.
    Lines(#[turbo_tasks(trace_ignore)] Vec<FileLine>),
    /// The file exists but is not valid UTF-8 text.
    Unparsable,
    /// The file does not exist.
    NotFound,
}

/// The type of a single directory entry, without any attached path
/// information (entries are keyed by name in [`RawDirectoryContent`]).
#[derive(Hash, Clone, Debug, PartialEq, Eq, TraceRawVcs, NonLocalValue, Encode, Decode)]
pub enum RawDirectoryEntry {
    File,
    Directory,
    Symlink,
    // Other just means 'not a file, directory, or symlink'
    Other,
}

/// A directory entry together with its full [`FileSystemPath`].
#[derive(Hash, Clone, Debug, PartialEq, Eq, TraceRawVcs, NonLocalValue, Encode, Decode)]
pub enum DirectoryEntry {
    File(FileSystemPath),
    Directory(FileSystemPath),
    Symlink(FileSystemPath),
    /// Neither a file, directory, nor symlink (e.g. named pipes or sockets).
    Other(FileSystemPath),
    /// The entry could not be inspected; carries an error message instead of
    /// a path.
    Error(RcStr),
}

impl DirectoryEntry {
    /// Handles the `DirectoryEntry::Symlink` variant by checking the symlink target
    /// type and replacing it with `DirectoryEntry::File` or
    /// `DirectoryEntry::Directory`.
    pub async fn resolve_symlink(self) -> Result<Self> {
        if let DirectoryEntry::Symlink(symlink) = &self {
            let result = &*symlink.realpath_with_links().await?;
            let real_path = match &result.path_result {
                Ok(path) => path,
                // Resolution failures (cycles, too many links, broken links)
                // are surfaced as an `Error` entry, not as a task error.
                Err(error) => {
                    return Ok(DirectoryEntry::Error(
                        error.as_error_message(symlink, result).await?,
                    ));
                }
            };
            Ok(match *real_path.get_type().await? {
                FileSystemEntryType::Directory => DirectoryEntry::Directory(real_path.clone()),
                FileSystemEntryType::File => DirectoryEntry::File(real_path.clone()),
                // Happens if the link is to a non-existent file
                FileSystemEntryType::NotFound => DirectoryEntry::Error(
                    turbofmt!("Symlink {symlink} points at {real_path} which does not exist")
                        .await?,
                ),
                // This is caused by eventual consistency
                FileSystemEntryType::Symlink => turbobail!(
                    "Symlink {symlink} points at a symlink but realpath_with_links returned a path"
                ),
                // `Other`/`Error`: keep the original symlink entry untouched.
                _ => self,
            })
        } else {
            Ok(self)
        }
    }

    /// Returns the entry's path, or `None` for the `Error` variant (which
    /// carries only a message).
    pub fn path(self) -> Option<FileSystemPath> {
        match self {
            DirectoryEntry::File(path)
            | DirectoryEntry::Directory(path)
            | DirectoryEntry::Symlink(path)
            | DirectoryEntry::Other(path) => Some(path),
            DirectoryEntry::Error(_) => None,
        }
    }
}

/// The type of a filesystem entry, including the "missing" and "failed to
/// inspect" cases.
#[turbo_tasks::value]
#[derive(Hash, Clone, Copy, Debug)]
pub enum FileSystemEntryType {
    NotFound,
    File,
    Directory,
    Symlink,
    /// These would be things like named pipes, sockets, etc.
    Other,
    Error,
}

impl From<FileType> for FileSystemEntryType {
    /// Classifies a [`std::fs::FileType`]; anything that is not a directory,
    /// regular file, or symlink maps to `Other`.
    fn from(file_type: FileType) -> Self {
        if file_type.is_dir() {
            FileSystemEntryType::Directory
        } else if file_type.is_file() {
            FileSystemEntryType::File
        } else if file_type.is_symlink() {
            FileSystemEntryType::Symlink
        } else {
            FileSystemEntryType::Other
        }
    }
}

impl From<DirectoryEntry> for FileSystemEntryType {
    fn from(entry: DirectoryEntry) -> Self {
        FileSystemEntryType::from(&entry)
    }
}

impl From<&DirectoryEntry> for FileSystemEntryType {
    fn from(entry: &DirectoryEntry) -> Self {
        match entry {
            DirectoryEntry::File(_) => FileSystemEntryType::File,
            DirectoryEntry::Directory(_) => FileSystemEntryType::Directory,
            DirectoryEntry::Symlink(_) => FileSystemEntryType::Symlink,
            DirectoryEntry::Other(_) => FileSystemEntryType::Other,
            DirectoryEntry::Error(_) => FileSystemEntryType::Error,
        }
    }
}

impl From<RawDirectoryEntry> for FileSystemEntryType {
    fn from(entry: RawDirectoryEntry) -> Self {
        FileSystemEntryType::from(&entry)
    }
}

impl From<&RawDirectoryEntry> for FileSystemEntryType {
    /// Maps each raw entry variant to its corresponding entry type.
    fn from(entry: &RawDirectoryEntry) -> Self {
        match entry {
            RawDirectoryEntry::Directory => Self::Directory,
            RawDirectoryEntry::File => Self::File,
            RawDirectoryEntry::Symlink => Self::Symlink,
            RawDirectoryEntry::Other => Self::Other,
        }
    }
}

/// The listing of a directory as raw, path-less entries keyed by file name.
#[turbo_tasks::value]
#[derive(Debug)]
pub enum RawDirectoryContent {
    // The entry keys are the directory relative file names
    // e.g. for `/bar/foo`, it will be `foo`
    Entries(AutoMap<RcStr, RawDirectoryEntry>),
    /// The directory does not exist.
    NotFound,
}

impl RawDirectoryContent {
    /// Wraps the given entries in an `Entries` cell.
    pub fn new(entries: AutoMap<RcStr, RawDirectoryEntry>) -> Vc<Self> {
        RawDirectoryContent::Entries(entries).cell()
    }

    /// Returns the cell representing a missing directory.
    pub fn not_found() -> Vc<Self> {
        RawDirectoryContent::NotFound.cell()
    }
}

/// The listing of a directory with fully-pathed entries, keyed by file name.
#[turbo_tasks::value]
#[derive(Debug)]
pub enum DirectoryContent {
    Entries(AutoMap<RcStr, DirectoryEntry>),
    /// The directory does not exist.
    NotFound,
}

impl DirectoryContent {
    /// Wraps the given entries in an `Entries` cell.
    pub fn new(entries: AutoMap<RcStr, DirectoryEntry>) -> Vc<Self> {
        DirectoryContent::Entries(entries).cell()
    }

    /// Returns the cell representing a missing directory.
    pub fn not_found() -> Vc<Self> {
        DirectoryContent::NotFound.cell()
    }
}

/// A filesystem that contains nothing: all reads report "not found" and all
/// writes are discarded.
#[derive(ValueToString)]
#[value_to_string("null")]
#[turbo_tasks::value(shared)]
pub struct NullFileSystem;

#[turbo_tasks::value_impl]
impl FileSystem for NullFileSystem {
    /// Every file is reported as missing.
    #[turbo_tasks::function]
    fn read(&self, _fs_path: FileSystemPath) -> Vc<FileContent> {
        FileContent::NotFound.cell()
    }

    /// Every link is reported as missing.
    #[turbo_tasks::function]
    fn read_link(&self, _fs_path: FileSystemPath) -> Vc<LinkContent> {
        LinkContent::NotFound.cell()
    }

    /// Every directory is reported as missing.
    #[turbo_tasks::function]
    fn raw_read_dir(&self, _fs_path: FileSystemPath) -> Vc<RawDirectoryContent> {
        RawDirectoryContent::not_found()
    }

    /// Writes are silently discarded.
    #[turbo_tasks::function]
    fn write(&self, _fs_path: FileSystemPath, _content: Vc<FileContent>) {}

    /// Link writes are silently discarded.
    #[turbo_tasks::function]
    fn write_link(&self, _fs_path: FileSystemPath, _target: Vc<LinkContent>) {}

    /// Returns default (empty) metadata for every path.
    #[turbo_tasks::function]
    fn metadata(&self, _fs_path: FileSystemPath) -> Vc<FileMeta> {
        FileMeta::default().cell()
    }
}

/// Maps a [`FileSystemPath`] to the underlying OS path, if the path is backed
/// by a [`DiskFileSystem`] (possibly behind one or more
/// [`AttachedFileSystem`] layers). Returns `None` for virtual filesystems.
pub async fn to_sys_path(mut path: FileSystemPath) -> Result<Option<PathBuf>> {
    // Peel off any (possibly nested) `AttachedFileSystem` layers first.
    while let Some(attached) = ResolvedVc::try_downcast_type::<AttachedFileSystem>(path.fs) {
        path = attached.get_inner_fs_path(path).owned().await?;
    }

    // Only a disk-backed path has a system path.
    let Some(disk) = ResolvedVc::try_downcast_type::<DiskFileSystem>(path.fs) else {
        return Ok(None);
    };
    Ok(Some(disk.await?.to_sys_path(&path)))
}

/// Reads the directory at `path`, upgrading the name-keyed raw entries from
/// `raw_read_dir` into [`DirectoryEntry`] values carrying full paths.
#[turbo_tasks::function]
async fn read_dir(path: FileSystemPath) -> Result<Vc<DirectoryContent>> {
    let fs = path.fs().to_resolved().await?;
    match &*fs.raw_read_dir(path.clone()).await? {
        RawDirectoryContent::NotFound => Ok(DirectoryContent::not_found()),
        RawDirectoryContent::Entries(entries) => {
            let mut normalized_entries = AutoMap::new();
            let dir_path = &path.path;
            for (name, entry) in entries {
                // Construct the path directly instead of going through `join`.
                // We do not need to normalize since the `name` is guaranteed to be a simple
                // path segment.
                let path = if dir_path.is_empty() {
                    // `path` is the filesystem root; avoid a leading '/'.
                    name.clone()
                } else {
                    RcStr::from(format!("{dir_path}/{name}"))
                };

                let entry_path = FileSystemPath::new_normalized(fs, path);
                let entry = match entry {
                    RawDirectoryEntry::File => DirectoryEntry::File(entry_path),
                    RawDirectoryEntry::Directory => DirectoryEntry::Directory(entry_path),
                    RawDirectoryEntry::Symlink => DirectoryEntry::Symlink(entry_path),
                    RawDirectoryEntry::Other => DirectoryEntry::Other(entry_path),
                };
                normalized_entries.insert(name.clone(), entry);
            }
            Ok(DirectoryContent::new(normalized_entries))
        }
    }
}

/// Determines the type of the entry at `path` by looking its file name up in
/// the parent directory's raw listing.
#[turbo_tasks::function]
async fn get_type(path: FileSystemPath) -> Result<Vc<FileSystemEntryType>> {
    // The filesystem root always exists and is a directory.
    if path.is_root() {
        return Ok(FileSystemEntryType::Directory.cell());
    }

    let dir_content = path.parent().raw_read_dir().await?;
    let entry_type = match &*dir_content {
        RawDirectoryContent::NotFound => FileSystemEntryType::NotFound,
        RawDirectoryContent::Entries(entries) => {
            let (_, file_name) = path.split_file_name();
            entries
                .get(file_name)
                .map_or(FileSystemEntryType::NotFound, FileSystemEntryType::from)
        }
    };
    Ok(entry_type.cell())
}

/// Resolves `path` to its real path by recursively resolving the parent
/// directory first and then following any symlink at the final segment,
/// collecting every symlink traversed along the way.
#[turbo_tasks::function]
async fn realpath_with_links(path: FileSystemPath) -> Result<Vc<RealPathResult>> {
    let mut current_path = path;
    let mut symlinks: IndexSet<FileSystemPath> = IndexSet::new();
    // Paths we've already been at during this resolution; used for cycle
    // detection.
    let mut visited: AutoSet<RcStr> = AutoSet::new();
    let mut error = RealPathResultError::TooManySymlinks;
    // Pick some arbitrary symlink depth limit... similar to the ELOOP logic for realpath(3).
    // SYMLOOP_MAX is 40 for Linux: https://unix.stackexchange.com/q/721724
    for _i in 0..40 {
        if current_path.is_root() {
            // fast path
            return Ok(RealPathResult {
                path_result: Ok(current_path),
                symlinks: symlinks.into_iter().collect(),
            }
            .cell());
        }

        if !visited.insert(current_path.path.clone()) {
            error = RealPathResultError::CycleDetected;
            break; // we detected a cycle
        }

        // see if a parent segment of the path is a symlink and resolve that first
        let parent = current_path.parent();
        let parent_result = parent.realpath_with_links().owned().await?;
        // Last path segment; a path without '/' is a direct child of the root.
        let basename = current_path
            .path
            .rsplit_once('/')
            .map_or(current_path.path.as_str(), |(_, name)| name);
        symlinks.extend(parent_result.symlinks);
        let parent_path = match parent_result.path_result {
            Ok(path) => {
                if path != parent {
                    // The parent resolved to a different directory, so
                    // re-anchor the current segment under the resolved parent.
                    current_path = path.join(basename)?;
                }
                path
            }
            Err(parent_error) => {
                // Propagate the parent's resolution failure.
                error = parent_error;
                break;
            }
        };

        // use `get_type` before trying `read_link`, as there's a good chance of a cache hit on
        // `get_type`, and `read_link` isn't the common codepath.
        if !matches!(
            *current_path.get_type().await?,
            FileSystemEntryType::Symlink
        ) {
            return Ok(RealPathResult {
                path_result: Ok(current_path),
                symlinks: symlinks.into_iter().collect(), // convert set to vec
            }
            .cell());
        }

        match &*current_path.read_link().await? {
            LinkContent::Link { target, link_type } => {
                symlinks.insert(current_path.clone());
                // Absolute link targets are joined against the filesystem
                // root, relative ones against the (resolved) parent.
                current_path = if link_type.contains(LinkType::ABSOLUTE) {
                    current_path.root().owned().await?
                } else {
                    parent_path
                }
                .join(target)?;
            }
            LinkContent::NotFound => {
                error = RealPathResultError::NotFound;
                break;
            }
            LinkContent::Invalid => {
                error = RealPathResultError::Invalid;
                break;
            }
        }
    }

    // Too many attempts or detected a cycle, we bailed out!
    //
    // TODO: There's no proper way to indicate an non-turbo-tasks error here, so just return the
    // original path and all the symlinks we followed.
    //
    // Returning the followed symlinks is still important, even if there is an error! Otherwise
    // we may never notice if the symlink loop is fixed.
    Ok(RealPathResult {
        path_result: Err(error),
        symlinks: symlinks.into_iter().collect(),
    }
    .cell())
}

/// Wrapper to convert [`anyhow::Error`] to `impl std::error::Error` for use in [`Effect::apply`].
///
/// All trait impls simply delegate to the wrapped error.
// TODO(bgw): use a structured error type instead of anyhow for write/write_link
#[derive(TraceRawVcs, NonLocalValue)]
pub(crate) struct AnyhowWrapper(anyhow::Error);

impl fmt::Display for AnyhowWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate directly (rather than via `write!`) so that formatter
        // flags like width and alternate mode are forwarded unchanged.
        fmt::Display::fmt(&self.0, f)
    }
}

impl fmt::Debug for AnyhowWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate directly so formatter flags are forwarded unchanged.
        fmt::Debug::fmt(&self.0, f)
    }
}

impl StdError for AnyhowWrapper {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        // Forward to the wrapped error's cause chain.
        self.0.source()
    }
}

impl From<anyhow::Error> for AnyhowWrapper {
    fn from(err: anyhow::Error) -> Self {
        AnyhowWrapper(err)
    }
}

#[cfg(test)]
mod tests {
    use turbo_rcstr::rcstr;
    use turbo_tasks::{Effects, OperationVc, Vc, take_effects};
    use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

    use super::*;

    /// Resolves `op` with strong consistency and then collects the effects it
    /// queued, so a test can `apply()` them explicitly.
    #[turbo_tasks::function(operation)]
    async fn extract_effects_operation(op: OperationVc<()>) -> anyhow::Result<Vc<Effects>> {
        let _ = op.resolve().strongly_consistent().await?;
        Ok(take_effects(op).await?.cell())
    }

    #[test]
    fn test_get_relative_path_to() {
        // (from, to, expected relative path)
        let cases = [
            ("a/b/c", "a/b/c", "."),
            ("a/c/d", "a/b/c", "../../b/c"),
            ("", "a/b/c", "./a/b/c"),
            ("a/b/c", "", "../../.."),
            ("a/b/c", "c/b/a", "../../../c/b/a"),
            ("file:///a/b/c", "file:///c/b/a", "../../../c/b/a"),
        ];
        for (from, to, expected) in cases {
            assert_eq!(
                get_relative_path_to(from, to).as_str(),
                expected,
                "relative path from {from:?} to {to:?}"
            );
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn with_extension() {
        turbo_tasks_testing::VcStorage::with(async {
            let fs = Vc::upcast::<Box<dyn FileSystem>>(VirtualFileSystem::new())
                .to_resolved()
                .await?;

            let path_txt = FileSystemPath::new_normalized(fs, rcstr!("foo/bar.txt"));

            let path_json = path_txt.with_extension("json");
            assert_eq!(&*path_json.path, "foo/bar.json");

            let path_no_ext = path_txt.with_extension("");
            assert_eq!(&*path_no_ext.path, "foo/bar");

            let path_new_ext = path_no_ext.with_extension("json");
            assert_eq!(&*path_new_ext.path, "foo/bar.json");

            let path_no_slash_txt = FileSystemPath::new_normalized(fs, rcstr!("bar.txt"));

            let path_no_slash_json = path_no_slash_txt.with_extension("json");
            assert_eq!(path_no_slash_json.path.as_str(), "bar.json");

            let path_no_slash_no_ext = path_no_slash_txt.with_extension("");
            assert_eq!(path_no_slash_no_ext.path.as_str(), "bar");

            let path_no_slash_new_ext = path_no_slash_no_ext.with_extension("json");
            assert_eq!(path_no_slash_new_ext.path.as_str(), "bar.json");

            anyhow::Ok(())
        })
        .await
        .unwrap()
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn file_stem() {
        turbo_tasks_testing::VcStorage::with(async {
            let fs = Vc::upcast::<Box<dyn FileSystem>>(VirtualFileSystem::new())
                .to_resolved()
                .await?;

            let path = FileSystemPath::new_normalized(fs, rcstr!(""));
            assert_eq!(path.file_stem(), None);

            let path = FileSystemPath::new_normalized(fs, rcstr!("foo/bar.txt"));
            assert_eq!(path.file_stem(), Some("bar"));

            let path = FileSystemPath::new_normalized(fs, rcstr!("bar.txt"));
            assert_eq!(path.file_stem(), Some("bar"));

            let path = FileSystemPath::new_normalized(fs, rcstr!("foo/bar"));
            assert_eq!(path.file_stem(), Some("bar"));

            let path = FileSystemPath::new_normalized(fs, rcstr!("foo/.bar"));
            assert_eq!(path.file_stem(), Some(".bar"));

            anyhow::Ok(())
        })
        .await
        .unwrap()
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_try_from_sys_path() {
        // Use a platform-appropriate fake root; the assertions run inside a
        // turbo-tasks operation below.
        let sys_root = if cfg!(windows) {
            Path::new(r"C:\fake\root")
        } else {
            Path::new(r"/fake/root")
        };

        let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
            BackendOptions::default(),
            noop_backing_storage(),
        ));
        tt.run_once(async {
            assert_try_from_sys_path_operation(RcStr::from(sys_root.to_str().unwrap()))
                .read_strongly_consistent()
                .await?;

            anyhow::Ok(())
        })
        .await
        .unwrap();
    }

    /// Asserts the mapping behavior of `try_from_sys_path` for relative,
    /// absolute, and root-escaping inputs against a filesystem rooted at
    /// `sys_root`.
    #[turbo_tasks::function(operation)]
    async fn assert_try_from_sys_path_operation(sys_root: RcStr) -> anyhow::Result<()> {
        let sys_root = Path::new(sys_root.as_str());
        let fs_vc = DiskFileSystem::new(
            rcstr!("temp"),
            Vc::cell(RcStr::from(sys_root.to_str().unwrap())),
        )
        .to_resolved()
        .await?;
        let fs = fs_vc.await?;
        let fs_root_path = fs_vc.root().await?;

        // A relative path with no `relative_to` is taken relative to the fs
        // root.
        assert_eq!(
            fs.try_from_sys_path(
                fs_vc,
                &Path::new("relative").join("directory"),
                /* relative_to */ None,
            )
            .unwrap()
            .path,
            "relative/directory"
        );

        // An absolute path under the root is normalized; `relative_to` is
        // ignored for absolute inputs.
        assert_eq!(
            fs.try_from_sys_path(
                fs_vc,
                &sys_root
                    .join("absolute")
                    .join("directory")
                    .join("..")
                    .join("normalized_path"),
                /* relative_to */ Some(&fs_root_path.join("ignored").unwrap()),
            )
            .unwrap()
            .path,
            "absolute/normalized_path"
        );

        // A relative path is resolved against `relative_to` when provided.
        assert_eq!(
            fs.try_from_sys_path(
                fs_vc,
                Path::new("child"),
                /* relative_to */ Some(&fs_root_path.join("parent").unwrap()),
            )
            .unwrap()
            .path,
            "parent/child"
        );

        // `..` segments are resolved against `relative_to` as well.
        assert_eq!(
            fs.try_from_sys_path(
                fs_vc,
                &Path::new("..").join("parallel_dir"),
                /* relative_to */ Some(&fs_root_path.join("parent").unwrap()),
            )
            .unwrap()
            .path,
            "parallel_dir"
        );

        // Relative paths that escape the fs root are rejected.
        assert_eq!(
            fs.try_from_sys_path(
                fs_vc,
                &Path::new("relative")
                    .join("..")
                    .join("..")
                    .join("leaves_root"),
                /* relative_to */ None,
            ),
            None
        );

        // Absolute paths that escape the fs root are rejected too.
        assert_eq!(
            fs.try_from_sys_path(
                fs_vc,
                &sys_root
                    .join("absolute")
                    .join("..")
                    .join("..")
                    .join("leaves_root"),
                /* relative_to */ None,
            ),
            None
        );

        Ok(())
    }

    /// Tests for writing symlinks through `DiskFileSystem::write_link`
    /// effects, including a randomized stress test.
    #[cfg(test)]
    mod symlink_tests {
        use std::{
            fs::{File, create_dir_all, read_to_string},
            io::Write,
        };

        use rand::{RngExt, SeedableRng};
        use turbo_rcstr::{RcStr, rcstr};
        use turbo_tasks::{ResolvedVc, Vc};
        use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

        use super::extract_effects_operation;
        use crate::{DiskFileSystem, FileSystem, FileSystemPath, LinkContent, LinkType};

        /// Writes a file symlink (`symlink-file` -> `{target}/data.txt`) and a
        /// directory symlink (`symlink-dir` -> `{target}`) under `path`, each
        /// twice with identical content to check idempotent writes.
        #[turbo_tasks::function(operation)]
        async fn test_write_link_effect_operation(
            fs: ResolvedVc<DiskFileSystem>,
            path: FileSystemPath,
            target: RcStr,
        ) -> anyhow::Result<()> {
            let write_file = |f| {
                fs.write_link(
                    f,
                    LinkContent::Link {
                        target: format!("{target}/data.txt").into(),
                        link_type: LinkType::empty(),
                    }
                    .cell(),
                )
            };
            // Write it twice (same content)
            write_file(path.join("symlink-file")?).await?;
            write_file(path.join("symlink-file")?).await?;

            let write_dir = |f| {
                fs.write_link(
                    f,
                    LinkContent::Link {
                        target: target.clone(),
                        link_type: LinkType::DIRECTORY,
                    }
                    .cell(),
                )
            };
            // Write it twice (same content)
            write_dir(path.join("symlink-dir")?).await?;
            write_dir(path.join("symlink-dir")?).await?;

            Ok(())
        }

        #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
        async fn test_write_link() {
            // Two target directories with differing file contents so we can
            // observe which one a symlink currently points at.
            let scratch = tempfile::tempdir().unwrap();
            let path = scratch.path().to_owned();

            create_dir_all(path.join("subdir-a")).unwrap();
            File::create_new(path.join("subdir-a/data.txt"))
                .unwrap()
                .write_all(b"foo")
                .unwrap();
            create_dir_all(path.join("subdir-b")).unwrap();
            File::create_new(path.join("subdir-b/data.txt"))
                .unwrap()
                .write_all(b"bar")
                .unwrap();
            let root = path.to_str().unwrap().into();

            let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
                BackendOptions::default(),
                noop_backing_storage(),
            ));

            tt.run_once(async move {
                let fs = disk_file_system_operation(root)
                    .resolve()
                    .strongly_consistent()
                    .await?;
                let root_path = disk_file_system_root(fs);

                extract_effects_operation(test_write_link_effect_operation(
                    fs,
                    root_path.clone(),
                    rcstr!("subdir-a"),
                ))
                .read_strongly_consistent()
                .await?
                .apply()
                .await?;

                assert_eq!(read_to_string(path.join("symlink-file")).unwrap(), "foo");
                assert_eq!(
                    read_to_string(path.join("symlink-dir/data.txt")).unwrap(),
                    "foo"
                );

                // Write the same links again but with different targets
                extract_effects_operation(test_write_link_effect_operation(
                    fs,
                    root_path,
                    rcstr!("subdir-b"),
                ))
                .read_strongly_consistent()
                .await?
                .apply()
                .await?;

                assert_eq!(read_to_string(path.join("symlink-file")).unwrap(), "bar");
                assert_eq!(
                    read_to_string(path.join("symlink-dir/data.txt")).unwrap(),
                    "bar"
                );

                anyhow::Ok(())
            })
            .await
            .unwrap();
        }

        // Stress-test tuning knobs: iterations, symlink updates per batch,
        // and the number of target/symlink directories involved.
        const STRESS_ITERATIONS: usize = 100;
        const STRESS_PARALLELISM: usize = 8;
        const STRESS_TARGET_COUNT: usize = 20;
        const STRESS_SYMLINK_COUNT: usize = 16;

        /// Creates a `DiskFileSystem` rooted at `fs_root` as an operation.
        #[turbo_tasks::function(operation)]
        fn disk_file_system_operation(fs_root: RcStr) -> Vc<DiskFileSystem> {
            DiskFileSystem::new(rcstr!("test"), Vc::cell(fs_root))
        }

        /// Builds the root `FileSystemPath` of `fs` directly.
        fn disk_file_system_root(fs: ResolvedVc<DiskFileSystem>) -> FileSystemPath {
            FileSystemPath {
                fs: ResolvedVc::upcast(fs),
                path: rcstr!(""),
            }
        }

        /// Writes a batch of directory symlinks (`{symlink_idx}` ->
        /// `../_targets/{target_idx}`) under `symlinks_dir`, concurrently.
        #[turbo_tasks::function(operation)]
        async fn write_symlink_stress_batch(
            fs: ResolvedVc<DiskFileSystem>,
            symlinks_dir: FileSystemPath,
            updates: Vec<(usize, usize)>,
        ) -> anyhow::Result<()> {
            use turbo_tasks::TryJoinIterExt;

            updates
                .into_iter()
                .map(|(symlink_idx, target_idx)| {
                    let target = RcStr::from(format!("../_targets/{target_idx}"));
                    let symlink_path = symlinks_dir.join(&symlink_idx.to_string()).unwrap();
                    async move {
                        fs.write_link(
                            symlink_path,
                            LinkContent::Link {
                                target,
                                link_type: LinkType::DIRECTORY,
                            }
                            .cell(),
                        )
                        .await
                    }
                })
                .try_join()
                .await?;
            Ok(())
        }

        #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
        async fn test_symlink_stress() {
            let scratch = tempfile::tempdir().unwrap();
            let path = scratch.path().to_owned();

            let targets_dir = path.join("_targets");
            create_dir_all(&targets_dir).unwrap();
            for i in 0..STRESS_TARGET_COUNT {
                create_dir_all(targets_dir.join(i.to_string())).unwrap();
            }
            create_dir_all(path.join("_symlinks")).unwrap();

            let root = RcStr::from(path.to_str().unwrap());

            let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
                BackendOptions::default(),
                noop_backing_storage(),
            ));

            tt.run_once(async move {
                let fs = disk_file_system_operation(root)
                    .resolve()
                    .strongly_consistent()
                    .await?;
                let root_path = disk_file_system_root(fs);
                let symlinks_dir = root_path.join("_symlinks")?;

                // Start with every symlink pointing at target 0.
                let initial_updates: Vec<(usize, usize)> =
                    (0..STRESS_SYMLINK_COUNT).map(|i| (i, 0)).collect();
                extract_effects_operation(write_symlink_stress_batch(
                    fs,
                    symlinks_dir.clone(),
                    initial_updates,
                ))
                .read_strongly_consistent()
                .await?
                .apply()
                .await?;

                // Repeatedly retarget random symlinks in deduplicated batches;
                // the seeded RNG keeps the test deterministic.
                let mut rng = rand::rngs::SmallRng::seed_from_u64(0);
                for _ in 0..STRESS_ITERATIONS {
                    let mut updates_map = rustc_hash::FxHashMap::default();
                    for _ in 0..STRESS_PARALLELISM {
                        let symlink_idx = rng.random_range(0..STRESS_SYMLINK_COUNT);
                        let target_idx = rng.random_range(0..STRESS_TARGET_COUNT);
                        updates_map.insert(symlink_idx, target_idx);
                    }
                    let updates: Vec<(usize, usize)> = updates_map.into_iter().collect();

                    extract_effects_operation(write_symlink_stress_batch(
                        fs,
                        symlinks_dir.clone(),
                        updates,
                    ))
                    .read_strongly_consistent()
                    .await?
                    .apply()
                    .await?;
                }

                anyhow::Ok(())
            })
            .await
            .unwrap();

            tt.stop_and_wait().await;
        }
    }

    // Test helpers for denied_path tests
    #[cfg(test)]
    mod denied_path_tests {
        use std::{
            fs::{File, create_dir_all, read_to_string},
            io::Write,
            path::Path,
        };

        use turbo_rcstr::{RcStr, rcstr};
        use turbo_tasks::{Effects, Vc, take_effects};
        use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

        use crate::{
            DirectoryContent, DiskFileSystem, File as TurboFile, FileContent, FileSystem,
            FileSystemPath,
            glob::{Glob, GlobOptions},
        };

        /// Helper to set up a test filesystem with denied_path
        /// Creates the filesystem structure on disk and returns paths
        fn setup_test_fs() -> (tempfile::TempDir, RcStr, RcStr) {
            let scratch = tempfile::tempdir().unwrap();
            let path = scratch.path();

            // Create standard test structure:
            // /allowed_file.txt
            // /allowed_dir/file.txt
            // /other_file.txt
            // /denied_dir/secret.txt
            // /denied_dir/nested/deep.txt
            File::create_new(path.join("allowed_file.txt"))
                .unwrap()
                .write_all(b"allowed content")
                .unwrap();

            create_dir_all(path.join("allowed_dir")).unwrap();
            File::create_new(path.join("allowed_dir/file.txt"))
                .unwrap()
                .write_all(b"allowed dir content")
                .unwrap();

            File::create_new(path.join("other_file.txt"))
                .unwrap()
                .write_all(b"other content")
                .unwrap();

            create_dir_all(path.join("denied_dir/nested")).unwrap();
            File::create_new(path.join("denied_dir/secret.txt"))
                .unwrap()
                .write_all(b"secret content")
                .unwrap();
            File::create_new(path.join("denied_dir/nested/deep.txt"))
                .unwrap()
                .write_all(b"deep secret")
                .unwrap();

            let root = RcStr::from(path.to_str().unwrap());
            // denied_path should be relative to root, using unix separators
            let denied_path = rcstr!("denied_dir");

            (scratch, root, denied_path)
        }

        #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
        async fn test_denied_path_read() {
            /// Reads files through a filesystem configured with a denied path:
            /// files outside the denied tree are readable, while anything
            /// inside it (including the denied directory itself) reads as
            /// `NotFound`.
            #[turbo_tasks::function(operation)]
            async fn test_operation(root: RcStr, denied_path: RcStr) -> anyhow::Result<()> {
                let fs = DiskFileSystem::new_with_denied_paths(
                    rcstr!("test"),
                    Vc::cell(root),
                    vec![denied_path],
                );
                let fs_root = fs.root().await?;

                // Test 1: Reading allowed file should work
                let allowed = fs_root.join("allowed_file.txt")?.read().await?;
                assert!(
                    matches!(&*allowed, FileContent::Content(_)),
                    "allowed file should be readable"
                );

                // Test 2: Direct read of denied file should return NotFound
                let denied = fs_root.join("denied_dir/secret.txt")?.read().await?;
                assert!(
                    matches!(&*denied, FileContent::NotFound),
                    "denied file should return NotFound, got {:?}",
                    denied
                );

                // Test 3: Reading nested denied file should return NotFound
                let nested = fs_root.join("denied_dir/nested/deep.txt")?.read().await?;
                assert!(
                    matches!(&*nested, FileContent::NotFound),
                    "nested denied file should return NotFound"
                );

                // Test 4: Reading the denied directory itself should return NotFound
                let denied_dir = fs_root.join("denied_dir")?.read().await?;
                assert!(
                    matches!(&*denied_dir, FileContent::NotFound),
                    "denied directory should return NotFound"
                );

                Ok(())
            }

            let (_scratch, root, denied_path) = setup_test_fs();
            let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
                BackendOptions::default(),
                noop_backing_storage(),
            ));
            tt.run_once(async move {
                test_operation(root, denied_path)
                    .read_strongly_consistent()
                    .await?;

                anyhow::Ok(())
            })
            .await
            .unwrap();
        }

        #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
        async fn test_denied_path_read_dir() {
            /// Lists directories through a filesystem configured with a denied
            /// path: the denied directory is hidden from its parent's listing,
            /// and listing it directly yields `NotFound`.
            #[turbo_tasks::function(operation)]
            async fn test_operation(root: RcStr, denied_path: RcStr) -> anyhow::Result<()> {
                let fs = DiskFileSystem::new_with_denied_paths(
                    rcstr!("test"),
                    Vc::cell(root),
                    vec![denied_path],
                );
                let fs_root = fs.root().await?;

                // Test: read_dir on root should not include denied_dir
                let listing = fs_root.read_dir().await?;
                let DirectoryContent::Entries(entries) = &*listing else {
                    panic!("root directory should exist");
                };
                assert!(
                    entries.contains_key(&rcstr!("allowed_dir")),
                    "allowed_dir should be visible"
                );
                assert!(
                    entries.contains_key(&rcstr!("other_file.txt")),
                    "other_file.txt should be visible"
                );
                assert!(
                    entries.contains_key(&rcstr!("allowed_file.txt")),
                    "allowed_file.txt should be visible"
                );
                assert!(
                    !entries.contains_key(&rcstr!("denied_dir")),
                    "denied_dir should NOT be visible in read_dir"
                );

                // Test: read_dir on denied_dir should return NotFound
                let denied_listing = fs_root.join("denied_dir")?.read_dir().await?;
                assert!(
                    matches!(&*denied_listing, DirectoryContent::NotFound),
                    "denied_dir read_dir should return NotFound"
                );

                Ok(())
            }

            let (_scratch, root, denied_path) = setup_test_fs();
            let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
                BackendOptions::default(),
                noop_backing_storage(),
            ));
            tt.run_once(async move {
                test_operation(root, denied_path)
                    .read_strongly_consistent()
                    .await?;

                anyhow::Ok(())
            })
            .await
            .unwrap();
        }

        #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
        async fn test_denied_path_read_glob() {
            /// Checks that recursive glob matching never surfaces entries from
            /// the denied directory, while still matching allowed files at the
            /// top level and in subdirectories.
            #[turbo_tasks::function(operation)]
            async fn test_operation(root: RcStr, denied_path: RcStr) -> anyhow::Result<()> {
                let fs = DiskFileSystem::new_with_denied_paths(
                    rcstr!("test"),
                    Vc::cell(root),
                    vec![denied_path],
                );
                let fs_root = fs.root().await?;

                // Test: read_glob with ** should not reveal denied files
                let glob = Glob::new(rcstr!("**/*.txt"), GlobOptions::default());
                let matched = fs_root.read_glob(glob).await?;

                // Check top level results
                assert!(
                    matched.results.contains_key("allowed_file.txt"),
                    "allowed_file.txt should be found"
                );
                assert!(
                    matched.results.contains_key("other_file.txt"),
                    "other_file.txt should be found"
                );
                assert!(
                    !matched.results.contains_key("denied_dir"),
                    "denied_dir should NOT appear in glob results"
                );

                // Check that denied_dir doesn't appear in inner results
                assert!(
                    !matched.inner.contains_key("denied_dir"),
                    "denied_dir should NOT appear in glob inner results"
                );

                // Verify allowed_dir is present (to ensure we're not filtering everything)
                assert!(
                    matched.inner.contains_key("allowed_dir"),
                    "allowed_dir directory should be present"
                );
                let allowed_inner = matched.inner.get("allowed_dir").unwrap().await?;
                assert!(
                    allowed_inner.results.contains_key("file.txt"),
                    "allowed_dir/file.txt should be found"
                );

                Ok(())
            }

            let (_scratch, root, denied_path) = setup_test_fs();
            let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
                BackendOptions::default(),
                noop_backing_storage(),
            ));
            tt.run_once(async move {
                test_operation(root, denied_path)
                    .read_strongly_consistent()
                    .await?;

                anyhow::Ok(())
            })
            .await
            .unwrap();
        }

        #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
        async fn test_denied_path_write() {
            /// Writes `contents` to `path`. Runs as an `operation` so callers
            /// can capture the write's effects (via `take_effects`) and apply
            /// them where appropriate.
            #[turbo_tasks::function(operation)]
            async fn write_file_operation(
                path: FileSystemPath,
                contents: RcStr,
            ) -> anyhow::Result<()> {
                path.write(
                    FileContent::Content(TurboFile::from_bytes(contents.to_string().into_bytes()))
                        .cell(),
                )
                .await?;
                Ok(())
            }

            /// Writes the allowed file and captures effects to be applied at
            /// the top level.
            #[turbo_tasks::function(operation)]
            async fn write_allowed_file_operation(
                root: RcStr,
                denied_path: RcStr,
                file_path: RcStr,
                contents: RcStr,
            ) -> anyhow::Result<Vc<Effects>> {
                let fs = DiskFileSystem::new_with_denied_paths(
                    rcstr!("test"),
                    Vc::cell(root),
                    vec![denied_path],
                );
                let root_path = fs.root().await?;
                let allowed_file = root_path.join(&file_path)?;
                let write_op = write_file_operation(allowed_file, contents);
                // Await the write before collecting its effects so they are
                // fully recorded.
                write_op.read_strongly_consistent().await?;
                Ok(take_effects(write_op).await?.cell())
            }

            /// Attempts writes into the denied directory — both a direct child
            /// and a nested file — and asserts that each write fails with an
            /// error (rather than silently succeeding).
            #[turbo_tasks::function(operation)]
            async fn test_denied_writes_operation(
                root: RcStr,
                denied_path: RcStr,
                denied_file: RcStr,
                nested_denied_file: RcStr,
            ) -> anyhow::Result<()> {
                let fs = DiskFileSystem::new_with_denied_paths(
                    rcstr!("test"),
                    Vc::cell(root),
                    vec![denied_path],
                );
                let root_path = fs.root().await?;

                let path = root_path.join(&denied_file)?;
                let result = write_file_operation(path, rcstr!("forbidden"))
                    .read_strongly_consistent()
                    .await;
                assert!(
                    result.is_err(),
                    "writing to denied path should return an error"
                );

                let path = root_path.join(&nested_denied_file)?;
                let result = write_file_operation(path, rcstr!("nested"))
                    .read_strongly_consistent()
                    .await;
                assert!(
                    result.is_err(),
                    "writing to nested denied path should return an error"
                );

                Ok(())
            }

            let (_scratch, root, denied_path) = setup_test_fs();
            let tt = turbo_tasks::TurboTasks::new(TurboTasksBackend::new(
                BackendOptions::default(),
                noop_backing_storage(),
            ));
            tt.run_once(async {
                const ALLOWED_FILE: &str = "allowed_dir/new_file.txt";
                const TEST_CONTENT: &str = "test content";

                // Test 1: Writing to allowed directory should work
                let effects = write_allowed_file_operation(
                    root.clone(),
                    denied_path.clone(),
                    RcStr::from(ALLOWED_FILE),
                    RcStr::from(TEST_CONTENT),
                )
                .read_strongly_consistent()
                .await?;
                // Applying the captured effects is what actually performs the
                // write on disk.
                effects.apply().await?;

                // Verify the file was written to disk
                let content = read_to_string(Path::new(root.as_str()).join(ALLOWED_FILE))?;
                assert_eq!(content, TEST_CONTENT, "allowed file write should succeed");

                // Tests 2 & 3: Writing to denied paths should fail
                test_denied_writes_operation(
                    root,
                    denied_path,
                    RcStr::from("denied_dir/forbidden.txt"),
                    RcStr::from("denied_dir/nested/file.txt"),
                )
                .read_strongly_consistent()
                .await?;

                anyhow::Ok(())
            })
            .await
            .unwrap();
        }
    }
}