Add custom segmenter support

This commit is contained in:
2026-05-20 14:59:52 +08:00
parent 8487fe9ee6
commit 7ac2c19dd8
6 changed files with 135 additions and 9 deletions

View File

@@ -554,6 +554,7 @@ pub struct Arg {
/// none segmenter - none
/// fastcdc segmenter - cdc:<min>:<avg>:<max>
/// fixed segmenter - fixed:<size>
/// custom segmenter - custom:<json_path>
pub xp3_segmenter: crate::scripts::kirikiri::archive::xp3::SegmenterConfig,
#[cfg(feature = "kirikiri-arc")]
#[arg(long, global = true)]

View File

@@ -3320,7 +3320,7 @@ fn main() {
#[cfg(feature = "kirikiri-arc")]
xp3_mdf_decompress: !arg.xp3_no_mdf_decompress,
#[cfg(feature = "kirikiri-arc")]
xp3_segmenter: arg.xp3_segmenter,
xp3_segmenter: arg.xp3_segmenter.clone(),
#[cfg(feature = "kirikiri-arc")]
xp3_compress_files: !arg.xp3_no_compress_files,
#[cfg(feature = "kirikiri-arc")]

View File

@@ -71,6 +71,16 @@ pub fn parse_segmenter_config(str: &str) -> Result<SegmenterConfig> {
}
Ok(SegmenterConfig::Fixed(size as usize))
}
"custom" => {
if parts.len() != 2 {
return Err(anyhow::anyhow!(
"Invalid Fixed segmenter config. Expected format: custom:json_path"
));
}
let json_path = parts[1];
let data = std::fs::read_to_string(json_path)?;
Ok(SegmenterConfig::Custom(serde_json::from_str(&data)?))
}
_ => Err(anyhow::anyhow!("Unknown segmenter type: {}", parts[0])),
}
}

View File

@@ -4,6 +4,7 @@ use std::io::{PipeReader, Read};
pub struct Reader {
inner: PipeReader,
adler: Adler32,
readed: u64,
}
impl Reader {
@@ -11,18 +12,24 @@ impl Reader {
Self {
inner,
adler: Adler32::new(),
readed: 0,
}
}
pub fn into_checksum(self) -> u32 {
self.adler.checksum()
}
pub fn total_readed(&self) -> u64 {
self.readed
}
}
impl Read for Reader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let n = self.inner.read(buf)?;
self.adler.write_slice(&buf[..n]);
self.readed += n as u64;
Ok(n)
}
}

View File

@@ -1,9 +1,20 @@
use super::reader::Reader;
use anyhow::Result;
use fastcdc::v2020::StreamCDC;
use serde::Deserialize;
use std::collections::HashMap;
use std::io::Read;
use std::sync::Arc;
#[derive(Copy, Clone, Debug)]
#[derive(Clone, Debug, Deserialize)]
pub struct Segments {
segments: Arc<HashMap<String, Vec<u64>>>,
#[serde(default)]
default_config: Box<SegmenterConfig>,
}
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "@type")]
/// Configuration options for the segmenter.
pub enum SegmenterConfig {
/// Do not segment the data.
@@ -16,6 +27,7 @@ pub enum SegmenterConfig {
},
/// Use fixed-size segments.
Fixed(usize),
Custom(Segments),
}
impl Default for SegmenterConfig {
@@ -39,6 +51,7 @@ pub trait Segmenter {
fn segment<'a>(
&'a self,
data: &'a mut Reader,
filename: &'a str,
) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a>;
}
@@ -52,6 +65,7 @@ impl Segmenter for FastCdcSegmenter {
fn segment<'a>(
&'a self,
data: &'a mut Reader,
_filename: &'a str,
) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a> {
let cdc = StreamCDC::new(
data,
@@ -71,6 +85,7 @@ impl Segmenter for FixedSizeSegmenter {
fn segment<'a>(
&'a self,
data: &'a mut Reader,
_filename: &'a str,
) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a> {
let size = self.size;
let mut buf = vec![0; size];
@@ -93,7 +108,90 @@ impl Segmenter for FixedSizeSegmenter {
}
}
pub fn create_segmenter(config: SegmenterConfig) -> Option<Box<dyn Segmenter + Send + Sync>> {
pub struct CustomSegmenter {
segments: Arc<HashMap<String, Vec<u64>>>,
inner: Box<dyn Segmenter + Send + Sync>,
}
impl Segmenter for CustomSegmenter {
fn segment<'a>(
&'a self,
data: &'a mut Reader,
filename: &'a str,
) -> Box<dyn Iterator<Item = Result<Vec<u8>>> + 'a> {
if let Some(segment_offsets) = self.segments.get(filename) {
let mut current_seg_idx = 0;
let mut reached_eof = false;
Box::new(std::iter::from_fn(move || {
if reached_eof {
return None;
}
// 获取当前 Reader 的绝对位置
let current_pos = data.total_readed();
if current_seg_idx < segment_offsets.len() {
// 1. 处理预设的分割点
let target_pos = segment_offsets[current_seg_idx];
current_seg_idx += 1;
if target_pos <= current_pos {
// 如果分割点无效(小于当前位置),跳过或视作该段为空
return Some(Ok(Vec::new()));
}
let to_read = (target_pos - current_pos) as usize;
let mut buf = vec![0; to_read];
let mut total_read = 0;
while total_read < to_read {
match data.read(&mut buf[total_read..]) {
Ok(0) => {
reached_eof = true;
break;
}
Ok(n) => total_read += n,
Err(e) => return Some(Err(e.into())),
}
}
if total_read == 0 && reached_eof {
None
} else {
buf.truncate(total_read);
Some(Ok(buf))
}
} else {
// 2. 处理“最后一个分割点之后”的剩余数据 (Tail)
// 标记为已到达末尾,保证下一次调用返回 None
reached_eof = true;
let mut final_buf = Vec::new();
let mut temp_buf = [0u8; 8192]; // 临时缓冲区用于读取剩余所有内容
loop {
match data.read(&mut temp_buf) {
Ok(0) => break,
Ok(n) => final_buf.extend_from_slice(&temp_buf[..n]),
Err(e) => return Some(Err(e.into())),
}
}
if final_buf.is_empty() {
None
} else {
Some(Ok(final_buf))
}
}
}))
} else {
self.inner.segment(data, filename)
}
}
}
pub fn create_segmenter(config: &SegmenterConfig) -> Option<Box<dyn Segmenter + Send + Sync>> {
match config {
SegmenterConfig::None => None,
SegmenterConfig::FastCdc {
@@ -101,10 +199,19 @@ pub fn create_segmenter(config: SegmenterConfig) -> Option<Box<dyn Segmenter + S
avg_size,
max_size,
} => Some(Box::new(FastCdcSegmenter {
min_size,
avg_size,
max_size,
min_size: *min_size,
avg_size: *avg_size,
max_size: *max_size,
})),
SegmenterConfig::Fixed(size) => Some(Box::new(FixedSizeSegmenter { size: *size })),
SegmenterConfig::Custom(manifest) => Some(Box::new(CustomSegmenter {
segments: manifest.segments.clone(),
inner: match create_segmenter(&manifest.default_config) {
Some(cfg) => cfg,
None => {
return None;
}
},
})),
SegmenterConfig::Fixed(size) => Some(Box::new(FixedSizeSegmenter { size })),
}
}

View File

@@ -101,7 +101,7 @@ impl Xp3ArchiveWriter<std::io::BufWriter<std::fs::File>> {
};
items.insert(file.to_string(), item);
}
let segmenter = create_segmenter(config.xp3_segmenter).map(|s| Arc::new(s));
let segmenter = create_segmenter(&config.xp3_segmenter).map(|s| Arc::new(s));
file.write_all(XP3_MAGIC)?;
file.write_u64(0)?; // Placeholder for index offset
Ok(Self {
@@ -244,12 +244,13 @@ impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
#[cfg(feature = "zopfli")]
let zopfli_maximum_block_splits = self.zopfli_maximum_block_splits;
let zstd_compression_level = self.zstd_compression_level;
let name = name.to_owned();
self.runner.execute(
move |_| {
let mut reader = reader;
let mut offset_in_file = 0u64;
if let Some(segmenter) = segmenter {
for seg in segmenter.segment(&mut reader) {
for seg in segmenter.segment(&mut reader, &name) {
let seg = seg?;
let hash: [u8; 32] = Sha256::digest(&seg).into();
let seg_offset_in_file = offset_in_file;