"""
BBC iPlayer
Author: stabbedbybrick

Info:
up to 1080p

"""

import re
import subprocess

from collections import Counter
from urllib.parse import urlparse, urlunparse

import click
import requests

from bs4 import BeautifulSoup

from helpers.args import Options, get_args
from helpers.config import Config
from helpers.titles import Episode, Movie, Movies, Series
from helpers.utilities import (
    info,
    print_info,
    set_filename,
    set_save_path,
    string_cleaning,
)


class BBC(Config):
    """Service downloader for BBC iPlayer (streams up to 1080p)."""

    def __init__(self, config, srvc, **kwargs):
        super().__init__(config, srvc, **kwargs)

        self.get_options()

    def get_data(self, pid: str, slice_id: str) -> dict:
        """Query the iPlayer GraphQL endpoint for a programme.

        pid      -- programme id taken from the URL path
        slice_id -- series/slice id, or None for the default slice
        Returns the "programme" payload of the response.
        """
        json_data = {
            "id": "9fd1636abe711717c2baf00cebb668de",
            "variables": {
                "id": pid,
                "perPage": 200,
                "page": 1,
                "sliceId": slice_id if slice_id else None,
            },
        }

        response = self.client.post(self.srvc["bbc"]["api"], json=json_data).json()

        return response["data"]["programme"]

    def create_episode(self, episode: dict) -> Episode:
        """Build an Episode from one GraphQL entity.

        Season and episode numbers are scraped out of the human-readable
        subtitle strings ("Series 4: 1. Name", "Episode 3", "Week 2"),
        defaulting to 0 when no pattern matches.
        """
        subtitle = episode["episode"]["subtitle"]
        title = subtitle.get("default") or subtitle.get("slice") or ""

        # Guard against a missing "default" key: re.search(None) raises.
        season_match = re.search(r"Series (\d+):", subtitle.get("default") or "")
        season = int(season_match.group(1)) if season_match else 0

        # First match of "1.", "Episode 1" or "Week 1" wins; default 0.
        number_match = re.finditer(r"(\d+)\.|Episode (\d+)|Week (\d+)", title)
        number = int(
            next((m.group(1) or m.group(2) or m.group(3) for m in number_match), 0)
        )

        # "1. Some Name" -> "Some Name"
        name_match = re.search(
            r"\d+\. (.+)", subtitle.get("slice") or subtitle.get("default") or ""
        )
        name = name_match.group(1) if name_match else ""

        return Episode(
            id_=episode["episode"]["id"],
            service="iPLAYER",
            title=episode["episode"]["title"]["default"],
            season=season,
            number=number,
            name=name,
            description=episode["episode"]["synopsis"].get("small"),
        )

    def get_series(self, pid: str, slice_id: str) -> Series:
        """Collect every episode across all slices (seasons) of a series."""
        data = self.get_data(pid, slice_id)
        # A programme without slices still yields one fetch with sliceId=None.
        seasons = [self.get_data(pid, x["id"]) for x in data["slices"] or [{"id": None}]]

        episodes = [
            self.create_episode(episode)
            for season in seasons
            for episode in season["entities"]["results"]
        ]
        return Series(episodes)

    def get_movies(self, pid: str, slice_id: str) -> Movies:
        """Wrap a single-programme (film) payload in a Movies container."""
        data = self.get_data(pid, slice_id)

        return Movies(
            [
                Movie(
                    id_=data["id"],
                    service="iPLAYER",
                    title=data["title"]["default"],
                    year=None,  # TODO: year is not exposed by this payload
                    name=data["title"]["default"],
                    synopsis=data["synopsis"].get("small"),
                )
            ]
        )

    def add_stream(self, soup: object, init: str) -> object:
        """Inject a synthetic 1080p Representation into the video AdaptationSet.

        The MPD served by the media selector tops out below 1080p even when
        the CDN hosts the higher tier; this appends a hand-built entry that
        points at the same segment template naming scheme.
        """
        representation = soup.new_tag(
            "Representation",
            id="video=12000000",
            bandwidth="8490000",
            width="1920",
            height="1080",
            frameRate="50",
            codecs="avc3.640020",
            scanType="progressive",
        )

        template = soup.new_tag(
            "SegmentTemplate",
            timescale="5000",
            duration="19200",
            initialization=f"{init}-$RepresentationID$.dash",
            media=f"{init}-$RepresentationID$-$Number$.m4s",
        )

        representation.append(template)

        soup.find("AdaptationSet", {"contentType": "video"}).append(representation)

        return soup

    def get_playlist(self, pid: str) -> tuple:
        """Fetch, localise, and cache the DASH manifest for an episode.

        Returns (soup, subtitle_url); subtitle_url is None when no captions
        connection is available. Raises ValueError when no DASH manifest is
        present in the media selector response.
        """
        resp = self.client.get(self.srvc["bbc"]["playlist"].format(pid=pid)).json()

        vpid = resp["defaultAvailableVersion"]["smpConfig"]["items"][0]["vpid"]

        media = self.client.get(self.srvc["bbc"]["media"].format(vpid=vpid)).json()

        # Initialise so a response missing either kind cannot raise NameError.
        videos = []
        captions = []
        for item in media["media"]:
            if item["kind"] == "video":
                videos = item["connection"]
            elif item["kind"] == "captions":
                captions = item["connection"]

        manifest = None
        for video in videos:
            if video["supplier"] == "mf_bidi" and video["transferFormat"] == "dash":  # TODO
                manifest = video["href"]

        if manifest is None:
            raise ValueError("No DASH manifest found in media response")

        subtitle = None
        for caption in captions:
            # BUG FIX: original `== "mf_bidi" or "mf_cloudfront"` was always
            # truthy, so every connection matched regardless of supplier.
            if caption["supplier"] in ("mf_bidi", "mf_cloudfront"):
                subtitle = caption["href"]

        soup = BeautifulSoup(requests.get(manifest).content, "xml")

        # Rewrite the BaseURL so relative segment paths resolve against the
        # service CDN instead of the manifest host.
        parse = urlparse(manifest)
        _path = parse.path.split("/")
        _path[-1] = "dash/"
        init = _path[-2].replace(".ism", "")

        base_url = urlunparse(
            parse._replace(
                scheme="https",
                netloc=self.srvc["bbc"]["base"],
                path="/".join(_path),
                query="",
            )
        )
        soup.select_one("BaseURL").string = base_url

        # Presence of the 720p-tier id hints the 1080p segments also exist
        # on the CDN — TODO confirm this heuristic against more titles.
        tag = soup.find(id="video=5070000")
        if tag:
            soup = self.add_stream(soup, init)

        with open(self.tmp / "manifest.mpd", "w", encoding="utf-8") as f:
            f.write(soup.prettify())

        self.soup = soup
        return soup, subtitle

    def get_mediainfo(self, soup: object, quality: str) -> int:
        """Pick the video height to download.

        Returns the requested quality when available, otherwise the closest
        advertised height; with no quality requested, the highest available.
        Raises ValueError when the manifest advertises no video heights.
        """
        elements = soup.find_all("Representation")
        heights = sorted(
            [int(x.attrs["height"]) for x in elements if x.attrs.get("height")],
            reverse=True,
        )

        if not heights:
            raise ValueError("No video representations found in manifest")

        if quality is not None:
            # Normalised to int in both branches (was str/int mixed).
            if int(quality) in heights:
                return int(quality)
            return min(heights, key=lambda x: abs(int(x) - int(quality)))

        return heights[0]

    def get_content(self, url: str) -> tuple:
        """Resolve a URL into (content, sanitized_title).

        content is a Movies or Series container depending on self.movie.
        """
        if self.movie:
            with self.console.status("Fetching titles..."):
                parse = urlparse(url)
                pid = parse.path.split("/")[3]
                slice_id = parse.query.split("=")[1] if parse.query else None
                content = self.get_movies(pid, slice_id)
                title = string_cleaning(str(content))

            info(f"{str(content)}\n")

        else:
            with self.console.status("Fetching titles..."):
                parse = urlparse(url)
                pid = parse.path.split("/")[3]
                slice_id = parse.query.split("=")[1] if parse.query else None
                content = self.get_series(pid, slice_id)

                # Unnumbered episodes come back as E00; assign sequential
                # numbers so filenames stay unique.
                counter = 1
                for episode in content:
                    episode.name = episode.get_filename()
                    if "E00" in episode.name:
                        episode.name = episode.name.replace("E00", f"E{counter:03d}")
                        counter += 1

                title = string_cleaning(str(content))
                seasons = Counter(x.season for x in content)
                num_seasons = len(seasons)
                num_episodes = sum(seasons.values())

            info(
                f"{str(content)}: {num_seasons} Season(s), {num_episodes} Episode(s)\n"
            )

        return content, title

    def get_options(self) -> None:
        """Dispatch on CLI flags and download every selected title."""
        opt = Options(self)
        content, title = self.get_content(self.url)

        # Initialised so that --titles (or no selection) cannot hit an
        # unbound `downloads` in the loop below.
        downloads = []
        if self.episode:
            downloads = opt.get_episode(content)
        if self.season:
            downloads = opt.get_season(content)
        if self.complete:
            downloads = opt.get_complete(content)
        if self.movie:
            downloads = opt.get_movie(content)
        if self.titles:
            opt.list_titles(content)

        for download in downloads:
            self.download(download, title)

    def clean_subtitles(self, subtitle: str, filename: str):
        """Convert the TTML caption document at `subtitle` into an SRT file.

        Temporary solution, but seems to work for the most part.
        Sets self.sub_path to the written file.
        """
        with self.console.status("Cleaning up subtitles..."):
            soup = BeautifulSoup(requests.get(subtitle).content, "xml")
            # Strip every tag except the ones carrying text/timing.
            for tag in soup.find_all():
                if tag.name != "p" and tag.name != "br" and tag.name != "span":
                    tag.unwrap()

            for br in soup.find_all("br"):
                br.replace_with(" ")

            srt = ""
            for i, tag in enumerate(soup.find_all("p")):
                start = tag["begin"]
                end = tag["end"]
                text = tag.get_text().strip()
                # SRT wants comma decimal separators in timestamps.
                srt += f"{i+1}\n{start.replace('.', ',')} --> {end.replace('.', ',')}\n{text}\n\n"

            # BUG FIX: the filename argument was previously ignored and a
            # fixed placeholder name was written instead.
            sub_file = self.tmp / f"{filename}.srt"
            with open(sub_file, "w", encoding="utf-8") as f:
                f.write(srt)

            self.sub_path = sub_file

    def download(self, stream: object, title: str) -> None:
        """Fetch manifest + subtitles for one title and run the downloader."""
        with self.console.status("Getting media info..."):
            soup, subtitle = self.get_playlist(stream.id)
            res = self.get_mediainfo(soup, self.quality)

        self.filename = set_filename(self, stream, res, audio="AAC2.0")
        self.save_path = set_save_path(stream, self.config, title)
        self.manifest = self.tmp / "manifest.mpd"
        self.key_file = None  # not encrypted
        self.sub_path = None

        if subtitle is not None:
            self.clean_subtitles(subtitle, self.filename)

        if self.info:
            print_info(self, stream, keys=None)

        info(f"{stream.name}")
        click.echo("")

        args, file_path = get_args(self, res)

        if not file_path.exists():
            try:
                subprocess.run(args, check=True)
            except subprocess.CalledProcessError as err:
                # Narrowed from a bare except: keep the ValueError type for
                # callers, but preserve the cause and let KeyboardInterrupt
                # propagate.
                raise ValueError("Download failed or was interrupted") from err
        else:
            info(f"{self.filename} already exist. Skipping download\n")
            if self.sub_path:
                self.sub_path.unlink()