diff --git a/unshackle/services/CWTV/__init__.py b/unshackle/services/CWTV/__init__.py new file mode 100644 index 0000000..60fd94a --- /dev/null +++ b/unshackle/services/CWTV/__init__.py @@ -0,0 +1,278 @@ +from __future__ import annotations + +import json +import re +from collections.abc import Generator +from datetime import timedelta +from typing import Any +from urllib.parse import quote, urljoin + +import click +from click import Context + +try: + from devine.core.manifests import DASH # type: ignore + from devine.core.search_result import SearchResult # type: ignore + from devine.core.service import Service # type: ignore + from devine.core.titles import Episode, Movie, Movies, Series # type: ignore + from devine.core.tracks import Chapter, Chapters, Tracks # type: ignore +except ImportError: + try: + from unshackle.core.manifests import DASH + from unshackle.core.search_result import SearchResult + from unshackle.core.service import Service + from unshackle.core.titles import Episode, Movie, Movies, Series + from unshackle.core.tracks import Chapter, Chapters, Tracks + except ImportError: + raise ImportError("CWTV service requires devine or unshackle to be installed") + +from lxml import etree +from requests import Request + + +class CWTV(Service): + """ + \b + Service code for CWTV streaming service (https://www.cwtv.com/). + + \b + Version: 1.0.0 + Author: stabbedbybrick + Authorization: None + Geofence: US (API and downloads) + Robustness: + L3: 1080p, AAC2.0 + + \b + Tips: + - Input should be complete URL: + SHOW: https://www.cwtv.com/shows/sullivans-crossing + EPISODE: https://www.cwtv.com/series/sullivans-crossing/new-beginnings/?play=7778f443-c7cc-4843-8e3c-d97d53b813d2 + MOVIE: https://www.cwtv.com/movies/burnt/ + """ + + GEOFENCE = ("us",) + ALIASES = ("cw",) + + @staticmethod + @click.command(name="CWTV", short_help="https://www.cwtv.com/", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> CWTV: + return CWTV(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + self.session.headers.update(self.config["headers"]) + + def search(self) -> Generator[SearchResult, None, None]: + results = self._request( + "GET", "https://www.cwtv.com/search/", + params={ + "q": quote(self.title), + "format": "json2", + "service": "t", + "cwuid": "8195356001251527455", + }, + ) + + for result in results["items"]: + if result.get("type") not in ("shows", "series", "movies"): + continue + + video_type = "shows" if result.get("type") in ("series", "shows") else "movies" + + yield SearchResult( + id_=f"https://www.cwtv.com/{video_type}/{result.get('show_slug')}", + title=result.get("title"), + description=result.get("description_long"), + label=result.get("type").capitalize(), + url=f"https://www.cwtv.com/{video_type}/{result.get('show_slug')}", + ) + + def get_titles(self) -> Movies | Series: + url_pattern = re.compile( + r"^https:\/\/www\.cwtv\.com\/" + r"(?Pseries|shows|movies)\/" + r"(?P[\w-]+(?:\/[\w-]+)?)" + r"(?:\/?\?play=(?P[\w-]+))?" + ) + + match = url_pattern.match(self.title) + if not match: + raise ValueError(f"Could not parse ID from title: {self.title}") + + kind, guid, play_id = (match.group(i) for i in ("type", "id", "play_id")) + + if kind in ("series", "shows") and not play_id: + episodes = self._series(guid) + return Series(episodes) + + elif kind == "movies" and not play_id: + movie = self._movie(guid) + return Movies(movie) + + elif kind in ("series", "shows") and play_id: + episode = self._episode(play_id) + return Series(episode) + + else: + raise ValueError(f"Could not parse conent type from title: {self.title}") + + def get_tracks(self, title: Movie | Episode) -> Tracks: + data = self._request( + "GET", self.config["endpoints"]["playback"].format(title.id), + headers={"accept": f'application/json;pk={self.config["policy_key"]}'}, + ) + has_drm = data.get("custom_fields", {}).get("is_drm") == "1" + + title.data["chapters"] = data.get("cue_points") + + source_manifest = next( + (source.get("src") for source in data["sources"] if source.get("type") == "application/dash+xml"), + None, + ) + if not source_manifest: + raise ValueError("Could not find DASH manifest") + + license_url = next(( + source.get("key_systems", {}).get("com.widevine.alpha", {}).get("license_url") + for source in data["sources"] if source.get("src") == source_manifest), + None, + ) + if has_drm and not license_url: + raise ValueError("Could not find license URL") + + title.data["license_url"] = license_url + + manifest = self.trim_duration(source_manifest) + tracks = DASH.from_text(manifest, source_manifest).to_tracks(language="en") + + for track in tracks.audio: + role = track.data["dash"]["representation"].find("Role") + if role is not None and role.get("value") in ["description", "alternative", "alternate"]: + track.descriptive = True + + return tracks + + def get_chapters(self, title: Movie | Episode) -> Chapters: + if not title.data.get("chapters"): + return Chapters() + + chapters = [] + for cue in title.data["chapters"]: + if cue["time"] > 0: + chapters.append(Chapter(timestamp=cue["time"])) + + return Chapters(chapters) + + def get_widevine_service_certificate(self, **_: Any) -> str: + return None + + def get_widevine_license(self, *, challenge: bytes, title: Movie | Episode, track: Any) -> bytes | str | None: + if license_url := title.data.get("license_url"): + r = self.session.post(url=license_url, data=challenge) + if r.status_code != 200: + raise ConnectionError(r.text) + return r.content + + return None + + # Service specific + + def _series(self, guid: str) -> list[Episode]: + series = self._request("GET", f"/feed/app-2/videos/show_{guid}/type_episodes/apiversion_24/device_androidtv") + if not series.get("items"): + raise ValueError(f"Could not find any episodes with ID {guid}") + + episodes = [ + Episode( + id_=episode.get("bc_video_id"), + service=self.__class__, + name=episode.get("title"), + season=int(episode.get("season") or 0), + number=int(episode.get("episode_in_season") or 0), + title=episode.get("series_name") or episode.get("show_title"), + year=episode.get("release_year"), + data=episode, + ) + for episode in series.get("items") + if episode.get("fullep", 0) == 1 + ] + + return episodes + + def _movie(self, guid: str) -> Movie: + data = self._request("GET", f"/feed/app-2/videos/show_{guid}/type_episodes/apiversion_24/device_androidtv") + if not data.get("items"): + raise ValueError(f"Could not find any data for ID {guid}") + + movies = [ + Movie( + id_=movie.get("bc_video_id"), + service=self.__class__, + name=movie.get("series_name") or movie.get("show_title"), + year=movie.get("release_year"), + data=movie, + ) + for movie in data.get("items") + if movie.get("fullep", 0) == 1 + ] + + return movies + + def _episode(self, guid: str) -> Episode: + data = self._request("GET", f"/feed/app-2/video-meta/guid_{guid}/apiversion_24/device_androidtv") + if not data.get("video"): + raise ValueError(f"Could not find any data for ID {guid}") + + episodes = [ + Episode( + id_=data.get("video", {}).get("bc_video_id"), + service=self.__class__, + name=data.get("video", {}).get("title"), + season=int(data.get("video", {}).get("season") or 0), + number=int(data.get("video", {}).get("episode_in_season") or 0), + title=data.get("video", {}).get("series_name") or data.get("video", {}).get("show_title"), + year=data.get("video", {}).get("release_year"), + data=data.get("video"), + ) + ] + + return episodes + + def _request(self, method: str, endpoint: str, **kwargs: Any) -> Any[dict | str]: + url = urljoin(self.config["endpoints"]["base_url"], endpoint) + + prep = self.session.prepare_request(Request(method, url, **kwargs)) + + response = self.session.send(prep) + if response.status_code != 200: + raise ConnectionError(f"{response.text}") + + try: + return json.loads(response.content) + + except json.JSONDecodeError: + return response.text + + @staticmethod + def trim_duration(source_manifest: str) -> str: + """ + The last segment on all tracks return a 404 for some reason, causing a failed download. + So we trim the duration by exactly one segment to account for that. + + TODO: Calculate the segment duration instead of assuming length. + """ + manifest = DASH.from_url(source_manifest).manifest + period_duration = manifest.get("mediaPresentationDuration") + period_duration = DASH.pt_to_sec(period_duration) + + hours, minutes, seconds = str(timedelta(seconds=period_duration - 6)).split(":") + new_duration = f"PT{hours}H{minutes}M{seconds}S" + manifest.set("mediaPresentationDuration", new_duration) + + return etree.tostring(manifest, encoding="unicode") + diff --git a/unshackle/services/CWTV/config.yaml b/unshackle/services/CWTV/config.yaml new file mode 100644 index 0000000..c37cf6c --- /dev/null +++ b/unshackle/services/CWTV/config.yaml @@ -0,0 +1,9 @@ +headers: + User-Agent: Mozilla/5.0 (Linux; Android 11; Smart TV Build/AR2101; wv) + +endpoints: + base_url: https://images.cwtv.com + playback: https://edge.api.brightcove.com/playback/v1/accounts/6415823816001/videos/{} + +policy_key: BCpkADawqM0t2qFXB_K2XdHv2JmeRgQjpP6De9_Fl7d4akhL5aeqYwErorzsAxa7dyOF2FdxuG5wWVOREHEwb0DI-M8CGBBDpqwvDBEPfDKQg7kYGnccdNDErkvEh2O28CrGR3sEG6MZBlZ03I0xH7EflYKooIhfwvNWWw + \ No newline at end of file diff --git a/unshackle/services/PLEX/__init__.py b/unshackle/services/PLEX/__init__.py new file mode 100644 index 0000000..964e9d5 --- /dev/null +++ b/unshackle/services/PLEX/__init__.py @@ -0,0 +1,330 @@ +from __future__ import annotations + +import json +import re +import uuid +from collections.abc import Generator +from typing import Any, Optional +from urllib.parse import urlparse, urljoin, quote +from http.cookiejar import MozillaCookieJar +from concurrent.futures import ThreadPoolExecutor + +import click +from click import Context + +try: + from devine.core.credential import Credential # type: ignore + from devine.core.manifests import DASH, HLS # type: ignore + from devine.core.search_result import SearchResult # type: ignore + from devine.core.service import Service # type: ignore + from devine.core.titles import Episode, Movie, Movies, Series # type: ignore + from devine.core.tracks import Chapter, Chapters, Tracks # type: ignore +except ImportError: + try: + from unshackle.core.credential import Credential + from unshackle.core.manifests import DASH, HLS + from unshackle.core.search_result import SearchResult + from unshackle.core.service import Service + from unshackle.core.titles import Episode, Movie, Movies, Series + from unshackle.core.tracks import Chapter, Chapters, Tracks + except ImportError: + raise ImportError("PLEX service requires devine or unshackle to be installed") + +from requests import Request + + +class PLEX(Service): + """ + \b + Service code for Plex's free streaming service (https://watch.plex.tv/). + + \b + Version: 1.0.0 + Author: stabbedbybrick + Authorization: None + Geofence: API and downloads are locked into whatever region the user is in + Robustness: + L3: 720p, AAC2.0 + + \b + Tips: + - Input should be complete URL: + SHOW: https://watch.plex.tv/show/taboo-2017 + EPISODE: https://watch.plex.tv/show/taboo-2017/season/1/episode/1 + MOVIE: https://watch.plex.tv/movie/the-longest-yard + """ + + ALIASES = ("plextv",) + + @staticmethod + @click.command(name="PLEX", short_help="https://watch.plex.tv/", help=__doc__) + @click.argument("title", type=str) + @click.pass_context + def cli(ctx: Context, **kwargs: Any) -> PLEX: + return PLEX(ctx, **kwargs) + + def __init__(self, ctx: Context, title: str): + self.title = title + super().__init__(ctx) + + def authenticate(self, cookies: Optional[MozillaCookieJar] = None, credential: Optional[Credential] = None) -> None: + super().authenticate(cookies, credential) + + self.session.headers.update( + { + "accept": "application/json", + "x-plex-client-identifier": str(uuid.uuid4()), + "x-plex-language": "en", + "x-plex-product": "Plex Mediaverse", + "x-plex-provider-version": "6.5.0", + } + ) + user = self._request("POST", self.config["endpoints"]["user"]) + if not (auth_token := user.get("authToken")): + raise ValueError(f"PLEX authentication failed: {user}") + + self.auth_token = auth_token + self.session.headers.update({"x-plex-token": self.auth_token}) + + + def search(self) -> Generator[SearchResult, None, None]: + results = self._request( + "GET", "https://discover.provider.plex.tv/library/search", + params={ + "searchTypes": "movies,tv", + "searchProviders": "discover,plexAVOD,plexFAST", + "includeMetadata": 1, + "filterPeople": 1, + "limit": 10, + "query": quote(self.title), + }, + ) + + for result in results["MediaContainer"]["SearchResults"]: + if "free on demand" not in result.get("title", "").lower(): + continue + + for result in result["SearchResult"]: + kind = result.get("Metadata", {}).get("type") + slug = result.get("Metadata", {}).get("slug") + + yield SearchResult( + id_=f"https://watch.plex.tv/{kind}/{slug}", + title=result.get("Metadata", {}).get("title"), + description=result.get("Metadata", {}).get("description"), + label=kind, + url=f"https://watch.plex.tv/{kind}/{slug}", + ) + + def get_titles(self) -> Movies | Series: + url_pattern = re.compile( + r"^https://watch.plex.tv/" + r"(?Pmovie|show)/" + r"(?P[\w-]+)" + r"(?P(/season/\d+/episode/\d+))?" + ) + + match = url_pattern.match(self.title) + if not match: + raise ValueError(f"Could not parse ID from title: {self.title}") + + kind, guid, path = (match.group(i) for i in ("type", "id", "url_path")) + + if kind == "show": + if path is not None: + episode = self._episode(urlparse(self.title).path) + return Series(episode) + + episodes = self._series(guid) + return Series(episodes) + + elif kind == "movie": + movie = self._movie(guid) + return Movies(movie) + + else: + raise ValueError(f"Could not parse content type from title: {self.title}") + + def get_tracks(self, title: Movie | Episode) -> Tracks: + dash_media = next((x for x in title.data.get("Media", []) if x.get("protocol", "").lower() == "dash"), None) + if not dash_media: + hls_media = next((x for x in title.data.get("Media", []) if x.get("protocol", "").lower() == "hls"), None) + + media = dash_media or hls_media + if not media: + raise ValueError("Failed to find either DASH or HLS media") + + manifest = DASH if dash_media else HLS + + media_key = media.get("id") + has_drm = media.get("drm") + + if has_drm: + manifest_url = ( + self.config["endpoints"]["base_url"] + + self.config["endpoints"]["manifest_drm"].format(media_key, self.auth_token) + ) + title.data["license_url"] = ( + self.config["endpoints"]["base_url"] + + self.config["endpoints"]["license"].format(media_key, self.auth_token) + ) + else: + manifest_url = ( + self.config["endpoints"]["base_url"] + + self.config["endpoints"]["manifest_clear"].format(media_key, self.auth_token) + ) + title.data["license_url"] = None + + tracks = manifest.from_url(manifest_url, self.session).to_tracks(language="en") + + return tracks + + def get_chapters(self, title: Movie | Episode) -> Chapters: + if not (markers := title.data.get("Marker", [])): + try: + metadata = self._request( + "POST", "/playQueues", + params={ + "uri": self.config["endpoints"]["provider"] + title.data.get("key"), + "type": "video", + "continuous": "1", + }, + ) + markers = next(( + x.get("Marker") for x in metadata.get("MediaContainer", {}).get("Metadata", []) + if x.get("key") == title.data.get("key")), []) + + except Exception as e: + self.log.debug("Failed to fetch markers: %s", e) + pass + + chapters = [] + for cue in markers: + if cue.get("startTimeOffset", 0) > 0: + chapters.append(Chapter(name=cue.get("type", "").title(), timestamp=cue.get("startTimeOffset"))) + + return Chapters(chapters) + + def get_widevine_service_certificate(self, **_: Any) -> str: + return None + + def get_widevine_license(self, *, challenge: bytes, title: Movie | Episode, track: Any) -> bytes | str | None: + if license_url := title.data.get("license_url"): + r = self.session.post(url=license_url, data=challenge) + if r.status_code != 200: + raise ConnectionError(r.text) + return r.content + + return None + + # Service specific + + def _fetch_season(self, url: str) -> list: + return self._request("GET", url).get("MediaContainer", {}).get("Metadata", []) + + def _series(self, guid: str) -> list[Episode]: + data = self._request("GET", f"/library/metadata/show:{guid}") + + meta_key = data.get("MediaContainer", {}).get("Metadata", [])[0].get("key") + if not meta_key: + raise ValueError("Failed to find metadata for title") + + series = self._request("GET", f"{self.config['endpoints']['base_url']}/{meta_key}") + + seasons = [ + self.config["endpoints"]["base_url"] + item.get("key") + for item in series.get("MediaContainer", {}).get("Metadata", []) + if item.get("type") == "season" + ] + if not seasons: + raise ValueError("Failed to find seasons for title") + + with ThreadPoolExecutor(max_workers=10) as executor: + results = list(executor.map(self._fetch_season, seasons)) + + episodes = [ + Episode( + id_=episode.get("ratingKey"), + service=self.__class__, + name=episode.get("title"), + season=int(episode.get("parentIndex", 0)), + number=int(episode.get("index", 0)), + title=re.sub(r"\s*\(\d{4}\)", "", episode.get("grandparentTitle", "")), + # year=episode.get("year"), + data=episode, + ) + for season in results + for episode in season + if episode.get("type") == "episode" + ] + + return episodes + + def _movie(self, guid: str) -> Movie: + data = self._request("GET", f"/library/metadata/movie:{guid}") + movie = data.get("MediaContainer", {}).get("Metadata", [])[0] + if not movie: + raise ValueError(f"Could not find any data for ID {guid}") + + movies = [ + Movie( + id_=movie.get("ratingKey"), + service=self.__class__, + name=movie.get("title"), + year=movie.get("year"), + data=movie, + ) + ] + + return movies + + def _episode(self, path: str) -> Episode: + data = self._request("GET", self.config["endpoints"]["screen"] + path) + meta_key = data.get("actions", [])[0].get("data", {}).get("key") + if not meta_key: + raise ValueError("Failed to find metadata for title") + + metadata = self._request( + "POST", "/playQueues", + params={ + "uri": self.config["endpoints"]["provider"] + meta_key, + "type": "video", + "continuous": "1", + }, + ) + + episode = next((x for x in metadata.get("MediaContainer", {}).get("Metadata", []) if x.get("key") == meta_key), None) + if not episode: + raise ValueError("Failed to find metadata for title") + + episodes = [ + Episode( + id_=episode.get("ratingKey"), + service=self.__class__, + name=episode.get("title"), + season=int(episode.get("parentIndex", 0)), + number=int(episode.get("index", 0)), + title=re.sub(r"\s*\(\d{4}\)", "", episode.get("grandparentTitle", "")), + # year=episode.get("year"), + data=episode, + ) + ] + + return episodes + + def _request(self, method: str, endpoint: str, **kwargs: Any) -> Any[dict | str]: + url = urljoin(self.config["endpoints"]["base_url"], endpoint) + + prep = self.session.prepare_request(Request(method, url, **kwargs)) + + response = self.session.send(prep) + if response.status_code not in (200, 201, 426): + raise ConnectionError(f"{response.text}") + + try: + return json.loads(response.content) + + except json.JSONDecodeError: + return response.text + + diff --git a/unshackle/services/PLEX/config.yaml b/unshackle/services/PLEX/config.yaml new file mode 100644 index 0000000..5f9c165 --- /dev/null +++ b/unshackle/services/PLEX/config.yaml @@ -0,0 +1,12 @@ +headers: + User-Agent: Mozilla/5.0 (Linux; Android 11; Smart TV Build/AR2101; wv) + +endpoints: + base_url: https://vod.provider.plex.tv + user: https://plex.tv/api/v2/users/anonymous + screen: https://luma.plex.tv/api/screen + provider: provider://tv.plex.provider.vod + manifest_clear: /library/parts/{}?includeAllStreams=1&X-Plex-Product=Plex+Mediaverse&X-Plex-Token={} + manifest_drm: /library/parts/{}?includeAllStreams=1&X-Plex-Product=Plex+Mediaverse&X-Plex-Token={}&X-Plex-DRM=widevine + license: /library/parts/{}/license?X-Plex-Token={}&X-Plex-DRM=widevine + \ No newline at end of file