diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..5c6d927
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,6 @@
+[DEFAULT]
+FLASK_DEBUG = # True or False
+FLASK_ENV = # Development or Production
+FLASK_APP = # app.py
+FLASK_RUN_HOST = # LOCAL IP
+FLASK_RUN_PORT = # LOCAL PORT
\ No newline at end of file
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..10cd425
--- /dev/null
+++ b/app.py
@@ -0,0 +1,39 @@
+from flask import Flask, request, render_template, Response, jsonify
+from modules.banners import banners, clear_terminal
+from routes.playready import playready_bp
+from flask_cors import CORS, cross_origin
+from modules.config import setup_config
+from modules.logging import setup_logging
+
+
+banners()
+logging = setup_logging()
+config = setup_config()
+app = Flask(__name__)
+CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True, allow_headers=["Content-Type", "X-API-KEY"])
+
+# ========================================================================================================================================== #
+
+prefix_url_api = '/api'
+app.register_blueprint(playready_bp, url_prefix=f'{prefix_url_api}/playready')
+
+# ========================================================================================================================================== #
+
+@app.route('/api')
+@cross_origin()
+def backend_api():
+ result = {"message": "Ping Pong! PlayReady Proxy API"}
+ return jsonify({"responseData": result})
+
+# ========================================================================================================================================== #
+
+@app.route('/api/')
+@cross_origin()
+def backend_api_slash():
+ result = {"message": "Ping Pong! PlayReady Proxy API"}
+ return jsonify({"responseData": result})
+
+# ========================================================================================================================================== #
+
+if __name__ == "__main__":
+ app.run()
diff --git a/config.ini.example b/config.ini.example
new file mode 100644
index 0000000..9983c41
--- /dev/null
+++ b/config.ini.example
@@ -0,0 +1,2 @@
+[PLAYREADY]
+DEVICE = device/example.prd # your device filename
\ No newline at end of file
diff --git a/device/ssup nigga b/device/ssup nigga
new file mode 100644
index 0000000..e69de29
diff --git a/generate_apikey.py b/generate_apikey.py
new file mode 100644
index 0000000..6d70fdc
--- /dev/null
+++ b/generate_apikey.py
@@ -0,0 +1,5 @@
+from modules.config import generate_api_key
+
+new_user = input("Enter username: ")
+new_key = generate_api_key(new_user)
+print(f"Generated API key for 'new_user': {new_key}")
\ No newline at end of file
diff --git a/modules/__init__.py b/modules/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/modules/banners.py b/modules/banners.py
new file mode 100644
index 0000000..68abcba
--- /dev/null
+++ b/modules/banners.py
@@ -0,0 +1,26 @@
+import pyfiglet
+import random, os
+from colorama import Fore, Back, Style, init
+
+init(autoreset=True)
+
+def clear_terminal():
+ os.system('cls' if os.name == 'nt' else 'clear')
+
+def banners():
+ clear_terminal()
+ banners = """
+ ██████╗ ██████╗ ██████╗ ███████╗ ██████╗██████╗ ██╗ ██╗██████╗ ████████╗ ██████╗ ██████╗
+ ██╔══██╗██╔══██╗ ██╔══██╗██╔════╝██╔════╝██╔══██╗╚██╗ ██╔╝██╔══██╗╚══██╔══╝██╔═══██╗██╔══██╗
+ ██████╔╝██████╔╝█████╗██║ ██║█████╗ ██║ ██████╔╝ ╚████╔╝ ██████╔╝ ██║ ██║ ██║██████╔╝
+ ██╔═══╝ ██╔══██╗╚════╝██║ ██║██╔══╝ ██║ ██╔══██╗ ╚██╔╝ ██╔═══╝ ██║ ██║ ██║██╔══██╗
+ ██║ ██║ ██║ ██████╔╝███████╗╚██████╗██║ ██║ ██║ ██║ ██║ ╚██████╔╝██║ ██║
+ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝ ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝
+ Author: GITHUB.COM/THATNOTEASY
+ """
+ # Split the banner into lines
+ banner_lines = banners.split('\n')
+ colors = [Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.BLUE, Fore.CYAN, Fore.MAGENTA]
+ for line in banner_lines:
+ random_color = random.choice(colors)
+ print(random_color + line)
\ No newline at end of file
diff --git a/modules/config.py b/modules/config.py
new file mode 100644
index 0000000..7908dfa
--- /dev/null
+++ b/modules/config.py
@@ -0,0 +1,65 @@
+import configparser, json, secrets, logging, coloredlogs
+from functools import wraps
+from modules.logging import setup_logging
+from flask import request, jsonify
+
+API_KEY_FILE = 'APIKEY.json'
+
+def setup_config():
+ config = configparser.ConfigParser()
+ config.read('config.ini')
+ return config
+
+def load_api_keys():
+ try:
+ with open(API_KEY_FILE, 'r') as file:
+ return json.load(file)
+ except FileNotFoundError:
+ logger.error("APIKEY.json file not found.")
+ return []
+ except json.JSONDecodeError:
+ logger.error("Error decoding APIKEY.json.")
+ return []
+
+def save_api_keys(api_keys):
+ with open(API_KEY_FILE, 'w') as file:
+ json.dump(api_keys, file, indent=4)
+
+def is_valid_api_key(provided_key):
+ api_keys = load_api_keys()
+ for entry in api_keys:
+ if entry.get("apikey") == provided_key:
+ return True
+ return False
+
+def generate_api_key(username):
+ random_key = secrets.token_hex(16)
+ new_api_key = f"{username}_{random_key}"
+ api_keys = load_api_keys()
+ user_found = False
+ for entry in api_keys:
+ if entry.get("username") == username:
+ entry["apikey"] = new_api_key
+ user_found = True
+ break
+
+ if not user_found:
+ api_keys.append({
+ "username": username,
+ "apikey": new_api_key
+ })
+ save_api_keys(api_keys)
+ return new_api_key
+
+def apikey_required(func):
+ @wraps(func)
+ def decorated_function(*args, **kwargs):
+ provided_key = request.headers.get('X-API-KEY')
+ if not provided_key:
+ logging.error("X-API-KEY header is missing.")
+ return jsonify({"responseData": "Opss! API key is missing"}), 403
+ if not is_valid_api_key(provided_key):
+ logging.error("Invalid X-API-KEY.")
+ return jsonify({"responseData": "Opss! Invalid API key"}), 403
+ return func(*args, **kwargs)
+ return decorated_function
\ No newline at end of file
diff --git a/modules/logging.py b/modules/logging.py
new file mode 100644
index 0000000..0a47fa1
--- /dev/null
+++ b/modules/logging.py
@@ -0,0 +1,17 @@
+import logging, coloredlogs
+import os
+from datetime import datetime
+
+def setup_logging():
+ log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ current_time = datetime.now().strftime("%I-%M-%S_%p_%b-%d-%Y")
+ log_filename = f'{current_time}.log'
+ logs_folder = 'logs'
+ if not os.path.exists(logs_folder):
+ os.makedirs(logs_folder)
+
+ coloredlogs.install(level='DEBUG', fmt=log_format, humanize=True)
+ log_filepath = os.path.join(logs_folder, log_filename)
+ logging.basicConfig(level=logging.DEBUG, format=log_format, filename=log_filepath)
+
+ return logging
\ No newline at end of file
diff --git a/modules/playready.py b/modules/playready.py
new file mode 100644
index 0000000..ae1e78a
--- /dev/null
+++ b/modules/playready.py
@@ -0,0 +1,70 @@
+import requests, base64, logging, coloredlogs, re
+import xml.dom.minidom as Dom
+import xml.etree.ElementTree as ET
+import json
+
+from flask import jsonify
+from modules.logging import setup_logging
+from modules.config import setup_config
+
+from modules.pyplayready.pssh import PSSH
+from modules.pyplayready.device import Device
+from modules.pyplayready.cdm import Cdm
+
+class PLAYREADY:
+ def __init__(self):
+ self.pssh = None
+ self.session_id = None
+ self.challenge = None
+ self.license = None
+
+ self.config = setup_config()
+ self.device = self.config["PLAYREADY"]["DEVICE"]
+ self.logging = setup_logging()
+
+ def get_license_challenge(self):
+ device = Device.load(self.device)
+ cdm = Cdm.from_device(self.device)
+ pssh = PSSH(self.pssh)
+ wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False)
+ raw_challenge = cdm.get_license_challenge(wrm_headers[0])
+ if isinstance(raw_challenge, str):
+ raw_challenge = raw_challenge.encode('utf-8')
+
+ challenge_b64 = base64.b64encode(raw_challenge).decode('utf-8')
+ return challenge_b64
+
+ def get_license_keys(self):
+ device = Device.load(self.device)
+ cdm = Cdm.from_device(device)
+ license_data = self.license
+ try:
+ try:
+ decoded_license = base64.b64decode(license_data, validate=True).decode('utf-8', errors='replace')
+ except (base64.binascii.Error, UnicodeDecodeError):
+ return jsonify({"error": "License must be provided as valid Base64-encoded data"}), 400
+
+ cleaned_license = re.sub(r'([a-zA-Z0-9:]+)(xmlns)', r'\1 \2', decoded_license)
+ try:
+ parsed_xml = ET.fromstring(cleaned_license)
+ pretty_xml = Dom.parseString(cleaned_license).toprettyxml()
+ except ET.ParseError as parse_error:
+ return jsonify({"error": f"Invalid XML in license data: {parse_error}"}), 400
+ except ValueError as decode_error:
+ return jsonify({"error": "Invalid license format"}), 400
+ try:
+ cdm.parse_license(cleaned_license)
+ except Exception as e:
+ return jsonify({"error": f"Unable to parse license: {e}"}), 500
+
+ try:
+ key_pairs = []
+ for key in cdm.get_keys():
+ self.kid = key.key_id.hex() if isinstance(key.key_id, bytes) else str(key.key_id).replace("-", "")
+ self.key = key.key.hex() if isinstance(key.key, bytes) else str(key.key)
+ key_pairs.append({"key_id": self.kid, "key": self.key})
+ print(key_pairs)
+ return jsonify(key_pairs)
+
+ except Exception as e:
+ return jsonify({"error": f"Unable to extract keys: {e}"}), 500
\ No newline at end of file
diff --git a/modules/pyplayready/bcert.py b/modules/pyplayready/bcert.py
new file mode 100644
index 0000000..bf9f13a
--- /dev/null
+++ b/modules/pyplayready/bcert.py
@@ -0,0 +1,486 @@
+from __future__ import annotations
+import collections.abc
+
+from Crypto.PublicKey import ECC
+
+from modules.pyplayready.crypto import Crypto
+from modules.pyplayready.exceptions import InvalidCertificateChain
+
+# monkey patch for construct 2.8.8 compatibility
+if not hasattr(collections, 'Sequence'):
+ collections.Sequence = collections.abc.Sequence
+
+import base64
+from pathlib import Path
+from typing import Union
+
+from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
+from construct import Int16ub, Array
+from construct import Struct, this
+
+from modules.pyplayready.crypto.ecc_key import ECCKey
+
+
+class _BCertStructs:
+ DrmBCertBasicInfo = Struct(
+ "cert_id" / Bytes(16),
+ "security_level" / Int32ub,
+ "flags" / Int32ub,
+ "cert_type" / Int32ub,
+ "public_key_digest" / Bytes(32),
+ "expiration_date" / Int32ub,
+ "client_id" / Bytes(16)
+ )
+
+ # TODO: untested
+ DrmBCertDomainInfo = Struct(
+ "service_id" / Bytes(16),
+ "account_id" / Bytes(16),
+ "revision_timestamp" / Int32ub,
+ "domain_url_length" / Int32ub,
+ "domain_url" / Bytes((this.domain_url_length + 3) & 0xfffffffc)
+ )
+
+ # TODO: untested
+ DrmBCertPCInfo = Struct(
+ "security_version" / Int32ub
+ )
+
+ # TODO: untested
+ DrmBCertDeviceInfo = Struct(
+ "max_license" / Int32ub,
+ "max_header" / Int32ub,
+ "max_chain_depth" / Int32ub
+ )
+
+ DrmBCertFeatureInfo = Struct(
+ "feature_count" / Int32ub, # max. 32
+ "features" / Array(this.feature_count, Int32ub)
+ )
+
+ DrmBCertKeyInfo = Struct(
+ "key_count" / Int32ub,
+ "cert_keys" / Array(this.key_count, Struct(
+ "type" / Int16ub,
+ "length" / Int16ub,
+ "flags" / Int32ub,
+ "key" / Bytes(this.length // 8),
+ "usages_count" / Int32ub,
+ "usages" / Array(this.usages_count, Int32ub)
+ ))
+ )
+
+ DrmBCertManufacturerInfo = Struct(
+ "flags" / Int32ub,
+ "manufacturer_name_length" / Int32ub,
+ "manufacturer_name" / Bytes((this.manufacturer_name_length + 3) & 0xfffffffc),
+ "model_name_length" / Int32ub,
+ "model_name" / Bytes((this.model_name_length + 3) & 0xfffffffc),
+ "model_number_length" / Int32ub,
+ "model_number" / Bytes((this.model_number_length + 3) & 0xfffffffc),
+ )
+
+ DrmBCertSignatureInfo = Struct(
+ "signature_type" / Int16ub,
+ "signature_size" / Int16ub,
+ "signature" / Bytes(this.signature_size),
+ "signature_key_size" / Int32ub,
+ "signature_key" / Bytes(this.signature_key_size // 8)
+ )
+
+ # TODO: untested
+ DrmBCertSilverlightInfo = Struct(
+ "security_version" / Int32ub,
+ "platform_identifier" / Int32ub
+ )
+
+ # TODO: untested
+ DrmBCertMeteringInfo = Struct(
+ "metering_id" / Bytes(16),
+ "metering_url_length" / Int32ub,
+ "metering_url" / Bytes((this.metering_url_length + 3) & 0xfffffffc)
+ )
+
+ # TODO: untested
+ DrmBCertExtDataSignKeyInfo = Struct(
+ "key_type" / Int16ub,
+ "key_length" / Int16ub,
+ "flags" / Int32ub,
+ "key" / Bytes(this.length // 8)
+ )
+
+ # TODO: untested
+ BCertExtDataRecord = Struct(
+ "data_size" / Int32ub,
+ "data" / Bytes(this.data_size)
+ )
+
+ # TODO: untested
+ DrmBCertExtDataSignature = Struct(
+ "signature_type" / Int16ub,
+ "signature_size" / Int16ub,
+ "signature" / Bytes(this.signature_size)
+ )
+
+ # TODO: untested
+ BCertExtDataContainer = Struct(
+ "record_count" / Int32ub, # always 1
+ "records" / Array(this.record_count, BCertExtDataRecord),
+ "signature" / DrmBCertExtDataSignature
+ )
+
+ # TODO: untested
+ DrmBCertServerInfo = Struct(
+ "warning_days" / Int32ub
+ )
+
+ # TODO: untested
+ DrmBcertSecurityVersion = Struct(
+ "security_version" / Int32ub,
+ "platform_identifier" / Int32ub
+ )
+
+ Attribute = Struct(
+ "flags" / Int16ub,
+ "tag" / Int16ub,
+ "length" / Int32ub,
+ "attribute" / Switch(
+ lambda this_: this_.tag,
+ {
+ 1: DrmBCertBasicInfo,
+ 2: DrmBCertDomainInfo,
+ 3: DrmBCertPCInfo,
+ 4: DrmBCertDeviceInfo,
+ 5: DrmBCertFeatureInfo,
+ 6: DrmBCertKeyInfo,
+ 7: DrmBCertManufacturerInfo,
+ 8: DrmBCertSignatureInfo,
+ 9: DrmBCertSilverlightInfo,
+ 10: DrmBCertMeteringInfo,
+ 11: DrmBCertExtDataSignKeyInfo,
+ 12: BCertExtDataContainer,
+ 13: DrmBCertExtDataSignature,
+ 14: Bytes(this.length - 8),
+ 15: DrmBCertServerInfo,
+ 16: DrmBcertSecurityVersion,
+ 17: DrmBcertSecurityVersion
+ },
+ default=Bytes(this.length - 8)
+ )
+ )
+
+ BCert = Struct(
+ "signature" / Const(b"CERT"),
+ "version" / Int32ub,
+ "total_length" / Int32ub,
+ "certificate_length" / Int32ub,
+ "attributes" / GreedyRange(Attribute)
+ )
+
+ BCertChain = Struct(
+ "signature" / Const(b"CHAI"),
+ "version" / Int32ub,
+ "total_length" / Int32ub,
+ "flags" / Int32ub,
+ "certificate_count" / Int32ub,
+ "certificates" / GreedyRange(BCert)
+ )
+
+
+class Certificate(_BCertStructs):
+ """Represents a BCert"""
+
+ def __init__(
+ self,
+ parsed_bcert: Container,
+ bcert_obj: _BCertStructs.BCert = _BCertStructs.BCert
+ ):
+ self.parsed = parsed_bcert
+ self._BCERT = bcert_obj
+
+ @classmethod
+ def new_leaf_cert(
+ cls,
+ cert_id: bytes,
+ security_level: int,
+ client_id: bytes,
+ signing_key: ECCKey,
+ encryption_key: ECCKey,
+ group_key: ECCKey,
+ parent: CertificateChain,
+ expiry: int = 0xFFFFFFFF,
+ max_license: int = 10240,
+ max_header: int = 15360,
+ max_chain_depth: int = 2
+ ) -> Certificate:
+ basic_info = Container(
+ cert_id=cert_id,
+ security_level=security_level,
+ flags=0,
+ cert_type=2,
+ public_key_digest=signing_key.public_sha256_digest(),
+ expiration_date=expiry,
+ client_id=client_id
+ )
+ basic_info_attribute = Container(
+ flags=1,
+ tag=1,
+ length=len(_BCertStructs.DrmBCertBasicInfo.build(basic_info)) + 8,
+ attribute=basic_info
+ )
+
+ device_info = Container(
+ max_license=max_license,
+ max_header=max_header,
+ max_chain_depth=max_chain_depth
+ )
+ device_info_attribute = Container(
+ flags=1,
+ tag=4,
+ length=len(_BCertStructs.DrmBCertDeviceInfo.build(device_info)) + 8,
+ attribute=device_info
+ )
+
+ feature = Container(
+ feature_count=3,
+ features=ListContainer([
+ # 1, # Transmitter
+ # 2, # Receiver
+ # 3, # SharedCertificate
+ 4, # SecureClock
+ # 5, # AntiRollBackClock
+ # 6, # ReservedMetering
+ # 7, # ReservedLicSync
+ # 8, # ReservedSymOpt
+ 9, # CRLS (Revocation Lists)
+ # 10, # ServerBasicEdition
+ # 11, # ServerStandardEdition
+ # 12, # ServerPremiumEdition
+ 13, # PlayReady3Features
+ # 14, # DeprecatedSecureStop
+ ])
+ )
+ feature_attribute = Container(
+ flags=1,
+ tag=5,
+ length=len(_BCertStructs.DrmBCertFeatureInfo.build(feature)) + 8,
+ attribute=feature
+ )
+
+ cert_key_sign = Container(
+ type=1,
+ length=512, # bits
+ flags=0,
+ key=signing_key.public_bytes(),
+ usages_count=1,
+ usages=ListContainer([
+ 1 # KEYUSAGE_SIGN
+ ])
+ )
+ cert_key_encrypt = Container(
+ type=1,
+ length=512, # bits
+ flags=0,
+ key=encryption_key.public_bytes(),
+ usages_count=1,
+ usages=ListContainer([
+ 2 # KEYUSAGE_ENCRYPT_KEY
+ ])
+ )
+ key_info = Container(
+ key_count=2,
+ cert_keys=ListContainer([
+ cert_key_sign,
+ cert_key_encrypt
+ ])
+ )
+ key_info_attribute = Container(
+ flags=1,
+ tag=6,
+ length=len(_BCertStructs.DrmBCertKeyInfo.build(key_info)) + 8,
+ attribute=key_info
+ )
+
+ manufacturer_info = parent.get_certificate(0).get_attribute(7)
+
+ new_bcert_container = Container(
+ signature=b"CERT",
+ version=1,
+ total_length=0, # filled at a later time
+ certificate_length=0, # filled at a later time
+ attributes=ListContainer([
+ basic_info_attribute,
+ device_info_attribute,
+ feature_attribute,
+ key_info_attribute,
+ manufacturer_info,
+ ])
+ )
+
+ payload = _BCertStructs.BCert.build(new_bcert_container)
+ new_bcert_container.certificate_length = len(payload)
+ new_bcert_container.total_length = len(payload) + 144 # signature length
+
+ sign_payload = _BCertStructs.BCert.build(new_bcert_container)
+ signature = Crypto.ecc256_sign(group_key, sign_payload)
+
+ signature_info = Container(
+ signature_type=1,
+ signature_size=64,
+ signature=signature,
+ signature_key_size=512, # bits
+ signature_key=group_key.public_bytes()
+ )
+ signature_info_attribute = Container(
+ flags=1,
+ tag=8,
+ length=len(_BCertStructs.DrmBCertSignatureInfo.build(signature_info)) + 8,
+ attribute=signature_info
+ )
+ new_bcert_container.attributes.append(signature_info_attribute)
+
+ return cls(new_bcert_container)
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> Certificate:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ cert = _BCertStructs.BCert
+ return cls(
+ parsed_bcert=cert.parse(data),
+ bcert_obj=cert
+ )
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> Certificate:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def get_attribute(self, type_: int):
+ for attribute in self.parsed.attributes:
+ if attribute.tag == type_:
+ return attribute
+
+ def get_security_level(self) -> int:
+ basic_info_attribute = self.get_attribute(1).attribute
+ if basic_info_attribute:
+ return basic_info_attribute.security_level
+
+ @staticmethod
+ def _unpad(name: bytes):
+ return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
+
+ def get_name(self):
+ manufacturer_info = self.get_attribute(7).attribute
+ if manufacturer_info:
+ return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}"
+
+ def dumps(self) -> bytes:
+ return self._BCERT.build(self.parsed)
+
+ def struct(self) -> _BCertStructs.BCert:
+ return self._BCERT
+
+ def verify_signature(self):
+ signature_object = self.get_attribute(8)
+ signature_attribute = signature_object.attribute
+
+ sign_payload = self.dumps()[:-signature_object.length]
+
+ raw_signature_key = signature_attribute.signature_key
+ signature_key = ECC.construct(
+ curve='P-256',
+ point_x=int.from_bytes(raw_signature_key[:32], 'big'),
+ point_y=int.from_bytes(raw_signature_key[32:], 'big')
+ )
+
+ return Crypto.ecc256_verify(
+ public_key=signature_key,
+ data=sign_payload,
+ signature=signature_attribute.signature
+ )
+
+
+class CertificateChain(_BCertStructs):
+ """Represents a BCertChain"""
+
+ def __init__(
+ self,
+ parsed_bcert_chain: Container,
+ bcert_chain_obj: _BCertStructs.BCertChain = _BCertStructs.BCertChain
+ ):
+ self.parsed = parsed_bcert_chain
+ self._BCERT_CHAIN = bcert_chain_obj
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> CertificateChain:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ cert_chain = _BCertStructs.BCertChain
+ return cls(
+ parsed_bcert_chain=cert_chain.parse(data),
+ bcert_chain_obj=cert_chain
+ )
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> CertificateChain:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self) -> bytes:
+ return self._BCERT_CHAIN.build(self.parsed)
+
+ def struct(self) -> _BCertStructs.BCertChain:
+ return self._BCERT_CHAIN
+
+ def get_certificate(self, index: int) -> Certificate:
+ return Certificate(self.parsed.certificates[index])
+
+ def get_security_level(self) -> int:
+ # not sure if there's a better way than this
+ return self.get_certificate(0).get_security_level()
+
+ def get_name(self) -> str:
+ return self.get_certificate(0).get_name()
+
+ def append(self, bcert: Certificate) -> None:
+ self.parsed.certificate_count += 1
+ self.parsed.certificates.append(bcert.parsed)
+ self.parsed.total_length += len(bcert.dumps())
+
+ def prepend(self, bcert: Certificate) -> None:
+ self.parsed.certificate_count += 1
+ self.parsed.certificates.insert(0, bcert.parsed)
+ self.parsed.total_length += len(bcert.dumps())
+
+ def remove(self, index: int) -> None:
+ if self.parsed.certificate_count <= 0:
+ raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
+ if index >= self.parsed.certificate_count:
+ raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
+
+ self.parsed.certificate_count -= 1
+ bcert = Certificate(self.parsed.certificates[index])
+ self.parsed.total_length -= len(bcert.dumps())
+ self.parsed.certificates.pop(index)
+
+ def get(self, index: int) -> Certificate:
+ if self.parsed.certificate_count <= 0:
+ raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
+ if index >= self.parsed.certificate_count:
+ raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
+
+ return Certificate(self.parsed.certificates[index])
+
+ def count(self) -> int:
+ return self.parsed.certificate_count
\ No newline at end of file
diff --git a/modules/pyplayready/cdm.py b/modules/pyplayready/cdm.py
new file mode 100644
index 0000000..6bfea68
--- /dev/null
+++ b/modules/pyplayready/cdm.py
@@ -0,0 +1,218 @@
+from __future__ import annotations
+
+import base64
+import math
+import time
+from typing import List
+from uuid import UUID
+import xml.etree.ElementTree as ET
+
+from Crypto.Cipher import AES
+from Crypto.Hash import SHA256
+from Crypto.Random import get_random_bytes
+from Crypto.Signature import DSS
+from Crypto.Util.Padding import pad
+from ecpy.curves import Point, Curve
+
+from modules.pyplayready.bcert import CertificateChain
+from modules.pyplayready.crypto.ecc_key import ECCKey
+from modules.pyplayready.key import Key
+from modules.pyplayready.xml_key import XmlKey
+from modules.pyplayready.crypto.elgamal import ElGamal
+from modules.pyplayready.xmrlicense import XMRLicense
+
+
+class Cdm:
+ def __init__(
+ self,
+ security_level: int,
+ certificate_chain: CertificateChain,
+ encryption_key: ECCKey,
+ signing_key: ECCKey,
+ client_version: str = "10.0.16384.10011",
+ protocol_version: int = 1
+ ):
+ self.security_level = security_level
+ self.certificate_chain = certificate_chain
+ self.encryption_key = encryption_key
+ self.signing_key = signing_key
+ self.client_version = client_version
+ self.protocol_version = protocol_version
+
+ self.curve = Curve.get_curve("secp256r1")
+ self.elgamal = ElGamal(self.curve)
+
+ self._wmrm_key = Point(
+ x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
+ y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
+ curve=self.curve
+ )
+ self._xml_key = XmlKey()
+
+ self._keys: List[Key] = []
+
+ @classmethod
+ def from_device(cls, device) -> Cdm:
+ """Initialize a Playready CDM from a Playready Device (.prd) file"""
+ return cls(
+ security_level=device.security_level,
+ certificate_chain=device.group_certificate,
+ encryption_key=device.encryption_key,
+ signing_key=device.signing_key
+ )
+
+ def get_key_data(self) -> bytes:
+ point1, point2 = self.elgamal.encrypt(
+ message_point=self._xml_key.get_point(self.elgamal.curve),
+ public_key=self._wmrm_key
+ )
+ return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
+
+ def get_cipher_data(self) -> bytes:
+ b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
+ body = f"{b64_chain}"
+
+ cipher = AES.new(
+ key=self._xml_key.aes_key,
+ mode=AES.MODE_CBC,
+ iv=self._xml_key.aes_iv
+ )
+
+ ciphertext = cipher.encrypt(pad(
+ body.encode(),
+ AES.block_size
+ ))
+
+ return self._xml_key.aes_iv + ciphertext
+
+ def _build_digest_content(
+ self,
+ content_header: str,
+ nonce: str,
+ wmrm_cipher: str,
+ cert_cipher: str
+ ) -> str:
+ return (
+ ''
+ f'{self.protocol_version}'
+ f'{content_header}'
+ ''
+ f'{self.client_version}'
+ ''
+ f'{nonce}'
+ f'{math.floor(time.time())}'
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ 'WMRMServer'
+ ''
+ ''
+ f'{wmrm_cipher}'
+ ''
+ ''
+ ''
+ ''
+ f'{cert_cipher}'
+ ''
+ ''
+ ''
+ )
+
+ @staticmethod
+ def _build_signed_info(digest_value: str) -> str:
+ return (
+ ''
+ ''
+ ''
+ ''
+ ''
+ f'{digest_value}'
+ ''
+ ''
+ )
+
+ def get_license_challenge(self, content_header: str) -> str:
+ la_content = self._build_digest_content(
+ content_header=content_header,
+ nonce=base64.b64encode(get_random_bytes(16)).decode(),
+ wmrm_cipher=base64.b64encode(self.get_key_data()).decode(),
+ cert_cipher=base64.b64encode(self.get_cipher_data()).decode()
+ )
+
+ la_hash_obj = SHA256.new()
+ la_hash_obj.update(la_content.encode())
+ la_hash = la_hash_obj.digest()
+
+ signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
+ signed_info_digest = SHA256.new(signed_info.encode())
+
+ signer = DSS.new(self.signing_key.key, 'fips-186-3')
+ signature = signer.sign(signed_info_digest)
+
+ # haven't found a better way to do this. xmltodict.unparse doesn't work
+ main_body = (
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ + la_content +
+ ''
+ + signed_info +
+ f'{base64.b64encode(signature).decode()}'
+ ''
+ ''
+ ''
+ f'{base64.b64encode(self.signing_key.public_bytes()).decode()}'
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ )
+
+ return main_body
+
+ def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes:
+ point1 = Point(
+ x=int.from_bytes(encrypted_key[:32], 'big'),
+ y=int.from_bytes(encrypted_key[32:64], 'big'),
+ curve=self.curve
+ )
+ point2 = Point(
+ x=int.from_bytes(encrypted_key[64:96], 'big'),
+ y=int.from_bytes(encrypted_key[96:128], 'big'),
+ curve=self.curve
+ )
+
+ decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d))
+ return self.elgamal.to_bytes(decrypted.x)[16:32]
+
+ def parse_license(self, licence: str) -> None:
+ try:
+ root = ET.fromstring(licence)
+ license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
+ for license_element in license_elements:
+ parsed_licence = XMRLicense.loads(license_element.text)
+ for key in parsed_licence.get_content_keys():
+ if Key.CipherType(key.cipher_type) == Key.CipherType.ECC_256:
+ self._keys.append(Key(
+ key_id=UUID(bytes_le=key.key_id),
+ key_type=key.key_type,
+ cipher_type=key.cipher_type,
+ key_length=key.key_length,
+ key=self._decrypt_ecc256_key(key.encrypted_key)
+ ))
+ except Exception as e:
+ raise Exception(f"Unable to parse license, {e}")
+
+ def get_keys(self) -> List[Key]:
+ return self._keys
\ No newline at end of file
diff --git a/modules/pyplayready/crypto/__init__.py b/modules/pyplayready/crypto/__init__.py
new file mode 100644
index 0000000..6e5cf58
--- /dev/null
+++ b/modules/pyplayready/crypto/__init__.py
@@ -0,0 +1,96 @@
+from typing import Union, Tuple
+
+from Crypto.Hash import SHA256
+from Crypto.Hash.SHA256 import SHA256Hash
+from Crypto.PublicKey.ECC import EccKey
+from Crypto.Signature import DSS
+from ecpy.curves import Point, Curve
+
+from modules.pyplayready.crypto.elgamal import ElGamal
+from modules.pyplayready.crypto.ecc_key import ECCKey
+
+
+class Crypto:
+ def __init__(self, curve: str = "secp256r1"):
+ self.curve = Curve.get_curve(curve)
+ self.elgamal = ElGamal(self.curve)
+
+ def ecc256_encrypt(self, public_key: Union[ECCKey, Point], plaintext: Union[Point, bytes]) -> bytes:
+ if isinstance(public_key, ECCKey):
+ public_key = public_key.get_point(self.curve)
+ if not isinstance(public_key, Point):
+ raise ValueError(f"Expecting ECCKey or Point input, got {public_key!r}")
+
+ if isinstance(plaintext, bytes):
+ plaintext = Point(
+ x=int.from_bytes(plaintext[:32], 'big'),
+ y=int.from_bytes(plaintext[32:64], 'big'),
+ curve=self.curve
+ )
+ if not isinstance(plaintext, Point):
+ raise ValueError(f"Expecting Point or Bytes input, got {plaintext!r}")
+
+ point1, point2 = self.elgamal.encrypt(
+ message_point=plaintext,
+ public_key=public_key
+ )
+ return b''.join([
+ self.elgamal.to_bytes(point1.x),
+ self.elgamal.to_bytes(point1.y),
+ self.elgamal.to_bytes(point2.x),
+ self.elgamal.to_bytes(point2.y)
+ ])
+
+ def ecc256_decrypt(self, private_key: ECCKey, ciphertext: Union[Tuple[Point, Point], bytes]) -> bytes:
+ if isinstance(ciphertext, bytes):
+ ciphertext = (
+ Point(
+ x=int.from_bytes(ciphertext[:32], 'big'),
+ y=int.from_bytes(ciphertext[32:64], 'big'),
+ curve=self.curve
+ ),
+ Point(
+ x=int.from_bytes(ciphertext[64:96], 'big'),
+ y=int.from_bytes(ciphertext[96:128], 'big'),
+ curve=self.curve
+ )
+ )
+ if not isinstance(ciphertext, Tuple):
+ raise ValueError(f"Expecting Tuple[Point, Point] or Bytes input, got {ciphertext!r}")
+
+ decrypted = self.elgamal.decrypt(ciphertext, int(private_key.key.d))
+ return self.elgamal.to_bytes(decrypted.x)
+
+ @staticmethod
+ def ecc256_sign(private_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes]) -> bytes:
+ if isinstance(private_key, ECCKey):
+ private_key = private_key.key
+ if not isinstance(private_key, EccKey):
+ raise ValueError(f"Expecting ECCKey or EccKey input, got {private_key!r}")
+
+ if isinstance(data, bytes):
+ data = SHA256.new(data)
+ if not isinstance(data, SHA256Hash):
+ raise ValueError(f"Expecting SHA256Hash or Bytes input, got {data!r}")
+
+ signer = DSS.new(private_key, 'fips-186-3')
+ return signer.sign(data)
+
+ @staticmethod
+ def ecc256_verify(public_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes], signature: bytes) -> bool:
+ if isinstance(public_key, ECCKey):
+ public_key = public_key.key
+ if not isinstance(public_key, EccKey):
+ raise ValueError(f"Expecting ECCKey or EccKey input, got {public_key!r}")
+
+ if isinstance(data, bytes):
+ data = SHA256.new(data)
+ if not isinstance(data, SHA256Hash):
+ raise ValueError(f"Expecting SHA256Hash or Bytes input, got {data!r}")
+
+ verifier = DSS.new(public_key, 'fips-186-3')
+ try:
+ verifier.verify(data, signature)
+ return True
+ except ValueError:
+ return False
\ No newline at end of file
diff --git a/modules/pyplayready/crypto/ecc_key.py b/modules/pyplayready/crypto/ecc_key.py
new file mode 100644
index 0000000..ac366b9
--- /dev/null
+++ b/modules/pyplayready/crypto/ecc_key.py
@@ -0,0 +1,95 @@
+from __future__ import annotations
+
+import base64
+from pathlib import Path
+from typing import Union
+
+from Crypto.Hash import SHA256
+from Crypto.PublicKey import ECC
+from Crypto.PublicKey.ECC import EccKey
+from ecpy.curves import Curve, Point
+
+
+class ECCKey:
+ """Represents a PlayReady ECC key pair"""
+
+ def __init__(self, key: EccKey):
+ self.key = key
+
+ @classmethod
+ def generate(cls):
+ """Generate a new ECC key pair"""
+ return cls(key=ECC.generate(curve='P-256'))
+
+ @classmethod
+ def construct(cls, private_key: Union[bytes, int]):
+ """Construct an ECC key pair from private/public bytes/ints"""
+ if isinstance(private_key, bytes):
+ private_key = int.from_bytes(private_key, 'big')
+ if not isinstance(private_key, int):
+ raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}")
+
+ # The public is always derived from the private key; loading the other stuff won't work
+ key = ECC.construct(
+ curve='P-256',
+ d=private_key,
+ )
+
+ return cls(key=key)
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> ECCKey:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ if len(data) not in [96, 32]:
+ raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}")
+
+ return cls.construct(private_key=data[:32])
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> ECCKey:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self, private_only=False):
+ if private_only:
+ return self.private_bytes()
+ return self.private_bytes() + self.public_bytes()
+
+ def dump(self, path: Union[Path, str], private_only=False) -> None:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ path = Path(path)
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_bytes(self.dumps(private_only))
+
+ @staticmethod
+ def _to_bytes(n: int) -> bytes:
+ byte_len = (n.bit_length() + 7) // 8
+ if byte_len % 2 != 0:
+ byte_len += 1
+ return n.to_bytes(byte_len, 'big')
+
+ def get_point(self, curve: Curve) -> Point:
+ return Point(self.key.pointQ.x, self.key.pointQ.y, curve)
+
+ def private_bytes(self) -> bytes:
+ return self._to_bytes(int(self.key.d))
+
+ def private_sha256_digest(self) -> bytes:
+ hash_object = SHA256.new()
+ hash_object.update(self.private_bytes())
+ return hash_object.digest()
+
+ def public_bytes(self) -> bytes:
+ return self._to_bytes(int(self.key.pointQ.x)) + self._to_bytes(int(self.key.pointQ.y))
+
+ def public_sha256_digest(self) -> bytes:
+ hash_object = SHA256.new()
+ hash_object.update(self.public_bytes())
+ return hash_object.digest()
\ No newline at end of file
diff --git a/modules/pyplayready/crypto/elgamal.py b/modules/pyplayready/crypto/elgamal.py
new file mode 100644
index 0000000..9409561
--- /dev/null
+++ b/modules/pyplayready/crypto/elgamal.py
@@ -0,0 +1,36 @@
+from typing import Tuple
+
+from ecpy.curves import Curve, Point
+import secrets
+
+
+class ElGamal:
+ def __init__(self, curve: Curve):
+ self.curve = curve
+
+ @staticmethod
+ def to_bytes(n: int) -> bytes:
+ byte_len = (n.bit_length() + 7) // 8
+ if byte_len % 2 != 0:
+ byte_len += 1
+ return n.to_bytes(byte_len, 'big')
+
+ def encrypt(
+ self,
+ message_point: Point,
+ public_key: Point
+ ) -> Tuple[Point, Point]:
+ ephemeral_key = secrets.randbelow(self.curve.order)
+ point1 = ephemeral_key * self.curve.generator
+ point2 = message_point + (ephemeral_key * public_key)
+ return point1, point2
+
+ @staticmethod
+ def decrypt(
+ encrypted: Tuple[Point, Point],
+ private_key: int
+ ) -> Point:
+ point1, point2 = encrypted
+ shared_secret = private_key * point1
+ decrypted_message = point2 - shared_secret
+ return decrypted_message
\ No newline at end of file
diff --git a/modules/pyplayready/device.py b/modules/pyplayready/device.py
new file mode 100644
index 0000000..e0c1a0d
--- /dev/null
+++ b/modules/pyplayready/device.py
@@ -0,0 +1,136 @@
+from __future__ import annotations
+
+import base64
+from enum import IntEnum
+from pathlib import Path
+from typing import Union, Any
+
+from modules.pyplayready.bcert import CertificateChain
+from modules.pyplayready.crypto.ecc_key import ECCKey
+
+from construct import Struct, Const, Int8ub, Bytes, this, Int32ub
+
+
+class DeviceStructs:
+ magic = Const(b"PRD")
+
+ header = Struct(
+ "signature" / magic,
+ "version" / Int8ub,
+ )
+
+ # was never in production
+ v1 = Struct(
+ "signature" / magic,
+ "version" / Int8ub,
+ "group_key_length" / Int32ub,
+ "group_key" / Bytes(this.group_key_length),
+ "group_certificate_length" / Int32ub,
+ "group_certificate" / Bytes(this.group_certificate_length)
+ )
+
+ v2 = Struct(
+ "signature" / magic,
+ "version" / Int8ub,
+ "group_certificate_length" / Int32ub,
+ "group_certificate" / Bytes(this.group_certificate_length),
+ "encryption_key" / Bytes(96),
+ "signing_key" / Bytes(96),
+ )
+
+ v3 = Struct(
+ "signature" / magic,
+ "version" / Int8ub,
+ "group_key" / Bytes(96),
+ "encryption_key" / Bytes(96),
+ "signing_key" / Bytes(96),
+ "group_certificate_length" / Int32ub,
+ "group_certificate" / Bytes(this.group_certificate_length),
+ )
+
+class Device:
+ """Represents a PlayReady Device (.prd)"""
+ CURRENT_STRUCT = DeviceStructs.v3
+ CURRENT_VERSION = 3
+
+ class SecurityLevel(IntEnum):
+ SL150 = 150
+ SL2000 = 2000
+ SL3000 = 3000
+
+ def __init__(
+ self,
+ *_: Any,
+ group_key: Union[str, bytes, None],
+ encryption_key: Union[str, bytes],
+ signing_key: Union[str, bytes],
+ group_certificate: Union[str, bytes],
+ **__: Any
+ ):
+ if isinstance(group_key, str):
+ group_key = base64.b64decode(group_key)
+
+ if isinstance(encryption_key, str):
+ encryption_key = base64.b64decode(encryption_key)
+ if not isinstance(encryption_key, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}")
+
+ if isinstance(signing_key, str):
+ signing_key = base64.b64decode(signing_key)
+ if not isinstance(signing_key, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}")
+
+ if isinstance(group_certificate, str):
+ group_certificate = base64.b64decode(group_certificate)
+ if not isinstance(group_certificate, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}")
+
+ self.group_key = None if group_key is None else ECCKey.loads(group_key)
+ self.encryption_key = ECCKey.loads(encryption_key)
+ self.signing_key = ECCKey.loads(signing_key)
+ self.group_certificate = CertificateChain.loads(group_certificate)
+ self.security_level = self.group_certificate.get_security_level()
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> Device:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ prd_header = DeviceStructs.header.parse(data)
+ if prd_header.version == 2:
+ return cls(
+ group_key=None,
+ **DeviceStructs.v2.parse(data)
+ )
+
+ return cls(**cls.CURRENT_STRUCT.parse(data))
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> Device:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self) -> bytes:
+ return self.CURRENT_STRUCT.build(dict(
+ version=self.CURRENT_VERSION,
+ group_key=self.group_key.dumps(),
+ encryption_key=self.encryption_key.dumps(),
+ signing_key=self.signing_key.dumps(),
+ group_certificate_length=len(self.group_certificate.dumps()),
+ group_certificate=self.group_certificate.dumps(),
+ ))
+
+ def dump(self, path: Union[Path, str]) -> None:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ path = Path(path)
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_bytes(self.dumps())
+
+ def get_name(self) -> str:
+ name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}"
+ return ''.join(char for char in name if (char.isalnum() or char in '_- ')).strip().lower().replace(" ", "_")
\ No newline at end of file
diff --git a/modules/pyplayready/exceptions.py b/modules/pyplayready/exceptions.py
new file mode 100644
index 0000000..68dc942
--- /dev/null
+++ b/modules/pyplayready/exceptions.py
@@ -0,0 +1,26 @@
+class PyPlayreadyException(Exception):
+ """Exceptions used by pyplayready."""
+
+
+class InvalidPssh(PyPlayreadyException):
+ """The Playready PSSH is invalid or empty."""
+
+
+class InvalidInitData(PyPlayreadyException):
+ """The Playready Cenc Header Data is invalid or empty."""
+
+
+class DeviceMismatch(PyPlayreadyException):
+ """The Remote CDMs Device information and the APIs Device information did not match."""
+
+
+class InvalidLicense(PyPlayreadyException):
+ """Unable to parse XMR License."""
+
+
+class InvalidCertificateChain(PyPlayreadyException):
+ """The BCert is not correctly formatted."""
+
+
+class OutdatedDevice(PyPlayreadyException):
+ """The PlayReady Device is outdated and does not support a specific operation."""
\ No newline at end of file
diff --git a/modules/pyplayready/key.py b/modules/pyplayready/key.py
new file mode 100644
index 0000000..b9e34af
--- /dev/null
+++ b/modules/pyplayready/key.py
@@ -0,0 +1,68 @@
+import base64
+from enum import Enum
+from uuid import UUID
+from typing import Union
+
+
+class Key:
+ class KeyType(Enum):
+ INVALID = 0x0000
+ AES_128_CTR = 0x0001
+ RC4_CIPHER = 0x0002
+ AES_128_ECB = 0x0003
+ COCKTAIL = 0x0004
+ AES_128_CBC = 0x0005
+ KEYEXCHANGE = 0x0006
+ UNKNOWN = 0xffff
+
+ @classmethod
+ def _missing_(cls, value):
+ return cls.UNKNOWN
+
+ class CipherType(Enum):
+ INVALID = 0x0000
+ RSA_1024 = 0x0001
+ CHAINED_LICENSE = 0x0002
+ ECC_256 = 0x0003
+ ECC_256_WITH_KZ = 0x0004
+ TEE_TRANSIENT = 0x0005
+ ECC_256_VIA_SYMMETRIC = 0x0006
+ UNKNOWN = 0xffff
+
+ @classmethod
+ def _missing_(cls, value):
+ return cls.UNKNOWN
+
+ def __init__(
+ self,
+ key_id: UUID,
+ key_type: int,
+ cipher_type: int,
+ key_length: int,
+ key: bytes
+ ):
+ self.key_id = key_id
+ self.key_type = self.KeyType(key_type)
+ self.cipher_type = self.CipherType(cipher_type)
+ self.key_length = key_length
+ self.key = key
+
+ @staticmethod
+ def kid_to_uuid(kid: Union[str, bytes]) -> UUID:
+ """
+ Convert a Key ID from a string or bytes to a UUID object.
+ At first, this may seem very simple, but some types of Key IDs
+ may not be 16 bytes and some may be decimal vs. hex.
+ """
+ if isinstance(kid, str):
+ kid = base64.b64decode(kid)
+ if not kid:
+ kid = b"\x00" * 16
+
+ if kid.decode(errors="replace").isdigit():
+ return UUID(int=int(kid.decode()))
+
+ if len(kid) < 16:
+ kid += b"\x00" * (16 - len(kid))
+
+ return UUID(bytes=kid)
\ No newline at end of file
diff --git a/modules/pyplayready/main.py b/modules/pyplayready/main.py
new file mode 100644
index 0000000..f0cbab0
--- /dev/null
+++ b/modules/pyplayready/main.py
@@ -0,0 +1,311 @@
+import logging
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+
+import click
+import requests
+from Crypto.Random import get_random_bytes
+
+from modules.pyplayready.system.bcert import CertificateChain, Certificate
+from modules.pyplayready.cdm import Cdm
+from modules.pyplayready.device import Device
+from modules.pyplayready.crypto.ecc_key import ECCKey
+from modules.pyplayready.exceptions import OutdatedDevice
+from modules.pyplayready.system.pssh import PSSH
+
+@click.group(invoke_without_command=True)
+@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.")
+@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.")
+def main(version: bool, debug: bool) -> None:
+ """Python PlayReady CDM implementation"""
+ logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
+ log = logging.getLogger()
+
+ current_year = datetime.now().year
+ copyright_years = f"2024-{current_year}"
+
+ log.info("pyplayready version %s Copyright (c) %s DevLARLEY, Erevoc, DevataDev", __version__, copyright_years)
+ log.info("https://github.com/ready-dl/pyplayready")
+ log.info("Run 'pyplayready --help' for help")
+ if version:
+ return
+
+
+@main.command(name="license")
+@click.argument("device_path", type=Path)
+@click.argument("pssh", type=PSSH)
+@click.argument("server", type=str)
+def license_(device_path: Path, pssh: PSSH, server: str) -> None:
+ """
+ Make a License Request to a server using a given PSSH
+ Will return a list of all keys within the returned license
+
+ Only works for standard license servers that don't use any license wrapping
+ """
+ log = logging.getLogger("license")
+
+ device = Device.load(device_path)
+ log.info(f"Loaded Device: {device.get_name()}")
+
+ cdm = Cdm.from_device(device)
+ log.info("Loaded CDM")
+
+ session_id = cdm.open()
+ log.info("Opened Session")
+
+ challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers(downgrade_to_v4=True)[0])
+ log.info("Created License Request (Challenge)")
+ log.debug(challenge)
+
+ license_res = requests.post(
+ url=server,
+ headers={
+ 'Content-Type': 'text/xml; charset=UTF-8',
+ },
+ data=challenge
+ )
+
+ if license_res.status_code != 200:
+ log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}")
+ return
+
+ licence = license_res.text
+ log.info("Got License Message")
+ log.debug(licence)
+
+ cdm.parse_license(session_id, licence)
+ log.info("License Parsed Successfully")
+
+ for key in cdm.get_keys(session_id):
+ log.info(f"{key.key_id.hex}:{key.key.hex()}")
+
+ cdm.close(session_id)
+ log.info("Clossed Session")
+
+
+@main.command()
+@click.argument("device", type=Path)
+@click.pass_context
+def test(ctx: click.Context, device: Path) -> None:
+ """
+ Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
+ https://testweb.playready.microsoft.com/Content/Content2X
+ + DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd
+ + MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest
+
+ The device argument is a Path to a Playready Device (.prd) file which contains the device's group key and
+ group certificate.
+ """
+ pssh = PSSH(
+ "AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH"
+ "QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh"
+ "AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg"
+ "BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA"
+ "UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE"
+ "cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD"
+ "AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ"
+ "B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA"
+ "ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF"
+ "YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT"
+ "AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
+ )
+
+ license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)"
+
+ ctx.invoke(
+ license_,
+ device_path=device,
+ pssh=pssh,
+ server=license_server
+ )
+
+
+@main.command()
+@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key")
+@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain")
+@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
+@click.pass_context
+def create_device(
+ ctx: click.Context,
+ group_key: Path,
+ group_certificate: Path,
+ output: Optional[Path] = None
+) -> None:
+ """Create a Playready Device (.prd) file from an ECC private group key and group certificate chain"""
+ if not group_key.is_file():
+ raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
+ if not group_certificate.is_file():
+ raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
+
+ log = logging.getLogger("create-device")
+
+ encryption_key = ECCKey.generate()
+ signing_key = ECCKey.generate()
+
+ group_key = ECCKey.load(group_key)
+ certificate_chain = CertificateChain.load(group_certificate)
+
+ new_certificate = Certificate.new_leaf_cert(
+ cert_id=get_random_bytes(16),
+ security_level=certificate_chain.get_security_level(),
+ client_id=get_random_bytes(16),
+ signing_key=signing_key,
+ encryption_key=encryption_key,
+ group_key=group_key,
+ parent=certificate_chain
+ )
+ certificate_chain.prepend(new_certificate)
+
+ device = Device(
+ group_key=group_key.dumps(),
+ encryption_key=encryption_key.dumps(),
+ signing_key=signing_key.dumps(),
+ group_certificate=certificate_chain.dumps(),
+ )
+
+ if output and output.suffix:
+ if output.suffix.lower() != ".prd":
+ log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
+ out_path = output
+ else:
+ out_dir = output or Path.cwd()
+ out_path = out_dir / f"{device.get_name()}.prd"
+
+ if out_path.exists():
+ log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")
+ return
+
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+ out_path.write_bytes(device.dumps())
+
+ log.info("Created Playready Device (.prd) file, %s", out_path.name)
+ log.info(" + Security Level: %s", device.security_level)
+ log.info(" + Group Key: %s bytes", len(device.group_key.dumps()))
+ log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps()))
+ log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps()))
+ log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps()))
+ log.info(" + Saved to: %s", out_path.absolute())
+
+
+@main.command()
+@click.argument("prd_path", type=Path)
+@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
+@click.pass_context
+def reprovision_device(ctx: click.Context, prd_path: Path, output: Optional[Path] = None) -> None:
+ """
+ Reprovision a Playready Device (.prd) by creating a new leaf certificate and new encryption/signing keys.
+ Will override the device if an output path or directory is not specified
+
+ Only works on PRD Devices of v3 or higher
+ """
+ if not prd_path.is_file():
+ raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
+
+ log = logging.getLogger("reprovision-device")
+ log.info("Reprovisioning Playready Device (.prd) file, %s", prd_path.name)
+
+ device = Device.load(prd_path)
+
+ if device.group_key is None:
+ raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher")
+
+ device.group_certificate.remove(0)
+
+ encryption_key = ECCKey.generate()
+ signing_key = ECCKey.generate()
+
+ device.encryption_key = encryption_key
+ device.signing_key = signing_key
+
+ new_certificate = Certificate.new_leaf_cert(
+ cert_id=get_random_bytes(16),
+ security_level=device.group_certificate.get_security_level(),
+ client_id=get_random_bytes(16),
+ signing_key=signing_key,
+ encryption_key=encryption_key,
+ group_key=device.group_key,
+ parent=device.group_certificate
+ )
+ device.group_certificate.prepend(new_certificate)
+
+ if output and output.suffix:
+ if output.suffix.lower() != ".prd":
+ log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
+ out_path = output
+ else:
+ out_path = prd_path
+
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+ out_path.write_bytes(device.dumps())
+
+ log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name)
+
+
+@main.command()
+@click.argument("prd_path", type=Path)
+@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory")
+@click.pass_context
+def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None:
+ """
+ Export a Playready Device (.prd) file to a Group Key and Group Certificate
+ If an output directory is not specified, it will be stored in the current working directory
+ """
+ if not prd_path.is_file():
+ raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
+
+ log = logging.getLogger("export-device")
+ log.info("Exporting Playready Device (.prd) file, %s", prd_path.stem)
+
+ if not out_dir:
+ out_dir = Path.cwd()
+
+ out_path = out_dir / prd_path.stem
+ if out_path.exists():
+ if any(out_path.iterdir()):
+ log.error("Output directory is not empty, cannot overwrite.")
+ return
+ else:
+ log.warning("Output directory already exists, but is empty.")
+ else:
+ out_path.mkdir(parents=True)
+
+ device = Device.load(prd_path)
+
+ log.info(f"SL{device.security_level} {device.get_name()}")
+ log.info(f"Saving to: {out_path}")
+
+ if device.group_key:
+ group_key_path = out_path / "zgpriv.dat"
+ group_key_path.write_bytes(device.group_key.dumps(private_only=True))
+ log.info("Exported Group Key as zgpriv.dat")
+ else:
+ log.warning("Cannot export zgpriv.dat, as v2 devices do not save the group key")
+
+ # remove leaf cert to unprovision it
+ device.group_certificate.remove(0)
+
+ client_id_path = out_path / "bgroupcert.dat"
+ client_id_path.write_bytes(device.group_certificate.dumps())
+ log.info("Exported Group Certificate to bgroupcert.dat")
+
+
+@main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.")
+@click.argument("config_path", type=Path)
+@click.option("-h", "--host", type=str, default="127.0.0.1", help="Host to serve from.")
+@click.option("-p", "--port", type=int, default=7723, help="Port to serve from.")
+def serve_(config_path: Path, host: str, port: int) -> None:
+ """
+ Serve your local CDM and Playready Devices Remotely.
+
+ [CONFIG] is a path to a serve config file.
+ See `serve.example.yml` for an example config file.
+
+ Host as 127.0.0.1 may block remote access even if port-forwarded.
+ Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded.
+ """
+ from pyplayready.remote import serve
+ import yaml
+
+ config = yaml.safe_load(config_path.read_text(encoding="utf8"))
+ serve.run(config, host, port)
\ No newline at end of file
diff --git a/modules/pyplayready/pssh.py b/modules/pyplayready/pssh.py
new file mode 100644
index 0000000..d68bfc3
--- /dev/null
+++ b/modules/pyplayready/pssh.py
@@ -0,0 +1,98 @@
+import base64
+from typing import Union, List
+from uuid import UUID
+
+from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, Switch, Int32ub, Const, Container, ConstructError
+
+from modules.pyplayready.exceptions import InvalidPssh
+from modules.pyplayready.wrmheader import WRMHeader
+
+
+class _PlayreadyPSSHStructs:
+ PSSHBox = Struct(
+ "length" / Int32ub,
+ "pssh" / Const(b"pssh"),
+ "fullbox" / Int32ub,
+ "system_id" / Bytes(16),
+ "data_length" / Int32ub,
+ "data" / Bytes(this.data_length)
+ )
+
+ PlayreadyObject = Struct(
+ "type" / Int16ul,
+ "length" / Int16ul,
+ "data" / Switch(
+ this.type,
+ {
+ 1: Bytes(this.length)
+ },
+ default=Bytes(this.length)
+ )
+ )
+
+ PlayreadyHeader = Struct(
+ "length" / Int32ul,
+ "record_count" / Int16ul,
+ "records" / Array(this.record_count, PlayreadyObject)
+ )
+
+
+class PSSH(_PlayreadyPSSHStructs):
+ """Represents a PlayReady PSSH"""
+
+ SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95")
+
+ def __init__(self, data: Union[str, bytes]):
+ """Load a PSSH Box, PlayReady Header or PlayReady Object"""
+
+ if not data:
+ raise InvalidPssh("Data must not be empty")
+
+ if isinstance(data, str):
+ try:
+ data = base64.b64decode(data)
+ except Exception as e:
+ raise InvalidPssh(f"Could not decode data as Base64, {e}")
+
+ self.wrm_headers: List[WRMHeader]
+ try:
+ # PSSH Box -> PlayReady Header
+ box = self.PSSHBox.parse(data)
+ prh = self.PlayreadyHeader.parse(box.data)
+ self.wrm_headers = self._read_playready_objects(prh)
+ except ConstructError:
+ if int.from_bytes(data[:2], byteorder="little") > 3:
+ try:
+ # PlayReady Header
+ prh = self.PlayreadyHeader.parse(data)
+ self.wrm_headers = self._read_playready_objects(prh)
+ except ConstructError:
+ raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Header")
+ else:
+ try:
+ # PlayReady Object
+ pro = self.PlayreadyObject.parse(data)
+ self.wrm_headers = [WRMHeader(pro.data)]
+ except ConstructError:
+ raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Object")
+
+ @staticmethod
+ def _read_playready_objects(header: Container) -> List[WRMHeader]:
+ return list(map(
+ lambda pro: WRMHeader(pro.data),
+ filter(
+ lambda pro: pro.type == 1,
+ header.records
+ )
+ ))
+
+ def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]:
+ """
+ Return a list of all WRM Headers in the PSSH as plaintext strings
+
+ downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR
+ """
+ return list(map(
+ lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(),
+ self.wrm_headers
+ ))
\ No newline at end of file
diff --git a/modules/pyplayready/wrmheader.py b/modules/pyplayready/wrmheader.py
new file mode 100644
index 0000000..5626d7c
--- /dev/null
+++ b/modules/pyplayready/wrmheader.py
@@ -0,0 +1,201 @@
+import base64
+import binascii
+from enum import Enum
+from typing import Optional, List, Union, Tuple
+
+import xmltodict
+
+
+class WRMHeader:
+ """Represents a PlayReady WRM Header"""
+
+ class SignedKeyID:
+ def __init__(
+ self,
+ alg_id: str,
+ value: str,
+ checksum: str
+ ):
+ self.alg_id = alg_id
+ self.value = value
+ self.checksum = checksum
+
+ def __repr__(self):
+ return f'SignedKeyID(alg_id={self.alg_id}, value="{self.value}", checksum="{self.checksum}")'
+
+ class Version(Enum):
+ VERSION_4_0_0_0 = "4.0.0.0"
+ VERSION_4_1_0_0 = "4.1.0.0"
+ VERSION_4_2_0_0 = "4.2.0.0"
+ VERSION_4_3_0_0 = "4.3.0.0"
+ UNKNOWN = "UNKNOWN"
+
+ @classmethod
+ def _missing_(cls, value):
+ return cls.UNKNOWN
+
+ _RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]]
+
+ def __init__(self, data: Union[str, bytes]):
+ """Load a WRM Header from either a string, base64 encoded data or bytes"""
+
+ if not data:
+ raise ValueError("Data must not be empty")
+
+ if isinstance(data, str):
+ try:
+ data = base64.b64decode(data).decode()
+ except (binascii.Error, binascii.Incomplete):
+ data = data.encode()
+
+ self._raw_data: bytes = data
+ self._parsed = xmltodict.parse(self._raw_data)
+
+ self._header = self._parsed.get('WRMHEADER')
+ if not self._header:
+ raise ValueError("Data is not a valid WRMHEADER")
+
+ self.version = self.Version(self._header.get('@version'))
+
+ @staticmethod
+ def _ensure_list(element: Union[dict, list]) -> List:
+ if isinstance(element, dict):
+ return [element]
+ return element
+
+ def to_v4_0_0_0(self) -> str:
+ """
+ Build a v4.0.0.0 WRM header from any possible WRM Header version
+
+ Note: Will ignore any remaining Key IDs if there's more than just one
+ """
+ return self._build_v4_0_0_0_wrm_header(*self.read_attributes())
+
+ @staticmethod
+ def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ return (
+ [WRMHeader.SignedKeyID(
+ alg_id=protect_info["ALGID"],
+ value=data["KID"],
+ checksum=data.get("CHECKSUM")
+ )],
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ @staticmethod
+ def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ key_ids = []
+ if protect_info:
+ kid = protect_info["KID"]
+ if kid:
+ key_ids = [WRMHeader.SignedKeyID(
+ alg_id=kid["@ALGID"],
+ value=kid["@VALUE"],
+ checksum=kid.get("@CHECKSUM")
+ )]
+
+ return (
+ key_ids,
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ @staticmethod
+ def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ key_ids = []
+ if protect_info:
+ kids = protect_info["KIDS"]
+ if kids:
+ for kid in WRMHeader._ensure_list(kids["KID"]):
+ key_ids.append(WRMHeader.SignedKeyID(
+ alg_id=kid["@ALGID"],
+ value=kid["@VALUE"],
+ checksum=kid.get("@CHECKSUM")
+ ))
+
+ return (
+ key_ids,
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ @staticmethod
+ def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE:
+ protect_info = data.get("PROTECTINFO")
+
+ key_ids = []
+ if protect_info:
+ kids = protect_info["KIDS"]
+ for kid in WRMHeader._ensure_list(kids["KID"]):
+ key_ids.append(WRMHeader.SignedKeyID(
+ alg_id=kid.get("@ALGID"),
+ value=kid["@VALUE"],
+ checksum=kid.get("@CHECKSUM")
+ ))
+
+ return (
+ key_ids,
+ data.get("LA_URL"),
+ data.get("LUI_URL"),
+ data.get("DS_ID")
+ )
+
+ def read_attributes(self) -> _RETURN_STRUCTURE:
+ """
+ Read any non-custom XML attributes
+
+ Returns a tuple structured like this: Tuple[List[SignedKeyID], , , ]
+ """
+
+ data = self._header.get("DATA")
+ if not data:
+ raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
+
+ if self.version == self.Version.VERSION_4_0_0_0:
+ return self._read_v4_0_0_0(data)
+ elif self.version == self.Version.VERSION_4_1_0_0:
+ return self._read_v4_1_0_0(data)
+ elif self.version == self.Version.VERSION_4_2_0_0:
+ return self._read_v4_2_0_0(data)
+ elif self.version == self.Version.VERSION_4_3_0_0:
+ return self._read_v4_3_0_0(data)
+
+ @staticmethod
+ def _build_v4_0_0_0_wrm_header(
+ key_ids: List[SignedKeyID],
+ la_url: Optional[str],
+ lui_url: Optional[str],
+ ds_id: Optional[str]
+ ) -> str:
+ if len(key_ids) == 0:
+ raise Exception("No Key IDs available")
+
+ key_id = key_ids[0]
+ return (
+ ''
+ ''
+ ''
+ '16'
+ 'AESCTR'
+ ''
+ f'{key_id.value}' +
+ (f'{la_url}' if la_url else '') +
+ (f'{lui_url}' if lui_url else '') +
+ (f'{ds_id}' if ds_id else '') +
+ (f'{key_id.checksum}' if key_id.checksum else '') +
+ ''
+ ''
+ )
+
+ def dumps(self) -> str:
+ return self._raw_data.decode("utf-16-le")
\ No newline at end of file
diff --git a/modules/pyplayready/xml_key.py b/modules/pyplayready/xml_key.py
new file mode 100644
index 0000000..424f1d6
--- /dev/null
+++ b/modules/pyplayready/xml_key.py
@@ -0,0 +1,18 @@
+from ecpy.curves import Point, Curve
+
+from modules.pyplayready.crypto.ecc_key import ECCKey
+from modules.pyplayready.crypto.elgamal import ElGamal
+
+
+class XmlKey:
+ def __init__(self):
+ self._shared_point = ECCKey.generate()
+ self.shared_key_x = self._shared_point.key.pointQ.x
+ self.shared_key_y = self._shared_point.key.pointQ.y
+
+ self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x))
+ self.aes_iv = self._shared_key_x_bytes[:16]
+ self.aes_key = self._shared_key_x_bytes[16:]
+
+ def get_point(self, curve: Curve) -> Point:
+ return Point(self.shared_key_x, self.shared_key_y, curve)
\ No newline at end of file
diff --git a/modules/pyplayready/xmrlicense.py b/modules/pyplayready/xmrlicense.py
new file mode 100644
index 0000000..13edab0
--- /dev/null
+++ b/modules/pyplayready/xmrlicense.py
@@ -0,0 +1,252 @@
+from __future__ import annotations
+
+import base64
+from pathlib import Path
+from typing import Union
+
+from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
+
+
+class _XMRLicenseStructs:
+ PlayEnablerType = Struct(
+ "player_enabler_type" / Bytes(16)
+ )
+
+ DomainRestrictionObject = Struct(
+ "account_id" / Bytes(16),
+ "revision" / Int32ub
+ )
+
+ IssueDateObject = Struct(
+ "issue_date" / Int32ub
+ )
+
+ RevInfoVersionObject = Struct(
+ "sequence" / Int32ub
+ )
+
+ SecurityLevelObject = Struct(
+ "minimum_security_level" / Int16ub
+ )
+
+ EmbeddedLicenseSettingsObject = Struct(
+ "indicator" / Int16ub
+ )
+
+ ECCKeyObject = Struct(
+ "curve_type" / Int16ub,
+ "key_length" / Int16ub,
+ "key" / Bytes(this.key_length)
+ )
+
+ SignatureObject = Struct(
+ "signature_type" / Int16ub,
+ "signature_data_length" / Int16ub,
+ "signature_data" / Bytes(this.signature_data_length)
+ )
+
+ ContentKeyObject = Struct(
+ "key_id" / Bytes(16),
+ "key_type" / Int16ub,
+ "cipher_type" / Int16ub,
+ "key_length" / Int16ub,
+ "encrypted_key" / Bytes(this.key_length)
+ )
+
+ RightsSettingsObject = Struct(
+ "rights" / Int16ub
+ )
+
+ OutputProtectionLevelRestrictionObject = Struct(
+ "minimum_compressed_digital_video_opl" / Int16ub,
+ "minimum_uncompressed_digital_video_opl" / Int16ub,
+ "minimum_analog_video_opl" / Int16ub,
+ "minimum_digital_compressed_audio_opl" / Int16ub,
+ "minimum_digital_uncompressed_audio_opl" / Int16ub,
+ )
+
+ ExpirationRestrictionObject = Struct(
+ "begin_date" / Int32ub,
+ "end_date" / Int32ub
+ )
+
+ RemovalDateObject = Struct(
+ "removal_date" / Int32ub
+ )
+
+ UplinkKIDObject = Struct(
+ "uplink_kid" / Bytes(16),
+ "chained_checksum_type" / Int16ub,
+ "chained_checksum_length" / Int16ub,
+ "chained_checksum" / Bytes(this.chained_checksum_length)
+ )
+
+ AnalogVideoOutputConfigurationRestriction = Struct(
+ "video_output_protection_id" / Bytes(16),
+ "binary_configuration_data" / Bytes(this._.length - 24)
+ )
+
+ DigitalVideoOutputRestrictionObject = Struct(
+ "video_output_protection_id" / Bytes(16),
+ "binary_configuration_data" / Bytes(this._.length - 24)
+ )
+
+ DigitalAudioOutputRestrictionObject = Struct(
+ "audio_output_protection_id" / Bytes(16),
+ "binary_configuration_data" / Bytes(this._.length - 24)
+ )
+
+ PolicyMetadataObject = Struct(
+ "metadata_type" / Bytes(16),
+ "policy_data" / Bytes(this._.length)
+ )
+
+ SecureStopRestrictionObject = Struct(
+ "metering_id" / Bytes(16)
+ )
+
+ MeteringRestrictionObject = Struct(
+ "metering_id" / Bytes(16)
+ )
+
+ ExpirationAfterFirstPlayRestrictionObject = Struct(
+ "seconds" / Int32ub
+ )
+
+ GracePeriodObject = Struct(
+ "grace_period" / Int32ub
+ )
+
+ SourceIdObject = Struct(
+ "source_id" / Int32ub
+ )
+
+ AuxiliaryKey = Struct(
+ "location" / Int32ub,
+ "key" / Bytes(16)
+ )
+
+ AuxiliaryKeysObject = Struct(
+ "count" / Int16ub,
+ "auxiliary_keys" / Array(this.count, AuxiliaryKey)
+ )
+
+ UplinkKeyObject3 = Struct(
+ "uplink_key_id" / Bytes(16),
+ "chained_length" / Int16ub,
+ "checksum" / Bytes(this.chained_length),
+ "count" / Int16ub,
+ "entries" / Array(this.count, Int32ub)
+ )
+
+ CopyEnablerObject = Struct(
+ "copy_enabler_type" / Bytes(16)
+ )
+
+ CopyCountRestrictionObject = Struct(
+ "count" / Int32ub
+ )
+
+ MoveObject = Struct(
+ "minimum_move_protection_level" / Int32ub
+ )
+
+ XmrObject = Struct(
+ "flags" / Int16ub,
+ "type" / Int16ub,
+ "length" / Int32ub,
+ "data" / Switch(
+ lambda ctx: ctx.type,
+ {
+ 0x0005: OutputProtectionLevelRestrictionObject,
+ 0x0008: AnalogVideoOutputConfigurationRestriction,
+ 0x000a: ContentKeyObject,
+ 0x000b: SignatureObject,
+ 0x000d: RightsSettingsObject,
+ 0x0012: ExpirationRestrictionObject,
+ 0x0013: IssueDateObject,
+ 0x0016: MeteringRestrictionObject,
+ 0x001a: GracePeriodObject,
+ 0x0022: SourceIdObject,
+ 0x002a: ECCKeyObject,
+ 0x002c: PolicyMetadataObject,
+ 0x0029: DomainRestrictionObject,
+ 0x0030: ExpirationAfterFirstPlayRestrictionObject,
+ 0x0031: DigitalAudioOutputRestrictionObject,
+ 0x0032: RevInfoVersionObject,
+ 0x0033: EmbeddedLicenseSettingsObject,
+ 0x0034: SecurityLevelObject,
+ 0x0037: MoveObject,
+ 0x0039: PlayEnablerType,
+ 0x003a: CopyEnablerObject,
+ 0x003b: UplinkKIDObject,
+ 0x003d: CopyCountRestrictionObject,
+ 0x0050: RemovalDateObject,
+ 0x0051: AuxiliaryKeysObject,
+ 0x0052: UplinkKeyObject3,
+ 0x005a: SecureStopRestrictionObject,
+ 0x0059: DigitalVideoOutputRestrictionObject
+ },
+ default=LazyBound(lambda ctx: _XMRLicenseStructs.XmrObject)
+ )
+ )
+
+ XmrLicense = Struct(
+ "signature" / Const(b"XMR\x00"),
+ "xmr_version" / Int32ub,
+ "rights_id" / Bytes(16),
+ "containers" / GreedyRange(XmrObject)
+ )
+
+
+class XMRLicense(_XMRLicenseStructs):
+ """Represents an XMRLicense"""
+
+ def __init__(
+ self,
+ parsed_license: Container,
+ license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense
+ ):
+ self.parsed = parsed_license
+ self._license_obj = license_obj
+
+ @classmethod
+ def loads(cls, data: Union[str, bytes]) -> XMRLicense:
+ if isinstance(data, str):
+ data = base64.b64decode(data)
+ if not isinstance(data, bytes):
+ raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
+
+ licence = _XMRLicenseStructs.XmrLicense
+ return cls(
+ parsed_license=licence.parse(data),
+ license_obj=licence
+ )
+
+ @classmethod
+ def load(cls, path: Union[Path, str]) -> XMRLicense:
+ if not isinstance(path, (Path, str)):
+ raise ValueError(f"Expecting Path object or path string, got {path!r}")
+ with Path(path).open(mode="rb") as f:
+ return cls.loads(f.read())
+
+ def dumps(self) -> bytes:
+ return self._license_obj.build(self.parsed)
+
+ def struct(self) -> _XMRLicenseStructs.XmrLicense:
+ return self._license_obj
+
+ def _locate(self, container: Container):
+ if container.flags == 2 or container.flags == 3:
+ return self._locate(container.data)
+ else:
+ return container
+
+ def get_object(self, type_: int):
+ for obj in self.parsed.containers:
+ container = self._locate(obj)
+ if container.type == type_:
+ yield container.data
+
+ def get_content_keys(self):
+ yield from self.get_object(10)
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..d843672
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,36 @@
+aiohappyeyeballs==2.4.4
+aiohttp==3.11.11
+aiosignal==1.3.2
+async-timeout==5.0.1
+attrs==25.1.0
+blinker==1.9.0
+certifi==2025.1.31
+charset-normalizer==3.4.1
+click==8.1.8
+colorama==0.4.6
+coloredlogs==15.0.1
+construct==2.8.8
+ECPy==1.2.5
+Flask==3.1.0
+Flask-Cors==5.0.0
+frozenlist==1.5.0
+humanfriendly==10.0
+idna==3.10
+itsdangerous==2.2.0
+Jinja2==3.1.5
+loguru==0.7.3
+MarkupSafe==3.0.2
+multidict==6.1.0
+propcache==0.2.1
+pycryptodome==3.21.0
+pyfiglet==1.0.2
+pyreadline3==3.5.4
+python-dotenv==1.0.1
+PyYAML==6.0.2
+requests==2.32.3
+typing_extensions==4.12.2
+urllib3==2.3.0
+Werkzeug==3.1.3
+win32_setctime==1.2.0
+xmltodict==0.14.2
+yarl==1.18.3
diff --git a/routes/__init__.py b/routes/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/routes/playready.py b/routes/playready.py
new file mode 100644
index 0000000..6e76440
--- /dev/null
+++ b/routes/playready.py
@@ -0,0 +1,47 @@
+from flask import Blueprint, request, jsonify
+from flask_cors import cross_origin, CORS
+from modules.config import apikey_required
+from modules.playready import PLAYREADY
+from modules.logging import setup_logging
+
+# Setup logging
+logging = setup_logging()
+
+playready_bp = Blueprint('playready_bp', __name__)
+CORS(playready_bp, resources={r"/*": {"origins": "*"}}, supports_credentials=True, allow_headers=["Content-Type", "X-API-KEY"])
+
+@playready_bp.route('/extension', methods=['POST'])
+@apikey_required
+@cross_origin()
+def extension():
+ if not request.is_json:
+ return jsonify({"error": "Missing JSON in request"}), 400
+
+ data = request.get_json()
+ action = data.get('action', None)
+
+ if not action:
+ return jsonify({"error": "Missing action in request"}), 400
+
+ if action == "Challenge?":
+ pssh = data.get('pssh', None)
+
+ if not pssh:
+ return jsonify({"error": "Missing pssh in request"}), 400
+
+ playready = PLAYREADY()
+ playready.pssh = pssh
+ return playready.get_license_challenge()
+
+ elif action == "Keys?":
+ license = data.get('license', None)
+
+ if not license:
+ return jsonify({"error": "Missing license in request"}), 400
+
+ playready = PLAYREADY()
+ playready.license = license
+ return playready.get_license_keys()
+
+ else:
+ return jsonify({"error": "Unknown action"}), 400
\ No newline at end of file