From 539aba98e3114060bb3abc7724aeedb5dab04f72 Mon Sep 17 00:00:00 2001 From: lifegpc Date: Wed, 28 Sep 2022 00:01:48 +0000 Subject: [PATCH] Fix task manager not works well. ( Fix #268 --- src/download.rs | 119 +++++++++++++++++--------------------------- src/ext/any.rs | 6 --- src/ext/mod.rs | 1 - src/task_manager.rs | 52 ++++--------------- 4 files changed, 55 insertions(+), 123 deletions(-) delete mode 100644 src/ext/any.rs diff --git a/src/download.rs b/src/download.rs index 1c76120..4420f5c 100644 --- a/src/download.rs +++ b/src/download.rs @@ -13,7 +13,6 @@ use crate::downloader::DownloaderHelper; use crate::downloader::DownloaderResult; use crate::downloader::LocalFile; use crate::error::PixivDownloaderError; -use crate::ext::any::AsAny; use crate::ext::try_err::TryErr; use crate::fanbox::article::block::FanboxArticleBlock; use crate::fanbox::article::url_embed::FanboxArticleUrlEmbed; @@ -40,7 +39,6 @@ use std::fs::create_dir_all; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; -use tokio::task::JoinHandle; impl Main { pub async fn download(&mut self) -> i32 { @@ -122,20 +120,16 @@ impl Main { let mut re = 0; tasks.join().await; let tasks = tasks.take_finished_tasks(); - for mut task in tasks { - let task = task.as_any_mut(); - if let Some(task) = task.downcast_mut::>>() - { - let result = match task.await { - Ok(result) => result, - Err(e) => Err(PixivDownloaderError::from(e)), - }; - match result { - Ok(_) => {} - Err(e) => { - println!("{} {}", gettext("Failed to download post:"), e); - re = 1; - } + for task in tasks { + let result = match task.await { + Ok(result) => result, + Err(e) => Err(PixivDownloaderError::from(e)), + }; + match result { + Ok(_) => {} + Err(e) => { + println!("{} {}", gettext("Failed to download post:"), e); + re = 1; } } } @@ -290,10 +284,6 @@ pub async fn download_artwork( tasks.join().await; let mut tasks = tasks.take_finished_tasks(); let task = tasks.get_mut(0).try_err(gettext("No finished task."))?; - let task = task.as_any_mut(); - let task = task - .downcast_mut::>>() - .try_err("Failed to downcast task.")?; task.await??; #[cfg(feature = "ugoira")] { @@ -381,16 +371,13 @@ pub async fn download_artwork( } tasks.join().await; let tasks = tasks.take_finished_tasks(); - for mut task in tasks { - let t = task.as_any_mut(); - if let Some(task) = t.downcast_mut::>>() { - let r = task.await; - let r = match r { - Ok(r) => r, - Err(e) => Err(PixivDownloaderError::from(e)), - }; - concat_pixiv_downloader_error!(re, r); - } + for task in tasks { + let r = task.await; + let r = match r { + Ok(r) => r, + Err(e) => Err(PixivDownloaderError::from(e)), + }; + concat_pixiv_downloader_error!(re, r); } return re; } else if pages_data.is_some() { @@ -415,16 +402,13 @@ pub async fn download_artwork( } let mut re = Ok(()); let tasks = tasks.take_finished_tasks(); - for mut task in tasks { - let t = task.as_any_mut(); - if let Some(task) = t.downcast_mut::>>() { - let r = task.await; - let r = match r { - Ok(r) => r, - Err(e) => Err(PixivDownloaderError::from(e)), - }; - concat_pixiv_downloader_error!(re, r); - } + for task in tasks { + let r = task.await; + let r = match r { + Ok(r) => r, + Err(e) => Err(PixivDownloaderError::from(e)), + }; + concat_pixiv_downloader_error!(re, r); } return re; } else { @@ -447,10 +431,6 @@ pub async fn download_artwork( tasks.join().await; let mut tasks = tasks.take_finished_tasks(); let task = tasks.get_mut(0).try_err(gettext("No tasks finished."))?; - let task = task.as_any_mut(); - let task = task - .downcast_mut::>>() - .try_err("Failed to downcast the result.")?; task.await??; } Ok(()) @@ -744,29 +724,23 @@ pub async fn download_fanbox_post( } tasks.join().await; let tasks = tasks.take_finished_tasks(); - for mut task in tasks { - let task = task.as_any_mut(); - if let Some(task) = task.downcast_mut::>>() { - let r = task.await; - let r = match r { - Ok(r) => r, - Err(e) => Err(PixivDownloaderError::from(e)), - }; - concat_pixiv_downloader_error!(re, r); - } + for task in tasks { + let r = task.await; + let r = match r { + Ok(r) => r, + Err(e) => Err(PixivDownloaderError::from(e)), + }; + concat_pixiv_downloader_error!(re, r); } ptasks.join().await; let ptasks = ptasks.take_finished_tasks(); - for mut task in ptasks { - let task = task.as_any_mut(); - if let Some(task) = task.downcast_mut::>>() { - let r = task.await; - let r = match r { - Ok(r) => r, - Err(e) => Err(PixivDownloaderError::from(e)), - }; - concat_pixiv_downloader_error!(re, r); - } + for task in ptasks { + let r = task.await; + let r = match r { + Ok(r) => r, + Err(e) => Err(PixivDownloaderError::from(e)), + }; + concat_pixiv_downloader_error!(re, r); } re } @@ -890,16 +864,13 @@ pub async fn download_fanbox_creator_info( tasks.join().await; let mut re = Ok(()); let tasks = tasks.take_finished_tasks(); - for mut task in tasks { - let task = task.as_any_mut(); - if let Some(task) = task.downcast_mut::>>() { - let r = task.await; - let r = match r { - Ok(r) => r, - Err(e) => Err(PixivDownloaderError::from(e)), - }; - concat_pixiv_downloader_error!(re, r); - } + for task in tasks { + let r = task.await; + let r = match r { + Ok(r) => r, + Err(e) => Err(PixivDownloaderError::from(e)), + }; + concat_pixiv_downloader_error!(re, r); } re } diff --git a/src/ext/any.rs b/src/ext/any.rs deleted file mode 100644 index e9646b1..0000000 --- a/src/ext/any.rs +++ /dev/null @@ -1,6 +0,0 @@ -use std::any::Any; - -pub trait AsAny { - fn as_any(&self) -> &T; - fn as_any_mut(&mut self) -> &mut T; -} diff --git a/src/ext/mod.rs b/src/ext/mod.rs index 6d83387..7588f22 100644 --- a/src/ext/mod.rs +++ b/src/ext/mod.rs @@ -1,4 +1,3 @@ -pub mod any; pub mod atomic; pub mod cstr; #[cfg(feature = "flagset")] diff --git a/src/task_manager.rs b/src/task_manager.rs index d3ef50d..493f9c4 100644 --- a/src/task_manager.rs +++ b/src/task_manager.rs @@ -1,11 +1,9 @@ -use crate::ext::any::AsAny; use crate::ext::replace::ReplaceWith; use crate::ext::replace::ReplaceWith2; use crate::ext::rw_lock::GetRwLock; use crate::opthelper::get_helper; use futures_util::lock::Mutex; use indicatif::MultiProgress; -use std::any::Any; use std::future::Future; use std::sync::Arc; use std::sync::RwLock; @@ -21,30 +19,6 @@ lazy_static! { static ref PROGRESS_BAR: Arc = Arc::new(MultiProgress::new()); } -pub trait IsFinished { - fn is_finished(&self) -> bool; -} - -impl IsFinished for JoinHandle { - fn is_finished(&self) -> bool { - self.is_finished() - } -} - -pub trait IsFinishedAny: IsFinished + Any {} - -impl IsFinishedAny for T where T: IsFinished + Any {} - -impl AsAny for Box { - fn as_any(&self) -> &(dyn Any + Send + Sync + 'static) { - self - } - - fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync + 'static) { - self - } -} - pub trait GetMaxCount { fn get_max_count(&self) -> usize; } @@ -82,17 +56,17 @@ impl GetMaxCount for MaxDownloadPostTasks { } /// Task manager -pub struct TaskManager { +pub struct TaskManager { /// Current running task - tasks: RwLock>>, + tasks: RwLock>>, /// Finished task - finished_tasks: RwLock>>, + finished_tasks: RwLock>>, /// Total task count task_count: Arc>, max_count: Box, } -impl TaskManager { +impl TaskManager { /// Create a new instance pub fn new( task_count: Arc>, @@ -114,9 +88,8 @@ impl TaskManager { /// Add a new task. pub async fn add_task(&self, future: F) where - F: Future + Send + 'static, + F: Future + Send + 'static, F::Output: Send + 'static, - JoinHandle: IsFinishedAny + Send + Sync + 'static, { let total_count = self.max_count.get_max_count(); loop { @@ -136,9 +109,7 @@ impl TaskManager { self.tasks.replace_with2(new_tasks); count.replace_with(new_count); if *count < total_count { - self.tasks - .get_mut() - .push(Box::new(tokio::task::spawn(future))); + self.tasks.get_mut().push(tokio::task::spawn(future)); count.replace_with(*count + 1); break; } @@ -150,9 +121,8 @@ impl TaskManager { /// Try add a new task, if queue is full, run future on local thread pub async fn add_task_else_run_local(&self, future: F) -> Option where - F: Future + Send + 'static, + F: Future + Send + 'static, F::Output: Send + 'static, - JoinHandle: IsFinishedAny + Send + Sync + 'static, { let total_count = self.max_count.get_max_count(); { @@ -171,9 +141,7 @@ impl TaskManager { self.tasks.replace_with2(new_tasks); count.replace_with(new_count); if *count < total_count { - self.tasks - .get_mut() - .push(Box::new(tokio::task::spawn(future))); + self.tasks.get_mut().push(tokio::task::spawn(future)); count.replace_with(*count + 1); return None; } @@ -208,7 +176,7 @@ impl TaskManager { } /// Take all finished tasks - pub fn take_finished_tasks(&self) -> Vec> { + pub fn take_finished_tasks(&self) -> Vec> { self.finished_tasks.replace_with2(Vec::new()) } } @@ -225,7 +193,7 @@ pub fn get_total_post_task_count() -> Arc> { Arc::clone(&TOTAL_POST_TASK_COUNT) } -impl Default for TaskManager { +impl Default for TaskManager { fn default() -> Self { Self::new(get_total_download_task_count(), MaxDownloadTasks::new()) }