use std::{
borrow::Cow,
collections::hash_map::Entry,
mem::transmute,
ops::{Deref, DerefMut},
sync::Arc,
};
use anyhow::Result;
use rustc_hash::{FxHashMap, FxHashSet};
use turbo_rcstr::{RcStr, RcStrInterning, rcstr};
use turbopack_trace_utils::tracing::{TraceRow, TraceValue};
use super::TraceFormat;
use crate::{
span::{SpanArgs, SpanIndex},
store_container::{StoreContainer, StoreWriteGuard},
timestamp::Timestamp,
};
/// Cumulative allocation counters for a single thread.
///
/// `allocations`/`deallocations` are running totals of the allocated amount
/// (presumably bytes — emitted by the tracing producer; confirm there), while
/// the `*_count` fields count individual (de)allocation operations.
#[derive(Default)]
struct AllocationInfo {
    // Total amount allocated so far.
    allocations: u64,
    // Total amount deallocated so far.
    deallocations: u64,
    // Number of allocation operations.
    allocation_count: u64,
    // Number of deallocation operations.
    deallocation_count: u64,
}
/// A lifetime-free trace row, ready to be applied to the store immediately or
/// queued until the span id it references has been registered.
struct InternalRow {
    // Trace-file span id this row targets. `None` for rows with no parent
    // span (e.g. a root `Start` or an unparented `Event`).
    id: Option<u64>,
    // What to do with / under that span.
    ty: InternalRowType,
}
/// Lifetime-free payloads derived from raw `TraceRow`s. All strings are
/// interned `RcStr`s so queued rows never borrow the read buffer.
enum InternalRowType {
    /// Opens a new span `new_id` under the parent carried in `InternalRow::id`.
    Start {
        new_id: u64,
        ts: Timestamp,
        name: RcStr,
        target: RcStr,
        values: SpanArgs,
    },
    /// Marks the referenced span as complete.
    End,
    /// Adds a self-time slice `[start, end]` to the referenced span.
    SelfTime {
        start: Timestamp,
        end: Timestamp,
    },
    /// A short-lived event span attached under the referenced parent.
    Event {
        ts: Timestamp,
        /// Pre-extracted at parse time from the typed `TraceValue::UInt`;
        /// the runtime `values` vec no longer carries it.
        duration: Timestamp,
        /// Pre-extracted at parse time from the typed `TraceValue::String`;
        /// defaults to `"event"` when not present.
        name: RcStr,
        values: SpanArgs,
    },
    /// Appends additional arguments to the referenced span.
    Record {
        values: SpanArgs,
    },
    /// Attributes an allocation total and operation count to the referenced span.
    Allocation {
        allocations: u64,
        allocation_count: u64,
    },
    /// Attributes a deallocation total and operation count to the referenced span.
    Deallocation {
        deallocations: u64,
        deallocation_count: u64,
    },
}
/// Reader for the turbopack binary trace format (postcard-encoded
/// `TraceRow`s), materializing spans into a shared [`StoreContainer`].
pub struct TurbopackFormat {
    store: Arc<StoreContainer>,
    // Maps trace-file span ids to indices in `store`.
    id_mapping: FxHashMap<u64, SpanIndex>,
    // Span ids dropped via the `DROP_SPANS` env var; mutation rows targeting
    // them are skipped.
    dropped_ids: FxHashSet<u64>,
    // How many more new spans should still be dropped (counts down to zero).
    remaining_ids_to_drop: usize,
    // Rows waiting for their referenced span id to appear, keyed by that id.
    queued_rows: FxHashMap<u64, Vec<InternalRow>>,
    // Spans whose aggregates changed in the current batch; invalidated at
    // batch end in `read`.
    outdated_spans: FxHashSet<SpanIndex>,
    // Per-thread stack of currently entered span ids.
    thread_stacks: FxHashMap<u64, Vec<u64>>,
    // Last absolute allocation counters seen per thread, used for diffing.
    thread_allocation_counters: FxHashMap<u64, AllocationInfo>,
    // Start timestamp of the currently running self-time slice, per
    // (span id, thread id).
    self_time_started: FxHashMap<(u64, u64), Timestamp>,
    // String interner making parsed rows lifetime-free.
    interner: RcStrInterning,
}
impl TurbopackFormat {
    /// Creates a reader that materializes parsed trace rows into `store`.
    ///
    /// Honors the `DROP_SPANS` environment variable (parsed as `usize`): the
    /// first N started spans are omitted from the store and their descendants
    /// are re-attached to the nearest kept ancestor. Collections are
    /// pre-sized for typical trace volumes to avoid rehash churn during the
    /// initial ingest burst.
    pub fn new(store: Arc<StoreContainer>) -> Self {
        let drop_ids = std::env::var("DROP_SPANS")
            .ok()
            .and_then(|v| v.parse::<usize>().ok())
            .unwrap_or_default();
        Self {
            store,
            id_mapping: FxHashMap::with_capacity_and_hasher(131_072, Default::default()),
            dropped_ids: FxHashSet::with_capacity_and_hasher(drop_ids, Default::default()),
            remaining_ids_to_drop: drop_ids,
            queued_rows: FxHashMap::with_capacity_and_hasher(1_024, Default::default()),
            outdated_spans: FxHashSet::with_capacity_and_hasher(8_192, Default::default()),
            thread_stacks: FxHashMap::with_capacity_and_hasher(64, Default::default()),
            thread_allocation_counters: FxHashMap::with_capacity_and_hasher(64, Default::default()),
            self_time_started: FxHashMap::with_capacity_and_hasher(256, Default::default()),
            interner: RcStrInterning::new(),
        }
    }

    /// Interns a borrowed key/value list into lifetime-free `SpanArgs`.
    ///
    /// String values are interned directly; every other value type is
    /// rendered through its `Display` impl first. Eager interning here is
    /// what allows rows to be queued later (see `process_internal_row`)
    /// without borrowing the read buffer.
    fn intern_span_args(&mut self, values: Vec<(Cow<'_, str>, TraceValue<'_>)>) -> SpanArgs {
        values
            .into_iter()
            .map(|(k, v)| {
                let k = self.interner.intern_cow(k);
                let v = match v {
                    TraceValue::String(s) => self.interner.intern_cow(s),
                    other => self.interner.intern_display(&other),
                };
                (k, v)
            })
            .collect()
    }

    /// Translates one raw `TraceRow` into zero or more internal rows and
    /// applies them to the store (or queues them until the span they
    /// reference has been registered).
    fn process(&mut self, store: &mut StoreWriteGuard, row: TraceRow<'_>) {
        match row {
            TraceRow::Start {
                ts,
                id,
                parent,
                name,
                target,
                values,
            } => {
                let ts = Timestamp::from_micros(ts);
                let name = self.interner.intern_cow(name);
                let target = self.interner.intern_cow(target);
                let values = self.intern_span_args(values);
                self.process_internal_row(
                    store,
                    InternalRow {
                        id: parent,
                        ty: InternalRowType::Start {
                            ts,
                            new_id: id,
                            name,
                            target,
                            values,
                        },
                    },
                );
            }
            TraceRow::Record { id, values } => {
                let values = self.intern_span_args(values);
                self.process_internal_row(
                    store,
                    InternalRow {
                        id: Some(id),
                        ty: InternalRowType::Record { values },
                    },
                );
            }
            TraceRow::End { ts: _, id } => {
                self.process_internal_row(
                    store,
                    InternalRow {
                        id: Some(id),
                        ty: InternalRowType::End,
                    },
                );
            }
            TraceRow::Enter { ts, id, thread_id } => {
                let ts = Timestamp::from_micros(ts);
                let stack = self.thread_stacks.entry(thread_id).or_default();
                let parent = stack.last().copied();
                stack.push(id);
                // The parent stops executing on this thread while the entered
                // span runs, so close out its running self-time slice.
                if let Some(parent) = parent
                    && let Some(parent_start) = self.self_time_started.remove(&(parent, thread_id))
                {
                    self.process_internal_row(
                        store,
                        InternalRow {
                            id: Some(parent),
                            ty: InternalRowType::SelfTime {
                                start: parent_start,
                                end: ts,
                            },
                        },
                    );
                }
                self.self_time_started.insert((id, thread_id), ts);
            }
            TraceRow::Exit { ts, id, thread_id } => {
                let ts = Timestamp::from_micros(ts);
                let stack = self.thread_stacks.entry(thread_id).or_default();
                // Remove the last occurrence of this span from the thread's
                // stack and resume the parent's self-time from this point.
                if let Some(stack_index) = stack.iter().rposition(|&x| x == id) {
                    stack.remove(stack_index);
                    if stack_index > 0 {
                        let parent = stack[stack_index - 1];
                        self.self_time_started.insert((parent, thread_id), ts);
                    }
                }
                // Record the self-time the exiting span accumulated since it
                // was entered (or since it last resumed).
                if let Some(start) = self.self_time_started.remove(&(id, thread_id)) {
                    self.process_internal_row(
                        store,
                        InternalRow {
                            id: Some(id),
                            ty: InternalRowType::SelfTime { start, end: ts },
                        },
                    );
                }
            }
            TraceRow::Event { ts, parent, values } => {
                let ts = Timestamp::from_micros(ts);
                // Pull the well-known `duration` and `name` entries out of the
                // typed values while interning the rest.
                let mut duration = Timestamp::ZERO;
                let mut name = rcstr!("event");
                let mut interned_values: SpanArgs = SpanArgs::with_capacity(values.len());
                for (k, v) in values {
                    match k.as_ref() {
                        "duration" => {
                            duration = Timestamp::from_micros(v.as_u64().unwrap_or(0));
                        }
                        "name" => {
                            if let TraceValue::String(s) = v {
                                name = self.interner.intern_cow(s);
                            }
                        }
                        _ => {
                            let k = self.interner.intern_cow(k);
                            let v = match v {
                                TraceValue::String(s) => self.interner.intern_cow(s),
                                other => self.interner.intern_display(&other),
                            };
                            interned_values.push((k, v));
                        }
                    }
                }
                self.process_internal_row(
                    store,
                    InternalRow {
                        id: parent,
                        ty: InternalRowType::Event {
                            ts,
                            duration,
                            name,
                            values: interned_values,
                        },
                    },
                );
            }
            TraceRow::Allocation {
                ts: _,
                thread_id,
                allocations,
                allocation_count,
                deallocations,
                deallocation_count,
            } => {
                // Attribute the (de)allocations to whichever span is currently
                // running on this thread; without a running span they are lost.
                let stack = self.thread_stacks.entry(thread_id).or_default();
                if let Some(&id) = stack.last() {
                    if allocations > 0 {
                        self.process_internal_row(
                            store,
                            InternalRow {
                                id: Some(id),
                                ty: InternalRowType::Allocation {
                                    allocations,
                                    allocation_count,
                                },
                            },
                        );
                    }
                    if deallocations > 0 {
                        self.process_internal_row(
                            store,
                            InternalRow {
                                id: Some(id),
                                ty: InternalRowType::Deallocation {
                                    deallocations,
                                    deallocation_count,
                                },
                            },
                        );
                    }
                }
            }
            TraceRow::MemorySample {
                ts,
                memory,
                memory_pressure,
            } => {
                let ts = Timestamp::from_micros(ts);
                store.add_memory_sample(ts, memory, memory_pressure);
            }
            TraceRow::AllocationCounters {
                ts: _,
                thread_id,
                allocations,
                allocation_count,
                deallocations,
                deallocation_count,
            } => {
                // These rows carry *absolute* per-thread counters; diff them
                // against the previously seen values to get this interval's
                // contribution. The first row for a thread only seeds the
                // baseline (diff stays zero).
                let info = AllocationInfo {
                    allocations,
                    deallocations,
                    allocation_count,
                    deallocation_count,
                };
                let mut diff = AllocationInfo::default();
                match self.thread_allocation_counters.entry(thread_id) {
                    Entry::Occupied(mut entry) => {
                        let counter = entry.get_mut();
                        // saturating_sub: a producer that resets its counters
                        // would otherwise underflow (panic in debug, absurd
                        // diffs in release); clamping to 0 degrades gracefully.
                        diff.allocations = info.allocations.saturating_sub(counter.allocations);
                        diff.deallocations =
                            info.deallocations.saturating_sub(counter.deallocations);
                        diff.allocation_count =
                            info.allocation_count.saturating_sub(counter.allocation_count);
                        diff.deallocation_count = info
                            .deallocation_count
                            .saturating_sub(counter.deallocation_count);
                        *counter = info;
                    }
                    Entry::Vacant(entry) => {
                        entry.insert(info);
                    }
                }
                let stack = self.thread_stacks.entry(thread_id).or_default();
                if let Some(&id) = stack.last() {
                    if diff.allocations > 0 {
                        self.process_internal_row(
                            store,
                            InternalRow {
                                id: Some(id),
                                ty: InternalRowType::Allocation {
                                    allocations: diff.allocations,
                                    allocation_count: diff.allocation_count,
                                },
                            },
                        );
                    }
                    if diff.deallocations > 0 {
                        self.process_internal_row(
                            store,
                            InternalRow {
                                id: Some(id),
                                ty: InternalRowType::Deallocation {
                                    deallocations: diff.deallocations,
                                    deallocation_count: diff.deallocation_count,
                                },
                            },
                        );
                    }
                }
            }
        }
    }

    /// Applies one lifetime-free row to the store, or queues it until the
    /// span id it references has been registered.
    fn process_internal_row(&mut self, store: &mut StoreWriteGuard, row: InternalRow) {
        let id = if let Some(id) = row.id {
            // Mutation rows targeting a dropped span are skipped outright.
            // `Start` rows must still run so children of a dropped span get
            // re-parented onto the nearest kept ancestor.
            if matches!(
                row.ty,
                InternalRowType::End
                    | InternalRowType::Event { .. }
                    | InternalRowType::Record { .. }
                    | InternalRowType::SelfTime { .. }
            ) && self.dropped_ids.contains(&id)
            {
                return;
            }
            if let Some(id) = self.id_mapping.get(&id) {
                Some(*id)
            } else {
                // Parent hasn't been seen yet; queue this row to be processed
                // when the parent arrives. The row is already lifetime-free
                // (strings interned eagerly at parse time) so no allocation
                // is needed to extend its lifetime here.
                self.queued_rows.entry(id).or_default().push(row);
                return;
            }
        } else {
            None
        };
        match row.ty {
            InternalRowType::Start {
                ts,
                new_id,
                name,
                target,
                values,
            } => {
                if self.remaining_ids_to_drop > 0
                    && let Some(id) = id
                {
                    // Drop this span: alias its trace id to the parent's
                    // store index so descendants attach to the parent.
                    self.remaining_ids_to_drop -= 1;
                    self.dropped_ids.insert(new_id);
                    self.id_mapping.insert(new_id, id);
                } else {
                    let span_id =
                        store.add_span(id, ts, target, name, values, &mut self.outdated_spans);
                    self.id_mapping.insert(new_id, span_id);
                }
                // Inline-flush any rows that were waiting on this parent.
                // Processing them now keeps the just-added Span (and its
                // surrounding Store state) hot in cache, and avoids the
                // memcpy of a working-queue extend. Recursion depth is
                // bounded by the depth of orphan chains in the trace.
                if let Some(rows) = self.queued_rows.remove(&new_id) {
                    for row in rows {
                        self.process_internal_row(store, row);
                    }
                }
            }
            // `id` is always `Some` for the variants below: `process` only
            // ever constructs them with `id: Some(..)`, and `None`-id rows
            // can't reach here as anything but `Start`/`Event`.
            InternalRowType::Record { values } => {
                store.add_args(id.unwrap(), values, &mut self.outdated_spans);
            }
            InternalRowType::End => {
                store.complete_span(id.unwrap());
            }
            InternalRowType::SelfTime { start, end } => {
                store.add_self_time(id.unwrap(), start, end, &mut self.outdated_spans);
            }
            InternalRowType::Event {
                ts,
                duration,
                name,
                values,
            } => {
                // An event becomes a complete span covering [ts - duration, ts].
                let start = ts.saturating_sub(duration);
                let id = store.add_span(
                    id,
                    start,
                    rcstr!("event"),
                    name,
                    values,
                    &mut self.outdated_spans,
                );
                store.add_self_time(id, start, ts, &mut self.outdated_spans);
                store.complete_span(id);
            }
            InternalRowType::Allocation {
                allocations,
                allocation_count,
            } => {
                store.add_allocation(
                    id.unwrap(),
                    allocations,
                    allocation_count,
                    &mut self.outdated_spans,
                );
            }
            InternalRowType::Deallocation {
                deallocations,
                deallocation_count,
            } => {
                store.add_deallocation(
                    id.unwrap(),
                    deallocations,
                    deallocation_count,
                    &mut self.outdated_spans,
                );
            }
        }
    }
}
impl TraceFormat for TurbopackFormat {
    // Scratch buffer reused across `read` calls. The `'static` lifetime is a
    // placeholder: the buffer is always drained/cleared before `read` returns,
    // so it never actually holds borrowed rows between calls.
    type Reused = Vec<TraceRow<'static>>;
    fn create_reused() -> Vec<TraceRow<'static>> {
        // Pre-allocate for a typical batch size to avoid repeated doubling during initial read.
        Vec::with_capacity(4_096)
    }
    /// Human-readable ingestion stats: total spans, dropped-span progress
    /// (only when `DROP_SPANS` is active), and rows still queued on
    /// not-yet-seen parent ids.
    fn stats(&self) -> String {
        use std::fmt::Write;
        let spans = self.id_mapping.len();
        let mut stats = format!("{spans} spans");
        let dropped_spans = self.dropped_ids.len();
        if dropped_spans > 0 {
            let total_drop = dropped_spans + self.remaining_ids_to_drop;
            write!(stats, ", {dropped_spans}/{total_drop} dropped").unwrap();
        }
        let queued_spans = self.queued_rows.len();
        if queued_spans > 0 {
            write!(stats, ", {queued_spans} queued").unwrap();
        }
        stats
    }
    /// Decodes as many complete postcard-encoded `TraceRow`s as `buffer`
    /// contains, applies them to the store, and returns the number of bytes
    /// consumed. A trailing partial row is left unconsumed for the next call.
    fn read(&mut self, mut buffer: &[u8], reuse: &mut Self::Reused) -> Result<usize> {
        reuse.clear();
        // Guard guarantees the vec is empty again on every exit path,
        // including early `?` returns — required for the transmute below.
        let mut reuse = ClearOnDrop(reuse);
        // Safety: The Vec is empty and is cleared on leaving this scope, so it's safe to cast the
        // lifetime of data, since there is no data and data can't leave this function.
        let rows =
            unsafe { transmute::<&mut Vec<TraceRow<'_>>, &mut Vec<TraceRow<'_>>>(&mut *reuse) };
        let mut bytes_read = 0;
        loop {
            match postcard::take_from_bytes(buffer) {
                Ok((row, remaining)) => {
                    bytes_read += buffer.len() - remaining.len();
                    buffer = remaining;
                    rows.push(row);
                }
                Err(err) => {
                    // A truncated final row is expected mid-stream: stop and
                    // report only the bytes of fully decoded rows.
                    if matches!(err, postcard::Error::DeserializeUnexpectedEnd) {
                        break;
                    }
                    return Err(err.into());
                }
            }
        }
        if !rows.is_empty() {
            let store = self.store.clone();
            let mut iter = rows.drain(..);
            {
                // Hold the store write lock for the whole batch, then
                // invalidate touched spans once at the end.
                let mut store = store.write();
                for row in iter.by_ref() {
                    self.process(&mut store, row);
                }
                store.invalidate_outdated_spans(&self.outdated_spans);
                self.outdated_spans.clear();
            }
        }
        Ok(bytes_read)
    }
}
/// Scope guard around a `&mut Vec<T>` that empties the vector when dropped.
///
/// Used by `read` to guarantee the reused row buffer is left empty on every
/// exit path (including early returns), which is what keeps the lifetime
/// transmute there sound. Derefs transparently to the inner `Vec`.
struct ClearOnDrop<'l, T>(&'l mut Vec<T>);
impl<T> Drop for ClearOnDrop<'_, T> {
    fn drop(&mut self) {
        // Drop the elements but keep the allocation for reuse.
        self.0.clear();
    }
}
impl<T> Deref for ClearOnDrop<'_, T> {
    type Target = Vec<T>;
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
impl<T> DerefMut for ClearOnDrop<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}