bbc service

This commit is contained in:
stabbedbybrick
2023-09-30 19:37:11 +02:00
parent eb59aec1be
commit ba6ef7385f

305
services/bbc.py Normal file
View File

@@ -0,0 +1,305 @@
"""
BBC iPlayer
Author: stabbedbybrick
Info:
up to 1080p
"""
import subprocess
import re
from collections import Counter
from urllib.parse import urlparse, urlunparse
import click
import requests
from bs4 import BeautifulSoup
from helpers.utilities import (
info,
string_cleaning,
set_save_path,
print_info,
set_filename,
)
from helpers.titles import Episode, Series, Movie, Movies
from helpers.args import Options, get_args
from helpers.config import Config
class BBC(Config):
def __init__(self, config, srvc, **kwargs):
    """Initialise the shared service state, then start processing at once.

    All arguments are forwarded untouched to the parent initialiser. As
    soon as it returns, ``get_options`` is invoked, so merely constructing
    the object resolves the titles and triggers the downloads.
    """
    super().__init__(config, srvc, **kwargs)
    self.get_options()
def get_data(self, pid: str, slice_id: str) -> dict:
    """Request the programme record for *pid* from the configured API.

    Args:
        pid: Programme identifier sent as the ``id`` variable.
        slice_id: Identifier of a single slice; any falsy value
            (``None``, ``""``) is sent as ``None``.

    Returns:
        The ``data.programme`` object of the JSON reply.
    """
    variables = {
        "id": pid,
        # Only the first page of at most 200 entries is ever requested.
        "perPage": 200,
        "page": 1,
        "sliceId": slice_id or None,
    }
    payload = {
        # Fixed request id expected by the endpoint
        # (presumably a persisted-query hash -- TODO confirm).
        "id": "9fd1636abe711717c2baf00cebb668de",
        "variables": variables,
    }
    reply = self.client.post(self.srvc["bbc"]["api"], json=payload)
    body = reply.json()
    return body["data"]["programme"]
def create_episode(self, episode):
    """Build an ``Episode`` from one entry of a programme's result list.

    Season, episode number and episode name are all parsed out of the
    entry's ``subtitle`` texts:

    * season  -- ``"Series <n>:"`` in the ``default`` subtitle, else 0
    * number  -- first ``"<n>."``, ``"Episode <n>"`` or ``"Week <n>"`` in
      the ``default`` subtitle (falling back to ``slice``), else 0
    * name    -- text following ``"<n>. "`` in the ``slice`` subtitle
      (falling back to ``default``), else ``""``

    Args:
        episode: Mapping with an ``"episode"`` object holding ``id``,
            ``title``, ``subtitle`` and ``synopsis``.

    Returns:
        The populated ``Episode``.
    """
    details = episode["episode"]
    subtitle = details["subtitle"]

    # Either text may be absent or None; normalise both once so that no
    # regex below is ever handed None (re.search raises TypeError on None).
    default_text = subtitle.get("default") or ""
    slice_text = subtitle.get("slice") or ""
    title = default_text or slice_text

    season_match = re.search(r"Series (\d+):", default_text)
    season = int(season_match.group(1)) if season_match else 0

    # Exactly one alternative (and so one group) takes part in a match;
    # lastindex names that group.
    number_match = re.search(r"(\d+)\.|Episode (\d+)|Week (\d+)", title)
    number = int(number_match.group(number_match.lastindex)) if number_match else 0

    name_match = re.search(r"\d+\. (.+)", slice_text or default_text)
    name = name_match.group(1) if name_match else ""

    return Episode(
        id_=details["id"],
        service="iPLAYER",
        title=details["title"]["default"],
        season=season,
        number=number,
        name=name,
        description=details["synopsis"].get("small"),
    )
def get_series(self, pid: str, slice_id: str) -> Series:
    """Collect every episode of programme *pid* into a ``Series``.

    The programme is fetched once to discover its slices, then once more
    per slice. When the programme lists no slices, a single request with
    a ``None`` slice id is made instead.
    """
    programme = self.get_data(pid, slice_id)
    slices = programme["slices"] or [{"id": None}]

    # Fetch all slices first, then convert their results.
    seasons = []
    for entry in slices:
        seasons.append(self.get_data(pid, entry["id"]))

    episodes = []
    for season in seasons:
        for item in season["entities"]["results"]:
            episodes.append(self.create_episode(item))

    return Series(episodes)
def get_movies(self, pid: str, slice_id: str) -> Movies:
    """Wrap programme *pid* as a one-element ``Movies`` collection.

    The programme's default title is used both as the title and as the
    name of the movie; no release year is set.
    """
    programme = self.get_data(pid, slice_id)
    movie = Movie(
        id_=programme["id"],
        service="iPLAYER",
        title=programme["title"]["default"],
        year=None,  # TODO
        name=programme["title"]["default"],
        synopsis=programme["synopsis"].get("small"),
    )
    return Movies([movie])
def add_stream(self, soup: object, init: str) -> object:
    """Append a hard-coded 1920x1080, 50 fps video stream to the manifest.

    A ``Representation`` with a nested ``SegmentTemplate`` is created and
    attached to the ``AdaptationSet`` whose ``contentType`` is ``video``.

    Args:
        soup: Parsed manifest; must offer ``new_tag`` and ``find``.
        init: File-name prefix used for the initialisation and media
            segment templates.

    Returns:
        The same *soup* object. It is returned unchanged when the manifest
        has no video ``AdaptationSet``.
    """
    video_set = soup.find("AdaptationSet", {"contentType": "video"})
    # Compare against None explicitly: an empty tag must still count as found.
    if video_set is None:
        return soup

    representation = soup.new_tag(
        "Representation",
        id="video=12000000",
        bandwidth="8490000",
        width="1920",
        height="1080",
        frameRate="50",
        codecs="avc3.640020",
        scanType="progressive",
    )
    template = soup.new_tag(
        "SegmentTemplate",
        timescale="5000",
        duration="19200",
        initialization=f"{init}-$RepresentationID$.dash",
        media=f"{init}-$RepresentationID$-$Number$.m4s",
    )
    representation.append(template)
    video_set.append(representation)
    return soup
def get_playlist(self, pid: str) -> tuple:
    """Download, patch and store the DASH manifest for episode *pid*.

    Steps:
      1. Resolve the version id (``vpid``) from the playlist endpoint.
      2. Query the media endpoint and pick the DASH manifest URL and,
         when offered, a captions URL.
      3. Rewrite the manifest's ``BaseURL`` to the configured host and,
         if the ``video=5070000`` stream is present, add the extra
         1080p stream via ``add_stream``.
      4. Write the result to ``<tmp>/manifest.mpd`` and keep it on
         ``self.soup``.

    Args:
        pid: Episode identifier.

    Returns:
        ``(soup, subtitle)`` -- the patched manifest and the captions URL,
        or ``None`` when no matching captions are offered.

    Raises:
        ValueError: If the media selector offers no ``mf_bidi`` DASH stream.
    """
    playlist = self.client.get(
        self.srvc["bbc"]["playlist"].format(pid=pid)
    ).json()
    vpid = playlist["defaultAvailableVersion"]["smpConfig"]["items"][0]["vpid"]

    media = self.client.get(
        self.srvc["bbc"]["media"].format(vpid=vpid)
    ).json()

    # Default to empty lists so a title without video or captions entries
    # does not fail with an unbound local further down.
    videos, captions = [], []
    for item in media["media"]:
        if item["kind"] == "video":
            videos = item["connection"]
        elif item["kind"] == "captions":
            captions = item["connection"]

    # As before, the last matching connection wins.
    manifest = None
    for video in videos:
        if video["supplier"] == "mf_bidi" and video["transferFormat"] == "dash":  # TODO
            manifest = video["href"]
    if manifest is None:
        raise ValueError(f"No mf_bidi DASH manifest available for {pid}")

    subtitle = None
    for caption in captions:
        # Membership test: `x == "a" or "b"` is always true because the
        # non-empty string "b" is truthy on its own.
        if caption["supplier"] in ("mf_bidi", "mf_cloudfront"):
            subtitle = caption["href"]

    soup = BeautifulSoup(requests.get(manifest).content, "xml")

    # Point BaseURL at the "dash/" directory next to the manifest, on the
    # configured host, and derive the segment prefix from the parent
    # directory name with its ".ism" suffix removed.
    parse = urlparse(manifest)
    _path = parse.path.split("/")
    _path[-1] = "dash/"
    init = _path[-2].replace(".ism", "")
    base_url = urlunparse(
        parse._replace(
            scheme="https",
            netloc=self.srvc["bbc"]["base"],
            path="/".join(_path),
            query="",
        )
    )
    soup.select_one("BaseURL").string = base_url

    # The extra stream is only added when this representation exists
    # (presumably it marks titles that have a 1080p encode -- TODO confirm).
    if soup.find(id="video=5070000"):
        soup = self.add_stream(soup, init)

    # Explicit encoding so the file does not depend on the locale default.
    with open(self.tmp / "manifest.mpd", "w", encoding="utf-8") as f:
        f.write(soup.prettify())

    self.soup = soup
    return soup, subtitle
def get_mediainfo(self, soup: object, quality: str) -> "int | str":
    """Pick the video height to download from the manifest.

    Args:
        soup: Parsed manifest offering ``find_all("Representation")``.
        quality: Requested height, or ``None`` for the best available.

    Returns:
        * ``quality`` itself (unchanged) when that height is available,
        * otherwise the available height closest to it (``int``; the
          larger one on a tie),
        * the largest available height (``int``) when *quality* is ``None``.

    Raises:
        ValueError: If no representation in the manifest carries a height,
            or *quality* is not numeric.
    """
    heights = sorted(
        (
            int(rep.attrs["height"])
            for rep in soup.find_all("Representation")
            if rep.attrs.get("height")
        ),
        reverse=True,
    )
    if not heights:
        # Fail with a clear message instead of an IndexError or min() error.
        raise ValueError("No video representations with a height found in manifest")

    if quality is None:
        return heights[0]

    wanted = int(quality)
    if wanted in heights:
        return quality
    return min(heights, key=lambda height: abs(height - wanted))
def get_content(self, url: str) -> object:
    """Resolve *url* into its titles and a cleaned collection title.

    The programme id is the fourth ``/``-separated element of the URL
    path, and the slice id is the value of the first query parameter, if
    any. Depending on ``self.movie`` the programme is loaded as movies or
    as a series.

    For a series every episode's ``name`` is replaced by its file name,
    and episodes whose file name contains ``E00`` are renumbered with a
    running three-digit counter.

    Args:
        url: Programme URL.

    Returns:
        ``(content, title)`` -- the titles collection and its cleaned
        string form.

    Raises:
        ValueError: If the URL path holds no programme id.
    """
    parsed = urlparse(url)
    segments = parsed.path.split("/")
    if len(segments) < 4 or not segments[3]:
        raise ValueError(f"Could not find a programme id in URL: {url}")
    pid = segments[3]
    # Value of the first query parameter only; empty or missing gives None.
    # A plain split("=")[1] breaks on "a=1&b=2" and on queries without "=".
    slice_id = parsed.query.split("&")[0].partition("=")[2] or None

    if self.movie:
        with self.console.status("Fetching titles..."):
            content = self.get_movies(pid, slice_id)
            title = string_cleaning(str(content))

        info(f"{str(content)}\n")
        return content, title

    with self.console.status("Fetching titles..."):
        content = self.get_series(pid, slice_id)

        counter = 1
        for episode in content:
            episode.name = episode.get_filename()
            if "E00" in episode.name:
                episode.name = episode.name.replace("E00", f"E{counter:03d}")
                counter += 1

        title = string_cleaning(str(content))
        seasons = Counter(x.season for x in content)
        num_seasons = len(seasons)
        num_episodes = sum(seasons.values())

    info(
        f"{str(content)}: {num_seasons} Season(s), {num_episodes} Episode(s)\n"
    )
    return content, title
def get_options(self) -> None:
    """Select titles according to the command-line flags and download them.

    The ``episode``, ``season``, ``complete`` and ``movie`` flags are
    checked in that order, and a later flag overrides the selection made
    by an earlier one. ``titles`` only lists the available titles.
    """
    opt = Options(self)
    content, title = self.get_content(self.url)

    # Start empty so that listing titles, or passing no selection flag,
    # does not crash on an unbound name in the loop below.
    downloads = []

    if self.episode:
        downloads = opt.get_episode(content)
    if self.season:
        downloads = opt.get_season(content)
    if self.complete:
        downloads = opt.get_complete(content)
    if self.movie:
        downloads = opt.get_movie(content)
    if self.titles:
        opt.list_titles(content)

    for download in downloads:
        self.download(download, title)
def clean_subtitles(self, subtitle: str, filename: str):
    """Download the captions document and convert it to an ``.srt`` file.

    Temporary solution, but seems to work for the most part.

    Every tag except ``p``, ``br`` and ``span`` is unwrapped and each
    ``br`` becomes a space. Every ``p`` that has both ``begin`` and
    ``end`` attributes is then written as one numbered cue, with ``.`` in
    the timestamps replaced by ``,``. The file is stored as
    ``<tmp>/<filename>.srt`` and its path is kept on ``self.sub_path``.

    Args:
        subtitle: URL of the captions document.
        filename: Base name (without extension) of the output file.
    """
    with self.console.status("Cleaning up subtitles..."):
        soup = BeautifulSoup(requests.get(subtitle).content, "xml")

        for tag in soup.find_all():
            if tag.name not in ("p", "br", "span"):
                tag.unwrap()

        for br in soup.find_all("br"):
            br.replace_with(" ")

        cues = []
        for tag in soup.find_all("p"):
            start = tag.get("begin")
            end = tag.get("end")
            # A paragraph without timing cannot form a cue; skip it
            # rather than abort the whole conversion with a KeyError.
            if not start or not end:
                continue
            text = tag.get_text().strip()
            cues.append(
                f"{len(cues) + 1}\n"
                f"{start.replace('.', ',')} --> {end.replace('.', ',')}\n"
                f"{text}\n\n"
            )

        sub_path = self.tmp / f"{filename}.srt"
        # Explicit UTF-8: caption text is not limited to what the locale's
        # default encoding can represent.
        with open(sub_path, "w", encoding="utf-8") as f:
            f.write("".join(cues))

    self.sub_path = sub_path
def download(self, stream: object, title: str) -> None:
    """Prepare and run the download of a single title.

    Fetches the manifest and captions, picks the resolution, fills in the
    attributes read by the helper functions (``filename``, ``save_path``,
    ``manifest``, ``key_file``, ``sub_path``) and runs the external
    downloader unless the target file already exists.

    Args:
        stream: Title to download; its ``id`` and ``name`` are read here.
        title: Cleaned collection title used to build the save path.

    Raises:
        ValueError: If the downloader cannot be started, exits with a
            non-zero status, or is interrupted by the user.
    """
    with self.console.status("Getting media info..."):
        soup, subtitle = self.get_playlist(stream.id)
        res = self.get_mediainfo(soup, self.quality)

    self.filename = set_filename(self, stream, res, audio="AAC2.0")
    self.save_path = set_save_path(stream, self.config, title)
    self.manifest = self.tmp / "manifest.mpd"
    self.key_file = None  # not encrypted
    self.sub_path = None

    if subtitle is not None:
        self.clean_subtitles(subtitle, self.filename)

    if self.info:
        print_info(self, stream, keys=None)

    info(f"{stream.name}")
    click.echo("")

    args, file_path = get_args(self, res)

    if not file_path.exists():
        try:
            subprocess.run(args, check=True)
        # Name the expected failures instead of a bare except, and keep
        # the original error attached as the cause.
        except (subprocess.SubprocessError, OSError, KeyboardInterrupt) as error:
            raise ValueError("Download failed or was interrupted") from error
    else:
        info(f"{self.filename} already exist. Skipping download\n")
        # Nothing will consume the converted subtitle, so remove it.
        if self.sub_path:
            self.sub_path.unlink()