Use thread local dep graph encoding

This commit is contained in:
John Kåre Alsaker 2025-04-11 21:27:53 +02:00
parent 6e23095adf
commit d3ec14bbec
6 changed files with 290 additions and 185 deletions

View file

@ -43,7 +43,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
pub use self::lock::{Lock, LockGuard, Mode};
pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
pub use self::parallel::{
join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in,
broadcast, join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in,
};
pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec};
pub use self::worker_local::{Registry, WorkerLocal};

View file

@ -237,3 +237,13 @@ pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterato
}
})
}
pub fn broadcast<R: DynSend>(op: impl Fn(usize) -> R + DynSync) -> Vec<R> {
if mode::is_dyn_thread_safe() {
let op = FromDyn::from(op);
let results = rayon_core::broadcast(|context| op.derive(op(context.index())));
results.into_iter().map(|r| r.into_inner()).collect()
} else {
vec![op(0)]
}
}

View file

@ -44,10 +44,6 @@ pub(crate) fn save_dep_graph(tcx: TyCtxt<'_>) {
sess.time("assert_dep_graph", || assert_dep_graph(tcx));
sess.time("check_dirty_clean", || dirty_clean::check_dirty_clean_annotations(tcx));
if sess.opts.unstable_opts.incremental_info {
tcx.dep_graph.print_incremental_info()
}
join(
move || {
sess.time("incr_comp_persist_dep_graph", || {
@ -172,12 +168,5 @@ pub(crate) fn build_dep_graph(
// First encode the commandline arguments hash
sess.opts.dep_tracking_hash(false).encode(&mut encoder);
Some(DepGraph::new(
sess,
prev_graph,
prev_work_products,
encoder,
sess.opts.unstable_opts.query_dep_graph,
sess.opts.unstable_opts.incremental_info,
))
Some(DepGraph::new(sess, prev_graph, prev_work_products, encoder))
}

View file

@ -11,7 +11,7 @@ use rustc_data_structures::outline;
use rustc_data_structures::profiling::QueryInvocationId;
use rustc_data_structures::sharded::{self, ShardedHashMap};
use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
use rustc_data_structures::sync::{AtomicU64, Lock};
use rustc_data_structures::sync::{AtomicU64, Lock, is_dyn_thread_safe};
use rustc_data_structures::unord::UnordMap;
use rustc_errors::DiagInner;
use rustc_index::IndexVec;
@ -124,19 +124,11 @@ impl<D: Deps> DepGraph<D> {
prev_graph: Arc<SerializedDepGraph>,
prev_work_products: WorkProductMap,
encoder: FileEncoder,
record_graph: bool,
record_stats: bool,
) -> DepGraph<D> {
let prev_graph_node_count = prev_graph.node_count();
let current = CurrentDepGraph::new(
session,
prev_graph_node_count,
encoder,
record_graph,
record_stats,
Arc::clone(&prev_graph),
);
let current =
CurrentDepGraph::new(session, prev_graph_node_count, encoder, Arc::clone(&prev_graph));
let colors = DepNodeColorMap::new(prev_graph_node_count);
@ -1052,17 +1044,8 @@ impl<D: Deps> DepGraph<D> {
}
}
pub fn print_incremental_info(&self) {
if let Some(data) = &self.data {
data.current.encoder.print_incremental_info(
data.current.total_read_count.load(Ordering::Relaxed),
data.current.total_duplicate_read_count.load(Ordering::Relaxed),
)
}
}
pub fn finish_encoding(&self) -> FileEncodeResult {
if let Some(data) = &self.data { data.current.encoder.finish() } else { Ok(0) }
if let Some(data) = &self.data { data.current.encoder.finish(&data.current) } else { Ok(0) }
}
pub(crate) fn next_virtual_depnode_index(&self) -> DepNodeIndex {
@ -1179,8 +1162,8 @@ pub(super) struct CurrentDepGraph<D: Deps> {
/// These are simple counters that are for profiling and
/// debugging and only active with `debug_assertions`.
total_read_count: AtomicU64,
total_duplicate_read_count: AtomicU64,
pub(super) total_read_count: AtomicU64,
pub(super) total_duplicate_read_count: AtomicU64,
}
impl<D: Deps> CurrentDepGraph<D> {
@ -1188,8 +1171,6 @@ impl<D: Deps> CurrentDepGraph<D> {
session: &Session,
prev_graph_node_count: usize,
encoder: FileEncoder,
record_graph: bool,
record_stats: bool,
previous: Arc<SerializedDepGraph>,
) -> Self {
let mut stable_hasher = StableHasher::new();
@ -1211,14 +1192,7 @@ impl<D: Deps> CurrentDepGraph<D> {
session.opts.unstable_opts.incremental_verify_ich || cfg!(debug_assertions);
CurrentDepGraph {
encoder: GraphEncoder::new(
encoder,
prev_graph_node_count,
record_graph,
record_stats,
&session.prof,
previous,
),
encoder: GraphEncoder::new(session, encoder, prev_graph_node_count, previous),
anon_node_to_index: ShardedHashMap::with_capacity(
// FIXME: The count estimate is off as anon nodes are only a portion of the nodes.
new_node_count_estimate / sharded::shards(),
@ -1345,6 +1319,7 @@ impl Default for TaskDeps {
// array, using one u32 per entry.
pub(super) struct DepNodeColorMap {
values: IndexVec<SerializedDepNodeIndex, AtomicU32>,
sync: bool,
}
const COMPRESSED_NONE: u32 = u32::MAX;
@ -1353,7 +1328,10 @@ const COMPRESSED_RED: u32 = u32::MAX - 1;
impl DepNodeColorMap {
fn new(size: usize) -> DepNodeColorMap {
debug_assert!(COMPRESSED_RED > DepNodeIndex::MAX_AS_U32);
DepNodeColorMap { values: (0..size).map(|_| AtomicU32::new(COMPRESSED_NONE)).collect() }
DepNodeColorMap {
values: (0..size).map(|_| AtomicU32::new(COMPRESSED_NONE)).collect(),
sync: is_dyn_thread_safe(),
}
}
#[inline]
@ -1362,6 +1340,36 @@ impl DepNodeColorMap {
if value <= DepNodeIndex::MAX_AS_U32 { Some(DepNodeIndex::from_u32(value)) } else { None }
}
/// This tries to atomically mark a node green and assign `index` as the new
/// index.
#[inline]
pub(super) fn try_mark_green(
&self,
prev_index: SerializedDepNodeIndex,
index: DepNodeIndex,
) -> Result<(), DepNodeIndex> {
let value = &self.values[prev_index];
if self.sync {
match value.compare_exchange(
COMPRESSED_NONE,
index.as_u32(),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => Ok(()),
Err(v) => Err(DepNodeIndex::from_u32(v)),
}
} else {
let v = value.load(Ordering::Relaxed);
if v == COMPRESSED_NONE {
value.store(index.as_u32(), Ordering::Relaxed);
Ok(())
} else {
Err(DepNodeIndex::from_u32(v))
}
}
}
#[inline]
pub(super) fn get(&self, index: SerializedDepNodeIndex) -> Option<DepNodeColor> {
match self.values[index].load(Ordering::Acquire) {

View file

@ -12,6 +12,7 @@ pub(crate) use graph::DepGraphData;
pub use graph::{DepGraph, DepNodeIndex, TaskDepsRef, WorkProduct, WorkProductMap, hash_result};
pub use query::DepGraphQuery;
use rustc_data_structures::profiling::SelfProfilerRef;
use rustc_data_structures::sync::DynSync;
use rustc_session::Session;
pub use serialized::{SerializedDepGraph, SerializedDepNodeIndex};
use tracing::instrument;
@ -89,7 +90,7 @@ pub trait DepContext: Copy {
}
}
pub trait Deps {
pub trait Deps: DynSync {
/// Execute the operation with provided dependencies.
fn with_deps<OP, R>(deps: TaskDepsRef<'_>, op: OP) -> R
where

View file

@ -35,23 +35,27 @@
//! If the number of edges in this node does not fit in the bits available in the header, we
//! store it directly after the header with leb128.
use std::iter;
use std::cell::RefCell;
use std::cmp::max;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{iter, mem, u64};
use rustc_data_structures::fingerprint::{Fingerprint, PackedFingerprint};
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::outline;
use rustc_data_structures::profiling::SelfProfilerRef;
use rustc_data_structures::sync::Lock;
use rustc_data_structures::sync::{Lock, WorkerLocal, broadcast};
use rustc_data_structures::unhash::UnhashMap;
use rustc_index::{Idx, IndexVec};
use rustc_index::IndexVec;
use rustc_serialize::opaque::mem_encoder::MemEncoder;
use rustc_serialize::opaque::{FileEncodeResult, FileEncoder, IntEncodedWithFixedSize, MemDecoder};
use rustc_serialize::{Decodable, Decoder, Encodable, Encoder};
use rustc_session::Session;
use tracing::{debug, instrument};
use super::graph::{DepNodeColor, DepNodeColorMap};
use super::graph::{CurrentDepGraph, DepNodeColor, DepNodeColorMap};
use super::query::DepGraphQuery;
use super::{DepKind, DepNode, DepNodeIndex, Deps};
use crate::dep_graph::edges::EdgesVec;
@ -76,6 +80,9 @@ const DEP_NODE_PAD: usize = DEP_NODE_SIZE - 1;
const DEP_NODE_WIDTH_BITS: usize = DEP_NODE_SIZE / 2;
/// Data for use when recompiling the **current crate**.
///
/// There may be unused indices with DEP_KIND_NULL in this graph due to batch allocation of
/// indices to threads.
#[derive(Debug, Default)]
pub struct SerializedDepGraph {
/// The set of all DepNodes in the graph
@ -184,26 +191,30 @@ impl SerializedDepGraph {
pub fn decode<D: Deps>(d: &mut MemDecoder<'_>, deps: &D) -> Arc<SerializedDepGraph> {
// The last 16 bytes are the node count and edge count.
debug!("position: {:?}", d.position());
let (node_count, edge_count) =
d.with_position(d.len() - 2 * IntEncodedWithFixedSize::ENCODED_SIZE, |d| {
// `node_max` is the number of indices including empty nodes while `node_count`
// is the number of actually encoded nodes.
let (node_max, node_count, edge_count) =
d.with_position(d.len() - 3 * IntEncodedWithFixedSize::ENCODED_SIZE, |d| {
debug!("position: {:?}", d.position());
let node_max = IntEncodedWithFixedSize::decode(d).0 as usize;
let node_count = IntEncodedWithFixedSize::decode(d).0 as usize;
let edge_count = IntEncodedWithFixedSize::decode(d).0 as usize;
(node_count, edge_count)
(node_max, node_count, edge_count)
});
debug!("position: {:?}", d.position());
debug!(?node_count, ?edge_count);
let graph_bytes = d.len() - (2 * IntEncodedWithFixedSize::ENCODED_SIZE) - d.position();
let graph_bytes = d.len() - (3 * IntEncodedWithFixedSize::ENCODED_SIZE) - d.position();
let mut nodes = IndexVec::from_elem_n(
DepNode { kind: D::DEP_KIND_NULL, hash: PackedFingerprint::from(Fingerprint::ZERO) },
node_count,
node_max,
);
let mut fingerprints = IndexVec::from_elem_n(Fingerprint::ZERO, node_count);
let mut fingerprints = IndexVec::from_elem_n(Fingerprint::ZERO, node_max);
let mut edge_list_indices =
IndexVec::from_elem_n(EdgeHeader { repr: 0, num_edges: 0 }, node_count);
IndexVec::from_elem_n(EdgeHeader { repr: 0, num_edges: 0 }, node_max);
// This estimation assumes that all of the encoded bytes are for the edge lists or for the
// fixed-size node headers. But that's not necessarily true; if any edge list has a length
@ -217,7 +228,7 @@ impl SerializedDepGraph {
let mut edge_list_data =
Vec::with_capacity(graph_bytes - node_count * size_of::<SerializedNodeHeader<D>>());
for _index in 0..node_count {
for _ in 0..node_count {
// Decode the header for this edge; the header packs together as many of the fixed-size
// fields as possible to limit the number of times we update decoder state.
let node_header =
@ -263,8 +274,8 @@ impl SerializedDepGraph {
for (idx, node) in nodes.iter_enumerated() {
if index[node.kind.as_usize()].insert(node.hash, idx).is_some() {
// Side effect nodes can have duplicates
if node.kind != D::DEP_KIND_SIDE_EFFECT {
// Empty nodes and side effect nodes can have duplicates
if node.kind != D::DEP_KIND_NULL && node.kind != D::DEP_KIND_SIDE_EFFECT {
let name = deps.name(node.kind);
panic!(
"Error: A dep graph node ({name}) does not have an unique index. \
@ -508,17 +519,32 @@ struct Stat {
edge_counter: u64,
}
struct EncoderState<D: Deps> {
previous: Arc<SerializedDepGraph>,
encoder: FileEncoder,
total_node_count: usize,
total_edge_count: usize,
stats: Option<FxHashMap<DepKind, Stat>>,
mem_encoder: MemEncoder,
struct LocalEncoderState {
next_node_index: u32,
remaining_node_index: u32,
encoder: MemEncoder,
node_count: usize,
edge_count: usize,
/// Stores the number of times we've encoded each dep kind.
kind_stats: Vec<u32>,
}
struct LocalEncoderResult {
node_max: u32,
node_count: usize,
edge_count: usize,
/// Stores the number of times we've encoded each dep kind.
kind_stats: Vec<u32>,
}
struct EncoderState<D: Deps> {
next_node_index: AtomicU64,
previous: Arc<SerializedDepGraph>,
file: Lock<Option<FileEncoder>>,
local: WorkerLocal<RefCell<LocalEncoderState>>,
stats: Option<Lock<FxHashMap<DepKind, Stat>>>,
marker: PhantomData<D>,
}
@ -526,34 +552,58 @@ impl<D: Deps> EncoderState<D> {
fn new(encoder: FileEncoder, record_stats: bool, previous: Arc<SerializedDepGraph>) -> Self {
Self {
previous,
encoder,
total_edge_count: 0,
total_node_count: 0,
stats: record_stats.then(FxHashMap::default),
mem_encoder: MemEncoder::new(),
kind_stats: iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(),
next_node_index: AtomicU64::new(0),
stats: record_stats.then(|| Lock::new(FxHashMap::default())),
file: Lock::new(Some(encoder)),
local: WorkerLocal::new(|_| {
RefCell::new(LocalEncoderState {
next_node_index: 0,
remaining_node_index: 0,
edge_count: 0,
node_count: 0,
encoder: MemEncoder::new(),
kind_stats: iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(),
})
}),
marker: PhantomData,
}
}
#[inline]
fn alloc_index(&mut self) -> DepNodeIndex {
let index = DepNodeIndex::new(self.total_node_count);
self.total_node_count += 1;
index
fn next_index(&self, local: &mut LocalEncoderState) -> DepNodeIndex {
if local.remaining_node_index == 0 {
let count = 256;
// We assume that there won't be enough active threads to overflow u64 from u32::MAX here.
assert!(self.next_node_index.load(Ordering::Relaxed) <= u32::MAX as u64);
local.next_node_index =
self.next_node_index.fetch_add(count, Ordering::Relaxed).try_into().unwrap();
local.remaining_node_index = count as u32;
}
DepNodeIndex::from_u32(local.next_node_index)
}
#[inline]
fn bump_index(&self, local: &mut LocalEncoderState) {
local.remaining_node_index -= 1;
local.next_node_index += 1;
local.node_count += 1;
}
#[inline]
fn record(
&mut self,
&self,
node: DepNode,
index: DepNodeIndex,
edge_count: usize,
edges: impl FnOnce(&mut Self) -> Vec<DepNodeIndex>,
edges: impl FnOnce(&Self) -> Vec<DepNodeIndex>,
record_graph: &Option<Lock<DepGraphQuery>>,
) -> DepNodeIndex {
self.kind_stats[node.kind.as_usize()] += 1;
self.total_edge_count += edge_count;
local: &mut LocalEncoderState,
) {
local.kind_stats[node.kind.as_usize()] += 1;
local.edge_count += edge_count;
if let Some(record_graph) = &record_graph {
// Call `edges` before the outlined code to allow the closure to be optimized out.
@ -568,40 +618,47 @@ impl<D: Deps> EncoderState<D> {
});
}
if let Some(stats) = &mut self.stats {
if let Some(stats) = &self.stats {
let kind = node.kind;
// Outline the stats code as it's typically disabled and cold.
outline(move || {
let mut stats = stats.lock();
let stat =
stats.entry(kind).or_insert(Stat { kind, node_counter: 0, edge_counter: 0 });
stat.node_counter += 1;
stat.edge_counter += edge_count as u64;
});
}
index
}
#[inline]
fn flush_mem_encoder(&mut self) {
let data = &mut self.mem_encoder.data;
fn flush_mem_encoder(&self, local: &mut LocalEncoderState) {
let data = &mut local.encoder.data;
if data.len() > 64 * 1024 {
self.encoder.emit_raw_bytes(&data[..]);
self.file.lock().as_mut().unwrap().emit_raw_bytes(&data[..]);
data.clear();
}
}
/// Encodes a node to the current graph.
fn encode_node(
&mut self,
&self,
index: DepNodeIndex,
node: &NodeInfo,
record_graph: &Option<Lock<DepGraphQuery>>,
) -> DepNodeIndex {
let index = self.alloc_index();
node.encode::<D>(&mut self.mem_encoder, index);
self.flush_mem_encoder();
self.record(node.node, index, node.edges.len(), |_| node.edges[..].to_vec(), record_graph)
local: &mut LocalEncoderState,
) {
node.encode::<D>(&mut local.encoder, index);
self.flush_mem_encoder(&mut *local);
self.record(
node.node,
index,
node.edges.len(),
|_| node.edges[..].to_vec(),
record_graph,
&mut *local,
);
}
/// Encodes a node that was promoted from the previous graph. It reads the information directly from
@ -612,16 +669,17 @@ impl<D: Deps> EncoderState<D> {
/// It expects all edges to already have a new dep node index assigned.
#[inline]
fn encode_promoted_node(
&mut self,
&self,
index: DepNodeIndex,
prev_index: SerializedDepNodeIndex,
record_graph: &Option<Lock<DepGraphQuery>>,
colors: &DepNodeColorMap,
) -> DepNodeIndex {
let index = self.alloc_index();
local: &mut LocalEncoderState,
) {
let node = self.previous.index_to_node(prev_index);
let fingerprint = self.previous.fingerprint_by_index(prev_index);
let edge_count = NodeInfo::encode_promoted::<D>(
&mut self.mem_encoder,
&mut local.encoder,
node,
index,
fingerprint,
@ -629,7 +687,7 @@ impl<D: Deps> EncoderState<D> {
colors,
&self.previous,
);
self.flush_mem_encoder();
self.flush_mem_encoder(&mut *local);
self.record(
node,
index,
@ -641,38 +699,60 @@ impl<D: Deps> EncoderState<D> {
.collect()
},
record_graph,
&mut *local,
);
index
}
fn finish(self, profiler: &SelfProfilerRef) -> FileEncodeResult {
let Self {
mut encoder,
mem_encoder,
total_node_count,
total_edge_count,
stats: _,
kind_stats,
marker: _,
previous,
} = self;
fn finish(&self, profiler: &SelfProfilerRef, current: &CurrentDepGraph<D>) -> FileEncodeResult {
// Prevent more indices from being allocated.
self.next_node_index.store(u32::MAX as u64 + 1, Ordering::SeqCst);
encoder.emit_raw_bytes(&mem_encoder.data);
let results = broadcast(|_| {
let mut local = self.local.borrow_mut();
let node_count = total_node_count.try_into().unwrap();
let edge_count = total_edge_count.try_into().unwrap();
// Prevent more indices from being allocated on this thread.
local.remaining_node_index = 0;
let data = mem::replace(&mut local.encoder.data, Vec::new());
self.file.lock().as_mut().unwrap().emit_raw_bytes(&data);
LocalEncoderResult {
kind_stats: local.kind_stats.clone(),
node_max: local.next_node_index,
node_count: local.node_count,
edge_count: local.edge_count,
}
});
let mut encoder = self.file.lock().take().unwrap();
let mut kind_stats: Vec<u32> = iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect();
let mut node_max = 0;
let mut node_count = 0;
let mut edge_count = 0;
for result in results {
node_max = max(node_max, result.node_max);
node_count += result.node_count;
edge_count += result.edge_count;
for (i, stat) in result.kind_stats.iter().enumerate() {
kind_stats[i] += stat;
}
}
// Encode the number of each dep kind encountered
for count in kind_stats.iter() {
count.encode(&mut encoder);
}
previous.session_count.checked_add(1).unwrap().encode(&mut encoder);
self.previous.session_count.checked_add(1).unwrap().encode(&mut encoder);
debug!(?node_count, ?edge_count);
debug!(?node_max, ?node_count, ?edge_count);
debug!("position: {:?}", encoder.position());
IntEncodedWithFixedSize(node_count).encode(&mut encoder);
IntEncodedWithFixedSize(edge_count).encode(&mut encoder);
IntEncodedWithFixedSize(node_max.try_into().unwrap()).encode(&mut encoder);
IntEncodedWithFixedSize(node_count.try_into().unwrap()).encode(&mut encoder);
IntEncodedWithFixedSize(edge_count.try_into().unwrap()).encode(&mut encoder);
debug!("position: {:?}", encoder.position());
// Drop the encoder so that nothing is written after the counts.
let result = encoder.finish();
@ -681,44 +761,20 @@ impl<D: Deps> EncoderState<D> {
// don't need a dependency on rustc_incremental just for that.
profiler.artifact_size("dep_graph", "dep-graph.bin", position as u64);
}
self.print_incremental_info(current, node_count, edge_count);
result
}
}
pub(crate) struct GraphEncoder<D: Deps> {
profiler: SelfProfilerRef,
status: Lock<Option<EncoderState<D>>>,
record_graph: Option<Lock<DepGraphQuery>>,
}
impl<D: Deps> GraphEncoder<D> {
pub(crate) fn new(
encoder: FileEncoder,
prev_node_count: usize,
record_graph: bool,
record_stats: bool,
profiler: &SelfProfilerRef,
previous: Arc<SerializedDepGraph>,
) -> Self {
let record_graph = record_graph.then(|| Lock::new(DepGraphQuery::new(prev_node_count)));
let status = Lock::new(Some(EncoderState::new(encoder, record_stats, previous)));
GraphEncoder { status, record_graph, profiler: profiler.clone() }
}
pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery)) {
if let Some(record_graph) = &self.record_graph {
f(&record_graph.lock())
}
}
pub(crate) fn print_incremental_info(
fn print_incremental_info(
&self,
total_read_count: u64,
total_duplicate_read_count: u64,
current: &CurrentDepGraph<D>,
total_node_count: usize,
total_edge_count: usize,
) {
let mut status = self.status.lock();
let status = status.as_mut().unwrap();
if let Some(record_stats) = &status.stats {
if let Some(record_stats) = &self.stats {
let record_stats = record_stats.lock();
let mut stats: Vec<_> = record_stats.values().collect();
stats.sort_by_key(|s| -(s.node_counter as i64));
@ -730,10 +786,13 @@ impl<D: Deps> GraphEncoder<D> {
eprintln!("[incremental] DepGraph Statistics");
eprintln!("{SEPARATOR}");
eprintln!("[incremental]");
eprintln!("[incremental] Total Node Count: {}", status.total_node_count);
eprintln!("[incremental] Total Edge Count: {}", status.total_edge_count);
eprintln!("[incremental] Total Node Count: {}", total_node_count);
eprintln!("[incremental] Total Edge Count: {}", total_edge_count);
if cfg!(debug_assertions) {
let total_read_count = current.total_read_count.load(Ordering::Relaxed);
let total_duplicate_read_count =
current.total_duplicate_read_count.load(Ordering::Relaxed);
eprintln!("[incremental] Total Edge Reads: {total_read_count}");
eprintln!("[incremental] Total Duplicate Edge Reads: {total_duplicate_read_count}");
}
@ -747,7 +806,7 @@ impl<D: Deps> GraphEncoder<D> {
for stat in stats {
let node_kind_ratio =
(100.0 * (stat.node_counter as f64)) / (status.total_node_count as f64);
(100.0 * (stat.node_counter as f64)) / (total_node_count as f64);
let node_kind_avg_edges = (stat.edge_counter as f64) / (stat.node_counter as f64);
eprintln!(
@ -763,6 +822,35 @@ impl<D: Deps> GraphEncoder<D> {
eprintln!("[incremental]");
}
}
}
pub(crate) struct GraphEncoder<D: Deps> {
profiler: SelfProfilerRef,
status: EncoderState<D>,
record_graph: Option<Lock<DepGraphQuery>>,
}
impl<D: Deps> GraphEncoder<D> {
pub(crate) fn new(
sess: &Session,
encoder: FileEncoder,
prev_node_count: usize,
previous: Arc<SerializedDepGraph>,
) -> Self {
let record_graph = sess
.opts
.unstable_opts
.query_dep_graph
.then(|| Lock::new(DepGraphQuery::new(prev_node_count)));
let status = EncoderState::new(encoder, sess.opts.unstable_opts.incremental_info, previous);
GraphEncoder { status, record_graph, profiler: sess.prof.clone() }
}
pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery)) {
if let Some(record_graph) = &self.record_graph {
f(&record_graph.lock())
}
}
/// Encodes a node that does not exists in the previous graph.
pub(crate) fn send_new(
@ -773,7 +861,11 @@ impl<D: Deps> GraphEncoder<D> {
) -> DepNodeIndex {
let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph");
let node = NodeInfo { node, fingerprint, edges };
self.status.lock().as_mut().unwrap().encode_node(&node, &self.record_graph)
let mut local = self.status.local.borrow_mut();
let index = self.status.next_index(&mut *local);
self.status.bump_index(&mut *local);
self.status.encode_node(index, &node, &self.record_graph, &mut *local);
index
}
/// Encodes a node that exists in the previous graph, but was re-executed.
@ -791,23 +883,24 @@ impl<D: Deps> GraphEncoder<D> {
let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph");
let node = NodeInfo { node, fingerprint, edges };
let mut status = self.status.lock();
let status = status.as_mut().unwrap();
let mut local = self.status.local.borrow_mut();
// Check colors inside the lock to avoid racing when `send_promoted` is called concurrently
// on the same index.
match colors.get(prev_index) {
None => {
let dep_node_index = status.encode_node(&node, &self.record_graph);
colors.insert(
prev_index,
if is_green { DepNodeColor::Green(dep_node_index) } else { DepNodeColor::Red },
);
dep_node_index
let index = self.status.next_index(&mut *local);
if is_green {
// Use `try_mark_green` to avoid racing when `send_promoted` is called concurrently
// on the same index.
match colors.try_mark_green(prev_index, index) {
Ok(()) => (),
Err(dep_node_index) => return dep_node_index,
}
Some(DepNodeColor::Green(dep_node_index)) => dep_node_index,
Some(DepNodeColor::Red) => panic!(),
} else {
colors.insert(prev_index, DepNodeColor::Red);
}
self.status.bump_index(&mut *local);
self.status.encode_node(index, &node, &self.record_graph, &mut *local);
index
}
/// Encodes a node that was promoted from the previous graph. It reads the information directly from
@ -822,26 +915,30 @@ impl<D: Deps> GraphEncoder<D> {
) -> DepNodeIndex {
let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph");
let mut status = self.status.lock();
let status = status.as_mut().unwrap();
let mut local = self.status.local.borrow_mut();
let index = self.status.next_index(&mut *local);
// Check colors inside the lock to avoid racing when `send_promoted` or `send_and_color`
// Use `try_mark_green` to avoid racing when `send_promoted` or `send_and_color`
// is called concurrently on the same index.
match colors.get(prev_index) {
None => {
let dep_node_index =
status.encode_promoted_node(prev_index, &self.record_graph, colors);
colors.insert(prev_index, DepNodeColor::Green(dep_node_index));
dep_node_index
match colors.try_mark_green(prev_index, index) {
Ok(()) => {
self.status.bump_index(&mut *local);
self.status.encode_promoted_node(
index,
prev_index,
&self.record_graph,
colors,
&mut *local,
);
index
}
Some(DepNodeColor::Green(dep_node_index)) => dep_node_index,
Some(DepNodeColor::Red) => panic!(),
Err(dep_node_index) => dep_node_index,
}
}
pub(crate) fn finish(&self) -> FileEncodeResult {
pub(crate) fn finish(&self, current: &CurrentDepGraph<D>) -> FileEncodeResult {
let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph_finish");
self.status.lock().take().unwrap().finish(&self.profiler)
self.status.finish(&self.profiler, current)
}
}