Compare commits
31 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1951d20d0 | ||
|
|
35abd2962f | ||
|
|
b262e115d3 | ||
|
|
95982725c3 | ||
|
|
70e79825b3 | ||
|
|
f2174dfa72 | ||
|
|
fe21bfe88c | ||
|
|
93f70f73c2 | ||
|
|
1442c945cc | ||
|
|
a729648a34 | ||
|
|
3d6ddb8dcd | ||
|
|
b41f09bee4 | ||
|
|
db80776ac0 | ||
|
|
02ca1b00c9 | ||
|
|
7b06a3c053 | ||
|
|
14126c67b1 | ||
|
|
5e93d6321d | ||
|
|
1f389dbab9 | ||
|
|
ac4c8affb0 | ||
|
|
f34018afe9 | ||
|
|
8298703601 | ||
|
|
e20f251aae | ||
|
|
a55aeb8cce | ||
|
|
23165f92de | ||
|
|
68db728bf0 | ||
|
|
53f7c1dd62 | ||
|
|
e9e65e5760 | ||
|
|
909e83c199 | ||
|
|
2bb5c9e0b5 | ||
|
|
7f60844ee1 | ||
|
|
59615dd804 |
66
CHANGELOG.md
Normal file
66
CHANGELOG.md
Normal file
@@ -0,0 +1,66 @@
|
||||
# Changelog
|
||||
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.1.0] - 2022-07-21
|
||||
|
||||
### Added
|
||||
|
||||
- Added support for setting a Service Certificate in SignedDrmCertificate form as well as raw DrmCertificate form.
|
||||
However, It's unlikely for the service to provide the certificate in raw DrmCertificate form without a signature.
|
||||
- Added a CLI command `create-device` to create Widevine Device (`.wvd`) files from RSA PEM/DER Private Keys and
|
||||
Client ID blobs. You can also provide VMP (FileHashes) data which will be merged into the Client ID blob.
|
||||
- Added a CLI command `migrate` that uses `Device.migrate()` and `dump()` to migrate older v1 Widevine Device files
|
||||
to v2.
|
||||
- Added the v1 Structure of Widevine Devices for migration use.
|
||||
- Added `Device.migrate()` class method that effectively loads older format WVD data. You can then use `dumps()` to
|
||||
get back the WVD data in the latest supported format.
|
||||
- Added ability to use Privacy mode on the test command.
|
||||
|
||||
### Changed
|
||||
|
||||
- Set Service Certificates are now stored as the raw underlying DrmCertificate as the signature data is unused by
|
||||
the CDM.
|
||||
- Moved all Widevine Device structures under a Structures class.
|
||||
- I removed the `send_key_control_nonce` flag from all Structures even though it was technically used.
|
||||
This is because the flag was never used as of this project, and I do not want to take up the flag slot.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Devices `dump()` function now uses the correct `type_` parameter when building the struct.
|
||||
- Fixed release date year of v1.0.0 and v1.0.1 in the changelog.
|
||||
|
||||
## [1.0.1] - 2022-07-21
|
||||
|
||||
### Added
|
||||
|
||||
- More information to the PyPI meta information, e.g., classifiers, readme, some URLs.
|
||||
|
||||
### Changed
|
||||
|
||||
- Moved the License Type parameter from the Cdm constructor to `get_license_challenge()`.
|
||||
- The Session ID is no longer used as the Request ID which could help with blocks or replay checks due
|
||||
to it being the same Session ID for each request. It's now a random 16 byte value each time.
|
||||
- Only the Context Data of each license request is now stored instead of the full message.
|
||||
|
||||
### Removed
|
||||
|
||||
- Removed unnecessary and unused `raw` Cdm class instance variable.
|
||||
|
||||
### Fixed
|
||||
|
||||
- CDMs `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
|
||||
- Context Data will now always match to their corresponding License Responses. This fixes an issue where creating
|
||||
a second challenge would overwrite the context data of the first challenge. Parsing the first challenge after
|
||||
would result in either a key decrypt error, or garbage key data.
|
||||
|
||||
## [1.0.0] - 2022-07-20
|
||||
|
||||
Initial Release.
|
||||
|
||||
[1.1.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.0
|
||||
[1.0.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.1
|
||||
[1.0.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.0
|
||||
@@ -17,9 +17,6 @@ Widevine CDM (Content Decryption Module) implementation in Python.
|
||||
|
||||
*Credit*: w3.org
|
||||
|
||||
Almost the entire Widevine Protocol seen above is implemented except for Sessions which I will be implementing soon.
|
||||
Currently, only one session can be made and used at a time or problems will happen.
|
||||
|
||||
### Web Server
|
||||
|
||||
This may be an API/Server in front of a License Server. For example, Netflix's Custom MSL-based API front.
|
||||
|
||||
14
poetry.lock
generated
14
poetry.lock
generated
@@ -137,6 +137,14 @@ category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[[package]]
|
||||
name = "unidecode"
|
||||
version = "1.3.4"
|
||||
description = "ASCII transliterations of Unicode text"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.5"
|
||||
|
||||
[[package]]
|
||||
name = "urllib3"
|
||||
version = "1.26.10"
|
||||
@@ -165,7 +173,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
|
||||
[metadata]
|
||||
lock-version = "1.1"
|
||||
python-versions = ">=3.7,<3.11"
|
||||
content-hash = "9c6a76629e0f0a4e98b6c47707899519f930debd70312d2e909eb42f94cd212f"
|
||||
content-hash = "0720732d3b990a11e3b35f4604103364ae5654d1ac6e31ead17c03e566a016d0"
|
||||
|
||||
[metadata.files]
|
||||
certifi = []
|
||||
@@ -195,5 +203,9 @@ pycryptodome = []
|
||||
pymp4 = []
|
||||
requests = []
|
||||
typing-extensions = []
|
||||
unidecode = [
|
||||
{file = "Unidecode-1.3.4-py3-none-any.whl", hash = "sha256:afa04efcdd818a93237574791be9b2817d7077c25a068b00f8cff7baa4e59257"},
|
||||
{file = "Unidecode-1.3.4.tar.gz", hash = "sha256:8e4352fb93d5a735c788110d2e7ac8e8031eb06ccbfe8d324ab71735015f9342"},
|
||||
]
|
||||
urllib3 = []
|
||||
zipp = []
|
||||
|
||||
@@ -4,10 +4,26 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry]
|
||||
name = "pywidevine"
|
||||
version = "1.0.0"
|
||||
version = "1.1.0"
|
||||
description = "Widevine CDM (Content Decryption Module) implementation in Python."
|
||||
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
|
||||
license = "GPL-3.0-only"
|
||||
readme = "README.md"
|
||||
repository = "https://github.com/rlaphoenix/pywidevine"
|
||||
keywords = ["widevine", "drm", "google"]
|
||||
classifiers = [
|
||||
"Development Status :: 4 - Beta",
|
||||
"Intended Audience :: Developers",
|
||||
"Intended Audience :: End Users/Desktop",
|
||||
"Natural Language :: English",
|
||||
"Operating System :: OS Independent",
|
||||
"Topic :: Multimedia :: Video",
|
||||
"Topic :: Security :: Cryptography"
|
||||
]
|
||||
|
||||
[tool.poetry.urls]
|
||||
"Bug Tracker" = "https://github.com/rlaphoenix/pywidevine/issues"
|
||||
"Forums" = "https://github.com/rlaphoenix/pywidevine/discussions"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = ">=3.7,<3.11"
|
||||
@@ -17,6 +33,7 @@ pycryptodome = "^3.15.0"
|
||||
click = "^8.1.3"
|
||||
requests = "^2.28.1"
|
||||
lxml = "^4.9.1"
|
||||
Unidecode = "^1.3.4"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
pywidevine = "pywidevine.main:main"
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.0.0"
|
||||
__version__ = "1.1.0"
|
||||
|
||||
@@ -45,13 +45,7 @@ class Cdm:
|
||||
NUM_OF_SESSIONS = 0
|
||||
MAX_NUM_OF_SESSIONS = 50 # most common limit
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device: Device,
|
||||
pssh: Union[Container, bytes, str],
|
||||
license_type: LicenseType = LicenseType.STREAMING,
|
||||
raw: bool = False
|
||||
):
|
||||
def __init__(self, device: Device, pssh: Union[Container, bytes, str], raw: bool = False):
|
||||
"""
|
||||
Open a Widevine Content Decryption Module (CDM) session.
|
||||
|
||||
@@ -60,8 +54,6 @@ class Cdm:
|
||||
more device-specific information.
|
||||
pssh: Protection System Specific Header Box or Init Data. This should be a
|
||||
compliant mp4 pssh box, or just the init data (Widevine Cenc Header).
|
||||
license_type: Type of License you wish to exchange, often `STREAMING`.
|
||||
The `OFFLINE` Licenses are for Offline licensing of Downloaded content.
|
||||
raw: This should be set to True if the PSSH data provided is arbitrary data.
|
||||
E.g., a PSSH Box where the init data is not a Widevine Cenc Header, or
|
||||
is simply arbitrary data.
|
||||
@@ -70,10 +62,6 @@ class Cdm:
|
||||
The limit is different for each device and security level, most commonly 50.
|
||||
This limit is handled by the OEM Crypto API. Multiple sessions can be open at
|
||||
a time and sessions should be closed when no longer needed.
|
||||
|
||||
If an API or System requests a Widevine Session ID, it is best to provide it
|
||||
the real Session ID created here (self.session_id) instead of an arbitrary or
|
||||
random value.
|
||||
"""
|
||||
if not device:
|
||||
raise ValueError("A Widevine Device must be provided.")
|
||||
@@ -90,18 +78,16 @@ class Cdm:
|
||||
|
||||
self.device = device
|
||||
self.init_data = pssh
|
||||
self.license_type = license_type
|
||||
self.raw = raw
|
||||
|
||||
if not self.raw:
|
||||
if not raw:
|
||||
# we only want the init_data of the pssh box
|
||||
self.init_data = PSSH.get_as_box(pssh).init_data
|
||||
|
||||
self.session_id = self.create_session_id(self.device)
|
||||
self.service_certificate: Optional[SignedMessage] = None
|
||||
self.license_request: Optional[SignedMessage] = None
|
||||
self.session_id = get_random_bytes(16)
|
||||
self.service_certificate: Optional[DrmCertificate] = None
|
||||
self.context: dict[bytes, tuple[bytes, bytes]] = {}
|
||||
|
||||
def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage:
|
||||
def set_service_certificate(self, certificate: Union[bytes, str]) -> DrmCertificate:
|
||||
"""
|
||||
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
|
||||
|
||||
@@ -110,7 +96,7 @@ class Cdm:
|
||||
Some services have their own, but most use the common privacy cert,
|
||||
(common_privacy_cert).
|
||||
|
||||
Returns the parsed Signed Message if successful, otherwise raises a DecodeError.
|
||||
Returns the parsed Drm Certificate if successful, otherwise raises a DecodeError.
|
||||
|
||||
The Service Certificate is used to encrypt Client IDs in Licenses. This is also
|
||||
known as Privacy Mode and may be required for some services or for some devices.
|
||||
@@ -120,25 +106,64 @@ class Cdm:
|
||||
certificate = base64.b64decode(certificate) # assuming base64
|
||||
|
||||
signed_message = SignedMessage()
|
||||
try:
|
||||
signed_drm_certificate = SignedDrmCertificate()
|
||||
drm_certificate = DrmCertificate()
|
||||
|
||||
# Note: A secure CDM would likely reject any Service Certificate that is
|
||||
# not either a SignedMessage or a SignedDrmCertificate. This is because
|
||||
# the DrmCertificate itself is not signed. This CDM does not verify the
|
||||
# signatures as I'm not sure what HMAC key is used. At this stage of the
|
||||
# CDM flow, we wouldn't have any mac_keys, and those might not work for
|
||||
# verifying service certificate signatures (likely not).
|
||||
|
||||
# All these 3 schemas can sort of parse each other in a minimal buggy way,
|
||||
# so we have to parse down the full chain instead of relaying each step
|
||||
|
||||
try: # SignedMessage input
|
||||
signed_message.ParseFromString(certificate)
|
||||
signed_drm_certificate.ParseFromString(signed_message.msg)
|
||||
if not signed_drm_certificate.drm_certificate:
|
||||
raise DecodeError()
|
||||
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
|
||||
self.service_certificate = drm_certificate
|
||||
return self.service_certificate
|
||||
except DecodeError:
|
||||
raise ValueError("Could not parse certificate as a Signed Message.")
|
||||
pass
|
||||
|
||||
self.service_certificate = signed_message
|
||||
return signed_message
|
||||
try: # SignedDrmCertificate input
|
||||
signed_drm_certificate.ParseFromString(certificate)
|
||||
if not signed_drm_certificate.drm_certificate:
|
||||
raise DecodeError()
|
||||
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
|
||||
self.service_certificate = drm_certificate
|
||||
return self.service_certificate
|
||||
except DecodeError:
|
||||
pass
|
||||
|
||||
def get_license_challenge(self, privacy_mode: bool = True) -> bytes:
|
||||
try: # DrmCertificate input
|
||||
drm_certificate.ParseFromString(certificate)
|
||||
self.service_certificate = drm_certificate
|
||||
return self.service_certificate
|
||||
except DecodeError:
|
||||
pass
|
||||
|
||||
raise DecodeError("Could not parse certificate as a Service Certificate")
|
||||
|
||||
def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes:
|
||||
"""
|
||||
Get a License Challenge to send to a License Server.
|
||||
|
||||
Parameters:
|
||||
type_: Type of License you wish to exchange, often `STREAMING`.
|
||||
The `OFFLINE` Licenses are for Offline licensing of Downloaded content.
|
||||
privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the
|
||||
privacy certificate is not set yet, this does nothing.
|
||||
|
||||
Returns a SignedMessage containing a LicenseRequest message. It's signed with
|
||||
the Private Key of the device provision.
|
||||
"""
|
||||
request_id = get_random_bytes(16)
|
||||
|
||||
license_request = LicenseRequest()
|
||||
license_request.type = LicenseRequest.RequestType.Value("NEW")
|
||||
license_request.request_time = int(time.time())
|
||||
@@ -146,8 +171,8 @@ class Cdm:
|
||||
license_request.key_control_nonce = random.randrange(1, 2 ** 31)
|
||||
|
||||
license_request.content_id.widevine_pssh_data.pssh_data.append(self.init_data)
|
||||
license_request.content_id.widevine_pssh_data.license_type = self.license_type
|
||||
license_request.content_id.widevine_pssh_data.request_id = self.session_id
|
||||
license_request.content_id.widevine_pssh_data.license_type = type_
|
||||
license_request.content_id.widevine_pssh_data.request_id = request_id
|
||||
|
||||
if self.service_certificate and privacy_mode:
|
||||
# encrypt the client id for privacy mode
|
||||
@@ -166,17 +191,11 @@ class Cdm:
|
||||
new(self.device.private_key). \
|
||||
sign(SHA1.new(license_message.msg))
|
||||
|
||||
# store it for later, we need it for deriving keys when parsing a license
|
||||
self.license_request = license_message
|
||||
self.context[request_id] = self.derive_context(license_message.msg)
|
||||
|
||||
return license_message.SerializeToString()
|
||||
|
||||
def parse_license(self, license_message: Union[bytes, str]) -> list[Key]:
|
||||
# TODO: What if the CDM generates a 2nd challenge, overwriting the previous making it mismatch
|
||||
# for deriving keys? We need to make self.license_request always match the license_message
|
||||
if not self.license_request:
|
||||
raise ValueError("Cannot parse a license message without first making a license request")
|
||||
|
||||
if not license_message:
|
||||
raise ValueError("Cannot parse an empty license_message as a SignedMessage")
|
||||
|
||||
@@ -195,11 +214,15 @@ class Cdm:
|
||||
licence = License()
|
||||
licence.ParseFromString(license_message.msg)
|
||||
|
||||
context = self.context[licence.id.request_id]
|
||||
if not context:
|
||||
raise ValueError("Cannot parse a license message without first making a license request")
|
||||
|
||||
session_key = PKCS1_OAEP. \
|
||||
new(self.device.private_key). \
|
||||
decrypt(license_message.session_key)
|
||||
|
||||
enc_key, mac_key_server, mac_key_client = self.derive_keys(self.license_request.msg, session_key)
|
||||
enc_key, mac_key_server, mac_key_client = self.derive_keys(*context, session_key)
|
||||
|
||||
license_signature = HMAC. \
|
||||
new(mac_key_server, digestmod=SHA256). \
|
||||
@@ -262,25 +285,10 @@ class Cdm:
|
||||
except subprocess.CalledProcessError as e:
|
||||
raise subprocess.SubprocessError(f"Failed to Decrypt! Shaka Packager Error: {e}")
|
||||
|
||||
def create_session_id(self, device: Device) -> bytes:
|
||||
"""Create a Session ID based on OEM Crypto API session values."""
|
||||
if device.type == device.Types.ANDROID:
|
||||
session_id = "{hex:16X}{counter}".format(
|
||||
hex=random.getrandbits(64),
|
||||
counter=f"{self.NUM_OF_SESSIONS:02}"
|
||||
)
|
||||
session_id.ljust(32, "0")
|
||||
return session_id.encode("ascii")
|
||||
|
||||
if device.type == device.Types.CHROME:
|
||||
return get_random_bytes(16)
|
||||
|
||||
raise ValueError(f"Device Type {device.type.name} is not implemented")
|
||||
|
||||
@staticmethod
|
||||
def encrypt_client_id(
|
||||
client_id: ClientIdentification,
|
||||
service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate],
|
||||
service_certificate: DrmCertificate,
|
||||
key: bytes = None,
|
||||
iv: bytes = None
|
||||
) -> EncryptedClientIdentification:
|
||||
@@ -288,18 +296,8 @@ class Cdm:
|
||||
privacy_key = key or get_random_bytes(16)
|
||||
privacy_iv = iv or get_random_bytes(16)
|
||||
|
||||
if isinstance(service_certificate, SignedMessage):
|
||||
signed_service_certificate = SignedDrmCertificate()
|
||||
signed_service_certificate.ParseFromString(service_certificate.msg)
|
||||
service_certificate = signed_service_certificate
|
||||
|
||||
if isinstance(service_certificate, SignedDrmCertificate):
|
||||
service_service_drm_certificate = DrmCertificate()
|
||||
service_service_drm_certificate.ParseFromString(service_certificate.drm_certificate)
|
||||
service_certificate = service_service_drm_certificate
|
||||
|
||||
if not isinstance(service_certificate, DrmCertificate):
|
||||
raise ValueError(f"Service Certificate is in an unexpected type {service_certificate!r}")
|
||||
raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}")
|
||||
|
||||
enc_client_id = EncryptedClientIdentification()
|
||||
enc_client_id.provider_id = service_certificate.provider_id
|
||||
@@ -317,7 +315,23 @@ class Cdm:
|
||||
return enc_client_id
|
||||
|
||||
@staticmethod
|
||||
def derive_keys(msg: bytes, key: bytes) -> tuple[bytes, bytes, bytes]:
|
||||
def derive_context(message: bytes) -> tuple[bytes, bytes]:
|
||||
"""Returns 2 Context Data used for computing the AES Encryption and HMAC Keys."""
|
||||
|
||||
def _get_enc_context(msg: bytes) -> bytes:
|
||||
label = b"ENCRYPTION"
|
||||
key_size = 16 * 8 # 128-bit
|
||||
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
|
||||
|
||||
def _get_mac_context(msg: bytes) -> bytes:
|
||||
label = b"AUTHENTICATION"
|
||||
key_size = 32 * 8 * 2 # 512-bit
|
||||
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
|
||||
|
||||
return _get_enc_context(message), _get_mac_context(message)
|
||||
|
||||
@staticmethod
|
||||
def derive_keys(enc_context: bytes, mac_context: bytes, key: bytes) -> tuple[bytes, bytes, bytes]:
|
||||
"""
|
||||
Returns 3 keys derived from the input message.
|
||||
Key can either be a pre-provision device aes key, provision key, or a session key.
|
||||
@@ -338,24 +352,11 @@ class Cdm:
|
||||
keys and verify licenses.
|
||||
"""
|
||||
|
||||
def get_enc_context(message: bytes) -> bytes:
|
||||
label = b"ENCRYPTION"
|
||||
key_size = 16 * 8 # 128-bit
|
||||
return label + b"\x00" + message + key_size.to_bytes(4, "big")
|
||||
|
||||
def get_mac_context(message: bytes) -> bytes:
|
||||
label = b"AUTHENTICATION"
|
||||
key_size = 32 * 8 * 2 # 512-bit
|
||||
return label + b"\x00" + message + key_size.to_bytes(4, "big")
|
||||
|
||||
def _derive(session_key: bytes, context: bytes, counter: int) -> bytes:
|
||||
return CMAC.new(session_key, ciphermod=AES). \
|
||||
update(counter.to_bytes(1, "big") + context). \
|
||||
digest()
|
||||
|
||||
enc_context = get_enc_context(msg)
|
||||
mac_context = get_mac_context(msg)
|
||||
|
||||
enc_key = _derive(key, enc_context, 1)
|
||||
mac_key_server = _derive(key, mac_context, 1)
|
||||
mac_key_server += _derive(key, mac_context, 2)
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from construct import BitStruct, Bytes, Const
|
||||
from construct import BitStruct, Bytes, Const, ConstructError, Container
|
||||
from construct import Enum as CEnum
|
||||
from construct import Flag, Int8ub, Int16ub
|
||||
from construct import Int8ub, Int16ub
|
||||
from construct import Optional as COptional
|
||||
from construct import Padded, Padding, Struct, this
|
||||
from Crypto.PublicKey import RSA
|
||||
@@ -21,12 +22,16 @@ class _Types(Enum):
|
||||
ANDROID = 2
|
||||
|
||||
|
||||
class Device:
|
||||
# needed so bin_format can enumerate the types
|
||||
Types = _Types
|
||||
class _Structures:
|
||||
magic = Const(b"WVD")
|
||||
|
||||
bin_format = Struct(
|
||||
"signature" / Const(b"WVD"),
|
||||
header = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Int8ub
|
||||
)
|
||||
|
||||
v2 = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Const(Int8ub, 2),
|
||||
"type_" / CEnum(
|
||||
Int8ub,
|
||||
@@ -34,8 +39,8 @@ class Device:
|
||||
),
|
||||
"security_level" / Int8ub,
|
||||
"flags" / Padded(1, COptional(BitStruct(
|
||||
Padding(7),
|
||||
"send_key_control_nonce" / Flag # deprecated, do not use
|
||||
# no per-device flags yet
|
||||
Padding(8)
|
||||
))),
|
||||
"private_key_len" / Int16ub,
|
||||
"private_key" / Bytes(this.private_key_len),
|
||||
@@ -43,6 +48,32 @@ class Device:
|
||||
"client_id" / Bytes(this.client_id_len)
|
||||
)
|
||||
|
||||
v1 = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Const(Int8ub, 1),
|
||||
"type_" / CEnum(
|
||||
Int8ub,
|
||||
**{t.name: t.value for t in _Types}
|
||||
),
|
||||
"security_level" / Int8ub,
|
||||
"flags" / Padded(1, COptional(BitStruct(
|
||||
# no per-device flags yet
|
||||
Padding(8)
|
||||
))),
|
||||
"private_key_len" / Int16ub,
|
||||
"private_key" / Bytes(this.private_key_len),
|
||||
"client_id_len" / Int16ub,
|
||||
"client_id" / Bytes(this.client_id_len),
|
||||
"vmp_len" / Int16ub,
|
||||
"vmp" / Bytes(this.vmp_len)
|
||||
)
|
||||
|
||||
|
||||
class Device:
|
||||
Types = _Types
|
||||
Structures = _Structures
|
||||
supported_structure = Structures.v2
|
||||
|
||||
# == Bin Format Revisions == #
|
||||
# Version 2: Removed vmp and vmp_len as it should already be within the Client ID
|
||||
# Version 1: Removed system_id as it can be retrieved from the Client ID's DRM Certificate
|
||||
@@ -109,20 +140,20 @@ class Device:
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
return cls(**cls.bin_format.parse(data))
|
||||
return cls(**cls.supported_structure.parse(data))
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Union[Path, str]) -> Device:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
with Path(path).open(mode="rb") as f:
|
||||
return cls(**cls.bin_format.parse_stream(f))
|
||||
return cls(**cls.supported_structure.parse_stream(f))
|
||||
|
||||
def dumps(self) -> bytes:
|
||||
private_key = self.private_key.export_key("DER") if self.private_key else None
|
||||
return self.bin_format.build(dict(
|
||||
return self.supported_structure.build(dict(
|
||||
version=2,
|
||||
type=self.type.value,
|
||||
type_=self.type.value,
|
||||
security_level=self.security_level,
|
||||
flags=self.flags,
|
||||
private_key_len=len(private_key) if private_key else 0,
|
||||
@@ -138,5 +169,54 @@ class Device:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(self.dumps())
|
||||
|
||||
@classmethod
|
||||
def migrate(cls, data: Union[bytes, str]) -> Device:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
header = _Structures.header.parse(data)
|
||||
if header.version == 2:
|
||||
raise ValueError("Device Data is already migrated to the latest version.")
|
||||
if header.version == 0 or header.version > 2:
|
||||
# we have never used version 0, likely data that just so happened to use the WVD magic
|
||||
raise ValueError("Device Data does not seem to be a WVD file (v0).")
|
||||
|
||||
if header.version == 1: # v1 to v2
|
||||
data = _Structures.v1.parse(data)
|
||||
data.version = 2 # update version to 2 to allow loading
|
||||
data.flags = Container() # blank flags that may have been used in v1
|
||||
|
||||
vmp = FileHashes()
|
||||
if data.vmp:
|
||||
try:
|
||||
vmp.ParseFromString(data.vmp)
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||
data.vmp = vmp
|
||||
|
||||
client_id = ClientIdentification()
|
||||
try:
|
||||
client_id.ParseFromString(data.client_id)
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||
|
||||
new_vmp_data = data.vmp.SerializeToString()
|
||||
if client_id.vmp_data and client_id.vmp_data != new_vmp_data:
|
||||
logging.getLogger("migrate").warning("Client ID already has Verified Media Path data")
|
||||
client_id.vmp_data = new_vmp_data
|
||||
data.client_id = client_id.SerializeToString()
|
||||
|
||||
try:
|
||||
data = _Structures.v2.build(data)
|
||||
except ConstructError as e:
|
||||
raise ValueError(f"Migration failed, {e}")
|
||||
|
||||
try:
|
||||
return cls.loads(data)
|
||||
except ConstructError as e:
|
||||
raise ValueError(f"Device Data seems to be corrupt or invalid, or migration failed, {e}")
|
||||
|
||||
|
||||
__ALL__ = (Device,)
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from zlib import crc32
|
||||
|
||||
import click
|
||||
import requests
|
||||
from construct import ConstructError
|
||||
from unidecode import unidecode, UnidecodeError
|
||||
|
||||
from pywidevine import __version__
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.license_protocol_pb2 import LicenseType
|
||||
from pywidevine.license_protocol_pb2 import LicenseType, FileHashes
|
||||
|
||||
|
||||
@click.group(invoke_without_command=True)
|
||||
@@ -60,15 +64,13 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
|
||||
"""
|
||||
log = logging.getLogger("license")
|
||||
|
||||
type_ = LicenseType.Value(type_)
|
||||
|
||||
# load device
|
||||
device = Device.load(device)
|
||||
log.info(f"[+] Loaded Device ({device.system_id} L{device.security_level})")
|
||||
log.debug(device)
|
||||
|
||||
# load cdm
|
||||
cdm = Cdm(device, pssh, type_, raw)
|
||||
cdm = Cdm(device, pssh, raw)
|
||||
log.info(f"[+] Loaded CDM with PSSH: {pssh}")
|
||||
log.debug(cdm)
|
||||
|
||||
@@ -87,7 +89,8 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
|
||||
log.debug(service_cert)
|
||||
|
||||
# get license challenge
|
||||
challenge = cdm.get_license_challenge(privacy_mode=True)
|
||||
license_type = LicenseType.Value(type_)
|
||||
challenge = cdm.get_license_challenge(license_type, privacy_mode=True)
|
||||
log.info("[+] Created License Request Message (Challenge)")
|
||||
log.debug(challenge)
|
||||
|
||||
@@ -114,8 +117,10 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
|
||||
|
||||
@main.command()
|
||||
@click.argument("device", type=Path)
|
||||
@click.option("-p", "--privacy", is_flag=True, default=False,
|
||||
help="Use Privacy Mode, off by default.")
|
||||
@click.pass_context
|
||||
def test(ctx: click.Context, device: Path):
|
||||
def test(ctx: click.Context, device: Path, privacy: bool):
|
||||
"""
|
||||
Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example.
|
||||
https://bitmovin.com/demos/drm
|
||||
@@ -149,5 +154,108 @@ def test(ctx: click.Context, device: Path):
|
||||
pssh=pssh,
|
||||
server=license_server,
|
||||
type_=LicenseType.Name(license_type),
|
||||
raw=raw
|
||||
raw=raw,
|
||||
privacy=privacy
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.option("-t", "--type", "type_", type=click.Choice([x.name for x in Device.Types], case_sensitive=False),
|
||||
required=True, help="Device Type")
|
||||
@click.option("-l", "--level", type=click.IntRange(1, 3), required=True, help="Device Security Level")
|
||||
@click.option("-k", "--key", type=Path, required=True, help="Device RSA Private Key in PEM or DER format")
|
||||
@click.option("-c", "--client_id", type=Path, required=True, help="Widevine ClientIdentification Blob file")
|
||||
@click.option("-v", "--vmp", type=Path, required=True, help="Widevine FileHashes Blob file")
|
||||
@click.option("-o", "--output", type=Path, default=None, help="Output Directory")
|
||||
@click.pass_context
|
||||
def create_device(
|
||||
ctx: click.Context,
|
||||
type_: str,
|
||||
level: int,
|
||||
key: Path,
|
||||
client_id: Path,
|
||||
vmp: Optional[Path] = None,
|
||||
output: Optional[Path] = None
|
||||
) -> None:
|
||||
"""
|
||||
Create a Widevine Device (.wvd) file from an RSA Private Key (PEM or DER) and Client ID Blob.
|
||||
Optionally also a VMP (Verified Media Path) Blob, which will be stored in the Client ID.
|
||||
"""
|
||||
if not key.is_file():
|
||||
raise click.UsageError("key: Not a path to a file, or it doesn't exist.", ctx)
|
||||
if not client_id.is_file():
|
||||
raise click.UsageError("client_id: Not a path to a file, or it doesn't exist.", ctx)
|
||||
if vmp and not vmp.is_file():
|
||||
raise click.UsageError("vmp: Not a path to a file, or it doesn't exist.", ctx)
|
||||
|
||||
log = logging.getLogger("create-device")
|
||||
|
||||
device = Device(
|
||||
type_=Device.Types[type_.upper()],
|
||||
security_level=level,
|
||||
flags=None,
|
||||
private_key=key.read_bytes(),
|
||||
client_id=client_id.read_bytes()
|
||||
)
|
||||
|
||||
if vmp:
|
||||
new_vmp_data = vmp.read_bytes()
|
||||
if device.client_id.vmp_data and device.client_id.vmp_data != new_vmp_data:
|
||||
log.warning("Client ID already has Verified Media Path data")
|
||||
device.client_id.vmp_data = new_vmp_data
|
||||
|
||||
client_info = {}
|
||||
for entry in device.client_id.client_info:
|
||||
client_info[entry.name] = entry.value
|
||||
|
||||
wvd_bin = device.dumps()
|
||||
|
||||
name = f"{client_info['company_name']} {client_info['model_name']}"
|
||||
if client_info.get("widevine_cdm_version"):
|
||||
name += f" {client_info['widevine_cdm_version']}"
|
||||
name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
|
||||
|
||||
try:
|
||||
name = unidecode(name.strip().lower().replace(" ", "_"))
|
||||
except UnidecodeError as e:
|
||||
raise click.ClickException(f"Failed to sanitize name, {e}")
|
||||
|
||||
out_path = (output or Path.cwd()) / f"{name}_{device.system_id}_l{device.security_level}.wvd"
|
||||
out_path.write_bytes(wvd_bin)
|
||||
|
||||
log.info(f"Created Widevine Device (.wvd) file, {out_path.name}")
|
||||
log.info(f" + Type: {device.type.name}")
|
||||
log.info(f" + System ID: {device.system_id}")
|
||||
log.info(f" + Security Level: {device.security_level}")
|
||||
log.info(f" + Flags: {device.flags}")
|
||||
log.info(f" + Private Key: {bool(device.private_key)} ({device.private_key.size_in_bits()} bit)")
|
||||
log.info(f" + Client ID: {bool(device.client_id)} ({len(device.client_id.SerializeToString())} bytes)")
|
||||
if device.client_id.vmp_data:
|
||||
file_hashes_ = FileHashes()
|
||||
file_hashes_.ParseFromString(device.client_id.vmp_data)
|
||||
log.info(f" + VMP: True ({len(file_hashes_.signatures)} signatures)")
|
||||
else:
|
||||
log.info(f" + VMP: False")
|
||||
log.info(f" + Saved to: {out_path.absolute()}")
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("device", type=Path)
|
||||
@click.pass_context
|
||||
def migrate(ctx: click.Context, device: Path) -> None:
|
||||
"""Upgrade from earlier versions of the Widevine Device (.wvd) format."""
|
||||
if not device.is_file():
|
||||
raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx)
|
||||
|
||||
log = logging.getLogger("migrate")
|
||||
|
||||
try:
|
||||
new_device = Device.migrate(device.read_bytes())
|
||||
except (ConstructError, ValueError) as e:
|
||||
raise click.UsageError(str(e), ctx)
|
||||
|
||||
# save
|
||||
log.debug(new_device)
|
||||
new_device.dump(device)
|
||||
|
||||
log.info("Successfully migrated the Widevine Device (.wvd) file!")
|
||||
|
||||
Reference in New Issue
Block a user