Using parallel threads and mpsc channels to find large files quickly

I was puzzled as to why my disk was slowly but steadily losing storage space, so I wrote a binary crate to find out. My initial approach used a single thread to traverse a directory specified as a command line argument. It took several hours to scan my home directory, so I decided to use the opportunity to learn about mpsc channels.

Multiple-producer, single-consumer (mpsc) channels let any number of producer threads send data to a single receiving thread. In this program, the producer threads scan the filesystem, while the receiving thread uses the data they send to retrieve the metadata needed to size the files the scanners found.
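
As a warm-up, here is a minimal, self-contained sketch of the pattern on its own (the "scanner" naming is illustrative, not taken from the project):

use std::sync::mpsc;
use std::thread;

fn main() {
    // One receiver, any number of cloned senders
    let (tx, rx) = mpsc::channel::<String>();

    let handles: Vec<_> = (0..4)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                // Each producer sends its findings over the shared channel
                tx.send(format!("result from scanner {id}")).unwrap();
            })
        })
        .collect();

    // Drop the original sender so the receiver sees the channel close
    // once every thread's clone has been dropped
    drop(tx);

    // The single consumer drains messages until all senders are gone
    for msg in rx {
        println!("{msg}");
    }

    for handle in handles {
        handle.join().unwrap();
    }
}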

This page focuses on setting up such a project and walking through how the logic works.

Setting Up



  • Clap makes argument parsing simple and straightforward.
  • The command attribute used here automatically pulls the author, version, and description from my Cargo.toml file.
  • Attributes on each field specify the short and long flags used to pass the argument and set default values.
  • I thought it made sense to allow users to specify the following:
    • How many of the largest files they want to see
    • The batch size used for processing files
    • The directory to scan for large files
    • A file containing lines describing directories to ignore
    • A verbosity flag that currently only causes all errors generated during the run to be printed
 
    use clap::Parser;
    #[derive(Parser, Debug)]
    #[command(author, version, about, long_about = None)]
    pub struct Args {
        /// (optional) Number of largest entries to output
        #[arg(short = 'n', long = "num_entries", default_value_t = 10)]
        pub num_entries: usize,
    
        /// (optional) Number of files to size at one time
        #[arg(short = 'b', long = "batch_size", default_value_t = 1000)]
        pub batch_size: usize,
    
        /// (optional) defaults to attempting to detect current working directory
        #[arg(short = 'd', long = "directory")]
        pub target_dir: Option<String>,
    
        /// (optional) Path to a file where each line specifies a directory to ignore
        #[arg(short = 'x', long = "excluded-dirs-file")]
        pub exclusion_file: Option<String>,
    
        #[arg(short, long)]
        pub verbose: bool,
    }   
                    




  • These arguments are all passed to a Config struct that encapsulates them throughout the program. This is especially useful with parallel threads, since the struct can be wrapped in an Arc.
  • In addition to the command line arguments, I use std::thread::available_parallelism() from the standard library to determine the recommended number of threads based on CPU cores. See the Rust docs.
  • I also call a function I wrote to determine a platform-specific cap on the number of file handles the program should open at one time. When I ran my initial version of this program, I kept seeing OS Error: Too many open files. Unix-based systems typically limit the number of open files more aggressively than Windows, so I've used separate logic:
/// Returns a platform specific (Windows or Unix) cap on open file handles.
/// On Unix, returns 50% of the system's RLIMIT_NOFILE soft limit.
/// On Windows, scales with installed RAM (64 handles per GB, clamped to 512..=8192).
fn get_fd_limit() -> usize {
    #[cfg(unix)]
    {
        use libc::{rlimit, RLIMIT_NOFILE};
        let mut rlim = rlimit {
            rlim_cur: 0,
            rlim_max: 0,
        };
        let result = unsafe { libc::getrlimit(RLIMIT_NOFILE, &mut rlim) };
        if result == 0 {
            let limit = rlim.rlim_cur as usize;
            return limit / 2;
        } else {
            // Print the error if getrlimit fails
            println!("Error: {}", std::io::Error::last_os_error());
        }
    }

    #[cfg(windows)]
    {
        // Try to get system memory info to make an educated guess
        use windows_sys::Win32::System::SystemInformation::GetPhysicallyInstalledSystemMemory;
        let mut memory_kb: u64 = 0;
        if unsafe { GetPhysicallyInstalledSystemMemory(&mut memory_kb) } != 0 {
            let memory_gb = memory_kb / (1024 * 1024);
            // Scale based on available memory, but cap at reasonable limits
            return usize::min(usize::max(512, (memory_gb * 64) as usize), 8192);
        }
        // Fallback for Windows
        return 2048;
    }

    // Default fallback
    100
}

                            
#[derive(Clone)]
pub struct Config {
    pub num_threads: usize,
    pub num_entries: usize,
    pub batch_size: usize,
    pub root_path: PathBuf,
    pub skip_dirs: HashSet<String>,
    pub max_open_files: usize,
    pub verbose: bool,
}

impl Config {
    pub fn build(args: &Args) -> Result<Config, Box<dyn Error>> {
        let num_threads = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);

        println!("Preparing to scan using {} threads", num_threads);

        let max_open_files = get_fd_limit();
        println!("Limiting open file handles to {}", max_open_files);

        let num_entries = args.num_entries;
        let batch_size = args.batch_size;
        let verbose = args.verbose;

        let root_path = if let Some(target_dir) = &args.target_dir {
            PathBuf::from(target_dir)
        } else {
            env::current_dir()?
        };

        let mut skip_dirs: HashSet<String> = HashSet::new();
        if let Some(exclusion_file) = &args.exclusion_file {
            let file = File::open(exclusion_file).map_err(|e| {
                format!(
                    "An excluded directories file was provided but could not be opened: {}",
                    e
                )
            })?;

            let reader = BufReader::new(file);
            reader.lines().for_each(|line| match line {
                Ok(dir) => {
                    skip_dirs.insert(dir);
                }
                Err(e) => log::error!("Error reading line: {}", e),
            });
        }

        Ok(Config {
            num_threads,
            num_entries,
            batch_size,
            root_path,
            skip_dirs,
            max_open_files,
            verbose
        })
    }
}
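
For reference, the exclusion file passed with -x is just a plain list of directories, one per line; each line becomes an entry in skip_dirs. The paths below are illustrative:

/Users/user/Library/Caches
/Users/user/.Trash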
    
                    




  • The main function begins program execution and parses the command line arguments into a Config struct.
  • I've also used an Instant to measure how long the program runs.
  • We still need to describe some structs, traits, and errors before jumping into the run method.
fn main() {
    std::env::set_var("RUST_LOG", "trace");
    env_logger::init();

    let start = Instant::now();

    let args = Args::parse();

    let config = Config::build(&args).unwrap_or_else(|err| {
        log::error!("Could not parse arguments: {}", err);
        process::exit(1);
    });

    if let Err(e) = run(config) {
        log::error!("Fatal Error: {e}");
        process::exit(1);
    }

    let duration = start.elapsed();
    println!(
        "\nProgram completed in {:?} seconds",
        duration.as_secs_f32()
    );
}                                
                            

Additional Program Logic



  • I've defined a TopEntries struct that wraps a Vec of tuples, each describing a file path and the size of the file at that path.
  • The struct also wraps an integer that enforces a limit on the length of this Vec.
  • The limit matters at insertion time. The number of largest files to report is known when the program begins (either the user specified it, or the program defaults to ten), so it makes no sense to store any file sizes that will never be shown. We are only interested in the N largest.
  • By storing only the file paths and sizes relevant to the user, we avoid significant space overhead on very large directory traversals.
  • I've also enforced ordering: the Vec maintains descending sorted order, so the largest files stay at the front.
  • This means that when a path/size tuple arrives, it can quickly be determined whether the Vec is under capacity or the file is larger than the last (smallest) tuple, and if so, the tuple is inserted at the appropriate position.
  • I've used the standard library's slice method partition_point(), which performs a binary search to find the insertion index.
  • If the insert pushes the Vec over capacity, the last (smallest) element is popped.
  • The get_entries() function is only used in my unit tests, to ensure the invariants described above hold; a test sketch follows the code below.
pub struct TopEntries {
    pub entries: Vec<(String, u64)>,
    pub max_entries: usize,
}

impl TopEntries {
    pub fn new(max_entries: usize) -> Self {
        Self {
            entries: Vec::with_capacity(max_entries + 1),
            max_entries,
        }
    }

    pub fn insert(&mut self, path: String, size: u64) {
        if self.entries.len() < self.max_entries
            || size > self.entries.last().map(|(_, s)| *s).unwrap_or(0)
        {
            let idx = self.entries.partition_point(|(_, s)| *s > size);
            self.entries.insert(idx, (path, size));

            if self.entries.len() > self.max_entries {
                self.entries.pop();
            }
        }
    }

    #[allow(dead_code)]
    pub fn get_entries(&self) -> &[(String, u64)] {
        &self.entries
    }
}
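
For illustration, a test along these lines (a sketch, not the project's actual test suite) exercises both invariants:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn keeps_only_the_largest_in_descending_order() {
        let mut top = TopEntries::new(3);
        top.insert("a".to_string(), 10);
        top.insert("b".to_string(), 50);
        top.insert("c".to_string(), 30);
        // At capacity: an entry smaller than the current smallest is rejected
        top.insert("d".to_string(), 5);
        // A larger entry is inserted in order and the smallest is popped
        top.insert("e".to_string(), 40);

        let sizes: Vec<u64> = top.get_entries().iter().map(|(_, s)| *s).collect();
        assert_eq!(sizes, vec![50, 40, 30]);
    }
}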

                                




  • Instead of outputting each size as a raw number of bytes, I wanted more user-friendly output in the common units KB, MB, GB, and TB.
  • I've defined a trait ByteSize with an associated format_size() function.
  • The function returns a formatted string based on the number of bytes the u64 represents.
  • This results in more agreeable output: for example, 268435456 bytes displays as 256.00 MB.
pub trait ByteSize {
    fn format_size(&self) -> String;
}

impl ByteSize for u64 {
    fn format_size(&self) -> String {
        const KB: u64 = 1024;
        const MB: u64 = KB * 1024;
        const GB: u64 = MB * 1024;
        const TB: u64 = GB * 1024;

        match self {
            bytes if *bytes >= TB => format!("{:.2} TB", *bytes as f64 / TB as f64),
            bytes if *bytes >= GB => format!("{:.2} GB", *bytes as f64 / GB as f64),
            bytes if *bytes >= MB => format!("{:.2} MB", *bytes as f64 / MB as f64),
            bytes if *bytes >= KB => format!("{:.2} KB", *bytes as f64 / KB as f64),
            bytes => format!("{} bytes", bytes),
        }
    }
}
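
A quick sanity check of the trait in use (values chosen for illustration):

assert_eq!(268_435_456u64.format_size(), "256.00 MB"); // 256 * 1024 * 1024
assert_eq!(5_312_400_000u64.format_size(), "4.95 GB");
assert_eq!(512u64.format_size(), "512 bytes");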

                            




  • Lastly, I've defined a SearchError type whose variants make verbose output easier to read and make sense of.
  • It also makes error handling more straightforward in the library functions shown below; a short sketch after the code shows the From impl at work.
#[derive(Debug)]
pub enum SearchError {
    IoError(std::io::Error),
    SendError(String),
    ThreadError(String),
    PathError(String),
}

impl From<std::io::Error> for SearchError {
    fn from(err: std::io::Error) -> Self {
        SearchError::IoError(err)
    }
}

impl std::fmt::Display for SearchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SearchError::IoError(e) => write!(f, "IO error: {}", e),
            SearchError::SendError(e) => write!(f, "Send error: {}", e),
            SearchError::ThreadError(e) => write!(f, "Thread error: {}", e),
            SearchError::PathError(e) => write!(f, "Path error: {}", e),
        }
    }
}

impl std::error::Error for SearchError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            SearchError::IoError(e) => Some(e),
            _ => None,
        }
    }
}
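
The From&lt;std::io::Error&gt; impl is what lets library code apply the ? operator to io results and have the error converted into a SearchError automatically. A hypothetical helper to show the effect:

use std::path::Path;

// fs::read_dir returns io::Result; `?` converts any io::Error
// into SearchError::IoError via the From impl above
fn entry_count(dir: &Path) -> Result<usize, SearchError> {
    Ok(std::fs::read_dir(dir)?.count())
}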

                            

Library Functions

The remaining functions make up the core program logic, so I have placed them in lib.rs.



  • The run() function is responsible for using the Config members in order to:
    • Set up local variables used for logical branches
    • Create an error log
    • Output information to the user and set up progress indicators (I am using indicatif)
    • Create an mpsc channel so that scanning threads can send filesystem data as it becomes available
    • Spawn a new thread to invoke the parallel_search() function
    • Pass data sent by scanning threads to the process_batch() function
    • Wait for scanning logic to finish and display program output
pub fn run(config: Config) -> Result<(), Box<dyn Error>> {
    let is_verbose = config.verbose;
    let error_log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let error_log_clone = error_log.clone();
    let config_arc: Arc<Config> = Arc::new(config.clone());

    println!(
        "Searching for {} largest entries in {}:",
        config.num_entries,
        config.root_path.display()
    );

    let multi_progress = MultiProgress::new();
    let scan_progress = multi_progress.add(ProgressBar::new_spinner());
    scan_progress.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner:.green} [{elapsed_precise}] {msg}")
            .unwrap(),
    );

    let process_progress = multi_progress.add(ProgressBar::new_spinner());
    process_progress.set_style(
        ProgressStyle::default_spinner()
            .template("{spinner:.green} [{elapsed_precise}] {msg}")
            .unwrap(),
    );

    let (tx, rx) = mpsc::channel();
    let top_entries = Arc::new(Mutex::new(TopEntries::new(config.num_entries)));

    let root_path = config.root_path.clone();
    let scan_handle = thread::spawn(move || {
        parallel_search(
            &root_path,
            tx,
            scan_progress,
            config_arc.clone(),
            error_log_clone.clone(),
        )
    });

    let mut total_files = 0;
    let mut total_processed = 0;
    let mut total_attempts = 0;

    while let Ok(batch) = rx.recv() {
        total_files += batch.len();
        let (processed, attempted) =
            process_batch(batch, &top_entries, error_log.clone(), is_verbose);
        total_processed += processed;
        total_attempts += attempted;

        process_progress.set_message(format!(
            "Processing {} files (successfully processed: {}, failed: {})...",
            total_files,
            total_processed,
            total_attempts - total_processed
        ));
    }

    match scan_handle.join() {
        Ok(result) => result.map_err(|e| Box::new(e))?,
        Err(e) => {
            if is_verbose {
                error_log
                    .lock()
                    .unwrap()
                    .push(format!("Scanner thread panicked: {:?}", e));
            }
        }
    }

    process_progress.finish_with_message(format!(
        "Processed {} files ({} successful, {} failed)",
        total_attempts,
        total_processed,
        total_attempts - total_processed
    ));

    if is_verbose {
        println!();
        error_log.lock().unwrap().iter().for_each(|e| {
            eprintln!("{}", e);
        });
    }

    println!("\n");

    match top_entries.lock() {
        Ok(top) => {
            if top.entries.is_empty() {
                println!("No files found - run with -v flag for error output");
            } else {
                for (path, size) in top.entries.iter() {
                    println!("{}: {}", path, size.format_size());
                }
            }
        }
        Err(e) => {
            return Err(Box::new(io::Error::new(
                io::ErrorKind::Other,
                format!("Failed to lock top entries for final output: {}", e),
            )));
        }
    }

    Ok(())
}                                    
                                


  • The process_batch() function described earlier performs a parallel metadata retrieval (via Rayon's into_par_iter()) for each entry in a Vec of FileEntry values; the FileEntry type is sketched just below this list. Each successful retrieval yields a tuple pairing the file path with the metadata for the file at that path.
  • If an error results, which is fairly common when scanning system files or any files the program's user lacks permissions for, it is added to the error log, provided the user has opted into running the program verbosely.
  • All of the successful metadata retrievals are then turned into insertions into the shared TopEntries struct.
  • As a reminder, this means a tuple describing the file path and its size is inserted at the appropriate index of the N-sized, descending-order Vec, but only if the entry is larger than the smallest stored entry or the Vec is not yet at capacity.
  • The function returns a tuple of the number of entries successfully processed and the total number of metadata attempts, which run() uses to update the progress indicator.
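
The FileEntry type itself isn't shown elsewhere on this page; a minimal sketch consistent with how process_batch() and parallel_search() use it (a path plus a result recorded at scan time) would be:

use std::path::PathBuf;

#[derive(Debug, Clone)]
pub struct FileEntry {
    pub path: PathBuf,
    pub result: Result<(), String>,
}

impl FileEntry {
    pub fn new(path: PathBuf) -> Self {
        // Entries created from a successful directory read start out Ok
        Self {
            path,
            result: Ok(()),
        }
    }
}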
fn process_batch(
    batch: Vec<FileEntry>,
    top_entries: &Arc<Mutex<TopEntries>>,
    error_log: Arc<Mutex<Vec<String>>>,
    is_verbose: bool,
) -> (usize, usize) {
    // Retrieve metadata for the whole batch in parallel via Rayon
    let metadata_results: Vec<_> = batch
        .into_par_iter()
        .map(|entry| match entry.result {
            Ok(()) => match fs::metadata(&entry.path) {
                Ok(metadata) => (entry.path, Ok(metadata)),
                Err(err) => (entry.path, Err(err)),
            },
            Err(err) => (
                entry.path,
                Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!("Previous error: {:?}", err),
                )),
            ),
        })
        .collect();

    let total = metadata_results.len();
    let mut processed = 0;
    let mut errors = Vec::new();

    // Fold the results into the shared TopEntries, collecting errors as we go
    for (path, metadata_result) in metadata_results {
        match metadata_result {
            Ok(metadata) => match path.size_on_disk_fast(&metadata) {
                Ok(size) => {
                    if let Some(path_str) = path.to_str() {
                        match top_entries.lock() {
                            Ok(mut top) => {
                                top.insert(path_str.to_string(), size);
                                processed += 1;
                            }
                            Err(err) => {
                                errors.push(format!(
                                    "Failed to lock top_entries for {}: {}",
                                    path.display(),
                                    err
                                ));
                            }
                        }
                    } else {
                        errors.push(format!("Invalid UTF-8 in path: {}", path.display()));
                    }
                }
                Err(err) => {
                    errors.push(format!(
                        "Failed to get size for {}: {}",
                        path.display(),
                        err
                    ));
                }
            },
            Err(err) => {
                errors.push(format!("Error processing {}: {}", path.display(), err));
            }
        }
    }

    if !errors.is_empty() && is_verbose {
        error_log.lock().unwrap().extend(errors);
    }

    (processed, total)
}

                                


  • Lastly, I've defined a parallel_search() function that takes the path to begin traversal from, a ProgressBar used to display runtime progress, an Arc-wrapped Config, and the shared error log (a Vec wrapped in an Arc and Mutex for thread safety). Most importantly, it also takes a Sender used to send batches of file entries back to the run() function for processing.
  • This function operates on a VecDeque, Rust's growable ring buffer, used here as a double-ended queue: directories are pushed to the back and popped from the front.
  • The threads share access to this work queue; as filesystem entries are found, files are added to the current batch for processing, while directories are pushed onto the work queue.
  • Each thread also shares access to the progress indicator (updated with the directory being scanned), the cap on open file handles, the set of directories to ignore, and the other variables used to synchronize work across the scanning threads and log errors.
  • I've added logic to skip not only ignored directories but also symlinks, since in my original implementation I found system directories contained many of these, and they aren't very useful when trying to size actual files.
  • When a batch grows to the configured size, it is sent back to the run() function for processing; any partial batch remaining when a worker exits is flushed as well.
  • A worker exits once the work queue is empty and no other worker is mid-scan (an in-flight worker may still push new subdirectories). The threads are then joined locally in this function, after which the handle used to spawn parallel_search() is joined back in run().
  • At this point the program output logic in run() executes and returns control to main().
  • main() displays how long the program execution took and exits.
fn parallel_search(
    root_dir: &Path,
    tx: Sender<Vec<FileEntry>>,
    progress: ProgressBar,
    config: Arc<Config>,
    error_log: Arc<Mutex<Vec<String>>>,
) -> Result<(), SearchError> {
    let work_queue = Arc::new(Mutex::new(VecDeque::new()));
    // Number of directories popped from the queue but not yet fully processed;
    // workers may exit only when the queue is empty AND this count is zero,
    // since an in-flight worker can still push new subdirectories
    let active_workers = Arc::new(AtomicUsize::new(0));

    let skip_dirs: HashSet<PathBuf> = config
        .skip_dirs
        .iter()
        .filter_map(|dir| match PathBuf::from(dir).canonicalize() {
            Ok(path) => Some(path),
            Err(err) => {
                if config.verbose {
                    error_log.lock().unwrap().push(format!(
                        "Warning: Could not canonicalize skip directory '{}': {}",
                        dir, err
                    ));
                }
                None
            }
        })
        .collect();

    match root_dir.canonicalize() {
        Ok(root) => work_queue.lock().unwrap().push_back(root),
        Err(err) => {
            if config.verbose {
                error_log
                    .lock()
                    .unwrap()
                    .push(format!("Failed to canonicalize root directory: {}", err));
            }
        }
    }

    let mut handles = vec![];
    let open_files = Arc::new(AtomicUsize::new(0));
    let errors_count = Arc::new(AtomicUsize::new(0));

    for _ in 0..config.num_threads {
        let work_queue = Arc::clone(&work_queue);
        let tx = tx.clone();
        let progress = progress.clone();
        let open_files = Arc::clone(&open_files);
        let active_workers = Arc::clone(&active_workers);
        let skip_dirs = skip_dirs.clone();
        let errors_count = Arc::clone(&errors_count);
        let config_clone = config.clone();
        let error_log = error_log.clone();

        handles.push(thread::spawn(move || -> Result<(), SearchError> {
            let mut batch = Vec::with_capacity(config_clone.batch_size);

            'outer: loop {
                let dir = {
                    match work_queue.lock() {
                        Ok(mut q) => {
                            let dir = q.pop_front();
                            if dir.is_some() {
                                // Mark this worker as in-flight while still holding the
                                // lock, so an idle worker can never observe an empty
                                // queue together with a zero active count spuriously
                                active_workers.fetch_add(1, Ordering::SeqCst);
                            }
                            dir
                        }
                        Err(e) => {
                            if config_clone.verbose {
                                error_log
                                    .lock()
                                    .unwrap()
                                    .push(format!("Failed to lock work queue: {}", e));
                            }
                            None
                        }
                    }
                };

                match dir {
                    Some(dir) => {
                        progress.set_message(format!("Scanning: {}", dir.display()));

                        match dir.canonicalize() {
                            Ok(canonical_dir) => {
                                if skip_dirs
                                    .iter()
                                    .any(|skip_dir| canonical_dir.starts_with(skip_dir))
                                {
                                    // Excluded directory: clear the in-flight
                                    // marker before skipping it
                                    active_workers.fetch_sub(1, Ordering::SeqCst);
                                    continue;
                                }
                            }
                            Err(e) => {
                                if config_clone.verbose {
                                    error_log.lock().unwrap().push(format!(
                                        "Failed to canonicalize directory {:#?}: {}",
                                        dir, e
                                    ));
                                }
                            }
                        }

                        // Back off exponentially (capped at 100ms) while the
                        // process is at its open file handle limit
                        let mut wait_time = 1;
                        while open_files.load(Ordering::Relaxed) >= config_clone.max_open_files {
                            thread::sleep(std::time::Duration::from_millis(wait_time));
                            wait_time = wait_time.saturating_mul(2).min(100);
                        }
                        open_files.fetch_add(1, Ordering::SeqCst);

                        match fs::read_dir(&dir) {
                            Ok(entries) => {
                                for entry in entries.flatten() {
                                    let path = entry.path();
                                    if path.is_symlink() {
                                        continue;
                                    }

                                    if path.is_dir() {
                                        work_queue.lock().unwrap().push_back(path);
                                    } else if path.is_file() {
                                        batch.push(FileEntry::new(path.clone()));

                                        if batch.len() >= config_clone.batch_size {
                                            tx.send(batch.clone()).unwrap();
                                            batch.clear();
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                if config_clone.verbose {
                                    error_log.lock().unwrap().push(format!(
                                        "Failed to read directory {:#?}: {}",
                                        dir, e
                                    ));
                                }
                                errors_count.fetch_add(1, Ordering::SeqCst);
                            }
                        }

                        open_files.fetch_sub(1, Ordering::SeqCst);
                        // This directory is fully processed; clear the in-flight marker
                        active_workers.fetch_sub(1, Ordering::SeqCst);
                    }
                    None => {
                        // The queue was empty at pop time; if no worker is mid-scan
                        // either, no new work can appear and this worker can exit
                        if active_workers.load(Ordering::SeqCst) == 0 {
                            break 'outer;
                        }
                        thread::sleep(std::time::Duration::from_millis(50));
                    }
                }
            }

            // Flush any remaining partial batch before this worker exits,
            // otherwise files in an unfilled final batch would never be sized
            if !batch.is_empty() {
                let _ = tx.send(batch);
            }

            Ok(())
        }));
    }

    for handle in handles {
        match handle.join() {
            Ok(result) => result?,
            Err(_) => {
                return Err(SearchError::ThreadError(
                    "A scanner thread panicked".to_string(),
                ))
            }
        }
    }

    Ok(())
}                                    
                                

Example

    A CLI to find large files

    Usage: ferris-files [OPTIONS]

    Options:
    -n, --num_entries <NUM_ENTRIES>
            (optional) Number of largest entries to output [default: 10]
    -b, --batch_size <BATCH_SIZE>
            (optional) Number of files to size at one time [default: 1000]
    -d, --directory <TARGET_DIR>
            (optional) defaults to attempting to detect current working directory
    -x, --excluded-dirs-file <EXCLUSION_FILE>
            (optional) Path to a file where each line specifies a directory to ignore
    -v, --verbose
            
    -h, --help
            Print help
    -V, --version
            Print version


    Preparing to scan using 12 threads
    Limiting open file handles to 524287
    Searching for 10 largest entries in /Users/user:
        [00:00:15] Directory scan complete (121 errors encountered: run with -v for details)
        [00:00:15] Processed 1399886 files (1399886 successful, 0 failed)                                                                                                                                                                                                                                        
    
    /Users/user/Movies/TV/Media.localized/Movies/Twin Peaks_ Fire Walk with Me/Twin Peaks_ Fire Walk with Me (1080p HD).m4v: 5.17 GB
    /Users/user/Library/Android/sdk/system-images/android-33/google_apis/x86_64/system.img: 4.01 GB
    /Users/user/Library/Android/sdk/system-images/android-34/google_apis/x86_64/system.img: 4.01 GB
    /Users/user/Library/Android/sdk/system-images/android-31/google_apis/x86_64/system.img: 4.01 GB
    /Users/user/Movies/TV/Media.localized/Movies/The Animatrix/04 The Animatrix (1080p HD).m4v: 3.65 GB
    /Users/user/.android/avd/Pixel_5_API_33.avd/userdata-qemu.img.qcow2: 3.54 GB
    /Users/user/Library/Android/sdk/system-images/android-30/google_apis/x86/system.img: 3.01 GB
    /Users/user/Library/Android/sdk/system-images/android-31/google_apis_playstore/x86_64/system.img: 2.67 GB
    /Users/user/Library/Containers/com.docker.docker/Data/vms/0/data/Docker.raw: 2.32 GB
    /Users/user/Virtual Machines.localized/kali-linux-2024.2-vmware-amd64.vmwarevm/kali-linux-2024.2-vmware-amd64-s035.vmdk: 1.97 GB
    
    Program completed in 15.199807 seconds
                            


View the project source code on GitHub
