Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f34018afe9 | ||
|
|
8298703601 | ||
|
|
e20f251aae | ||
|
|
a55aeb8cce | ||
|
|
23165f92de | ||
|
|
68db728bf0 | ||
|
|
53f7c1dd62 | ||
|
|
e9e65e5760 | ||
|
|
909e83c199 | ||
|
|
2bb5c9e0b5 | ||
|
|
7f60844ee1 | ||
|
|
59615dd804 |
37
CHANGELOG.md
Normal file
37
CHANGELOG.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# Changelog
|
||||
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.0.1] - 2021-07-21
|
||||
|
||||
### Added
|
||||
|
||||
- More information to the PyPI meta information, e.g., classifiers, readme, some URLs.
|
||||
|
||||
### Changed
|
||||
|
||||
- Moved the License Type parameter from the Cdm constructor to `get_license_challenge()`.
|
||||
- The Session ID is no longer used as the Request ID which could help with blocks or replay checks due
|
||||
to it being the same Session ID for each request. It's now a random 16 byte value each time.
|
||||
- Only the Context Data of each license request is now stored instead of the full message.
|
||||
|
||||
### Removed
|
||||
|
||||
- Removed unnecessary and unused `raw` Cdm class instance variable.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Cdm's `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
|
||||
- Context Data will now always match to their corresponding License Responses. This fixes an issue where creating
|
||||
a second challenge would overwrite the context data of the first challenge. Parsing the first challenge after
|
||||
would result in either a key decrypt error, or garbage key data.
|
||||
|
||||
## [1.0.0] - 2021-07-20
|
||||
|
||||
Initial Release.
|
||||
|
||||
[1.0.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.1
|
||||
[1.0.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.0
|
||||
@@ -17,9 +17,6 @@ Widevine CDM (Content Decryption Module) implementation in Python.
|
||||
|
||||
*Credit*: w3.org
|
||||
|
||||
Almost the entire Widevine Protocol seen above is implemented except for Sessions which I will be implementing soon.
|
||||
Currently, only one session can be made and used at a time or problems will happen.
|
||||
|
||||
### Web Server
|
||||
|
||||
This may be an API/Server in front of a License Server. For example, Netflix's Custom MSL-based API front.
|
||||
|
||||
@@ -4,10 +4,26 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry]
|
||||
name = "pywidevine"
|
||||
version = "1.0.0"
|
||||
version = "1.0.1"
|
||||
description = "Widevine CDM (Content Decryption Module) implementation in Python."
|
||||
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
|
||||
license = "GPL-3.0-only"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/rlaphoenix/pywidevine"
|
||||
keywords = ["widevine", "drm", "google"]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
"Intended Audience :: End Users/Desktop",
|
||||
"Natural Language :: English",
|
||||
"Operating System :: OS Independent",
|
||||
"Topic :: Multimedia :: Video",
|
||||
"Topic :: Security :: Cryptography"
|
||||
]
|
||||
|
||||
[tool.poetry.urls]
|
||||
"Bug Tracker" = "https://github.com/rlaphoenix/pywidevine/issues"
|
||||
"Forums" = "https://github.com/rlaphoenix/pywidevine/discussions"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = ">=3.7,<3.11"
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.0.0"
|
||||
__version__ = "1.0.1"
|
||||
|
||||
@@ -45,13 +45,7 @@ class Cdm:
|
||||
NUM_OF_SESSIONS = 0
|
||||
MAX_NUM_OF_SESSIONS = 50 # most common limit
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device: Device,
|
||||
pssh: Union[Container, bytes, str],
|
||||
license_type: LicenseType = LicenseType.STREAMING,
|
||||
raw: bool = False
|
||||
):
|
||||
def __init__(self, device: Device, pssh: Union[Container, bytes, str], raw: bool = False):
|
||||
"""
|
||||
Open a Widevine Content Decryption Module (CDM) session.
|
||||
|
||||
@@ -60,8 +54,6 @@ class Cdm:
|
||||
more device-specific information.
|
||||
pssh: Protection System Specific Header Box or Init Data. This should be a
|
||||
compliant mp4 pssh box, or just the init data (Widevine Cenc Header).
|
||||
license_type: Type of License you wish to exchange, often `STREAMING`.
|
||||
The `OFFLINE` Licenses are for Offline licensing of Downloaded content.
|
||||
raw: This should be set to True if the PSSH data provided is arbitrary data.
|
||||
E.g., a PSSH Box where the init data is not a Widevine Cenc Header, or
|
||||
is simply arbitrary data.
|
||||
@@ -70,10 +62,6 @@ class Cdm:
|
||||
The limit is different for each device and security level, most commonly 50.
|
||||
This limit is handled by the OEM Crypto API. Multiple sessions can be open at
|
||||
a time and sessions should be closed when no longer needed.
|
||||
|
||||
If an API or System requests a Widevine Session ID, it is best to provide it
|
||||
the real Session ID created here (self.session_id) instead of an arbitrary or
|
||||
random value.
|
||||
"""
|
||||
if not device:
|
||||
raise ValueError("A Widevine Device must be provided.")
|
||||
@@ -90,16 +78,14 @@ class Cdm:
|
||||
|
||||
self.device = device
|
||||
self.init_data = pssh
|
||||
self.license_type = license_type
|
||||
self.raw = raw
|
||||
|
||||
if not self.raw:
|
||||
if not raw:
|
||||
# we only want the init_data of the pssh box
|
||||
self.init_data = PSSH.get_as_box(pssh).init_data
|
||||
|
||||
self.session_id = self.create_session_id(self.device)
|
||||
self.session_id = get_random_bytes(16)
|
||||
self.service_certificate: Optional[SignedMessage] = None
|
||||
self.license_request: Optional[SignedMessage] = None
|
||||
self.context: dict[bytes, tuple[bytes, bytes]] = {}
|
||||
|
||||
def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage:
|
||||
"""
|
||||
@@ -122,23 +108,27 @@ class Cdm:
|
||||
signed_message = SignedMessage()
|
||||
try:
|
||||
signed_message.ParseFromString(certificate)
|
||||
except DecodeError:
|
||||
raise ValueError("Could not parse certificate as a Signed Message.")
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Could not parse certificate as a Signed Message: {e}")
|
||||
|
||||
self.service_certificate = signed_message
|
||||
return signed_message
|
||||
|
||||
def get_license_challenge(self, privacy_mode: bool = True) -> bytes:
|
||||
def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes:
|
||||
"""
|
||||
Get a License Challenge to send to a License Server.
|
||||
|
||||
Parameters:
|
||||
type_: Type of License you wish to exchange, often `STREAMING`.
|
||||
The `OFFLINE` Licenses are for Offline licensing of Downloaded content.
|
||||
privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the
|
||||
privacy certificate is not set yet, this does nothing.
|
||||
|
||||
Returns a SignedMessage containing a LicenseRequest message. It's signed with
|
||||
the Private Key of the device provision.
|
||||
"""
|
||||
request_id = get_random_bytes(16)
|
||||
|
||||
license_request = LicenseRequest()
|
||||
license_request.type = LicenseRequest.RequestType.Value("NEW")
|
||||
license_request.request_time = int(time.time())
|
||||
@@ -146,8 +136,8 @@ class Cdm:
|
||||
license_request.key_control_nonce = random.randrange(1, 2 ** 31)
|
||||
|
||||
license_request.content_id.widevine_pssh_data.pssh_data.append(self.init_data)
|
||||
license_request.content_id.widevine_pssh_data.license_type = self.license_type
|
||||
license_request.content_id.widevine_pssh_data.request_id = self.session_id
|
||||
license_request.content_id.widevine_pssh_data.license_type = type_
|
||||
license_request.content_id.widevine_pssh_data.request_id = request_id
|
||||
|
||||
if self.service_certificate and privacy_mode:
|
||||
# encrypt the client id for privacy mode
|
||||
@@ -166,17 +156,11 @@ class Cdm:
|
||||
new(self.device.private_key). \
|
||||
sign(SHA1.new(license_message.msg))
|
||||
|
||||
# store it for later, we need it for deriving keys when parsing a license
|
||||
self.license_request = license_message
|
||||
self.context[request_id] = self.derive_context(license_message.msg)
|
||||
|
||||
return license_message.SerializeToString()
|
||||
|
||||
def parse_license(self, license_message: Union[bytes, str]) -> list[Key]:
|
||||
# TODO: What if the CDM generates a 2nd challenge, overwriting the previous making it mismatch
|
||||
# for deriving keys? We need to make self.license_request always match the license_message
|
||||
if not self.license_request:
|
||||
raise ValueError("Cannot parse a license message without first making a license request")
|
||||
|
||||
if not license_message:
|
||||
raise ValueError("Cannot parse an empty license_message as a SignedMessage")
|
||||
|
||||
@@ -195,11 +179,15 @@ class Cdm:
|
||||
licence = License()
|
||||
licence.ParseFromString(license_message.msg)
|
||||
|
||||
context = self.context[licence.id.request_id]
|
||||
if not context:
|
||||
raise ValueError("Cannot parse a license message without first making a license request")
|
||||
|
||||
session_key = PKCS1_OAEP. \
|
||||
new(self.device.private_key). \
|
||||
decrypt(license_message.session_key)
|
||||
|
||||
enc_key, mac_key_server, mac_key_client = self.derive_keys(self.license_request.msg, session_key)
|
||||
enc_key, mac_key_server, mac_key_client = self.derive_keys(*context, session_key)
|
||||
|
||||
license_signature = HMAC. \
|
||||
new(mac_key_server, digestmod=SHA256). \
|
||||
@@ -262,21 +250,6 @@ class Cdm:
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise subprocess.SubprocessError(f"Failed to Decrypt! Shaka Packager Error: {e}")
|
||||
|
||||
def create_session_id(self, device: Device) -> bytes:
|
||||
"""Create a Session ID based on OEM Crypto API session values."""
|
||||
if device.type == device.Types.ANDROID:
|
||||
session_id = "{hex:16X}{counter}".format(
|
||||
hex=random.getrandbits(64),
|
||||
counter=f"{self.NUM_OF_SESSIONS:02}"
|
||||
)
|
||||
session_id.ljust(32, "0")
|
||||
return session_id.encode("ascii")
|
||||
|
||||
if device.type == device.Types.CHROME:
|
||||
return get_random_bytes(16)
|
||||
|
||||
raise ValueError(f"Device Type {device.type.name} is not implemented")
|
||||
|
||||
@staticmethod
|
||||
def encrypt_client_id(
|
||||
client_id: ClientIdentification,
|
||||
@@ -317,7 +290,23 @@ class Cdm:
|
||||
return enc_client_id
|
||||
|
||||
@staticmethod
|
||||
def derive_keys(msg: bytes, key: bytes) -> tuple[bytes, bytes, bytes]:
|
||||
def derive_context(message: bytes) -> tuple[bytes, bytes]:
|
||||
"""Returns 2 Context Data used for computing the AES Encryption and HMAC Keys."""
|
||||
|
||||
def _get_enc_context(msg: bytes) -> bytes:
|
||||
label = b"ENCRYPTION"
|
||||
key_size = 16 * 8 # 128-bit
|
||||
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
|
||||
|
||||
def _get_mac_context(msg: bytes) -> bytes:
|
||||
label = b"AUTHENTICATION"
|
||||
key_size = 32 * 8 * 2 # 512-bit
|
||||
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
|
||||
|
||||
return _get_enc_context(message), _get_mac_context(message)
|
||||
|
||||
@staticmethod
|
||||
def derive_keys(enc_context: bytes, mac_context: bytes, key: bytes) -> tuple[bytes, bytes, bytes]:
|
||||
"""
|
||||
Returns 3 keys derived from the input message.
|
||||
Key can either be a pre-provision device aes key, provision key, or a session key.
|
||||
@@ -338,24 +327,11 @@ class Cdm:
|
||||
keys and verify licenses.
|
||||
"""
|
||||
|
||||
def get_enc_context(message: bytes) -> bytes:
|
||||
label = b"ENCRYPTION"
|
||||
key_size = 16 * 8 # 128-bit
|
||||
return label + b"\x00" + message + key_size.to_bytes(4, "big")
|
||||
|
||||
def get_mac_context(message: bytes) -> bytes:
|
||||
label = b"AUTHENTICATION"
|
||||
key_size = 32 * 8 * 2 # 512-bit
|
||||
return label + b"\x00" + message + key_size.to_bytes(4, "big")
|
||||
|
||||
def _derive(session_key: bytes, context: bytes, counter: int) -> bytes:
|
||||
return CMAC.new(session_key, ciphermod=AES). \
|
||||
update(counter.to_bytes(1, "big") + context). \
|
||||
digest()
|
||||
|
||||
enc_context = get_enc_context(msg)
|
||||
mac_context = get_mac_context(msg)
|
||||
|
||||
enc_key = _derive(key, enc_context, 1)
|
||||
mac_key_server = _derive(key, mac_context, 1)
|
||||
mac_key_server += _derive(key, mac_context, 2)
|
||||
|
||||
@@ -60,15 +60,13 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
|
||||
"""
|
||||
log = logging.getLogger("license")
|
||||
|
||||
type_ = LicenseType.Value(type_)
|
||||
|
||||
# load device
|
||||
device = Device.load(device)
|
||||
log.info(f"[+] Loaded Device ({device.system_id} L{device.security_level})")
|
||||
log.debug(device)
|
||||
|
||||
# load cdm
|
||||
cdm = Cdm(device, pssh, type_, raw)
|
||||
cdm = Cdm(device, pssh, raw)
|
||||
log.info(f"[+] Loaded CDM with PSSH: {pssh}")
|
||||
log.debug(cdm)
|
||||
|
||||
@@ -87,7 +85,8 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
|
||||
log.debug(service_cert)
|
||||
|
||||
# get license challenge
|
||||
challenge = cdm.get_license_challenge(privacy_mode=True)
|
||||
license_type = LicenseType.Value(type_)
|
||||
challenge = cdm.get_license_challenge(license_type, privacy_mode=True)
|
||||
log.info("[+] Created License Request Message (Challenge)")
|
||||
log.debug(challenge)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user