476 lines
14 KiB
Python
476 lines
14 KiB
Python
import base64
|
||
import datetime
|
||
import http.cookiejar
|
||
import json
|
||
import logging
|
||
import re
|
||
import shutil
|
||
import xml.etree.ElementTree as ET
|
||
from datetime import datetime, timedelta # noqa: F811
|
||
from pathlib import Path
|
||
|
||
import click
|
||
import m3u8
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from lxml import etree
|
||
from pywidevine.device import Device, DeviceTypes
|
||
from rich.console import Console
|
||
from subby import (
|
||
CommonIssuesFixer,
|
||
ISMTConverter,
|
||
SAMIConverter,
|
||
SMPTEConverter,
|
||
WebVTTConverter,
|
||
WVTTConverter,
|
||
)
|
||
from unidecode import unidecode
|
||
|
||
console = Console()
|
||
log = logging.getLogger()
|
||
|
||
|
||
def create_wvd(dir: Path) -> Path:
|
||
"""
|
||
Check for both untouched and renamed RSA keys and identification blobs
|
||
Create a new WVD from key pair if available
|
||
"""
|
||
private_key = None
|
||
client_id = None
|
||
|
||
files = dir.glob("*")
|
||
for file in files:
|
||
if file.suffix == ".pem" or file.stem == "device_private_key":
|
||
private_key = file
|
||
if file.suffix == ".bin" or file.stem == "device_client_id_blob":
|
||
client_id = file
|
||
|
||
if not private_key and not client_id:
|
||
log.error("Required key and client ID not found")
|
||
exit(1)
|
||
|
||
device = Device(
|
||
type_=DeviceTypes["ANDROID"],
|
||
security_level=3,
|
||
flags=None,
|
||
private_key=private_key.read_bytes(),
|
||
client_id=client_id.read_bytes(),
|
||
)
|
||
|
||
out_path = (
|
||
dir / f"{device.type.name}_{device.system_id}_l{device.security_level}.wvd"
|
||
)
|
||
device.dump(out_path)
|
||
log.info("New WVD file successfully created")
|
||
|
||
return next(dir.glob("*.wvd"), None)
|
||
|
||
|
||
def get_wvd(cwd: Path) -> Path:
|
||
"""Get path to WVD file"""
|
||
|
||
dir = cwd / "utils" / "wvd"
|
||
wvd = next(dir.glob("*.wvd"), None)
|
||
|
||
if not wvd:
|
||
log.info("WVD file is missing. Attempting to create a new one...")
|
||
wvd = create_wvd(dir)
|
||
|
||
return wvd
|
||
|
||
|
||
def info(text: str) -> str:
|
||
"""Custom info 'logger' designed to match N_m3u8DL-RE output"""
|
||
|
||
time = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
||
stamp = click.style(f"{time}")
|
||
info = click.style("INFO", fg="green", underline=True)
|
||
message = click.style(f" : {text}")
|
||
return click.echo(f"{stamp} {info}{message}")
|
||
|
||
|
||
def error(text: str) -> str:
|
||
"""Custom error 'logger' designed to match N_m3u8DL-RE output"""
|
||
|
||
time = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
||
stamp = click.style(f"{time}")
|
||
info = click.style("ERROR", fg="red", underline=True)
|
||
message = click.style(f" : {text}")
|
||
return click.echo(f"{stamp} {info}{message}")
|
||
|
||
|
||
def notification(text: str) -> str:
|
||
"""Custom error 'logger' designed to match N_m3u8DL-RE output"""
|
||
|
||
time = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
||
stamp = click.style(f"{time}")
|
||
info = click.style("[!!]", fg="bright_magenta")
|
||
message = click.style(f" : {text}")
|
||
return click.echo(f"{stamp} {info}{message}")
|
||
|
||
|
||
def is_url(value):
|
||
if value is not None:
|
||
return True if re.match("^https?://", value, re.IGNORECASE) else False
|
||
else:
|
||
return False
|
||
|
||
|
||
def is_title_match(string: str, title: re):
|
||
return True if re.match(title, string, re.IGNORECASE) else False
|
||
|
||
|
||
def get_binary(*names: str) -> Path:
|
||
for name in names:
|
||
path = shutil.which(name)
|
||
if path:
|
||
return Path(path)
|
||
return None
|
||
|
||
|
||
def is_path(s):
|
||
p = Path(s)
|
||
return p.exists() and p.is_file()
|
||
|
||
|
||
def contains_ip_address(input_string):
|
||
pattern = r"\b((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
|
||
return bool(re.search(pattern, input_string))
|
||
|
||
|
||
def get_heights(session: requests.Session, manifest: str) -> tuple:
|
||
r = session.get(manifest)
|
||
r.raise_for_status()
|
||
|
||
soup = BeautifulSoup(r.text, "xml")
|
||
elements = soup.find_all("Representation")
|
||
heights = sorted(
|
||
[int(x.attrs["height"]) for x in elements if x.attrs.get("height")],
|
||
reverse=True,
|
||
)
|
||
return heights, soup
|
||
|
||
|
||
def force_numbering(content: list) -> list:
|
||
season_episode_counter = {}
|
||
for episode in content:
|
||
if episode.season not in season_episode_counter:
|
||
season_episode_counter[episode.season] = 1
|
||
else:
|
||
season_episode_counter[episode.season] += 1
|
||
episode.number = season_episode_counter[episode.season]
|
||
|
||
return content
|
||
|
||
|
||
def append_id(content: list) -> list:
|
||
for episode in content:
|
||
episode.name += f" {[episode.id]}"
|
||
|
||
return content
|
||
|
||
|
||
def load_cookies(path: Path) -> http.cookiejar.MozillaCookieJar:
|
||
cookie_jar = http.cookiejar.MozillaCookieJar(path)
|
||
cookie_jar.load()
|
||
return cookie_jar
|
||
|
||
|
||
def get_cookie(cookie_jar: http.cookiejar.MozillaCookieJar, name: str) -> dict:
|
||
for cookie in cookie_jar:
|
||
if cookie.name == name:
|
||
return {"value": cookie.value, "expires": cookie.expires}
|
||
return None
|
||
|
||
|
||
def in_cache(cache: json, download: object) -> bool:
|
||
video = str(download.id)
|
||
title = str(download.title)
|
||
if video in cache and cache[video].get("title", {}) == title:
|
||
log.warning(
|
||
f'"{str(download)}" was found in cache, skipping download. Use "--no-cache" to ignore.'
|
||
)
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
|
||
def update_cache(cache: json, config: dict, download: object) -> None:
|
||
video = str(download.id)
|
||
title = str(download.title)
|
||
cache[video] = {"title": title}
|
||
|
||
with config["download_cache"].open("w") as f:
|
||
json.dump(cache, f, indent=4)
|
||
|
||
|
||
def string_cleaning(filename: str) -> str:
|
||
filename = unidecode(filename)
|
||
filename = filename.replace("&", "and")
|
||
filename = re.sub(r"[:;/]", "", filename)
|
||
filename = re.sub(r"[\\*!?¿,'\"<>|$#`’]", "", filename)
|
||
filename = re.sub(rf"[{'.'}]{{2,}}", ".", filename)
|
||
filename = re.sub(rf"[{'_'}]{{2,}}", "_", filename)
|
||
filename = re.sub(rf"[{' '}]{{2,}}", " ", filename)
|
||
return filename
|
||
|
||
|
||
def slugify(string: str) -> str:
|
||
string = string.lower()
|
||
string = re.sub(r"\W+", "-", string)
|
||
string = re.sub(r"^-|-$", "", string)
|
||
return string
|
||
|
||
|
||
def set_range(episode: str) -> list:
|
||
start, end = episode.split("-")
|
||
start_season, start_episode = start.split("E")
|
||
end_season, end_episode = end.split("E")
|
||
|
||
start_season = int(start_season[1:])
|
||
start_episode = int(start_episode)
|
||
end_season = int(end_season[1:])
|
||
end_episode = int(end_episode)
|
||
|
||
return [
|
||
f"S{season:02d}E{episode:02d}"
|
||
for season in range(start_season, end_season + 1)
|
||
for episode in range(start_episode, end_episode + 1)
|
||
]
|
||
|
||
|
||
def set_filename(service: object, stream: object, res: str, audio: str):
|
||
if service.movie:
|
||
filename = service.config["filename"]["movies"].format(
|
||
title=stream.title,
|
||
year=stream.year or "",
|
||
resolution=f"{res}p" or "",
|
||
service=stream.service,
|
||
audio=audio,
|
||
)
|
||
else:
|
||
filename = service.config["filename"]["series"].format(
|
||
title=stream.title,
|
||
year=stream.year or "",
|
||
season=f"{stream.season:02}" if stream.season > 0 else "",
|
||
episode=f"{stream.number:02}" if stream.number > 0 else "",
|
||
name=stream.name or "",
|
||
resolution=f"{res}p" or "",
|
||
service=stream.service,
|
||
audio=audio,
|
||
)
|
||
|
||
no_ep = r"(S\d+)E"
|
||
no_sea = r"S(E\d+)"
|
||
no_num = r"SE"
|
||
if stream.number == 0:
|
||
filename = re.sub(no_ep, r"\1", filename)
|
||
if stream.season == 0:
|
||
filename = re.sub(no_sea, r"\1", filename)
|
||
if stream.season == 0 and stream.number == 0:
|
||
filename = re.sub(no_num, "", filename)
|
||
|
||
filename = string_cleaning(filename)
|
||
return (
|
||
filename.replace(" ", ".").replace(".-.", ".")
|
||
if filename.count(".") >= 2
|
||
else filename
|
||
)
|
||
|
||
|
||
def add_subtitles(soup: object, subtitle: str, language: str = None) -> object:
|
||
"""Add subtitle stream to manifest"""
|
||
|
||
lang = language if language else "English"
|
||
|
||
adaptation_set = soup.new_tag(
|
||
"AdaptationSet",
|
||
id="3",
|
||
group="3",
|
||
contentType="text",
|
||
mimeType="text/vtt",
|
||
startWithSAP="1",
|
||
)
|
||
representation = soup.new_tag("Representation", id=f"{lang}", bandwidth="0")
|
||
base_url = soup.new_tag("BaseURL")
|
||
base_url.string = f"{subtitle}"
|
||
|
||
adaptation_set.append(representation)
|
||
representation.append(base_url)
|
||
|
||
period = soup.find("Period")
|
||
period.append(adaptation_set)
|
||
|
||
return soup
|
||
|
||
|
||
def convert_subtitles(tmp: Path, filename: str, sub_type: str) -> Path:
|
||
converters = {
|
||
"vtt": WebVTTConverter(),
|
||
"ttml": SMPTEConverter(),
|
||
"mp4": WVTTConverter(),
|
||
"dfxp": ISMTConverter(),
|
||
"sami": SAMIConverter(),
|
||
}
|
||
|
||
converter = converters[sub_type]
|
||
fixer = CommonIssuesFixer()
|
||
|
||
file = Path(tmp / f"{filename}.{sub_type}")
|
||
srt, _ = fixer.from_srt(converter.from_file(file))
|
||
output = Path(tmp / f"{filename}.srt")
|
||
srt.save(output)
|
||
return output
|
||
|
||
|
||
def from_mpd(mpd_data: str, url: str = None):
|
||
root = ET.fromstring(mpd_data)
|
||
items = []
|
||
|
||
for adaptationSet in root.iter("{urn:mpeg:dash:schema:mpd:2011}AdaptationSet"):
|
||
for representation in adaptationSet.iter(
|
||
"{urn:mpeg:dash:schema:mpd:2011}Representation"
|
||
):
|
||
if representation.get("mimeType") in ["video/mp4", "audio/mp4"]:
|
||
item = {}
|
||
if representation.get("id"):
|
||
item["id"] = representation.get("id")
|
||
if representation.get("codecs"):
|
||
item["codecs"] = representation.get("codecs")
|
||
if representation.get("height"):
|
||
item["height"] = representation.get("height")
|
||
if representation.get("bandwidth"):
|
||
item["bandwidth"] = int(representation.get("bandwidth"))
|
||
items.append(item)
|
||
|
||
if url is not None:
|
||
items.insert(0, {"url": url})
|
||
|
||
return items
|
||
|
||
|
||
def from_m3u8(m3u8_data: str):
|
||
heights = []
|
||
codecs = []
|
||
|
||
m3u8_obj = m3u8.loads(m3u8_data)
|
||
|
||
for playlist in m3u8_obj.playlists:
|
||
if playlist.stream_info.resolution:
|
||
heights.append(playlist.stream_info.resolution[1])
|
||
if playlist.stream_info.codecs:
|
||
codecs.append(playlist.stream_info.codecs)
|
||
|
||
return heights, codecs
|
||
|
||
|
||
def load_xml(xml):
|
||
"""Safely parse XML data to an ElementTree, without namespaces in tags."""
|
||
if not isinstance(xml, bytes):
|
||
xml = xml.encode("utf8")
|
||
root = etree.fromstring(xml)
|
||
for elem in root.getiterator():
|
||
if not hasattr(elem.tag, "find"):
|
||
# e.g. comment elements
|
||
continue
|
||
elem.tag = etree.QName(elem).localname
|
||
for name, value in elem.attrib.items():
|
||
local_name = etree.QName(name).localname
|
||
if local_name == name:
|
||
continue
|
||
del elem.attrib[name]
|
||
elem.attrib[local_name] = value
|
||
etree.cleanup_namespaces(root)
|
||
return root
|
||
|
||
|
||
def kid_to_pssh(soup: object) -> str:
|
||
kid = (
|
||
soup.select_one("ContentProtection")
|
||
.attrs.get("cenc:default_KID")
|
||
.replace("-", "")
|
||
)
|
||
|
||
array_of_bytes = bytearray(b"\x00\x00\x002pssh\x00\x00\x00\x00")
|
||
array_of_bytes.extend(bytes.fromhex("edef8ba979d64acea3c827dcd51d21ed"))
|
||
array_of_bytes.extend(b"\x00\x00\x00\x12\x12\x10")
|
||
array_of_bytes.extend(bytes.fromhex(kid.replace("-", "")))
|
||
return base64.b64encode(bytes.fromhex(array_of_bytes.hex())).decode("utf-8")
|
||
|
||
|
||
def construct_pssh(soup: object) -> str:
|
||
kid = (
|
||
soup.select_one("ContentProtection")
|
||
.attrs.get("cenc:default_KID")
|
||
.replace("-", "")
|
||
)
|
||
version = "3870737368"
|
||
system_id = "EDEF8BA979D64ACEA3C827DCD51D21ED"
|
||
data = "48E3DC959B06"
|
||
s = f"000000{version}00000000{system_id}000000181210{kid}{data}"
|
||
return base64.b64encode(bytes.fromhex(s)).decode()
|
||
|
||
|
||
def pssh_from_init(path: Path) -> str:
|
||
raw = Path(path).read_bytes()
|
||
wv = raw.rfind(bytes.fromhex("edef8ba979d64acea3c827dcd51d21ed"))
|
||
if wv == -1:
|
||
return None
|
||
return base64.b64encode(raw[wv - 12 : wv - 12 + raw[wv - 9]]).decode("utf-8")
|
||
|
||
|
||
def set_save_path(stream: object, service: object, title: str) -> Path:
|
||
if service.skip_download:
|
||
save_path = service.tmp / service.filename
|
||
save_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
elif service.save_dir != "False":
|
||
save_path = Path(service.save_dir)
|
||
save_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
else:
|
||
downloads = (
|
||
Path(service.config["save_dir"]["movies"])
|
||
if stream.__class__.__name__ == "Movie"
|
||
else Path(service.config["save_dir"]["series"])
|
||
)
|
||
|
||
save_path = downloads.joinpath(title)
|
||
save_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
if (
|
||
stream.__class__.__name__ == "Episode"
|
||
and service.config["seasons"] == "true"
|
||
and stream.season > 0
|
||
):
|
||
_season = f"Season {stream.season:02d}"
|
||
save_path = save_path.joinpath(_season)
|
||
save_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
return save_path
|
||
|
||
|
||
def expiration(expiry: str = None, issued: str = None) -> str:
|
||
"""Simple timestamps only"""
|
||
issued_at = datetime.fromtimestamp(int(issued) / 1000)
|
||
return issued_at + timedelta(seconds=int(expiry))
|
||
|
||
|
||
def check_version(local_version: str):
|
||
r = requests.get(
|
||
"https://api.github.com/repos/stabbedbybrick/freevine/releases/latest"
|
||
)
|
||
if not r.ok:
|
||
return
|
||
|
||
version = r.json().get("tag_name")
|
||
|
||
if version:
|
||
local_version = int(re.sub(r"[v.]", "", local_version))
|
||
latest_version = int(re.sub(r"[v.]", "", version))
|
||
|
||
if latest_version and local_version < latest_version:
|
||
notification(
|
||
f"{version} available: https://github.com/stabbedbybrick/freevine/releases/latest\n"
|
||
)
|