Files
freevine/utils/utilities.py
2024-03-13 10:42:42 +01:00

476 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import datetime
import http.cookiejar
import json
import logging
import re
import shutil
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta # noqa: F811
from pathlib import Path
import click
import m3u8
import requests
from bs4 import BeautifulSoup
from lxml import etree
from pywidevine.device import Device, DeviceTypes
from rich.console import Console
from subby import (
CommonIssuesFixer,
ISMTConverter,
SAMIConverter,
SMPTEConverter,
WebVTTConverter,
WVTTConverter,
)
from unidecode import unidecode
console = Console()
log = logging.getLogger()
def create_wvd(dir: Path) -> Path:
    """
    Create a new WVD file from an RSA key pair found in *dir*.

    Checks for both untouched and renamed RSA keys and identification
    blobs: a ``.pem`` file or ``device_private_key`` (private key) and a
    ``.bin`` file or ``device_client_id_blob`` (client ID). Exits the
    program if either piece is missing.

    Returns the path to a ``.wvd`` file in *dir*, or None if none exists.
    """
    private_key = None
    client_id = None

    for file in dir.glob("*"):
        if file.suffix == ".pem" or file.stem == "device_private_key":
            private_key = file
        if file.suffix == ".bin" or file.stem == "device_client_id_blob":
            client_id = file

    # BUG FIX: the original used "and", so when only ONE of the two files
    # was missing the check passed and the code crashed below calling
    # .read_bytes() on None. Either missing file is fatal.
    if not private_key or not client_id:
        log.error("Required key and client ID not found")
        exit(1)

    device = Device(
        type_=DeviceTypes["ANDROID"],
        security_level=3,
        flags=None,
        private_key=private_key.read_bytes(),
        client_id=client_id.read_bytes(),
    )
    out_path = (
        dir / f"{device.type.name}_{device.system_id}_l{device.security_level}.wvd"
    )
    device.dump(out_path)

    log.info("New WVD file successfully created")
    return next(dir.glob("*.wvd"), None)
def get_wvd(cwd: Path) -> Path:
    """Return the path to the WVD file, creating one if none exists yet."""
    wvd_dir = cwd / "utils" / "wvd"

    existing = next(wvd_dir.glob("*.wvd"), None)
    if existing:
        return existing

    log.info("WVD file is missing. Attempting to create a new one...")
    return create_wvd(wvd_dir)
def info(text: str) -> str:
    """Custom info 'logger' designed to match N_m3u8DL-RE output"""
    now = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    tag = click.style("INFO", fg="green", underline=True)
    body = click.style(f" : {text}")
    return click.echo(f"{click.style(now)} {tag}{body}")
def error(text: str) -> str:
    """Custom error 'logger' designed to match N_m3u8DL-RE output"""
    now = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    tag = click.style("ERROR", fg="red", underline=True)
    body = click.style(f" : {text}")
    return click.echo(f"{click.style(now)} {tag}{body}")
def notification(text: str) -> str:
    """Custom notification 'logger' designed to match N_m3u8DL-RE output"""
    now = datetime.now().strftime("%H:%M:%S.%f")[:-3]
    tag = click.style("[!!]", fg="bright_magenta")
    body = click.style(f" : {text}")
    return click.echo(f"{click.style(now)} {tag}{body}")
def is_url(value):
    """Return True if *value* looks like an HTTP(S) URL, False otherwise."""
    if value is None:
        return False
    return bool(re.match("^https?://", value, re.IGNORECASE))
def is_title_match(string: str, title: str) -> bool:
    """Return True if *string* matches the *title* regex, case-insensitively.

    FIX: the original annotated ``title: re`` — the ``re`` module object,
    which is not a valid type. Since ``re.match`` is called with a flags
    argument, *title* must be a pattern string (compiled patterns are not
    allowed together with flags).
    """
    return bool(re.match(title, string, re.IGNORECASE))
def get_binary(*names: str) -> Path:
    """Return the Path of the first of *names* found on PATH, or None."""
    for candidate in names:
        located = shutil.which(candidate)
        if located:
            return Path(located)
    return None
def is_path(s):
    """Return True if *s* points to an existing regular file."""
    candidate = Path(s)
    if not candidate.exists():
        return False
    return candidate.is_file()
def contains_ip_address(input_string):
    """Return True if *input_string* contains a dotted-quad IPv4 address."""
    octet = r"(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
    ipv4 = rf"\b({octet}\.){{3}}{octet}\b"
    return re.search(ipv4, input_string) is not None
def get_heights(session: requests.Session, manifest: str) -> tuple:
    """Fetch a DASH manifest and return (heights sorted descending, soup)."""
    response = session.get(manifest)
    response.raise_for_status()

    soup = BeautifulSoup(response.text, "xml")
    representations = soup.find_all("Representation")

    heights = [
        int(rep.attrs["height"])
        for rep in representations
        if rep.attrs.get("height")
    ]
    heights.sort(reverse=True)

    return heights, soup
def force_numbering(content: list) -> list:
    """Renumber episodes sequentially within each season, in place.

    Returns the same list for chaining.
    """
    counters = {}
    for episode in content:
        counters[episode.season] = counters.get(episode.season, 0) + 1
        episode.number = counters[episode.season]
    return content
def append_id(content: list) -> list:
    """Append each episode's ID in square brackets to its name, in place.

    Returns the same list for chaining.
    """
    for episode in content:
        # BUG FIX: the original interpolated the one-element list
        # ``[episode.id]``, so a string ID rendered as " ['abc']" with
        # stray quotes from the list repr. Format the ID itself inside
        # literal brackets instead.
        episode.name += f" [{episode.id}]"
    return content
def load_cookies(path: Path) -> http.cookiejar.MozillaCookieJar:
    """Load a Netscape-format cookies file from *path* into a jar."""
    jar = http.cookiejar.MozillaCookieJar(path)
    jar.load()
    return jar
def get_cookie(cookie_jar: http.cookiejar.MozillaCookieJar, name: str) -> dict:
    """Return {"value", "expires"} for the named cookie, or None if absent."""
    match = next((c for c in cookie_jar if c.name == name), None)
    if match is None:
        return None
    return {"value": match.value, "expires": match.expires}
def in_cache(cache: json, download: object) -> bool:
    """Return True (and warn) when *download* is already recorded in *cache*.

    A hit requires both the download's ID to be a key in the cache and
    the stored title to match.
    """
    video_id = str(download.id)
    title = str(download.title)

    hit = video_id in cache and cache[video_id].get("title", {}) == title
    if not hit:
        return False

    log.warning(
        f'"{str(download)}" was found in cache, skipping download. Use "--no-cache" to ignore.'
    )
    return True
def update_cache(cache: json, config: dict, download: object) -> None:
    """Record *download* in *cache* and write the cache file to disk."""
    cache[str(download.id)] = {"title": str(download.title)}

    with config["download_cache"].open("w") as handle:
        json.dump(cache, handle, indent=4)
def string_cleaning(filename: str) -> str:
    """Normalize a filename: ASCII-fold, drop unsafe characters, squeeze repeats."""
    cleaned = unidecode(filename)
    cleaned = cleaned.replace("&", "and")

    # Remove characters that are unsafe or unwanted in filenames.
    cleaned = re.sub(r"[:;/]", "", cleaned)
    cleaned = re.sub(r"[\\*!?¿,'\"<>|$#`]", "", cleaned)

    # Collapse runs of dots, underscores and spaces to a single character.
    cleaned = re.sub(r"\.{2,}", ".", cleaned)
    cleaned = re.sub(r"_{2,}", "_", cleaned)
    cleaned = re.sub(r" {2,}", " ", cleaned)
    return cleaned
def slugify(string: str) -> str:
    """Lowercase *string* and turn runs of non-word characters into hyphens."""
    slug = re.sub(r"\W+", "-", string.lower())
    return re.sub(r"^-|-$", "", slug)
def set_range(episode: str) -> list:
    """Expand an episode range like "S01E01-S01E03" into a list of labels.

    NOTE(review): the episode range is applied to every season in the
    span, so ranges that cross seasons only make sense when the episode
    numbers line up — confirm against callers before relying on that.
    """
    start, end = episode.split("-")
    first_season, first_episode = start.split("E")
    last_season, last_episode = end.split("E")

    seasons = range(int(first_season[1:]), int(last_season[1:]) + 1)
    episodes = range(int(first_episode), int(last_episode) + 1)

    return [f"S{s:02d}E{e:02d}" for s in seasons for e in episodes]
def set_filename(service: object, stream: object, res: str, audio: str):
    """Build the output filename for a movie or episode.

    Fills the service's configured filename template, strips season or
    episode markers that are unknown (0), cleans unsafe characters and
    switches to dot-separated style when the name already contains dots.
    """
    if service.movie:
        filename = service.config["filename"]["movies"].format(
            title=stream.title,
            year=stream.year or "",
            # FIX: the original wrote `f"{res}p" or ""` — the f-string is
            # never empty (it always ends in "p"), so the fallback was
            # dead code and has been removed.
            resolution=f"{res}p",
            service=stream.service,
            audio=audio,
        )
    else:
        filename = service.config["filename"]["series"].format(
            title=stream.title,
            year=stream.year or "",
            season=f"{stream.season:02}" if stream.season > 0 else "",
            episode=f"{stream.number:02}" if stream.number > 0 else "",
            name=stream.name or "",
            resolution=f"{res}p",
            service=stream.service,
            audio=audio,
        )

    # Strip leftover S/E markers when season and/or episode is unknown (0).
    no_ep = r"(S\d+)E"
    no_sea = r"S(E\d+)"
    no_num = r"SE"
    if stream.number == 0:
        filename = re.sub(no_ep, r"\1", filename)
    if stream.season == 0:
        filename = re.sub(no_sea, r"\1", filename)
    if stream.season == 0 and stream.number == 0:
        filename = re.sub(no_num, "", filename)

    filename = string_cleaning(filename)

    return (
        filename.replace(" ", ".").replace(".-.", ".")
        if filename.count(".") >= 2
        else filename
    )
def add_subtitles(soup: object, subtitle: str, language: str = None) -> object:
    """Add subtitle stream to manifest"""
    lang = language if language else "English"

    adaptation_set = soup.new_tag(
        "AdaptationSet",
        id="3",
        group="3",
        contentType="text",
        mimeType="text/vtt",
        startWithSAP="1",
    )
    representation = soup.new_tag("Representation", id=f"{lang}", bandwidth="0")
    base_url = soup.new_tag("BaseURL")
    base_url.string = f"{subtitle}"

    # Assemble AdaptationSet > Representation > BaseURL and attach it
    # to the manifest's Period.
    representation.append(base_url)
    adaptation_set.append(representation)
    soup.find("Period").append(adaptation_set)
    return soup
def convert_subtitles(tmp: Path, filename: str, sub_type: str) -> Path:
    """Convert the downloaded subtitle *filename*.*sub_type* in *tmp* to SRT.

    Returns the path to the converted ``.srt`` file.
    Raises KeyError if *sub_type* has no registered converter.
    """
    converters = {
        "vtt": WebVTTConverter(),
        "ttml": SMPTEConverter(),
        "mp4": WVTTConverter(),
        "dfxp": ISMTConverter(),
        "sami": SAMIConverter(),
    }
    converter = converters[sub_type]
    fixer = CommonIssuesFixer()

    # FIX: the *filename* parameter was unused and a literal placeholder
    # stood in its place, so the function could only ever find a file
    # with that literal name. Use the caller-supplied filename.
    source = tmp / f"{filename}.{sub_type}"
    srt, _ = fixer.from_srt(converter.from_file(source))

    output = tmp / f"{filename}.srt"
    srt.save(output)
    return output
def from_mpd(mpd_data: str, url: str = None):
    """Parse an MPD document into a list of representation dicts.

    Each dict may contain id/codecs/height (strings) and bandwidth (int)
    for every video/mp4 or audio/mp4 Representation. When *url* is given
    it is prepended as {"url": url}.
    """
    ns = "{urn:mpeg:dash:schema:mpd:2011}"
    root = ET.fromstring(mpd_data)

    items = []
    for adaptation in root.iter(f"{ns}AdaptationSet"):
        for rep in adaptation.iter(f"{ns}Representation"):
            if rep.get("mimeType") not in ("video/mp4", "audio/mp4"):
                continue
            entry = {}
            for key in ("id", "codecs", "height"):
                if rep.get(key):
                    entry[key] = rep.get(key)
            if rep.get("bandwidth"):
                entry["bandwidth"] = int(rep.get("bandwidth"))
            items.append(entry)

    if url is not None:
        items.insert(0, {"url": url})
    return items
def from_m3u8(m3u8_data: str):
    """Return (heights, codecs) advertised by an HLS master playlist."""
    master = m3u8.loads(m3u8_data)

    heights = [
        playlist.stream_info.resolution[1]
        for playlist in master.playlists
        if playlist.stream_info.resolution
    ]
    codecs = [
        playlist.stream_info.codecs
        for playlist in master.playlists
        if playlist.stream_info.codecs
    ]
    return heights, codecs
def load_xml(xml):
    """Safely parse XML data to an ElementTree, without namespaces in tags.

    Accepts str or bytes; returns the lxml root element with namespace
    prefixes stripped from every tag and attribute name.
    """
    if not isinstance(xml, bytes):
        xml = xml.encode("utf8")

    root = etree.fromstring(xml)
    # FIX: Element.getiterator() has long been deprecated in lxml (and is
    # removed in recent releases); iter() is the supported equivalent.
    for elem in root.iter():
        if not hasattr(elem.tag, "find"):
            # e.g. comment elements
            continue
        # Strip the namespace from the tag itself...
        elem.tag = etree.QName(elem).localname
        # ...and from every attribute name.
        for name, value in elem.attrib.items():
            local_name = etree.QName(name).localname
            if local_name == name:
                continue
            del elem.attrib[name]
            elem.attrib[local_name] = value

    etree.cleanup_namespaces(root)
    return root
def kid_to_pssh(soup: object) -> str:
    """Build a base64-encoded Widevine PSSH box from the manifest's default KID."""
    kid = (
        soup.select_one("ContentProtection")
        .attrs.get("cenc:default_KID")
        .replace("-", "")
    )

    box = bytearray()
    box += b"\x00\x00\x002pssh\x00\x00\x00\x00"
    box += bytes.fromhex("edef8ba979d64acea3c827dcd51d21ed")  # Widevine system ID
    box += b"\x00\x00\x00\x12\x12\x10"
    box += bytes.fromhex(kid.replace("-", ""))

    return base64.b64encode(bytes(box)).decode("utf-8")
def construct_pssh(soup: object) -> str:
    """Construct a base64-encoded Widevine PSSH string from the default KID."""
    kid = (
        soup.select_one("ContentProtection")
        .attrs.get("cenc:default_KID")
        .replace("-", "")
    )
    version = "3870737368"
    system_id = "EDEF8BA979D64ACEA3C827DCD51D21ED"
    data = "48E3DC959B06"

    hex_box = "".join(
        ["000000", version, "00000000", system_id, "000000181210", kid, data]
    )
    return base64.b64encode(bytes.fromhex(hex_box)).decode()
def pssh_from_init(path: Path) -> str:
    """Extract a base64-encoded Widevine PSSH box from an init segment.

    Returns None when no Widevine system ID is present in the file.
    """
    data = Path(path).read_bytes()
    system_id = bytes.fromhex("edef8ba979d64acea3c827dcd51d21ed")

    marker = data.rfind(system_id)
    if marker == -1:
        return None

    box_start = marker - 12  # the system ID sits 12 bytes into the pssh box
    box_size = data[marker - 9]  # low byte of the box's 32-bit size field
    return base64.b64encode(data[box_start : box_start + box_size]).decode("utf-8")
def set_save_path(stream: object, service: object, title: str) -> Path:
    """Decide and create the download directory for *stream*.

    Precedence: skip-download temp dir, then an explicit save dir, then
    the configured movies/series directory plus *title*. Episodes also
    get a "Season NN" subfolder when enabled in the config.
    """
    if service.skip_download:
        save_path = service.tmp / service.filename
        save_path.mkdir(parents=True, exist_ok=True)
    elif service.save_dir != "False":
        save_path = Path(service.save_dir)
        save_path.mkdir(parents=True, exist_ok=True)
    else:
        kind = "movies" if stream.__class__.__name__ == "Movie" else "series"
        save_path = Path(service.config["save_dir"][kind]) / title
        save_path.mkdir(parents=True, exist_ok=True)

    wants_season_dir = (
        stream.__class__.__name__ == "Episode"
        and service.config["seasons"] == "true"
        and stream.season > 0
    )
    if wants_season_dir:
        save_path = save_path / f"Season {stream.season:02d}"
        save_path.mkdir(parents=True, exist_ok=True)

    return save_path
def expiration(expiry: str = None, issued: str = None) -> datetime:
    """Return the expiry time: *issued* (epoch milliseconds) + *expiry* seconds.

    Simple timestamps only. Both arguments are required in practice; the
    None defaults are kept only for interface compatibility (passing None
    raises TypeError from int()).

    FIX: the original annotated the return type as str, but the function
    returns a datetime.
    """
    issued_at = datetime.fromtimestamp(int(issued) / 1000)
    return issued_at + timedelta(seconds=int(expiry))
def check_version(local_version: str):
    """Notify the user when a newer release is available on GitHub."""
    r = requests.get(
        "https://api.github.com/repos/stabbedbybrick/freevine/releases/latest"
    )
    if not r.ok:
        return

    tag = r.json().get("tag_name")
    if not tag:
        return

    # Compare versions numerically after stripping "v" and dots,
    # e.g. "v1.2.3" -> 123.
    local_num = int(re.sub(r"[v.]", "", local_version))
    latest_num = int(re.sub(r"[v.]", "", tag))
    if latest_num and local_num < latest_num:
        notification(
            f"{tag} available: https://github.com/stabbedbybrick/freevine/releases/latest\n"
        )