diff --git a/src/args.rs b/src/args.rs index 4b7939c..935e76f 100644 --- a/src/args.rs +++ b/src/args.rs @@ -507,6 +507,10 @@ pub struct Arg { #[arg(long, global = true)] /// Disable compressing index in Kirikiri XP3 archive when creating XP3 archive. pub xp3_no_compress_index: bool, + #[cfg(feature = "kirikiri-arc")] + #[arg(long, global = true, default_value_t = num_cpus::get())] + /// Workers count for compress files in Kirikiri XP3 archive when creating in parallel. + pub xp3_compress_workers: usize, #[command(subcommand)] /// Command pub command: Command, diff --git a/src/main.rs b/src/main.rs index c0406ed..15d3b08 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2755,6 +2755,8 @@ fn main() { xp3_compress_files: !arg.xp3_no_compress_files, #[cfg(feature = "kirikiri-arc")] xp3_compress_index: !arg.xp3_no_compress_index, + #[cfg(feature = "kirikiri-arc")] + xp3_compress_workers: arg.xp3_compress_workers, }); match &arg.command { args::Command::Export { input, output } => { diff --git a/src/scripts/kirikiri/archive/xp3pack/writer.rs b/src/scripts/kirikiri/archive/xp3pack/writer.rs index 099b009..218d3ac 100644 --- a/src/scripts/kirikiri/archive/xp3pack/writer.rs +++ b/src/scripts/kirikiri/archive/xp3pack/writer.rs @@ -10,7 +10,7 @@ use crate::utils::encoding::*; use crate::utils::threadpool::ThreadPool; use anyhow::Result; use sha2::{Digest, Sha256}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::io::{Seek, Write}; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -71,6 +71,8 @@ pub struct Xp3ArchiveWriter { zlib_compression_level: u32, segmenter: Option>>, stats: Arc, + compress_workers: Option>>>, + processing_segments: Arc>>, } impl Xp3ArchiveWriter> { @@ -101,6 +103,16 @@ impl Xp3ArchiveWriter> { zlib_compression_level: config.zlib_compression_level, segmenter, stats: Arc::new(Stats::default()), + compress_workers: if config.xp3_compress_files { + Some(Arc::new(ThreadPool::new( + config.xp3_compress_workers.max(1), + Some("xp3-compress"), + false, + )?)) + } else { + None + }, + processing_segments: Arc::new(Mutex::new(HashSet::new())), }) } } @@ -161,12 +173,14 @@ impl Archive for Xp3ArchiveWriter { for err in self.runner.take_results() { err?; } - let mut item = { + let item = { let items = self.items.lock_blocking(); - items - .get(name) - .ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))? - .clone() + Arc::new(Mutex::new( + items + .get(name) + .ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))? + .clone(), + )) }; let (reader, writer) = std::io::pipe()?; let reader = Reader::new(reader); @@ -178,6 +192,8 @@ impl Archive for Xp3ArchiveWriter { let stats = self.stats.clone(); let is_compressed = self.compress_files; let zlib_compression_level = self.zlib_compression_level; + let workers = self.compress_workers.clone(); + let processiong_segments = self.processing_segments.clone(); self.runner.execute( move |_| { let mut reader = reader; @@ -186,6 +202,8 @@ impl Archive for Xp3ArchiveWriter { for seg in segmenter.segment(&mut reader) { let seg = seg?; let hash: [u8; 32] = Sha256::digest(&seg).into(); + let seg_offset_in_file = offset_in_file; + offset_in_file += seg.len() as u64; let fseg = match { let mut segments = segments.lock_blocking(); if let Some(old_seg) = segments.get(&hash) { @@ -202,43 +220,131 @@ impl Archive for Xp3ArchiveWriter { } } { Ok(mut info) => { - let data = if is_compressed { - let mut e = flate2::write::ZlibEncoder::new( - Vec::new(), - flate2::Compression::new(zlib_compression_level), - ); - e.write_all(&seg)?; - e.finish()? + if let Some(workers) = workers.as_ref() { + { + let mut processing = + processiong_segments.lock_blocking(); + processing.insert(hash); + } + let file = file.clone(); + let segments = segments.clone(); + let stats = stats.clone(); + let item = item.clone(); + let processiong_segments = processiong_segments.clone(); + workers.execute( + move |_| { + let data = { + let mut e = flate2::write::ZlibEncoder::new( + Vec::new(), + flate2::Compression::new( + zlib_compression_level, + ), + ); + e.write_all(&seg)?; + e.finish()? + }; + let mut file = file.lock_blocking(); + let start = file.seek(std::io::SeekFrom::End(0))?; + file.write_all(&data)?; + info.start = start; + info.archived_size = data.len() as u64; + let stats = stats.clone(); + stats.total_original_size.fetch_add( + info.original_size, + Ordering::Relaxed, + ); + stats.final_archive_size.fetch_add( + info.archived_size, + Ordering::Relaxed, + ); + stats + .total_segments + .fetch_add(1, Ordering::Relaxed); + stats + .unique_segments + .fetch_add(1, Ordering::Relaxed); + let mut segments = segments.lock_blocking(); + segments.insert(hash, info.clone()); + let ninfo = Segment { + is_compressed: info.is_compressed, + start: info.start, + offset_in_file: seg_offset_in_file, + original_size: info.original_size, + archived_size: info.archived_size, + }; + let mut item = item.lock_blocking(); + item.original_size += ninfo.original_size; + item.archived_size += ninfo.archived_size; + item.segments.push(ninfo); + let mut processing = + processiong_segments.lock_blocking(); + processing.remove(&hash); + Ok(()) + }, + true, + )?; + None } else { - seg - }; - let mut file = file.lock_blocking(); - let start = file.seek(std::io::SeekFrom::End(0))?; - file.write_all(&data)?; - info.start = start; - info.archived_size = data.len() as u64; - let stats = stats.clone(); - stats - .total_original_size - .fetch_add(info.original_size, Ordering::Relaxed); - stats - .final_archive_size - .fetch_add(info.archived_size, Ordering::Relaxed); - stats.total_segments.fetch_add(1, Ordering::Relaxed); - stats.unique_segments.fetch_add(1, Ordering::Relaxed); - let mut segments = segments.lock_blocking(); - segments.insert(hash, info.clone()); - let ninfo = Segment { - is_compressed: info.is_compressed, - start: info.start, - offset_in_file: offset_in_file, - original_size: info.original_size, - archived_size: info.archived_size, - }; - offset_in_file += info.original_size; - ninfo + { + let mut processing = + processiong_segments.lock_blocking(); + processing.insert(hash); + } + let data = seg; + let mut file = file.lock_blocking(); + let start = file.seek(std::io::SeekFrom::End(0))?; + file.write_all(&data)?; + info.start = start; + info.archived_size = data.len() as u64; + let stats = stats.clone(); + stats + .total_original_size + .fetch_add(info.original_size, Ordering::Relaxed); + stats + .final_archive_size + .fetch_add(info.archived_size, Ordering::Relaxed); + stats.total_segments.fetch_add(1, Ordering::Relaxed); + stats.unique_segments.fetch_add(1, Ordering::Relaxed); + let mut segments = segments.lock_blocking(); + segments.insert(hash, info.clone()); + let ninfo = Segment { + is_compressed: info.is_compressed, + start: info.start, + offset_in_file: seg_offset_in_file, + original_size: info.original_size, + archived_size: info.archived_size, + }; + { + let mut processing = + processiong_segments.lock_blocking(); + processing.remove(&hash); + } + Some(ninfo) + } } - Err(seg_info) => { + Err(mut seg_info) => { + let mut need_update = false; + loop { + if { + let processing = processiong_segments.lock_blocking(); + !processing.contains(&hash) + } { + break; + } + need_update = true; + std::thread::sleep(std::time::Duration::from_millis(10)); + } + if need_update { + seg_info = { + let segments = segments.lock_blocking(); + segments + .get(&hash) + .ok_or(anyhow::anyhow!( + "Failed to get latest segment info." + ))? + .clone() + }; + } let stats = stats.clone(); stats .total_original_size @@ -250,17 +356,19 @@ impl Archive for Xp3ArchiveWriter { let ninfo = Segment { is_compressed: seg_info.is_compressed, start: seg_info.start, - offset_in_file: offset_in_file, + offset_in_file: seg_offset_in_file, original_size: seg_info.original_size, archived_size: seg_info.archived_size, }; - offset_in_file += seg_info.original_size; - ninfo + Some(ninfo) } }; - item.original_size += fseg.original_size; - item.archived_size += fseg.archived_size; - item.segments.push(fseg); + if let Some(fseg) = fseg { + let mut item = item.lock_blocking(); + item.original_size += fseg.original_size; + item.archived_size += fseg.archived_size; + item.segments.push(fseg); + } } } else { let mut file = file.lock_blocking(); @@ -288,6 +396,7 @@ impl Archive for Xp3ArchiveWriter { size }, }; + let mut item = item.lock_blocking(); item.original_size += ninfo.original_size; item.archived_size += ninfo.archived_size; let stats = stats.clone(); @@ -301,7 +410,15 @@ impl Archive for Xp3ArchiveWriter { stats.unique_segments.fetch_add(1, Ordering::Relaxed); item.segments.push(ninfo); } + if let Some(workers) = workers { + workers.join(); + for err in workers.take_results() { + err?; + } + } + let mut item = item.lock_blocking().to_owned(); item.file_hash = reader.into_checksum(); + item.segments.sort_by_key(|s| s.offset_in_file); let mut items = items.lock_blocking(); items.insert(item.name.clone(), item); Ok(()) diff --git a/src/types.rs b/src/types.rs index abd4049..7389ba2 100644 --- a/src/types.rs +++ b/src/types.rs @@ -491,6 +491,10 @@ pub struct ExtraConfig { #[default(true)] /// Compress index in Kirikiri XP3 archive when creating. Default is true. pub xp3_compress_index: bool, + #[cfg(feature = "kirikiri-arc")] + #[default(num_cpus::get())] + /// Workers count for compress files in Kirikiri XP3 archive when creating in parallel. Default is CPU cores count. + pub xp3_compress_workers: usize, } #[derive(Clone, Copy, Debug, ValueEnum, PartialEq, Eq, PartialOrd, Ord)]