Add thread pool support for BGI CBG Decoder

This commit is contained in:
2025-09-13 18:44:45 +08:00
parent 8df08f487b
commit e5e437b1e3
10 changed files with 280 additions and 13 deletions

View File

@@ -172,6 +172,11 @@ pub struct Arg {
/// Whether to create scrambled SysGrp images. When in import mode, the default value depends on the original image.
/// When in creation mode, it is not enabled by default.
pub bgi_img_scramble: Option<bool>,
#[cfg(feature = "bgi-img")]
#[arg(long, global = true, default_value_t = crate::types::get_default_threads())]
/// Workers count for decode BGI compressed images v2 in parallel. Default is half of CPU cores.
/// Set this to 1 to disable parallel decoding. 0 means same as 1.
pub bgi_img_workers: usize,
#[cfg(feature = "cat-system-arc")]
#[arg(long, global = true, group = "cat_system_int_encrypt_passwordg")]
/// CatSystem2 engine int archive password

View File

@@ -3,6 +3,7 @@ pub mod atomic;
#[cfg(feature = "fancy-regex")]
pub mod fancy_regex;
pub mod io;
pub mod mutex;
pub mod path;
#[cfg(feature = "emote-psb")]
pub mod psb;

19
src/ext/mutex.rs Normal file
View File

@@ -0,0 +1,19 @@
//! Extension for [std::sync::Mutex].
pub trait MutexExt<T> {
/// Lock the mutex, blocking the current thread until it can be acquired.
fn lock_blocking(&self) -> std::sync::MutexGuard<'_, T>;
}
impl<T> MutexExt<T> for std::sync::Mutex<T> {
fn lock_blocking(&self) -> std::sync::MutexGuard<'_, T> {
loop {
match self.try_lock() {
Ok(guard) => return guard,
Err(std::sync::TryLockError::WouldBlock) => {
std::thread::yield_now();
}
Err(std::sync::TryLockError::Poisoned(err)) => return err.into_inner(),
}
}
}
}

View File

@@ -1925,6 +1925,8 @@ fn main() {
kirikiri_title: arg.kirikiri_title,
#[cfg(feature = "favorite")]
favorite_hcb_filter_ascii: !arg.favorite_hcb_no_filter_ascii,
#[cfg(feature = "bgi-img")]
bgi_img_workers: arg.bgi_img_workers,
};
match &arg.command {
args::Command::Export { input, output } => {

View File

@@ -6,6 +6,7 @@ use crate::types::*;
use crate::utils::bit_stream::*;
use crate::utils::img::*;
use crate::utils::struct_pack::*;
use crate::utils::threadpool::*;
use anyhow::Result;
use msg_tool_macro::*;
use std::io::{Read, Seek, Write};
@@ -133,6 +134,7 @@ pub struct BgiCBG {
header: BgiCBGHeader,
data: MemReader,
color_type: CbgColorType,
decode_workers: usize,
}
impl BgiCBG {
@@ -140,7 +142,7 @@ impl BgiCBG {
///
/// * `data` - The buffer containing the script data.
/// * `config` - Extra configuration options.
pub fn new(data: Vec<u8>, _config: &ExtraConfig) -> Result<Self> {
pub fn new(data: Vec<u8>, config: &ExtraConfig) -> Result<Self> {
let mut reader = MemReader::new(data);
let mut magic = [0u8; 16];
reader.read_exact(&mut magic)?;
@@ -167,6 +169,7 @@ impl BgiCBG {
header,
data: reader,
color_type,
decode_workers: config.bgi_img_workers.max(1),
})
}
}
@@ -185,7 +188,12 @@ impl Script for BgiCBG {
}
fn export_image(&self) -> Result<ImageData> {
let decoder = CbgDecoder::new(self.data.to_ref(), &self.header, self.color_type)?;
let decoder = CbgDecoder::new(
self.data.to_ref(),
&self.header,
self.color_type,
self.decode_workers,
)?;
Ok(decoder.unpack()?)
}
@@ -209,6 +217,7 @@ struct CbgDecoder<'a> {
magic: u32,
pixel_size: u8,
stride: usize,
workers: usize,
}
impl<'a> CbgDecoder<'a> {
@@ -216,6 +225,7 @@ impl<'a> CbgDecoder<'a> {
reader: MemReaderRef<'a>,
info: &'a BgiCBGHeader,
color_type: CbgColorType,
workers: usize,
) -> Result<Self> {
let magic = 0;
let key = info.key;
@@ -230,6 +240,7 @@ impl<'a> CbgDecoder<'a> {
color_type,
pixel_size,
stride,
workers,
})
}
@@ -334,7 +345,7 @@ impl<'a> CbgDecoder<'a> {
has_alpha: AtomicBool::new(false),
});
let mut tasks = Vec::new();
let thread_pool = ThreadPool::new(self.workers, Some("cbg-decoder-worker-"));
let mut dst = 0i32;
for i in 0..y_blocks {
@@ -347,23 +358,27 @@ impl<'a> CbgDecoder<'a> {
let closure_dst = dst;
let decoder_ref = Arc::clone(&decoder);
let task = std::thread::spawn(move || {
decoder_ref.unpack_block(block_offset, next_offset - block_offset, closure_dst)
});
tasks.push(task);
thread_pool.execute(
move || {
decoder_ref.unpack_block(block_offset, next_offset - block_offset, closure_dst)
},
true,
)?;
dst += width * 32;
}
if self.info.bpp == 32 {
let decoder_ref = Arc::clone(&decoder);
let task =
std::thread::spawn(move || decoder_ref.unpack_alpha(offsets[y_blocks as usize]));
tasks.push(task);
thread_pool.execute(
move || decoder_ref.unpack_alpha(offsets[y_blocks as usize]),
true,
)?;
}
let tasks = thread_pool.into_results();
for task in tasks {
task.join()
.map_err(|e| anyhow::anyhow!("Thread join failed: {:?}", e))??;
task?;
}
let has_alpha = decoder.has_alpha.qload();

View File

@@ -426,6 +426,11 @@ pub struct ExtraConfig {
#[default(true)]
/// Whether to filter ascii strings in Favorite HCB script.
pub favorite_hcb_filter_ascii: bool,
#[cfg(feature = "bgi-img")]
#[default(get_default_threads())]
/// Workers count for decode BGI compressed images v2 in parallel. Default is half of CPU cores.
/// Set this to 1 to disable parallel decoding. 0 means same as 1.
pub bgi_img_workers: usize,
}
#[derive(Clone, Copy, Debug, ValueEnum, PartialEq, Eq, PartialOrd, Ord)]
@@ -915,3 +920,9 @@ impl AsRef<str> for LosslessAudioFormat {
}
}
}
#[cfg(feature = "utils-threadpool")]
#[allow(unused)]
pub(crate) fn get_default_threads() -> usize {
num_cpus::get().max(2) / 2
}

View File

@@ -25,6 +25,8 @@ pub mod pcm;
#[cfg(feature = "utils-str")]
pub mod str;
pub mod struct_pack;
#[cfg(feature = "utils-threadpool")]
pub mod threadpool;
#[cfg(windows)]
pub use encoding_win::WinError;

209
src/utils/threadpool.rs Normal file
View File

@@ -0,0 +1,209 @@
//! Thread pool utilities
use crate::ext::mutex::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{
Arc, Condvar, Mutex,
mpsc::{Receiver, SyncSender, TrySendError, sync_channel},
};
use std::thread::{self, JoinHandle};
type Job<T> = Box<dyn FnOnce() -> T + Send + 'static>;
/// A simple generic thread pool.
///
/// - T: the return type of tasks. Completed task results are stored in `results: Arc<Mutex<Vec<T>>>`.
/// - execute accepts a task and a `block_if_full` flag:
/// * if true, submission will block when the pool is saturated until a worker becomes available;
/// * if false, submission will return an error when the pool is saturated.
/// - join waits until all submitted tasks have completed (it does not shut down the pool).
pub struct ThreadPool<T: Send + 'static> {
sender: Option<SyncSender<Job<T>>>,
#[allow(unused)]
receiver: Arc<Mutex<Receiver<Job<T>>>>,
workers: Vec<JoinHandle<()>>,
/// Completed task results
pub results: Arc<Mutex<Vec<T>>>,
/// Number of pending tasks (queued + running)
pending: Arc<AtomicUsize>,
/// Pair for wait/notify in join
pending_pair: Arc<(Mutex<()>, Condvar)>,
size: usize,
}
#[derive(Debug)]
/// Error type for [ThreadPool::execute]
pub enum ExecuteError {
/// Pool is full
Full,
/// Pool is closed
Closed,
}
impl std::error::Error for ExecuteError {}
impl std::fmt::Display for ExecuteError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExecuteError::Full => write!(f, "ThreadPool is full"),
ExecuteError::Closed => write!(f, "ThreadPool is closed"),
}
}
}
impl<T: Send + 'static> ThreadPool<T> {
pub fn size(&self) -> usize {
self.size
}
/// Create a new thread pool with `size` workers.
/// The internal submission channel is bounded to `size`, so when all workers are busy and
/// the channel is full, further submissions will block or return error depending on the flag.
///
/// * `name` - Optional base name for worker threads. If None, "threadpool-worker-" is used.
pub fn new<'a>(size: usize, name: Option<&'a str>) -> Self {
assert!(size > 0, "size must be > 0");
let (tx, rx) = sync_channel::<Job<T>>(size);
let receiver = Arc::new(Mutex::new(rx));
let results = Arc::new(Mutex::new(Vec::new()));
let pending = Arc::new(AtomicUsize::new(0));
let pending_pair = Arc::new((Mutex::new(()), Condvar::new()));
let thread_name = name.unwrap_or("threadpool-worker-");
let mut workers = Vec::with_capacity(size);
for id in 0..size {
let rx_clone = Arc::clone(&receiver);
let results_clone = Arc::clone(&results);
let pending_clone = Arc::clone(&pending);
let pending_pair_clone = Arc::clone(&pending_pair);
let handle = thread::Builder::new()
.name(format!("{}{}", thread_name, id))
.spawn(move || {
loop {
// Lock receiver to call recv. Using a Mutex around Receiver serializes
// the recv calls but is fine for this simple implementation.
let job = {
let guard = rx_clone.lock_blocking();
// If recv returns Err, sender was dropped -> exit thread
guard.recv()
};
match job {
Ok(job) => {
// Execute the job and store result
let res = job();
{
let mut r = results_clone.lock_blocking();
r.push(res);
}
// Decrement pending count and notify join waiters
pending_clone.fetch_sub(1, Ordering::SeqCst);
let (lock, cvar) = &*pending_pair_clone;
let _g = lock.lock_blocking();
cvar.notify_all();
}
Err(_) => {
// Channel closed -> shutdown worker
break;
}
}
}
})
.expect("failed to spawn worker thread");
workers.push(handle);
}
ThreadPool {
sender: Some(tx),
receiver,
workers,
results,
pending,
pending_pair,
size,
}
}
/// Execute a task. If `block_if_full` is true, this call will block when the internal
/// submission channel is full (i.e. all workers busy and buffer full) until space becomes available.
/// If `block_if_full` is false, this returns Err(ExecuteError::Full) when the channel is full.
pub fn execute<F>(&self, job: F, block_if_full: bool) -> Result<(), ExecuteError>
where
F: FnOnce() -> T + Send + 'static,
{
let sender = match &self.sender {
Some(s) => s,
None => return Err(ExecuteError::Closed),
};
// Increase pending count for this submission. If submission fails we will decrement.
self.pending.fetch_add(1, Ordering::SeqCst);
let boxed: Job<T> = Box::new(job);
if block_if_full {
// This will block until there is space in the bounded channel or the channel is closed.
if sender.send(boxed).is_err() {
// Channel closed
self.pending.fetch_sub(1, Ordering::SeqCst);
return Err(ExecuteError::Closed);
}
Ok(())
} else {
match sender.try_send(boxed) {
Ok(()) => Ok(()),
Err(TrySendError::Full(_)) => {
// revert pending increment
self.pending.fetch_sub(1, Ordering::SeqCst);
Err(ExecuteError::Full)
}
Err(TrySendError::Disconnected(_)) => {
self.pending.fetch_sub(1, Ordering::SeqCst);
Err(ExecuteError::Closed)
}
}
}
}
/// Wait until all submitted tasks have completed. This does not shut down the pool; new tasks
/// can still be submitted after join returns.
pub fn join(&self) {
// Fast path
if self.pending.load(Ordering::SeqCst) == 0 {
return;
}
let (lock, cvar) = &*self.pending_pair;
let mut guard = lock.lock_blocking();
while self.pending.load(Ordering::SeqCst) != 0 {
guard = match cvar.wait(guard) {
Ok(g) => g,
Err(poisoned) => poisoned.into_inner(),
};
}
}
/// Wait until all submitted tasks have completed, then return the results.
pub fn into_results(self) -> Vec<T> {
self.join();
let mut results = self.results.lock_blocking();
results.split_off(0)
}
}
impl<T: Send + 'static> Drop for ThreadPool<T> {
fn drop(&mut self) {
// Close sender so worker threads exit recv loop
self.sender.take();
// Dropping the sender (SyncSender) happens above; but to ensure we close the channel we
// explicitly drop any remaining clones by letting sender go out of scope.
// Join worker threads
while let Some(handle) = self.workers.pop() {
let _ = handle.join();
}
}
}