mirror of
https://github.com/ThatNotEasy/PlayReadyProxy-API.git
synced 2026-04-02 02:28:12 +00:00
released
This commit is contained in:
6
.env.example
Normal file
6
.env.example
Normal file
@@ -0,0 +1,6 @@
|
||||
[DEFAULT]
|
||||
FLASK_DEBUG = # True or False
|
||||
FLASK_ENV = # Development or Production
|
||||
FLASK_APP = # app.py
|
||||
FLASK_RUN_HOST = # LOCAL IP
|
||||
FLASK_RUN_PORT = # LOCAL PORT
|
||||
39
app.py
Normal file
39
app.py
Normal file
@@ -0,0 +1,39 @@
|
||||
from flask import Flask, request, render_template, Response, jsonify
|
||||
from modules.banners import banners, clear_terminal
|
||||
from routes.playready import playready_bp
|
||||
from flask_cors import CORS, cross_origin
|
||||
from modules.config import setup_config
|
||||
from modules.logging import setup_logging
|
||||
|
||||
|
||||
banners()
|
||||
logging = setup_logging()
|
||||
config = setup_config()
|
||||
app = Flask(__name__)
|
||||
CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True, allow_headers=["Content-Type", "X-API-KEY"])
|
||||
|
||||
# ========================================================================================================================================== #
|
||||
|
||||
prefix_url_api = '/api'
|
||||
app.register_blueprint(playready_bp, url_prefix=f'{prefix_url_api}/playready')
|
||||
|
||||
# ========================================================================================================================================== #
|
||||
|
||||
@app.route('/api')
|
||||
@cross_origin()
|
||||
def backend_api():
|
||||
result = {"message": "Ping Pong! PlayReady Proxy API"}
|
||||
return jsonify({"responseData": result})
|
||||
|
||||
# ========================================================================================================================================== #
|
||||
|
||||
@app.route('/api/')
|
||||
@cross_origin()
|
||||
def backend_api_slash():
|
||||
result = {"message": "Ping Pong! PlayReady Proxy API"}
|
||||
return jsonify({"responseData": result})
|
||||
|
||||
# ========================================================================================================================================== #
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run()
|
||||
2
config.ini.example
Normal file
2
config.ini.example
Normal file
@@ -0,0 +1,2 @@
|
||||
[PLAYREADY]
|
||||
DEVICE = device/example.prd # your device filename
|
||||
0
device/ssup nigga
Normal file
0
device/ssup nigga
Normal file
5
generate_apikey.py
Normal file
5
generate_apikey.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from modules.config import generate_api_key
|
||||
|
||||
new_user = input("Enter username: ")
|
||||
new_key = generate_api_key(new_user)
|
||||
print(f"Generated API key for 'new_user': {new_key}")
|
||||
0
modules/__init__.py
Normal file
0
modules/__init__.py
Normal file
26
modules/banners.py
Normal file
26
modules/banners.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import pyfiglet
|
||||
import random, os
|
||||
from colorama import Fore, Back, Style, init
|
||||
|
||||
init(autoreset=True)
|
||||
|
||||
def clear_terminal():
|
||||
os.system('cls' if os.name == 'nt' else 'clear')
|
||||
|
||||
def banners():
|
||||
clear_terminal()
|
||||
banners = """
|
||||
██████╗ ██████╗ ██████╗ ███████╗ ██████╗██████╗ ██╗ ██╗██████╗ ████████╗ ██████╗ ██████╗
|
||||
██╔══██╗██╔══██╗ ██╔══██╗██╔════╝██╔════╝██╔══██╗╚██╗ ██╔╝██╔══██╗╚══██╔══╝██╔═══██╗██╔══██╗
|
||||
██████╔╝██████╔╝█████╗██║ ██║█████╗ ██║ ██████╔╝ ╚████╔╝ ██████╔╝ ██║ ██║ ██║██████╔╝
|
||||
██╔═══╝ ██╔══██╗╚════╝██║ ██║██╔══╝ ██║ ██╔══██╗ ╚██╔╝ ██╔═══╝ ██║ ██║ ██║██╔══██╗
|
||||
██║ ██║ ██║ ██████╔╝███████╗╚██████╗██║ ██║ ██║ ██║ ██║ ╚██████╔╝██║ ██║
|
||||
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝ ╚═════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝
|
||||
Author: GITHUB.COM/THATNOTEASY
|
||||
"""
|
||||
# Split the banner into lines
|
||||
banner_lines = banners.split('\n')
|
||||
colors = [Fore.RED, Fore.GREEN, Fore.YELLOW, Fore.BLUE, Fore.CYAN, Fore.MAGENTA]
|
||||
for line in banner_lines:
|
||||
random_color = random.choice(colors)
|
||||
print(random_color + line)
|
||||
65
modules/config.py
Normal file
65
modules/config.py
Normal file
@@ -0,0 +1,65 @@
|
||||
import configparser, json, secrets, logging, coloredlogs
|
||||
from functools import wraps
|
||||
from modules.logging import setup_logging
|
||||
from flask import request, jsonify
|
||||
|
||||
API_KEY_FILE = 'APIKEY.json'
|
||||
|
||||
def setup_config():
|
||||
config = configparser.ConfigParser()
|
||||
config.read('config.ini')
|
||||
return config
|
||||
|
||||
def load_api_keys():
|
||||
try:
|
||||
with open(API_KEY_FILE, 'r') as file:
|
||||
return json.load(file)
|
||||
except FileNotFoundError:
|
||||
logger.error("APIKEY.json file not found.")
|
||||
return []
|
||||
except json.JSONDecodeError:
|
||||
logger.error("Error decoding APIKEY.json.")
|
||||
return []
|
||||
|
||||
def save_api_keys(api_keys):
|
||||
with open(API_KEY_FILE, 'w') as file:
|
||||
json.dump(api_keys, file, indent=4)
|
||||
|
||||
def is_valid_api_key(provided_key):
|
||||
api_keys = load_api_keys()
|
||||
for entry in api_keys:
|
||||
if entry.get("apikey") == provided_key:
|
||||
return True
|
||||
return False
|
||||
|
||||
def generate_api_key(username):
|
||||
random_key = secrets.token_hex(16)
|
||||
new_api_key = f"{username}_{random_key}"
|
||||
api_keys = load_api_keys()
|
||||
user_found = False
|
||||
for entry in api_keys:
|
||||
if entry.get("username") == username:
|
||||
entry["apikey"] = new_api_key
|
||||
user_found = True
|
||||
break
|
||||
|
||||
if not user_found:
|
||||
api_keys.append({
|
||||
"username": username,
|
||||
"apikey": new_api_key
|
||||
})
|
||||
save_api_keys(api_keys)
|
||||
return new_api_key
|
||||
|
||||
def apikey_required(func):
|
||||
@wraps(func)
|
||||
def decorated_function(*args, **kwargs):
|
||||
provided_key = request.headers.get('X-API-KEY')
|
||||
if not provided_key:
|
||||
logging.error("X-API-KEY header is missing.")
|
||||
return jsonify({"responseData": "Opss! API key is missing"}), 403
|
||||
if not is_valid_api_key(provided_key):
|
||||
logging.error("Invalid X-API-KEY.")
|
||||
return jsonify({"responseData": "Opss! Invalid API key"}), 403
|
||||
return func(*args, **kwargs)
|
||||
return decorated_function
|
||||
17
modules/logging.py
Normal file
17
modules/logging.py
Normal file
@@ -0,0 +1,17 @@
|
||||
import logging, coloredlogs
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
def setup_logging():
|
||||
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
current_time = datetime.now().strftime("%I-%M-%S_%p_%b-%d-%Y")
|
||||
log_filename = f'{current_time}.log'
|
||||
logs_folder = 'logs'
|
||||
if not os.path.exists(logs_folder):
|
||||
os.makedirs(logs_folder)
|
||||
|
||||
coloredlogs.install(level='DEBUG', fmt=log_format, humanize=True)
|
||||
log_filepath = os.path.join(logs_folder, log_filename)
|
||||
logging.basicConfig(level=logging.DEBUG, format=log_format, filename=log_filepath)
|
||||
|
||||
return logging
|
||||
70
modules/playready.py
Normal file
70
modules/playready.py
Normal file
@@ -0,0 +1,70 @@
|
||||
import requests, base64, logging, coloredlogs, re
|
||||
import xml.dom.minidom as Dom
|
||||
import xml.etree.ElementTree as ET
|
||||
import json
|
||||
|
||||
from flask import jsonify
|
||||
from modules.logging import setup_logging
|
||||
from modules.config import setup_config
|
||||
|
||||
from modules.pyplayready.pssh import PSSH
|
||||
from modules.pyplayready.device import Device
|
||||
from modules.pyplayready.cdm import Cdm
|
||||
|
||||
class PLAYREADY:
|
||||
def __init__(self):
|
||||
self.pssh = None
|
||||
self.session_id = None
|
||||
self.challenge = None
|
||||
self.license = None
|
||||
|
||||
self.config = setup_config()
|
||||
self.device = self.config["PLAYREADY"]["DEVICE"]
|
||||
self.logging = setup_logging()
|
||||
|
||||
def get_license_challenge(self):
|
||||
device = Device.load(self.device)
|
||||
cdm = Cdm.from_device(self.device)
|
||||
pssh = PSSH(self.pssh)
|
||||
wrm_headers = pssh.get_wrm_headers(downgrade_to_v4=False)
|
||||
raw_challenge = cdm.get_license_challenge(wrm_headers[0])
|
||||
if isinstance(raw_challenge, str):
|
||||
raw_challenge = raw_challenge.encode('utf-8')
|
||||
|
||||
challenge_b64 = base64.b64encode(raw_challenge).decode('utf-8')
|
||||
return challenge_b64
|
||||
|
||||
def get_license_keys(self):
|
||||
device = Device.load(self.device)
|
||||
cdm = Cdm.from_device(device)
|
||||
license_data = self.license
|
||||
try:
|
||||
try:
|
||||
decoded_license = base64.b64decode(license_data, validate=True).decode('utf-8', errors='replace')
|
||||
except (base64.binascii.Error, UnicodeDecodeError):
|
||||
return jsonify({"error": "License must be provided as valid Base64-encoded data"}), 400
|
||||
|
||||
cleaned_license = re.sub(r'([a-zA-Z0-9:]+)(xmlns)', r'\1 \2', decoded_license)
|
||||
try:
|
||||
parsed_xml = ET.fromstring(cleaned_license)
|
||||
pretty_xml = Dom.parseString(cleaned_license).toprettyxml()
|
||||
except ET.ParseError as parse_error:
|
||||
return jsonify({"error": f"Invalid XML in license data: {parse_error}"}), 400
|
||||
except ValueError as decode_error:
|
||||
return jsonify({"error": "Invalid license format"}), 400
|
||||
try:
|
||||
cdm.parse_license(cleaned_license)
|
||||
except Exception as e:
|
||||
return jsonify({"error": f"Unable to parse license: {e}"}), 500
|
||||
|
||||
try:
|
||||
key_pairs = []
|
||||
for key in cdm.get_keys():
|
||||
self.kid = key.key_id.hex() if isinstance(key.key_id, bytes) else str(key.key_id).replace("-", "")
|
||||
self.key = key.key.hex() if isinstance(key.key, bytes) else str(key.key)
|
||||
key_pairs.append({"key_id": self.kid, "key": self.key})
|
||||
print(key_pairs)
|
||||
return jsonify(key_pairs)
|
||||
|
||||
except Exception as e:
|
||||
return jsonify({"error": f"Unable to extract keys: {e}"}), 500
|
||||
486
modules/pyplayready/bcert.py
Normal file
486
modules/pyplayready/bcert.py
Normal file
@@ -0,0 +1,486 @@
|
||||
from __future__ import annotations
|
||||
import collections.abc
|
||||
|
||||
from Crypto.PublicKey import ECC
|
||||
|
||||
from modules.pyplayready.crypto import Crypto
|
||||
from modules.pyplayready.exceptions import InvalidCertificateChain
|
||||
|
||||
# monkey patch for construct 2.8.8 compatibility
|
||||
if not hasattr(collections, 'Sequence'):
|
||||
collections.Sequence = collections.abc.Sequence
|
||||
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
from construct import Bytes, Const, Int32ub, GreedyRange, Switch, Container, ListContainer
|
||||
from construct import Int16ub, Array
|
||||
from construct import Struct, this
|
||||
|
||||
from modules.pyplayready.crypto.ecc_key import ECCKey
|
||||
|
||||
|
||||
class _BCertStructs:
|
||||
DrmBCertBasicInfo = Struct(
|
||||
"cert_id" / Bytes(16),
|
||||
"security_level" / Int32ub,
|
||||
"flags" / Int32ub,
|
||||
"cert_type" / Int32ub,
|
||||
"public_key_digest" / Bytes(32),
|
||||
"expiration_date" / Int32ub,
|
||||
"client_id" / Bytes(16)
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertDomainInfo = Struct(
|
||||
"service_id" / Bytes(16),
|
||||
"account_id" / Bytes(16),
|
||||
"revision_timestamp" / Int32ub,
|
||||
"domain_url_length" / Int32ub,
|
||||
"domain_url" / Bytes((this.domain_url_length + 3) & 0xfffffffc)
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertPCInfo = Struct(
|
||||
"security_version" / Int32ub
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertDeviceInfo = Struct(
|
||||
"max_license" / Int32ub,
|
||||
"max_header" / Int32ub,
|
||||
"max_chain_depth" / Int32ub
|
||||
)
|
||||
|
||||
DrmBCertFeatureInfo = Struct(
|
||||
"feature_count" / Int32ub, # max. 32
|
||||
"features" / Array(this.feature_count, Int32ub)
|
||||
)
|
||||
|
||||
DrmBCertKeyInfo = Struct(
|
||||
"key_count" / Int32ub,
|
||||
"cert_keys" / Array(this.key_count, Struct(
|
||||
"type" / Int16ub,
|
||||
"length" / Int16ub,
|
||||
"flags" / Int32ub,
|
||||
"key" / Bytes(this.length // 8),
|
||||
"usages_count" / Int32ub,
|
||||
"usages" / Array(this.usages_count, Int32ub)
|
||||
))
|
||||
)
|
||||
|
||||
DrmBCertManufacturerInfo = Struct(
|
||||
"flags" / Int32ub,
|
||||
"manufacturer_name_length" / Int32ub,
|
||||
"manufacturer_name" / Bytes((this.manufacturer_name_length + 3) & 0xfffffffc),
|
||||
"model_name_length" / Int32ub,
|
||||
"model_name" / Bytes((this.model_name_length + 3) & 0xfffffffc),
|
||||
"model_number_length" / Int32ub,
|
||||
"model_number" / Bytes((this.model_number_length + 3) & 0xfffffffc),
|
||||
)
|
||||
|
||||
DrmBCertSignatureInfo = Struct(
|
||||
"signature_type" / Int16ub,
|
||||
"signature_size" / Int16ub,
|
||||
"signature" / Bytes(this.signature_size),
|
||||
"signature_key_size" / Int32ub,
|
||||
"signature_key" / Bytes(this.signature_key_size // 8)
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertSilverlightInfo = Struct(
|
||||
"security_version" / Int32ub,
|
||||
"platform_identifier" / Int32ub
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertMeteringInfo = Struct(
|
||||
"metering_id" / Bytes(16),
|
||||
"metering_url_length" / Int32ub,
|
||||
"metering_url" / Bytes((this.metering_url_length + 3) & 0xfffffffc)
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertExtDataSignKeyInfo = Struct(
|
||||
"key_type" / Int16ub,
|
||||
"key_length" / Int16ub,
|
||||
"flags" / Int32ub,
|
||||
"key" / Bytes(this.length // 8)
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
BCertExtDataRecord = Struct(
|
||||
"data_size" / Int32ub,
|
||||
"data" / Bytes(this.data_size)
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertExtDataSignature = Struct(
|
||||
"signature_type" / Int16ub,
|
||||
"signature_size" / Int16ub,
|
||||
"signature" / Bytes(this.signature_size)
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
BCertExtDataContainer = Struct(
|
||||
"record_count" / Int32ub, # always 1
|
||||
"records" / Array(this.record_count, BCertExtDataRecord),
|
||||
"signature" / DrmBCertExtDataSignature
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBCertServerInfo = Struct(
|
||||
"warning_days" / Int32ub
|
||||
)
|
||||
|
||||
# TODO: untested
|
||||
DrmBcertSecurityVersion = Struct(
|
||||
"security_version" / Int32ub,
|
||||
"platform_identifier" / Int32ub
|
||||
)
|
||||
|
||||
Attribute = Struct(
|
||||
"flags" / Int16ub,
|
||||
"tag" / Int16ub,
|
||||
"length" / Int32ub,
|
||||
"attribute" / Switch(
|
||||
lambda this_: this_.tag,
|
||||
{
|
||||
1: DrmBCertBasicInfo,
|
||||
2: DrmBCertDomainInfo,
|
||||
3: DrmBCertPCInfo,
|
||||
4: DrmBCertDeviceInfo,
|
||||
5: DrmBCertFeatureInfo,
|
||||
6: DrmBCertKeyInfo,
|
||||
7: DrmBCertManufacturerInfo,
|
||||
8: DrmBCertSignatureInfo,
|
||||
9: DrmBCertSilverlightInfo,
|
||||
10: DrmBCertMeteringInfo,
|
||||
11: DrmBCertExtDataSignKeyInfo,
|
||||
12: BCertExtDataContainer,
|
||||
13: DrmBCertExtDataSignature,
|
||||
14: Bytes(this.length - 8),
|
||||
15: DrmBCertServerInfo,
|
||||
16: DrmBcertSecurityVersion,
|
||||
17: DrmBcertSecurityVersion
|
||||
},
|
||||
default=Bytes(this.length - 8)
|
||||
)
|
||||
)
|
||||
|
||||
BCert = Struct(
|
||||
"signature" / Const(b"CERT"),
|
||||
"version" / Int32ub,
|
||||
"total_length" / Int32ub,
|
||||
"certificate_length" / Int32ub,
|
||||
"attributes" / GreedyRange(Attribute)
|
||||
)
|
||||
|
||||
BCertChain = Struct(
|
||||
"signature" / Const(b"CHAI"),
|
||||
"version" / Int32ub,
|
||||
"total_length" / Int32ub,
|
||||
"flags" / Int32ub,
|
||||
"certificate_count" / Int32ub,
|
||||
"certificates" / GreedyRange(BCert)
|
||||
)
|
||||
|
||||
|
||||
class Certificate(_BCertStructs):
|
||||
"""Represents a BCert"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parsed_bcert: Container,
|
||||
bcert_obj: _BCertStructs.BCert = _BCertStructs.BCert
|
||||
):
|
||||
self.parsed = parsed_bcert
|
||||
self._BCERT = bcert_obj
|
||||
|
||||
@classmethod
|
||||
def new_leaf_cert(
|
||||
cls,
|
||||
cert_id: bytes,
|
||||
security_level: int,
|
||||
client_id: bytes,
|
||||
signing_key: ECCKey,
|
||||
encryption_key: ECCKey,
|
||||
group_key: ECCKey,
|
||||
parent: CertificateChain,
|
||||
expiry: int = 0xFFFFFFFF,
|
||||
max_license: int = 10240,
|
||||
max_header: int = 15360,
|
||||
max_chain_depth: int = 2
|
||||
) -> Certificate:
|
||||
basic_info = Container(
|
||||
cert_id=cert_id,
|
||||
security_level=security_level,
|
||||
flags=0,
|
||||
cert_type=2,
|
||||
public_key_digest=signing_key.public_sha256_digest(),
|
||||
expiration_date=expiry,
|
||||
client_id=client_id
|
||||
)
|
||||
basic_info_attribute = Container(
|
||||
flags=1,
|
||||
tag=1,
|
||||
length=len(_BCertStructs.DrmBCertBasicInfo.build(basic_info)) + 8,
|
||||
attribute=basic_info
|
||||
)
|
||||
|
||||
device_info = Container(
|
||||
max_license=max_license,
|
||||
max_header=max_header,
|
||||
max_chain_depth=max_chain_depth
|
||||
)
|
||||
device_info_attribute = Container(
|
||||
flags=1,
|
||||
tag=4,
|
||||
length=len(_BCertStructs.DrmBCertDeviceInfo.build(device_info)) + 8,
|
||||
attribute=device_info
|
||||
)
|
||||
|
||||
feature = Container(
|
||||
feature_count=3,
|
||||
features=ListContainer([
|
||||
# 1, # Transmitter
|
||||
# 2, # Receiver
|
||||
# 3, # SharedCertificate
|
||||
4, # SecureClock
|
||||
# 5, # AntiRollBackClock
|
||||
# 6, # ReservedMetering
|
||||
# 7, # ReservedLicSync
|
||||
# 8, # ReservedSymOpt
|
||||
9, # CRLS (Revocation Lists)
|
||||
# 10, # ServerBasicEdition
|
||||
# 11, # ServerStandardEdition
|
||||
# 12, # ServerPremiumEdition
|
||||
13, # PlayReady3Features
|
||||
# 14, # DeprecatedSecureStop
|
||||
])
|
||||
)
|
||||
feature_attribute = Container(
|
||||
flags=1,
|
||||
tag=5,
|
||||
length=len(_BCertStructs.DrmBCertFeatureInfo.build(feature)) + 8,
|
||||
attribute=feature
|
||||
)
|
||||
|
||||
cert_key_sign = Container(
|
||||
type=1,
|
||||
length=512, # bits
|
||||
flags=0,
|
||||
key=signing_key.public_bytes(),
|
||||
usages_count=1,
|
||||
usages=ListContainer([
|
||||
1 # KEYUSAGE_SIGN
|
||||
])
|
||||
)
|
||||
cert_key_encrypt = Container(
|
||||
type=1,
|
||||
length=512, # bits
|
||||
flags=0,
|
||||
key=encryption_key.public_bytes(),
|
||||
usages_count=1,
|
||||
usages=ListContainer([
|
||||
2 # KEYUSAGE_ENCRYPT_KEY
|
||||
])
|
||||
)
|
||||
key_info = Container(
|
||||
key_count=2,
|
||||
cert_keys=ListContainer([
|
||||
cert_key_sign,
|
||||
cert_key_encrypt
|
||||
])
|
||||
)
|
||||
key_info_attribute = Container(
|
||||
flags=1,
|
||||
tag=6,
|
||||
length=len(_BCertStructs.DrmBCertKeyInfo.build(key_info)) + 8,
|
||||
attribute=key_info
|
||||
)
|
||||
|
||||
manufacturer_info = parent.get_certificate(0).get_attribute(7)
|
||||
|
||||
new_bcert_container = Container(
|
||||
signature=b"CERT",
|
||||
version=1,
|
||||
total_length=0, # filled at a later time
|
||||
certificate_length=0, # filled at a later time
|
||||
attributes=ListContainer([
|
||||
basic_info_attribute,
|
||||
device_info_attribute,
|
||||
feature_attribute,
|
||||
key_info_attribute,
|
||||
manufacturer_info,
|
||||
])
|
||||
)
|
||||
|
||||
payload = _BCertStructs.BCert.build(new_bcert_container)
|
||||
new_bcert_container.certificate_length = len(payload)
|
||||
new_bcert_container.total_length = len(payload) + 144 # signature length
|
||||
|
||||
sign_payload = _BCertStructs.BCert.build(new_bcert_container)
|
||||
signature = Crypto.ecc256_sign(group_key, sign_payload)
|
||||
|
||||
signature_info = Container(
|
||||
signature_type=1,
|
||||
signature_size=64,
|
||||
signature=signature,
|
||||
signature_key_size=512, # bits
|
||||
signature_key=group_key.public_bytes()
|
||||
)
|
||||
signature_info_attribute = Container(
|
||||
flags=1,
|
||||
tag=8,
|
||||
length=len(_BCertStructs.DrmBCertSignatureInfo.build(signature_info)) + 8,
|
||||
attribute=signature_info
|
||||
)
|
||||
new_bcert_container.attributes.append(signature_info_attribute)
|
||||
|
||||
return cls(new_bcert_container)
|
||||
|
||||
@classmethod
|
||||
def loads(cls, data: Union[str, bytes]) -> Certificate:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
cert = _BCertStructs.BCert
|
||||
return cls(
|
||||
parsed_bcert=cert.parse(data),
|
||||
bcert_obj=cert
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Union[Path, str]) -> Certificate:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
with Path(path).open(mode="rb") as f:
|
||||
return cls.loads(f.read())
|
||||
|
||||
def get_attribute(self, type_: int):
|
||||
for attribute in self.parsed.attributes:
|
||||
if attribute.tag == type_:
|
||||
return attribute
|
||||
|
||||
def get_security_level(self) -> int:
|
||||
basic_info_attribute = self.get_attribute(1).attribute
|
||||
if basic_info_attribute:
|
||||
return basic_info_attribute.security_level
|
||||
|
||||
@staticmethod
|
||||
def _unpad(name: bytes):
|
||||
return name.rstrip(b'\x00').decode("utf-8", errors="ignore")
|
||||
|
||||
def get_name(self):
|
||||
manufacturer_info = self.get_attribute(7).attribute
|
||||
if manufacturer_info:
|
||||
return f"{self._unpad(manufacturer_info.manufacturer_name)} {self._unpad(manufacturer_info.model_name)} {self._unpad(manufacturer_info.model_number)}"
|
||||
|
||||
def dumps(self) -> bytes:
|
||||
return self._BCERT.build(self.parsed)
|
||||
|
||||
def struct(self) -> _BCertStructs.BCert:
|
||||
return self._BCERT
|
||||
|
||||
def verify_signature(self):
|
||||
signature_object = self.get_attribute(8)
|
||||
signature_attribute = signature_object.attribute
|
||||
|
||||
sign_payload = self.dumps()[:-signature_object.length]
|
||||
|
||||
raw_signature_key = signature_attribute.signature_key
|
||||
signature_key = ECC.construct(
|
||||
curve='P-256',
|
||||
point_x=int.from_bytes(raw_signature_key[:32], 'big'),
|
||||
point_y=int.from_bytes(raw_signature_key[32:], 'big')
|
||||
)
|
||||
|
||||
return Crypto.ecc256_verify(
|
||||
public_key=signature_key,
|
||||
data=sign_payload,
|
||||
signature=signature_attribute.signature
|
||||
)
|
||||
|
||||
|
||||
class CertificateChain(_BCertStructs):
|
||||
"""Represents a BCertChain"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parsed_bcert_chain: Container,
|
||||
bcert_chain_obj: _BCertStructs.BCertChain = _BCertStructs.BCertChain
|
||||
):
|
||||
self.parsed = parsed_bcert_chain
|
||||
self._BCERT_CHAIN = bcert_chain_obj
|
||||
|
||||
@classmethod
|
||||
def loads(cls, data: Union[str, bytes]) -> CertificateChain:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
cert_chain = _BCertStructs.BCertChain
|
||||
return cls(
|
||||
parsed_bcert_chain=cert_chain.parse(data),
|
||||
bcert_chain_obj=cert_chain
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Union[Path, str]) -> CertificateChain:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
with Path(path).open(mode="rb") as f:
|
||||
return cls.loads(f.read())
|
||||
|
||||
def dumps(self) -> bytes:
|
||||
return self._BCERT_CHAIN.build(self.parsed)
|
||||
|
||||
def struct(self) -> _BCertStructs.BCertChain:
|
||||
return self._BCERT_CHAIN
|
||||
|
||||
def get_certificate(self, index: int) -> Certificate:
|
||||
return Certificate(self.parsed.certificates[index])
|
||||
|
||||
def get_security_level(self) -> int:
|
||||
# not sure if there's a better way than this
|
||||
return self.get_certificate(0).get_security_level()
|
||||
|
||||
def get_name(self) -> str:
|
||||
return self.get_certificate(0).get_name()
|
||||
|
||||
def append(self, bcert: Certificate) -> None:
|
||||
self.parsed.certificate_count += 1
|
||||
self.parsed.certificates.append(bcert.parsed)
|
||||
self.parsed.total_length += len(bcert.dumps())
|
||||
|
||||
def prepend(self, bcert: Certificate) -> None:
|
||||
self.parsed.certificate_count += 1
|
||||
self.parsed.certificates.insert(0, bcert.parsed)
|
||||
self.parsed.total_length += len(bcert.dumps())
|
||||
|
||||
def remove(self, index: int) -> None:
|
||||
if self.parsed.certificate_count <= 0:
|
||||
raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
|
||||
if index >= self.parsed.certificate_count:
|
||||
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
|
||||
|
||||
self.parsed.certificate_count -= 1
|
||||
bcert = Certificate(self.parsed.certificates[index])
|
||||
self.parsed.total_length -= len(bcert.dumps())
|
||||
self.parsed.certificates.pop(index)
|
||||
|
||||
def get(self, index: int) -> Certificate:
|
||||
if self.parsed.certificate_count <= 0:
|
||||
raise InvalidCertificateChain("CertificateChain does not contain any Certificates")
|
||||
if index >= self.parsed.certificate_count:
|
||||
raise IndexError(f"No Certificate at index {index}, {self.parsed.certificate_count} total")
|
||||
|
||||
return Certificate(self.parsed.certificates[index])
|
||||
|
||||
def count(self) -> int:
|
||||
return self.parsed.certificate_count
|
||||
218
modules/pyplayready/cdm.py
Normal file
218
modules/pyplayready/cdm.py
Normal file
@@ -0,0 +1,218 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import math
|
||||
import time
|
||||
from typing import List
|
||||
from uuid import UUID
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Hash import SHA256
|
||||
from Crypto.Random import get_random_bytes
|
||||
from Crypto.Signature import DSS
|
||||
from Crypto.Util.Padding import pad
|
||||
from ecpy.curves import Point, Curve
|
||||
|
||||
from modules.pyplayready.bcert import CertificateChain
|
||||
from modules.pyplayready.crypto.ecc_key import ECCKey
|
||||
from modules.pyplayready.key import Key
|
||||
from modules.pyplayready.xml_key import XmlKey
|
||||
from modules.pyplayready.crypto.elgamal import ElGamal
|
||||
from modules.pyplayready.xmrlicense import XMRLicense
|
||||
|
||||
|
||||
class Cdm:
|
||||
def __init__(
|
||||
self,
|
||||
security_level: int,
|
||||
certificate_chain: CertificateChain,
|
||||
encryption_key: ECCKey,
|
||||
signing_key: ECCKey,
|
||||
client_version: str = "10.0.16384.10011",
|
||||
protocol_version: int = 1
|
||||
):
|
||||
self.security_level = security_level
|
||||
self.certificate_chain = certificate_chain
|
||||
self.encryption_key = encryption_key
|
||||
self.signing_key = signing_key
|
||||
self.client_version = client_version
|
||||
self.protocol_version = protocol_version
|
||||
|
||||
self.curve = Curve.get_curve("secp256r1")
|
||||
self.elgamal = ElGamal(self.curve)
|
||||
|
||||
self._wmrm_key = Point(
|
||||
x=0xc8b6af16ee941aadaa5389b4af2c10e356be42af175ef3face93254e7b0b3d9b,
|
||||
y=0x982b27b5cb2341326e56aa857dbfd5c634ce2cf9ea74fca8f2af5957efeea562,
|
||||
curve=self.curve
|
||||
)
|
||||
self._xml_key = XmlKey()
|
||||
|
||||
self._keys: List[Key] = []
|
||||
|
||||
@classmethod
|
||||
def from_device(cls, device) -> Cdm:
|
||||
"""Initialize a Playready CDM from a Playready Device (.prd) file"""
|
||||
return cls(
|
||||
security_level=device.security_level,
|
||||
certificate_chain=device.group_certificate,
|
||||
encryption_key=device.encryption_key,
|
||||
signing_key=device.signing_key
|
||||
)
|
||||
|
||||
def get_key_data(self) -> bytes:
|
||||
point1, point2 = self.elgamal.encrypt(
|
||||
message_point=self._xml_key.get_point(self.elgamal.curve),
|
||||
public_key=self._wmrm_key
|
||||
)
|
||||
return self.elgamal.to_bytes(point1.x) + self.elgamal.to_bytes(point1.y) + self.elgamal.to_bytes(point2.x) + self.elgamal.to_bytes(point2.y)
|
||||
|
||||
def get_cipher_data(self) -> bytes:
|
||||
b64_chain = base64.b64encode(self.certificate_chain.dumps()).decode()
|
||||
body = f"<Data><CertificateChains><CertificateChain>{b64_chain}</CertificateChain></CertificateChains></Data>"
|
||||
|
||||
cipher = AES.new(
|
||||
key=self._xml_key.aes_key,
|
||||
mode=AES.MODE_CBC,
|
||||
iv=self._xml_key.aes_iv
|
||||
)
|
||||
|
||||
ciphertext = cipher.encrypt(pad(
|
||||
body.encode(),
|
||||
AES.block_size
|
||||
))
|
||||
|
||||
return self._xml_key.aes_iv + ciphertext
|
||||
|
||||
def _build_digest_content(
|
||||
self,
|
||||
content_header: str,
|
||||
nonce: str,
|
||||
wmrm_cipher: str,
|
||||
cert_cipher: str
|
||||
) -> str:
|
||||
return (
|
||||
'<LA xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols" Id="SignedData" xml:space="preserve">'
|
||||
f'<Version>{self.protocol_version}</Version>'
|
||||
f'<ContentHeader>{content_header}</ContentHeader>'
|
||||
'<CLIENTINFO>'
|
||||
f'<CLIENTVERSION>{self.client_version}</CLIENTVERSION>'
|
||||
'</CLIENTINFO>'
|
||||
f'<LicenseNonce>{nonce}</LicenseNonce>'
|
||||
f'<ClientTime>{math.floor(time.time())}</ClientTime>'
|
||||
'<EncryptedData xmlns="http://www.w3.org/2001/04/xmlenc#" Type="http://www.w3.org/2001/04/xmlenc#Element">'
|
||||
'<EncryptionMethod Algorithm="http://www.w3.org/2001/04/xmlenc#aes128-cbc"></EncryptionMethod>'
|
||||
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
|
||||
'<EncryptedKey xmlns="http://www.w3.org/2001/04/xmlenc#">'
|
||||
'<EncryptionMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#ecc256"></EncryptionMethod>'
|
||||
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
|
||||
'<KeyName>WMRMServer</KeyName>'
|
||||
'</KeyInfo>'
|
||||
'<CipherData>'
|
||||
f'<CipherValue>{wmrm_cipher}</CipherValue>'
|
||||
'</CipherData>'
|
||||
'</EncryptedKey>'
|
||||
'</KeyInfo>'
|
||||
'<CipherData>'
|
||||
f'<CipherValue>{cert_cipher}</CipherValue>'
|
||||
'</CipherData>'
|
||||
'</EncryptedData>'
|
||||
'</LA>'
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_signed_info(digest_value: str) -> str:
|
||||
return (
|
||||
'<SignedInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
|
||||
'<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"></CanonicalizationMethod>'
|
||||
'<SignatureMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#ecdsa-sha256"></SignatureMethod>'
|
||||
'<Reference URI="#SignedData">'
|
||||
'<DigestMethod Algorithm="http://schemas.microsoft.com/DRM/2007/03/protocols#sha256"></DigestMethod>'
|
||||
f'<DigestValue>{digest_value}</DigestValue>'
|
||||
'</Reference>'
|
||||
'</SignedInfo>'
|
||||
)
|
||||
|
||||
def get_license_challenge(self, content_header: str) -> str:
|
||||
la_content = self._build_digest_content(
|
||||
content_header=content_header,
|
||||
nonce=base64.b64encode(get_random_bytes(16)).decode(),
|
||||
wmrm_cipher=base64.b64encode(self.get_key_data()).decode(),
|
||||
cert_cipher=base64.b64encode(self.get_cipher_data()).decode()
|
||||
)
|
||||
|
||||
la_hash_obj = SHA256.new()
|
||||
la_hash_obj.update(la_content.encode())
|
||||
la_hash = la_hash_obj.digest()
|
||||
|
||||
signed_info = self._build_signed_info(base64.b64encode(la_hash).decode())
|
||||
signed_info_digest = SHA256.new(signed_info.encode())
|
||||
|
||||
signer = DSS.new(self.signing_key.key, 'fips-186-3')
|
||||
signature = signer.sign(signed_info_digest)
|
||||
|
||||
# haven't found a better way to do this. xmltodict.unparse doesn't work
|
||||
main_body = (
|
||||
'<?xml version="1.0" encoding="utf-8"?>'
|
||||
'<soap:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">'
|
||||
'<soap:Body>'
|
||||
'<AcquireLicense xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols">'
|
||||
'<challenge>'
|
||||
'<Challenge xmlns="http://schemas.microsoft.com/DRM/2007/03/protocols/messages">'
|
||||
+ la_content +
|
||||
'<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">'
|
||||
+ signed_info +
|
||||
f'<SignatureValue>{base64.b64encode(signature).decode()}</SignatureValue>'
|
||||
'<KeyInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
|
||||
'<KeyValue>'
|
||||
'<ECCKeyValue>'
|
||||
f'<PublicKey>{base64.b64encode(self.signing_key.public_bytes()).decode()}</PublicKey>'
|
||||
'</ECCKeyValue>'
|
||||
'</KeyValue>'
|
||||
'</KeyInfo>'
|
||||
'</Signature>'
|
||||
'</Challenge>'
|
||||
'</challenge>'
|
||||
'</AcquireLicense>'
|
||||
'</soap:Body>'
|
||||
'</soap:Envelope>'
|
||||
)
|
||||
|
||||
return main_body
|
||||
|
||||
def _decrypt_ecc256_key(self, encrypted_key: bytes) -> bytes:
|
||||
point1 = Point(
|
||||
x=int.from_bytes(encrypted_key[:32], 'big'),
|
||||
y=int.from_bytes(encrypted_key[32:64], 'big'),
|
||||
curve=self.curve
|
||||
)
|
||||
point2 = Point(
|
||||
x=int.from_bytes(encrypted_key[64:96], 'big'),
|
||||
y=int.from_bytes(encrypted_key[96:128], 'big'),
|
||||
curve=self.curve
|
||||
)
|
||||
|
||||
decrypted = self.elgamal.decrypt((point1, point2), int(self.encryption_key.key.d))
|
||||
return self.elgamal.to_bytes(decrypted.x)[16:32]
|
||||
|
||||
def parse_license(self, licence: str) -> None:
|
||||
try:
|
||||
root = ET.fromstring(licence)
|
||||
license_elements = root.findall(".//{http://schemas.microsoft.com/DRM/2007/03/protocols}License")
|
||||
for license_element in license_elements:
|
||||
parsed_licence = XMRLicense.loads(license_element.text)
|
||||
for key in parsed_licence.get_content_keys():
|
||||
if Key.CipherType(key.cipher_type) == Key.CipherType.ECC_256:
|
||||
self._keys.append(Key(
|
||||
key_id=UUID(bytes_le=key.key_id),
|
||||
key_type=key.key_type,
|
||||
cipher_type=key.cipher_type,
|
||||
key_length=key.key_length,
|
||||
key=self._decrypt_ecc256_key(key.encrypted_key)
|
||||
))
|
||||
except Exception as e:
|
||||
raise Exception(f"Unable to parse license, {e}")
|
||||
|
||||
def get_keys(self) -> List[Key]:
|
||||
return self._keys
|
||||
96
modules/pyplayready/crypto/__init__.py
Normal file
96
modules/pyplayready/crypto/__init__.py
Normal file
@@ -0,0 +1,96 @@
|
||||
from typing import Union, Tuple
|
||||
|
||||
from Crypto.Hash import SHA256
|
||||
from Crypto.Hash.SHA256 import SHA256Hash
|
||||
from Crypto.PublicKey.ECC import EccKey
|
||||
from Crypto.Signature import DSS
|
||||
from ecpy.curves import Point, Curve
|
||||
|
||||
from modules.pyplayready.crypto.elgamal import ElGamal
|
||||
from modules.pyplayready.crypto.ecc_key import ECCKey
|
||||
|
||||
|
||||
class Crypto:
|
||||
def __init__(self, curve: str = "secp256r1"):
|
||||
self.curve = Curve.get_curve(curve)
|
||||
self.elgamal = ElGamal(self.curve)
|
||||
|
||||
def ecc256_encrypt(self, public_key: Union[ECCKey, Point], plaintext: Union[Point, bytes]) -> bytes:
|
||||
if isinstance(public_key, ECCKey):
|
||||
public_key = public_key.get_point(self.curve)
|
||||
if not isinstance(public_key, Point):
|
||||
raise ValueError(f"Expecting ECCKey or Point input, got {public_key!r}")
|
||||
|
||||
if isinstance(plaintext, bytes):
|
||||
plaintext = Point(
|
||||
x=int.from_bytes(plaintext[:32], 'big'),
|
||||
y=int.from_bytes(plaintext[32:64], 'big'),
|
||||
curve=self.curve
|
||||
)
|
||||
if not isinstance(plaintext, Point):
|
||||
raise ValueError(f"Expecting Point or Bytes input, got {plaintext!r}")
|
||||
|
||||
point1, point2 = self.elgamal.encrypt(
|
||||
message_point=plaintext,
|
||||
public_key=public_key
|
||||
)
|
||||
return b''.join([
|
||||
self.elgamal.to_bytes(point1.x),
|
||||
self.elgamal.to_bytes(point1.y),
|
||||
self.elgamal.to_bytes(point2.x),
|
||||
self.elgamal.to_bytes(point2.y)
|
||||
])
|
||||
|
||||
def ecc256_decrypt(self, private_key: ECCKey, ciphertext: Union[Tuple[Point, Point], bytes]) -> bytes:
|
||||
if isinstance(ciphertext, bytes):
|
||||
ciphertext = (
|
||||
Point(
|
||||
x=int.from_bytes(ciphertext[:32], 'big'),
|
||||
y=int.from_bytes(ciphertext[32:64], 'big'),
|
||||
curve=self.curve
|
||||
),
|
||||
Point(
|
||||
x=int.from_bytes(ciphertext[64:96], 'big'),
|
||||
y=int.from_bytes(ciphertext[96:128], 'big'),
|
||||
curve=self.curve
|
||||
)
|
||||
)
|
||||
if not isinstance(ciphertext, Tuple):
|
||||
raise ValueError(f"Expecting Tuple[Point, Point] or Bytes input, got {ciphertext!r}")
|
||||
|
||||
decrypted = self.elgamal.decrypt(ciphertext, int(private_key.key.d))
|
||||
return self.elgamal.to_bytes(decrypted.x)
|
||||
|
||||
@staticmethod
|
||||
def ecc256_sign(private_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes]) -> bytes:
|
||||
if isinstance(private_key, ECCKey):
|
||||
private_key = private_key.key
|
||||
if not isinstance(private_key, EccKey):
|
||||
raise ValueError(f"Expecting ECCKey or EccKey input, got {private_key!r}")
|
||||
|
||||
if isinstance(data, bytes):
|
||||
data = SHA256.new(data)
|
||||
if not isinstance(data, SHA256Hash):
|
||||
raise ValueError(f"Expecting SHA256Hash or Bytes input, got {data!r}")
|
||||
|
||||
signer = DSS.new(private_key, 'fips-186-3')
|
||||
return signer.sign(data)
|
||||
|
||||
@staticmethod
|
||||
def ecc256_verify(public_key: Union[ECCKey, EccKey], data: Union[SHA256Hash, bytes], signature: bytes) -> bool:
|
||||
if isinstance(public_key, ECCKey):
|
||||
public_key = public_key.key
|
||||
if not isinstance(public_key, EccKey):
|
||||
raise ValueError(f"Expecting ECCKey or EccKey input, got {public_key!r}")
|
||||
|
||||
if isinstance(data, bytes):
|
||||
data = SHA256.new(data)
|
||||
if not isinstance(data, SHA256Hash):
|
||||
raise ValueError(f"Expecting SHA256Hash or Bytes input, got {data!r}")
|
||||
|
||||
verifier = DSS.new(public_key, 'fips-186-3')
|
||||
try:
|
||||
verifier.verify(data, signature)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
95
modules/pyplayready/crypto/ecc_key.py
Normal file
95
modules/pyplayready/crypto/ecc_key.py
Normal file
@@ -0,0 +1,95 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
from Crypto.Hash import SHA256
|
||||
from Crypto.PublicKey import ECC
|
||||
from Crypto.PublicKey.ECC import EccKey
|
||||
from ecpy.curves import Curve, Point
|
||||
|
||||
|
||||
class ECCKey:
|
||||
"""Represents a PlayReady ECC key pair"""
|
||||
|
||||
def __init__(self, key: EccKey):
|
||||
self.key = key
|
||||
|
||||
@classmethod
|
||||
def generate(cls):
|
||||
"""Generate a new ECC key pair"""
|
||||
return cls(key=ECC.generate(curve='P-256'))
|
||||
|
||||
@classmethod
|
||||
def construct(cls, private_key: Union[bytes, int]):
|
||||
"""Construct an ECC key pair from private/public bytes/ints"""
|
||||
if isinstance(private_key, bytes):
|
||||
private_key = int.from_bytes(private_key, 'big')
|
||||
if not isinstance(private_key, int):
|
||||
raise ValueError(f"Expecting Bytes or Int input, got {private_key!r}")
|
||||
|
||||
# The public is always derived from the private key; loading the other stuff won't work
|
||||
key = ECC.construct(
|
||||
curve='P-256',
|
||||
d=private_key,
|
||||
)
|
||||
|
||||
return cls(key=key)
|
||||
|
||||
@classmethod
|
||||
def loads(cls, data: Union[str, bytes]) -> ECCKey:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
if len(data) not in [96, 32]:
|
||||
raise ValueError(f"Invalid data length. Expecting 96 or 32 bytes, got {len(data)}")
|
||||
|
||||
return cls.construct(private_key=data[:32])
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Union[Path, str]) -> ECCKey:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
with Path(path).open(mode="rb") as f:
|
||||
return cls.loads(f.read())
|
||||
|
||||
def dumps(self, private_only=False):
|
||||
if private_only:
|
||||
return self.private_bytes()
|
||||
return self.private_bytes() + self.public_bytes()
|
||||
|
||||
def dump(self, path: Union[Path, str], private_only=False) -> None:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(self.dumps(private_only))
|
||||
|
||||
@staticmethod
|
||||
def _to_bytes(n: int) -> bytes:
|
||||
byte_len = (n.bit_length() + 7) // 8
|
||||
if byte_len % 2 != 0:
|
||||
byte_len += 1
|
||||
return n.to_bytes(byte_len, 'big')
|
||||
|
||||
def get_point(self, curve: Curve) -> Point:
|
||||
return Point(self.key.pointQ.x, self.key.pointQ.y, curve)
|
||||
|
||||
def private_bytes(self) -> bytes:
|
||||
return self._to_bytes(int(self.key.d))
|
||||
|
||||
def private_sha256_digest(self) -> bytes:
|
||||
hash_object = SHA256.new()
|
||||
hash_object.update(self.private_bytes())
|
||||
return hash_object.digest()
|
||||
|
||||
def public_bytes(self) -> bytes:
|
||||
return self._to_bytes(int(self.key.pointQ.x)) + self._to_bytes(int(self.key.pointQ.y))
|
||||
|
||||
def public_sha256_digest(self) -> bytes:
|
||||
hash_object = SHA256.new()
|
||||
hash_object.update(self.public_bytes())
|
||||
return hash_object.digest()
|
||||
36
modules/pyplayready/crypto/elgamal.py
Normal file
36
modules/pyplayready/crypto/elgamal.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from typing import Tuple
|
||||
|
||||
from ecpy.curves import Curve, Point
|
||||
import secrets
|
||||
|
||||
|
||||
class ElGamal:
|
||||
def __init__(self, curve: Curve):
|
||||
self.curve = curve
|
||||
|
||||
@staticmethod
|
||||
def to_bytes(n: int) -> bytes:
|
||||
byte_len = (n.bit_length() + 7) // 8
|
||||
if byte_len % 2 != 0:
|
||||
byte_len += 1
|
||||
return n.to_bytes(byte_len, 'big')
|
||||
|
||||
def encrypt(
|
||||
self,
|
||||
message_point: Point,
|
||||
public_key: Point
|
||||
) -> Tuple[Point, Point]:
|
||||
ephemeral_key = secrets.randbelow(self.curve.order)
|
||||
point1 = ephemeral_key * self.curve.generator
|
||||
point2 = message_point + (ephemeral_key * public_key)
|
||||
return point1, point2
|
||||
|
||||
@staticmethod
|
||||
def decrypt(
|
||||
encrypted: Tuple[Point, Point],
|
||||
private_key: int
|
||||
) -> Point:
|
||||
point1, point2 = encrypted
|
||||
shared_secret = private_key * point1
|
||||
decrypted_message = point2 - shared_secret
|
||||
return decrypted_message
|
||||
136
modules/pyplayready/device.py
Normal file
136
modules/pyplayready/device.py
Normal file
@@ -0,0 +1,136 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from enum import IntEnum
|
||||
from pathlib import Path
|
||||
from typing import Union, Any
|
||||
|
||||
from modules.pyplayready.bcert import CertificateChain
|
||||
from modules.pyplayready.crypto.ecc_key import ECCKey
|
||||
|
||||
from construct import Struct, Const, Int8ub, Bytes, this, Int32ub
|
||||
|
||||
|
||||
class DeviceStructs:
|
||||
magic = Const(b"PRD")
|
||||
|
||||
header = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Int8ub,
|
||||
)
|
||||
|
||||
# was never in production
|
||||
v1 = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Int8ub,
|
||||
"group_key_length" / Int32ub,
|
||||
"group_key" / Bytes(this.group_key_length),
|
||||
"group_certificate_length" / Int32ub,
|
||||
"group_certificate" / Bytes(this.group_certificate_length)
|
||||
)
|
||||
|
||||
v2 = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Int8ub,
|
||||
"group_certificate_length" / Int32ub,
|
||||
"group_certificate" / Bytes(this.group_certificate_length),
|
||||
"encryption_key" / Bytes(96),
|
||||
"signing_key" / Bytes(96),
|
||||
)
|
||||
|
||||
v3 = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Int8ub,
|
||||
"group_key" / Bytes(96),
|
||||
"encryption_key" / Bytes(96),
|
||||
"signing_key" / Bytes(96),
|
||||
"group_certificate_length" / Int32ub,
|
||||
"group_certificate" / Bytes(this.group_certificate_length),
|
||||
)
|
||||
|
||||
class Device:
|
||||
"""Represents a PlayReady Device (.prd)"""
|
||||
CURRENT_STRUCT = DeviceStructs.v3
|
||||
CURRENT_VERSION = 3
|
||||
|
||||
class SecurityLevel(IntEnum):
|
||||
SL150 = 150
|
||||
SL2000 = 2000
|
||||
SL3000 = 3000
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*_: Any,
|
||||
group_key: Union[str, bytes, None],
|
||||
encryption_key: Union[str, bytes],
|
||||
signing_key: Union[str, bytes],
|
||||
group_certificate: Union[str, bytes],
|
||||
**__: Any
|
||||
):
|
||||
if isinstance(group_key, str):
|
||||
group_key = base64.b64decode(group_key)
|
||||
|
||||
if isinstance(encryption_key, str):
|
||||
encryption_key = base64.b64decode(encryption_key)
|
||||
if not isinstance(encryption_key, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {encryption_key!r}")
|
||||
|
||||
if isinstance(signing_key, str):
|
||||
signing_key = base64.b64decode(signing_key)
|
||||
if not isinstance(signing_key, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {signing_key!r}")
|
||||
|
||||
if isinstance(group_certificate, str):
|
||||
group_certificate = base64.b64decode(group_certificate)
|
||||
if not isinstance(group_certificate, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {group_certificate!r}")
|
||||
|
||||
self.group_key = None if group_key is None else ECCKey.loads(group_key)
|
||||
self.encryption_key = ECCKey.loads(encryption_key)
|
||||
self.signing_key = ECCKey.loads(signing_key)
|
||||
self.group_certificate = CertificateChain.loads(group_certificate)
|
||||
self.security_level = self.group_certificate.get_security_level()
|
||||
|
||||
@classmethod
|
||||
def loads(cls, data: Union[str, bytes]) -> Device:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
prd_header = DeviceStructs.header.parse(data)
|
||||
if prd_header.version == 2:
|
||||
return cls(
|
||||
group_key=None,
|
||||
**DeviceStructs.v2.parse(data)
|
||||
)
|
||||
|
||||
return cls(**cls.CURRENT_STRUCT.parse(data))
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Union[Path, str]) -> Device:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
with Path(path).open(mode="rb") as f:
|
||||
return cls.loads(f.read())
|
||||
|
||||
def dumps(self) -> bytes:
|
||||
return self.CURRENT_STRUCT.build(dict(
|
||||
version=self.CURRENT_VERSION,
|
||||
group_key=self.group_key.dumps(),
|
||||
encryption_key=self.encryption_key.dumps(),
|
||||
signing_key=self.signing_key.dumps(),
|
||||
group_certificate_length=len(self.group_certificate.dumps()),
|
||||
group_certificate=self.group_certificate.dumps(),
|
||||
))
|
||||
|
||||
def dump(self, path: Union[Path, str]) -> None:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
path = Path(path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(self.dumps())
|
||||
|
||||
def get_name(self) -> str:
|
||||
name = f"{self.group_certificate.get_name()}_sl{self.group_certificate.get_security_level()}"
|
||||
return ''.join(char for char in name if (char.isalnum() or char in '_- ')).strip().lower().replace(" ", "_")
|
||||
26
modules/pyplayready/exceptions.py
Normal file
26
modules/pyplayready/exceptions.py
Normal file
@@ -0,0 +1,26 @@
|
||||
class PyPlayreadyException(Exception):
|
||||
"""Exceptions used by pyplayready."""
|
||||
|
||||
|
||||
class InvalidPssh(PyPlayreadyException):
|
||||
"""The Playready PSSH is invalid or empty."""
|
||||
|
||||
|
||||
class InvalidInitData(PyPlayreadyException):
|
||||
"""The Playready Cenc Header Data is invalid or empty."""
|
||||
|
||||
|
||||
class DeviceMismatch(PyPlayreadyException):
|
||||
"""The Remote CDMs Device information and the APIs Device information did not match."""
|
||||
|
||||
|
||||
class InvalidLicense(PyPlayreadyException):
|
||||
"""Unable to parse XMR License."""
|
||||
|
||||
|
||||
class InvalidCertificateChain(PyPlayreadyException):
|
||||
"""The BCert is not correctly formatted."""
|
||||
|
||||
|
||||
class OutdatedDevice(PyPlayreadyException):
|
||||
"""The PlayReady Device is outdated and does not support a specific operation."""
|
||||
68
modules/pyplayready/key.py
Normal file
68
modules/pyplayready/key.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import base64
|
||||
from enum import Enum
|
||||
from uuid import UUID
|
||||
from typing import Union
|
||||
|
||||
|
||||
class Key:
|
||||
class KeyType(Enum):
|
||||
INVALID = 0x0000
|
||||
AES_128_CTR = 0x0001
|
||||
RC4_CIPHER = 0x0002
|
||||
AES_128_ECB = 0x0003
|
||||
COCKTAIL = 0x0004
|
||||
AES_128_CBC = 0x0005
|
||||
KEYEXCHANGE = 0x0006
|
||||
UNKNOWN = 0xffff
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value):
|
||||
return cls.UNKNOWN
|
||||
|
||||
class CipherType(Enum):
|
||||
INVALID = 0x0000
|
||||
RSA_1024 = 0x0001
|
||||
CHAINED_LICENSE = 0x0002
|
||||
ECC_256 = 0x0003
|
||||
ECC_256_WITH_KZ = 0x0004
|
||||
TEE_TRANSIENT = 0x0005
|
||||
ECC_256_VIA_SYMMETRIC = 0x0006
|
||||
UNKNOWN = 0xffff
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value):
|
||||
return cls.UNKNOWN
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
key_id: UUID,
|
||||
key_type: int,
|
||||
cipher_type: int,
|
||||
key_length: int,
|
||||
key: bytes
|
||||
):
|
||||
self.key_id = key_id
|
||||
self.key_type = self.KeyType(key_type)
|
||||
self.cipher_type = self.CipherType(cipher_type)
|
||||
self.key_length = key_length
|
||||
self.key = key
|
||||
|
||||
@staticmethod
|
||||
def kid_to_uuid(kid: Union[str, bytes]) -> UUID:
|
||||
"""
|
||||
Convert a Key ID from a string or bytes to a UUID object.
|
||||
At first, this may seem very simple, but some types of Key IDs
|
||||
may not be 16 bytes and some may be decimal vs. hex.
|
||||
"""
|
||||
if isinstance(kid, str):
|
||||
kid = base64.b64decode(kid)
|
||||
if not kid:
|
||||
kid = b"\x00" * 16
|
||||
|
||||
if kid.decode(errors="replace").isdigit():
|
||||
return UUID(int=int(kid.decode()))
|
||||
|
||||
if len(kid) < 16:
|
||||
kid += b"\x00" * (16 - len(kid))
|
||||
|
||||
return UUID(bytes=kid)
|
||||
311
modules/pyplayready/main.py
Normal file
311
modules/pyplayready/main.py
Normal file
@@ -0,0 +1,311 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import click
|
||||
import requests
|
||||
from Crypto.Random import get_random_bytes
|
||||
|
||||
from modules.pyplayready.system.bcert import CertificateChain, Certificate
|
||||
from modules.pyplayready.cdm import Cdm
|
||||
from modules.pyplayready.device import Device
|
||||
from modules.pyplayready.crypto.ecc_key import ECCKey
|
||||
from modules.pyplayready.exceptions import OutdatedDevice
|
||||
from modules.pyplayready.system.pssh import PSSH
|
||||
|
||||
@click.group(invoke_without_command=True)
|
||||
@click.option("-v", "--version", is_flag=True, default=False, help="Print version information.")
|
||||
@click.option("-d", "--debug", is_flag=True, default=False, help="Enable DEBUG level logs.")
|
||||
def main(version: bool, debug: bool) -> None:
|
||||
"""Python PlayReady CDM implementation"""
|
||||
logging.basicConfig(level=logging.DEBUG if debug else logging.INFO)
|
||||
log = logging.getLogger()
|
||||
|
||||
current_year = datetime.now().year
|
||||
copyright_years = f"2024-{current_year}"
|
||||
|
||||
log.info("pyplayready version %s Copyright (c) %s DevLARLEY, Erevoc, DevataDev", __version__, copyright_years)
|
||||
log.info("https://github.com/ready-dl/pyplayready")
|
||||
log.info("Run 'pyplayready --help' for help")
|
||||
if version:
|
||||
return
|
||||
|
||||
|
||||
@main.command(name="license")
|
||||
@click.argument("device_path", type=Path)
|
||||
@click.argument("pssh", type=PSSH)
|
||||
@click.argument("server", type=str)
|
||||
def license_(device_path: Path, pssh: PSSH, server: str) -> None:
|
||||
"""
|
||||
Make a License Request to a server using a given PSSH
|
||||
Will return a list of all keys within the returned license
|
||||
|
||||
Only works for standard license servers that don't use any license wrapping
|
||||
"""
|
||||
log = logging.getLogger("license")
|
||||
|
||||
device = Device.load(device_path)
|
||||
log.info(f"Loaded Device: {device.get_name()}")
|
||||
|
||||
cdm = Cdm.from_device(device)
|
||||
log.info("Loaded CDM")
|
||||
|
||||
session_id = cdm.open()
|
||||
log.info("Opened Session")
|
||||
|
||||
challenge = cdm.get_license_challenge(session_id, pssh.get_wrm_headers(downgrade_to_v4=True)[0])
|
||||
log.info("Created License Request (Challenge)")
|
||||
log.debug(challenge)
|
||||
|
||||
license_res = requests.post(
|
||||
url=server,
|
||||
headers={
|
||||
'Content-Type': 'text/xml; charset=UTF-8',
|
||||
},
|
||||
data=challenge
|
||||
)
|
||||
|
||||
if license_res.status_code != 200:
|
||||
log.error(f"Failed to send challenge: [{license_res.status_code}] {license_res.text}")
|
||||
return
|
||||
|
||||
licence = license_res.text
|
||||
log.info("Got License Message")
|
||||
log.debug(licence)
|
||||
|
||||
cdm.parse_license(session_id, licence)
|
||||
log.info("License Parsed Successfully")
|
||||
|
||||
for key in cdm.get_keys(session_id):
|
||||
log.info(f"{key.key_id.hex}:{key.key.hex()}")
|
||||
|
||||
cdm.close(session_id)
|
||||
log.info("Clossed Session")
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("device", type=Path)
|
||||
@click.pass_context
|
||||
def test(ctx: click.Context, device: Path) -> None:
|
||||
"""
|
||||
Test the CDM code by getting Content Keys for the Tears Of Steel demo on the Playready Test Server.
|
||||
https://testweb.playready.microsoft.com/Content/Content2X
|
||||
+ DASH Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism/manifest.mpd
|
||||
+ MSS Manifest URL: https://test.playready.microsoft.com/media/profficialsite/tearsofsteel_4k.ism.smoothstreaming/manifest
|
||||
|
||||
The device argument is a Path to a Playready Device (.prd) file which contains the device's group key and
|
||||
group certificate.
|
||||
"""
|
||||
pssh = PSSH(
|
||||
"AAADfHBzc2gAAAAAmgTweZhAQoarkuZb4IhflQAAA1xcAwAAAQABAFIDPABXAFIATQBIAEUAQQBEAEUAUgAgAHgAbQBsAG4AcwA9ACIAaAB0AH"
|
||||
"QAcAA6AC8ALwBzAGMAaABlAG0AYQBzAC4AbQBpAGMAcgBvAHMAbwBmAHQALgBjAG8AbQAvAEQAUgBNAC8AMgAwADAANwAvADAAMwAvAFAAbABh"
|
||||
"AHkAUgBlAGEAZAB5AEgAZQBhAGQAZQByACIAIAB2AGUAcgBzAGkAbwBuAD0AIgA0AC4AMAAuADAALgAwACIAPgA8AEQAQQBUAEEAPgA8AFAAUg"
|
||||
"BPAFQARQBDAFQASQBOAEYATwA+ADwASwBFAFkATABFAE4APgAxADYAPAAvAEsARQBZAEwARQBOAD4APABBAEwARwBJAEQAPgBBAEUAUwBDAFQA"
|
||||
"UgA8AC8AQQBMAEcASQBEAD4APAAvAFAAUgBPAFQARQBDAFQASQBOAEYATwA+ADwASwBJAEQAPgA0AFIAcABsAGIAKwBUAGIATgBFAFMAOAB0AE"
|
||||
"cAawBOAEYAVwBUAEUASABBAD0APQA8AC8ASwBJAEQAPgA8AEMASABFAEMASwBTAFUATQA+AEsATABqADMAUQB6AFEAUAAvAE4AQQA9ADwALwBD"
|
||||
"AEgARQBDAEsAUwBVAE0APgA8AEwAQQBfAFUAUgBMAD4AaAB0AHQAcABzADoALwAvAHAAcgBvAGYAZgBpAGMAaQBhAGwAcwBpAHQAZQAuAGsAZQ"
|
||||
"B5AGQAZQBsAGkAdgBlAHIAeQAuAG0AZQBkAGkAYQBzAGUAcgB2AGkAYwBlAHMALgB3AGkAbgBkAG8AdwBzAC4AbgBlAHQALwBQAGwAYQB5AFIA"
|
||||
"ZQBhAGQAeQAvADwALwBMAEEAXwBVAFIATAA+ADwAQwBVAFMAVABPAE0AQQBUAFQAUgBJAEIAVQBUAEUAUwA+ADwASQBJAFMAXwBEAFIATQBfAF"
|
||||
"YARQBSAFMASQBPAE4APgA4AC4AMQAuADIAMwAwADQALgAzADEAPAAvAEkASQBTAF8ARABSAE0AXwBWAEUAUgBTAEkATwBOAD4APAAvAEMAVQBT"
|
||||
"AFQATwBNAEEAVABUAFIASQBCAFUAVABFAFMAPgA8AC8ARABBAFQAQQA+ADwALwBXAFIATQBIAEUAQQBEAEUAUgA+AA=="
|
||||
)
|
||||
|
||||
license_server = "https://test.playready.microsoft.com/service/rightsmanager.asmx?cfg=(persist:false,sl:2000)"
|
||||
|
||||
ctx.invoke(
|
||||
license_,
|
||||
device_path=device,
|
||||
pssh=pssh,
|
||||
server=license_server
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.option("-k", "--group_key", type=Path, required=True, help="Device ECC private group key")
|
||||
@click.option("-c", "--group_certificate", type=Path, required=True, help="Device group certificate chain")
|
||||
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
|
||||
@click.pass_context
|
||||
def create_device(
|
||||
ctx: click.Context,
|
||||
group_key: Path,
|
||||
group_certificate: Path,
|
||||
output: Optional[Path] = None
|
||||
) -> None:
|
||||
"""Create a Playready Device (.prd) file from an ECC private group key and group certificate chain"""
|
||||
if not group_key.is_file():
|
||||
raise click.UsageError("group_key: Not a path to a file, or it doesn't exist.", ctx)
|
||||
if not group_certificate.is_file():
|
||||
raise click.UsageError("group_certificate: Not a path to a file, or it doesn't exist.", ctx)
|
||||
|
||||
log = logging.getLogger("create-device")
|
||||
|
||||
encryption_key = ECCKey.generate()
|
||||
signing_key = ECCKey.generate()
|
||||
|
||||
group_key = ECCKey.load(group_key)
|
||||
certificate_chain = CertificateChain.load(group_certificate)
|
||||
|
||||
new_certificate = Certificate.new_leaf_cert(
|
||||
cert_id=get_random_bytes(16),
|
||||
security_level=certificate_chain.get_security_level(),
|
||||
client_id=get_random_bytes(16),
|
||||
signing_key=signing_key,
|
||||
encryption_key=encryption_key,
|
||||
group_key=group_key,
|
||||
parent=certificate_chain
|
||||
)
|
||||
certificate_chain.prepend(new_certificate)
|
||||
|
||||
device = Device(
|
||||
group_key=group_key.dumps(),
|
||||
encryption_key=encryption_key.dumps(),
|
||||
signing_key=signing_key.dumps(),
|
||||
group_certificate=certificate_chain.dumps(),
|
||||
)
|
||||
|
||||
if output and output.suffix:
|
||||
if output.suffix.lower() != ".prd":
|
||||
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
|
||||
out_path = output
|
||||
else:
|
||||
out_dir = output or Path.cwd()
|
||||
out_path = out_dir / f"{device.get_name()}.prd"
|
||||
|
||||
if out_path.exists():
|
||||
log.error(f"A file already exists at the path '{out_path}', cannot overwrite.")
|
||||
return
|
||||
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
out_path.write_bytes(device.dumps())
|
||||
|
||||
log.info("Created Playready Device (.prd) file, %s", out_path.name)
|
||||
log.info(" + Security Level: %s", device.security_level)
|
||||
log.info(" + Group Key: %s bytes", len(device.group_key.dumps()))
|
||||
log.info(" + Encryption Key: %s bytes", len(device.encryption_key.dumps()))
|
||||
log.info(" + Signing Key: %s bytes", len(device.signing_key.dumps()))
|
||||
log.info(" + Group Certificate: %s bytes", len(device.group_certificate.dumps()))
|
||||
log.info(" + Saved to: %s", out_path.absolute())
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("prd_path", type=Path)
|
||||
@click.option("-o", "--output", type=Path, default=None, help="Output Path or Directory")
|
||||
@click.pass_context
|
||||
def reprovision_device(ctx: click.Context, prd_path: Path, output: Optional[Path] = None) -> None:
|
||||
"""
|
||||
Reprovision a Playready Device (.prd) by creating a new leaf certificate and new encryption/signing keys.
|
||||
Will override the device if an output path or directory is not specified
|
||||
|
||||
Only works on PRD Devices of v3 or higher
|
||||
"""
|
||||
if not prd_path.is_file():
|
||||
raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
|
||||
|
||||
log = logging.getLogger("reprovision-device")
|
||||
log.info("Reprovisioning Playready Device (.prd) file, %s", prd_path.name)
|
||||
|
||||
device = Device.load(prd_path)
|
||||
|
||||
if device.group_key is None:
|
||||
raise OutdatedDevice("Device does not support reprovisioning, re-create it or use a Device with a version of 3 or higher")
|
||||
|
||||
device.group_certificate.remove(0)
|
||||
|
||||
encryption_key = ECCKey.generate()
|
||||
signing_key = ECCKey.generate()
|
||||
|
||||
device.encryption_key = encryption_key
|
||||
device.signing_key = signing_key
|
||||
|
||||
new_certificate = Certificate.new_leaf_cert(
|
||||
cert_id=get_random_bytes(16),
|
||||
security_level=device.group_certificate.get_security_level(),
|
||||
client_id=get_random_bytes(16),
|
||||
signing_key=signing_key,
|
||||
encryption_key=encryption_key,
|
||||
group_key=device.group_key,
|
||||
parent=device.group_certificate
|
||||
)
|
||||
device.group_certificate.prepend(new_certificate)
|
||||
|
||||
if output and output.suffix:
|
||||
if output.suffix.lower() != ".prd":
|
||||
log.warning(f"Saving PRD with the file extension '{output.suffix}' but '.prd' is recommended.")
|
||||
out_path = output
|
||||
else:
|
||||
out_path = prd_path
|
||||
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
out_path.write_bytes(device.dumps())
|
||||
|
||||
log.info("Reprovisioned Playready Device (.prd) file, %s", out_path.name)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("prd_path", type=Path)
|
||||
@click.option("-o", "--out_dir", type=Path, default=None, help="Output Directory")
|
||||
@click.pass_context
|
||||
def export_device(ctx: click.Context, prd_path: Path, out_dir: Optional[Path] = None) -> None:
|
||||
"""
|
||||
Export a Playready Device (.prd) file to a Group Key and Group Certificate
|
||||
If an output directory is not specified, it will be stored in the current working directory
|
||||
"""
|
||||
if not prd_path.is_file():
|
||||
raise click.UsageError("prd_path: Not a path to a file, or it doesn't exist.", ctx)
|
||||
|
||||
log = logging.getLogger("export-device")
|
||||
log.info("Exporting Playready Device (.prd) file, %s", prd_path.stem)
|
||||
|
||||
if not out_dir:
|
||||
out_dir = Path.cwd()
|
||||
|
||||
out_path = out_dir / prd_path.stem
|
||||
if out_path.exists():
|
||||
if any(out_path.iterdir()):
|
||||
log.error("Output directory is not empty, cannot overwrite.")
|
||||
return
|
||||
else:
|
||||
log.warning("Output directory already exists, but is empty.")
|
||||
else:
|
||||
out_path.mkdir(parents=True)
|
||||
|
||||
device = Device.load(prd_path)
|
||||
|
||||
log.info(f"SL{device.security_level} {device.get_name()}")
|
||||
log.info(f"Saving to: {out_path}")
|
||||
|
||||
if device.group_key:
|
||||
group_key_path = out_path / "zgpriv.dat"
|
||||
group_key_path.write_bytes(device.group_key.dumps(private_only=True))
|
||||
log.info("Exported Group Key as zgpriv.dat")
|
||||
else:
|
||||
log.warning("Cannot export zgpriv.dat, as v2 devices do not save the group key")
|
||||
|
||||
# remove leaf cert to unprovision it
|
||||
device.group_certificate.remove(0)
|
||||
|
||||
client_id_path = out_path / "bgroupcert.dat"
|
||||
client_id_path.write_bytes(device.group_certificate.dumps())
|
||||
log.info("Exported Group Certificate to bgroupcert.dat")
|
||||
|
||||
|
||||
@main.command("serve", short_help="Serve your local CDM and Playready Devices Remotely.")
|
||||
@click.argument("config_path", type=Path)
|
||||
@click.option("-h", "--host", type=str, default="127.0.0.1", help="Host to serve from.")
|
||||
@click.option("-p", "--port", type=int, default=7723, help="Port to serve from.")
|
||||
def serve_(config_path: Path, host: str, port: int) -> None:
|
||||
"""
|
||||
Serve your local CDM and Playready Devices Remotely.
|
||||
|
||||
[CONFIG] is a path to a serve config file.
|
||||
See `serve.example.yml` for an example config file.
|
||||
|
||||
Host as 127.0.0.1 may block remote access even if port-forwarded.
|
||||
Instead, use 0.0.0.0 and ensure the TCP port you choose is forwarded.
|
||||
"""
|
||||
from pyplayready.remote import serve
|
||||
import yaml
|
||||
|
||||
config = yaml.safe_load(config_path.read_text(encoding="utf8"))
|
||||
serve.run(config, host, port)
|
||||
98
modules/pyplayready/pssh.py
Normal file
98
modules/pyplayready/pssh.py
Normal file
@@ -0,0 +1,98 @@
|
||||
import base64
|
||||
from typing import Union, List
|
||||
from uuid import UUID
|
||||
|
||||
from construct import Struct, Int32ul, Int16ul, Array, this, Bytes, Switch, Int32ub, Const, Container, ConstructError
|
||||
|
||||
from modules.pyplayready.exceptions import InvalidPssh
|
||||
from modules.pyplayready.wrmheader import WRMHeader
|
||||
|
||||
|
||||
class _PlayreadyPSSHStructs:
|
||||
PSSHBox = Struct(
|
||||
"length" / Int32ub,
|
||||
"pssh" / Const(b"pssh"),
|
||||
"fullbox" / Int32ub,
|
||||
"system_id" / Bytes(16),
|
||||
"data_length" / Int32ub,
|
||||
"data" / Bytes(this.data_length)
|
||||
)
|
||||
|
||||
PlayreadyObject = Struct(
|
||||
"type" / Int16ul,
|
||||
"length" / Int16ul,
|
||||
"data" / Switch(
|
||||
this.type,
|
||||
{
|
||||
1: Bytes(this.length)
|
||||
},
|
||||
default=Bytes(this.length)
|
||||
)
|
||||
)
|
||||
|
||||
PlayreadyHeader = Struct(
|
||||
"length" / Int32ul,
|
||||
"record_count" / Int16ul,
|
||||
"records" / Array(this.record_count, PlayreadyObject)
|
||||
)
|
||||
|
||||
|
||||
class PSSH(_PlayreadyPSSHStructs):
|
||||
"""Represents a PlayReady PSSH"""
|
||||
|
||||
SYSTEM_ID = UUID(hex="9a04f07998404286ab92e65be0885f95")
|
||||
|
||||
def __init__(self, data: Union[str, bytes]):
|
||||
"""Load a PSSH Box, PlayReady Header or PlayReady Object"""
|
||||
|
||||
if not data:
|
||||
raise InvalidPssh("Data must not be empty")
|
||||
|
||||
if isinstance(data, str):
|
||||
try:
|
||||
data = base64.b64decode(data)
|
||||
except Exception as e:
|
||||
raise InvalidPssh(f"Could not decode data as Base64, {e}")
|
||||
|
||||
self.wrm_headers: List[WRMHeader]
|
||||
try:
|
||||
# PSSH Box -> PlayReady Header
|
||||
box = self.PSSHBox.parse(data)
|
||||
prh = self.PlayreadyHeader.parse(box.data)
|
||||
self.wrm_headers = self._read_playready_objects(prh)
|
||||
except ConstructError:
|
||||
if int.from_bytes(data[:2], byteorder="little") > 3:
|
||||
try:
|
||||
# PlayReady Header
|
||||
prh = self.PlayreadyHeader.parse(data)
|
||||
self.wrm_headers = self._read_playready_objects(prh)
|
||||
except ConstructError:
|
||||
raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Header")
|
||||
else:
|
||||
try:
|
||||
# PlayReady Object
|
||||
pro = self.PlayreadyObject.parse(data)
|
||||
self.wrm_headers = [WRMHeader(pro.data)]
|
||||
except ConstructError:
|
||||
raise InvalidPssh("Could not parse data as a PSSH Box nor a PlayReady Object")
|
||||
|
||||
@staticmethod
|
||||
def _read_playready_objects(header: Container) -> List[WRMHeader]:
|
||||
return list(map(
|
||||
lambda pro: WRMHeader(pro.data),
|
||||
filter(
|
||||
lambda pro: pro.type == 1,
|
||||
header.records
|
||||
)
|
||||
))
|
||||
|
||||
def get_wrm_headers(self, downgrade_to_v4: bool = False) -> List[str]:
|
||||
"""
|
||||
Return a list of all WRM Headers in the PSSH as plaintext strings
|
||||
|
||||
downgrade_to_v4: Downgrade the WRM Header to version 4.0.0.0 to use AES-CBC instead of AES-CTR
|
||||
"""
|
||||
return list(map(
|
||||
lambda wrm_header: wrm_header.to_v4_0_0_0() if downgrade_to_v4 else wrm_header.dumps(),
|
||||
self.wrm_headers
|
||||
))
|
||||
201
modules/pyplayready/wrmheader.py
Normal file
201
modules/pyplayready/wrmheader.py
Normal file
@@ -0,0 +1,201 @@
|
||||
import base64
|
||||
import binascii
|
||||
from enum import Enum
|
||||
from typing import Optional, List, Union, Tuple
|
||||
|
||||
import xmltodict
|
||||
|
||||
|
||||
class WRMHeader:
|
||||
"""Represents a PlayReady WRM Header"""
|
||||
|
||||
class SignedKeyID:
|
||||
def __init__(
|
||||
self,
|
||||
alg_id: str,
|
||||
value: str,
|
||||
checksum: str
|
||||
):
|
||||
self.alg_id = alg_id
|
||||
self.value = value
|
||||
self.checksum = checksum
|
||||
|
||||
def __repr__(self):
|
||||
return f'SignedKeyID(alg_id={self.alg_id}, value="{self.value}", checksum="{self.checksum}")'
|
||||
|
||||
class Version(Enum):
|
||||
VERSION_4_0_0_0 = "4.0.0.0"
|
||||
VERSION_4_1_0_0 = "4.1.0.0"
|
||||
VERSION_4_2_0_0 = "4.2.0.0"
|
||||
VERSION_4_3_0_0 = "4.3.0.0"
|
||||
UNKNOWN = "UNKNOWN"
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value):
|
||||
return cls.UNKNOWN
|
||||
|
||||
_RETURN_STRUCTURE = Tuple[List[SignedKeyID], Union[str, None], Union[str, None], Union[str, None]]
|
||||
|
||||
def __init__(self, data: Union[str, bytes]):
|
||||
"""Load a WRM Header from either a string, base64 encoded data or bytes"""
|
||||
|
||||
if not data:
|
||||
raise ValueError("Data must not be empty")
|
||||
|
||||
if isinstance(data, str):
|
||||
try:
|
||||
data = base64.b64decode(data).decode()
|
||||
except (binascii.Error, binascii.Incomplete):
|
||||
data = data.encode()
|
||||
|
||||
self._raw_data: bytes = data
|
||||
self._parsed = xmltodict.parse(self._raw_data)
|
||||
|
||||
self._header = self._parsed.get('WRMHEADER')
|
||||
if not self._header:
|
||||
raise ValueError("Data is not a valid WRMHEADER")
|
||||
|
||||
self.version = self.Version(self._header.get('@version'))
|
||||
|
||||
@staticmethod
|
||||
def _ensure_list(element: Union[dict, list]) -> List:
|
||||
if isinstance(element, dict):
|
||||
return [element]
|
||||
return element
|
||||
|
||||
def to_v4_0_0_0(self) -> str:
|
||||
"""
|
||||
Build a v4.0.0.0 WRM header from any possible WRM Header version
|
||||
|
||||
Note: Will ignore any remaining Key IDs if there's more than just one
|
||||
"""
|
||||
return self._build_v4_0_0_0_wrm_header(*self.read_attributes())
|
||||
|
||||
@staticmethod
|
||||
def _read_v4_0_0_0(data: dict) -> _RETURN_STRUCTURE:
|
||||
protect_info = data.get("PROTECTINFO")
|
||||
|
||||
return (
|
||||
[WRMHeader.SignedKeyID(
|
||||
alg_id=protect_info["ALGID"],
|
||||
value=data["KID"],
|
||||
checksum=data.get("CHECKSUM")
|
||||
)],
|
||||
data.get("LA_URL"),
|
||||
data.get("LUI_URL"),
|
||||
data.get("DS_ID")
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _read_v4_1_0_0(data: dict) -> _RETURN_STRUCTURE:
|
||||
protect_info = data.get("PROTECTINFO")
|
||||
|
||||
key_ids = []
|
||||
if protect_info:
|
||||
kid = protect_info["KID"]
|
||||
if kid:
|
||||
key_ids = [WRMHeader.SignedKeyID(
|
||||
alg_id=kid["@ALGID"],
|
||||
value=kid["@VALUE"],
|
||||
checksum=kid.get("@CHECKSUM")
|
||||
)]
|
||||
|
||||
return (
|
||||
key_ids,
|
||||
data.get("LA_URL"),
|
||||
data.get("LUI_URL"),
|
||||
data.get("DS_ID")
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _read_v4_2_0_0(data: dict) -> _RETURN_STRUCTURE:
|
||||
protect_info = data.get("PROTECTINFO")
|
||||
|
||||
key_ids = []
|
||||
if protect_info:
|
||||
kids = protect_info["KIDS"]
|
||||
if kids:
|
||||
for kid in WRMHeader._ensure_list(kids["KID"]):
|
||||
key_ids.append(WRMHeader.SignedKeyID(
|
||||
alg_id=kid["@ALGID"],
|
||||
value=kid["@VALUE"],
|
||||
checksum=kid.get("@CHECKSUM")
|
||||
))
|
||||
|
||||
return (
|
||||
key_ids,
|
||||
data.get("LA_URL"),
|
||||
data.get("LUI_URL"),
|
||||
data.get("DS_ID")
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _read_v4_3_0_0(data: dict) -> _RETURN_STRUCTURE:
|
||||
protect_info = data.get("PROTECTINFO")
|
||||
|
||||
key_ids = []
|
||||
if protect_info:
|
||||
kids = protect_info["KIDS"]
|
||||
for kid in WRMHeader._ensure_list(kids["KID"]):
|
||||
key_ids.append(WRMHeader.SignedKeyID(
|
||||
alg_id=kid.get("@ALGID"),
|
||||
value=kid["@VALUE"],
|
||||
checksum=kid.get("@CHECKSUM")
|
||||
))
|
||||
|
||||
return (
|
||||
key_ids,
|
||||
data.get("LA_URL"),
|
||||
data.get("LUI_URL"),
|
||||
data.get("DS_ID")
|
||||
)
|
||||
|
||||
def read_attributes(self) -> _RETURN_STRUCTURE:
|
||||
"""
|
||||
Read any non-custom XML attributes
|
||||
|
||||
Returns a tuple structured like this: Tuple[List[SignedKeyID], <LA_URL>, <LUI_URL>, <DS_ID>]
|
||||
"""
|
||||
|
||||
data = self._header.get("DATA")
|
||||
if not data:
|
||||
raise ValueError("Not a valid PlayReady Header Record, WRMHEADER/DATA required")
|
||||
|
||||
if self.version == self.Version.VERSION_4_0_0_0:
|
||||
return self._read_v4_0_0_0(data)
|
||||
elif self.version == self.Version.VERSION_4_1_0_0:
|
||||
return self._read_v4_1_0_0(data)
|
||||
elif self.version == self.Version.VERSION_4_2_0_0:
|
||||
return self._read_v4_2_0_0(data)
|
||||
elif self.version == self.Version.VERSION_4_3_0_0:
|
||||
return self._read_v4_3_0_0(data)
|
||||
|
||||
@staticmethod
|
||||
def _build_v4_0_0_0_wrm_header(
|
||||
key_ids: List[SignedKeyID],
|
||||
la_url: Optional[str],
|
||||
lui_url: Optional[str],
|
||||
ds_id: Optional[str]
|
||||
) -> str:
|
||||
if len(key_ids) == 0:
|
||||
raise Exception("No Key IDs available")
|
||||
|
||||
key_id = key_ids[0]
|
||||
return (
|
||||
'<WRMHEADER xmlns="http://schemas.microsoft.com/DRM/2007/03/PlayReadyHeader" version="4.0.0.0">'
|
||||
'<DATA>'
|
||||
'<PROTECTINFO>'
|
||||
'<KEYLEN>16</KEYLEN>'
|
||||
'<ALGID>AESCTR</ALGID>'
|
||||
'</PROTECTINFO>'
|
||||
f'<KID>{key_id.value}</KID>' +
|
||||
(f'<LA_URL>{la_url}</LA_URL>' if la_url else '') +
|
||||
(f'<LUI_URL>{lui_url}</LUI_URL>' if lui_url else '') +
|
||||
(f'<DS_ID>{ds_id}</DS_ID>' if ds_id else '') +
|
||||
(f'<CHECKSUM>{key_id.checksum}</CHECKSUM>' if key_id.checksum else '') +
|
||||
'</DATA>'
|
||||
'</WRMHEADER>'
|
||||
)
|
||||
|
||||
def dumps(self) -> str:
|
||||
return self._raw_data.decode("utf-16-le")
|
||||
18
modules/pyplayready/xml_key.py
Normal file
18
modules/pyplayready/xml_key.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from ecpy.curves import Point, Curve
|
||||
|
||||
from modules.pyplayready.crypto.ecc_key import ECCKey
|
||||
from modules.pyplayready.crypto.elgamal import ElGamal
|
||||
|
||||
|
||||
class XmlKey:
|
||||
def __init__(self):
|
||||
self._shared_point = ECCKey.generate()
|
||||
self.shared_key_x = self._shared_point.key.pointQ.x
|
||||
self.shared_key_y = self._shared_point.key.pointQ.y
|
||||
|
||||
self._shared_key_x_bytes = ElGamal.to_bytes(int(self.shared_key_x))
|
||||
self.aes_iv = self._shared_key_x_bytes[:16]
|
||||
self.aes_key = self._shared_key_x_bytes[16:]
|
||||
|
||||
def get_point(self, curve: Curve) -> Point:
|
||||
return Point(self.shared_key_x, self.shared_key_y, curve)
|
||||
252
modules/pyplayready/xmrlicense.py
Normal file
252
modules/pyplayready/xmrlicense.py
Normal file
@@ -0,0 +1,252 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
from construct import Const, GreedyRange, Struct, Int32ub, Bytes, Int16ub, this, Switch, LazyBound, Array, Container
|
||||
|
||||
|
||||
class _XMRLicenseStructs:
|
||||
PlayEnablerType = Struct(
|
||||
"player_enabler_type" / Bytes(16)
|
||||
)
|
||||
|
||||
DomainRestrictionObject = Struct(
|
||||
"account_id" / Bytes(16),
|
||||
"revision" / Int32ub
|
||||
)
|
||||
|
||||
IssueDateObject = Struct(
|
||||
"issue_date" / Int32ub
|
||||
)
|
||||
|
||||
RevInfoVersionObject = Struct(
|
||||
"sequence" / Int32ub
|
||||
)
|
||||
|
||||
SecurityLevelObject = Struct(
|
||||
"minimum_security_level" / Int16ub
|
||||
)
|
||||
|
||||
EmbeddedLicenseSettingsObject = Struct(
|
||||
"indicator" / Int16ub
|
||||
)
|
||||
|
||||
ECCKeyObject = Struct(
|
||||
"curve_type" / Int16ub,
|
||||
"key_length" / Int16ub,
|
||||
"key" / Bytes(this.key_length)
|
||||
)
|
||||
|
||||
SignatureObject = Struct(
|
||||
"signature_type" / Int16ub,
|
||||
"signature_data_length" / Int16ub,
|
||||
"signature_data" / Bytes(this.signature_data_length)
|
||||
)
|
||||
|
||||
ContentKeyObject = Struct(
|
||||
"key_id" / Bytes(16),
|
||||
"key_type" / Int16ub,
|
||||
"cipher_type" / Int16ub,
|
||||
"key_length" / Int16ub,
|
||||
"encrypted_key" / Bytes(this.key_length)
|
||||
)
|
||||
|
||||
RightsSettingsObject = Struct(
|
||||
"rights" / Int16ub
|
||||
)
|
||||
|
||||
OutputProtectionLevelRestrictionObject = Struct(
|
||||
"minimum_compressed_digital_video_opl" / Int16ub,
|
||||
"minimum_uncompressed_digital_video_opl" / Int16ub,
|
||||
"minimum_analog_video_opl" / Int16ub,
|
||||
"minimum_digital_compressed_audio_opl" / Int16ub,
|
||||
"minimum_digital_uncompressed_audio_opl" / Int16ub,
|
||||
)
|
||||
|
||||
ExpirationRestrictionObject = Struct(
|
||||
"begin_date" / Int32ub,
|
||||
"end_date" / Int32ub
|
||||
)
|
||||
|
||||
RemovalDateObject = Struct(
|
||||
"removal_date" / Int32ub
|
||||
)
|
||||
|
||||
UplinkKIDObject = Struct(
|
||||
"uplink_kid" / Bytes(16),
|
||||
"chained_checksum_type" / Int16ub,
|
||||
"chained_checksum_length" / Int16ub,
|
||||
"chained_checksum" / Bytes(this.chained_checksum_length)
|
||||
)
|
||||
|
||||
AnalogVideoOutputConfigurationRestriction = Struct(
|
||||
"video_output_protection_id" / Bytes(16),
|
||||
"binary_configuration_data" / Bytes(this._.length - 24)
|
||||
)
|
||||
|
||||
DigitalVideoOutputRestrictionObject = Struct(
|
||||
"video_output_protection_id" / Bytes(16),
|
||||
"binary_configuration_data" / Bytes(this._.length - 24)
|
||||
)
|
||||
|
||||
DigitalAudioOutputRestrictionObject = Struct(
|
||||
"audio_output_protection_id" / Bytes(16),
|
||||
"binary_configuration_data" / Bytes(this._.length - 24)
|
||||
)
|
||||
|
||||
PolicyMetadataObject = Struct(
|
||||
"metadata_type" / Bytes(16),
|
||||
"policy_data" / Bytes(this._.length)
|
||||
)
|
||||
|
||||
SecureStopRestrictionObject = Struct(
|
||||
"metering_id" / Bytes(16)
|
||||
)
|
||||
|
||||
MeteringRestrictionObject = Struct(
|
||||
"metering_id" / Bytes(16)
|
||||
)
|
||||
|
||||
ExpirationAfterFirstPlayRestrictionObject = Struct(
|
||||
"seconds" / Int32ub
|
||||
)
|
||||
|
||||
GracePeriodObject = Struct(
|
||||
"grace_period" / Int32ub
|
||||
)
|
||||
|
||||
SourceIdObject = Struct(
|
||||
"source_id" / Int32ub
|
||||
)
|
||||
|
||||
AuxiliaryKey = Struct(
|
||||
"location" / Int32ub,
|
||||
"key" / Bytes(16)
|
||||
)
|
||||
|
||||
AuxiliaryKeysObject = Struct(
|
||||
"count" / Int16ub,
|
||||
"auxiliary_keys" / Array(this.count, AuxiliaryKey)
|
||||
)
|
||||
|
||||
UplinkKeyObject3 = Struct(
|
||||
"uplink_key_id" / Bytes(16),
|
||||
"chained_length" / Int16ub,
|
||||
"checksum" / Bytes(this.chained_length),
|
||||
"count" / Int16ub,
|
||||
"entries" / Array(this.count, Int32ub)
|
||||
)
|
||||
|
||||
CopyEnablerObject = Struct(
|
||||
"copy_enabler_type" / Bytes(16)
|
||||
)
|
||||
|
||||
CopyCountRestrictionObject = Struct(
|
||||
"count" / Int32ub
|
||||
)
|
||||
|
||||
MoveObject = Struct(
|
||||
"minimum_move_protection_level" / Int32ub
|
||||
)
|
||||
|
||||
XmrObject = Struct(
|
||||
"flags" / Int16ub,
|
||||
"type" / Int16ub,
|
||||
"length" / Int32ub,
|
||||
"data" / Switch(
|
||||
lambda ctx: ctx.type,
|
||||
{
|
||||
0x0005: OutputProtectionLevelRestrictionObject,
|
||||
0x0008: AnalogVideoOutputConfigurationRestriction,
|
||||
0x000a: ContentKeyObject,
|
||||
0x000b: SignatureObject,
|
||||
0x000d: RightsSettingsObject,
|
||||
0x0012: ExpirationRestrictionObject,
|
||||
0x0013: IssueDateObject,
|
||||
0x0016: MeteringRestrictionObject,
|
||||
0x001a: GracePeriodObject,
|
||||
0x0022: SourceIdObject,
|
||||
0x002a: ECCKeyObject,
|
||||
0x002c: PolicyMetadataObject,
|
||||
0x0029: DomainRestrictionObject,
|
||||
0x0030: ExpirationAfterFirstPlayRestrictionObject,
|
||||
0x0031: DigitalAudioOutputRestrictionObject,
|
||||
0x0032: RevInfoVersionObject,
|
||||
0x0033: EmbeddedLicenseSettingsObject,
|
||||
0x0034: SecurityLevelObject,
|
||||
0x0037: MoveObject,
|
||||
0x0039: PlayEnablerType,
|
||||
0x003a: CopyEnablerObject,
|
||||
0x003b: UplinkKIDObject,
|
||||
0x003d: CopyCountRestrictionObject,
|
||||
0x0050: RemovalDateObject,
|
||||
0x0051: AuxiliaryKeysObject,
|
||||
0x0052: UplinkKeyObject3,
|
||||
0x005a: SecureStopRestrictionObject,
|
||||
0x0059: DigitalVideoOutputRestrictionObject
|
||||
},
|
||||
default=LazyBound(lambda ctx: _XMRLicenseStructs.XmrObject)
|
||||
)
|
||||
)
|
||||
|
||||
XmrLicense = Struct(
|
||||
"signature" / Const(b"XMR\x00"),
|
||||
"xmr_version" / Int32ub,
|
||||
"rights_id" / Bytes(16),
|
||||
"containers" / GreedyRange(XmrObject)
|
||||
)
|
||||
|
||||
|
||||
class XMRLicense(_XMRLicenseStructs):
|
||||
"""Represents an XMRLicense"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
parsed_license: Container,
|
||||
license_obj: _XMRLicenseStructs.XmrLicense = _XMRLicenseStructs.XmrLicense
|
||||
):
|
||||
self.parsed = parsed_license
|
||||
self._license_obj = license_obj
|
||||
|
||||
@classmethod
|
||||
def loads(cls, data: Union[str, bytes]) -> XMRLicense:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
licence = _XMRLicenseStructs.XmrLicense
|
||||
return cls(
|
||||
parsed_license=licence.parse(data),
|
||||
license_obj=licence
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Union[Path, str]) -> XMRLicense:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
with Path(path).open(mode="rb") as f:
|
||||
return cls.loads(f.read())
|
||||
|
||||
def dumps(self) -> bytes:
|
||||
return self._license_obj.build(self.parsed)
|
||||
|
||||
def struct(self) -> _XMRLicenseStructs.XmrLicense:
|
||||
return self._license_obj
|
||||
|
||||
def _locate(self, container: Container):
|
||||
if container.flags == 2 or container.flags == 3:
|
||||
return self._locate(container.data)
|
||||
else:
|
||||
return container
|
||||
|
||||
def get_object(self, type_: int):
|
||||
for obj in self.parsed.containers:
|
||||
container = self._locate(obj)
|
||||
if container.type == type_:
|
||||
yield container.data
|
||||
|
||||
def get_content_keys(self):
|
||||
yield from self.get_object(10)
|
||||
36
requirements.txt
Normal file
36
requirements.txt
Normal file
@@ -0,0 +1,36 @@
|
||||
aiohappyeyeballs==2.4.4
|
||||
aiohttp==3.11.11
|
||||
aiosignal==1.3.2
|
||||
async-timeout==5.0.1
|
||||
attrs==25.1.0
|
||||
blinker==1.9.0
|
||||
certifi==2025.1.31
|
||||
charset-normalizer==3.4.1
|
||||
click==8.1.8
|
||||
colorama==0.4.6
|
||||
coloredlogs==15.0.1
|
||||
construct==2.8.8
|
||||
ECPy==1.2.5
|
||||
Flask==3.1.0
|
||||
Flask-Cors==5.0.0
|
||||
frozenlist==1.5.0
|
||||
humanfriendly==10.0
|
||||
idna==3.10
|
||||
itsdangerous==2.2.0
|
||||
Jinja2==3.1.5
|
||||
loguru==0.7.3
|
||||
MarkupSafe==3.0.2
|
||||
multidict==6.1.0
|
||||
propcache==0.2.1
|
||||
pycryptodome==3.21.0
|
||||
pyfiglet==1.0.2
|
||||
pyreadline3==3.5.4
|
||||
python-dotenv==1.0.1
|
||||
PyYAML==6.0.2
|
||||
requests==2.32.3
|
||||
typing_extensions==4.12.2
|
||||
urllib3==2.3.0
|
||||
Werkzeug==3.1.3
|
||||
win32_setctime==1.2.0
|
||||
xmltodict==0.14.2
|
||||
yarl==1.18.3
|
||||
0
routes/__init__.py
Normal file
0
routes/__init__.py
Normal file
47
routes/playready.py
Normal file
47
routes/playready.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from flask import Blueprint, request, jsonify
|
||||
from flask_cors import cross_origin, CORS
|
||||
from modules.config import apikey_required
|
||||
from modules.playready import PLAYREADY
|
||||
from modules.logging import setup_logging
|
||||
|
||||
# Setup logging
|
||||
logging = setup_logging()
|
||||
|
||||
playready_bp = Blueprint('playready_bp', __name__)
|
||||
CORS(playready_bp, resources={r"/*": {"origins": "*"}}, supports_credentials=True, allow_headers=["Content-Type", "X-API-KEY"])
|
||||
|
||||
@playready_bp.route('/extension', methods=['POST'])
|
||||
@apikey_required
|
||||
@cross_origin()
|
||||
def extension():
|
||||
if not request.is_json:
|
||||
return jsonify({"error": "Missing JSON in request"}), 400
|
||||
|
||||
data = request.get_json()
|
||||
action = data.get('action', None)
|
||||
|
||||
if not action:
|
||||
return jsonify({"error": "Missing action in request"}), 400
|
||||
|
||||
if action == "Challenge?":
|
||||
pssh = data.get('pssh', None)
|
||||
|
||||
if not pssh:
|
||||
return jsonify({"error": "Missing pssh in request"}), 400
|
||||
|
||||
playready = PLAYREADY()
|
||||
playready.pssh = pssh
|
||||
return playready.get_license_challenge()
|
||||
|
||||
elif action == "Keys?":
|
||||
license = data.get('license', None)
|
||||
|
||||
if not license:
|
||||
return jsonify({"error": "Missing license in request"}), 400
|
||||
|
||||
playready = PLAYREADY()
|
||||
playready.license = license
|
||||
return playready.get_license_keys()
|
||||
|
||||
else:
|
||||
return jsonify({"error": "Unknown action"}), 400
|
||||
Reference in New Issue
Block a user