I was confused as to why my disk was slowly but steadily losing storage space, so I wrote a binary crate to find out why. My initial approach used a single thread to traverse a directory specified as a command line argument. It took several hours to scan my home directory, and I decided to use the opportunity to learn about mpsc channels.

Multiple-producer, single-consumer (mpsc) channels are a way to send data from multiple producer threads to a single consumer. In this project the producer threads scan the filesystem, while the receiving thread uses the data they send to look up the metadata needed to size the files found by the scanning threads.

This page focuses on setting such a project up and describes how the logic works.
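Before digging into the project, here is a minimal, self-contained sketch of the mpsc pattern (illustrative only, not code from the project):

use std::sync::mpsc;
use std::thread;

fn main() {
    // One Sender per producer; a single Receiver on the consuming side
    let (tx, rx) = mpsc::channel();
    for id in 0..4 {
        let tx = tx.clone();
        thread::spawn(move || {
            // Each producer sends its findings down the channel
            tx.send(format!("result from producer {id}")).unwrap();
        });
    }
    // Drop the original Sender so the channel closes once all clones are gone
    drop(tx);
    // recv() returns Err only after every Sender has been dropped
    while let Ok(msg) = rx.recv() {
        println!("{msg}");
    }
}

The project itself starts with argument parsing, which I handle with clap's derive API: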
use clap::Parser;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
    /// (optional) Number of largest entries to output
    #[arg(short = 'n', long = "num_entries", default_value_t = 10)]
    pub num_entries: usize,

    /// (optional) Number of files to size at one time
    #[arg(short = 'b', long = "batch_Size", default_value_t = 1000)]
    pub batch_size: usize,

    /// (optional) defaults to attempting to detect current working directory
    #[arg(short = 'd', long = "directory")]
    pub target_dir: Option<String>,

    /// (optional) Path to a file where each line specifies a directory to ignore
    #[arg(short = 'x', long = "excluded-dirs-file")]
    pub exclusion_file: Option<String>,

    #[arg(short, long)]
    pub verbose: bool,
}
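A nice property of clap's derive API is that the /// doc comments double as the help text for each flag; the generated --help output appears in the sample run at the end of this post.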
The parsed arguments are used to build a Config struct that encapsulates them throughout the program. This is especially useful in the context of parallel threads, since the struct can be wrapped in an Arc and shared cheaply between them.
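As a minimal illustration of that pattern (using a stand-in struct here, since the real Config appears below):

use std::sync::Arc;
use std::thread;

struct Config {
    num_threads: usize,
}

fn main() {
    let config = Arc::new(Config { num_threads: 4 });
    let mut handles = Vec::new();
    for _ in 0..config.num_threads {
        // Cloning an Arc is a cheap reference-count bump, not a deep copy
        let config = Arc::clone(&config);
        handles.push(thread::spawn(move || {
            // Every thread reads the same shared, immutable Config
            let _ = config.num_threads;
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}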
An early lesson from scanning with many threads at once was the OS Error: Too many open files. Unix-based systems typically limit the number of open files more aggressively than Windows, and so I have used separate logic for each platform:
/// Returns a platform-specific (Windows or Unix) cap on open file handles.
/// On Unix, returns 50% of the system's limit.
/// On Windows, uses a RAM-based heuristic of 64 file descriptors per 1GB of RAM.
fn get_fd_limit() -> usize {
    #[cfg(unix)]
    {
        use libc::{rlimit, RLIMIT_NOFILE};
        let mut rlim = rlimit {
            rlim_cur: 0,
            rlim_max: 0,
        };
        // Query the soft limit on open file descriptors
        let result = unsafe { libc::getrlimit(RLIMIT_NOFILE, &mut rlim) };
        if result == 0 {
            let limit = rlim.rlim_cur as usize;
            return limit / 2;
        } else {
            // Report the failure, then fall through to the default below
            println!("Error: {}", std::io::Error::last_os_error());
        }
    }
    #[cfg(windows)]
    {
        // Try to get system memory info to make an educated guess
        use windows_sys::Win32::System::SystemInformation::GetPhysicallyInstalledSystemMemory;
        let mut memory_kb: u64 = 0;
        if unsafe { GetPhysicallyInstalledSystemMemory(&mut memory_kb) } != 0 {
            let memory_gb = memory_kb / (1024 * 1024);
            // Scale with installed memory, but clamp to the range [512, 8192]
            return usize::min(usize::max(512, (memory_gb * 64) as usize), 8192);
        }
        // Fallback for Windows
        return 2048;
    }
    // Conservative default if the platform query fails
    100
}
#[derive(Clone)]
pub struct Config {
    pub num_threads: usize,
    pub num_entries: usize,
    pub batch_size: usize,
    pub root_path: PathBuf,
    pub skip_dirs: HashSet<String>,
    pub max_open_files: usize,
    pub verbose: bool,
}
impl Config {
    pub fn build(args: &Args) -> Result<Config, Box<dyn Error>> {
        let num_threads = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        println!("Preparing to scan using {} threads", num_threads);

        let max_open_files = get_fd_limit();
        println!("Limiting open file handles to {}", max_open_files);

        let num_entries = args.num_entries;
        let batch_size = args.batch_size;
        let verbose = args.verbose;

        let root_path = if let Some(target_dir) = &args.target_dir {
            PathBuf::from(target_dir)
        } else {
            env::current_dir()?
        };

        let mut skip_dirs: HashSet<String> = HashSet::new();
        if let Some(exclusion_file) = &args.exclusion_file {
            let file = File::open(exclusion_file)
                .expect("A path to an excluded directories file was provided but the file could not be read");
            let reader = BufReader::new(file);
            reader.lines().for_each(|line| match line {
                Ok(dir) => {
                    skip_dirs.insert(dir);
                }
                Err(e) => log::error!("Error reading line: {}", e),
            });
        }

        Ok(Config {
            num_threads,
            num_entries,
            batch_size,
            root_path,
            skip_dirs,
            max_open_files,
            verbose,
        })
    }
}
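The exclusion file referenced above is just a plain text file with one directory per line; a hypothetical example:

/Users/user/Library/Caches
/Users/user/.Trash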
With parsing and configuration in place, main() builds the Config struct and hands it to the run() function, timing the whole operation along the way.
fn main() {
    std::env::set_var("RUST_LOG", "trace");
    env_logger::init();

    let start = Instant::now();
    let args = Args::parse();
    let config = Config::build(&args).unwrap_or_else(|err| {
        log::error!("Could not parse arguments: {}", err);
        process::exit(1);
    });

    if let Err(e) = run(config) {
        log::error!("Fatal Error: {e}");
        process::exit(1);
    }

    let duration = start.elapsed();
    println!(
        "\nProgram completed in {:?} seconds",
        duration.as_secs_f32()
    );
}
Next up is the TopEntries struct, which wraps a Vec of tuples describing a filepath and the associated size of the file at that path. The vector is kept sorted in descending order of size and is never allowed to grow beyond max_entries elements. Insertion uses the partition_point() method from core::slice, which performs a binary search to determine the appropriate position for the insert to occur. The get_entries() function is only used in my unit tests, to ensure the invariants described above are enforced.
pub struct TopEntries {
    pub entries: Vec<(String, u64)>,
    pub max_entries: usize,
}

impl TopEntries {
    pub fn new(max_entries: usize) -> Self {
        Self {
            entries: Vec::with_capacity(max_entries + 1),
            max_entries,
        }
    }

    pub fn insert(&mut self, path: String, size: u64) {
        // Only insert if there is room, or the file beats the current smallest entry
        if self.entries.len() < self.max_entries
            || size > self.entries.last().map(|(_, s)| *s).unwrap_or(0)
        {
            // Binary search for the position that keeps entries sorted descending
            let idx = self.entries.partition_point(|(_, s)| *s > size);
            self.entries.insert(idx, (path, size));
            if self.entries.len() > self.max_entries {
                self.entries.pop();
            }
        }
    }

    #[allow(dead_code)]
    pub fn get_entries(&self) -> &[(String, u64)] {
        &self.entries
    }
}
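The unit tests mentioned above look something like this sketch:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn keeps_only_the_largest_entries_sorted() {
        let mut top = TopEntries::new(2);
        top.insert("a".to_string(), 10);
        top.insert("b".to_string(), 30);
        top.insert("c".to_string(), 20);
        // Invariants: at most max_entries items, sorted largest-first
        assert_eq!(
            top.get_entries(),
            [("b".to_string(), 30), ("c".to_string(), 20)]
        );
    }
}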
To pretty-print file sizes, I define a ByteSize trait associated with a format_size() function and implement it for u64.
pub trait ByteSize {
    fn format_size(&self) -> String;
}

impl ByteSize for u64 {
    fn format_size(&self) -> String {
        const KB: u64 = 1024;
        const MB: u64 = KB * 1024;
        const GB: u64 = MB * 1024;
        const TB: u64 = GB * 1024;
        match self {
            bytes if *bytes >= TB => format!("{:.2} TB", *bytes as f64 / TB as f64),
            bytes if *bytes >= GB => format!("{:.2} GB", *bytes as f64 / GB as f64),
            bytes if *bytes >= MB => format!("{:.2} MB", *bytes as f64 / MB as f64),
            bytes if *bytes >= KB => format!("{:.2} KB", *bytes as f64 / KB as f64),
            bytes => format!("{} bytes", bytes),
        }
    }
}
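A few illustrative values:

assert_eq!(512u64.format_size(), "512 bytes");
assert_eq!(1_536u64.format_size(), "1.50 KB");
assert_eq!(5_500_000_000u64.format_size(), "5.12 GB");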
#[derive(Debug)]
pub enum SearchError {
    IoError(std::io::Error),
    SendError(String),
    ThreadError(String),
    PathError(String),
}

impl From<std::io::Error> for SearchError {
    fn from(err: std::io::Error) -> Self {
        SearchError::IoError(err)
    }
}

impl std::fmt::Display for SearchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SearchError::IoError(e) => write!(f, "IO error: {}", e),
            SearchError::SendError(e) => write!(f, "Send error: {}", e),
            SearchError::ThreadError(e) => write!(f, "Thread error: {}", e),
            SearchError::PathError(e) => write!(f, "Path error: {}", e),
        }
    }
}

impl std::error::Error for SearchError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            SearchError::IoError(e) => Some(e),
            _ => None,
        }
    }
}
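The From impl is what lets the ? operator convert IO errors into a SearchError automatically; read_root below is a hypothetical helper showing the conversion in action:

// Hypothetical helper, not part of the project
fn read_root(path: &std::path::Path) -> Result<std::fs::ReadDir, SearchError> {
    // std::io::Error converts into SearchError::IoError via the From impl
    let entries = std::fs::read_dir(path)?;
    Ok(entries)
}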
The remaining functions are part of my core program logic, and so I have placed them in lib.rs. The run() function is responsible for using the Config members in order to:

- spawn a scanner thread that traverses the filesystem via the parallel_search() function
- receive batches of file entries over the channel and size them via the process_batch() function
pub fn run(config: Config) -> Result<(), Box<dyn Error>> {
    let is_verbose = config.verbose;
    let error_log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let error_log_clone = error_log.clone();
    let config_arc: Arc<Config> = Arc::new(config.clone());

    println!(
        "Searching for {} largest entries in {}:",
        config.num_entries,
        config.root_path.display()
    );

    let multi_progress = MultiProgress::new();
    let scan_progress = multi_progress.add(ProgressBar::new_spinner());
    scan_progress.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner:.green} [{elapsed_precise}] {msg}")
            .unwrap(),
    );
    let process_progress = multi_progress.add(ProgressBar::new_spinner());
    process_progress.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner:.green} [{elapsed_precise}] {msg}")
            .unwrap(),
    );

    let (tx, rx) = mpsc::channel();
    let top_entries = Arc::new(Mutex::new(TopEntries::new(config.num_entries)));
    let root_path = config.root_path.clone();

    // Scanner thread: produces batches of FileEntry values over the channel
    let scan_handle = thread::spawn(move || {
        parallel_search(
            &root_path,
            tx,
            scan_progress,
            config_arc.clone(),
            error_log_clone.clone(),
        )
    });

    // Consumer loop: recv() returns Err once every Sender has been dropped,
    // i.e. once the scanner threads have all finished
    let mut total_files = 0;
    let mut total_processed = 0;
    let mut total_attempts = 0;
    while let Ok(batch) = rx.recv() {
        total_files += batch.len();
        let (processed, attempted) =
            process_batch(batch, &top_entries, error_log.clone(), is_verbose);
        total_processed += processed;
        total_attempts += attempted;
        process_progress.set_message(format!(
            "Processing {} files (successfully processed: {}, failed: {})...",
            total_files,
            total_processed,
            total_attempts - total_processed
        ));
    }

    match scan_handle.join() {
        Ok(result) => result?,
        Err(e) => {
            if is_verbose {
                error_log
                    .lock()
                    .unwrap()
                    .push(format!("Scanner thread panicked: {:?}", e));
            }
        }
    }

    process_progress.finish_with_message(format!(
        "Processed {} files ({} successful, {} failed)",
        total_attempts,
        total_processed,
        total_attempts - total_processed
    ));

    if is_verbose {
        println!();
        error_log.lock().unwrap().iter().for_each(|e| {
            eprintln!("{}", e);
        });
    }

    println!("\n");
    match top_entries.lock() {
        Ok(top) => {
            if top.entries.is_empty() {
                println!("No files found - run with -v flag for error output");
            } else {
                for (path, size) in top.entries.iter() {
                    println!("{}: {}", path, size.format_size());
                }
            }
        }
        Err(e) => {
            return Err(Box::new(io::Error::new(
                io::ErrorKind::Other,
                format!("Failed to lock top entries for final output: {}", e),
            )));
        }
    }

    Ok(())
}
The process_batch() function described earlier is responsible for performing a parallel metadata retrieval (via rayon's into_par_iter()) for each entry in a Vec of FileEntry values. If the metadata retrieval is successful, the result is added to a Vec of tuples that pair a filepath with the associated metadata for the file at that path; these tuples then drive the TopEntries insertions.
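The FileEntry type is a small helper used throughout; a minimal stand-in consistent with how it is used here looks like this (the real definition lives elsewhere in the crate and may differ):

use std::path::PathBuf;

pub struct FileEntry {
    pub path: PathBuf,
    // Carries any error recorded while the entry was being scanned
    pub result: Result<(), String>,
}

impl FileEntry {
    pub fn new(path: PathBuf) -> Self {
        Self {
            path,
            result: Ok(()),
        }
    }
}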
fn process_batch(
    batch: Vec<FileEntry>,
    top_entries: &Arc<Mutex<TopEntries>>,
    error_log: Arc<Mutex<Vec<String>>>,
    is_verbose: bool,
) -> (usize, usize) {
    // Fetch metadata for every entry in parallel (rayon's into_par_iter)
    let metadata_results: Vec<_> = batch
        .into_par_iter()
        .map(|entry| match entry.result {
            Ok(()) => match fs::metadata(&entry.path) {
                Ok(metadata) => (entry.path, Ok(metadata)),
                Err(err) => (entry.path, Err(err)),
            },
            Err(err) => (
                entry.path,
                Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!("Previous error: {:?}", err),
                )),
            ),
        })
        .collect();

    let total = metadata_results.len();
    let mut processed = 0;
    let mut errors = Vec::new();

    for (path, metadata_result) in metadata_results {
        match metadata_result {
            Ok(metadata) => match path.size_on_disk_fast(&metadata) {
                Ok(size) => {
                    if let Some(path_str) = path.to_str() {
                        match top_entries.lock() {
                            Ok(mut top) => {
                                top.insert(path_str.to_string(), size);
                                processed += 1;
                            }
                            Err(err) => {
                                errors.push(format!(
                                    "Failed to lock top_entries for {}: {}",
                                    path.display(),
                                    err
                                ));
                            }
                        }
                    } else {
                        errors.push(format!("Invalid UTF-8 in path: {}", path.display()));
                    }
                }
                Err(err) => {
                    errors.push(format!(
                        "Failed to get size for {}: {}",
                        path.display(),
                        err
                    ));
                }
            },
            Err(err) => {
                errors.push(format!("Error processing {}: {}", path.display(), err));
            }
        }
    }

    if !errors.is_empty() && is_verbose {
        error_log.lock().unwrap().extend(errors);
    }

    (processed, total)
}
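A note on size_on_disk_fast(): it comes from the filesize crate's PathExt trait and takes the metadata we have already fetched, so the physical size on disk can be computed without an extra system call where the platform allows.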
Finally, the parallel_search() function expects as arguments a file path to begin traversal from, a Sender used to send Vecs of file entries back to the run() function for batch processing, a ProgressBar used to display runtime progress, an Arc-wrapped Config struct, and an error log (a Vec wrapped in an Arc and Mutex for thread safety). Worker threads pull directories from a shared queue, push any subdirectories they find back onto it, and send completed batches through the Sender; a shared counter of pending directories lets every worker detect when the traversal is finished. Once the scanner threads exit, every Sender is dropped, the receive loop in run() ends, and run() returns control to main(), which displays how long the program execution took and exits.
fn parallel_search(
    root_dir: &Path,
    tx: Sender<Vec<FileEntry>>,
    progress: ProgressBar,
    config: Arc<Config>,
    error_log: Arc<Mutex<Vec<String>>>,
) -> Result<(), SearchError> {
    let work_queue = Arc::new(Mutex::new(VecDeque::new()));
    // Directories queued but not yet fully processed; traversal is complete
    // when this reaches zero
    let pending_dirs = Arc::new(AtomicUsize::new(0));

    // Canonicalize the skip list up front so prefix matching works reliably
    let skip_dirs: HashSet<PathBuf> = config
        .skip_dirs
        .iter()
        .filter_map(|dir| match PathBuf::from(dir).canonicalize() {
            Ok(path) => Some(path),
            Err(err) => {
                if config.verbose {
                    error_log.lock().unwrap().push(format!(
                        "Warning: Could not canonicalize skip directory '{}': {}",
                        dir, err
                    ));
                }
                None
            }
        })
        .collect();

    match root_dir.canonicalize() {
        Ok(root) => {
            pending_dirs.fetch_add(1, Ordering::SeqCst);
            work_queue.lock().unwrap().push_back(root);
        }
        Err(err) => {
            if config.verbose {
                error_log
                    .lock()
                    .unwrap()
                    .push(format!("Failed to canonicalize root directory: {}", err));
            }
        }
    }

    let mut handles = vec![];
    let open_files = Arc::new(AtomicUsize::new(0));
    let errors_count = Arc::new(AtomicUsize::new(0));

    for _ in 0..config.num_threads {
        let work_queue = Arc::clone(&work_queue);
        let tx = tx.clone();
        let progress = progress.clone();
        let open_files = Arc::clone(&open_files);
        let pending_dirs = Arc::clone(&pending_dirs);
        let skip_dirs = skip_dirs.clone();
        let errors_count = Arc::clone(&errors_count);
        let config_clone = config.clone();
        let error_log = error_log.clone();

        handles.push(thread::spawn(move || -> Result<(), SearchError> {
            let mut batch = Vec::with_capacity(config_clone.batch_size);
            'outer: loop {
                let dir = {
                    match work_queue.lock() {
                        Ok(mut q) => q.pop_front(),
                        Err(e) => {
                            if config_clone.verbose {
                                error_log
                                    .lock()
                                    .unwrap()
                                    .push(format!("Failed to lock work queue: {}", e));
                            }
                            None
                        }
                    }
                };
                match dir {
                    Some(dir) => {
                        progress.set_message(format!("Scanning: {}", dir.display()));
                        match dir.canonicalize() {
                            Ok(canonical_dir) => {
                                if skip_dirs
                                    .iter()
                                    .any(|skip_dir| canonical_dir.starts_with(skip_dir))
                                {
                                    // Skipped directories still count as processed
                                    pending_dirs.fetch_sub(1, Ordering::SeqCst);
                                    continue;
                                }
                            }
                            Err(e) => {
                                if config_clone.verbose {
                                    error_log.lock().unwrap().push(format!(
                                        "Failed to canonicalize directory {:#?}: {}",
                                        dir, e
                                    ));
                                }
                            }
                        }

                        // Back off with exponential delay while at the open-file cap
                        let mut wait_time = 1;
                        while open_files.load(Ordering::Relaxed) >= config_clone.max_open_files {
                            thread::sleep(std::time::Duration::from_millis(wait_time));
                            wait_time = wait_time.saturating_mul(2).min(100);
                        }
                        open_files.fetch_add(1, Ordering::SeqCst);

                        match fs::read_dir(&dir) {
                            Ok(entries) => {
                                for entry in entries.flatten() {
                                    let path = entry.path();
                                    if path.is_symlink() {
                                        continue;
                                    }
                                    if path.is_dir() {
                                        pending_dirs.fetch_add(1, Ordering::SeqCst);
                                        work_queue.lock().unwrap().push_back(path);
                                    } else if path.is_file() {
                                        batch.push(FileEntry::new(path.clone()));
                                        if batch.len() >= config_clone.batch_size {
                                            tx.send(batch.clone()).map_err(|e| {
                                                SearchError::SendError(e.to_string())
                                            })?;
                                            batch.clear();
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                if config_clone.verbose {
                                    error_log.lock().unwrap().push(format!(
                                        "Failed to read directory {:#?}: {}",
                                        dir, e
                                    ));
                                }
                                errors_count.fetch_add(1, Ordering::SeqCst);
                            }
                        }
                        open_files.fetch_sub(1, Ordering::SeqCst);
                        pending_dirs.fetch_sub(1, Ordering::SeqCst);
                    }
                    None => {
                        // No queued work: if nothing is pending anywhere, we are done
                        if pending_dirs.load(Ordering::SeqCst) == 0 {
                            break 'outer;
                        }
                        thread::sleep(std::time::Duration::from_millis(50));
                    }
                }
            }
            // Flush any partially filled batch before the thread exits
            if !batch.is_empty() {
                tx.send(batch)
                    .map_err(|e| SearchError::SendError(e.to_string()))?;
            }
            Ok(())
        }));
    }

    for handle in handles {
        if let Err(e) = handle.join().unwrap() {
            return Err(e);
        }
    }

    progress.finish_with_message(format!(
        "Directory scan complete ({} errors encountered: run with -v for details)",
        errors_count.load(Ordering::SeqCst)
    ));
    Ok(())
}
Putting it all together, here is the help output followed by a sample run against my home directory:

A CLI to find large files

Usage: ferris-files [OPTIONS]

Options:
  -n, --num_entries <NUM_ENTRIES>            (optional) Number of largest entries to output [default: 10]
  -b, --batch_Size <BATCH_SIZE>              (optional) Number of files to size at one time [default: 1000]
  -d, --directory <TARGET_DIR>               (optional) defaults to attempting to detect current working directory
  -x, --excluded-dirs-file <EXCLUSION_FILE>  (optional) Path to a file where each line specifies a directory to ignore
  -v, --verbose
  -h, --help                                 Print help
  -V, --version                              Print version

Preparing to scan using 12 threads
Limiting open file handles to 524287
Searching for 10 largest entries in /Users/user:
[00:00:15] Directory scan complete (121 errors encountered: run with -v for details)
[00:00:15] Processed 1399886 files (1399886 successful, 0 failed)

/Users/user/Movies/TV/Media.localized/Movies/Twin Peaks_ Fire Walk with Me/Twin Peaks_ Fire Walk with Me (1080p HD).m4v: 5.17 GB
/Users/user/Library/Android/sdk/system-images/android-33/google_apis/x86_64/system.img: 4.01 GB
/Users/user/Library/Android/sdk/system-images/android-34/google_apis/x86_64/system.img: 4.01 GB
/Users/user/Library/Android/sdk/system-images/android-31/google_apis/x86_64/system.img: 4.01 GB
/Users/user/Movies/TV/Media.localized/Movies/The Animatrix/04 The Animatrix (1080p HD).m4v: 3.65 GB
/Users/user/.android/avd/Pixel_5_API_33.avd/userdata-qemu.img.qcow2: 3.54 GB
/Users/user/Library/Android/sdk/system-images/android-30/google_apis/x86/system.img: 3.01 GB
/Users/user/Library/Android/sdk/system-images/android-31/google_apis_playstore/x86_64/system.img: 2.67 GB
/Users/user/Library/Containers/com.docker.docker/Data/vms/0/data/Docker.raw: 2.32 GB
/Users/user/Virtual Machines.localized/kali-linux-2024.2-vmware-amd64.vmwarevm/kali-linux-2024.2-vmware-amd64-s035.vmdk: 1.97 GB

Program completed in 15.199807 seconds