Files
freevine/utils/utilities.py
stabbedbybrick eba008c104 [core] Cleanup
2023-12-01 09:53:31 +01:00

299 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import re
import datetime
import shutil
import base64
from pathlib import Path
import click
import requests
from unidecode import unidecode
from pywidevine.device import Device, DeviceTypes
def create_wvd(dir: Path) -> Path:
    """
    Build a new WVD file from an RSA private key and a client ID blob.

    Every entry in ``dir`` is inspected: a file with a ``.pem`` suffix or the
    stem ``device_private_key`` is used as the private key, and a file with a
    ``.bin`` suffix or the stem ``device_client_id_blob`` as the client ID, so
    both untouched and renamed files are picked up. When several entries
    qualify, the one listed last wins.

    The device is created as ANDROID with security level 3 and dumped to
    ``<type>_<system_id>_l<security_level>.wvd`` inside ``dir``.

    Args:
        dir: Directory holding the key pair; the WVD file is written here too.

    Returns:
        Path of the WVD file that was just written.

    Raises:
        SystemExit: With exit code 1 when the private key, the client ID, or
            both are missing.
    """
    private_key = None
    client_id = None

    for file in dir.glob("*"):
        if file.suffix == ".pem" or file.stem == "device_private_key":
            private_key = file
        if file.suffix == ".bin" or file.stem == "device_client_id_blob":
            client_id = file

    # Both files are mandatory. Stop as soon as either one is missing,
    # otherwise read_bytes() below would be called on None.
    if private_key is None or client_id is None:
        error("Required key and client ID not found")
        raise SystemExit(1)

    device = Device(
        type_=DeviceTypes["ANDROID"],
        security_level=3,
        flags=None,
        private_key=private_key.read_bytes(),
        client_id=client_id.read_bytes(),
    )

    out_path = (
        dir / f"{device.type.name}_{device.system_id}_l{device.security_level}.wvd"
    )
    device.dump(out_path)
    info("New WVD file successfully created")

    # Return the file we wrote rather than whatever a fresh glob finds first
    return out_path
def get_wvd(cwd: Path) -> Path:
    """
    Locate a WVD file under ``<cwd>/utils/wvd``.

    The first ``*.wvd`` entry found is returned. When there is none, an info
    message is printed and the result of ``create_wvd`` for that directory is
    returned instead.
    """
    wvd_dir = cwd / "utils" / "wvd"

    for candidate in wvd_dir.glob("*.wvd"):
        # Only the first match is of interest
        return candidate

    info("WVD file is missing. Attempting to create a new one...")
    return create_wvd(wvd_dir)
def info(text: str) -> str:
    """
    Print ``text`` as a timestamped INFO line, laid out to match
    N_m3u8DL-RE output.

    The line is ``HH:MM:SS.mmm INFO : <text>`` with the level tag in green and
    underlined. The return value of ``click.echo`` is passed through.
    """
    # %f yields microseconds; dropping the last three digits leaves milliseconds
    clock = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]

    pieces = (
        click.style(clock),
        " ",
        click.style("INFO", fg="green", underline=True),
        click.style(f" : {text}"),
    )
    return click.echo("".join(pieces))
def error(text: str) -> str:
    """
    Print ``text`` as a timestamped ERROR line, laid out to match
    N_m3u8DL-RE output.

    The line is ``HH:MM:SS.mmm ERROR : <text>`` with the level tag in red and
    underlined. The return value of ``click.echo`` is passed through.
    """
    moment = datetime.datetime.now()
    # Cut the microsecond field down to milliseconds
    timestamp = click.style(moment.strftime("%H:%M:%S.%f")[:-3])
    level = click.style("ERROR", fg="red", underline=True)
    body = click.style(f" : {text}")

    line = f"{timestamp} {level}{body}"
    return click.echo(line)
def notification(text: str) -> str:
    """
    Print ``text`` as a timestamped notification line, laid out to match
    N_m3u8DL-RE output.

    The line is ``HH:MM:SS.mmm [!!] : <text>`` with the ``[!!]`` marker in
    bright magenta. The return value of ``click.echo`` is passed through.
    """
    # Millisecond precision: strip the last three microsecond digits
    clock = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]

    prefix = click.style(clock)
    marker = click.style("[!!]", fg="bright_magenta")
    suffix = click.style(f" : {text}")

    return click.echo(prefix + " " + marker + suffix)
def is_url(value):
    """
    Tell whether ``value`` starts with ``http://`` or ``https://``.

    The scheme check is case-insensitive. ``None`` is never a URL.
    """
    if value is None:
        return False
    return re.match("^https?://", value, re.IGNORECASE) is not None
def is_title_match(string: str, title: re):
    """
    Check that ``string`` matches the ``title`` pattern from its start.

    Matching is case-insensitive. Returns True on a match; otherwise
    ``general_error`` is called with a "Title URL is incorrect" message.
    """
    matched = re.match(title, string, re.IGNORECASE)
    if not matched:
        general_error("Title URL is incorrect. See --help for more information")
        return None
    return True
def string_cleaning(filename: str) -> str:
    """
    Normalise ``filename`` for use as a file name.

    Steps, in order: pass the text through ``unidecode``, spell ``&`` out as
    ``and``, delete a fixed set of punctuation characters, then squeeze runs
    of two or more dots, underscores or spaces down to a single one.
    """
    cleaned = unidecode(filename).replace("&", "and")

    # Characters that are dropped outright
    for unwanted in (r"[:;/]", r"[\\*!?¿,'\"<>|$#`]"):
        cleaned = re.sub(unwanted, "", cleaned)

    # Collapse repeated separators, one separator type at a time
    for separator in (".", "_", " "):
        cleaned = re.sub(rf"[{separator}]{{2,}}", separator, cleaned)

    return cleaned
def set_range(episode: str) -> list:
    """
    Expand a range such as ``S01E01-S01E03`` into a list of episode tags.

    Both ends are split into season and episode numbers. The result is every
    ``SxxEyy`` combination of the season span and the episode span, seasons
    outermost. For a range crossing seasons this means the same episode span
    is applied to each season, and it is empty if the end episode number is
    lower than the start episode number.
    """

    def _numbers(tag: str) -> tuple:
        # "S01E05" -> (1, 5): drop the leading "S", split on the "E"
        season_part, episode_part = tag.split("E")
        return int(season_part[1:]), int(episode_part)

    first, last = episode.split("-")
    first_season, first_episode = _numbers(first)
    last_season, last_episode = _numbers(last)

    tags = []
    for season_no in range(first_season, last_season + 1):
        for episode_no in range(first_episode, last_episode + 1):
            tags.append(f"S{season_no:02d}E{episode_no:02d}")
    return tags
def set_filename(service: object, stream: object, res: str, audio: str):
    """
    Build the output file name for a title from the configured template.

    ``service.config["filename"]["movies"]`` is used when ``service.movie`` is
    truthy, ``service.config["filename"]["series"]`` otherwise. For series, a
    season or episode number of 0 is rendered as an empty string and the
    left-over ``S``/``E`` marker is removed from the result.

    The formatted name is passed through ``string_cleaning``. If the cleaned
    name contains at least two dots, spaces are replaced with dots and
    ``.-.`` is collapsed to a single dot; otherwise it is returned as is.

    Args:
        service: Object exposing ``movie`` and ``config``.
        stream: Title object exposing ``title``, ``year`` and ``service``,
            plus ``season``, ``number`` and ``name`` for series.
        res: Resolution without the trailing ``p``. An empty or ``None``
            value produces an empty resolution tag.
        audio: Audio tag inserted verbatim.

    Returns:
        The cleaned file name.
    """
    # f"{res}p" is never empty, so the fallback has to test `res` itself
    resolution = f"{res}p" if res else ""

    if service.movie:
        filename = service.config["filename"]["movies"].format(
            title=stream.title,
            year=stream.year or "",
            resolution=resolution,
            service=stream.service,
            audio=audio,
        )
    else:
        filename = service.config["filename"]["series"].format(
            title=stream.title,
            year=stream.year or "",
            season=f"{stream.season:02}" if stream.season > 0 else "",
            episode=f"{stream.number:02}" if stream.number > 0 else "",
            name=stream.name or "",
            resolution=resolution,
            service=stream.service,
            audio=audio,
        )

        # A zero season/episode leaves a dangling marker in the name.
        # Strip it: "S01E" -> "S01", "SE05" -> "E05", "SE" -> "".
        no_ep = r"(S\d+)E"
        no_sea = r"S(E\d+)"
        no_num = r"SE"
        if stream.number == 0:
            filename = re.sub(no_ep, r"\1", filename)
        if stream.season == 0:
            filename = re.sub(no_sea, r"\1", filename)
        if stream.season == 0 and stream.number == 0:
            filename = re.sub(no_num, "", filename)

    filename = string_cleaning(filename)

    if filename.count(".") >= 2:
        return filename.replace(" ", ".").replace(".-.", ".")
    return filename
def add_subtitles(soup: object, subtitle: str) -> object:
    """
    Add a subtitle stream to the manifest.

    Creates a ``text/vtt`` AdaptationSet (id and group "3") holding one
    Representation with id "English", whose BaseURL is ``subtitle``, and
    appends it to the element returned by ``soup.find("Period")``. The same
    ``soup`` object is returned.
    """
    url_tag = soup.new_tag("BaseURL")
    url_tag.string = f"{subtitle}"

    track = soup.new_tag("Representation", id="English", bandwidth="0")
    track.append(url_tag)

    subtitle_set = soup.new_tag(
        "AdaptationSet",
        id="3",
        group="3",
        contentType="text",
        mimeType="text/vtt",
        startWithSAP="1",
    )
    subtitle_set.append(track)

    soup.find("Period").append(subtitle_set)
    return soup
def kid_to_pssh(soup: object) -> str:
    """
    Build a base64 PSSH box from the manifest's default key ID.

    The key ID is read from the ``cenc:default_KID`` attribute of the element
    returned by ``soup.select_one("ContentProtection")``, with dashes removed.
    The box is 50 bytes: a 12-byte header, the 16-byte system ID
    ``edef8ba9...21ed``, then an 18-byte payload made of the bytes ``12 10``
    followed by the key ID.
    """
    protection = soup.select_one("ContentProtection")
    key_id_hex = protection.attrs.get("cenc:default_KID").replace("-", "")

    box = b"".join(
        (
            # Box length (0x32 == "2"), the "pssh" type, version and flags
            b"\x00\x00\x002pssh\x00\x00\x00\x00",
            bytes.fromhex("edef8ba979d64acea3c827dcd51d21ed"),
            # Payload length (0x12) followed by the 2-byte key ID prefix
            b"\x00\x00\x00\x12\x12\x10",
            bytes.fromhex(key_id_hex),
        )
    )
    return base64.b64encode(box).decode("utf-8")
def construct_pssh(soup: object) -> str:
    """
    Build a base64 PSSH box from the manifest's default key ID, with six
    extra trailing payload bytes.

    The key ID is read from the ``cenc:default_KID`` attribute of the element
    returned by ``soup.select_one("ContentProtection")``, with dashes removed.
    The box is 56 bytes: a 12-byte header, the 16-byte system ID, then a
    24-byte payload made of ``12 10``, the key ID and ``48 E3 DC 95 9B 06``.
    """
    element = soup.select_one("ContentProtection")
    kid = element.attrs.get("cenc:default_KID").replace("-", "")

    hex_fields = (
        "000000",
        "3870737368",  # final length byte (0x38) followed by ASCII "pssh"
        "00000000",  # version and flags
        "EDEF8BA979D64ACEA3C827DCD51D21ED",  # system ID
        "000000181210",  # payload length (0x18) and key ID prefix
        kid,
        # NOTE(review): presumably a protection-scheme field - confirm
        "48E3DC959B06",
    )
    return base64.b64encode(bytes.fromhex("".join(hex_fields))).decode()
def pssh_from_init(path: Path) -> str:
    """
    Extract the PSSH box for system ID ``edef8ba9...21ed`` from a file.

    The last occurrence of the 16-byte system ID is located. The box is taken
    to start 12 bytes before it (4-byte length, 4-byte type, 4-byte version
    and flags), and its length is read from that 4-byte big-endian field.

    Args:
        path: File to scan, e.g. an init segment.

    Returns:
        The whole box as a base64 string, or ``None`` when the system ID is
        absent or sits too close to the start of the file to be preceded by a
        complete box header.
    """
    raw = Path(path).read_bytes()

    wv = raw.rfind(bytes.fromhex("edef8ba979d64acea3c827dcd51d21ed"))
    # Covers "not found" (-1) as well as a match with no room for a header,
    # which would otherwise turn into negative slice indices.
    if wv < 12:
        return None

    box_start = wv - 12
    # Read the whole 32-bit length, not only its low byte, so boxes of 256
    # bytes or more are not truncated.
    box_size = int.from_bytes(raw[box_start : box_start + 4], "big")

    return base64.b64encode(raw[box_start : box_start + box_size]).decode("utf-8")
def set_save_path(stream: object, service: object, title: str) -> Path:
# Work out the directory a download is saved to, create it (including
# parents) if needed, and return it.
#
# NOTE(review): indentation was lost in this copy of the file. Confirm
# whether the "Season NN" step below belongs inside the else branch or runs
# after the if/else, i.e. whether it also applies to a custom save_dir.
#
# A save_dir other than the string "False" is used as given.
if service.save_dir != "False":
save_path = Path(service.save_dir)
save_path.mkdir(parents=True, exist_ok=True)
else:
# Otherwise start from the configured movies or series directory, chosen by
# the stream's class name, and add a folder named after the title.
downloads = (
Path(service.config["save_dir"]["movies"])
if stream.__class__.__name__ == "Movie"
else Path(service.config["save_dir"]["series"])
)
save_path = downloads.joinpath(title)
save_path.mkdir(parents=True, exist_ok=True)
# Episodes get a "Season NN" sub-folder when the "seasons" config value is
# the string "true" and the season number is above zero.
if (
stream.__class__.__name__ == "Episode"
and service.config["seasons"] == "true"
and stream.season > 0
):
_season = f"Season {stream.season:02d}"
save_path = save_path.joinpath(_season)
save_path.mkdir(parents=True, exist_ok=True)
return save_path
def check_version(local_version: str):
    """
    Print a notification when a newer release is available on GitHub.

    The latest release tag is fetched from the GitHub API and compared with
    ``local_version`` component by component, so ``1.10.0`` is correctly
    treated as older than ``2.0.0``. The check is best effort: network
    errors, non-OK responses, an unparsable body or a missing ``tag_name``
    all end the check silently.

    Args:
        local_version: Version of the running program, e.g. ``"v1.2.3"``.
    """

    def _components(tag: str) -> tuple:
        # "v1.10.0" -> (1, 10). Trailing zeros are dropped so that "1.2"
        # and "1.2.0" compare equal.
        numbers = [int(part) for part in re.findall(r"\d+", tag)]
        while numbers and numbers[-1] == 0:
            numbers.pop()
        return tuple(numbers)

    try:
        r = requests.get(
            "https://api.github.com/repos/stabbedbybrick/freevine/releases/latest",
            # Without a timeout an unresponsive server would hang startup
            timeout=10,
        )
        if not r.ok:
            return
        version = r.json().get("tag_name")
    except (requests.RequestException, ValueError):
        # An optional update check must never take the program down
        return

    if not version:
        return

    if _components(local_version) < _components(version):
        notification(f"New version available! {version}\n")
def general_error(message: str) -> str:
    """
    Report ``message`` as an error and terminate with exit code 1.

    A newline is echoed first, then the message goes through ``error``. A
    ``tmp`` entry in the current working directory is removed, if present,
    before exiting.
    """
    click.echo("\n")
    error(f"{message}")

    tmp_dir = Path("tmp")
    if tmp_dir.exists():
        shutil.rmtree("tmp")

    exit(1)
def geo_error(status: int, message: str = None, location: str = None) -> str:
    """
    Report a geo-restriction error and terminate with exit code 1.

    The line printed through ``error`` is ``<Response [status]>`` followed by
    ``message`` or, when no message is given, by
    ``Content unavailable outside <location>``. A ``tmp`` entry in the current
    working directory is removed, if present, before exiting.
    """
    if message is not None:
        msg = message
    else:
        msg = f"Content unavailable outside {location}"

    click.echo("\n")
    error(f"<Response [{status}]> {msg}")

    tmp_dir = Path("tmp")
    if tmp_dir.exists():
        shutil.rmtree("tmp")

    exit(1)
def premium_error(status: int) -> str:
    """
    Report that the content needs a subscription and terminate with exit
    code 1.

    A newline is echoed first, then the response status and a fixed message
    go through ``error``. A ``tmp`` entry in the current working directory is
    removed, if present, before exiting.
    """
    click.echo("\n")
    error(f"<Response [{status}]> Content requires subscription and is not supported")

    tmp_dir = Path("tmp")
    if tmp_dir.exists():
        shutil.rmtree("tmp")

    exit(1)