forked from solana-labs/solana
-
Notifications
You must be signed in to change notification settings - Fork 271
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add custom wrapper around jemalloc to track allocator usage by thread…
… name in metrics
- Loading branch information
1 parent
324b055
commit 414d8b5
Showing
9 changed files
with
423 additions
and
52 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
use { | ||
jemallocator::Jemalloc, | ||
std::{ | ||
alloc::{GlobalAlloc, Layout}, | ||
cell::RefCell, | ||
sync::{ | ||
atomic::{AtomicUsize, Ordering}, | ||
RwLock, | ||
}, | ||
}, | ||
}; | ||
|
||
const NAME_LEN: usize = 16; | ||
type ThreadName = arrayvec::ArrayVec<u8, NAME_LEN>; | ||
|
||
static SELF: JemWrapStats = JemWrapStats { | ||
named_thread_stats: RwLock::new(None), | ||
unnamed_thread_stats: Counters::new(), | ||
process_stats: Counters::new(), | ||
}; | ||
|
||
#[derive(Debug)] | ||
pub struct Counters { | ||
allocations_total: AtomicUsize, | ||
deallocations_total: AtomicUsize, | ||
bytes_allocated_total: AtomicUsize, | ||
bytes_deallocated_total: AtomicUsize, | ||
} | ||
|
||
pub struct CountersView { | ||
pub allocations_total: usize, | ||
pub deallocations_total: usize, | ||
pub bytes_allocated_total: usize, | ||
pub bytes_deallocated_total: usize, | ||
} | ||
|
||
impl Default for Counters { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
impl Counters { | ||
pub fn view(&self) -> CountersView { | ||
CountersView { | ||
allocations_total: self.allocations_total.load(Ordering::Relaxed), | ||
deallocations_total: self.deallocations_total.load(Ordering::Relaxed), | ||
bytes_allocated_total: self.bytes_allocated_total.load(Ordering::Relaxed), | ||
bytes_deallocated_total: self.bytes_deallocated_total.load(Ordering::Relaxed), | ||
} | ||
} | ||
|
||
const fn new() -> Self { | ||
Self { | ||
allocations_total: AtomicUsize::new(0), | ||
deallocations_total: AtomicUsize::new(0), | ||
bytes_allocated_total: AtomicUsize::new(0), | ||
bytes_deallocated_total: AtomicUsize::new(0), | ||
} | ||
} | ||
} | ||
|
||
impl Counters { | ||
pub fn alloc(&self, size: usize) { | ||
self.bytes_allocated_total | ||
.fetch_add(size, Ordering::Relaxed); | ||
self.allocations_total.fetch_add(1, Ordering::Relaxed); | ||
} | ||
pub fn dealloc(&self, size: usize) { | ||
self.bytes_deallocated_total | ||
.fetch_add(size, Ordering::Relaxed); | ||
self.deallocations_total.fetch_add(1, Ordering::Relaxed); | ||
} | ||
} | ||
|
||
#[repr(C, align(4096))] | ||
pub struct JemWrapAllocator { | ||
jemalloc: Jemalloc, | ||
} | ||
|
||
impl JemWrapAllocator { | ||
pub const fn new() -> Self { | ||
Self { jemalloc: Jemalloc } | ||
} | ||
} | ||
|
||
impl Default for JemWrapAllocator { | ||
fn default() -> Self { | ||
Self::new() | ||
} | ||
} | ||
|
||
struct JemWrapStats { | ||
pub named_thread_stats: RwLock<Option<MemPoolStats>>, | ||
pub unnamed_thread_stats: Counters, | ||
pub process_stats: Counters, | ||
} | ||
|
||
pub fn view_allocations(f: impl FnOnce(&MemPoolStats)) { | ||
let lock_guard = &SELF.named_thread_stats.read().unwrap(); | ||
if let Some(stats) = lock_guard.as_ref() { | ||
f(stats); | ||
} | ||
} | ||
pub fn view_global_allocations() -> (CountersView, CountersView) { | ||
(SELF.unnamed_thread_stats.view(), SELF.process_stats.view()) | ||
} | ||
|
||
#[derive(Debug, Default)] | ||
pub struct MemPoolStats { | ||
pub data: Vec<(ThreadName, Counters)>, | ||
} | ||
|
||
impl MemPoolStats { | ||
pub fn add(&mut self, prefix: &str) { | ||
let key: ThreadName = prefix | ||
.as_bytes() | ||
.try_into() | ||
.unwrap_or_else(|_| panic!("Prefix can not be over {} bytes long", NAME_LEN)); | ||
|
||
self.data.push((key, Counters::default())); | ||
} | ||
} | ||
|
||
pub fn init_allocator(mps: MemPoolStats) { | ||
SELF.named_thread_stats.write().unwrap().replace(mps); | ||
} | ||
|
||
pub fn deinit_allocator() -> MemPoolStats { | ||
SELF.named_thread_stats.write().unwrap().take().unwrap() | ||
} | ||
|
||
unsafe impl Sync for JemWrapAllocator {} | ||
|
||
unsafe impl GlobalAlloc for JemWrapAllocator { | ||
unsafe fn alloc(&self, layout: Layout) -> *mut u8 { | ||
let alloc = self.jemalloc.alloc(layout); | ||
if alloc.is_null() { | ||
return alloc; | ||
} | ||
SELF.process_stats.alloc(layout.size()); | ||
if let Ok(stats) = SELF.named_thread_stats.try_read() { | ||
if let Some(stats) = stats.as_ref() { | ||
if let Some(stats) = match_thread_name_safely(stats, true) { | ||
stats.alloc(layout.size()); | ||
} | ||
} | ||
} else { | ||
SELF.unnamed_thread_stats.alloc(layout.size()); | ||
} | ||
alloc | ||
} | ||
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { | ||
self.jemalloc.dealloc(ptr, layout); | ||
if ptr.is_null() { | ||
return; | ||
} | ||
SELF.process_stats.dealloc(layout.size()); | ||
if let Ok(stats) = SELF.named_thread_stats.try_read() { | ||
if let Some(stats) = stats.as_ref() { | ||
if let Some(stats) = match_thread_name_safely(stats, false) { | ||
stats.dealloc(layout.size()); | ||
} | ||
} | ||
} else { | ||
SELF.unnamed_thread_stats.dealloc(layout.size()); | ||
} | ||
} | ||
} | ||
|
||
thread_local! ( | ||
static THREAD_NAME: RefCell<ThreadName> = RefCell::new(ThreadName::new()) | ||
); | ||
|
||
fn match_thread_name_safely(stats: &MemPoolStats, insert_if_missing: bool) -> Option<&Counters> { | ||
let name: Option<ThreadName> = THREAD_NAME | ||
.try_with(|v| { | ||
let mut name = v.borrow_mut(); | ||
if name.is_empty() { | ||
if insert_if_missing { | ||
unsafe { | ||
name.set_len(NAME_LEN); | ||
let res = libc::pthread_getname_np( | ||
libc::pthread_self(), | ||
name.as_mut_ptr() as *mut i8, | ||
name.capacity(), | ||
); | ||
if res == 0 { | ||
let name_len = memchr::memchr(0, &name).unwrap_or(name.len()); | ||
name.set_len(name_len); | ||
} | ||
} | ||
} else { | ||
return None; | ||
} | ||
} | ||
Some(name.clone()) | ||
}) | ||
.ok() | ||
.flatten(); | ||
match name { | ||
Some(name) => { | ||
for (prefix, stats) in stats.data.iter() { | ||
if !name.starts_with(prefix) { | ||
continue; | ||
} | ||
return Some(stats); | ||
} | ||
None | ||
} | ||
None => None, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
use { | ||
crate::jemalloc_monitor::*, log::Level, solana_metrics::datapoint::DataPoint, | ||
std::time::Duration, | ||
}; | ||
|
||
fn watcher_thread() { | ||
fn extend_lifetime<'b>(r: &'b str) -> &'static str { | ||
// SAFETY: it is safe to extend lifetimes here since we can never write any metrics beyond the point | ||
// where allocator is deinitialized. The function is private so can not be called from outside | ||
// Metrics can not work with non-static strings due to design limitations. | ||
unsafe { std::mem::transmute::<&'b str, &'static str>(r) } | ||
} | ||
let mut exit = false; | ||
while !exit { | ||
view_allocations(|stats| { | ||
if stats.data.is_empty() { | ||
exit = true; | ||
} | ||
let mut datapoint = DataPoint::new("MemoryBytesAllocatedTotal"); | ||
for (name, counters) in stats.data.iter() { | ||
let s = counters.view(); | ||
let name = extend_lifetime(std::str::from_utf8(name).unwrap()); | ||
datapoint.add_field_i64(name, s.bytes_allocated_total as i64); | ||
} | ||
solana_metrics::submit(datapoint, Level::Info); | ||
let mut datapoint = DataPoint::new("MemoryBytesDeallocated"); | ||
for (name, counters) in stats.data.iter() { | ||
let s = counters.view(); | ||
let name = extend_lifetime(std::str::from_utf8(name).unwrap()); | ||
datapoint.add_field_i64(name, s.bytes_deallocated_total as i64); | ||
} | ||
solana_metrics::submit(datapoint, Level::Info); | ||
}); | ||
let (cunnamed, _cproc) = view_global_allocations(); | ||
let mut datapoint = solana_metrics::datapoint::DataPoint::new("MemoryUnnamedThreads"); | ||
datapoint.add_field_i64( | ||
"bytes_allocated_total", | ||
cunnamed.bytes_allocated_total as i64, | ||
); | ||
datapoint.add_field_i64( | ||
"bytes_deallocated_total", | ||
cunnamed.bytes_deallocated_total as i64, | ||
); | ||
solana_metrics::submit(datapoint, Level::Info); | ||
|
||
std::thread::sleep(Duration::from_millis(1000)); | ||
} | ||
} | ||
|
||
pub fn setup_watch_memory_usage() { | ||
let mut mps = MemPoolStats::default(); | ||
// his list is brittle but there does not appear to be a better way | ||
for thread in [ | ||
"solPohTickProd", | ||
"solSigVerTpuVote", | ||
"solRcvrGossip", | ||
"solSigVerTpu", | ||
"solClusterInfo", | ||
"solGossip", | ||
"solRepair", | ||
"FetchStage", | ||
"solShredFetch", | ||
"solReplayTx", | ||
"solReplayFork", | ||
"solRayonGlob", | ||
"solSvrfyShred", | ||
"solSigVerify", | ||
"solRetransmit", | ||
"solRunGossip", | ||
"solGossipWork", | ||
"solWinInsert", | ||
"solGossipCons", | ||
"solAccounts", | ||
"solAccountsLo", | ||
"solAcctHash", | ||
"solVoteSigVerTpu", | ||
"solTrSigVerTpu", | ||
"solQuicClientRt", | ||
"solQuicTVo", | ||
"solQuicTpu", | ||
"solQuicTpuFwd", | ||
"solRepairQuic", | ||
"solGossipQuic", | ||
"solTurbineQuic", | ||
] { | ||
mps.add(thread); | ||
} | ||
init_allocator(mps); | ||
std::thread::spawn(watcher_thread); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.