use std::{io::Read, iter};
use anyhow::{Result, anyhow};
use auto_hash_map::AutoSet;
use flate2::{Compression, bufread::GzEncoder};
use futures::{StreamExt, TryStreamExt, stream};
use hyper::{
Request, Response,
header::{CONTENT_ENCODING, CONTENT_LENGTH, HeaderName},
http::HeaderValue,
};
use mime::Mime;
use turbo_tasks::{
CollectiblesSource, Effects, OperationVc, ReadRef, ResolvedVc, TransientInstance, Vc,
take_effects, util::SharedError,
};
use turbo_tasks_bytes::Bytes;
use turbo_tasks_fs::FileContent;
use turbopack_core::{
asset::AssetContent,
issue::{IssueReporter, IssueSeverity, handle_issues},
version::VersionedContent,
};
use crate::source::{
Body, ContentSource, ContentSourceSideEffect, HeaderList, ProxyResult,
request::SourceRequest,
resolve::{ResolveSourceRequestResult, resolve_source_request},
};
#[turbo_tasks::value(serialization = "skip")]
enum GetFromSourceResult {
Static {
content: ReadRef<FileContent>,
status_code: u16,
headers: ReadRef<HeaderList>,
header_overwrites: ReadRef<HeaderList>,
},
HttpProxy(ReadRef<ProxyResult>),
NotFound,
}
/// Resolves a [SourceRequest] within a [super::ContentSource], returning the
/// corresponding content as a
#[turbo_tasks::function(operation)]
async fn get_from_source_operation(
source: OperationVc<Box<dyn ContentSource>>,
request: TransientInstance<SourceRequest>,
) -> Result<Vc<GetFromSourceResult>> {
Ok(
match &*resolve_source_request(source, request).connect().await? {
ResolveSourceRequestResult::Static(static_content_vc, header_overwrites) => {
let static_content = static_content_vc.await?;
if let AssetContent::File(file) = &*static_content.content.content().await? {
GetFromSourceResult::Static {
content: file.await?,
status_code: static_content.status_code,
headers: static_content.headers.await?,
header_overwrites: header_overwrites.await?,
}
} else {
GetFromSourceResult::NotFound
}
}
ResolveSourceRequestResult::HttpProxy(proxy) => {
GetFromSourceResult::HttpProxy(proxy.connect().await?)
}
ResolveSourceRequestResult::NotFound => GetFromSourceResult::NotFound,
}
.cell(),
)
}
#[turbo_tasks::value(serialization = "skip")]
struct GetFromSourceResultWithCollectibles {
result: ReadRef<GetFromSourceResult>,
effects: Effects,
content_source_side_effects: AutoSet<ResolvedVc<Box<dyn ContentSourceSideEffect>>>,
}
#[turbo_tasks::function(operation)]
async fn get_from_source_with_collectibles_operation(
source_op: OperationVc<Box<dyn ContentSource>>,
request: TransientInstance<SourceRequest>,
) -> Result<Vc<GetFromSourceResultWithCollectibles>> {
let op = get_from_source_operation(source_op, request);
let result = op.read_strongly_consistent().await?;
Ok(GetFromSourceResultWithCollectibles {
result,
effects: take_effects(op).await?,
content_source_side_effects: op.peek_collectibles(),
}
.cell())
}
/// Processes an HTTP request within a given content source and returns the response.
pub async fn process_request_with_content_source(
source: OperationVc<Box<dyn ContentSource>>,
request: Request<hyper::Body>,
issue_reporter: Vc<Box<dyn IssueReporter>>,
) -> Result<(
Response<hyper::Body>,
AutoSet<ResolvedVc<Box<dyn ContentSourceSideEffect>>>,
)> {
let original_path = request.uri().path().to_string();
let request = http_request_to_source_request(request).await?;
let wrapper_op =
get_from_source_with_collectibles_operation(source, TransientInstance::new(request));
let GetFromSourceResultWithCollectibles {
result,
effects,
content_source_side_effects,
} = &*wrapper_op.read_strongly_consistent().await?;
effects.apply().await?;
handle_issues(
wrapper_op,
issue_reporter,
IssueSeverity::Fatal,
Some(&original_path),
Some("get_from_source_operation"),
)
.await?;
match &**result {
GetFromSourceResult::Static {
content,
status_code,
headers,
header_overwrites,
} => {
if let FileContent::Content(file) = &**content {
let mut response = Response::builder().status(*status_code);
let header_map = response.headers_mut().expect("headers must be defined");
for (header_name, header_value) in headers {
header_map.append(
HeaderName::try_from(header_name.as_str())?,
hyper::header::HeaderValue::try_from(header_value.as_str())?,
);
}
for (header_name, header_value) in header_overwrites.iter() {
header_map.insert(
HeaderName::try_from(header_name.as_str())?,
hyper::header::HeaderValue::try_from(header_value.as_str())?,
);
}
// naively checking if content is `compressible`.
let mut should_compress = false;
let should_compress_predicate = |mime: &Mime| {
matches!(
(mime.type_(), mime.subtype(), mime.suffix()),
(_, mime::PLAIN, _)
| (_, mime::JSON, _)
| (mime::TEXT, _, _)
| (mime::APPLICATION, mime::XML, _)
| (mime::APPLICATION, mime::JAVASCRIPT, _)
| (_, _, Some(mime::XML))
| (_, _, Some(mime::JSON))
| (_, _, Some(mime::TEXT))
)
};
if let Some(content_type) = file.content_type() {
header_map.append(
"content-type",
hyper::header::HeaderValue::try_from(content_type.to_string())?,
);
should_compress = should_compress_predicate(content_type);
} else if let hyper::header::Entry::Vacant(entry) = header_map.entry("content-type")
{
let guess = mime_guess::from_path(&original_path).first_or_octet_stream();
should_compress = should_compress_predicate(&guess);
// If a text type, application/javascript, or application/json was
// guessed, use a utf-8 charset as we most likely generated it as
// such.
entry.insert(hyper::header::HeaderValue::try_from(
if (guess.type_() == mime::TEXT
|| guess.subtype() == mime::JAVASCRIPT
|| guess.subtype() == mime::JSON)
&& guess.get_param("charset").is_none()
{
guess.to_string() + "; charset=utf-8"
} else {
guess.to_string()
},
)?);
}
if !header_map.contains_key("cache-control") {
// The dev server contents might change at any time, we can't cache them.
header_map.append(
"cache-control",
hyper::header::HeaderValue::try_from("must-revalidate")?,
);
}
let content = file.content();
let response = if should_compress {
header_map.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
// Hyper requires an owned reader... We could do this with streaming by cloning
// each `Bytes` and implementing `BufRead` for `Iterator<bytes::Bytes>`, but
// it's not really worth it, just compressing the whole thing up-front is fine.
//
// Use fast compression, since we're likely just tranferring data over
// localhost.
let mut gz_bytes = Vec::new();
GzEncoder::new(content.read(), Compression::fast())
.read_to_end(&mut gz_bytes)
.expect("read of Rope should never fail");
response.body(hyper::Body::wrap_stream(stream::iter(iter::once(
hyper::Result::Ok(gz_bytes),
))))?
} else {
// hyper requires an owned stream, so we must clone the iterator items
// this is relatively cheap: each chunk is a `Bytes`, so `Clone` updates a
// refcount
let owned_chunks: Vec<_> =
content.read().cloned().map(hyper::Result::Ok).collect();
header_map.insert(
CONTENT_LENGTH,
hyper::header::HeaderValue::try_from(content.len().to_string())?,
);
response.body(hyper::Body::wrap_stream(stream::iter(owned_chunks)))?
};
return Ok((response, content_source_side_effects.clone()));
}
}
GetFromSourceResult::HttpProxy(proxy_result) => {
let mut response = Response::builder().status(proxy_result.status);
let headers = response.headers_mut().expect("headers must be defined");
for (name, value) in &proxy_result.headers {
headers.append(
HeaderName::from_bytes(name.as_bytes())?,
hyper::header::HeaderValue::from_str(value)?,
);
}
return Ok((
response.body(hyper::Body::wrap_stream(proxy_result.body.read()))?,
content_source_side_effects.clone(),
));
}
GetFromSourceResult::NotFound => {}
}
Ok((
Response::builder().status(404).body(hyper::Body::empty())?,
content_source_side_effects.clone(),
))
}
async fn http_request_to_source_request(request: Request<hyper::Body>) -> Result<SourceRequest> {
let (parts, body) = request.into_parts();
// For simplicity, we fully consume the body now and early return if there were
// any errors.
let bytes: Vec<_> = body
.map(|bytes| {
bytes.map_or_else(
|e| Err(SharedError::new(anyhow!(e))),
// The outer Ok is consumed by try_collect, but the Body type requires a Result, so
// we need to double wrap.
|b| Ok(Ok(Bytes::from(b))),
)
})
.try_collect::<Vec<_>>()
.await?;
Ok(SourceRequest {
method: parts.method.to_string(),
uri: parts.uri,
headers: parts.headers,
body: Body::new(bytes),
})
}