diff --git a/Cargo.lock b/Cargo.lock index 52c51d37..02bd17d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -591,6 +591,132 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "orx-concurrent-bag" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f4ee3fa475b986b517141e4bb23b7de47c6f17cbaf8a0b768460cb0940b806" +dependencies = [ + "orx-fixed-vec", + "orx-pinned-concurrent-col", + "orx-pinned-vec", + "orx-pseudo-default", + "orx-split-vec", +] + +[[package]] +name = "orx-concurrent-iter" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f13e613c1f32c5afb3cabcd86281380706563add1e5685ad715e76b38f85b31" +dependencies = [ + "orx-iterable", + "orx-pseudo-default", +] + +[[package]] +name = "orx-concurrent-ordered-bag" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18bc4fa7d5826117ebbb4c126146877e196212b21080a3c4d3706830ec0c99a7" +dependencies = [ + "orx-fixed-vec", + "orx-pinned-concurrent-col", + "orx-pinned-vec", + "orx-split-vec", +] + +[[package]] +name = "orx-fixed-vec" +version = "3.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac2256b477caf926e154327216db53826598a7ba6a7105a41733cd52ccf56ef" +dependencies = [ + "orx-concurrent-iter", + "orx-iterable", + "orx-pinned-vec", + "orx-pseudo-default", +] + +[[package]] +name = "orx-iterable" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfa2cb3f82a187c68835faac9cf03faaee70b93f4da3b85515ac1b4c6f8a432d" +dependencies = [ + "orx-self-or", +] + +[[package]] +name = "orx-parallel" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e71e3fff439c943d1751616a4604811de1f58847a4e59b56518e659ae6bedee0" +dependencies = [ + "orx-concurrent-bag", + "orx-concurrent-iter", + "orx-concurrent-ordered-bag", + "orx-fixed-vec", + "orx-iterable", + "orx-pinned-concurrent-col", + "orx-pinned-vec", + "orx-priority-queue", + "orx-pseudo-default", + "orx-split-vec", +] + +[[package]] +name = "orx-pinned-concurrent-col" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6a073fc7fabc32eec4ed9ff4cac154113e9a41aa04b8ede99c331b1e3d10706" +dependencies = [ + "orx-fixed-vec", + "orx-pinned-vec", + "orx-pseudo-default", + "orx-split-vec", +] + +[[package]] +name = "orx-pinned-vec" +version = "3.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcd865a63f5d56134bd5e5d7ffe6e2a6a8458d79f625ebf7b56c43292829e96c" +dependencies = [ + "orx-iterable", + "orx-pseudo-default", +] + +[[package]] +name = "orx-priority-queue" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8706de0d8349fc6e720343aca4b28aad38010d0bc966a4bc573df0beaed99091" + +[[package]] +name = "orx-pseudo-default" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34eaace9ae01f7025804fbca40ec45b87c19ba0328d97195e01c6135897762a8" + +[[package]] +name = "orx-self-or" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8e35dfe18921e475b9861266fd58a5ecfd681161f242d24a9e2d1e07fbc28" + +[[package]] +name = "orx-split-vec" +version = "3.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a68c641c1e26519d64b46f7012a95f58e4372642ff2740dae0f831d471a327c7" +dependencies = [ + "orx-concurrent-iter", + "orx-iterable", + "orx-pinned-vec", + "orx-pseudo-default", +] + [[package]] name = "parallel-disk-usage" version = "0.20.1" @@ -609,6 +735,7 @@ dependencies = [ "itertools 0.14.0", "maplit", "normalize-path", + "orx-parallel", "pipe-trait", "pretty_assertions", "rand", diff --git a/Cargo.toml b/Cargo.toml index 528ed1d2..81cfd463 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,8 +59,8 @@ derive_setters = "0.1.8" fmt-iter = "0.2.1" into-sorted = "0.0.3" itertools = "0.14.0" +orx-parallel = "2.3.0" pipe-trait = "0.4.0" -rayon = "1.10.0" rounded-div = "0.1.2" serde = { version = "1.0.219", optional = true } serde_json = { version = "1.0.141", optional = true } @@ -77,3 +77,4 @@ maplit = "1.0.2" normalize-path = "0.2.1" pretty_assertions = "1.4.1" rand = "0.9.2" +rayon = "1.10.0" # TODO: replace this with orx-parallel diff --git a/src/app.rs b/src/app.rs index 7fad5b51..04794fa4 100644 --- a/src/app.rs +++ b/src/app.rs @@ -67,7 +67,7 @@ impl App { .map_err(RuntimeError::DeserializationFailure)? .body; - trait VisualizeJsonTree: size::Size + Into + Send { + trait VisualizeJsonTree: size::Size + Into + Send + Sync { fn visualize_json_tree( tree: JsonTree, bytes_format: Self::DisplayFormat, @@ -103,7 +103,7 @@ impl App { } } - impl + Send> VisualizeJsonTree for Size {} + impl + Send + Sync> VisualizeJsonTree for Size {} macro_rules! visualize { ($tree:expr, $bytes_format:expr) => { @@ -149,10 +149,7 @@ impl App { }; if let Some(threads) = threads { - rayon::ThreadPoolBuilder::new() - .num_threads(threads) - .build_global() - .unwrap_or_else(|_| eprintln!("warning: Failed to set thread limit to {threads}")); + todo!("Set number of threads to {threads}"); } if cfg!(unix) && self.args.deduplicate_hardlinks && self.args.files.len() > 1 { diff --git a/src/data_tree/hardlink.rs b/src/data_tree/hardlink.rs index 8cbf93b8..9283ac16 100644 --- a/src/data_tree/hardlink.rs +++ b/src/data_tree/hardlink.rs @@ -1,12 +1,12 @@ use super::DataTree; use crate::size; use assert_cmp::debug_assert_op; -use rayon::prelude::*; +use orx_parallel::*; use std::{ffi::OsStr, path::Path}; impl DataTree where - Self: Send, + Self: Send + Sync, Name: AsRef, Size: size::Size + Sync, { @@ -39,7 +39,8 @@ where } self.children - .par_iter_mut() + .iter_mut() // TODO: request orx-parallel to add par_mut + .iter_into_par() .for_each(|child| child.par_deduplicate_hardlinks(&sub_hardlink_info)) } } diff --git a/src/data_tree/reflection/par_methods.rs b/src/data_tree/reflection/par_methods.rs index 786c1dad..751e8f85 100644 --- a/src/data_tree/reflection/par_methods.rs +++ b/src/data_tree/reflection/par_methods.rs @@ -1,12 +1,12 @@ use super::{ConversionError, Reflection}; use crate::{data_tree::DataTree, size}; -use rayon::prelude::*; +use orx_parallel::*; use std::{ffi::OsStr, iter::once}; impl Reflection where - Name: Send, - Size: size::Size + Send, + Name: Send + Sync, + Size: size::Size + Send + Sync, { /// Attempting to convert a [`Reflection`] into a valid [`DataTree`]. pub fn par_try_into_tree(self) -> Result, ConversionError> { @@ -25,8 +25,10 @@ where return Err(ConversionError::ExcessiveChildren { path, size, child }); } let children: Result, _> = children - .into_par_iter() + .into_par() .map(Self::par_try_into_tree) + .collect::>>() // TODO: request orx-parallel to make collecting Result possible + .into_iter() .collect(); let children = match children { Ok(children) => children, @@ -52,9 +54,9 @@ where transform: Transform, ) -> Result, Error> where - TargetName: Send, + TargetName: Send + Sync, TargetSize: size::Size + Send + Sync, - Error: Send, + Error: Send + Sync, Transform: Fn(Name, Size) -> Result<(TargetName, TargetSize), Error> + Copy + Sync, { let Reflection { @@ -63,8 +65,10 @@ where children, } = self; let children = children - .into_par_iter() + .into_par() .map(|child| child.par_try_map(transform)) + .collect::>>() // TODO: request orx-parallel to make collecting Result possible + .into_iter() .collect::, _>>()?; let (name, size) = transform(name, size)?; Ok(Reflection { diff --git a/src/data_tree/retain.rs b/src/data_tree/retain.rs index 2de05127..eb6c2fb3 100644 --- a/src/data_tree/retain.rs +++ b/src/data_tree/retain.rs @@ -1,10 +1,10 @@ use super::DataTree; use crate::size; -use rayon::prelude::*; +use orx_parallel::*; impl DataTree where - Self: Send, + Self: Send + Sync, Size: size::Size, { /// Internal function to be used by [`Self::par_retain`]. @@ -17,7 +17,8 @@ where .retain(|child| predicate(child, current_depth)); let next_depth = current_depth + 1; self.children - .par_iter_mut() + .iter_mut() // TODO: request orx-parallel to add par_mut + .iter_into_par() .for_each(|child| child.par_retain_with_depth(next_depth, predicate)) } diff --git a/src/data_tree/sort.rs b/src/data_tree/sort.rs index 0a441a10..60c175ac 100644 --- a/src/data_tree/sort.rs +++ b/src/data_tree/sort.rs @@ -1,17 +1,18 @@ use super::DataTree; use crate::size; -use rayon::prelude::*; +use orx_parallel::*; use std::cmp::Ordering; impl DataTree where - Self: Send, + Self: Send + Sync, Size: size::Size, { /// Sort all descendants recursively, in parallel. pub fn par_sort_by(&mut self, compare: impl Fn(&Self, &Self) -> Ordering + Copy + Sync) { self.children - .par_iter_mut() + .iter_mut() // TODO: request orx-parallel to add par_mut + .iter_into_par() .for_each(|child| child.par_sort_by(compare)); self.children.sort_unstable_by(compare); } diff --git a/src/tree_builder.rs b/src/tree_builder.rs index c6e1494d..d779706b 100644 --- a/src/tree_builder.rs +++ b/src/tree_builder.rs @@ -3,7 +3,7 @@ pub mod info; pub use info::Info; use super::{data_tree::DataTree, size}; -use rayon::prelude::*; +use orx_parallel::*; /// Collection of functions and starting points in order to build a [`DataTree`] with [`From`] or [`Into`]. #[derive(Debug)] @@ -34,7 +34,7 @@ where Name: Send + Sync, GetInfo: Fn(&Path) -> Info + Copy + Send + Sync, JoinPath: Fn(&Path, &Name) -> Path + Copy + Send + Sync, - Size: size::Size + Send, + Size: size::Size + Send + Sync, { /// Create a [`DataTree`] from a [`TreeBuilder`]. fn from(builder: TreeBuilder) -> Self { @@ -50,7 +50,7 @@ where let max_depth = max_depth.saturating_sub(1); let children = children - .into_par_iter() + .into_par() .map(|name| TreeBuilder { path: join_path(&path, &name), name, diff --git a/tests/_utils.rs b/tests/_utils.rs index 3887feb9..0268fd1d 100644 --- a/tests/_utils.rs +++ b/tests/_utils.rs @@ -2,6 +2,7 @@ use build_fs_tree::{dir, file, Build, MergeableFileSystemTree}; use command_extra::CommandExtra; use derive_more::{AsRef, Deref}; use into_sorted::IntoSorted; +use orx_parallel::*; use parallel_disk_usage::{ data_tree::{DataTree, DataTreeReflection}, fs_tree_builder::FsTreeBuilder, @@ -14,7 +15,6 @@ use parallel_disk_usage::{ use pipe_trait::Pipe; use pretty_assertions::assert_eq; use rand::{distr::Alphanumeric, rng, Rng}; -use rayon::prelude::*; use std::{ cmp::Ordering, env::temp_dir, @@ -219,7 +219,7 @@ impl SampleWorkspace { // Create files in no-hardlinks. // There will be no files with nlink > 1. - (0..files_per_branch).par_bridge().for_each(|index| { + (0..files_per_branch).into_par().for_each(|index| { let file_name = format!("file-{index}.txt"); let file_path = temp.join("no-hardlinks").join(file_name); if let Err(error) = write_file(&file_path, "a".repeat(bytes_per_file)) { @@ -233,7 +233,7 @@ impl SampleWorkspace { // Each file in the second group will have 1 exclusive link. // Each file in the third and fourth groups will have no links. // Each file in the remaining groups is PLANNED to have 1 external link from only-hardlinks/mixed. - (0..whole).par_bridge().for_each(|file_index| { + (0..whole).into_par().for_each(|file_index| { let file_name = format!("file-{file_index}.txt"); let file_path = temp.join("some-hardlinks").join(file_name); if let Err(error) = write_file(&file_path, "a".repeat(bytes_per_file)) { @@ -254,7 +254,7 @@ impl SampleWorkspace { // Create files in only-hardlinks/exclusive. // Each file in this directory will have 1 exclusive link. - (0..whole).par_bridge().for_each(|index| { + (0..whole).into_par().for_each(|index| { let file_name = format!("file-{index}.txt"); let file_path = temp.join("only-hardlinks/exclusive").join(file_name); if let Err(error) = write_file(&file_path, "a".repeat(bytes_per_file)) { @@ -271,7 +271,7 @@ impl SampleWorkspace { // Let's divide the PLANNED links into 2 equal groups. // Each link in the first group is PLANNED to share with only-hardlinks/external. // Each link in the second group is exclusive. - (half..whole).par_bridge().for_each(|index| { + (half..whole).into_par().for_each(|index| { let file_name = format!("link0-{index}.txt"); let file_path = temp.join("only-hardlinks/mixed").join(file_name); if let Err(error) = write_file(&file_path, "a".repeat(bytes_per_file)) { @@ -289,7 +289,7 @@ impl SampleWorkspace { // Let's divide the links into 2 equal groups. // The first group will share with only-hardlinks/mixed. // The second group will share with some-hardlinks. - (0..whole).par_bridge().for_each(|index| { + (0..whole).into_par().for_each(|index| { let link_name = format!("linkX-{index}.txt"); let link_path = temp.join("only-hardlinks/external").join(link_name); @@ -324,7 +324,7 @@ pub fn sanitize_tree_reflection( where Name: Ord, Size: size::Size, - DataTreeReflection: Send, + DataTreeReflection: Send + Sync, { let DataTreeReflection { name, @@ -333,7 +333,7 @@ where } = tree_reflection; let children = children .into_sorted_by(|left, right| left.name.cmp(&right.name)) - .into_par_iter() + .into_par() .map(sanitize_tree_reflection) .collect(); DataTreeReflection { diff --git a/tests/hardlinks_deduplication.rs b/tests/hardlinks_deduplication.rs index 73274458..b7a8bcbb 100644 --- a/tests/hardlinks_deduplication.rs +++ b/tests/hardlinks_deduplication.rs @@ -8,6 +8,7 @@ use command_extra::CommandExtra; use into_sorted::IntoSorted; use itertools::Itertools; use normalize_path::NormalizePath; +use orx_parallel::*; use parallel_disk_usage::{ bytes_format::BytesFormat, data_tree::Reflection, @@ -21,7 +22,7 @@ use parallel_disk_usage::{ }; use pipe_trait::Pipe; use pretty_assertions::assert_eq; -use rayon::prelude::*; +use rayon::prelude::*; // TODO: replace this with `orx-parallel` use std::{ collections::HashSet, iter, @@ -919,7 +920,7 @@ fn exclusive_hardlinks_only() { .cloned() .collect(); let expected_shared_details = (0..files_per_branch) - .par_bridge() + .into_par() .map(|index| ReflectionEntry { ino: file_inode(&format!("file-{index}.txt")), size: file_size, @@ -1045,10 +1046,10 @@ fn exclusive_only_and_external_only_hardlinks() { .cloned() .collect(); let expected_shared_details = iter::empty() - .par_bridge() + .par_bridge() // TODO: replace this with `orx-parallel` .chain( (0..(files_per_branch / 2)) - .par_bridge() + .par_bridge() // TODO: replace this with `orx-parallel` .map(|index| ReflectionEntry { ino: file_inode(&format!("link0-{index}.txt")), size: file_size, @@ -1058,7 +1059,7 @@ fn exclusive_only_and_external_only_hardlinks() { ) .chain( ((files_per_branch / 2)..files_per_branch) - .par_bridge() + .par_bridge() // TODO: replace this with `orx-parallel` .map(|index| ReflectionEntry { ino: file_inode(&format!("link0-{index}.txt")), size: file_size, @@ -1208,7 +1209,7 @@ fn external_hardlinks_only() { .cloned() .collect(); let expected_shared_details = (0..files_per_branch) - .par_bridge() + .into_par() .map(|index| ReflectionEntry { ino: file_inode(&format!("linkX-{index}.txt")), size: file_size, diff --git a/tests/visualizer.rs b/tests/visualizer.rs index 8c839f26..705e7a02 100644 --- a/tests/visualizer.rs +++ b/tests/visualizer.rs @@ -67,7 +67,7 @@ macro_rules! test_case { fn typical_tree(size_per_dir: Size, file_size_factor: u64) -> DataTree<&'static str, Size> where - Size: size::Size + Ord + From + Send, + Size: size::Size + Ord + From + Send + Sync, { let dir = DataTree::<&'static str, Size>::fixed_size_dir_constructor(size_per_dir); let file = @@ -645,7 +645,7 @@ test_case! { fn empty_dir(inode_size: Size) -> DataTree<&'static str, Size> where - Size: size::Size + Ord + From + Send, + Size: size::Size + Ord + From + Send + Sync, { DataTree::dir("empty directory", inode_size, Vec::new()).into_par_sorted(order_tree) } @@ -691,7 +691,7 @@ test_case! { fn long_and_short_names() -> DataTree<&'static str, Size> where - Size: size::Size + Ord + From + Send, + Size: size::Size + Ord + From + Send + Sync, { let dir = DataTree::<&'static str, Size>::fixed_size_dir_constructor(1.into()); let file = |name: &'static str, size: u64| DataTree::file(name, Size::from(size)); @@ -890,7 +890,7 @@ test_case! { fn tree_with_a_file_of_extremely_long_name() -> DataTree<&'static str, Size> where - Size: size::Size + Ord + From + Send, + Size: size::Size + Ord + From + Send + Sync, { let dir = DataTree::<&'static str, Size>::fixed_size_dir_constructor(4069.into()); let file = |name: &'static str, size: u64| DataTree::file(name, Size::from(size)); @@ -934,7 +934,7 @@ test_case! { fn big_tree_with_long_names() -> DataTree<&'static str, Size> where - Size: size::Size + Ord + From + Send, + Size: size::Size + Ord + From + Send + Sync, { let dir = DataTree::<&'static str, Size>::fixed_size_dir_constructor(4069.into()); let file = |name: &'static str, size: u64| DataTree::file(name, Size::from(size));