Use thread pool to compress block in xp3 file

This commit is contained in:
2025-10-11 09:29:16 +08:00
parent 309bccd485
commit ad82ba13fc
4 changed files with 174 additions and 47 deletions

View File

@@ -10,7 +10,7 @@ use crate::utils::encoding::*;
use crate::utils::threadpool::ThreadPool;
use anyhow::Result;
use sha2::{Digest, Sha256};
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::{Seek, Write};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
@@ -71,6 +71,8 @@ pub struct Xp3ArchiveWriter<T: Write + Seek> {
zlib_compression_level: u32,
segmenter: Option<Arc<Box<dyn Segmenter + Send + Sync>>>,
stats: Arc<Stats>,
compress_workers: Option<Arc<ThreadPool<Result<()>>>>,
processing_segments: Arc<Mutex<HashSet<[u8; 32]>>>,
}
impl Xp3ArchiveWriter<std::io::BufWriter<std::fs::File>> {
@@ -101,6 +103,16 @@ impl Xp3ArchiveWriter<std::io::BufWriter<std::fs::File>> {
zlib_compression_level: config.zlib_compression_level,
segmenter,
stats: Arc::new(Stats::default()),
compress_workers: if config.xp3_compress_files {
Some(Arc::new(ThreadPool::new(
config.xp3_compress_workers.max(1),
Some("xp3-compress"),
false,
)?))
} else {
None
},
processing_segments: Arc::new(Mutex::new(HashSet::new())),
})
}
}
@@ -161,12 +173,14 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
for err in self.runner.take_results() {
err?;
}
let mut item = {
let item = {
let items = self.items.lock_blocking();
items
.get(name)
.ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))?
.clone()
Arc::new(Mutex::new(
items
.get(name)
.ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))?
.clone(),
))
};
let (reader, writer) = std::io::pipe()?;
let reader = Reader::new(reader);
@@ -178,6 +192,8 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
let stats = self.stats.clone();
let is_compressed = self.compress_files;
let zlib_compression_level = self.zlib_compression_level;
let workers = self.compress_workers.clone();
let processiong_segments = self.processing_segments.clone();
self.runner.execute(
move |_| {
let mut reader = reader;
@@ -186,6 +202,8 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
for seg in segmenter.segment(&mut reader) {
let seg = seg?;
let hash: [u8; 32] = Sha256::digest(&seg).into();
let seg_offset_in_file = offset_in_file;
offset_in_file += seg.len() as u64;
let fseg = match {
let mut segments = segments.lock_blocking();
if let Some(old_seg) = segments.get(&hash) {
@@ -202,43 +220,131 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
}
} {
Ok(mut info) => {
let data = if is_compressed {
let mut e = flate2::write::ZlibEncoder::new(
Vec::new(),
flate2::Compression::new(zlib_compression_level),
);
e.write_all(&seg)?;
e.finish()?
if let Some(workers) = workers.as_ref() {
{
let mut processing =
processiong_segments.lock_blocking();
processing.insert(hash);
}
let file = file.clone();
let segments = segments.clone();
let stats = stats.clone();
let item = item.clone();
let processiong_segments = processiong_segments.clone();
workers.execute(
move |_| {
let data = {
let mut e = flate2::write::ZlibEncoder::new(
Vec::new(),
flate2::Compression::new(
zlib_compression_level,
),
);
e.write_all(&seg)?;
e.finish()?
};
let mut file = file.lock_blocking();
let start = file.seek(std::io::SeekFrom::End(0))?;
file.write_all(&data)?;
info.start = start;
info.archived_size = data.len() as u64;
let stats = stats.clone();
stats.total_original_size.fetch_add(
info.original_size,
Ordering::Relaxed,
);
stats.final_archive_size.fetch_add(
info.archived_size,
Ordering::Relaxed,
);
stats
.total_segments
.fetch_add(1, Ordering::Relaxed);
stats
.unique_segments
.fetch_add(1, Ordering::Relaxed);
let mut segments = segments.lock_blocking();
segments.insert(hash, info.clone());
let ninfo = Segment {
is_compressed: info.is_compressed,
start: info.start,
offset_in_file: seg_offset_in_file,
original_size: info.original_size,
archived_size: info.archived_size,
};
let mut item = item.lock_blocking();
item.original_size += ninfo.original_size;
item.archived_size += ninfo.archived_size;
item.segments.push(ninfo);
let mut processing =
processiong_segments.lock_blocking();
processing.remove(&hash);
Ok(())
},
true,
)?;
None
} else {
seg
};
let mut file = file.lock_blocking();
let start = file.seek(std::io::SeekFrom::End(0))?;
file.write_all(&data)?;
info.start = start;
info.archived_size = data.len() as u64;
let stats = stats.clone();
stats
.total_original_size
.fetch_add(info.original_size, Ordering::Relaxed);
stats
.final_archive_size
.fetch_add(info.archived_size, Ordering::Relaxed);
stats.total_segments.fetch_add(1, Ordering::Relaxed);
stats.unique_segments.fetch_add(1, Ordering::Relaxed);
let mut segments = segments.lock_blocking();
segments.insert(hash, info.clone());
let ninfo = Segment {
is_compressed: info.is_compressed,
start: info.start,
offset_in_file: offset_in_file,
original_size: info.original_size,
archived_size: info.archived_size,
};
offset_in_file += info.original_size;
ninfo
{
let mut processing =
processiong_segments.lock_blocking();
processing.insert(hash);
}
let data = seg;
let mut file = file.lock_blocking();
let start = file.seek(std::io::SeekFrom::End(0))?;
file.write_all(&data)?;
info.start = start;
info.archived_size = data.len() as u64;
let stats = stats.clone();
stats
.total_original_size
.fetch_add(info.original_size, Ordering::Relaxed);
stats
.final_archive_size
.fetch_add(info.archived_size, Ordering::Relaxed);
stats.total_segments.fetch_add(1, Ordering::Relaxed);
stats.unique_segments.fetch_add(1, Ordering::Relaxed);
let mut segments = segments.lock_blocking();
segments.insert(hash, info.clone());
let ninfo = Segment {
is_compressed: info.is_compressed,
start: info.start,
offset_in_file: seg_offset_in_file,
original_size: info.original_size,
archived_size: info.archived_size,
};
{
let mut processing =
processiong_segments.lock_blocking();
processing.remove(&hash);
}
Some(ninfo)
}
}
Err(seg_info) => {
Err(mut seg_info) => {
let mut need_update = false;
loop {
if {
let processing = processiong_segments.lock_blocking();
!processing.contains(&hash)
} {
break;
}
need_update = true;
std::thread::sleep(std::time::Duration::from_millis(10));
}
if need_update {
seg_info = {
let segments = segments.lock_blocking();
segments
.get(&hash)
.ok_or(anyhow::anyhow!(
"Failed to get latest segment info."
))?
.clone()
};
}
let stats = stats.clone();
stats
.total_original_size
@@ -250,17 +356,19 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
let ninfo = Segment {
is_compressed: seg_info.is_compressed,
start: seg_info.start,
offset_in_file: offset_in_file,
offset_in_file: seg_offset_in_file,
original_size: seg_info.original_size,
archived_size: seg_info.archived_size,
};
offset_in_file += seg_info.original_size;
ninfo
Some(ninfo)
}
};
item.original_size += fseg.original_size;
item.archived_size += fseg.archived_size;
item.segments.push(fseg);
if let Some(fseg) = fseg {
let mut item = item.lock_blocking();
item.original_size += fseg.original_size;
item.archived_size += fseg.archived_size;
item.segments.push(fseg);
}
}
} else {
let mut file = file.lock_blocking();
@@ -288,6 +396,7 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
size
},
};
let mut item = item.lock_blocking();
item.original_size += ninfo.original_size;
item.archived_size += ninfo.archived_size;
let stats = stats.clone();
@@ -301,7 +410,15 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
stats.unique_segments.fetch_add(1, Ordering::Relaxed);
item.segments.push(ninfo);
}
if let Some(workers) = workers {
workers.join();
for err in workers.take_results() {
err?;
}
}
let mut item = item.lock_blocking().to_owned();
item.file_hash = reader.into_checksum();
item.segments.sort_by_key(|s| s.offset_in_file);
let mut items = items.lock_blocking();
items.insert(item.name.clone(), item);
Ok(())