next.js/turbopack/crates/turbopack-dev-server/src/http.rs
use std::{io::Read, iter};

use anyhow::{Result, anyhow};
use auto_hash_map::AutoSet;
use flate2::{Compression, bufread::GzEncoder};
use futures::{StreamExt, TryStreamExt, stream};
use hyper::{
    Request, Response,
    header::{CONTENT_ENCODING, CONTENT_LENGTH, HeaderName},
    http::HeaderValue,
};
use mime::Mime;
use turbo_tasks::{
    CollectiblesSource, Effects, OperationVc, ReadRef, ResolvedVc, TransientInstance, Vc,
    take_effects, util::SharedError,
};
use turbo_tasks_bytes::Bytes;
use turbo_tasks_fs::FileContent;
use turbopack_core::{
    asset::AssetContent,
    issue::{IssueReporter, IssueSeverity, handle_issues},
    version::VersionedContent,
};

use crate::source::{
    Body, ContentSource, ContentSourceSideEffect, HeaderList, ProxyResult,
    request::SourceRequest,
    resolve::{ResolveSourceRequestResult, resolve_source_request},
};

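/// The outcome of resolving a [SourceRequest] against a [ContentSource]: static file content
/// with its status code and headers, a proxied HTTP response, or nothing (404).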
#[turbo_tasks::value(serialization = "skip")]
enum GetFromSourceResult {
    Static {
        content: ReadRef<FileContent>,
        status_code: u16,
        headers: ReadRef<HeaderList>,
        header_overwrites: ReadRef<HeaderList>,
    },
    HttpProxy(ReadRef<ProxyResult>),
    NotFound,
}

/// Resolves a [SourceRequest] within a [super::ContentSource], returning the
/// corresponding content as a [GetFromSourceResult].
#[turbo_tasks::function(operation)]
async fn get_from_source_operation(
    source: OperationVc<Box<dyn ContentSource>>,
    request: TransientInstance<SourceRequest>,
) -> Result<Vc<GetFromSourceResult>> {
    Ok(
        match &*resolve_source_request(source, request).connect().await? {
            ResolveSourceRequestResult::Static(static_content_vc, header_overwrites) => {
                let static_content = static_content_vc.await?;
                if let AssetContent::File(file) = &*static_content.content.content().await? {
                    GetFromSourceResult::Static {
                        content: file.await?,
                        status_code: static_content.status_code,
                        headers: static_content.headers.await?,
                        header_overwrites: header_overwrites.await?,
                    }
                } else {
                    GetFromSourceResult::NotFound
                }
            }
            ResolveSourceRequestResult::HttpProxy(proxy) => {
                GetFromSourceResult::HttpProxy(proxy.connect().await?)
            }
            ResolveSourceRequestResult::NotFound => GetFromSourceResult::NotFound,
        }
        .cell(),
    )
}

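/// A [GetFromSourceResult] together with the effects and [ContentSourceSideEffect]
/// collectibles captured while computing it.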
#[turbo_tasks::value(serialization = "skip")]
struct GetFromSourceResultWithCollectibles {
    result: ReadRef<GetFromSourceResult>,
    effects: Effects,
    content_source_side_effects: AutoSet<ResolvedVc<Box<dyn ContentSourceSideEffect>>>,
}

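/// Runs [get_from_source_operation] and bundles its result with the effects and
/// [ContentSourceSideEffect] collectibles it emitted.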
#[turbo_tasks::function(operation)]
async fn get_from_source_with_collectibles_operation(
    source_op: OperationVc<Box<dyn ContentSource>>,
    request: TransientInstance<SourceRequest>,
) -> Result<Vc<GetFromSourceResultWithCollectibles>> {
    let op = get_from_source_operation(source_op, request);
    let result = op.read_strongly_consistent().await?;
    Ok(GetFromSourceResultWithCollectibles {
        result,
        effects: take_effects(op).await?,
        content_source_side_effects: op.peek_collectibles(),
    }
    .cell())
}

/// Processes an HTTP request within a given content source and returns the response.
pub async fn process_request_with_content_source(
    source: OperationVc<Box<dyn ContentSource>>,
    request: Request<hyper::Body>,
    issue_reporter: Vc<Box<dyn IssueReporter>>,
) -> Result<(
    Response<hyper::Body>,
    AutoSet<ResolvedVc<Box<dyn ContentSourceSideEffect>>>,
)> {
    let original_path = request.uri().path().to_string();
    let request = http_request_to_source_request(request).await?;
    let wrapper_op =
        get_from_source_with_collectibles_operation(source, TransientInstance::new(request));
    let GetFromSourceResultWithCollectibles {
        result,
        effects,
        content_source_side_effects,
    } = &*wrapper_op.read_strongly_consistent().await?;
    effects.apply().await?;
    handle_issues(
        wrapper_op,
        issue_reporter,
        IssueSeverity::Fatal,
        Some(&original_path),
        Some("get_from_source_operation"),
    )
    .await?;
    match &**result {
        GetFromSourceResult::Static {
            content,
            status_code,
            headers,
            header_overwrites,
        } => {
            if let FileContent::Content(file) = &**content {
                let mut response = Response::builder().status(*status_code);

                let header_map = response.headers_mut().expect("headers must be defined");

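                // `append` adds values without removing existing ones, so the source may emit
                // multiple values for the same header name.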
                for (header_name, header_value) in headers {
                    header_map.append(
                        HeaderName::try_from(header_name.as_str())?,
                        hyper::header::HeaderValue::try_from(header_value.as_str())?,
                    );
                }

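                // `insert` replaces any existing value, so overwrites take precedence over the
                // headers set above.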
                for (header_name, header_value) in header_overwrites.iter() {
                    header_map.insert(
                        HeaderName::try_from(header_name.as_str())?,
                        hyper::header::HeaderValue::try_from(header_value.as_str())?,
                    );
                }

                // Naively check whether the content type is compressible.
                let mut should_compress = false;
                let should_compress_predicate = |mime: &Mime| {
                    matches!(
                        (mime.type_(), mime.subtype(), mime.suffix()),
                        (_, mime::PLAIN, _)
                            | (_, mime::JSON, _)
                            | (mime::TEXT, _, _)
                            | (mime::APPLICATION, mime::XML, _)
                            | (mime::APPLICATION, mime::JAVASCRIPT, _)
                            | (_, _, Some(mime::XML))
                            | (_, _, Some(mime::JSON))
                            | (_, _, Some(mime::TEXT))
                    )
                };

                if let Some(content_type) = file.content_type() {
                    header_map.append(
                        "content-type",
                        hyper::header::HeaderValue::try_from(content_type.to_string())?,
                    );

                    should_compress = should_compress_predicate(content_type);
                } else if let hyper::header::Entry::Vacant(entry) = header_map.entry("content-type")
                {
                    let guess = mime_guess::from_path(&original_path).first_or_octet_stream();
                    should_compress = should_compress_predicate(&guess);
                    // If a text type, application/javascript, or application/json was
                    // guessed, use a utf-8 charset as we most likely generated it as
                    // such.
                    entry.insert(hyper::header::HeaderValue::try_from(
                        if (guess.type_() == mime::TEXT
                            || guess.subtype() == mime::JAVASCRIPT
                            || guess.subtype() == mime::JSON)
                            && guess.get_param("charset").is_none()
                        {
                            guess.to_string() + "; charset=utf-8"
                        } else {
                            guess.to_string()
                        },
                    )?);
                }

                if !header_map.contains_key("cache-control") {
                    // The dev server contents might change at any time, we can't cache them.
                    header_map.append(
                        "cache-control",
                        hyper::header::HeaderValue::try_from("must-revalidate")?,
                    );
                }

                let content = file.content();
                let response = if should_compress {
                    header_map.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));

                    // Hyper requires an owned reader... We could do this with streaming by cloning
                    // each `Bytes` and implementing `BufRead` for `Iterator<bytes::Bytes>`, but
                    // it's not really worth it; compressing the whole thing up-front is fine.
                    //
                    // Use fast compression, since we're likely just transferring data over
                    // localhost.
                    let mut gz_bytes = Vec::new();
                    GzEncoder::new(content.read(), Compression::fast())
                        .read_to_end(&mut gz_bytes)
                        .expect("read of Rope should never fail");
                    response.body(hyper::Body::wrap_stream(stream::iter(iter::once(
                        hyper::Result::Ok(gz_bytes),
                    ))))?
                } else {
                    // Hyper requires an owned stream, so we must clone the iterator items.
                    // This is relatively cheap: each chunk is a `Bytes`, so `Clone` only bumps a
                    // refcount.
                    let owned_chunks: Vec<_> =
                        content.read().cloned().map(hyper::Result::Ok).collect();
                    header_map.insert(
                        CONTENT_LENGTH,
                        hyper::header::HeaderValue::try_from(content.len().to_string())?,
                    );
                    response.body(hyper::Body::wrap_stream(stream::iter(owned_chunks)))?
                };

                return Ok((response, content_source_side_effects.clone()));
            }
        }
        GetFromSourceResult::HttpProxy(proxy_result) => {
            let mut response = Response::builder().status(proxy_result.status);
            let headers = response.headers_mut().expect("headers must be defined");

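            // Forward the proxied upstream's headers onto the client response.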
            for (name, value) in &proxy_result.headers {
                headers.append(
                    HeaderName::from_bytes(name.as_bytes())?,
                    hyper::header::HeaderValue::from_str(value)?,
                );
            }

            return Ok((
                response.body(hyper::Body::wrap_stream(proxy_result.body.read()))?,
                content_source_side_effects.clone(),
            ));
        }
        GetFromSourceResult::NotFound => {}
    }

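    // NotFound results (and static content that isn't a file) fall through to a 404.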
    Ok((
        Response::builder().status(404).body(hyper::Body::empty())?,
        content_source_side_effects.clone(),
    ))
}

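/// Converts a hyper [Request] into a [SourceRequest], buffering the entire request body into
/// memory up front.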
async fn http_request_to_source_request(request: Request<hyper::Body>) -> Result<SourceRequest> {
    let (parts, body) = request.into_parts();

    // For simplicity, we fully consume the body now and early return if there were
    // any errors.
    let bytes: Vec<_> = body
        .map(|bytes| {
            bytes.map_or_else(
                |e| Err(SharedError::new(anyhow!(e))),
                // The outer Ok is consumed by try_collect, but the Body type requires a Result, so
                // we need to double wrap.
                |b| Ok(Ok(Bytes::from(b))),
            )
        })
        .try_collect::<Vec<_>>()
        .await?;

    Ok(SourceRequest {
        method: parts.method.to_string(),
        uri: parts.uri,
        headers: parts.headers,
        body: Body::new(bytes),
    })
}