This commit is contained in:
2022-06-11 14:38:03 +00:00
committed by GitHub
parent 25fd3a73fe
commit 98e8e8dd69
4 changed files with 95 additions and 10 deletions

View File

@@ -16,6 +16,7 @@ use reqwest::IntoUrl;
use std::collections::HashMap;
use std::fs::remove_file;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::ops::DerefMut;
use std::path::Path;
@@ -151,6 +152,15 @@ impl <T: Write + Seek + Send + Sync + ClearFile> DownloaderInternal<T> {
}
}
/// Seek in the file.
/// * `data` - Data
pub fn seek(&self, pos: SeekFrom) -> Result<u64, DownloaderError> {
match self.file.get_mut().deref_mut() {
Some(f) => { Ok(f.seek(pos)?) }
None => { Ok(0) }
}
}
/// Write datas to the file.
/// * `data` - Data
pub fn write(&self, data: &[u8]) -> Result<(), DownloaderError> {

View File

@@ -1,5 +1,6 @@
use crate::downloader::pd_file::PdFileError;
use http::status::StatusCode;
use tokio::time::error::Elapsed;
#[derive(Debug, derive_more::From)]
pub enum DownloaderError {
@@ -8,6 +9,7 @@ pub enum DownloaderError {
IoError(std::io::Error),
String(String),
ErrorStatusCode(StatusCode),
Timeout(Elapsed),
}
impl From<&str> for DownloaderError {

View File

@@ -42,6 +42,8 @@ lazy_static! {
const STATUS_OFFSET: SeekFrom = SeekFrom::Start(10);
/// The offset of the file_size in pd file
const FILE_SIZE_OFFSET: SeekFrom = SeekFrom::Start(12);
/// The offset of the downloaded_file_size in pd file
const DOWNLOADED_FILE_SIZE_OFFSET: SeekFrom = SeekFrom::Start(20);
#[derive(Debug)]
/// The pd file
@@ -121,15 +123,12 @@ impl PdFile {
}
/// Complete the download.
/// After calling this function. The pd file will be removed.
pub fn complete(&self) -> Result<(), PdFileError> {
self.set_completed();
if !self.is_mem_only() {
self.need_saved.qstore(true);
let mut f = self.file.get_mut();
let f = f.as_mut().unwrap();
f.seek(STATUS_OFFSET)?;
f.write_le_u8(self.status.get_ref().int_value())?;
self.need_saved.qstore(false);
self.force_close();
self.remove_pd_file()?;
}
Ok(())
}
@@ -166,6 +165,25 @@ impl PdFile {
}
}
/// Increase the downloaded file size.
/// * `size` - The file size want to added.
///
/// Returns the downloaded file size
pub fn inc(&self, size: u64) -> Result<u64, PdFileError> {
let mut downloaded_size = self.downloaded_file_size.qload();
downloaded_size += size;
self.downloaded_file_size.qstore(downloaded_size);
if !self.is_mem_only() {
self.need_saved.qstore(true);
let mut f = self.file.get_mut();
let f = f.as_mut().unwrap();
f.seek(DOWNLOADED_FILE_SIZE_OFFSET)?;
f.write_le_u64(downloaded_size)?;
self.need_saved.qstore(false);
}
Ok(downloaded_size)
}
#[inline]
/// Returns true if the download is completed.
pub fn is_completed(&self) -> bool {
@@ -327,6 +345,12 @@ impl PdFile {
self.status.replace_with2(PdFileStatus::Downloaded);
}
#[inline]
/// Set status to downloading.
fn set_downloading(&self) {
self.status.replace_with2(PdFileStatus::Downloading);
}
/// Set the file name
/// * `file_name` - The file name. Should not be empty.
pub fn set_file_name<S: AsRef<str> + ?Sized>(&self, file_name: &S) -> Result<(), PdFileError> {
@@ -347,6 +371,8 @@ impl PdFile {
/// Set the target size of the file. If unknown, set this to 0.
/// * `file_size` - The target size of the file.
///
/// This will also set the status to downloading.
pub fn set_file_size(&self, file_size: u64) -> Result<(), PdFileError> {
self.file_size.qstore(file_size);
if !self.is_mem_only() {
@@ -357,6 +383,15 @@ impl PdFile {
f.write_le_u64(file_size)?;
self.need_saved.qstore(false);
}
self.set_downloading();
if !self.is_mem_only() {
self.need_saved.qstore(true);
let mut f = self.file.get_mut();
let f = f.as_mut().unwrap();
f.seek(STATUS_OFFSET)?;
f.write_le_u8(self.status.get_ref().int_value())?;
self.need_saved.qstore(false);
}
Ok(())
}

View File

@@ -8,18 +8,35 @@ use http_content_range::ContentRange;
use reqwest::Response;
use std::ops::Deref;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::sync::Arc;
/// Create a download tasks in simple thread mode.
pub async fn create_download_tasks_simple<T: Seek + Write + Send + Sync + ClearFile>(d: Arc<DownloaderInternal<T>>) -> Result<(), DownloaderError> {
let start = if d.pd.is_downloading() {
let mut start = if d.pd.is_downloading() {
d.pd.get_downloaded_file_size()
} else {
0
};
let file_size = d.pd.get_file_size();
let mut headers = d.headers.deref().clone();
if start != 0 {
match d.seek(SeekFrom::Start(start)) {
Ok(data) => {
if data != start {
start = 0;
}
}
Err(_) => {
start = 0;
}
}
if start == 0 {
d.seek(SeekFrom::Start(0))?;
d.pd.clear()?;
}
}
if start != 0 {
headers.insert(String::from("Range"), format!("bytes={}-", start));
}
@@ -80,20 +97,41 @@ pub async fn create_download_tasks_simple<T: Seek + Write + Send + Sync + ClearF
/// Handle download process
pub async fn handle_download<T: Seek + Write + Send + Sync + ClearFile>(d: Arc<DownloaderInternal<T>>, re: Response) -> Result<(), DownloaderError> {
let mut stream = re.bytes_stream();
let is_multi = d.is_multi_threads();
loop {
let mut n = stream.next();
let re = tokio::time::timeout(std::time::Duration::from_secs(10), &mut n).await;
match re {
Ok(s) => {
match s {
Some(data) => {}
Some(data) => {
match data {
Ok(data) => {
d.pd.inc(data.len() as u64)?;
d.write(&data)?;
}
Err(e) => {
if !is_multi {
d.pd.clear()?;
}
return Err(DownloaderError::from(e));
}
}
}
None => {
d.pd.complete()?;
if !is_multi {
d.pd.complete()?;
}
break;
}
}
}
Err(_) => {} // TODO: Timed out
Err(e) => {
if !is_multi {
d.pd.clear()?;
}
return Err(DownloaderError::from(e));
}
}
}
Ok(())