Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 支持稍后再看的扫描与下载 #131

Merged
merged 4 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/bili_sync/src/adapter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod collection;
mod favorite;
mod watch_later;

use std::collections::HashSet;
use std::path::Path;
Expand All @@ -11,12 +12,14 @@ pub use collection::collection_from;
pub use favorite::favorite_from;
use futures::Stream;
use sea_orm::DatabaseConnection;
use watch_later::watch_later_from;

use crate::bilibili::{BiliClient, CollectionItem, VideoInfo};

pub enum Args<'a> {
Favorite { fid: &'a str },
Collection { collection_item: &'a CollectionItem },
WatchLater,
}

pub async fn video_list_from<'a>(
Expand All @@ -28,6 +31,7 @@ pub async fn video_list_from<'a>(
match args {
Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await,
Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await,
Args::WatchLater => watch_later_from(path, bili_client, connection).await,
}
}

Expand Down
195 changes: 195 additions & 0 deletions crates/bili_sync/src/adapter/watch_later.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use std::collections::HashSet;
use std::path::Path;
use std::pin::Pin;

use anyhow::Result;
use bili_sync_entity::*;
use bili_sync_migration::OnConflict;
use filenamify::filenamify;
use futures::Stream;
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::{DatabaseConnection, QuerySelect, TransactionTrait};

use super::VideoListModel;
use crate::bilibili::{BiliClient, BiliError, Video, VideoInfo, WatchLater};
use crate::config::TEMPLATE;
use crate::utils::id_time_key;
use crate::utils::model::create_video_pages;
use crate::utils::status::Status;

pub async fn watch_later_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(Box<dyn VideoListModel>, Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>)> {
let watch_later = WatchLater::new(bili_client);
watch_later::Entity::insert(watch_later::ActiveModel {
id: Set(1),
path: Set(path.to_string_lossy().to_string()),
..Default::default()
})
.on_conflict(
OnConflict::column(watch_later::Column::Id)
.update_column(watch_later::Column::Path)
.to_owned(),
)
.exec(connection)
.await?;
Ok((
Box::new(
watch_later::Entity::find()
.filter(watch_later::Column::Id.eq(1))
.one(connection)
.await?
.unwrap(),
),
Box::pin(watch_later.into_video_stream()),
))
}
use async_trait::async_trait;

#[async_trait]
impl VideoListModel for watch_later::Model {
async fn video_count(&self, connection: &DatabaseConnection) -> Result<u64> {
Ok(video::Entity::find()
.filter(video::Column::WatchLaterId.eq(self.id))
.count(connection)
.await?)
}

async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
Ok(video::Entity::find()
.filter(
video::Column::WatchLaterId
.eq(self.id)
.and(video::Column::Valid.eq(true))
.and(video::Column::DownloadStatus.eq(0))
.and(video::Column::Category.eq(2))
.and(video::Column::SinglePage.is_null()),
)
.all(connection)
.await?)
}

async fn unhandled_video_pages(
&self,
connection: &DatabaseConnection,
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
Ok(video::Entity::find()
.filter(
video::Column::WatchLaterId
.eq(self.id)
.and(video::Column::Valid.eq(true))
.and(video::Column::DownloadStatus.lt(Status::handled()))
.and(video::Column::Category.eq(2))
.and(video::Column::SinglePage.is_not_null()),
)
.find_with_related(page::Entity)
.all(connection)
.await?)
}

async fn exist_labels(
&self,
videos_info: &[VideoInfo],
connection: &DatabaseConnection,
) -> Result<HashSet<String>> {
let bvids = videos_info.iter().map(|v| v.bvid().to_string()).collect::<Vec<_>>();
Ok(video::Entity::find()
.filter(
video::Column::WatchLaterId
.eq(self.id)
.and(video::Column::Bvid.is_in(bvids)),
)
.select_only()
.columns([video::Column::Bvid, video::Column::Favtime])
.into_tuple()
.all(connection)
.await?
.into_iter()
.map(|(bvid, time)| id_time_key(&bvid, &time))
.collect::<HashSet<_>>())
}

fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
let mut video_model = video_info.to_model(base_model);
video_model.watch_later_id = Set(Some(self.id));
if let Some(fmt_args) = &video_info.to_fmt_args() {
video_model.path = Set(Path::new(&self.path)
.join(filenamify(
TEMPLATE
.render("video", fmt_args)
.unwrap_or_else(|_| video_info.bvid().to_string()),
))
.to_string_lossy()
.to_string());
}
video_model
}

async fn fetch_videos_detail(
&self,
bili_clent: &BiliClient,
videos_model: Vec<video::Model>,
connection: &DatabaseConnection,
) -> Result<()> {
for video_model in videos_model {
let video = Video::new(bili_clent, video_model.bvid.clone());
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_pages().await?)) }.await;
match info {
Ok((tags, pages_info)) => {
let txn = connection.begin().await?;
// 将分页信息写入数据库
create_video_pages(&pages_info, &video_model, &txn).await?;
// 将页标记和 tag 写入数据库
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.single_page = Set(Some(pages_info.len() == 1));
video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap()));
video_active_model.save(&txn).await?;
txn.commit().await?;
}
Err(e) => {
error!(
"获取视频 {} - {} 的详细信息失败,错误为:{}",
&video_model.bvid, &video_model.name, e
);
if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::<BiliError>() {
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.valid = Set(false);
video_active_model.save(connection).await?;
}
continue;
}
};
}
Ok(())
}

fn log_fetch_video_start(&self) {
info!("开始获取稍后再看的视频与分页信息...");
}

fn log_fetch_video_end(&self) {
info!("获取稍后再看的视频与分页信息完成");
}

fn log_download_video_start(&self) {
info!("开始下载稍后再看中所有未处理过的视频...");
}

fn log_download_video_end(&self) {
info!("下载稍后再看中未处理过的视频完成");
}

fn log_refresh_video_start(&self) {
info!("开始扫描稍后再看的新视频...");
}

fn log_refresh_video_end(&self, got_count: usize, new_count: u64) {
info!(
"扫描稍后再看的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频",
got_count, new_count,
);
}
}
2 changes: 1 addition & 1 deletion crates/bili_sync/src/bilibili/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl<'a> Collection<'a> {
break;
},
};
for video_info in videos_info.into_iter(){
for video_info in videos_info{
yield video_info;
}
let fields = match self.collection.collection_type{
Expand Down
2 changes: 1 addition & 1 deletion crates/bili_sync/src/bilibili/favorite_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<'a> FavoriteList<'a> {
break;
},
};
for video_info in videos_info.into_iter(){
for video_info in videos_info{
yield video_info;
}
if videos["data"]["has_more"].is_boolean() && videos["data"]["has_more"].as_bool().unwrap(){
Expand Down
52 changes: 52 additions & 0 deletions crates/bili_sync/src/bilibili/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use error::BiliError;
pub use favorite_list::FavoriteList;
use favorite_list::Upper;
pub use video::{Dimension, PageInfo, Video};
pub use watch_later::WatchLater;

mod analyzer;
mod client;
Expand All @@ -19,6 +20,7 @@ mod danmaku;
mod error;
mod favorite_list;
mod video;
mod watch_later;

pub(crate) trait Validate {
type Output;
Expand Down Expand Up @@ -81,6 +83,24 @@ pub enum VideoInfo {
pubtime: DateTime<Utc>,
attr: i32,
},
/// 从稍后再看中获取的视频信息
WatchLater {
title: String,
bvid: String,
#[serde(rename = "desc")]
intro: String,
#[serde(rename = "pic")]
cover: String,
#[serde(rename = "owner")]
upper: Upper,
#[serde(with = "ts_seconds")]
ctime: DateTime<Utc>,
#[serde(rename = "add_at", with = "ts_seconds")]
fav_time: DateTime<Utc>,
#[serde(rename = "pubdate", with = "ts_seconds")]
pubtime: DateTime<Utc>,
state: i32,
},
/// 从视频列表中获取的视频信息
Simple {
bvid: String,
Expand All @@ -92,3 +112,35 @@ pub enum VideoInfo {
pubtime: DateTime<Utc>,
},
}

#[cfg(test)]
mod tests {
use futures::{pin_mut, StreamExt};

use super::*;

#[ignore = "only for manual test"]
#[tokio::test]
async fn assert_video_info_type() {
let bili_client = BiliClient::new();
let video = Video::new(&bili_client, "BV1Z54y1C7ZB".to_string());
assert!(matches!(video.get_view_info().await, Ok(VideoInfo::View { .. })));
let collection_item = CollectionItem {
mid: "521722088".to_string(),
sid: "387214".to_string(),
collection_type: CollectionType::Series,
};
let collection = Collection::new(&bili_client, &collection_item);
let stream = collection.into_simple_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::Simple { .. })));
let favorite = FavoriteList::new(&bili_client, "3084505258".to_string());
let stream = favorite.into_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::Detail { .. })));
let watch_later = WatchLater::new(&bili_client);
let stream = watch_later.into_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::WatchLater { .. })));
}
}
48 changes: 48 additions & 0 deletions crates/bili_sync/src/bilibili/watch_later.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use anyhow::Result;
use async_stream::stream;
use futures::Stream;
use serde_json::Value;

use crate::bilibili::{BiliClient, Validate, VideoInfo};
pub struct WatchLater<'a> {
client: &'a BiliClient,
}

impl<'a> WatchLater<'a> {
pub fn new(client: &'a BiliClient) -> Self {
Self { client }
}

async fn get_videos(&self) -> Result<Value> {
self.client
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v2/history/toview")
.send()
.await?
.error_for_status()?
.json::<serde_json::Value>()
.await?
.validate()
}

pub fn into_video_stream(self) -> impl Stream<Item = VideoInfo> + 'a {
stream! {
let Ok(mut videos) = self.get_videos().await else {
error!("Failed to get watch later list");
return;
};
if !videos["data"]["list"].is_array() {
error!("Watch later list is not an array");
}
let videos_info = match serde_json::from_value::<Vec<VideoInfo>>(videos["data"]["list"].take()) {
Ok(v) => v,
Err(e) => {
error!("Failed to parse watch later list: {}", e);
return;
}
};
for video in videos_info {
yield video;
}
}
}
}
Loading