Auto merge of #139758 - Zoxc:thread-local-graph, r=oli-obk
Use thread-local dep graph encoding

This adds thread-local encoding of dep graph nodes. Each thread has a `MemEncoder` that gets flushed to the global `FileEncoder` when it exceeds 64 kB, and each thread also has a local cache of dep indices. As a consequence, `SerializedDepGraph` can now contain empty gaps. Since the encoder lock is removed, indices are marked green and allocated by the new atomic operation `DepNodeColorMap::try_mark_green`.
commit 3ef8e64ce9

6 changed files with 301 additions and 186 deletions
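The buffering scheme described above can be pictured with a minimal standalone sketch (the names `SharedSink`, `LocalBuffer` and `flush_if_full` are illustrative, not the rustc types): each thread appends encoded bytes to its own buffer and only takes the shared lock to flush once the buffer crosses the 64 kB threshold.

use std::sync::Mutex;

const FLUSH_THRESHOLD: usize = 64 * 1024;

struct SharedSink {
    file: Mutex<Vec<u8>>, // stands in for the global `FileEncoder`
}

struct LocalBuffer {
    data: Vec<u8>, // stands in for a per-thread `MemEncoder`
}

impl SharedSink {
    fn flush_if_full(&self, local: &mut LocalBuffer) {
        // The shared lock is only taken on the (rare) flush path.
        if local.data.len() > FLUSH_THRESHOLD {
            self.file.lock().unwrap().extend_from_slice(&local.data);
            local.data.clear();
        }
    }
}

fn main() {
    let sink = SharedSink { file: Mutex::new(Vec::new()) };
    let mut local = LocalBuffer { data: Vec::new() };
    for _ in 0..200_000 {
        local.data.push(0xAB); // pretend this is one encoded byte of node data
        sink.flush_if_full(&mut local);
    }
    // Whatever remains in the local buffer is flushed at the end.
    sink.file.lock().unwrap().extend_from_slice(&local.data);
    assert_eq!(sink.file.lock().unwrap().len(), 200_000);
}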
compiler/rustc_data_structures/src/sync.rs

@@ -43,7 +43,7 @@ pub use self::freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};
 pub use self::lock::{Lock, LockGuard, Mode};
 pub use self::mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
 pub use self::parallel::{
-    join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in,
+    broadcast, join, par_for_each_in, par_map, parallel_guard, scope, spawn, try_par_for_each_in,
 };
 pub use self::vec::{AppendOnlyIndexVec, AppendOnlyVec};
 pub use self::worker_local::{Registry, WorkerLocal};
@@ -237,3 +237,13 @@ pub fn par_map<I: DynSend, T: IntoIterator<Item = I>, R: DynSend, C: FromIterator<R>>(
         }
     })
 }
+
+pub fn broadcast<R: DynSend>(op: impl Fn(usize) -> R + DynSync) -> Vec<R> {
+    if mode::is_dyn_thread_safe() {
+        let op = FromDyn::from(op);
+        let results = rayon_core::broadcast(|context| op.derive(op(context.index())));
+        results.into_iter().map(|r| r.into_inner()).collect()
+    } else {
+        vec![op(0)]
+    }
+}
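For context, the helper above wraps `rayon_core::broadcast`, which runs a closure once on every worker thread of the pool and returns the per-thread results in thread-index order. A minimal usage sketch of that primitive, assuming a Cargo dependency on the `rayon-core` crate:

fn main() {
    let pool = rayon_core::ThreadPoolBuilder::new().num_threads(4).build().unwrap();
    // The closure runs once per worker thread; results come back indexed by thread.
    let results: Vec<usize> = pool.broadcast(|ctx| ctx.index() * 10);
    assert_eq!(results, vec![0, 10, 20, 30]);
}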
compiler/rustc_incremental/src/persist/save.rs

@@ -44,10 +44,6 @@ pub(crate) fn save_dep_graph(tcx: TyCtxt<'_>) {
     sess.time("assert_dep_graph", || assert_dep_graph(tcx));
     sess.time("check_dirty_clean", || dirty_clean::check_dirty_clean_annotations(tcx));

-    if sess.opts.unstable_opts.incremental_info {
-        tcx.dep_graph.print_incremental_info()
-    }
-
     join(
         move || {
             sess.time("incr_comp_persist_dep_graph", || {
@@ -172,12 +168,5 @@ pub(crate) fn build_dep_graph(
     // First encode the commandline arguments hash
     sess.opts.dep_tracking_hash(false).encode(&mut encoder);

-    Some(DepGraph::new(
-        sess,
-        prev_graph,
-        prev_work_products,
-        encoder,
-        sess.opts.unstable_opts.query_dep_graph,
-        sess.opts.unstable_opts.incremental_info,
-    ))
+    Some(DepGraph::new(sess, prev_graph, prev_work_products, encoder))
 }

compiler/rustc_query_system/src/dep_graph/graph.rs
@@ -11,7 +11,7 @@ use rustc_data_structures::outline;
 use rustc_data_structures::profiling::QueryInvocationId;
 use rustc_data_structures::sharded::{self, ShardedHashMap};
 use rustc_data_structures::stable_hasher::{HashStable, StableHasher};
-use rustc_data_structures::sync::{AtomicU64, Lock};
+use rustc_data_structures::sync::{AtomicU64, Lock, is_dyn_thread_safe};
 use rustc_data_structures::unord::UnordMap;
 use rustc_errors::DiagInner;
 use rustc_index::IndexVec;
@@ -124,19 +124,11 @@ impl<D: Deps> DepGraph<D> {
         prev_graph: Arc<SerializedDepGraph>,
         prev_work_products: WorkProductMap,
         encoder: FileEncoder,
-        record_graph: bool,
-        record_stats: bool,
     ) -> DepGraph<D> {
         let prev_graph_node_count = prev_graph.node_count();

-        let current = CurrentDepGraph::new(
-            session,
-            prev_graph_node_count,
-            encoder,
-            record_graph,
-            record_stats,
-            Arc::clone(&prev_graph),
-        );
+        let current =
+            CurrentDepGraph::new(session, prev_graph_node_count, encoder, Arc::clone(&prev_graph));

         let colors = DepNodeColorMap::new(prev_graph_node_count);

@@ -1052,17 +1044,8 @@ impl<D: Deps> DepGraph<D> {
         }
     }

-    pub fn print_incremental_info(&self) {
-        if let Some(data) = &self.data {
-            data.current.encoder.print_incremental_info(
-                data.current.total_read_count.load(Ordering::Relaxed),
-                data.current.total_duplicate_read_count.load(Ordering::Relaxed),
-            )
-        }
-    }
-
     pub fn finish_encoding(&self) -> FileEncodeResult {
-        if let Some(data) = &self.data { data.current.encoder.finish() } else { Ok(0) }
+        if let Some(data) = &self.data { data.current.encoder.finish(&data.current) } else { Ok(0) }
     }

     pub(crate) fn next_virtual_depnode_index(&self) -> DepNodeIndex {
@@ -1179,8 +1162,8 @@ pub(super) struct CurrentDepGraph<D: Deps> {

     /// These are simple counters that are for profiling and
     /// debugging and only active with `debug_assertions`.
-    total_read_count: AtomicU64,
-    total_duplicate_read_count: AtomicU64,
+    pub(super) total_read_count: AtomicU64,
+    pub(super) total_duplicate_read_count: AtomicU64,
 }

 impl<D: Deps> CurrentDepGraph<D> {
@@ -1188,8 +1171,6 @@ impl<D: Deps> CurrentDepGraph<D> {
         session: &Session,
         prev_graph_node_count: usize,
         encoder: FileEncoder,
-        record_graph: bool,
-        record_stats: bool,
         previous: Arc<SerializedDepGraph>,
     ) -> Self {
         let mut stable_hasher = StableHasher::new();
@@ -1211,14 +1192,7 @@ impl<D: Deps> CurrentDepGraph<D> {
             session.opts.unstable_opts.incremental_verify_ich || cfg!(debug_assertions);

         CurrentDepGraph {
-            encoder: GraphEncoder::new(
-                encoder,
-                prev_graph_node_count,
-                record_graph,
-                record_stats,
-                &session.prof,
-                previous,
-            ),
+            encoder: GraphEncoder::new(session, encoder, prev_graph_node_count, previous),
             anon_node_to_index: ShardedHashMap::with_capacity(
                 // FIXME: The count estimate is off as anon nodes are only a portion of the nodes.
                 new_node_count_estimate / sharded::shards(),
@@ -1345,6 +1319,7 @@ impl Default for TaskDeps {
 // array, using one u32 per entry.
 pub(super) struct DepNodeColorMap {
     values: IndexVec<SerializedDepNodeIndex, AtomicU32>,
+    sync: bool,
 }

 const COMPRESSED_NONE: u32 = u32::MAX;
@@ -1353,7 +1328,10 @@ const COMPRESSED_RED: u32 = u32::MAX - 1;
 impl DepNodeColorMap {
     fn new(size: usize) -> DepNodeColorMap {
         debug_assert!(COMPRESSED_RED > DepNodeIndex::MAX_AS_U32);
-        DepNodeColorMap { values: (0..size).map(|_| AtomicU32::new(COMPRESSED_NONE)).collect() }
+        DepNodeColorMap {
+            values: (0..size).map(|_| AtomicU32::new(COMPRESSED_NONE)).collect(),
+            sync: is_dyn_thread_safe(),
+        }
     }

     #[inline]
@@ -1362,6 +1340,37 @@ impl DepNodeColorMap {
         if value <= DepNodeIndex::MAX_AS_U32 { Some(DepNodeIndex::from_u32(value)) } else { None }
     }

+    /// This tries to atomically mark a node green and assign `index` as the new
+    /// index. This returns `Ok` if `index` gets assigned, otherwise it returns
+    /// the already allocated index in `Err`.
+    #[inline]
+    pub(super) fn try_mark_green(
+        &self,
+        prev_index: SerializedDepNodeIndex,
+        index: DepNodeIndex,
+    ) -> Result<(), DepNodeIndex> {
+        let value = &self.values[prev_index];
+        if self.sync {
+            match value.compare_exchange(
+                COMPRESSED_NONE,
+                index.as_u32(),
+                Ordering::Relaxed,
+                Ordering::Relaxed,
+            ) {
+                Ok(_) => Ok(()),
+                Err(v) => Err(DepNodeIndex::from_u32(v)),
+            }
+        } else {
+            let v = value.load(Ordering::Relaxed);
+            if v == COMPRESSED_NONE {
+                value.store(index.as_u32(), Ordering::Relaxed);
+                Ok(())
+            } else {
+                Err(DepNodeIndex::from_u32(v))
+            }
+        }
+    }
+
     #[inline]
     pub(super) fn get(&self, index: SerializedDepNodeIndex) -> Option<DepNodeColor> {
         match self.values[index].load(Ordering::Acquire) {
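The first-caller-wins behaviour of `try_mark_green` can be reduced to a small standalone sketch (the `ColorSlot` type is hypothetical; the real map packs colors and indices for all nodes into one `IndexVec`):

use std::sync::atomic::{AtomicU32, Ordering};

const NONE: u32 = u32::MAX; // mirrors COMPRESSED_NONE above

struct ColorSlot(AtomicU32);

impl ColorSlot {
    fn try_mark_green(&self, index: u32) -> Result<(), u32> {
        // The first caller to swap its index into the empty slot wins; a losing
        // caller gets back the index the winner stored.
        match self.0.compare_exchange(NONE, index, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => Ok(()),
            Err(existing) => Err(existing),
        }
    }
}

fn main() {
    let slot = ColorSlot(AtomicU32::new(NONE));
    assert_eq!(slot.try_mark_green(7), Ok(()));
    // A second (racing) attempt observes the already allocated index.
    assert_eq!(slot.try_mark_green(9), Err(7));
}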
compiler/rustc_query_system/src/dep_graph/mod.rs

@@ -12,6 +12,7 @@ pub(crate) use graph::DepGraphData;
 pub use graph::{DepGraph, DepNodeIndex, TaskDepsRef, WorkProduct, WorkProductMap, hash_result};
 pub use query::DepGraphQuery;
 use rustc_data_structures::profiling::SelfProfilerRef;
+use rustc_data_structures::sync::DynSync;
 use rustc_session::Session;
 pub use serialized::{SerializedDepGraph, SerializedDepNodeIndex};
 use tracing::instrument;
@@ -89,7 +90,7 @@ pub trait DepContext: Copy {
     }
 }

-pub trait Deps {
+pub trait Deps: DynSync {
     /// Execute the operation with provided dependencies.
     fn with_deps<OP, R>(deps: TaskDepsRef<'_>, op: OP) -> R
     where

compiler/rustc_query_system/src/dep_graph/serialized.rs
@@ -12,7 +12,7 @@
 //! node and edge count are stored at the end of the file, all the arrays can be
 //! pre-allocated with the right length.
 //!
-//! The encoding of the de-pgraph is generally designed around the fact that fixed-size
+//! The encoding of the dep-graph is generally designed around the fact that fixed-size
 //! reads of encoded data are generally faster than variable-sized reads. Ergo we adopt
 //! essentially the same varint encoding scheme used in the rmeta format; the edge lists
 //! for each node on the graph store a 2-bit integer which is the number of bytes per edge
@@ -34,24 +34,32 @@
 //! [`DepKind`], number of edges, and bytes per edge are all bit-packed together, if they fit.
 //! If the number of edges in this node does not fit in the bits available in the header, we
 //! store it directly after the header with leb128.
+//!
+//! Dep-graph indices are bulk allocated to threads inside `LocalEncoderState`. Having threads
+//! own these indices helps avoid races when they are conditionally used when marking nodes green.
+//! It also reduces congestion on the shared index count.

-use std::iter;
+use std::cell::RefCell;
+use std::cmp::max;
 use std::marker::PhantomData;
 use std::sync::Arc;
+use std::sync::atomic::Ordering;
+use std::{iter, mem, u64};

 use rustc_data_structures::fingerprint::{Fingerprint, PackedFingerprint};
 use rustc_data_structures::fx::FxHashMap;
 use rustc_data_structures::outline;
 use rustc_data_structures::profiling::SelfProfilerRef;
-use rustc_data_structures::sync::Lock;
+use rustc_data_structures::sync::{AtomicU64, Lock, WorkerLocal, broadcast};
 use rustc_data_structures::unhash::UnhashMap;
-use rustc_index::{Idx, IndexVec};
+use rustc_index::IndexVec;
 use rustc_serialize::opaque::mem_encoder::MemEncoder;
 use rustc_serialize::opaque::{FileEncodeResult, FileEncoder, IntEncodedWithFixedSize, MemDecoder};
 use rustc_serialize::{Decodable, Decoder, Encodable, Encoder};
+use rustc_session::Session;
 use tracing::{debug, instrument};

-use super::graph::{DepNodeColor, DepNodeColorMap};
+use super::graph::{CurrentDepGraph, DepNodeColor, DepNodeColorMap};
 use super::query::DepGraphQuery;
 use super::{DepKind, DepNode, DepNodeIndex, Deps};
 use crate::dep_graph::edges::EdgesVec;
@@ -76,6 +84,9 @@ const DEP_NODE_PAD: usize = DEP_NODE_SIZE - 1;
 const DEP_NODE_WIDTH_BITS: usize = DEP_NODE_SIZE / 2;

 /// Data for use when recompiling the **current crate**.
+///
+/// There may be unused indices with DEP_KIND_NULL in this graph due to batch allocation of
+/// indices to threads.
 #[derive(Debug, Default)]
 pub struct SerializedDepGraph {
     /// The set of all DepNodes in the graph
@@ -184,26 +195,30 @@ impl SerializedDepGraph {
     pub fn decode<D: Deps>(d: &mut MemDecoder<'_>, deps: &D) -> Arc<SerializedDepGraph> {
-        // The last 16 bytes are the node count and edge count.
-        debug!("position: {:?}", d.position());
-        let (node_count, edge_count) =
-            d.with_position(d.len() - 2 * IntEncodedWithFixedSize::ENCODED_SIZE, |d| {
+        // `node_max` is the number of indices including empty nodes while `node_count`
+        // is the number of actually encoded nodes.
+        let (node_max, node_count, edge_count) =
+            d.with_position(d.len() - 3 * IntEncodedWithFixedSize::ENCODED_SIZE, |d| {
                 debug!("position: {:?}", d.position());
+                let node_max = IntEncodedWithFixedSize::decode(d).0 as usize;
                 let node_count = IntEncodedWithFixedSize::decode(d).0 as usize;
                 let edge_count = IntEncodedWithFixedSize::decode(d).0 as usize;
-                (node_count, edge_count)
+                (node_max, node_count, edge_count)
             });
-        debug!("position: {:?}", d.position());

         debug!(?node_count, ?edge_count);

-        let graph_bytes = d.len() - (2 * IntEncodedWithFixedSize::ENCODED_SIZE) - d.position();
+        let graph_bytes = d.len() - (3 * IntEncodedWithFixedSize::ENCODED_SIZE) - d.position();

         let mut nodes = IndexVec::from_elem_n(
             DepNode { kind: D::DEP_KIND_NULL, hash: PackedFingerprint::from(Fingerprint::ZERO) },
-            node_count,
+            node_max,
         );
-        let mut fingerprints = IndexVec::from_elem_n(Fingerprint::ZERO, node_count);
+        let mut fingerprints = IndexVec::from_elem_n(Fingerprint::ZERO, node_max);
         let mut edge_list_indices =
-            IndexVec::from_elem_n(EdgeHeader { repr: 0, num_edges: 0 }, node_count);
+            IndexVec::from_elem_n(EdgeHeader { repr: 0, num_edges: 0 }, node_max);

         // This estimation assumes that all of the encoded bytes are for the edge lists or for the
         // fixed-size node headers. But that's not necessarily true; if any edge list has a length
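The trailer layout this decoder expects can be sketched independently of the compiler: three fixed-size integers sit at the very end of the file, so the decoder can seek to `len - 3 * ENCODED_SIZE`, pre-allocate its arrays, and then read the stream from the start. The 8-byte width and little-endian layout below are assumptions for illustration, not the exact `IntEncodedWithFixedSize` format.

const ENCODED_SIZE: usize = 8; // assumed width of one fixed-size integer in the trailer

// Reads (node_max, node_count, edge_count) from the last 24 bytes of the file.
fn read_trailer(data: &[u8]) -> (u64, u64, u64) {
    let int_at = |i: usize| {
        let start = data.len() - (3 - i) * ENCODED_SIZE;
        u64::from_le_bytes(data[start..start + ENCODED_SIZE].try_into().unwrap())
    };
    (int_at(0), int_at(1), int_at(2))
}

fn main() {
    let mut file = vec![0u8; 100]; // stand-in for the encoded graph payload
    for v in [12u64, 10, 34] {
        // node_max = 12 (indices incl. gaps), node_count = 10, edge_count = 34
        file.extend_from_slice(&v.to_le_bytes());
    }
    assert_eq!(read_trailer(&file), (12, 10, 34));
}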
@@ -217,7 +232,7 @@ impl SerializedDepGraph {
         let mut edge_list_data =
             Vec::with_capacity(graph_bytes - node_count * size_of::<SerializedNodeHeader<D>>());

-        for _index in 0..node_count {
+        for _ in 0..node_count {
             // Decode the header for this edge; the header packs together as many of the fixed-size
             // fields as possible to limit the number of times we update decoder state.
             let node_header =
@@ -263,8 +278,8 @@ impl SerializedDepGraph {

         for (idx, node) in nodes.iter_enumerated() {
             if index[node.kind.as_usize()].insert(node.hash, idx).is_some() {
-                // Side effect nodes can have duplicates
-                if node.kind != D::DEP_KIND_SIDE_EFFECT {
+                // Empty nodes and side effect nodes can have duplicates
+                if node.kind != D::DEP_KIND_NULL && node.kind != D::DEP_KIND_SIDE_EFFECT {
                     let name = deps.name(node.kind);
                     panic!(
                         "Error: A dep graph node ({name}) does not have an unique index. \
@@ -508,17 +523,32 @@ struct Stat {
     edge_counter: u64,
 }

-struct EncoderState<D: Deps> {
-    previous: Arc<SerializedDepGraph>,
-    encoder: FileEncoder,
-    total_node_count: usize,
-    total_edge_count: usize,
-    stats: Option<FxHashMap<DepKind, Stat>>,
-
-    mem_encoder: MemEncoder,
+struct LocalEncoderState {
+    next_node_index: u32,
+    remaining_node_index: u32,
+    encoder: MemEncoder,
+    node_count: usize,
+    edge_count: usize,

     /// Stores the number of times we've encoded each dep kind.
     kind_stats: Vec<u32>,
 }

+struct LocalEncoderResult {
+    node_max: u32,
+    node_count: usize,
+    edge_count: usize,
+
+    /// Stores the number of times we've encoded each dep kind.
+    kind_stats: Vec<u32>,
+}
+
+struct EncoderState<D: Deps> {
+    next_node_index: AtomicU64,
+    previous: Arc<SerializedDepGraph>,
+    file: Lock<Option<FileEncoder>>,
+    local: WorkerLocal<RefCell<LocalEncoderState>>,
+    stats: Option<Lock<FxHashMap<DepKind, Stat>>>,
     marker: PhantomData<D>,
 }
@@ -526,34 +556,63 @@ impl<D: Deps> EncoderState<D> {
     fn new(encoder: FileEncoder, record_stats: bool, previous: Arc<SerializedDepGraph>) -> Self {
         Self {
             previous,
-            encoder,
-            total_edge_count: 0,
-            total_node_count: 0,
-            stats: record_stats.then(FxHashMap::default),
-            mem_encoder: MemEncoder::new(),
-            kind_stats: iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(),
+            next_node_index: AtomicU64::new(0),
+            stats: record_stats.then(|| Lock::new(FxHashMap::default())),
+            file: Lock::new(Some(encoder)),
+            local: WorkerLocal::new(|_| {
+                RefCell::new(LocalEncoderState {
+                    next_node_index: 0,
+                    remaining_node_index: 0,
+                    edge_count: 0,
+                    node_count: 0,
+                    encoder: MemEncoder::new(),
+                    kind_stats: iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect(),
+                })
+            }),
             marker: PhantomData,
         }
     }

     #[inline]
-    fn alloc_index(&mut self) -> DepNodeIndex {
-        let index = DepNodeIndex::new(self.total_node_count);
-        self.total_node_count += 1;
-        index
+    fn next_index(&self, local: &mut LocalEncoderState) -> DepNodeIndex {
+        if local.remaining_node_index == 0 {
+            const COUNT: u32 = 256;
+
+            // We assume that there won't be enough active threads to overflow `u64` from `u32::MAX` here.
+            // This can exceed `u32::MAX` by at most `N * COUNT` where `N` is the thread pool count since
+            // `try_into().unwrap()` will make threads panic when `self.next_node_index` exceeds `u32::MAX`.
+            local.next_node_index =
+                self.next_node_index.fetch_add(COUNT as u64, Ordering::Relaxed).try_into().unwrap();
+
+            // Check that we'll stay within `u32`
+            local.next_node_index.checked_add(COUNT).unwrap();
+
+            local.remaining_node_index = COUNT;
+        }
+
+        DepNodeIndex::from_u32(local.next_node_index)
+    }
+
+    /// Marks the index previously returned by `next_index` as used.
+    #[inline]
+    fn bump_index(&self, local: &mut LocalEncoderState) {
+        local.remaining_node_index -= 1;
+        local.next_node_index += 1;
+        local.node_count += 1;
     }

     #[inline]
     fn record(
-        &mut self,
+        &self,
         node: DepNode,
         index: DepNodeIndex,
         edge_count: usize,
-        edges: impl FnOnce(&mut Self) -> Vec<DepNodeIndex>,
+        edges: impl FnOnce(&Self) -> Vec<DepNodeIndex>,
         record_graph: &Option<Lock<DepGraphQuery>>,
-    ) -> DepNodeIndex {
-        self.kind_stats[node.kind.as_usize()] += 1;
-        self.total_edge_count += edge_count;
+        local: &mut LocalEncoderState,
+    ) {
+        local.kind_stats[node.kind.as_usize()] += 1;
+        local.edge_count += edge_count;

         if let Some(record_graph) = &record_graph {
             // Call `edges` before the outlined code to allow the closure to be optimized out.
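The index handout implemented by `next_index`/`bump_index` above amounts to a block-based bump allocator over one shared atomic counter: with `COUNT = 256`, the first thread to refill owns indices [0, 256), the next owns [256, 512), and so on, so the shared counter is touched once per 256 nodes. A standalone sketch (the `Shared`/`Local` types are illustrative, not the rustc ones):

use std::sync::atomic::{AtomicU64, Ordering};

const COUNT: u32 = 256; // block size, as in `next_index` above

struct Shared {
    next_node_index: AtomicU64,
}

struct Local {
    next_node_index: u32,
    remaining_node_index: u32,
}

impl Shared {
    // Peek at the next index this thread would use, refilling the local block
    // from the shared counter when it runs out.
    fn next_index(&self, local: &mut Local) -> u32 {
        if local.remaining_node_index == 0 {
            let start = self.next_node_index.fetch_add(COUNT as u64, Ordering::Relaxed);
            local.next_node_index = u32::try_from(start).unwrap();
            local.remaining_node_index = COUNT;
        }
        local.next_node_index
    }

    // Commit the index returned by `next_index`.
    fn bump_index(&self, local: &mut Local) {
        local.remaining_node_index -= 1;
        local.next_node_index += 1;
    }
}

fn main() {
    let shared = Shared { next_node_index: AtomicU64::new(0) };
    let mut a = Local { next_node_index: 0, remaining_node_index: 0 };
    let mut b = Local { next_node_index: 0, remaining_node_index: 0 };
    let i = shared.next_index(&mut a); // "thread A" now owns indices [0, 256)
    shared.bump_index(&mut a);
    let j = shared.next_index(&mut b); // "thread B" owns [256, 512)
    shared.bump_index(&mut b);
    assert_eq!((i, j), (0, 256));
    // If A stops after one node, indices 1..256 are never written: these are
    // exactly the gaps (DEP_KIND_NULL entries) the decoder now tolerates.
}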
@@ -568,40 +627,47 @@ impl<D: Deps> EncoderState<D> {
             });
         }

-        if let Some(stats) = &mut self.stats {
+        if let Some(stats) = &self.stats {
             let kind = node.kind;

             // Outline the stats code as it's typically disabled and cold.
             outline(move || {
+                let mut stats = stats.lock();
                 let stat =
                     stats.entry(kind).or_insert(Stat { kind, node_counter: 0, edge_counter: 0 });
                 stat.node_counter += 1;
                 stat.edge_counter += edge_count as u64;
             });
         }
-
-        index
     }

     #[inline]
-    fn flush_mem_encoder(&mut self) {
-        let data = &mut self.mem_encoder.data;
+    fn flush_mem_encoder(&self, local: &mut LocalEncoderState) {
+        let data = &mut local.encoder.data;
         if data.len() > 64 * 1024 {
-            self.encoder.emit_raw_bytes(&data[..]);
+            self.file.lock().as_mut().unwrap().emit_raw_bytes(&data[..]);
             data.clear();
         }
     }

     /// Encodes a node to the current graph.
     fn encode_node(
-        &mut self,
+        &self,
+        index: DepNodeIndex,
         node: &NodeInfo,
         record_graph: &Option<Lock<DepGraphQuery>>,
-    ) -> DepNodeIndex {
-        let index = self.alloc_index();
-        node.encode::<D>(&mut self.mem_encoder, index);
-        self.flush_mem_encoder();
-        self.record(node.node, index, node.edges.len(), |_| node.edges[..].to_vec(), record_graph)
+        local: &mut LocalEncoderState,
+    ) {
+        node.encode::<D>(&mut local.encoder, index);
+        self.flush_mem_encoder(&mut *local);
+        self.record(
+            node.node,
+            index,
+            node.edges.len(),
+            |_| node.edges[..].to_vec(),
+            record_graph,
+            &mut *local,
+        );
     }

     /// Encodes a node that was promoted from the previous graph. It reads the information directly from
@@ -612,16 +678,17 @@ impl<D: Deps> EncoderState<D> {
     /// It expects all edges to already have a new dep node index assigned.
     #[inline]
     fn encode_promoted_node(
-        &mut self,
+        &self,
+        index: DepNodeIndex,
         prev_index: SerializedDepNodeIndex,
         record_graph: &Option<Lock<DepGraphQuery>>,
         colors: &DepNodeColorMap,
-    ) -> DepNodeIndex {
-        let index = self.alloc_index();
+        local: &mut LocalEncoderState,
+    ) {
         let node = self.previous.index_to_node(prev_index);
         let fingerprint = self.previous.fingerprint_by_index(prev_index);
         let edge_count = NodeInfo::encode_promoted::<D>(
-            &mut self.mem_encoder,
+            &mut local.encoder,
             node,
             index,
             fingerprint,
@@ -629,7 +696,7 @@ impl<D: Deps> EncoderState<D> {
             colors,
             &self.previous,
         );
-        self.flush_mem_encoder();
+        self.flush_mem_encoder(&mut *local);
         self.record(
             node,
             index,
@@ -641,38 +708,60 @@ impl<D: Deps> EncoderState<D> {
                     .collect()
             },
             record_graph,
+            &mut *local,
         );
-        index
     }

-    fn finish(self, profiler: &SelfProfilerRef) -> FileEncodeResult {
-        let Self {
-            mut encoder,
-            mem_encoder,
-            total_node_count,
-            total_edge_count,
-            stats: _,
-            kind_stats,
-            marker: _,
-            previous,
-        } = self;
+    fn finish(&self, profiler: &SelfProfilerRef, current: &CurrentDepGraph<D>) -> FileEncodeResult {
+        // Prevent more indices from being allocated.
+        self.next_node_index.store(u32::MAX as u64 + 1, Ordering::SeqCst);

-        encoder.emit_raw_bytes(&mem_encoder.data);
+        let results = broadcast(|_| {
+            let mut local = self.local.borrow_mut();

-        let node_count = total_node_count.try_into().unwrap();
-        let edge_count = total_edge_count.try_into().unwrap();
+            // Prevent more indices from being allocated on this thread.
+            local.remaining_node_index = 0;
+
+            let data = mem::replace(&mut local.encoder.data, Vec::new());
+            self.file.lock().as_mut().unwrap().emit_raw_bytes(&data);
+
+            LocalEncoderResult {
+                kind_stats: local.kind_stats.clone(),
+                node_max: local.next_node_index,
+                node_count: local.node_count,
+                edge_count: local.edge_count,
+            }
+        });
+
+        let mut encoder = self.file.lock().take().unwrap();
+
+        let mut kind_stats: Vec<u32> = iter::repeat(0).take(D::DEP_KIND_MAX as usize + 1).collect();
+
+        let mut node_max = 0;
+        let mut node_count = 0;
+        let mut edge_count = 0;
+
+        for result in results {
+            node_max = max(node_max, result.node_max);
+            node_count += result.node_count;
+            edge_count += result.edge_count;
+            for (i, stat) in result.kind_stats.iter().enumerate() {
+                kind_stats[i] += stat;
+            }
+        }

         // Encode the number of each dep kind encountered
         for count in kind_stats.iter() {
             count.encode(&mut encoder);
         }

-        previous.session_count.checked_add(1).unwrap().encode(&mut encoder);
+        self.previous.session_count.checked_add(1).unwrap().encode(&mut encoder);

-        debug!(?node_count, ?edge_count);
+        debug!(?node_max, ?node_count, ?edge_count);
         debug!("position: {:?}", encoder.position());
-        IntEncodedWithFixedSize(node_count).encode(&mut encoder);
-        IntEncodedWithFixedSize(edge_count).encode(&mut encoder);
+        IntEncodedWithFixedSize(node_max.try_into().unwrap()).encode(&mut encoder);
+        IntEncodedWithFixedSize(node_count.try_into().unwrap()).encode(&mut encoder);
+        IntEncodedWithFixedSize(edge_count.try_into().unwrap()).encode(&mut encoder);
         debug!("position: {:?}", encoder.position());
         // Drop the encoder so that nothing is written after the counts.
         let result = encoder.finish();
@@ -681,44 +770,20 @@ impl<D: Deps> EncoderState<D> {
             // don't need a dependency on rustc_incremental just for that.
             profiler.artifact_size("dep_graph", "dep-graph.bin", position as u64);
         }

+        self.print_incremental_info(current, node_count, edge_count);
+
         result
     }
-}

-pub(crate) struct GraphEncoder<D: Deps> {
-    profiler: SelfProfilerRef,
-    status: Lock<Option<EncoderState<D>>>,
-    record_graph: Option<Lock<DepGraphQuery>>,
-}
-
-impl<D: Deps> GraphEncoder<D> {
-    pub(crate) fn new(
-        encoder: FileEncoder,
-        prev_node_count: usize,
-        record_graph: bool,
-        record_stats: bool,
-        profiler: &SelfProfilerRef,
-        previous: Arc<SerializedDepGraph>,
-    ) -> Self {
-        let record_graph = record_graph.then(|| Lock::new(DepGraphQuery::new(prev_node_count)));
-        let status = Lock::new(Some(EncoderState::new(encoder, record_stats, previous)));
-        GraphEncoder { status, record_graph, profiler: profiler.clone() }
-    }
-
-    pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery)) {
-        if let Some(record_graph) = &self.record_graph {
-            f(&record_graph.lock())
-        }
-    }
-
-    pub(crate) fn print_incremental_info(
+    fn print_incremental_info(
         &self,
-        total_read_count: u64,
-        total_duplicate_read_count: u64,
+        current: &CurrentDepGraph<D>,
+        total_node_count: usize,
+        total_edge_count: usize,
     ) {
-        let mut status = self.status.lock();
-        let status = status.as_mut().unwrap();
-        if let Some(record_stats) = &status.stats {
+        if let Some(record_stats) = &self.stats {
+            let record_stats = record_stats.lock();
             let mut stats: Vec<_> = record_stats.values().collect();
             stats.sort_by_key(|s| -(s.node_counter as i64));

@@ -730,10 +795,13 @@ impl<D: Deps> GraphEncoder<D> {
             eprintln!("[incremental] DepGraph Statistics");
             eprintln!("{SEPARATOR}");
             eprintln!("[incremental]");
-            eprintln!("[incremental] Total Node Count: {}", status.total_node_count);
-            eprintln!("[incremental] Total Edge Count: {}", status.total_edge_count);
+            eprintln!("[incremental] Total Node Count: {}", total_node_count);
+            eprintln!("[incremental] Total Edge Count: {}", total_edge_count);

             if cfg!(debug_assertions) {
+                let total_read_count = current.total_read_count.load(Ordering::Relaxed);
+                let total_duplicate_read_count =
+                    current.total_duplicate_read_count.load(Ordering::Relaxed);
                 eprintln!("[incremental] Total Edge Reads: {total_read_count}");
                 eprintln!("[incremental] Total Duplicate Edge Reads: {total_duplicate_read_count}");
             }
@@ -747,7 +815,7 @@ impl<D: Deps> GraphEncoder<D> {

             for stat in stats {
                 let node_kind_ratio =
-                    (100.0 * (stat.node_counter as f64)) / (status.total_node_count as f64);
+                    (100.0 * (stat.node_counter as f64)) / (total_node_count as f64);
                 let node_kind_avg_edges = (stat.edge_counter as f64) / (stat.node_counter as f64);

                 eprintln!(
@@ -763,6 +831,35 @@ impl<D: Deps> GraphEncoder<D> {
             eprintln!("[incremental]");
         }
     }
 }

+pub(crate) struct GraphEncoder<D: Deps> {
+    profiler: SelfProfilerRef,
+    status: EncoderState<D>,
+    record_graph: Option<Lock<DepGraphQuery>>,
+}
+
+impl<D: Deps> GraphEncoder<D> {
+    pub(crate) fn new(
+        sess: &Session,
+        encoder: FileEncoder,
+        prev_node_count: usize,
+        previous: Arc<SerializedDepGraph>,
+    ) -> Self {
+        let record_graph = sess
+            .opts
+            .unstable_opts
+            .query_dep_graph
+            .then(|| Lock::new(DepGraphQuery::new(prev_node_count)));
+        let status = EncoderState::new(encoder, sess.opts.unstable_opts.incremental_info, previous);
+        GraphEncoder { status, record_graph, profiler: sess.prof.clone() }
+    }
+
+    pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery)) {
+        if let Some(record_graph) = &self.record_graph {
+            f(&record_graph.lock())
+        }
+    }
+
     /// Encodes a node that does not exist in the previous graph.
     pub(crate) fn send_new(
@@ -773,7 +870,11 @@ impl<D: Deps> GraphEncoder<D> {
     ) -> DepNodeIndex {
         let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph");
         let node = NodeInfo { node, fingerprint, edges };
-        self.status.lock().as_mut().unwrap().encode_node(&node, &self.record_graph)
+        let mut local = self.status.local.borrow_mut();
+        let index = self.status.next_index(&mut *local);
+        self.status.bump_index(&mut *local);
+        self.status.encode_node(index, &node, &self.record_graph, &mut *local);
+        index
     }

     /// Encodes a node that exists in the previous graph, but was re-executed.
@@ -791,23 +892,24 @@ impl<D: Deps> GraphEncoder<D> {
         let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph");
         let node = NodeInfo { node, fingerprint, edges };

-        let mut status = self.status.lock();
-        let status = status.as_mut().unwrap();
+        let mut local = self.status.local.borrow_mut();

-        // Check colors inside the lock to avoid racing when `send_promoted` is called concurrently
-        // on the same index.
-        match colors.get(prev_index) {
-            None => {
-                let dep_node_index = status.encode_node(&node, &self.record_graph);
-                colors.insert(
-                    prev_index,
-                    if is_green { DepNodeColor::Green(dep_node_index) } else { DepNodeColor::Red },
-                );
-                dep_node_index
+        let index = self.status.next_index(&mut *local);
+
+        if is_green {
+            // Use `try_mark_green` to avoid racing when `send_promoted` is called concurrently
+            // on the same index.
+            match colors.try_mark_green(prev_index, index) {
+                Ok(()) => (),
+                Err(dep_node_index) => return dep_node_index,
             }
-            Some(DepNodeColor::Green(dep_node_index)) => dep_node_index,
-            Some(DepNodeColor::Red) => panic!(),
+        } else {
+            colors.insert(prev_index, DepNodeColor::Red);
         }
+
+        self.status.bump_index(&mut *local);
+        self.status.encode_node(index, &node, &self.record_graph, &mut *local);
+        index
     }

     /// Encodes a node that was promoted from the previous graph. It reads the information directly from
@@ -822,26 +924,30 @@ impl<D: Deps> GraphEncoder<D> {
     ) -> DepNodeIndex {
         let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph");

-        let mut status = self.status.lock();
-        let status = status.as_mut().unwrap();
+        let mut local = self.status.local.borrow_mut();
+        let index = self.status.next_index(&mut *local);

-        // Check colors inside the lock to avoid racing when `send_promoted` or `send_and_color`
+        // Use `try_mark_green` to avoid racing when `send_promoted` or `send_and_color`
         // is called concurrently on the same index.
-        match colors.get(prev_index) {
-            None => {
-                let dep_node_index =
-                    status.encode_promoted_node(prev_index, &self.record_graph, colors);
-                colors.insert(prev_index, DepNodeColor::Green(dep_node_index));
-                dep_node_index
+        match colors.try_mark_green(prev_index, index) {
+            Ok(()) => {
+                self.status.bump_index(&mut *local);
+                self.status.encode_promoted_node(
+                    index,
+                    prev_index,
+                    &self.record_graph,
+                    colors,
+                    &mut *local,
+                );
+                index
             }
-            Some(DepNodeColor::Green(dep_node_index)) => dep_node_index,
-            Some(DepNodeColor::Red) => panic!(),
+            Err(dep_node_index) => dep_node_index,
         }
     }

-    pub(crate) fn finish(&self) -> FileEncodeResult {
+    pub(crate) fn finish(&self, current: &CurrentDepGraph<D>) -> FileEncodeResult {
         let _prof_timer = self.profiler.generic_activity("incr_comp_encode_dep_graph_finish");

-        self.status.lock().take().unwrap().finish(&self.profiler)
+        self.status.finish(&self.profiler, current)
     }
 }