This commit is contained in:
Pari Malam
2025-02-19 17:23:48 +08:00
parent c057abec2a
commit 8997df208b
18 changed files with 237 additions and 2049 deletions

View File

@@ -1,6 +1,6 @@
[DEFAULT]
FLASK_DEBUG = # True or False
FLASK_ENV = # Development or Production
FLASK_APP = # app.py
FLASK_RUN_HOST = # LOCAL IP
FLASK_RUN_PORT = # LOCAL PORT
FLASK_DEBUG = True
FLASK_ENV = Development
FLASK_APP = app.py
FLASK_RUN_HOST = 0.0.0.0
FLASK_RUN_PORT = 1337

View File

@@ -1,2 +1,3 @@
[PLAYREADY]
DEVICE = device/example.prd # your device filename
[CDM]
DEVICE_FILE = device/ # DEVICE FILENAME
DEVICE_NAME = # DEVICE NAME

View File

@@ -10,12 +10,13 @@ def clear_terminal():
def banners():
clear_terminal()
banners = """
██████╗ ██████╗ ██████╗ ███████╗ █████╗██████╗ ██╗ ██╗██████╗ ████████╗ ██████╗ ██████╗
██╔══██╗██╔══██╗ ██╔══██╗██╔════╝██╔════╝██╔══██╗╚██╗ ██╔╝██╔══██╗╚══██╔══╝██╔═══██╗██╔══██╗
██████╔╝██████╔╝███████║ ██║█████╗ ████████╔╝ ╚████╔╝ ██████╔╝ ██║ ██║ ██║██████╔╝
██╔═══╝ ██╔══██╗╚════██║ ██║██╔══╝ ██║ ██╔══██╗██╔╝ ██╔═══╝ ██║ ██║ ██║██╔══██
██║ ██║ ██║ ██████╔╝███████╗████████║ ██║ ██║ ██║ ██║ ╚██████╔╝██║ ██║
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝ ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝
██████╗ ██╗ █████╗ ██╗ ████████╗ ███████╗ █████╗ ██████╗ ██╗ ██╗ █████╗ ██████╗ ██
██╔══██╗██║ ██╔══██╗╚██╗ ██╔╝██╔══██╗██╔════╝██╔══██╗██╔══██╗╚██╗ ██╔╝ ██╔══██╗██╔══██╗██║
██████╔╝█████████║ ╚████╔╝ ██████╔╝████╗ █████████║ ██║ ╚████╔╝█████╗███████║██████╔╝██║
██╔═══╝ ██║ ██╔══██║ ██╔╝ ██╔══██╗██╔══╝ ██╔══██║██║ ██║ ╚██╔╝ ╚════╝██╔══██║██╔══██
██║ ███████╗██║ ██║ ██║ ██║ ██║███████╗██████████╔╝ ██║ ██║ ██║██║ ██║
╚═╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚═════╝ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝
Author: GITHUB.COM/THATNOTEASY
"""
# Split the banner into lines

View File

@@ -4,6 +4,7 @@ from modules.logging import setup_logging
from flask import request, jsonify
API_KEY_FILE = 'APIKEY.json'
logging = setup_logging()
def setup_config():
config = configparser.ConfigParser()
@@ -15,10 +16,10 @@ def load_api_keys():
with open(API_KEY_FILE, 'r') as file:
return json.load(file)
except FileNotFoundError:
logger.error("APIKEY.json file not found.")
logging.error("APIKEY.json file not found.")
return []
except json.JSONDecodeError:
logger.error("Error decoding APIKEY.json.")
logging.error("Error decoding APIKEY.json.")
return []
def save_api_keys(api_keys):
@@ -57,9 +58,11 @@ def apikey_required(func):
provided_key = request.headers.get('X-API-KEY')
if not provided_key:
logging.error("X-API-KEY header is missing.")
return jsonify({"responseData": "Opss! API key is missing"}), 403
response_data = {"message": "Opss! X-API-KEY is missing."}
return jsonify({"responseData": response_data}), 403
if not is_valid_api_key(provided_key):
logging.error("Invalid X-API-KEY.")
return jsonify({"responseData": "Opss! Invalid API key"}), 403
response_data = {"message": "Opss! Invalid APIKEY :P"}
return jsonify({"responseData": response_data}), 403
return func(*args, **kwargs)
return decorated_function

View File

@@ -4,67 +4,167 @@ import xml.etree.ElementTree as ET
import json
from flask import jsonify
from pathlib import Path
from modules.logging import setup_logging
from modules.config import setup_config
from modules.pyplayready.pssh import PSSH
from modules.pyplayready.device import Device
from modules.pyplayready.cdm import Cdm
from pyplayready.system.pssh import PSSH
from pyplayready.device import Device
from pyplayready.cdm import Cdm
from pyplayready.exceptions import InvalidSession, InvalidLicense
class PLAYREADY:
def __init__(self):
_instance = None # Singleton
def __new__(cls):
if cls._instance is None:
cls._instance = super(PLAYREADY, cls).__new__(cls)
cls._instance._initialize()
return cls._instance
def _initialize(self):
self.pssh = None
self.session_id = None
self.challenge = None
self.license = None
self.session_id = None
self.config = setup_config()
self.device = self.config["PLAYREADY"]["DEVICE"]
self.logging = setup_logging()
self.device_path = self.config["CDM"]["DEVICE_FILE"]
self.device_name = self.config["CDM"]["DEVICE_NAME"]
self.device = Device.load(Path(self.device_path))
self.store_session = {}
def get_license_challenge(self):
device = Device.load(self.device)
cdm = Cdm.from_device(device)
pssh = PSSH(self.pssh)
wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False)
raw_challenge = cdm.get_license_challenge(wrm_headers[0])
if isinstance(raw_challenge, str):
raw_challenge = raw_challenge.encode('utf-8')
# ============================================================================================================================== #
challenge_b64 = base64.b64encode(raw_challenge).decode('utf-8')
return challenge_b64
def open_devices(self):
if self.device_name in self.store_session:
existing_session_id = self.store_session[self.device_name]["session_id"]
self.logging.info(f"Existing session for {self.device_name}: {existing_session_id}")
return jsonify({
"responseData": {
"session_id": existing_session_id,
"device_name": self.device_name,
"security_level": self.store_session[self.device_name]["cdm"].security_level
}
}), 200
cdm = Cdm.from_device(self.device)
session_id = cdm.open().hex()
self.store_session[self.device_name] = {"cdm": cdm, "session_id": session_id}
self.logging.info(f"CDM Session Opened: {session_id}")
return jsonify({
"responseData": {
"session_id": session_id,
"device_name": self.device_name,
"security_level": cdm.security_level
}
}), 200
# ============================================================================================================================== #
def close_devices(self, session_id):
session_id_str = session_id.decode() if isinstance(session_id, bytes) else session_id
if self.device_name not in self.store_session:
self.logging.error(f"No active session for device {self.device_name}.")
return jsonify({"responseData": {"message": "No active session for this device."}}), 400
stored_session = self.store_session[self.device_name]
if session_id_str != stored_session["session_id"]:
self.logging.error(f"Session mismatch: Expected {stored_session['session_id']}, got {session_id_str}")
return jsonify({"responseData": {"message": "Invalid Session ID :P"}}), 404
try:
stored_session["cdm"].close(bytes.fromhex(session_id_str))
self.logging.info(f"CDM Session Closed: {session_id_str}")
del self.store_session[self.device_name]
return jsonify({"responseData": {"message": f"Session {session_id_str} closed successfully."}}), 200
except InvalidSession:
self.logging.error(f"Invalid session ID: {session_id_str}")
return jsonify({"responseData": {"message": "Invalid Session ID, it may have expired."}}), 400
except Exception as e:
self.logging.error(f"Error closing session: {str(e)}")
return jsonify({"responseData": {"message": "Unexpected error while closing session."}}), 500
# ============================================================================================================================== #
def get_challenges(self, device):
session_entry = self.store_session.get(device)
if not session_entry or "cdm" not in session_entry:
return jsonify({"responseData": {"message": f"No Cdm session for {device} has been opened yet. No session to use."}}), 400
cdm = session_entry["cdm"]
try:
session_id = bytes.fromhex(self.session_id)
except ValueError:
return jsonify({"responseData": {"message": "Invalid session_id format."}}), 400
if not self.pssh.startswith("<WRMHEADER"):
try:
pssh = PSSH(self.pssh)
if pssh.wrm_headers:
self.pssh = pssh.wrm_headers[0]
except InvalidPssh as e:
return jsonify({"responseData": {"message": f"Unable to parse base64 PSSH, {e}"}}), 500
try:
license_request = cdm.get_license_challenge(session_id=session_id,wrm_header=self.pssh)
except InvalidSession:
return jsonify({"responseData": {"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."}}), 400
except Exception as e:
return jsonify({"responseData": {"message": f"Error, {e}"}}), 500
response_data = {"challenge_b64": base64.b64encode(license_request.encode() if isinstance(license_request, str) else license_request).decode()}
return jsonify({"responseData": response_data}), 200
# ============================================================================================================================== #
def get_keys(self, device):
session_entry = self.store_session.get(device)
if not session_entry or "cdm" not in session_entry:
return jsonify({"responseData": {"message": f"No Cdm session for {device} has been opened yet. No session to use."}}), 400
cdm = session_entry["cdm"]
try:
session_id = bytes.fromhex(self.session_id)
except ValueError:
return jsonify({"responseData": {"message": "Invalid session_id format."}}), 400
if not isinstance(self.license, str) or not self.license.strip():
return jsonify({"responseData": {"message": "Invalid or empty license_message."}}), 400
try:
self.logging.info(f"Parsing license for session {session_id.hex()} on device {device}")
decoded_license = base64.b64decode(self.license).decode("utf-8", errors="ignore")
cdm.parse_license(session_id, decoded_license)
keys = cdm.get_keys(session_id)
response_keys = [
{
"kid": key.key_id.hex,
"key": key.key.hex(),
}
for key in keys
]
except InvalidSession:
return jsonify({"responseData": {"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."}}), 400
except InvalidLicense as e:
return jsonify({"responseData": {"message": f"Invalid License, {e}"}}), 400
except Exception as e:
return jsonify({"responseData": {"message": f"Error, {e}"}}), 500
return jsonify({
"responseData": {
"message": "Successfully parsed and loaded the Keys from the License message.",
"keys": response_keys
}
}), 200
def get_license_keys(self):
device = Device.load(self.device)
cdm = Cdm.from_device(device)
license_data = self.license
try:
try:
decoded_license = base64.b64decode(license_data, validate=True).decode('utf-8', errors='replace')
except (base64.binascii.Error, UnicodeDecodeError):
return jsonify({"error": "License must be provided as valid Base64-encoded data"}), 400
# ============================================================================================================================== #
cleaned_license = re.sub(r'([a-zA-Z0-9:]+)(xmlns)', r'\1 \2', decoded_license)
try:
parsed_xml = ET.fromstring(cleaned_license)
pretty_xml = Dom.parseString(cleaned_license).toprettyxml()
except ET.ParseError as parse_error:
return jsonify({"error": f"Invalid XML in license data: {parse_error}"}), 400
except ValueError as decode_error:
return jsonify({"error": "Invalid license format"}), 400
try:
cdm.parse_license(cleaned_license)
except Exception as e:
return jsonify({"error": f"Unable to parse license: {e}"}), 500
try:
key_pairs = []
for key in cdm.get_keys():
self.kid = key.key_id.hex() if isinstance(key.key_id, bytes) else str(key.key_id).replace("-", "")
self.key = key.key.hex() if isinstance(key.key, bytes) else str(key.key)
key_pairs.append({"key_id": self.kid, "key": self.key})
print(key_pairs)
return jsonify(key_pairs)
except Exception as e:
return jsonify({"error": f"Unable to extract keys: {e}"}), 500

View File

@@ -1,486 +0,0 @@
from __future__ import annotations
import collections.abc
from Crypto.PublicKey import ECC
from modules.pyplayready.crypto import Crypto
from modules.pyplayready.exceptions import InvalidCertificateChain
# monkey patch for construct 2.8.8 compatibility
if not hasattr(collections, 'Sequence'):
collections.Sequence = collections.abc.Sequence
import base64
from pathlib import Path
from typing import Union
from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
from construct import Int16ub, Array
from construct import Struct, this
from modules.pyplayready.crypto.ecc_key import ECCKey
class _BCertStructs:
DrmBCertBasicInfo = Struct(
"cert_id" / Bytes(16),
"security_level" / Int32ub,
"flags" / Int32ub,
"cert_type" / Int32ub,
"public_key_digest" / Bytes(32),
"expiration_date" / Int32ub,
"client_id" / Bytes(16)
)
# TODO: untested
DrmBCertDomainInfo = Struct(
"service_id" / Bytes(16),
"account_id" / Bytes(16),
"revision_timestamp" / Int32ub,
"domain_url_length" / Int32ub,
"domain_url" / Bytes((this.domain_url_length + 3) & 0xfffffffc)
)
# TODO: untested
DrmBCertPCInfo = Struct(
"security_version" / Int32ub
)
# TODO: untested
DrmBCertDeviceInfo = Struct(
"max_license" / Int32ub,
"max_header" / Int32ub,
"max_chain_depth" / Int32ub
)
DrmBCertFeatureInfo = Struct(
"feature_count" / Int32ub, # max. 32
"features" / Array(this.feature_count, Int32ub)
)
DrmBCertKeyInfo = Struct(
"key_count" / Int32ub,
"cert_keys" / Array(this.key_count, Struct(
"type" / Int16ub,
"length" / Int16ub,
"flags" / Int32ub,
"key" / Bytes(this.length // 8),
"usages_count" / Int32ub,
"usages" / Array(this.usages_count, Int32ub)
))
)
DrmBCertManufacturerInfo = Struct(
"flags" / Int32ub,
"manufacturer_name_length" / Int32ub,
"manufacturer_name" / Bytes((this.manufacturer_name_length + 3) & 0xfffffffc),
"model_name_length" / Int32ub,
"model_name" / Bytes((this.model_name_length + 3) & 0xfffffffc),
"model_number_length" / Int32ub,
"model_number" / Bytes((this.model_number_length + 3) & 0xfffffffc),
)
DrmBCertSignatureInfo = Struct(
"signature_type" / Int16ub,
"signature_size" / Int16ub,
"signature" / Bytes(this.signature_size),
"signature_key_size" / Int32ub,
"signature_key" / Bytes(this.signature_key_size // 8)
)
# TODO: untested
DrmBCertSilverlightInfo = Struct(
"security_version" / Int32ub,
"platform_identifier" / Int32ub
)
# TODO: untested
DrmBCertMeteringInfo = Struct(
"metering_id" / Bytes(16),
"metering_url_length" / Int32ub,
"metering_url" / Bytes((this.metering_url_length + 3) & 0xfffffffc)
)
# TODO: untested
DrmBCertExtDataSignKeyInfo = Struct(
"key_type" / Int16ub,
"key_length" / Int16ub,
"flags" / Int32ub,
"key" / Bytes(this.length // 8)
)
# TODO: untested
BCertExtDataRecord = Struct(
"data_size" / Int32ub,
"data" / Bytes(this.data_size)
)
# TODO: untested
DrmBCertExtDataSignature = Struct(
"signature_type" / Int16ub,
"signature_size" / Int16ub,
"signature" / Bytes(this.signature_size)
)
# TODO: untested
BCertExtDataContainer = Struct(
"record_count" / Int32ub, # always 1
"records" / Array(this.record_count, BCertExtDataRecord),
"signature" / DrmBCertExtDataSignature
)
# TODO: untested
DrmBCertServerInfo = Struct(
"warning_days" / Int32ub
)
# TODO: untested
DrmBcertSecurityVersion = Struct(
"security_version" / Int32ub,
"platform_identifier" / Int32ub
)
Attribute = Struct(
"flags" / Int16ub,
"tag" / Int16ub,
"length" / Int32ub,
"attribute" / Switch(
lambda this_: this_.tag,
{
1: DrmBCertBasicInfo,
2: DrmBCertDomainInfo,
3: DrmBCertPCInfo,
4: DrmBCertDeviceInfo,
5: DrmBCertFeatureInfo,
6: DrmBCertKeyInfo,
7: DrmBCertManufacturerInfo,
8: DrmBCertSignatureInfo,
9: DrmBCertSilverlightInfo,
10: DrmBCertMeteringInfo,
11: DrmBCertExtDataSignKeyInfo,
12: BCertExtDataContainer,
13: DrmBCertExtDataSignature,
14: Bytes(this.length - 8),
15: DrmBCertServerInfo,
16: DrmBcertSecurityVersion,
17: DrmBcertSecurityVersion
},
default=Bytes(this.length - 8)
)
)
BCert = Struct(
"signature" / Const(b"CERT"),
"version" / Int32ub,
"total_length" / Int32ub,
"certificate_length" / Int32ub,
"attributes" / GreedyRange(Attribute)
)
BCertChain = Struct(
"signature" / Const(b"CHAI"),
"version" / Int32ub,
"total_length" / Int32ub,
"flags" / Int32ub,
"certificate_count" / Int32ub,
"certificates" / GreedyRange(BCert)
)
class Certificate(_BCertStructs):
"""Represents a BCert"""
def __init__(
self,
parsed_bcert: Container,
bcert_obj: _BCertStructs.BCert = _BCertStructs.BCert
):
self.parsed = parsed_bcert
self._BCERT = bcert_obj
@classmethod
def new_leaf_cert(
cls,
cert_id: bytes,
security_level: int,
client_id: bytes,
signing_key: ECCKey,
encryption_key: ECCKey,
group_key: ECCKey,
parent: CertificateChain,
expiry: int = 0xFFFFFFFF,
max_license: int = 10240,
max_header: int = 15360,
max_chain_depth: int = 2
) -> Certificate:
basic_info = Container(
cert_id=cert_id,
security_level=security_level,
flags=0,
cert_type=2,
public_key_digest=signing_key.public_sha256_digest(),
expiration_date=expiry,
client_id=client_id
)
basic_info_attribute = Container(
flags=1,
tag=1,
length=len(_BCertStructs.DrmBCertBasicInfo.build(basic_info)) + 8,
attribute=basic_info
)
device_info = Container(
max_license=max_license,
max_header=max_header,
max_chain_depth=max_chain_depth
)
device_info_attribute = Container(
flags=1,
tag=4,
length=len(_BCertStructs.DrmBCertDeviceInfo.build(device_info)) + 8,
attribute=device_info
)
feature = Container(
feature_count=3,
features=ListContainer([
# 1, # Transmitter
# 2, # Receiver
# 3, # SharedCertificate
4, # SecureClock
# 5, # AntiRollBackClock
# 6, # ReservedMetering
# 7, # ReservedLicSync
# 8, # ReservedSymOpt
9, # CRLS (Revocation Lists)
# 10, # ServerBasicEdition
# 11, # ServerStandardEdition
# 12, # ServerPremiumEdition
13, # PlayReady3Features
# 14, # DeprecatedSecureStop
])
)
feature_attribute = Container(
flags=1,
tag=5,
length=len(_BCertStructs.DrmBCertFeatureInfo.build(feature)) + 8,
attribute=feature
)
cert_key_sign = Container(
type=1,
length=512, # bits
flags=0,
key=signing_key.public_bytes(),
usages_count=1,
usages=ListContainer([
1 # KEYUSAGE_SIGN
])
)
cert_key_encrypt = Container(
type=1,
length=512, # bits
flags=0,
key=encryption_key.public_bytes(),
usages_count=1,
usages=ListContainer([
2 # KEYUSAGE_ENCRYPT_KEY
])
)
key_info = Container(
key_count=2,
cert_keys=ListContainer([
cert_key_sign,
cert_key_encrypt
])
)
key_info_attribute = Container(
flags=1,
tag=6,
length=len(_BCertStructs.DrmBCertKeyInfo.build(key_info)) + 8,
attribute=key_info
)
manufacturer_info = parent.get_certificate(0).get_attribute(7)
new_bcert_container = Container(
signature=b"CERT",
version=1,
total_length=0, # filled at a later time
certificate_length=0, # filled at a later time
attributes=ListContainer([
basic_info_attribute,
device_info_attribute,
feature_attribute,
key_info_attribute,
manufacturer_info,
])
)
payload = _BCertStructs.BCert.build(new_bcert_container)
new_bcert_container.certificate_length = len(payload)
new_bcert_container.total_length = len(payload) + 144 # signature length
sign_payload = _BCertStructs.BCert.build(new_bcert_container)
signature = Crypto.ecc256_sign(group_key, sign_payload)
signature_info = Container(
signature_type=1,
signature_size=64,
signature=signature,
signature_key_size=512, # bits
signature_key=group_key.public_bytes()
)
signature_info_attribute = Container(
flags=1,
tag=8,
length=len(_BCertStructs.DrmBCertSignatureInfo.build(signature_info)) + 8,
attribute=signature_info
)
new_bcert_container.attributes.append(signature_info_attribute)
return cls(new_bcert_container)
@classmethod
def loads(cls, data: Union[str, bytes]) -> Certificate:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
cert = _BCertStructs.BCert
return cls(
parsed_bcert=cert.parse(data),
bcert_obj=cert
)
@classmethod
def load(cls, path: Union[Path, str]) -> Certificate:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def get_attribute(self, type_: int):
for attribute in self.parsed.attributes:
if attribute.tag == type_:
return attribute
def get_security_level(self) -> int:
basic_info_attribute = self.get_attribute(1).attribute
if basic_info_attribute:
return basic_info_attribute.security_level
@staticmethod
def _unpad(name: bytes):
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
def get_name(self):
manufacturer_info = self.get_attribute(7).attribute
if manufacturer_info:
return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}"
def dumps(self) -> bytes:
return self._BCERT.build(self.parsed)
def struct(self) -> _BCertStructs.BCert:
return self._BCERT
def verify_signature(self):
signature_object = self.get_attribute(8)
signature_attribute = signature_object.attribute
sign_payload = self.dumps()[:-signature_object.length]
raw_signature_key = signature_attribute.signature_key
signature_key = ECC.construct(
curve='P-256',
point_x=int.from_bytes(raw_signature_key[:32], 'big'),
point_y=int.from_bytes(raw_signature_key[32:], 'big')
)
return Crypto.ecc256_verify(
public_key=signature_key,
data=sign_payload,
signature=signature_attribute.signature
)
class CertificateChain(_BCertStructs):
"""Represents a BCertChain"""
def __init__(
self,
parsed_bcert_chain: Container,
bcert_chain_obj: _BCertStructs.BCertChain = _BCertStructs.BCertChain
):
self.parsed = parsed_bcert_chain
self._BCERT_CHAIN = bcert_chain_obj
@classmethod
def loads(cls, data: Union[str, bytes]) -> CertificateChain:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
cert_chain = _BCertStructs.BCertChain
return cls(
parsed_bcert_chain=cert_chain.parse(data),
bcert_chain_obj=cert_chain
)
@classmethod
def load(cls, path: Union[Path, str]) -> CertificateChain:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes:
return self._BCERT_CHAIN.build(self.parsed)
def struct(self) -> _BCertStructs.BCertChain:
return self._BCERT_CHAIN
def get_certificate(self, index: int) -> Certificate:
return Certificate(self.parsed.certificates[index])
def get_security_level(self) -> int:
# not sure if there's a better way than this
return self.get_certificate(0).get_security_level()
def get_name(self) -> str:
return self.get_certificate(0).get_name()
def append(self, bcert: Certificate) -> None:
self.parsed.certificate_count += 1
self.parsed.certificates.append(bcert.parsed)
self.parsed.total_length += len(bcert.dumps())
def prepend(self, bcert: Certificate) -> None:
self.parsed.certificate_count += 1
self.parsed.certificates.insert(0, bcert.parsed)
self.parsed.total_length += len(bcert.dumps())
def remove(self, index: int) -> None:
if self.parsed.certificate_count <= 0:
raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
if index >= self.parsed.certificate_count:
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
self.parsed.certificate_count -= 1
bcert = Certificate(self.parsed.certificates[index])
self.parsed.total_length -= len(bcert.dumps())
self.parsed.certificates.pop(index)
def get(self, index: int) -> Certificate:
if self.parsed.certificate_count <= 0:
raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
if index >= self.parsed.certificate_count:
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
return Certificate(self.parsed.certificates[index])
def count(self) -> int:
return self.parsed.certificate_count

View File

@@ -1,218 +0,0 @@
from __future__ import annotations
import base64
import math
import time
from typing import List
from uuid import UUID
import xml.etree.ElementTree as ET
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.Random import get_random_bytes
from Crypto.Signature import DSS
from Crypto.Util.Padding import pad
from ecpy.curves import Point, Curve
from modules.pyplayready.bcert import CertificateChain
from modules.pyplayready.crypto.ecc_key import ECCKey
from modules.pyplayready.key import Key
from modules.pyplayready.xml_key import XmlKey
from modules.pyplayready.crypto.elgamal import ElGamal
from modules.pyplayready.xmrlicense import XMRLicense
class Cdm:
def __init__(
self,
security_level: int,
certificate_chain: CertificateChain,
encryption_key: ECCKey,
signing_key: ECCKey,
client_version: str = "10.0.16384.10011",
protocol_version: int = 1
):
self.security_level = security_level
self.certificate_chain = certificate_chain
self.encryption_key = encryption_key
self.signing_key = signing_key
self.client_version = client_version
self.protocol_version = protocol_version
self.curve = Curve.get_curve("secp256r1")
self.elgamal = ElGamal(self.curve)
self._wmrm_key = Point(
x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
curve=self.curve
)
self._xml_key = XmlKey()
self._keys: List[Key] = []
@classmethod
def from_device(cls, device) -> Cdm:
"""Initialize a Playready CDM from a Playready Device (.prd) file"""
return cls(
security_level=device.security_level,
certificate_chain=device.group_certificate,
encryption_key=device.encryption_key,
signing_key=device.signing_key
)
def get_key_data(self) -> bytes:
point1, point2 = self.elgamal.encrypt(
message_point=self._xml_key.get_point(self.elgamal.curve),
public_key=self._wmrm_key
)
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
def get_cipher_data(self) -> bytes:
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>"
cipher = AES.new(
key=self._xml_key.aes_key,
mode=AES.MODE_CBC,
iv=self._xml_key.aes_iv
)
ciphertext = cipher.encrypt(pad(
body.encode(),
AES.block_size
))
return self._xml_key.aes_iv + ciphertext
def _build_digest_content(
self,
content_header: str,
nonce: str,
wmrm_cipher: str,
cert_cipher: str
) -> str:
return (
'<LA xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols" Id="SignedData" xml:space="preserve">'
f'<Version>{self.protocol_version}</Version>'
f'<ContentHeader>{content_header}</ContentHeader>'
'<CLIENTINFO>'
f'<CLIENTVERSION>{self.client_version}</CLIENTVERSION>'
'</CLIENTINFO>'
f'<LicenseNonce>{nonce}</LicenseNonce>'
f'<ClientTime>{math.floor(time.time())}</ClientTime>'
'<EncryptedData xmlns="http://www.w3.org/2001/04/xmlenc#" Type="http://www.w3.org/2001/04/xmlenc#Element">'
'<EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes128-cbc"></EncryptionMethod>'
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<EncryptedKey xmlns="http://www.w3.org/2001/04/xmlenc#">'
'<EncryptionMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#ecc256"></EncryptionMethod>'
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<KeyName>WMRMServer</KeyName>'
'</KeyInfo>'
'<CipherData>'
f'<CipherValue>{wmrm_cipher}</CipherValue>'
'</CipherData>'
'</EncryptedKey>'
'</KeyInfo>'
'<CipherData>'
f'<CipherValue>{cert_cipher}</CipherValue>'
'</CipherData>'
'</EncryptedData>'
'</LA>'
)
@staticmethod
def _build_signed_info(digest_value: str) -> str:
return (
'<SignedInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"></CanonicalizationMethod>'
'<SignatureMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#ecdsa-sha256"></SignatureMethod>'
'<Reference URI="#SignedData">'
'<DigestMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#sha256"></DigestMethod>'
f'<DigestValue>{digest_value}</DigestValue>'
'</Reference>'
'</SignedInfo>'
)
def get_license_challenge(self, content_header: str) -> str:
la_content = self._build_digest_content(
content_header=content_header,
nonce=base64.b64encode(get_random_bytes(16)).decode(),
wmrm_cipher=base64.b64encode(self.get_key_data()).decode(),
cert_cipher=base64.b64encode(self.get_cipher_data()).decode()
)
la_hash_obj = SHA256.new()
la_hash_obj.update(la_content.encode())
la_hash = la_hash_obj.digest()
signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
signed_info_digest = SHA256.new(signed_info.encode())
signer = DSS.new(self.signing_key.key, 'fips-186-3')
signature = signer.sign(signed_info_digest)
# haven't found a better way to do this. xmltodict.unparse doesn't work
main_body = (
'<?xml version="1.0" encoding="utf-8"?>'
'<soap:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">'
'<soap:Body>'
'<AcquireLicense xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols">'
'<challenge>'
'<Challenge xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols/messages">'
+ la_content +
'<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">'
+ signed_info +
f'<SignatureValue>{base64.b64encode(signature).decode()}</SignatureValue>'
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<KeyValue>'
'<ECCKeyValue>'
f'<PublicKey>{base64.b64encode(self.signing_key.public_bytes()).decode()}</PublicKey>'
'</ECCKeyValue>'
'</KeyValue>'
'</KeyInfo>'
'</Signature>'
'</Challenge>'
'</challenge>'
'</AcquireLicense>'
'</soap:Body>'
'</soap:Envelope>'
)
return main_body
def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes:
point1 = Point(
x=int.from_bytes(encrypted_key[:32], 'big'),
y=int.from_bytes(encrypted_key[32:64], 'big'),
curve=self.curve
)
point2 = Point(
x=int.from_bytes(encrypted_key[64:96], 'big'),
y=int.from_bytes(encrypted_key[96:128], 'big'),
curve=self.curve
)
decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d))
return self.elgamal.to_bytes(decrypted.x)[16:32]
def parse_license(self, licence: str) -> None:
try:
root = ET.fromstring(licence)
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
for license_element in license_elements:
parsed_licence = XMRLicense.loads(license_element.text)
for key in parsed_licence.get_content_keys():
if Key.CipherType(key.cipher_type) == Key.CipherType.ECC_256:
self._keys.append(Key(
key_id=UUID(bytes_le=key.key_id),
key_type=key.key_type,
cipher_type=key.cipher_type,
key_length=key.key_length,
key=self._decrypt_ecc256_key(key.encrypted_key)
))
except Exception as e:
raise Exception(f"Unable to parse license, {e}")
def get_keys(self) -> List[Key]:
return self._keys

View File

@@ -1,95 +0,0 @@
from __future__ import annotations
import base64
from pathlib import Path
from typing import Union
from Crypto.Hash import SHA256
from Crypto.PublicKey import ECC
from Crypto.PublicKey.ECC import EccKey
from ecpy.curves import Curve, Point
class ECCKey:
"""Represents a PlayReady ECC key pair"""
def __init__(self, key: EccKey):
self.key = key
@classmethod
def generate(cls):
"""Generate a new ECC key pair"""
return cls(key=ECC.generate(curve='P-256'))
@classmethod
def construct(cls, private_key: Union[bytes, int]):
"""Construct an ECC key pair from private/public bytes/ints"""
if isinstance(private_key, bytes):
private_key = int.from_bytes(private_key, 'big')
if not isinstance(private_key, int):
raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}")
# The public is always derived from the private key; loading the other stuff won't work
key = ECC.construct(
curve='P-256',
d=private_key,
)
return cls(key=key)
@classmethod
def loads(cls, data: Union[str, bytes]) -> ECCKey:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
if len(data) not in [96, 32]:
raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}")
return cls.construct(private_key=data[:32])
@classmethod
def load(cls, path: Union[Path, str]) -> ECCKey:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self, private_only=False):
if private_only:
return self.private_bytes()
return self.private_bytes() + self.public_bytes()
def dump(self, path: Union[Path, str], private_only=False) -> None:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps(private_only))
@staticmethod
def _to_bytes(n: int) -> bytes:
byte_len = (n.bit_length() + 7) // 8
if byte_len % 2 != 0:
byte_len += 1
return n.to_bytes(byte_len, 'big')
def get_point(self, curve: Curve) -> Point:
return Point(self.key.pointQ.x, self.key.pointQ.y, curve)
def private_bytes(self) -> bytes:
return self._to_bytes(int(self.key.d))
def private_sha256_digest(self) -> bytes:
hash_object = SHA256.new()
hash_object.update(self.private_bytes())
return hash_object.digest()
def public_bytes(self) -> bytes:
return self._to_bytes(int(self.key.pointQ.x)) + self._to_bytes(int(self.key.pointQ.y))
def public_sha256_digest(self) -> bytes:
hash_object = SHA256.new()
hash_object.update(self.public_bytes())
return hash_object.digest()

View File

@@ -1,36 +0,0 @@
from typing import Tuple
from ecpy.curves import Curve, Point
import secrets
class ElGamal:
def __init__(self, curve: Curve):
self.curve = curve
@staticmethod
def to_bytes(n: int) -> bytes:
byte_len = (n.bit_length() + 7) // 8
if byte_len % 2 != 0:
byte_len += 1
return n.to_bytes(byte_len, 'big')
def encrypt(
self,
message_point: Point,
public_key: Point
) -> Tuple[Point, Point]:
ephemeral_key = secrets.randbelow(self.curve.order)
point1 = ephemeral_key * self.curve.generator
point2 = message_point + (ephemeral_key * public_key)
return point1, point2
@staticmethod
def decrypt(
encrypted: Tuple[Point, Point],
private_key: int
) -> Point:
point1, point2 = encrypted
shared_secret = private_key * point1
decrypted_message = point2 - shared_secret
return decrypted_message

View File

@@ -1,136 +0,0 @@
from __future__ import annotations
import base64
from enum import IntEnum
from pathlib import Path
from typing import Union, Any
from modules.pyplayready.bcert import CertificateChain
from modules.pyplayready.crypto.ecc_key import ECCKey
from construct import Struct, Const, Int8ub, Bytes, this, Int32ub
class DeviceStructs:
magic = Const(b"PRD")
header = Struct(
"signature" / magic,
"version" / Int8ub,
)
# was never in production
v1 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key_length" / Int32ub,
"group_key" / Bytes(this.group_key_length),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length)
)
v2 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
)
v3 = Struct(
"signature" / magic,
"version" / Int8ub,
"group_key" / Bytes(96),
"encryption_key" / Bytes(96),
"signing_key" / Bytes(96),
"group_certificate_length" / Int32ub,
"group_certificate" / Bytes(this.group_certificate_length),
)
class Device:
"""Represents a PlayReady Device (.prd)"""
CURRENT_STRUCT = DeviceStructs.v3
CURRENT_VERSION = 3
class SecurityLevel(IntEnum):
SL150 = 150
SL2000 = 2000
SL3000 = 3000
def __init__(
self,
*_: Any,
group_key: Union[str, bytes, None],
encryption_key: Union[str, bytes],
signing_key: Union[str, bytes],
group_certificate: Union[str, bytes],
**__: Any
):
if isinstance(group_key, str):
group_key = base64.b64decode(group_key)
if isinstance(encryption_key, str):
encryption_key = base64.b64decode(encryption_key)
if not isinstance(encryption_key, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}")
if isinstance(signing_key, str):
signing_key = base64.b64decode(signing_key)
if not isinstance(signing_key, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}")
if isinstance(group_certificate, str):
group_certificate = base64.b64decode(group_certificate)
if not isinstance(group_certificate, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}")
self.group_key = None if group_key is None else ECCKey.loads(group_key)
self.encryption_key = ECCKey.loads(encryption_key)
self.signing_key = ECCKey.loads(signing_key)
self.group_certificate = CertificateChain.loads(group_certificate)
self.security_level = self.group_certificate.get_security_level()
@classmethod
def loads(cls, data: Union[str, bytes]) -> Device:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
prd_header = DeviceStructs.header.parse(data)
if prd_header.version == 2:
return cls(
group_key=None,
**DeviceStructs.v2.parse(data)
)
return cls(**cls.CURRENT_STRUCT.parse(data))
@classmethod
def load(cls, path: Union[Path, str]) -> Device:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes:
return self.CURRENT_STRUCT.build(dict(
version=self.CURRENT_VERSION,
group_key=self.group_key.dumps(),
encryption_key=self.encryption_key.dumps(),
signing_key=self.signing_key.dumps(),
group_certificate_length=len(self.group_certificate.dumps()),
group_certificate=self.group_certificate.dumps(),
))
def dump(self, path: Union[Path, str]) -> None:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps())
def get_name(self) -> str:
name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}"
return ''.join(char for char in name if (char.isalnum() or char in '_- ')).strip().lower().replace(" ", "_")

View File

@@ -1,26 +0,0 @@
class PyPlayreadyException(Exception):
"""Exceptions used by pyplayready."""
class InvalidPssh(PyPlayreadyException):
"""The Playready PSSH is invalid or empty."""
class InvalidInitData(PyPlayreadyException):
"""The Playready Cenc Header Data is invalid or empty."""
class DeviceMismatch(PyPlayreadyException):
"""The Remote CDMs Device information and the APIs Device information did not match."""
class InvalidLicense(PyPlayreadyException):
"""Unable to parse XMR License."""
class InvalidCertificateChain(PyPlayreadyException):
"""The BCert is not correctly formatted."""
class OutdatedDevice(PyPlayreadyException):
"""The PlayReady Device is outdated and does not support a specific operation."""

View File

@@ -1,68 +0,0 @@
import base64
from enum import Enum
from uuid import UUID
from typing import Union
class Key:
class KeyType(Enum):
INVALID = 0x0000
AES_128_CTR = 0x0001
RC4_CIPHER = 0x0002
AES_128_ECB = 0x0003
COCKTAIL = 0x0004
AES_128_CBC = 0x0005
KEYEXCHANGE = 0x0006
UNKNOWN = 0xffff
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
class CipherType(Enum):
INVALID = 0x0000
RSA_1024 = 0x0001
CHAINED_LICENSE = 0x0002
ECC_256 = 0x0003
ECC_256_WITH_KZ = 0x0004
TEE_TRANSIENT = 0x0005
ECC_256_VIA_SYMMETRIC = 0x0006
UNKNOWN = 0xffff
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
def __init__(
self,
key_id: UUID,
key_type: int,
cipher_type: int,
key_length: int,
key: bytes
):
self.key_id = key_id
self.key_type = self.KeyType(key_type)
self.cipher_type = self.CipherType(cipher_type)
self.key_length = key_length
self.key = key
@staticmethod
def kid_to_uuid(kid: Union[str, bytes]) -> UUID:
"""
Convert a Key ID from a string or bytes to a UUID object.
At first, this may seem very simple, but some types of Key IDs
may not be 16 bytes and some may be decimal vs. hex.
"""
if isinstance(kid, str):
kid = base64.b64decode(kid)
if not kid:
kid = b"\x00" * 16
if kid.decode(errors="replace").isdigit():
return UUID(int=int(kid.decode()))
if len(kid) < 16:
kid += b"\x00" * (16 - len(kid))
return UUID(bytes=kid)

View File

@@ -1,311 +0,0 @@
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
import click
import requests
from Crypto.Random import get_random_bytes
from modules.pyplayready.system.bcert import CertificateChain, Certificate
from modules.pyplayready.cdm import Cdm
from modules.pyplayready.device import Device
from modules.pyplayready.crypto.ecc_key import ECCKey
from modules.pyplayready.exceptions import OutdatedDevice
from modules.pyplayready.system.pssh import PSSH
@click.group(invoke_without_command=True)
@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.")
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.")
def main(version: bool, debug: bool) -> None:
"""Python PlayReady CDM implementation"""
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
log = logging.getLogger()
current_year = datetime.now().year
copyright_years = f"2024-{current_year}"
log.info("pyplayready version %s Copyright (c) %s DevLARLEY, Erevoc, DevataDev", __version__, copyright_years)
log.info("https://github.com/ready-dl/pyplayready")
log.info("Run 'pyplayready --help' for help")
if version:
return
@main.command(name="license")
@click.argument("device_path", type=Path)
@click.argument("pssh", type=PSSH)
@click.argument("server", type=str)
def license_(device_path: Path, pssh: PSSH, server: str) -> None:
"""
Make a License Request to a server using a given PSSH
Will return a list of all keys within the returned license
Only works for standard license servers that don't use any license wrapping
"""
log = logging.getLogger("license")
device = Device.load(device_path)
log.info(f"Loaded Device: {device.get_name()}")
cdm = Cdm.from_device(device)
log.info("Loaded CDM")
session_id = cdm.open()
log.info("Opened Session")
challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers(downgrade_to_v4=True)[0])
log.info("Created License Request (Challenge)")
log.debug(challenge)
license_res = requests.post(
url=server,
headers={
'Content-Type': 'text/xml; charset=UTF-8',
},
data=challenge
)
if license_res.status_code != 200:
log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}")
return
licence = license_res.text
log.info("Got License Message")
log.debug(licence)
cdm.parse_license(session_id, licence)
log.info("License Parsed Successfully")
for key in cdm.get_keys(session_id):
log.info(f"{key.key_id.hex}:{key.key.hex()}")
cdm.close(session_id)
log.info("Clossed Session")
@main.command()
@click.argument("device", type=Path)
@click.pass_context
def test(ctx: click.Context, device: Path) -> None:
"""
Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
https://testweb.playready.microsoft.com/Content/Content2X
+ DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd
+ MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest
The device argument is a Path to a Playready Device (.prd) file which contains the device's group key and
group certificate.
"""
pssh = PSSH(
"AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH"
"QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh"
"AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg"
"BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA"
"UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE"
"cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD"
"AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ"
"B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA"
"ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF"
"YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT"
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
)
license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)"
ctx.invoke(
license_,
device_path=device,
pssh=pssh,
server=license_server
)
@main.command()
@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key")
@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain")
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def create_device(
ctx: click.Context,
group_key: Path,
group_certificate: Path,
output: Optional[Path] = None
) -> None:
"""Create a Playready Device (.prd) file from an ECC private group key and group certificate chain"""
if not group_key.is_file():
raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
if not group_certificate.is_file():
raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("create-device")
encryption_key = ECCKey.generate()
signing_key = ECCKey.generate()
group_key = ECCKey.load(group_key)
certificate_chain = CertificateChain.load(group_certificate)
new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16),
security_level=certificate_chain.get_security_level(),
client_id=get_random_bytes(16),
signing_key=signing_key,
encryption_key=encryption_key,
group_key=group_key,
parent=certificate_chain
)
certificate_chain.prepend(new_certificate)
device = Device(
group_key=group_key.dumps(),
encryption_key=encryption_key.dumps(),
signing_key=signing_key.dumps(),
group_certificate=certificate_chain.dumps(),
)
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
out_path = output
else:
out_dir = output or Path.cwd()
out_path = out_dir / f"{device.get_name()}.prd"
if out_path.exists():
log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")
return
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(device.dumps())
log.info("Created Playready Device (.prd) file, %s", out_path.name)
log.info(" + Security Level: %s", device.security_level)
log.info(" + Group Key: %s bytes", len(device.group_key.dumps()))
log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps()))
log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps()))
log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps()))
log.info(" + Saved to: %s", out_path.absolute())
@main.command()
@click.argument("prd_path", type=Path)
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
@click.pass_context
def reprovision_device(ctx: click.Context, prd_path: Path, output: Optional[Path] = None) -> None:
"""
Reprovision a Playready Device (.prd) by creating a new leaf certificate and new encryption/signing keys.
Will override the device if an output path or directory is not specified
Only works on PRD Devices of v3 or higher
"""
if not prd_path.is_file():
raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("reprovision-device")
log.info("Reprovisioning Playready Device (.prd) file, %s", prd_path.name)
device = Device.load(prd_path)
if device.group_key is None:
raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher")
device.group_certificate.remove(0)
encryption_key = ECCKey.generate()
signing_key = ECCKey.generate()
device.encryption_key = encryption_key
device.signing_key = signing_key
new_certificate = Certificate.new_leaf_cert(
cert_id=get_random_bytes(16),
security_level=device.group_certificate.get_security_level(),
client_id=get_random_bytes(16),
signing_key=signing_key,
encryption_key=encryption_key,
group_key=device.group_key,
parent=device.group_certificate
)
device.group_certificate.prepend(new_certificate)
if output and output.suffix:
if output.suffix.lower() != ".prd":
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
out_path = output
else:
out_path = prd_path
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(device.dumps())
log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name)
@main.command()
@click.argument("prd_path", type=Path)
@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory")
@click.pass_context
def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None:
"""
Export a Playready Device (.prd) file to a Group Key and Group Certificate
If an output directory is not specified, it will be stored in the current working directory
"""
if not prd_path.is_file():
raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("export-device")
log.info("Exporting Playready Device (.prd) file, %s", prd_path.stem)
if not out_dir:
out_dir = Path.cwd()
out_path = out_dir / prd_path.stem
if out_path.exists():
if any(out_path.iterdir()):
log.error("Output directory is not empty, cannot overwrite.")
return
else:
log.warning("Output directory already exists, but is empty.")
else:
out_path.mkdir(parents=True)
device = Device.load(prd_path)
log.info(f"SL{device.security_level} {device.get_name()}")
log.info(f"Saving to: {out_path}")
if device.group_key:
group_key_path = out_path / "zgpriv.dat"
group_key_path.write_bytes(device.group_key.dumps(private_only=True))
log.info("Exported Group Key as zgpriv.dat")
else:
log.warning("Cannot export zgpriv.dat, as v2 devices do not save the group key")
# remove leaf cert to unprovision it
device.group_certificate.remove(0)
client_id_path = out_path / "bgroupcert.dat"
client_id_path.write_bytes(device.group_certificate.dumps())
log.info("Exported Group Certificate to bgroupcert.dat")
@main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.")
@click.argument("config_path", type=Path)
@click.option("-h", "--host", type=str, default="127.0.0.1", help="Host to serve from.")
@click.option("-p", "--port", type=int, default=7723, help="Port to serve from.")
def serve_(config_path: Path, host: str, port: int) -> None:
"""
Serve your local CDM and Playready Devices Remotely.
[CONFIG] is a path to a serve config file.
See `serve.example.yml` for an example config file.
Host as 127.0.0.1 may block remote access even if port-forwarded.
Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded.
"""
from pyplayready.remote import serve
import yaml
config = yaml.safe_load(config_path.read_text(encoding="utf8"))
serve.run(config, host, port)

View File

@@ -1,98 +0,0 @@
import base64
from typing import Union, List
from uuid import UUID
from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, Switch, Int32ub, Const, Container, ConstructError
from modules.pyplayready.exceptions import InvalidPssh
from modules.pyplayready.wrmheader import WRMHeader
class _PlayreadyPSSHStructs:
PSSHBox = Struct(
"length" / Int32ub,
"pssh" / Const(b"pssh"),
"fullbox" / Int32ub,
"system_id" / Bytes(16),
"data_length" / Int32ub,
"data" / Bytes(this.data_length)
)
PlayreadyObject = Struct(
"type" / Int16ul,
"length" / Int16ul,
"data" / Switch(
this.type,
{
1: Bytes(this.length)
},
default=Bytes(this.length)
)
)
PlayreadyHeader = Struct(
"length" / Int32ul,
"record_count" / Int16ul,
"records" / Array(this.record_count, PlayreadyObject)
)
class PSSH(_PlayreadyPSSHStructs):
"""Represents a PlayReady PSSH"""
SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95")
def __init__(self, data: Union[str, bytes]):
"""Load a PSSH Box, PlayReady Header or PlayReady Object"""
if not data:
raise InvalidPssh("Data must not be empty")
if isinstance(data, str):
try:
data = base64.b64decode(data)
except Exception as e:
raise InvalidPssh(f"Could not decode data as Base64, {e}")
self.wrm_headers: List[WRMHeader]
try:
# PSSH Box -> PlayReady Header
box = self.PSSHBox.parse(data)
prh = self.PlayreadyHeader.parse(box.data)
self.wrm_headers = self._read_playready_objects(prh)
except ConstructError:
if int.from_bytes(data[:2], byteorder="little") > 3:
try:
# PlayReady Header
prh = self.PlayreadyHeader.parse(data)
self.wrm_headers = self._read_playready_objects(prh)
except ConstructError:
raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Header")
else:
try:
# PlayReady Object
pro = self.PlayreadyObject.parse(data)
self.wrm_headers = [WRMHeader(pro.data)]
except ConstructError:
raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Object")
@staticmethod
def _read_playready_objects(header: Container) -> List[WRMHeader]:
return list(map(
lambda pro: WRMHeader(pro.data),
filter(
lambda pro: pro.type == 1,
header.records
)
))
def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]:
"""
Return a list of all WRM Headers in the PSSH as plaintext strings
downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR
"""
return list(map(
lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(),
self.wrm_headers
))

View File

@@ -1,201 +0,0 @@
import base64
import binascii
from enum import Enum
from typing import Optional, List, Union, Tuple
import xmltodict
class WRMHeader:
"""Represents a PlayReady WRM Header"""
class SignedKeyID:
def __init__(
self,
alg_id: str,
value: str,
checksum: str
):
self.alg_id = alg_id
self.value = value
self.checksum = checksum
def __repr__(self):
return f'SignedKeyID(alg_id={self.alg_id}, value="{self.value}", checksum="{self.checksum}")'
class Version(Enum):
VERSION_4_0_0_0 = "4.0.0.0"
VERSION_4_1_0_0 = "4.1.0.0"
VERSION_4_2_0_0 = "4.2.0.0"
VERSION_4_3_0_0 = "4.3.0.0"
UNKNOWN = "UNKNOWN"
@classmethod
def _missing_(cls, value):
return cls.UNKNOWN
_RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]]
def __init__(self, data: Union[str, bytes]):
"""Load a WRM Header from either a string, base64 encoded data or bytes"""
if not data:
raise ValueError("Data must not be empty")
if isinstance(data, str):
try:
data = base64.b64decode(data).decode()
except (binascii.Error, binascii.Incomplete):
data = data.encode()
self._raw_data: bytes = data
self._parsed = xmltodict.parse(self._raw_data)
self._header = self._parsed.get('WRMHEADER')
if not self._header:
raise ValueError("Data is not a valid WRMHEADER")
self.version = self.Version(self._header.get('@version'))
@staticmethod
def _ensure_list(element: Union[dict, list]) -> List:
if isinstance(element, dict):
return [element]
return element
def to_v4_0_0_0(self) -> str:
"""
Build a v4.0.0.0 WRM header from any possible WRM Header version
Note: Will ignore any remaining Key IDs if there's more than just one
"""
return self._build_v4_0_0_0_wrm_header(*self.read_attributes())
@staticmethod
def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
return (
[WRMHeader.SignedKeyID(
alg_id=protect_info["ALGID"],
value=data["KID"],
checksum=data.get("CHECKSUM")
)],
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
@staticmethod
def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kid = protect_info["KID"]
if kid:
key_ids = [WRMHeader.SignedKeyID(
alg_id=kid["@ALGID"],
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
)]
return (
key_ids,
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
@staticmethod
def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kids = protect_info["KIDS"]
if kids:
for kid in WRMHeader._ensure_list(kids["KID"]):
key_ids.append(WRMHeader.SignedKeyID(
alg_id=kid["@ALGID"],
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
))
return (
key_ids,
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
@staticmethod
def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE:
protect_info = data.get("PROTECTINFO")
key_ids = []
if protect_info:
kids = protect_info["KIDS"]
for kid in WRMHeader._ensure_list(kids["KID"]):
key_ids.append(WRMHeader.SignedKeyID(
alg_id=kid.get("@ALGID"),
value=kid["@VALUE"],
checksum=kid.get("@CHECKSUM")
))
return (
key_ids,
data.get("LA_URL"),
data.get("LUI_URL"),
data.get("DS_ID")
)
def read_attributes(self) -> _RETURN_STRUCTURE:
"""
Read any non-custom XML attributes
Returns a tuple structured like this: Tuple[List[SignedKeyID], <LA_URL>, <LUI_URL>, <DS_ID>]
"""
data = self._header.get("DATA")
if not data:
raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
if self.version == self.Version.VERSION_4_0_0_0:
return self._read_v4_0_0_0(data)
elif self.version == self.Version.VERSION_4_1_0_0:
return self._read_v4_1_0_0(data)
elif self.version == self.Version.VERSION_4_2_0_0:
return self._read_v4_2_0_0(data)
elif self.version == self.Version.VERSION_4_3_0_0:
return self._read_v4_3_0_0(data)
@staticmethod
def _build_v4_0_0_0_wrm_header(
key_ids: List[SignedKeyID],
la_url: Optional[str],
lui_url: Optional[str],
ds_id: Optional[str]
) -> str:
if len(key_ids) == 0:
raise Exception("No Key IDs available")
key_id = key_ids[0]
return (
'<WRMHEADER xmlns="http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader" version="4.0.0.0">'
'<DATA>'
'<PROTECTINFO>'
'<KEYLEN>16</KEYLEN>'
'<ALGID>AESCTR</ALGID>'
'</PROTECTINFO>'
f'<KID>{key_id.value}</KID>' +
(f'<LA_URL>{la_url}</LA_URL>' if la_url else '') +
(f'<LUI_URL>{lui_url}</LUI_URL>' if lui_url else '') +
(f'<DS_ID>{ds_id}</DS_ID>' if ds_id else '') +
(f'<CHECKSUM>{key_id.checksum}</CHECKSUM>' if key_id.checksum else '') +
'</DATA>'
'</WRMHEADER>'
)
def dumps(self) -> str:
return self._raw_data.decode("utf-16-le")

View File

@@ -1,18 +0,0 @@
from ecpy.curves import Point, Curve
from modules.pyplayready.crypto.ecc_key import ECCKey
from modules.pyplayready.crypto.elgamal import ElGamal
class XmlKey:
def __init__(self):
self._shared_point = ECCKey.generate()
self.shared_key_x = self._shared_point.key.pointQ.x
self.shared_key_y = self._shared_point.key.pointQ.y
self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x))
self.aes_iv = self._shared_key_x_bytes[:16]
self.aes_key = self._shared_key_x_bytes[16:]
def get_point(self, curve: Curve) -> Point:
return Point(self.shared_key_x, self.shared_key_y, curve)

View File

@@ -1,252 +0,0 @@
from __future__ import annotations
import base64
from pathlib import Path
from typing import Union
from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
class _XMRLicenseStructs:
PlayEnablerType = Struct(
"player_enabler_type" / Bytes(16)
)
DomainRestrictionObject = Struct(
"account_id" / Bytes(16),
"revision" / Int32ub
)
IssueDateObject = Struct(
"issue_date" / Int32ub
)
RevInfoVersionObject = Struct(
"sequence" / Int32ub
)
SecurityLevelObject = Struct(
"minimum_security_level" / Int16ub
)
EmbeddedLicenseSettingsObject = Struct(
"indicator" / Int16ub
)
ECCKeyObject = Struct(
"curve_type" / Int16ub,
"key_length" / Int16ub,
"key" / Bytes(this.key_length)
)
SignatureObject = Struct(
"signature_type" / Int16ub,
"signature_data_length" / Int16ub,
"signature_data" / Bytes(this.signature_data_length)
)
ContentKeyObject = Struct(
"key_id" / Bytes(16),
"key_type" / Int16ub,
"cipher_type" / Int16ub,
"key_length" / Int16ub,
"encrypted_key" / Bytes(this.key_length)
)
RightsSettingsObject = Struct(
"rights" / Int16ub
)
OutputProtectionLevelRestrictionObject = Struct(
"minimum_compressed_digital_video_opl" / Int16ub,
"minimum_uncompressed_digital_video_opl" / Int16ub,
"minimum_analog_video_opl" / Int16ub,
"minimum_digital_compressed_audio_opl" / Int16ub,
"minimum_digital_uncompressed_audio_opl" / Int16ub,
)
ExpirationRestrictionObject = Struct(
"begin_date" / Int32ub,
"end_date" / Int32ub
)
RemovalDateObject = Struct(
"removal_date" / Int32ub
)
UplinkKIDObject = Struct(
"uplink_kid" / Bytes(16),
"chained_checksum_type" / Int16ub,
"chained_checksum_length" / Int16ub,
"chained_checksum" / Bytes(this.chained_checksum_length)
)
AnalogVideoOutputConfigurationRestriction = Struct(
"video_output_protection_id" / Bytes(16),
"binary_configuration_data" / Bytes(this._.length - 24)
)
DigitalVideoOutputRestrictionObject = Struct(
"video_output_protection_id" / Bytes(16),
"binary_configuration_data" / Bytes(this._.length - 24)
)
DigitalAudioOutputRestrictionObject = Struct(
"audio_output_protection_id" / Bytes(16),
"binary_configuration_data" / Bytes(this._.length - 24)
)
PolicyMetadataObject = Struct(
"metadata_type" / Bytes(16),
"policy_data" / Bytes(this._.length)
)
SecureStopRestrictionObject = Struct(
"metering_id" / Bytes(16)
)
MeteringRestrictionObject = Struct(
"metering_id" / Bytes(16)
)
ExpirationAfterFirstPlayRestrictionObject = Struct(
"seconds" / Int32ub
)
GracePeriodObject = Struct(
"grace_period" / Int32ub
)
SourceIdObject = Struct(
"source_id" / Int32ub
)
AuxiliaryKey = Struct(
"location" / Int32ub,
"key" / Bytes(16)
)
AuxiliaryKeysObject = Struct(
"count" / Int16ub,
"auxiliary_keys" / Array(this.count, AuxiliaryKey)
)
UplinkKeyObject3 = Struct(
"uplink_key_id" / Bytes(16),
"chained_length" / Int16ub,
"checksum" / Bytes(this.chained_length),
"count" / Int16ub,
"entries" / Array(this.count, Int32ub)
)
CopyEnablerObject = Struct(
"copy_enabler_type" / Bytes(16)
)
CopyCountRestrictionObject = Struct(
"count" / Int32ub
)
MoveObject = Struct(
"minimum_move_protection_level" / Int32ub
)
XmrObject = Struct(
"flags" / Int16ub,
"type" / Int16ub,
"length" / Int32ub,
"data" / Switch(
lambda ctx: ctx.type,
{
0x0005: OutputProtectionLevelRestrictionObject,
0x0008: AnalogVideoOutputConfigurationRestriction,
0x000a: ContentKeyObject,
0x000b: SignatureObject,
0x000d: RightsSettingsObject,
0x0012: ExpirationRestrictionObject,
0x0013: IssueDateObject,
0x0016: MeteringRestrictionObject,
0x001a: GracePeriodObject,
0x0022: SourceIdObject,
0x002a: ECCKeyObject,
0x002c: PolicyMetadataObject,
0x0029: DomainRestrictionObject,
0x0030: ExpirationAfterFirstPlayRestrictionObject,
0x0031: DigitalAudioOutputRestrictionObject,
0x0032: RevInfoVersionObject,
0x0033: EmbeddedLicenseSettingsObject,
0x0034: SecurityLevelObject,
0x0037: MoveObject,
0x0039: PlayEnablerType,
0x003a: CopyEnablerObject,
0x003b: UplinkKIDObject,
0x003d: CopyCountRestrictionObject,
0x0050: RemovalDateObject,
0x0051: AuxiliaryKeysObject,
0x0052: UplinkKeyObject3,
0x005a: SecureStopRestrictionObject,
0x0059: DigitalVideoOutputRestrictionObject
},
default=LazyBound(lambda ctx: _XMRLicenseStructs.XmrObject)
)
)
XmrLicense = Struct(
"signature" / Const(b"XMR\x00"),
"xmr_version" / Int32ub,
"rights_id" / Bytes(16),
"containers" / GreedyRange(XmrObject)
)
class XMRLicense(_XMRLicenseStructs):
"""Represents an XMRLicense"""
def __init__(
self,
parsed_license: Container,
license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense
):
self.parsed = parsed_license
self._license_obj = license_obj
@classmethod
def loads(cls, data: Union[str, bytes]) -> XMRLicense:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
licence = _XMRLicenseStructs.XmrLicense
return cls(
parsed_license=licence.parse(data),
license_obj=licence
)
@classmethod
def load(cls, path: Union[Path, str]) -> XMRLicense:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls.loads(f.read())
def dumps(self) -> bytes:
return self._license_obj.build(self.parsed)
def struct(self) -> _XMRLicenseStructs.XmrLicense:
return self._license_obj
def _locate(self, container: Container):
if container.flags == 2 or container.flags == 3:
return self._locate(container.data)
else:
return container
def get_object(self, type_: int):
for obj in self.parsed.containers:
container = self._locate(obj)
if container.type == type_:
yield container.data
def get_content_keys(self):
yield from self.get_object(10)

View File

@@ -4,49 +4,77 @@ from modules.config import apikey_required
from modules.playready import PLAYREADY
from modules.logging import setup_logging
# Setup logging
logging = setup_logging()
# ============================================================================================================================== #
logging = setup_logging()
playready_bp = Blueprint('playready_bp', __name__)
CORS(playready_bp, resources={r"/*": {"origins": "*"}}, supports_credentials=True, allow_headers=["Content-Type", "X-API-KEY"])
@playready_bp.route('/extension', methods=['POST'])
# ============================================================================================================================== #
@playready_bp.route("/<device>/open", methods=["GET"])
@apikey_required
@cross_origin()
def extension():
if not request.is_json:
response_data = {"message": "Missing JSON in request."}
return jsonify({"responseData": response_data}), 400
def open_device(device):
playready = PLAYREADY()
if playready.device_name != device:
response_data = {"message": "Ops! Invalid Device :P"}
return jsonify({"responseData": response_data}), 404
return playready.open_devices()
# ============================================================================================================================== #
@playready_bp.route("/<device>/close/<session_id>", methods=["GET"])
@apikey_required
@cross_origin()
def close_device(device, session_id):
playready = PLAYREADY()
if playready.device_name != device:
response_data = {"message": "Ops! Invalid Device :P"}
return jsonify({"responseData": response_data}), 404
return playready.close_devices(session_id)
# ============================================================================================================================== #
@playready_bp.route("/<device>/get_challenge", methods=["POST"])
@apikey_required
@cross_origin()
def get_challenge(device):
playready = PLAYREADY()
if playready.device_name != device:
return jsonify({"responseData": {"message": "Ops! Invalid Device :P"}}), 404
data = request.get_json()
action = data.get('action', None)
pssh = data.get('pssh', None)
session_id = data.get('session_id', None)
if not pssh or not session_id:
return jsonify({"responseData": {"message": "Missing required fields in JSON body."}}), 400
if not action:
response_data = {"message": "Missing action in request."}
return jsonify({"responseData": response_data}), 400
if action == "Challenge?":
pssh = data.get('pssh', None)
if not pssh:
response_data = {"message": "Missing pssh in request."}
return jsonify({"responseData": response_data}), 400
playready = PLAYREADY()
playready.pssh = pssh
return playready.get_license_challenge()
elif action == "Keys?":
license = data.get('license', None)
if not license:
response_data = {"message": "Missing license in request."}
return jsonify({"responseData": response_data}), 400
playready = PLAYREADY()
playready.license = license
return playready.get_license_keys()
else:
response_data = {"message": "Unknown action."}
return jsonify({"responseData": response_data}), 400
playready.pssh = pssh
playready.session_id = session_id
return playready.get_challenges(device)
# ============================================================================================================================== #
@playready_bp.route("/<device>/get_keys", methods=["POST"])
@apikey_required
@cross_origin()
def get_key(device):
playready = PLAYREADY()
if playready.device_name != device:
return jsonify({"responseData": {"message": "Ops! Invalid Device :P"}}), 404
data = request.get_json()
license = data.get('license_b64', None)
session_id = data.get('session_id', None)
if not license or not session_id:
return jsonify({"responseData": {"message": "Missing required fields in JSON body."}}), 400
playready.license = license
playready.session_id = session_id
return playready.get_keys(device)
# ============================================================================================================================== #