mirror of
https://github.com/lifegpc/pixiv_downloader.git
synced 2026-06-26 21:57:06 +08:00
Fix task manager not works well. ( Fix #268
This commit is contained in:
119
src/download.rs
119
src/download.rs
@@ -13,7 +13,6 @@ use crate::downloader::DownloaderHelper;
|
||||
use crate::downloader::DownloaderResult;
|
||||
use crate::downloader::LocalFile;
|
||||
use crate::error::PixivDownloaderError;
|
||||
use crate::ext::any::AsAny;
|
||||
use crate::ext::try_err::TryErr;
|
||||
use crate::fanbox::article::block::FanboxArticleBlock;
|
||||
use crate::fanbox::article::url_embed::FanboxArticleUrlEmbed;
|
||||
@@ -40,7 +39,6 @@ use std::fs::create_dir_all;
|
||||
use std::ops::Deref;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
impl Main {
|
||||
pub async fn download(&mut self) -> i32 {
|
||||
@@ -122,20 +120,16 @@ impl Main {
|
||||
let mut re = 0;
|
||||
tasks.join().await;
|
||||
let tasks = tasks.take_finished_tasks();
|
||||
for mut task in tasks {
|
||||
let task = task.as_any_mut();
|
||||
if let Some(task) = task.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>()
|
||||
{
|
||||
let result = match task.await {
|
||||
Ok(result) => result,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("{} {}", gettext("Failed to download post:"), e);
|
||||
re = 1;
|
||||
}
|
||||
for task in tasks {
|
||||
let result = match task.await {
|
||||
Ok(result) => result,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
match result {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("{} {}", gettext("Failed to download post:"), e);
|
||||
re = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -290,10 +284,6 @@ pub async fn download_artwork(
|
||||
tasks.join().await;
|
||||
let mut tasks = tasks.take_finished_tasks();
|
||||
let task = tasks.get_mut(0).try_err(gettext("No finished task."))?;
|
||||
let task = task.as_any_mut();
|
||||
let task = task
|
||||
.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>()
|
||||
.try_err("Failed to downcast task.")?;
|
||||
task.await??;
|
||||
#[cfg(feature = "ugoira")]
|
||||
{
|
||||
@@ -381,16 +371,13 @@ pub async fn download_artwork(
|
||||
}
|
||||
tasks.join().await;
|
||||
let tasks = tasks.take_finished_tasks();
|
||||
for mut task in tasks {
|
||||
let t = task.as_any_mut();
|
||||
if let Some(task) = t.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>() {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
for task in tasks {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
return re;
|
||||
} else if pages_data.is_some() {
|
||||
@@ -415,16 +402,13 @@ pub async fn download_artwork(
|
||||
}
|
||||
let mut re = Ok(());
|
||||
let tasks = tasks.take_finished_tasks();
|
||||
for mut task in tasks {
|
||||
let t = task.as_any_mut();
|
||||
if let Some(task) = t.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>() {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
for task in tasks {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
return re;
|
||||
} else {
|
||||
@@ -447,10 +431,6 @@ pub async fn download_artwork(
|
||||
tasks.join().await;
|
||||
let mut tasks = tasks.take_finished_tasks();
|
||||
let task = tasks.get_mut(0).try_err(gettext("No tasks finished."))?;
|
||||
let task = task.as_any_mut();
|
||||
let task = task
|
||||
.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>()
|
||||
.try_err("Failed to downcast the result.")?;
|
||||
task.await??;
|
||||
}
|
||||
Ok(())
|
||||
@@ -744,29 +724,23 @@ pub async fn download_fanbox_post(
|
||||
}
|
||||
tasks.join().await;
|
||||
let tasks = tasks.take_finished_tasks();
|
||||
for mut task in tasks {
|
||||
let task = task.as_any_mut();
|
||||
if let Some(task) = task.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>() {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
for task in tasks {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
ptasks.join().await;
|
||||
let ptasks = ptasks.take_finished_tasks();
|
||||
for mut task in ptasks {
|
||||
let task = task.as_any_mut();
|
||||
if let Some(task) = task.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>() {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
for task in ptasks {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
re
|
||||
}
|
||||
@@ -890,16 +864,13 @@ pub async fn download_fanbox_creator_info(
|
||||
tasks.join().await;
|
||||
let mut re = Ok(());
|
||||
let tasks = tasks.take_finished_tasks();
|
||||
for mut task in tasks {
|
||||
let task = task.as_any_mut();
|
||||
if let Some(task) = task.downcast_mut::<JoinHandle<Result<(), PixivDownloaderError>>>() {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
for task in tasks {
|
||||
let r = task.await;
|
||||
let r = match r {
|
||||
Ok(r) => r,
|
||||
Err(e) => Err(PixivDownloaderError::from(e)),
|
||||
};
|
||||
concat_pixiv_downloader_error!(re, r);
|
||||
}
|
||||
re
|
||||
}
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
use std::any::Any;
|
||||
|
||||
pub trait AsAny<T: Any + ?Sized> {
|
||||
fn as_any(&self) -> &T;
|
||||
fn as_any_mut(&mut self) -> &mut T;
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod any;
|
||||
pub mod atomic;
|
||||
pub mod cstr;
|
||||
#[cfg(feature = "flagset")]
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
use crate::ext::any::AsAny;
|
||||
use crate::ext::replace::ReplaceWith;
|
||||
use crate::ext::replace::ReplaceWith2;
|
||||
use crate::ext::rw_lock::GetRwLock;
|
||||
use crate::opthelper::get_helper;
|
||||
use futures_util::lock::Mutex;
|
||||
use indicatif::MultiProgress;
|
||||
use std::any::Any;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
@@ -21,30 +19,6 @@ lazy_static! {
|
||||
static ref PROGRESS_BAR: Arc<MultiProgress> = Arc::new(MultiProgress::new());
|
||||
}
|
||||
|
||||
pub trait IsFinished {
|
||||
fn is_finished(&self) -> bool;
|
||||
}
|
||||
|
||||
impl<T> IsFinished for JoinHandle<T> {
|
||||
fn is_finished(&self) -> bool {
|
||||
self.is_finished()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait IsFinishedAny: IsFinished + Any {}
|
||||
|
||||
impl<T> IsFinishedAny for T where T: IsFinished + Any {}
|
||||
|
||||
impl AsAny<dyn Any + Send + Sync + 'static> for Box<dyn IsFinishedAny + Send + Sync> {
|
||||
fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync + 'static) {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub trait GetMaxCount {
|
||||
fn get_max_count(&self) -> usize;
|
||||
}
|
||||
@@ -82,17 +56,17 @@ impl GetMaxCount for MaxDownloadPostTasks {
|
||||
}
|
||||
|
||||
/// Task manager
|
||||
pub struct TaskManager {
|
||||
pub struct TaskManager<T> {
|
||||
/// Current running task
|
||||
tasks: RwLock<Vec<Box<dyn IsFinishedAny + Send + Sync>>>,
|
||||
tasks: RwLock<Vec<JoinHandle<T>>>,
|
||||
/// Finished task
|
||||
finished_tasks: RwLock<Vec<Box<dyn IsFinishedAny + Send + Sync>>>,
|
||||
finished_tasks: RwLock<Vec<JoinHandle<T>>>,
|
||||
/// Total task count
|
||||
task_count: Arc<Mutex<usize>>,
|
||||
max_count: Box<dyn GetMaxCount + Send + Sync>,
|
||||
}
|
||||
|
||||
impl TaskManager {
|
||||
impl<O> TaskManager<O> {
|
||||
/// Create a new instance
|
||||
pub fn new<T: GetMaxCount + Send + Sync + 'static>(
|
||||
task_count: Arc<Mutex<usize>>,
|
||||
@@ -114,9 +88,8 @@ impl TaskManager {
|
||||
/// Add a new task.
|
||||
pub async fn add_task<F>(&self, future: F)
|
||||
where
|
||||
F: Future + Send + 'static,
|
||||
F: Future<Output = O> + Send + 'static,
|
||||
F::Output: Send + 'static,
|
||||
JoinHandle<F::Output>: IsFinishedAny + Send + Sync + 'static,
|
||||
{
|
||||
let total_count = self.max_count.get_max_count();
|
||||
loop {
|
||||
@@ -136,9 +109,7 @@ impl TaskManager {
|
||||
self.tasks.replace_with2(new_tasks);
|
||||
count.replace_with(new_count);
|
||||
if *count < total_count {
|
||||
self.tasks
|
||||
.get_mut()
|
||||
.push(Box::new(tokio::task::spawn(future)));
|
||||
self.tasks.get_mut().push(tokio::task::spawn(future));
|
||||
count.replace_with(*count + 1);
|
||||
break;
|
||||
}
|
||||
@@ -150,9 +121,8 @@ impl TaskManager {
|
||||
/// Try add a new task, if queue is full, run future on local thread
|
||||
pub async fn add_task_else_run_local<F>(&self, future: F) -> Option<F::Output>
|
||||
where
|
||||
F: Future + Send + 'static,
|
||||
F: Future<Output = O> + Send + 'static,
|
||||
F::Output: Send + 'static,
|
||||
JoinHandle<F::Output>: IsFinishedAny + Send + Sync + 'static,
|
||||
{
|
||||
let total_count = self.max_count.get_max_count();
|
||||
{
|
||||
@@ -171,9 +141,7 @@ impl TaskManager {
|
||||
self.tasks.replace_with2(new_tasks);
|
||||
count.replace_with(new_count);
|
||||
if *count < total_count {
|
||||
self.tasks
|
||||
.get_mut()
|
||||
.push(Box::new(tokio::task::spawn(future)));
|
||||
self.tasks.get_mut().push(tokio::task::spawn(future));
|
||||
count.replace_with(*count + 1);
|
||||
return None;
|
||||
}
|
||||
@@ -208,7 +176,7 @@ impl TaskManager {
|
||||
}
|
||||
|
||||
/// Take all finished tasks
|
||||
pub fn take_finished_tasks(&self) -> Vec<Box<dyn IsFinishedAny + Send + Sync>> {
|
||||
pub fn take_finished_tasks(&self) -> Vec<JoinHandle<O>> {
|
||||
self.finished_tasks.replace_with2(Vec::new())
|
||||
}
|
||||
}
|
||||
@@ -225,7 +193,7 @@ pub fn get_total_post_task_count() -> Arc<Mutex<usize>> {
|
||||
Arc::clone(&TOTAL_POST_TASK_COUNT)
|
||||
}
|
||||
|
||||
impl Default for TaskManager {
|
||||
impl<O> Default for TaskManager<O> {
|
||||
fn default() -> Self {
|
||||
Self::new(get_total_download_task_count(), MaxDownloadTasks::new())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user