diff --git a/setup.py b/setup.py index 58372e0..03ace49 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="tiddl", - version="1.6.0", + version="1.7.0", description="TIDDL (Tidal Downloader) is a Python CLI application that allows downloading Tidal tracks.", long_description=open('README.md', encoding="utf-8").read(), long_description_content_type='text/markdown', diff --git a/tiddl/__init__.py b/tiddl/__init__.py index 0900c57..ee58645 100644 --- a/tiddl/__init__.py +++ b/tiddl/__init__.py @@ -6,7 +6,7 @@ from .api import TidalApi from .auth import getDeviceAuth, getToken, refreshToken from .config import Config, HOME_DIRECTORY -from .download import downloadTrackStream, downloadCover +from .download import downloadTrackStream, Cover from .parser import QUALITY_ARGS, parser from .types import TRACK_QUALITY, TrackQuality, Track from .utils import ( @@ -19,6 +19,8 @@ initLogging, ) +SAVE_COVER = True + def main(): args = parser.parse_args() @@ -121,13 +123,15 @@ def main(): ) def downloadTrack( - track: Track, file_template: str, skip_existing=True, sleep=False, playlist="" + track: Track, + file_template: str, + skip_existing=True, + sleep=False, + playlist="", + cover_data=b"", ) -> tuple[str, str]: file_dir, file_name = formatFilename(file_template, track, playlist) - # it will stop detecting existing file for other extensions. - # we need to store track `id + quality` in metadata to differentiate tracks - # TODO: create better existing file detecting ✨ file_path = f"{download_path}/{file_dir}/{file_name}" if skip_existing and ( os.path.isfile(file_path + ".m4a") or os.path.isfile(file_path + ".flac") @@ -138,7 +142,11 @@ def downloadTrack( if sleep: sleep_time = randint(5, 15) / 10 + 1 logger.info(f"sleeping for {sleep_time}s") - time.sleep(sleep_time) + try: + time.sleep(sleep_time) + except KeyboardInterrupt: + logger.info("stopping...") + exit() stream = api.getTrackStream(track["id"], track_quality) quality = TRACK_QUALITY[stream["audioQuality"]] @@ -162,13 +170,18 @@ def downloadTrack( stream["manifest"], stream["manifestMimeType"], ) - try: - setMetadata(track_path, track) - except ValueError as e: - logger.error(f"setMetadata error: {e}") track_path = convertToFlac(track_path) + if not cover_data: + cover = Cover(track["album"]["cover"]) + cover_data = cover.content + + try: + setMetadata(track_path, track, cover_data) + except ValueError as e: + logger.error(f"could not set metadata. {e}") + logger.info(f"track saved as {track_path}") return file_dir, file_name @@ -180,7 +193,7 @@ def downloadAlbum(album_id: str | int, skip_existing: bool): # i dont know if limit 100 is suspicious # but i will leave it here album_items = api.getAlbumItems(album_id, limit=100) - file_dir = "" + album_cover = Cover(album["cover"]) for item in album_items["items"]: track = item["item"] @@ -189,10 +202,11 @@ def downloadAlbum(album_id: str | int, skip_existing: bool): file_template=config["settings"]["album_template"], skip_existing=skip_existing, sleep=True, + cover_data=album_cover.content, ) - if file_dir: - downloadCover(album["cover"], f"{download_path}/{file_dir}") + if SAVE_COVER: + album_cover.save(f"{download_path}/{file_dir}") skip_existing = not args.no_skip failed_input = [] @@ -215,9 +229,13 @@ def downloadAlbum(album_id: str | int, skip_existing: bool): match input_type: case "track": track = api.getTrack(input_id) + downloadTrack( - track, file_template=track_template, skip_existing=skip_existing + track, + file_template=track_template, + skip_existing=skip_existing, ) + continue case "album": @@ -243,16 +261,26 @@ def downloadAlbum(album_id: str | int, skip_existing: bool): playlist = api.getPlaylist(input_id) logger.info(f"playlist: {playlist['title']} ({playlist['url']})") + playlist_cover = Cover( + playlist["squareImage"], 1080 + ) # playlists have 1080x1080 size + playlist_items = api.getPlaylistItems(input_id) + for item in playlist_items["items"]: - downloadTrack( - item["item"], + track = item["item"] + + file_dir, file_name = downloadTrack( + track, file_template=config["settings"]["playlist_template"], skip_existing=skip_existing, sleep=True, playlist=playlist["title"], ) + if SAVE_COVER: + playlist_cover.save(f"{download_path}/{file_dir}") + continue case _: diff --git a/tiddl/download.py b/tiddl/download.py index b2b20cb..6d0d0d3 100644 --- a/tiddl/download.py +++ b/tiddl/download.py @@ -226,7 +226,13 @@ def downloadTrackStream( return file_path -def downloadCover(uid: str, path: str, size=640): +def downloadCover(uid: str, path: str, size=1280): + file = f"{path}/cover.jpg" + + if os.path.isfile(file): + logger.debug(f"cover already exists ({file})") + return + formatted_uid = uid.replace("-", "/") url = f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg" @@ -236,10 +242,56 @@ def downloadCover(uid: str, path: str, size=640): logger.error(f"could not download cover. ({req.status_code}) {url}") return - file = f"{path}/cover.jpg" - try: with open(file, "wb") as f: f.write(req.content) except FileNotFoundError as e: logger.error(f"could not save cover. {file} -> {e}") + + +class Cover: + def __init__(self, uid: str, size=1280) -> None: + if size > 1280: + logger.warning( + f"can not set cover size higher than 1280 (user set: {size})" + ) + size = 1280 + + self.uid = uid + + formatted_uid = uid.replace("-", "/") + self.url = ( + f"https://resources.tidal.com/images/{formatted_uid}/{size}x{size}.jpg" + ) + + logger.debug((self.uid, self.url)) + self.content = self.get() + + def get(self) -> bytes: + req = requests.get(self.url) + + if req.status_code != 200: + logger.error(f"could not download cover. ({req.status_code}) {self.url}") + return b"" + + logger.debug("got cover") + + return req.content + + def save(self, path: str): + if not self.content: + logger.error("cover file content is empty") + return + + file = f"{path}/cover.jpg" + + if os.path.isfile(file): + logger.debug(f"cover already exists ({file})") + return + + try: + with open(file, "wb") as f: + logger.debug(file) + f.write(self.content) + except FileNotFoundError as e: + logger.error(f"could not save cover. {file} -> {e}") diff --git a/tiddl/utils.py b/tiddl/utils.py index 9387827..4fe60f7 100644 --- a/tiddl/utils.py +++ b/tiddl/utils.py @@ -4,7 +4,7 @@ import subprocess from typing import TypedDict, Literal, List, get_args -from mutagen.flac import FLAC as MutagenFLAC +from mutagen.flac import FLAC as MutagenFLAC, Picture from mutagen.easymp4 import EasyMP4 as MutagenMP4 from .types.track import Track @@ -58,7 +58,7 @@ def formatFilename(template: str, track: Track, playlist=""): } dirs = template.split("/") - filename = dirs.pop().format(**formatted_track) + filename = sanitizeFileName(dirs.pop().format(**formatted_track)) template_without_filename = "/".join(dirs) formatted_dir = template_without_filename.format(**formatted_track) @@ -70,11 +70,20 @@ def formatFilename(template: str, track: Track, playlist=""): def sanitizeDirName(dir_name: str): # replace invalid characters with an underscore - sanitized_dir = re.sub(r'[<>:"|?*]', "_", dir_name) + sanitized = re.sub(r'[<>:"|?*]', "_", dir_name) # strip whitespace - sanitized_dir = sanitized_dir.strip() + sanitized = sanitized.strip() - return sanitized_dir + return sanitized + + +def sanitizeFileName(file_name: str): + # replace invalid characters with an underscore + sanitized = re.sub(r'[<>:"|?*/\\]', "_", file_name) + # strip whitespace + sanitized = sanitized.strip() + + return sanitized def loadingSymbol(i: int, text: str): @@ -83,13 +92,19 @@ def loadingSymbol(i: int, text: str): print(f"\r{text} {symbol}", end="\r") -def setMetadata(file_path: str, track: Track): +def setMetadata(file_path: str, track: Track, cover_data=b""): _, extension = os.path.splitext(file_path) if extension == ".flac": metadata = MutagenFLAC(file_path) + if cover_data: + picture = Picture() + picture.data = cover_data + picture.mime = "image/jpeg" + metadata.add_picture(picture) elif extension == ".m4a": metadata = MutagenMP4(file_path) + # i dont know if there is a way to add cover for m4a file else: raise ValueError(f"Unknown file extension: {extension}") @@ -124,7 +139,7 @@ def convertToFlac(source_path: str, remove_source=True): if source_extension != ".m4a": return source_path - logger.info(f"converting `{source_path}` to FLAC") + logger.debug(f"converting `{source_path}` to FLAC") command = ["ffmpeg", "-i", source_path, dest_path] result = subprocess.run( command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL