diff --git a/README.md b/README.md index bebd7aa..f4f897c 100644 --- a/README.md +++ b/README.md @@ -175,6 +175,44 @@ There are three parameters for each sensor: - `exclude` should be a list of shows you'd like to exclude, since it's based on your watched history. To find keys to put there, go on trakt.tv, search for a show, click on it, notice the url slug, copy/paste it. So, if I want to hide "Friends", I'll do the steps mentioned above, then land on https://trakt.tv/shows/friends, I'll just have to copy/paste the last part, `friends`, that's it You can also use the Trakt.tv "hidden" function to hide a show from [your calendar](https://trakt.tv/calendars/my/shows) or the [progress page](https://trakt.tv/users//progress) +##### Lists sensor + +Lists sensor allows you to fetch both public and private lists from Trakt, each list will be a sensor. The items in the list will be sorted by their rank on Trakt. + +There are four parameters for each sensor: + + - `friendly_name` **MANDATORY** should be a string for the name of the sensor. This has to be unique for each list. + - `list_id` **MANDATORY** should be the Trakt list ID. For public lists the ID has to be numeric, for private lists the ID can be either the numeric ID or the slug from the URL. To get the numeric ID of a public list, copy the link address of the list before opening it or open the Report List window. This will give you a URL like `https://trakt.tv/lists/2142753`. The `2142753` part is the numeric ID you need to use. + - `private_list` _OPTIONAL_ has to be set to `true` if using your own private list. Default is `false` + - `media_type` _OPTIONAL_ can be used to filter the media type within the list, possible values are `show`, `movie`, `episode`. Default is blank, which will show all media types + - `max_medias` _OPTIONAL_ should be a positive number for how many items to grab. Default is `3` + - `sort_by` _OPTIONAL_ should be a string for how to sort the list. Default is `rank`. Possible values are: + - `rank` - Placement in the list + - `rating` - TMDB rating + - `rating_trakt` - Trakt rating + - `runtime` + - `released` + - `listed_at` - Date the item was added to the list + - `sort_order` _OPTIONAL_ should be a string for the sort order. Possible values are `asc`, `desc`. Default is `asc` + +###### Lists Example +```yaml + lists: + - friendly_name: "Christmas Watchlist" + private_list: True # Set to True if the list is your own private list + list_id: "christmas-watchlist" # Can be the slug, because it's a private list + max_medias: 5 + - friendly_name: "2024 Academy Awards" + list_id: 26885014 + max_medias: 5 + sort_by: rating_trakt # Sort by Trakt user rating instead of lsit rank + sort_order: desc + - friendly_name: "Star Trek Movies" + list_id: 967660 + media_type: "movie" # Filters the list to only show movies + max_medias: 5 +``` + ##### Stats sensors Creates individual sensors giving all of your stats about the movies, shows, and episodes you have watched, collected, and rated. @@ -220,7 +258,7 @@ trakt_tv: - movies_minutes ``` -#### Example +#### Configuration Example For example, adding only the following to `configuration.yaml` will create two sensors. One with the next 10 TV episodes in the next 30 days and another with the next 5 movies coming out in the next 45 days: diff --git a/custom_components/trakt_tv/apis/trakt.py b/custom_components/trakt_tv/apis/trakt.py index 4e9ae7a..877243e 100644 --- a/custom_components/trakt_tv/apis/trakt.py +++ b/custom_components/trakt_tv/apis/trakt.py @@ -17,7 +17,7 @@ from ..const import API_HOST, DOMAIN from ..exception import TraktException from ..models.kind import BASIC_KINDS, UPCOMING_KINDS, TraktKind -from ..models.media import Medias +from ..models.media import Episode, Medias, Movie, Show from ..utils import cache_insert, cache_retrieve, deserialize_json LOGGER = logging.getLogger(__name__) @@ -63,7 +63,7 @@ async def retry_request(self, wait_time, response, method, url, retry, **kwargs) guidance = f"Too many retries, if you find this error, please raise an issue at https://github.com/dylandoamaral/trakt-integration/issues." raise TraktException(f"{error} {guidance}") - async def request(self, method, url, retry=10, **kwargs) -> ClientResponse: + async def request(self, method, url, retry=10, **kwargs) -> dict[str, Any]: """Make a request.""" access_token = await self.async_get_access_token() client_id = self.hass.data[DOMAIN]["configuration"]["client_id"] @@ -390,6 +390,83 @@ async def fetch_recommendations(self, configured_kinds: list[TraktKind]): return res + async def fetch_list( + self, path: str, list_id: str, user_path: bool, max_items: int, media_type: str + ): + """Fetch the list, if user_path is True, the list will be fetched from the user end-point""" + # Add the user path if needed + if user_path: + path = f"users/me/{path}" + + # Replace the list_id in the path + path = path.replace("{list_id}", list_id) + + # Add media type filter to the path + if media_type: + # Check if the media type is supported + if Medias.trakt_to_class(media_type): + path = f"{path}/{media_type}" + else: + LOGGER.warn(f"Filtering list on {media_type} is not supported") + return None + + # Add extended info used for sorting + path = f"{path}?extended=full" + + return await self.request("get", path) + + async def fetch_lists(self, configured_kind: TraktKind): + + # Get config for all lists + configuration = Configuration(data=self.hass.data) + lists = configuration.get_sensor_config(configured_kind.value.identifier) + + # Fetch the lists + data = await gather( + *[ + self.fetch_list( + configured_kind.value.path, + list_config["list_id"], + list_config["private_list"], + list_config["max_medias"], + list_config["media_type"], + ) + for list_config in lists + ] + ) + + # Process the results + language = configuration.get_language() + + res = {} + for list_config, raw_medias in zip(lists, data): + if raw_medias is not None: + medias = [] + for media in raw_medias: + # Get model based on media type in data + media_type = media.get("type") + model = Medias.trakt_to_class(media_type) + + if model: + medias.append(model.from_trakt(media)) + else: + LOGGER.warn( + f"Media type {media_type} in {list_config['friendly_name']} is not supported" + ) + + if not medias: + LOGGER.warn( + f"No entries found for list {list_config['friendly_name']}" + ) + continue + + await gather( + *[media.get_more_information(language) for media in medias] + ) + res[list_config["friendly_name"]] = Medias(medias) + + return {configured_kind: res} + async def fetch_stats(self): # Load data data = await self.request("get", f"users/me/stats") @@ -435,6 +512,9 @@ async def retrieve_data(self): configured_kind=TraktKind.NEXT_TO_WATCH_UPCOMING, only_upcoming=True, ), + "lists": lambda: self.fetch_lists( + configured_kind=TraktKind.LIST, + ), "stats": lambda: self.fetch_stats(), } @@ -459,7 +539,12 @@ async def retrieve_data(self): sources.append(sub_source) coroutine_sources_data.append(source_function.get(sub_source)()) - """ Load user stats """ + """Add the lists sensors""" + if configuration.source_exists("lists"): + sources.append("lists") + coroutine_sources_data.append(source_function.get("lists")()) + + """ Add user stats """ if configuration.source_exists("stats"): sources.append("stats") coroutine_sources_data.append(source_function.get("stats")()) diff --git a/custom_components/trakt_tv/configuration.py b/custom_components/trakt_tv/configuration.py index 7ffc3b4..449beda 100644 --- a/custom_components/trakt_tv/configuration.py +++ b/custom_components/trakt_tv/configuration.py @@ -78,6 +78,12 @@ def recommendation_identifier_exists(self, identifier: str) -> bool: def get_recommendation_max_medias(self, identifier: str) -> int: return self.get_max_medias(identifier, "recommendation") + def get_sensor_config(self, identifier: str) -> list: + try: + return self.conf["sensors"][identifier] + except KeyError: + return [] + def stats_key_exists(self, key: str) -> bool: return key in self.conf["sensors"]["stats"] diff --git a/custom_components/trakt_tv/const.py b/custom_components/trakt_tv/const.py index 54dc9f1..32c7ee6 100644 --- a/custom_components/trakt_tv/const.py +++ b/custom_components/trakt_tv/const.py @@ -6,8 +6,6 @@ OAUTH2_AUTHORIZE = "https://trakt.tv/oauth/authorize" OAUTH2_TOKEN = f"{API_HOST}/oauth/token" -UPCOMING_DATA_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" - TMDB_HOST = "http://api.tmdb.org" TMDB_TOKEN = "0eee347e2333d7a97b724106353ca42f" @@ -173,3 +171,7 @@ "za", "zu", ] + +SORT_BY_OPTIONS = ["rating", "rating_trakt", "rank", "runtime", "released", "listed_at"] + +SORT_HOW_OPTIONS = ["asc", "desc"] diff --git a/custom_components/trakt_tv/models/kind.py b/custom_components/trakt_tv/models/kind.py index b098dba..b6bf738 100644 --- a/custom_components/trakt_tv/models/kind.py +++ b/custom_components/trakt_tv/models/kind.py @@ -7,9 +7,9 @@ @dataclass class CalendarInformation: identifier: str - name: str + name: str | None path: str - model: Media + model: Media | None class TraktKind(Enum): @@ -23,6 +23,7 @@ class TraktKind(Enum): NEXT_TO_WATCH_UPCOMING = CalendarInformation( "only_upcoming", "Only Upcoming", "shows", Show ) + LIST = CalendarInformation("lists", None, "lists/{list_id}/items", None) @classmethod def from_string(cls, string): diff --git a/custom_components/trakt_tv/models/media.py b/custom_components/trakt_tv/models/media.py index ae866ad..41ab329 100644 --- a/custom_components/trakt_tv/models/media.py +++ b/custom_components/trakt_tv/models/media.py @@ -1,11 +1,10 @@ from abc import ABC, abstractmethod, abstractstaticmethod from dataclasses import dataclass, field -from datetime import datetime, timezone -from typing import Any, Dict, List, Optional +from datetime import datetime +from typing import Any, Dict, List, Optional, Type from custom_components.trakt_tv.apis.tmdb import get_movie_data, get_show_data - -from ..const import UPCOMING_DATA_FORMAT +from custom_components.trakt_tv.utils import parse_utc_date first_item = { "title_default": "$title", @@ -73,6 +72,7 @@ def common_information(self) -> Dict[str, Any]: "fanart": self.fanart, "genres": self.genres, "rating": self.rating, + "rating_trakt": self.rating_trakt, "studio": self.studio, } @@ -99,6 +99,9 @@ class Movie(Media): runtime: Optional[int] = None studio: Optional[str] = None released: Optional[datetime] = None # This one is actually mandatory + rank: Optional[int] = None + listed_at: Optional[datetime] = None + rating_trakt: Optional[int] = None @staticmethod def from_trakt(data) -> "Movie": @@ -107,16 +110,13 @@ def from_trakt(data) -> "Movie": """ movie = data if data.get("title") else data["movie"] - released = ( - datetime.fromisoformat(data["released"]).replace(tzinfo=timezone.utc) - if data.get("released") - else None - ) - return Movie( name=movie["title"], - released=released, + released=parse_utc_date(data.get("released")), ids=Identifiers.from_trakt(movie), + rank=data.get("rank"), + listed_at=parse_utc_date(data.get("listed_at")), + rating_trakt=movie.get("rating"), ) async def get_more_information(self, language): @@ -143,9 +143,7 @@ async def get_more_information(self, language): self.studio = production_companies[0].get("name") if not self.released: if data.get("release_date"): - self.released = datetime.fromisoformat(data["release_date"]).replace( - tzinfo=timezone.utc - ) + self.released = parse_utc_date(data.get("release_date")) else: self.released = datetime.min @@ -197,6 +195,10 @@ class Show(Media): studio: Optional[str] = None episode: Optional[Episode] = None released: Optional[datetime] = None + runtime: Optional[int] = None + rank: Optional[int] = None + listed_at: Optional[datetime] = None + rating_trakt: Optional[int] = None @staticmethod def from_trakt(data) -> "Show": @@ -205,21 +207,17 @@ def from_trakt(data) -> "Show": """ show = data if data.get("title") else data["show"] - released = ( - datetime.strptime(data["first_aired"], UPCOMING_DATA_FORMAT).replace( - tzinfo=timezone.utc - ) - if data.get("first_aired") - else None - ) - episode = Episode.from_trakt(data["episode"]) if data.get("episode") else None return Show( name=show["title"], ids=Identifiers.from_trakt(show), - released=released, + released=parse_utc_date(data.get("first_aired")), episode=episode, + rank=data.get("rank"), + listed_at=parse_utc_date(data.get("listed_at")), + runtime=show.get("runtime"), + rating_trakt=show.get("rating"), ) def update_common_information(self, data: Dict[str, Any]): @@ -285,13 +283,24 @@ def to_homeassistant(self) -> Dict[str, Any]: class Medias: items: List[Media] - def to_homeassistant(self) -> Dict[str, Any]: + def to_homeassistant(self, sort_by="released", sort_order="asc") -> Dict[str, Any]: """ Convert the List of medias to recommendation data. :return: The dictionary containing all necessary information for upcoming media card """ - medias = sorted(self.items, key=lambda media: media.released) + medias = sorted( + self.items, + key=lambda media: getattr(media, sort_by), + reverse=sort_order == "desc", + ) medias = [media.to_homeassistant() for media in medias] return [first_item] + medias + + @staticmethod + def trakt_to_class( + trakt_type: str, + ) -> Type[Show] | Type[Movie] | Type[Episode] | None: + type_to_class = {"show": Show, "episode": Show, "movie": Movie} + return type_to_class.get(trakt_type, None) diff --git a/custom_components/trakt_tv/schema.py b/custom_components/trakt_tv/schema.py index 194836a..495924e 100644 --- a/custom_components/trakt_tv/schema.py +++ b/custom_components/trakt_tv/schema.py @@ -6,7 +6,7 @@ from homeassistant.helpers import config_validation as cv from voluptuous import ALLOW_EXTRA, PREVENT_EXTRA, In, Required, Schema -from .const import DOMAIN, LANGUAGE_CODES +from .const import DOMAIN, LANGUAGE_CODES, SORT_BY_OPTIONS, SORT_HOW_OPTIONS from .models.kind import BASIC_KINDS, NEXT_TO_WATCH_KINDS, TraktKind @@ -41,6 +41,7 @@ def sensors_schema() -> Dict[str, Any]: "all_upcoming": upcoming_schema(), "next_to_watch": next_to_watch_schema(), "recommendation": recommendation_schema(), + "lists": Schema([lists_schema()]), "stats": Schema(stats_schema()), } @@ -77,6 +78,20 @@ def recommendation_schema() -> Dict[str, Any]: return subschemas +def lists_schema() -> dict[Required, Any]: + schema = { + Required("list_id"): cv.string, + Required("friendly_name"): cv.string, + Required("max_medias", default=3): cv.positive_int, + Required("private_list", default=False): cv.boolean, + Required("media_type", default=""): cv.string, + Required("sort_by", default="rank"): In(SORT_BY_OPTIONS), + Required("sort_order", default="asc"): In(SORT_HOW_OPTIONS), + } + + return schema + + def stats_schema() -> list[str]: return [ "all", diff --git a/custom_components/trakt_tv/sensor.py b/custom_components/trakt_tv/sensor.py index 91558c8..a145416 100644 --- a/custom_components/trakt_tv/sensor.py +++ b/custom_components/trakt_tv/sensor.py @@ -69,6 +69,25 @@ async def async_setup_entry(hass, config_entry, async_add_entities): ) sensors.append(sensor) + for trakt_kind in TraktKind: + if trakt_kind != TraktKind.LIST: + continue + + identifier = trakt_kind.value.identifier + + if configuration.source_exists(identifier): + for list_entry in configuration.get_sensor_config(identifier): + sensor = TraktSensor( + hass=hass, + config_entry=list_entry, + coordinator=coordinator, + trakt_kind=trakt_kind, + source=identifier, + prefix=f"Trakt List {list_entry['friendly_name']}", + mdi_icon="mdi:view-list", + ) + sensors.append(sensor) + # Add sensors for stats if configuration.source_exists("stats"): stats = {} @@ -133,13 +152,26 @@ def __init__( @property def name(self): """Return the name of the sensor.""" + if not self.trakt_kind.value.name: + return f"{self.prefix}" return f"{self.prefix} {self.trakt_kind.value.name}" @property def medias(self): - if self.coordinator.data: - return self.coordinator.data.get(self.source, {}).get(self.trakt_kind, None) - return None + if not self.coordinator.data: + return None + + if self.trakt_kind == TraktKind.LIST: + try: + name = self.config_entry["friendly_name"] + return self.coordinator.data[self.source][self.trakt_kind][name] + except KeyError: + return None + + try: + return self.coordinator.data[self.source][self.trakt_kind] + except KeyError: + return None @property def configuration(self): @@ -152,10 +184,17 @@ def configuration(self): @property def data(self): - if self.medias: - max_medias = self.configuration["max_medias"] - return self.medias.to_homeassistant()[0 : max_medias + 1] - return [] + if not self.medias: + return [] + + if self.trakt_kind == TraktKind.LIST: + sort_by = self.config_entry["sort_by"] + sort_order = self.config_entry["sort_order"] + max_medias = self.config_entry["max_medias"] + return self.medias.to_homeassistant(sort_by, sort_order)[0 : max_medias + 1] + + max_medias = self.configuration["max_medias"] + return self.medias.to_homeassistant()[0 : max_medias + 1] @property def state(self): diff --git a/custom_components/trakt_tv/utils.py b/custom_components/trakt_tv/utils.py index edb50cc..5f0373e 100644 --- a/custom_components/trakt_tv/utils.py +++ b/custom_components/trakt_tv/utils.py @@ -1,6 +1,6 @@ import json import time -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from math import ceil from typing import Any, Dict, List, Optional, Tuple @@ -92,3 +92,14 @@ def cache_retrieve(cache: Dict[str, Any], key: str) -> Optional[Any]: return None else: return None + + +def parse_utc_date(date_str: Optional[str]) -> Optional[datetime]: + """ + Parse an ISO date string (all dates returned from Trakt) to a datetime object. + """ + return ( + datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc) + if date_str + else None + )