diff --git a/.env.example b/.env.example index 5c6d927..7c416dc 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,6 @@ [DEFAULT] -FLASK_DEBUG = # True or False -FLASK_ENV = # Development or Production -FLASK_APP = # app.py -FLASK_RUN_HOST = # LOCAL IP -FLASK_RUN_PORT = # LOCAL PORT \ No newline at end of file +FLASK_DEBUG = True +FLASK_ENV = Development +FLASK_APP = app.py +FLASK_RUN_HOST = 0.0.0.0 +FLASK_RUN_PORT = 1337 \ No newline at end of file diff --git a/config.ini.example b/config.ini.example index 9983c41..eda22f8 100644 --- a/config.ini.example +++ b/config.ini.example @@ -1,2 +1,3 @@ -[PLAYREADY] -DEVICE = device/example.prd # your device filename \ No newline at end of file +[CDM] +DEVICE_FILE = device/ # DEVICE FILENAME +DEVICE_NAME = # DEVICE NAME \ No newline at end of file diff --git a/modules/banners.py b/modules/banners.py index 68abcba..3802a32 100644 --- a/modules/banners.py +++ b/modules/banners.py @@ -10,12 +10,13 @@ def clear_terminal(): def banners(): clear_terminal() banners = """ - ██████╗ ██████╗ ██████╗ ███████╗ ██████╗██████╗ ██╗ ██╗██████╗ ████████╗ ██████╗ ██████╗ - ██╔══██╗██╔══██╗ ██╔══██╗██╔════╝██╔════╝██╔══██╗╚██╗ ██╔╝██╔══██╗╚══██╔══╝██╔═══██╗██╔══██╗ - ██████╔╝██████╔╝█████╗██║ ██║█████╗ ██║ ██████╔╝ ╚████╔╝ ██████╔╝ ██║ ██║ ██║██████╔╝ - ██╔═══╝ ██╔══██╗╚════╝██║ ██║██╔══╝ ██║ ██╔══██╗ ╚██╔╝ ██╔═══╝ ██║ ██║ ██║██╔══██╗ - ██║ ██║ ██║ ██████╔╝███████╗╚██████╗██║ ██║ ██║ ██║ ██║ ╚██████╔╝██║ ██║ - ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝ ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ + ██████╗ ██╗ █████╗ ██╗ ██╗██████╗ ███████╗ █████╗ ██████╗ ██╗ ██╗ █████╗ ██████╗ ██╗ + ██╔══██╗██║ ██╔══██╗╚██╗ ██╔╝██╔══██╗██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝ ██╔══██╗██╔══██╗██║ + ██████╔╝██║ ███████║ ╚████╔╝ ██████╔╝█████╗ ███████║██║ ██║ ╚████╔╝█████╗███████║██████╔╝██║ + ██╔═══╝ ██║ ██╔══██║ ╚██╔╝ ██╔══██╗██╔══╝ ██╔══██║██║ ██║ ╚██╔╝ ╚════╝██╔══██║██╔═══╝ ██║ + ██║ ███████╗██║ ██║ ██║ ██║ ██║███████╗██║ ██║██████╔╝ ██║ ██║ ██║██║ ██║ + ╚═╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝ + Author: GITHUB.COM/THATNOTEASY """ # Split the banner into lines diff --git a/modules/config.py b/modules/config.py index 7908dfa..0767e9a 100644 --- a/modules/config.py +++ b/modules/config.py @@ -4,6 +4,7 @@ from modules.logging import setup_logging from flask import request, jsonify API_KEY_FILE = 'APIKEY.json' +logging = setup_logging() def setup_config(): config = configparser.ConfigParser() @@ -15,10 +16,10 @@ def load_api_keys(): with open(API_KEY_FILE, 'r') as file: return json.load(file) except FileNotFoundError: - logger.error("APIKEY.json file not found.") + logging.error("APIKEY.json file not found.") return [] except json.JSONDecodeError: - logger.error("Error decoding APIKEY.json.") + logging.error("Error decoding APIKEY.json.") return [] def save_api_keys(api_keys): @@ -57,9 +58,11 @@ def apikey_required(func): provided_key = request.headers.get('X-API-KEY') if not provided_key: logging.error("X-API-KEY header is missing.") - return jsonify({"responseData": "Opss! API key is missing"}), 403 + response_data = {"message": "Opss! X-API-KEY is missing."} + return jsonify({"responseData": response_data}), 403 if not is_valid_api_key(provided_key): logging.error("Invalid X-API-KEY.") - return jsonify({"responseData": "Opss! Invalid API key"}), 403 + response_data = {"message": "Opss! Invalid APIKEY :P"} + return jsonify({"responseData": response_data}), 403 return func(*args, **kwargs) return decorated_function \ No newline at end of file diff --git a/modules/playready.py b/modules/playready.py index b415a6a..7a8c2b3 100644 --- a/modules/playready.py +++ b/modules/playready.py @@ -4,67 +4,167 @@ import xml.etree.ElementTree as ET import json from flask import jsonify +from pathlib import Path from modules.logging import setup_logging from modules.config import setup_config -from modules.pyplayready.pssh import PSSH -from modules.pyplayready.device import Device -from modules.pyplayready.cdm import Cdm +from pyplayready.system.pssh import PSSH +from pyplayready.device import Device +from pyplayready.cdm import Cdm +from pyplayready.exceptions import InvalidSession, InvalidLicense class PLAYREADY: - def __init__(self): + _instance = None # Singleton + + def __new__(cls): + if cls._instance is None: + cls._instance = super(PLAYREADY, cls).__new__(cls) + cls._instance._initialize() + return cls._instance + + def _initialize(self): self.pssh = None - self.session_id = None self.challenge = None self.license = None + self.session_id = None self.config = setup_config() - self.device = self.config["PLAYREADY"]["DEVICE"] self.logging = setup_logging() + self.device_path = self.config["CDM"]["DEVICE_FILE"] + self.device_name = self.config["CDM"]["DEVICE_NAME"] + self.device = Device.load(Path(self.device_path)) + self.store_session = {} - def get_license_challenge(self): - device = Device.load(self.device) - cdm = Cdm.from_device(device) - pssh = PSSH(self.pssh) - wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False) - raw_challenge = cdm.get_license_challenge(wrm_headers[0]) - if isinstance(raw_challenge, str): - raw_challenge = raw_challenge.encode('utf-8') +# ============================================================================================================================== # - challenge_b64 = base64.b64encode(raw_challenge).decode('utf-8') - return challenge_b64 + def open_devices(self): + if self.device_name in self.store_session: + existing_session_id = self.store_session[self.device_name]["session_id"] + self.logging.info(f"Existing session for {self.device_name}: {existing_session_id}") + return jsonify({ + "responseData": { + "session_id": existing_session_id, + "device_name": self.device_name, + "security_level": self.store_session[self.device_name]["cdm"].security_level + } + }), 200 + + cdm = Cdm.from_device(self.device) + session_id = cdm.open().hex() + self.store_session[self.device_name] = {"cdm": cdm, "session_id": session_id} + self.logging.info(f"CDM Session Opened: {session_id}") + return jsonify({ + "responseData": { + "session_id": session_id, + "device_name": self.device_name, + "security_level": cdm.security_level + } + }), 200 + +# ============================================================================================================================== # + + def close_devices(self, session_id): + session_id_str = session_id.decode() if isinstance(session_id, bytes) else session_id + if self.device_name not in self.store_session: + self.logging.error(f"No active session for device {self.device_name}.") + return jsonify({"responseData": {"message": "No active session for this device."}}), 400 + + stored_session = self.store_session[self.device_name] + if session_id_str != stored_session["session_id"]: + self.logging.error(f"Session mismatch: Expected {stored_session['session_id']}, got {session_id_str}") + return jsonify({"responseData": {"message": "Invalid Session ID :P"}}), 404 + + try: + stored_session["cdm"].close(bytes.fromhex(session_id_str)) + self.logging.info(f"CDM Session Closed: {session_id_str}") + + del self.store_session[self.device_name] + return jsonify({"responseData": {"message": f"Session {session_id_str} closed successfully."}}), 200 + + except InvalidSession: + self.logging.error(f"Invalid session ID: {session_id_str}") + return jsonify({"responseData": {"message": "Invalid Session ID, it may have expired."}}), 400 + + except Exception as e: + self.logging.error(f"Error closing session: {str(e)}") + return jsonify({"responseData": {"message": "Unexpected error while closing session."}}), 500 + +# ============================================================================================================================== # + + def get_challenges(self, device): + session_entry = self.store_session.get(device) + if not session_entry or "cdm" not in session_entry: + return jsonify({"responseData": {"message": f"No Cdm session for {device} has been opened yet. No session to use."}}), 400 + + cdm = session_entry["cdm"] + + try: + session_id = bytes.fromhex(self.session_id) + except ValueError: + return jsonify({"responseData": {"message": "Invalid session_id format."}}), 400 + + if not self.pssh.startswith(" Certificate: - basic_info = Container( - cert_id=cert_id, - security_level=security_level, - flags=0, - cert_type=2, - public_key_digest=signing_key.public_sha256_digest(), - expiration_date=expiry, - client_id=client_id - ) - basic_info_attribute = Container( - flags=1, - tag=1, - length=len(_BCertStructs.DrmBCertBasicInfo.build(basic_info)) + 8, - attribute=basic_info - ) - - device_info = Container( - max_license=max_license, - max_header=max_header, - max_chain_depth=max_chain_depth - ) - device_info_attribute = Container( - flags=1, - tag=4, - length=len(_BCertStructs.DrmBCertDeviceInfo.build(device_info)) + 8, - attribute=device_info - ) - - feature = Container( - feature_count=3, - features=ListContainer([ - # 1, # Transmitter - # 2, # Receiver - # 3, # SharedCertificate - 4, # SecureClock - # 5, # AntiRollBackClock - # 6, # ReservedMetering - # 7, # ReservedLicSync - # 8, # ReservedSymOpt - 9, # CRLS (Revocation Lists) - # 10, # ServerBasicEdition - # 11, # ServerStandardEdition - # 12, # ServerPremiumEdition - 13, # PlayReady3Features - # 14, # DeprecatedSecureStop - ]) - ) - feature_attribute = Container( - flags=1, - tag=5, - length=len(_BCertStructs.DrmBCertFeatureInfo.build(feature)) + 8, - attribute=feature - ) - - cert_key_sign = Container( - type=1, - length=512, # bits - flags=0, - key=signing_key.public_bytes(), - usages_count=1, - usages=ListContainer([ - 1 # KEYUSAGE_SIGN - ]) - ) - cert_key_encrypt = Container( - type=1, - length=512, # bits - flags=0, - key=encryption_key.public_bytes(), - usages_count=1, - usages=ListContainer([ - 2 # KEYUSAGE_ENCRYPT_KEY - ]) - ) - key_info = Container( - key_count=2, - cert_keys=ListContainer([ - cert_key_sign, - cert_key_encrypt - ]) - ) - key_info_attribute = Container( - flags=1, - tag=6, - length=len(_BCertStructs.DrmBCertKeyInfo.build(key_info)) + 8, - attribute=key_info - ) - - manufacturer_info = parent.get_certificate(0).get_attribute(7) - - new_bcert_container = Container( - signature=b"CERT", - version=1, - total_length=0, # filled at a later time - certificate_length=0, # filled at a later time - attributes=ListContainer([ - basic_info_attribute, - device_info_attribute, - feature_attribute, - key_info_attribute, - manufacturer_info, - ]) - ) - - payload = _BCertStructs.BCert.build(new_bcert_container) - new_bcert_container.certificate_length = len(payload) - new_bcert_container.total_length = len(payload) + 144 # signature length - - sign_payload = _BCertStructs.BCert.build(new_bcert_container) - signature = Crypto.ecc256_sign(group_key, sign_payload) - - signature_info = Container( - signature_type=1, - signature_size=64, - signature=signature, - signature_key_size=512, # bits - signature_key=group_key.public_bytes() - ) - signature_info_attribute = Container( - flags=1, - tag=8, - length=len(_BCertStructs.DrmBCertSignatureInfo.build(signature_info)) + 8, - attribute=signature_info - ) - new_bcert_container.attributes.append(signature_info_attribute) - - return cls(new_bcert_container) - - @classmethod - def loads(cls, data: Union[str, bytes]) -> Certificate: - if isinstance(data, str): - data = base64.b64decode(data) - if not isinstance(data, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") - - cert = _BCertStructs.BCert - return cls( - parsed_bcert=cert.parse(data), - bcert_obj=cert - ) - - @classmethod - def load(cls, path: Union[Path, str]) -> Certificate: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - with Path(path).open(mode="rb") as f: - return cls.loads(f.read()) - - def get_attribute(self, type_: int): - for attribute in self.parsed.attributes: - if attribute.tag == type_: - return attribute - - def get_security_level(self) -> int: - basic_info_attribute = self.get_attribute(1).attribute - if basic_info_attribute: - return basic_info_attribute.security_level - - @staticmethod - def _unpad(name: bytes): - return name.rstrip(b'\x00').decode("utf-8", errors="ignore") - - def get_name(self): - manufacturer_info = self.get_attribute(7).attribute - if manufacturer_info: - return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}" - - def dumps(self) -> bytes: - return self._BCERT.build(self.parsed) - - def struct(self) -> _BCertStructs.BCert: - return self._BCERT - - def verify_signature(self): - signature_object = self.get_attribute(8) - signature_attribute = signature_object.attribute - - sign_payload = self.dumps()[:-signature_object.length] - - raw_signature_key = signature_attribute.signature_key - signature_key = ECC.construct( - curve='P-256', - point_x=int.from_bytes(raw_signature_key[:32], 'big'), - point_y=int.from_bytes(raw_signature_key[32:], 'big') - ) - - return Crypto.ecc256_verify( - public_key=signature_key, - data=sign_payload, - signature=signature_attribute.signature - ) - - -class CertificateChain(_BCertStructs): - """Represents a BCertChain""" - - def __init__( - self, - parsed_bcert_chain: Container, - bcert_chain_obj: _BCertStructs.BCertChain = _BCertStructs.BCertChain - ): - self.parsed = parsed_bcert_chain - self._BCERT_CHAIN = bcert_chain_obj - - @classmethod - def loads(cls, data: Union[str, bytes]) -> CertificateChain: - if isinstance(data, str): - data = base64.b64decode(data) - if not isinstance(data, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") - - cert_chain = _BCertStructs.BCertChain - return cls( - parsed_bcert_chain=cert_chain.parse(data), - bcert_chain_obj=cert_chain - ) - - @classmethod - def load(cls, path: Union[Path, str]) -> CertificateChain: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - with Path(path).open(mode="rb") as f: - return cls.loads(f.read()) - - def dumps(self) -> bytes: - return self._BCERT_CHAIN.build(self.parsed) - - def struct(self) -> _BCertStructs.BCertChain: - return self._BCERT_CHAIN - - def get_certificate(self, index: int) -> Certificate: - return Certificate(self.parsed.certificates[index]) - - def get_security_level(self) -> int: - # not sure if there's a better way than this - return self.get_certificate(0).get_security_level() - - def get_name(self) -> str: - return self.get_certificate(0).get_name() - - def append(self, bcert: Certificate) -> None: - self.parsed.certificate_count += 1 - self.parsed.certificates.append(bcert.parsed) - self.parsed.total_length += len(bcert.dumps()) - - def prepend(self, bcert: Certificate) -> None: - self.parsed.certificate_count += 1 - self.parsed.certificates.insert(0, bcert.parsed) - self.parsed.total_length += len(bcert.dumps()) - - def remove(self, index: int) -> None: - if self.parsed.certificate_count <= 0: - raise InvalidCertificateChain("CertificateChain does not contain any Certificates") - if index >= self.parsed.certificate_count: - raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") - - self.parsed.certificate_count -= 1 - bcert = Certificate(self.parsed.certificates[index]) - self.parsed.total_length -= len(bcert.dumps()) - self.parsed.certificates.pop(index) - - def get(self, index: int) -> Certificate: - if self.parsed.certificate_count <= 0: - raise InvalidCertificateChain("CertificateChain does not contain any Certificates") - if index >= self.parsed.certificate_count: - raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total") - - return Certificate(self.parsed.certificates[index]) - - def count(self) -> int: - return self.parsed.certificate_count \ No newline at end of file diff --git a/modules/pyplayready/cdm.py b/modules/pyplayready/cdm.py deleted file mode 100644 index 6bfea68..0000000 --- a/modules/pyplayready/cdm.py +++ /dev/null @@ -1,218 +0,0 @@ -from __future__ import annotations - -import base64 -import math -import time -from typing import List -from uuid import UUID -import xml.etree.ElementTree as ET - -from Crypto.Cipher import AES -from Crypto.Hash import SHA256 -from Crypto.Random import get_random_bytes -from Crypto.Signature import DSS -from Crypto.Util.Padding import pad -from ecpy.curves import Point, Curve - -from modules.pyplayready.bcert import CertificateChain -from modules.pyplayready.crypto.ecc_key import ECCKey -from modules.pyplayready.key import Key -from modules.pyplayready.xml_key import XmlKey -from modules.pyplayready.crypto.elgamal import ElGamal -from modules.pyplayready.xmrlicense import XMRLicense - - -class Cdm: - def __init__( - self, - security_level: int, - certificate_chain: CertificateChain, - encryption_key: ECCKey, - signing_key: ECCKey, - client_version: str = "10.0.16384.10011", - protocol_version: int = 1 - ): - self.security_level = security_level - self.certificate_chain = certificate_chain - self.encryption_key = encryption_key - self.signing_key = signing_key - self.client_version = client_version - self.protocol_version = protocol_version - - self.curve = Curve.get_curve("secp256r1") - self.elgamal = ElGamal(self.curve) - - self._wmrm_key = Point( - x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b, - y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562, - curve=self.curve - ) - self._xml_key = XmlKey() - - self._keys: List[Key] = [] - - @classmethod - def from_device(cls, device) -> Cdm: - """Initialize a Playready CDM from a Playready Device (.prd) file""" - return cls( - security_level=device.security_level, - certificate_chain=device.group_certificate, - encryption_key=device.encryption_key, - signing_key=device.signing_key - ) - - def get_key_data(self) -> bytes: - point1, point2 = self.elgamal.encrypt( - message_point=self._xml_key.get_point(self.elgamal.curve), - public_key=self._wmrm_key - ) - return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y) - - def get_cipher_data(self) -> bytes: - b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode() - body = f"{b64_chain}" - - cipher = AES.new( - key=self._xml_key.aes_key, - mode=AES.MODE_CBC, - iv=self._xml_key.aes_iv - ) - - ciphertext = cipher.encrypt(pad( - body.encode(), - AES.block_size - )) - - return self._xml_key.aes_iv + ciphertext - - def _build_digest_content( - self, - content_header: str, - nonce: str, - wmrm_cipher: str, - cert_cipher: str - ) -> str: - return ( - '' - f'{self.protocol_version}' - f'{content_header}' - '' - f'{self.client_version}' - '' - f'{nonce}' - f'{math.floor(time.time())}' - '' - '' - '' - '' - '' - '' - 'WMRMServer' - '' - '' - f'{wmrm_cipher}' - '' - '' - '' - '' - f'{cert_cipher}' - '' - '' - '' - ) - - @staticmethod - def _build_signed_info(digest_value: str) -> str: - return ( - '' - '' - '' - '' - '' - f'{digest_value}' - '' - '' - ) - - def get_license_challenge(self, content_header: str) -> str: - la_content = self._build_digest_content( - content_header=content_header, - nonce=base64.b64encode(get_random_bytes(16)).decode(), - wmrm_cipher=base64.b64encode(self.get_key_data()).decode(), - cert_cipher=base64.b64encode(self.get_cipher_data()).decode() - ) - - la_hash_obj = SHA256.new() - la_hash_obj.update(la_content.encode()) - la_hash = la_hash_obj.digest() - - signed_info = self._build_signed_info(base64.b64encode(la_hash).decode()) - signed_info_digest = SHA256.new(signed_info.encode()) - - signer = DSS.new(self.signing_key.key, 'fips-186-3') - signature = signer.sign(signed_info_digest) - - # haven't found a better way to do this. xmltodict.unparse doesn't work - main_body = ( - '' - '' - '' - '' - '' - '' - + la_content + - '' - + signed_info + - f'{base64.b64encode(signature).decode()}' - '' - '' - '' - f'{base64.b64encode(self.signing_key.public_bytes()).decode()}' - '' - '' - '' - '' - '' - '' - '' - '' - '' - ) - - return main_body - - def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes: - point1 = Point( - x=int.from_bytes(encrypted_key[:32], 'big'), - y=int.from_bytes(encrypted_key[32:64], 'big'), - curve=self.curve - ) - point2 = Point( - x=int.from_bytes(encrypted_key[64:96], 'big'), - y=int.from_bytes(encrypted_key[96:128], 'big'), - curve=self.curve - ) - - decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d)) - return self.elgamal.to_bytes(decrypted.x)[16:32] - - def parse_license(self, licence: str) -> None: - try: - root = ET.fromstring(licence) - license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License") - for license_element in license_elements: - parsed_licence = XMRLicense.loads(license_element.text) - for key in parsed_licence.get_content_keys(): - if Key.CipherType(key.cipher_type) == Key.CipherType.ECC_256: - self._keys.append(Key( - key_id=UUID(bytes_le=key.key_id), - key_type=key.key_type, - cipher_type=key.cipher_type, - key_length=key.key_length, - key=self._decrypt_ecc256_key(key.encrypted_key) - )) - except Exception as e: - raise Exception(f"Unable to parse license, {e}") - - def get_keys(self) -> List[Key]: - return self._keys \ No newline at end of file diff --git a/modules/pyplayready/crypto/ecc_key.py b/modules/pyplayready/crypto/ecc_key.py deleted file mode 100644 index ac366b9..0000000 --- a/modules/pyplayready/crypto/ecc_key.py +++ /dev/null @@ -1,95 +0,0 @@ -from __future__ import annotations - -import base64 -from pathlib import Path -from typing import Union - -from Crypto.Hash import SHA256 -from Crypto.PublicKey import ECC -from Crypto.PublicKey.ECC import EccKey -from ecpy.curves import Curve, Point - - -class ECCKey: - """Represents a PlayReady ECC key pair""" - - def __init__(self, key: EccKey): - self.key = key - - @classmethod - def generate(cls): - """Generate a new ECC key pair""" - return cls(key=ECC.generate(curve='P-256')) - - @classmethod - def construct(cls, private_key: Union[bytes, int]): - """Construct an ECC key pair from private/public bytes/ints""" - if isinstance(private_key, bytes): - private_key = int.from_bytes(private_key, 'big') - if not isinstance(private_key, int): - raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}") - - # The public is always derived from the private key; loading the other stuff won't work - key = ECC.construct( - curve='P-256', - d=private_key, - ) - - return cls(key=key) - - @classmethod - def loads(cls, data: Union[str, bytes]) -> ECCKey: - if isinstance(data, str): - data = base64.b64decode(data) - if not isinstance(data, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") - - if len(data) not in [96, 32]: - raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}") - - return cls.construct(private_key=data[:32]) - - @classmethod - def load(cls, path: Union[Path, str]) -> ECCKey: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - with Path(path).open(mode="rb") as f: - return cls.loads(f.read()) - - def dumps(self, private_only=False): - if private_only: - return self.private_bytes() - return self.private_bytes() + self.public_bytes() - - def dump(self, path: Union[Path, str], private_only=False) -> None: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - path = Path(path) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_bytes(self.dumps(private_only)) - - @staticmethod - def _to_bytes(n: int) -> bytes: - byte_len = (n.bit_length() + 7) // 8 - if byte_len % 2 != 0: - byte_len += 1 - return n.to_bytes(byte_len, 'big') - - def get_point(self, curve: Curve) -> Point: - return Point(self.key.pointQ.x, self.key.pointQ.y, curve) - - def private_bytes(self) -> bytes: - return self._to_bytes(int(self.key.d)) - - def private_sha256_digest(self) -> bytes: - hash_object = SHA256.new() - hash_object.update(self.private_bytes()) - return hash_object.digest() - - def public_bytes(self) -> bytes: - return self._to_bytes(int(self.key.pointQ.x)) + self._to_bytes(int(self.key.pointQ.y)) - - def public_sha256_digest(self) -> bytes: - hash_object = SHA256.new() - hash_object.update(self.public_bytes()) - return hash_object.digest() \ No newline at end of file diff --git a/modules/pyplayready/crypto/elgamal.py b/modules/pyplayready/crypto/elgamal.py deleted file mode 100644 index 9409561..0000000 --- a/modules/pyplayready/crypto/elgamal.py +++ /dev/null @@ -1,36 +0,0 @@ -from typing import Tuple - -from ecpy.curves import Curve, Point -import secrets - - -class ElGamal: - def __init__(self, curve: Curve): - self.curve = curve - - @staticmethod - def to_bytes(n: int) -> bytes: - byte_len = (n.bit_length() + 7) // 8 - if byte_len % 2 != 0: - byte_len += 1 - return n.to_bytes(byte_len, 'big') - - def encrypt( - self, - message_point: Point, - public_key: Point - ) -> Tuple[Point, Point]: - ephemeral_key = secrets.randbelow(self.curve.order) - point1 = ephemeral_key * self.curve.generator - point2 = message_point + (ephemeral_key * public_key) - return point1, point2 - - @staticmethod - def decrypt( - encrypted: Tuple[Point, Point], - private_key: int - ) -> Point: - point1, point2 = encrypted - shared_secret = private_key * point1 - decrypted_message = point2 - shared_secret - return decrypted_message \ No newline at end of file diff --git a/modules/pyplayready/device.py b/modules/pyplayready/device.py deleted file mode 100644 index e0c1a0d..0000000 --- a/modules/pyplayready/device.py +++ /dev/null @@ -1,136 +0,0 @@ -from __future__ import annotations - -import base64 -from enum import IntEnum -from pathlib import Path -from typing import Union, Any - -from modules.pyplayready.bcert import CertificateChain -from modules.pyplayready.crypto.ecc_key import ECCKey - -from construct import Struct, Const, Int8ub, Bytes, this, Int32ub - - -class DeviceStructs: - magic = Const(b"PRD") - - header = Struct( - "signature" / magic, - "version" / Int8ub, - ) - - # was never in production - v1 = Struct( - "signature" / magic, - "version" / Int8ub, - "group_key_length" / Int32ub, - "group_key" / Bytes(this.group_key_length), - "group_certificate_length" / Int32ub, - "group_certificate" / Bytes(this.group_certificate_length) - ) - - v2 = Struct( - "signature" / magic, - "version" / Int8ub, - "group_certificate_length" / Int32ub, - "group_certificate" / Bytes(this.group_certificate_length), - "encryption_key" / Bytes(96), - "signing_key" / Bytes(96), - ) - - v3 = Struct( - "signature" / magic, - "version" / Int8ub, - "group_key" / Bytes(96), - "encryption_key" / Bytes(96), - "signing_key" / Bytes(96), - "group_certificate_length" / Int32ub, - "group_certificate" / Bytes(this.group_certificate_length), - ) - -class Device: - """Represents a PlayReady Device (.prd)""" - CURRENT_STRUCT = DeviceStructs.v3 - CURRENT_VERSION = 3 - - class SecurityLevel(IntEnum): - SL150 = 150 - SL2000 = 2000 - SL3000 = 3000 - - def __init__( - self, - *_: Any, - group_key: Union[str, bytes, None], - encryption_key: Union[str, bytes], - signing_key: Union[str, bytes], - group_certificate: Union[str, bytes], - **__: Any - ): - if isinstance(group_key, str): - group_key = base64.b64decode(group_key) - - if isinstance(encryption_key, str): - encryption_key = base64.b64decode(encryption_key) - if not isinstance(encryption_key, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}") - - if isinstance(signing_key, str): - signing_key = base64.b64decode(signing_key) - if not isinstance(signing_key, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}") - - if isinstance(group_certificate, str): - group_certificate = base64.b64decode(group_certificate) - if not isinstance(group_certificate, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}") - - self.group_key = None if group_key is None else ECCKey.loads(group_key) - self.encryption_key = ECCKey.loads(encryption_key) - self.signing_key = ECCKey.loads(signing_key) - self.group_certificate = CertificateChain.loads(group_certificate) - self.security_level = self.group_certificate.get_security_level() - - @classmethod - def loads(cls, data: Union[str, bytes]) -> Device: - if isinstance(data, str): - data = base64.b64decode(data) - if not isinstance(data, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") - - prd_header = DeviceStructs.header.parse(data) - if prd_header.version == 2: - return cls( - group_key=None, - **DeviceStructs.v2.parse(data) - ) - - return cls(**cls.CURRENT_STRUCT.parse(data)) - - @classmethod - def load(cls, path: Union[Path, str]) -> Device: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - with Path(path).open(mode="rb") as f: - return cls.loads(f.read()) - - def dumps(self) -> bytes: - return self.CURRENT_STRUCT.build(dict( - version=self.CURRENT_VERSION, - group_key=self.group_key.dumps(), - encryption_key=self.encryption_key.dumps(), - signing_key=self.signing_key.dumps(), - group_certificate_length=len(self.group_certificate.dumps()), - group_certificate=self.group_certificate.dumps(), - )) - - def dump(self, path: Union[Path, str]) -> None: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - path = Path(path) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_bytes(self.dumps()) - - def get_name(self) -> str: - name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}" - return ''.join(char for char in name if (char.isalnum() or char in '_- ')).strip().lower().replace(" ", "_") \ No newline at end of file diff --git a/modules/pyplayready/exceptions.py b/modules/pyplayready/exceptions.py deleted file mode 100644 index 68dc942..0000000 --- a/modules/pyplayready/exceptions.py +++ /dev/null @@ -1,26 +0,0 @@ -class PyPlayreadyException(Exception): - """Exceptions used by pyplayready.""" - - -class InvalidPssh(PyPlayreadyException): - """The Playready PSSH is invalid or empty.""" - - -class InvalidInitData(PyPlayreadyException): - """The Playready Cenc Header Data is invalid or empty.""" - - -class DeviceMismatch(PyPlayreadyException): - """The Remote CDMs Device information and the APIs Device information did not match.""" - - -class InvalidLicense(PyPlayreadyException): - """Unable to parse XMR License.""" - - -class InvalidCertificateChain(PyPlayreadyException): - """The BCert is not correctly formatted.""" - - -class OutdatedDevice(PyPlayreadyException): - """The PlayReady Device is outdated and does not support a specific operation.""" \ No newline at end of file diff --git a/modules/pyplayready/key.py b/modules/pyplayready/key.py deleted file mode 100644 index b9e34af..0000000 --- a/modules/pyplayready/key.py +++ /dev/null @@ -1,68 +0,0 @@ -import base64 -from enum import Enum -from uuid import UUID -from typing import Union - - -class Key: - class KeyType(Enum): - INVALID = 0x0000 - AES_128_CTR = 0x0001 - RC4_CIPHER = 0x0002 - AES_128_ECB = 0x0003 - COCKTAIL = 0x0004 - AES_128_CBC = 0x0005 - KEYEXCHANGE = 0x0006 - UNKNOWN = 0xffff - - @classmethod - def _missing_(cls, value): - return cls.UNKNOWN - - class CipherType(Enum): - INVALID = 0x0000 - RSA_1024 = 0x0001 - CHAINED_LICENSE = 0x0002 - ECC_256 = 0x0003 - ECC_256_WITH_KZ = 0x0004 - TEE_TRANSIENT = 0x0005 - ECC_256_VIA_SYMMETRIC = 0x0006 - UNKNOWN = 0xffff - - @classmethod - def _missing_(cls, value): - return cls.UNKNOWN - - def __init__( - self, - key_id: UUID, - key_type: int, - cipher_type: int, - key_length: int, - key: bytes - ): - self.key_id = key_id - self.key_type = self.KeyType(key_type) - self.cipher_type = self.CipherType(cipher_type) - self.key_length = key_length - self.key = key - - @staticmethod - def kid_to_uuid(kid: Union[str, bytes]) -> UUID: - """ - Convert a Key ID from a string or bytes to a UUID object. - At first, this may seem very simple, but some types of Key IDs - may not be 16 bytes and some may be decimal vs. hex. - """ - if isinstance(kid, str): - kid = base64.b64decode(kid) - if not kid: - kid = b"\x00" * 16 - - if kid.decode(errors="replace").isdigit(): - return UUID(int=int(kid.decode())) - - if len(kid) < 16: - kid += b"\x00" * (16 - len(kid)) - - return UUID(bytes=kid) \ No newline at end of file diff --git a/modules/pyplayready/main.py b/modules/pyplayready/main.py deleted file mode 100644 index f0cbab0..0000000 --- a/modules/pyplayready/main.py +++ /dev/null @@ -1,311 +0,0 @@ -import logging -from datetime import datetime -from pathlib import Path -from typing import Optional - -import click -import requests -from Crypto.Random import get_random_bytes - -from modules.pyplayready.system.bcert import CertificateChain, Certificate -from modules.pyplayready.cdm import Cdm -from modules.pyplayready.device import Device -from modules.pyplayready.crypto.ecc_key import ECCKey -from modules.pyplayready.exceptions import OutdatedDevice -from modules.pyplayready.system.pssh import PSSH - -@click.group(invoke_without_command=True) -@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.") -@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.") -def main(version: bool, debug: bool) -> None: - """Python PlayReady CDM implementation""" - logging.basicConfig(level=logging.DEBUG if debug else logging.INFO) - log = logging.getLogger() - - current_year = datetime.now().year - copyright_years = f"2024-{current_year}" - - log.info("pyplayready version %s Copyright (c) %s DevLARLEY, Erevoc, DevataDev", __version__, copyright_years) - log.info("https://github.com/ready-dl/pyplayready") - log.info("Run 'pyplayready --help' for help") - if version: - return - - -@main.command(name="license") -@click.argument("device_path", type=Path) -@click.argument("pssh", type=PSSH) -@click.argument("server", type=str) -def license_(device_path: Path, pssh: PSSH, server: str) -> None: - """ - Make a License Request to a server using a given PSSH - Will return a list of all keys within the returned license - - Only works for standard license servers that don't use any license wrapping - """ - log = logging.getLogger("license") - - device = Device.load(device_path) - log.info(f"Loaded Device: {device.get_name()}") - - cdm = Cdm.from_device(device) - log.info("Loaded CDM") - - session_id = cdm.open() - log.info("Opened Session") - - challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers(downgrade_to_v4=True)[0]) - log.info("Created License Request (Challenge)") - log.debug(challenge) - - license_res = requests.post( - url=server, - headers={ - 'Content-Type': 'text/xml; charset=UTF-8', - }, - data=challenge - ) - - if license_res.status_code != 200: - log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}") - return - - licence = license_res.text - log.info("Got License Message") - log.debug(licence) - - cdm.parse_license(session_id, licence) - log.info("License Parsed Successfully") - - for key in cdm.get_keys(session_id): - log.info(f"{key.key_id.hex}:{key.key.hex()}") - - cdm.close(session_id) - log.info("Clossed Session") - - -@main.command() -@click.argument("device", type=Path) -@click.pass_context -def test(ctx: click.Context, device: Path) -> None: - """ - Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server. - https://testweb.playready.microsoft.com/Content/Content2X - + DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd - + MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest - - The device argument is a Path to a Playready Device (.prd) file which contains the device's group key and - group certificate. - """ - pssh = PSSH( - "AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH" - "QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh" - "AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg" - "BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA" - "UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE" - "cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD" - "AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ" - "B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA" - "ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF" - "YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT" - "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA==" - ) - - license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)" - - ctx.invoke( - license_, - device_path=device, - pssh=pssh, - server=license_server - ) - - -@main.command() -@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key") -@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain") -@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") -@click.pass_context -def create_device( - ctx: click.Context, - group_key: Path, - group_certificate: Path, - output: Optional[Path] = None -) -> None: - """Create a Playready Device (.prd) file from an ECC private group key and group certificate chain""" - if not group_key.is_file(): - raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx) - if not group_certificate.is_file(): - raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx) - - log = logging.getLogger("create-device") - - encryption_key = ECCKey.generate() - signing_key = ECCKey.generate() - - group_key = ECCKey.load(group_key) - certificate_chain = CertificateChain.load(group_certificate) - - new_certificate = Certificate.new_leaf_cert( - cert_id=get_random_bytes(16), - security_level=certificate_chain.get_security_level(), - client_id=get_random_bytes(16), - signing_key=signing_key, - encryption_key=encryption_key, - group_key=group_key, - parent=certificate_chain - ) - certificate_chain.prepend(new_certificate) - - device = Device( - group_key=group_key.dumps(), - encryption_key=encryption_key.dumps(), - signing_key=signing_key.dumps(), - group_certificate=certificate_chain.dumps(), - ) - - if output and output.suffix: - if output.suffix.lower() != ".prd": - log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") - out_path = output - else: - out_dir = output or Path.cwd() - out_path = out_dir / f"{device.get_name()}.prd" - - if out_path.exists(): - log.error(f"A file already exists at the path '{out_path}', cannot overwrite.") - return - - out_path.parent.mkdir(parents=True, exist_ok=True) - out_path.write_bytes(device.dumps()) - - log.info("Created Playready Device (.prd) file, %s", out_path.name) - log.info(" + Security Level: %s", device.security_level) - log.info(" + Group Key: %s bytes", len(device.group_key.dumps())) - log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps())) - log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps())) - log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps())) - log.info(" + Saved to: %s", out_path.absolute()) - - -@main.command() -@click.argument("prd_path", type=Path) -@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory") -@click.pass_context -def reprovision_device(ctx: click.Context, prd_path: Path, output: Optional[Path] = None) -> None: - """ - Reprovision a Playready Device (.prd) by creating a new leaf certificate and new encryption/signing keys. - Will override the device if an output path or directory is not specified - - Only works on PRD Devices of v3 or higher - """ - if not prd_path.is_file(): - raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx) - - log = logging.getLogger("reprovision-device") - log.info("Reprovisioning Playready Device (.prd) file, %s", prd_path.name) - - device = Device.load(prd_path) - - if device.group_key is None: - raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher") - - device.group_certificate.remove(0) - - encryption_key = ECCKey.generate() - signing_key = ECCKey.generate() - - device.encryption_key = encryption_key - device.signing_key = signing_key - - new_certificate = Certificate.new_leaf_cert( - cert_id=get_random_bytes(16), - security_level=device.group_certificate.get_security_level(), - client_id=get_random_bytes(16), - signing_key=signing_key, - encryption_key=encryption_key, - group_key=device.group_key, - parent=device.group_certificate - ) - device.group_certificate.prepend(new_certificate) - - if output and output.suffix: - if output.suffix.lower() != ".prd": - log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.") - out_path = output - else: - out_path = prd_path - - out_path.parent.mkdir(parents=True, exist_ok=True) - out_path.write_bytes(device.dumps()) - - log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name) - - -@main.command() -@click.argument("prd_path", type=Path) -@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory") -@click.pass_context -def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None: - """ - Export a Playready Device (.prd) file to a Group Key and Group Certificate - If an output directory is not specified, it will be stored in the current working directory - """ - if not prd_path.is_file(): - raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx) - - log = logging.getLogger("export-device") - log.info("Exporting Playready Device (.prd) file, %s", prd_path.stem) - - if not out_dir: - out_dir = Path.cwd() - - out_path = out_dir / prd_path.stem - if out_path.exists(): - if any(out_path.iterdir()): - log.error("Output directory is not empty, cannot overwrite.") - return - else: - log.warning("Output directory already exists, but is empty.") - else: - out_path.mkdir(parents=True) - - device = Device.load(prd_path) - - log.info(f"SL{device.security_level} {device.get_name()}") - log.info(f"Saving to: {out_path}") - - if device.group_key: - group_key_path = out_path / "zgpriv.dat" - group_key_path.write_bytes(device.group_key.dumps(private_only=True)) - log.info("Exported Group Key as zgpriv.dat") - else: - log.warning("Cannot export zgpriv.dat, as v2 devices do not save the group key") - - # remove leaf cert to unprovision it - device.group_certificate.remove(0) - - client_id_path = out_path / "bgroupcert.dat" - client_id_path.write_bytes(device.group_certificate.dumps()) - log.info("Exported Group Certificate to bgroupcert.dat") - - -@main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.") -@click.argument("config_path", type=Path) -@click.option("-h", "--host", type=str, default="127.0.0.1", help="Host to serve from.") -@click.option("-p", "--port", type=int, default=7723, help="Port to serve from.") -def serve_(config_path: Path, host: str, port: int) -> None: - """ - Serve your local CDM and Playready Devices Remotely. - - [CONFIG] is a path to a serve config file. - See `serve.example.yml` for an example config file. - - Host as 127.0.0.1 may block remote access even if port-forwarded. - Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded. - """ - from pyplayready.remote import serve - import yaml - - config = yaml.safe_load(config_path.read_text(encoding="utf8")) - serve.run(config, host, port) \ No newline at end of file diff --git a/modules/pyplayready/pssh.py b/modules/pyplayready/pssh.py deleted file mode 100644 index d68bfc3..0000000 --- a/modules/pyplayready/pssh.py +++ /dev/null @@ -1,98 +0,0 @@ -import base64 -from typing import Union, List -from uuid import UUID - -from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, Switch, Int32ub, Const, Container, ConstructError - -from modules.pyplayready.exceptions import InvalidPssh -from modules.pyplayready.wrmheader import WRMHeader - - -class _PlayreadyPSSHStructs: - PSSHBox = Struct( - "length" / Int32ub, - "pssh" / Const(b"pssh"), - "fullbox" / Int32ub, - "system_id" / Bytes(16), - "data_length" / Int32ub, - "data" / Bytes(this.data_length) - ) - - PlayreadyObject = Struct( - "type" / Int16ul, - "length" / Int16ul, - "data" / Switch( - this.type, - { - 1: Bytes(this.length) - }, - default=Bytes(this.length) - ) - ) - - PlayreadyHeader = Struct( - "length" / Int32ul, - "record_count" / Int16ul, - "records" / Array(this.record_count, PlayreadyObject) - ) - - -class PSSH(_PlayreadyPSSHStructs): - """Represents a PlayReady PSSH""" - - SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95") - - def __init__(self, data: Union[str, bytes]): - """Load a PSSH Box, PlayReady Header or PlayReady Object""" - - if not data: - raise InvalidPssh("Data must not be empty") - - if isinstance(data, str): - try: - data = base64.b64decode(data) - except Exception as e: - raise InvalidPssh(f"Could not decode data as Base64, {e}") - - self.wrm_headers: List[WRMHeader] - try: - # PSSH Box -> PlayReady Header - box = self.PSSHBox.parse(data) - prh = self.PlayreadyHeader.parse(box.data) - self.wrm_headers = self._read_playready_objects(prh) - except ConstructError: - if int.from_bytes(data[:2], byteorder="little") > 3: - try: - # PlayReady Header - prh = self.PlayreadyHeader.parse(data) - self.wrm_headers = self._read_playready_objects(prh) - except ConstructError: - raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Header") - else: - try: - # PlayReady Object - pro = self.PlayreadyObject.parse(data) - self.wrm_headers = [WRMHeader(pro.data)] - except ConstructError: - raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Object") - - @staticmethod - def _read_playready_objects(header: Container) -> List[WRMHeader]: - return list(map( - lambda pro: WRMHeader(pro.data), - filter( - lambda pro: pro.type == 1, - header.records - ) - )) - - def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]: - """ - Return a list of all WRM Headers in the PSSH as plaintext strings - - downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR - """ - return list(map( - lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(), - self.wrm_headers - )) \ No newline at end of file diff --git a/modules/pyplayready/wrmheader.py b/modules/pyplayready/wrmheader.py deleted file mode 100644 index 5626d7c..0000000 --- a/modules/pyplayready/wrmheader.py +++ /dev/null @@ -1,201 +0,0 @@ -import base64 -import binascii -from enum import Enum -from typing import Optional, List, Union, Tuple - -import xmltodict - - -class WRMHeader: - """Represents a PlayReady WRM Header""" - - class SignedKeyID: - def __init__( - self, - alg_id: str, - value: str, - checksum: str - ): - self.alg_id = alg_id - self.value = value - self.checksum = checksum - - def __repr__(self): - return f'SignedKeyID(alg_id={self.alg_id}, value="{self.value}", checksum="{self.checksum}")' - - class Version(Enum): - VERSION_4_0_0_0 = "4.0.0.0" - VERSION_4_1_0_0 = "4.1.0.0" - VERSION_4_2_0_0 = "4.2.0.0" - VERSION_4_3_0_0 = "4.3.0.0" - UNKNOWN = "UNKNOWN" - - @classmethod - def _missing_(cls, value): - return cls.UNKNOWN - - _RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]] - - def __init__(self, data: Union[str, bytes]): - """Load a WRM Header from either a string, base64 encoded data or bytes""" - - if not data: - raise ValueError("Data must not be empty") - - if isinstance(data, str): - try: - data = base64.b64decode(data).decode() - except (binascii.Error, binascii.Incomplete): - data = data.encode() - - self._raw_data: bytes = data - self._parsed = xmltodict.parse(self._raw_data) - - self._header = self._parsed.get('WRMHEADER') - if not self._header: - raise ValueError("Data is not a valid WRMHEADER") - - self.version = self.Version(self._header.get('@version')) - - @staticmethod - def _ensure_list(element: Union[dict, list]) -> List: - if isinstance(element, dict): - return [element] - return element - - def to_v4_0_0_0(self) -> str: - """ - Build a v4.0.0.0 WRM header from any possible WRM Header version - - Note: Will ignore any remaining Key IDs if there's more than just one - """ - return self._build_v4_0_0_0_wrm_header(*self.read_attributes()) - - @staticmethod - def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE: - protect_info = data.get("PROTECTINFO") - - return ( - [WRMHeader.SignedKeyID( - alg_id=protect_info["ALGID"], - value=data["KID"], - checksum=data.get("CHECKSUM") - )], - data.get("LA_URL"), - data.get("LUI_URL"), - data.get("DS_ID") - ) - - @staticmethod - def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE: - protect_info = data.get("PROTECTINFO") - - key_ids = [] - if protect_info: - kid = protect_info["KID"] - if kid: - key_ids = [WRMHeader.SignedKeyID( - alg_id=kid["@ALGID"], - value=kid["@VALUE"], - checksum=kid.get("@CHECKSUM") - )] - - return ( - key_ids, - data.get("LA_URL"), - data.get("LUI_URL"), - data.get("DS_ID") - ) - - @staticmethod - def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE: - protect_info = data.get("PROTECTINFO") - - key_ids = [] - if protect_info: - kids = protect_info["KIDS"] - if kids: - for kid in WRMHeader._ensure_list(kids["KID"]): - key_ids.append(WRMHeader.SignedKeyID( - alg_id=kid["@ALGID"], - value=kid["@VALUE"], - checksum=kid.get("@CHECKSUM") - )) - - return ( - key_ids, - data.get("LA_URL"), - data.get("LUI_URL"), - data.get("DS_ID") - ) - - @staticmethod - def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE: - protect_info = data.get("PROTECTINFO") - - key_ids = [] - if protect_info: - kids = protect_info["KIDS"] - for kid in WRMHeader._ensure_list(kids["KID"]): - key_ids.append(WRMHeader.SignedKeyID( - alg_id=kid.get("@ALGID"), - value=kid["@VALUE"], - checksum=kid.get("@CHECKSUM") - )) - - return ( - key_ids, - data.get("LA_URL"), - data.get("LUI_URL"), - data.get("DS_ID") - ) - - def read_attributes(self) -> _RETURN_STRUCTURE: - """ - Read any non-custom XML attributes - - Returns a tuple structured like this: Tuple[List[SignedKeyID], , , ] - """ - - data = self._header.get("DATA") - if not data: - raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required") - - if self.version == self.Version.VERSION_4_0_0_0: - return self._read_v4_0_0_0(data) - elif self.version == self.Version.VERSION_4_1_0_0: - return self._read_v4_1_0_0(data) - elif self.version == self.Version.VERSION_4_2_0_0: - return self._read_v4_2_0_0(data) - elif self.version == self.Version.VERSION_4_3_0_0: - return self._read_v4_3_0_0(data) - - @staticmethod - def _build_v4_0_0_0_wrm_header( - key_ids: List[SignedKeyID], - la_url: Optional[str], - lui_url: Optional[str], - ds_id: Optional[str] - ) -> str: - if len(key_ids) == 0: - raise Exception("No Key IDs available") - - key_id = key_ids[0] - return ( - '' - '' - '' - '16' - 'AESCTR' - '' - f'{key_id.value}' + - (f'{la_url}' if la_url else '') + - (f'{lui_url}' if lui_url else '') + - (f'{ds_id}' if ds_id else '') + - (f'{key_id.checksum}' if key_id.checksum else '') + - '' - '' - ) - - def dumps(self) -> str: - return self._raw_data.decode("utf-16-le") \ No newline at end of file diff --git a/modules/pyplayready/xml_key.py b/modules/pyplayready/xml_key.py deleted file mode 100644 index 424f1d6..0000000 --- a/modules/pyplayready/xml_key.py +++ /dev/null @@ -1,18 +0,0 @@ -from ecpy.curves import Point, Curve - -from modules.pyplayready.crypto.ecc_key import ECCKey -from modules.pyplayready.crypto.elgamal import ElGamal - - -class XmlKey: - def __init__(self): - self._shared_point = ECCKey.generate() - self.shared_key_x = self._shared_point.key.pointQ.x - self.shared_key_y = self._shared_point.key.pointQ.y - - self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x)) - self.aes_iv = self._shared_key_x_bytes[:16] - self.aes_key = self._shared_key_x_bytes[16:] - - def get_point(self, curve: Curve) -> Point: - return Point(self.shared_key_x, self.shared_key_y, curve) \ No newline at end of file diff --git a/modules/pyplayready/xmrlicense.py b/modules/pyplayready/xmrlicense.py deleted file mode 100644 index 13edab0..0000000 --- a/modules/pyplayready/xmrlicense.py +++ /dev/null @@ -1,252 +0,0 @@ -from __future__ import annotations - -import base64 -from pathlib import Path -from typing import Union - -from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container - - -class _XMRLicenseStructs: - PlayEnablerType = Struct( - "player_enabler_type" / Bytes(16) - ) - - DomainRestrictionObject = Struct( - "account_id" / Bytes(16), - "revision" / Int32ub - ) - - IssueDateObject = Struct( - "issue_date" / Int32ub - ) - - RevInfoVersionObject = Struct( - "sequence" / Int32ub - ) - - SecurityLevelObject = Struct( - "minimum_security_level" / Int16ub - ) - - EmbeddedLicenseSettingsObject = Struct( - "indicator" / Int16ub - ) - - ECCKeyObject = Struct( - "curve_type" / Int16ub, - "key_length" / Int16ub, - "key" / Bytes(this.key_length) - ) - - SignatureObject = Struct( - "signature_type" / Int16ub, - "signature_data_length" / Int16ub, - "signature_data" / Bytes(this.signature_data_length) - ) - - ContentKeyObject = Struct( - "key_id" / Bytes(16), - "key_type" / Int16ub, - "cipher_type" / Int16ub, - "key_length" / Int16ub, - "encrypted_key" / Bytes(this.key_length) - ) - - RightsSettingsObject = Struct( - "rights" / Int16ub - ) - - OutputProtectionLevelRestrictionObject = Struct( - "minimum_compressed_digital_video_opl" / Int16ub, - "minimum_uncompressed_digital_video_opl" / Int16ub, - "minimum_analog_video_opl" / Int16ub, - "minimum_digital_compressed_audio_opl" / Int16ub, - "minimum_digital_uncompressed_audio_opl" / Int16ub, - ) - - ExpirationRestrictionObject = Struct( - "begin_date" / Int32ub, - "end_date" / Int32ub - ) - - RemovalDateObject = Struct( - "removal_date" / Int32ub - ) - - UplinkKIDObject = Struct( - "uplink_kid" / Bytes(16), - "chained_checksum_type" / Int16ub, - "chained_checksum_length" / Int16ub, - "chained_checksum" / Bytes(this.chained_checksum_length) - ) - - AnalogVideoOutputConfigurationRestriction = Struct( - "video_output_protection_id" / Bytes(16), - "binary_configuration_data" / Bytes(this._.length - 24) - ) - - DigitalVideoOutputRestrictionObject = Struct( - "video_output_protection_id" / Bytes(16), - "binary_configuration_data" / Bytes(this._.length - 24) - ) - - DigitalAudioOutputRestrictionObject = Struct( - "audio_output_protection_id" / Bytes(16), - "binary_configuration_data" / Bytes(this._.length - 24) - ) - - PolicyMetadataObject = Struct( - "metadata_type" / Bytes(16), - "policy_data" / Bytes(this._.length) - ) - - SecureStopRestrictionObject = Struct( - "metering_id" / Bytes(16) - ) - - MeteringRestrictionObject = Struct( - "metering_id" / Bytes(16) - ) - - ExpirationAfterFirstPlayRestrictionObject = Struct( - "seconds" / Int32ub - ) - - GracePeriodObject = Struct( - "grace_period" / Int32ub - ) - - SourceIdObject = Struct( - "source_id" / Int32ub - ) - - AuxiliaryKey = Struct( - "location" / Int32ub, - "key" / Bytes(16) - ) - - AuxiliaryKeysObject = Struct( - "count" / Int16ub, - "auxiliary_keys" / Array(this.count, AuxiliaryKey) - ) - - UplinkKeyObject3 = Struct( - "uplink_key_id" / Bytes(16), - "chained_length" / Int16ub, - "checksum" / Bytes(this.chained_length), - "count" / Int16ub, - "entries" / Array(this.count, Int32ub) - ) - - CopyEnablerObject = Struct( - "copy_enabler_type" / Bytes(16) - ) - - CopyCountRestrictionObject = Struct( - "count" / Int32ub - ) - - MoveObject = Struct( - "minimum_move_protection_level" / Int32ub - ) - - XmrObject = Struct( - "flags" / Int16ub, - "type" / Int16ub, - "length" / Int32ub, - "data" / Switch( - lambda ctx: ctx.type, - { - 0x0005: OutputProtectionLevelRestrictionObject, - 0x0008: AnalogVideoOutputConfigurationRestriction, - 0x000a: ContentKeyObject, - 0x000b: SignatureObject, - 0x000d: RightsSettingsObject, - 0x0012: ExpirationRestrictionObject, - 0x0013: IssueDateObject, - 0x0016: MeteringRestrictionObject, - 0x001a: GracePeriodObject, - 0x0022: SourceIdObject, - 0x002a: ECCKeyObject, - 0x002c: PolicyMetadataObject, - 0x0029: DomainRestrictionObject, - 0x0030: ExpirationAfterFirstPlayRestrictionObject, - 0x0031: DigitalAudioOutputRestrictionObject, - 0x0032: RevInfoVersionObject, - 0x0033: EmbeddedLicenseSettingsObject, - 0x0034: SecurityLevelObject, - 0x0037: MoveObject, - 0x0039: PlayEnablerType, - 0x003a: CopyEnablerObject, - 0x003b: UplinkKIDObject, - 0x003d: CopyCountRestrictionObject, - 0x0050: RemovalDateObject, - 0x0051: AuxiliaryKeysObject, - 0x0052: UplinkKeyObject3, - 0x005a: SecureStopRestrictionObject, - 0x0059: DigitalVideoOutputRestrictionObject - }, - default=LazyBound(lambda ctx: _XMRLicenseStructs.XmrObject) - ) - ) - - XmrLicense = Struct( - "signature" / Const(b"XMR\x00"), - "xmr_version" / Int32ub, - "rights_id" / Bytes(16), - "containers" / GreedyRange(XmrObject) - ) - - -class XMRLicense(_XMRLicenseStructs): - """Represents an XMRLicense""" - - def __init__( - self, - parsed_license: Container, - license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense - ): - self.parsed = parsed_license - self._license_obj = license_obj - - @classmethod - def loads(cls, data: Union[str, bytes]) -> XMRLicense: - if isinstance(data, str): - data = base64.b64decode(data) - if not isinstance(data, bytes): - raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") - - licence = _XMRLicenseStructs.XmrLicense - return cls( - parsed_license=licence.parse(data), - license_obj=licence - ) - - @classmethod - def load(cls, path: Union[Path, str]) -> XMRLicense: - if not isinstance(path, (Path, str)): - raise ValueError(f"Expecting Path object or path string, got {path!r}") - with Path(path).open(mode="rb") as f: - return cls.loads(f.read()) - - def dumps(self) -> bytes: - return self._license_obj.build(self.parsed) - - def struct(self) -> _XMRLicenseStructs.XmrLicense: - return self._license_obj - - def _locate(self, container: Container): - if container.flags == 2 or container.flags == 3: - return self._locate(container.data) - else: - return container - - def get_object(self, type_: int): - for obj in self.parsed.containers: - container = self._locate(obj) - if container.type == type_: - yield container.data - - def get_content_keys(self): - yield from self.get_object(10) \ No newline at end of file diff --git a/routes/playready.py b/routes/playready.py index 8e7c1f1..186a740 100644 --- a/routes/playready.py +++ b/routes/playready.py @@ -4,49 +4,77 @@ from modules.config import apikey_required from modules.playready import PLAYREADY from modules.logging import setup_logging -# Setup logging -logging = setup_logging() +# ============================================================================================================================== # +logging = setup_logging() playready_bp = Blueprint('playready_bp', __name__) CORS(playready_bp, resources={r"/*": {"origins": "*"}}, supports_credentials=True, allow_headers=["Content-Type", "X-API-KEY"]) -@playready_bp.route('/extension', methods=['POST']) +# ============================================================================================================================== # + +@playready_bp.route("//open", methods=["GET"]) @apikey_required @cross_origin() -def extension(): - if not request.is_json: - response_data = {"message": "Missing JSON in request."} - return jsonify({"responseData": response_data}), 400 - +def open_device(device): + playready = PLAYREADY() + if playready.device_name != device: + response_data = {"message": "Ops! Invalid Device :P"} + return jsonify({"responseData": response_data}), 404 + return playready.open_devices() + +# ============================================================================================================================== # + +@playready_bp.route("//close/", methods=["GET"]) +@apikey_required +@cross_origin() +def close_device(device, session_id): + playready = PLAYREADY() + if playready.device_name != device: + response_data = {"message": "Ops! Invalid Device :P"} + return jsonify({"responseData": response_data}), 404 + return playready.close_devices(session_id) + +# ============================================================================================================================== # + +@playready_bp.route("//get_challenge", methods=["POST"]) +@apikey_required +@cross_origin() +def get_challenge(device): + + playready = PLAYREADY() + if playready.device_name != device: + return jsonify({"responseData": {"message": "Ops! Invalid Device :P"}}), 404 + data = request.get_json() - action = data.get('action', None) + pssh = data.get('pssh', None) + session_id = data.get('session_id', None) + + if not pssh or not session_id: + return jsonify({"responseData": {"message": "Missing required fields in JSON body."}}), 400 - if not action: - response_data = {"message": "Missing action in request."} - return jsonify({"responseData": response_data}), 400 - - if action == "Challenge?": - pssh = data.get('pssh', None) - - if not pssh: - response_data = {"message": "Missing pssh in request."} - return jsonify({"responseData": response_data}), 400 - - playready = PLAYREADY() - playready.pssh = pssh - return playready.get_license_challenge() - - elif action == "Keys?": - license = data.get('license', None) - - if not license: - response_data = {"message": "Missing license in request."} - return jsonify({"responseData": response_data}), 400 - - playready = PLAYREADY() - playready.license = license - return playready.get_license_keys() - - else: - response_data = {"message": "Unknown action."} - return jsonify({"responseData": response_data}), 400 \ No newline at end of file + playready.pssh = pssh + playready.session_id = session_id + return playready.get_challenges(device) + +# ============================================================================================================================== # + +@playready_bp.route("//get_keys", methods=["POST"]) +@apikey_required +@cross_origin() +def get_key(device): + playready = PLAYREADY() + if playready.device_name != device: + return jsonify({"responseData": {"message": "Ops! Invalid Device :P"}}), 404 + + data = request.get_json() + license = data.get('license_b64', None) + session_id = data.get('session_id', None) + + if not license or not session_id: + return jsonify({"responseData": {"message": "Missing required fields in JSON body."}}), 400 + + playready.license = license + playready.session_id = session_id + return playready.get_keys(device) + +# ============================================================================================================================== #