31 Commits

Author SHA1 Message Date
rlaphoenix
e1951d20d0 Update Changelog for v1.1.0 2022-07-21 17:32:14 +01:00
rlaphoenix
35abd2962f Bump to v1.1.0 2022-07-21 17:32:06 +01:00
rlaphoenix
b262e115d3 Add ability to use Privacy mode on test command 2022-07-21 17:28:04 +01:00
rlaphoenix
95982725c3 Cdm: Support providing Service Cert as any 3 schemas
Some service's might provide the Service Certificate as a SignedDrmCertificate instead of a SignedMessage so I added support for supplying such format certificates. I also added support for supplying a DrmCertificate directly, though it's unlikely for a service to provide it raw without a signature like that.

The Service Certificate is now also stored as just the DrmCertificate internally, as it will not be using the signature.
2022-07-21 17:26:14 +01:00
rlaphoenix
70e79825b3 Device: Re-use magic reference across Structures 2022-07-21 16:23:19 +01:00
rlaphoenix
f2174dfa72 Device: Blank flags on v1 WVDs when migrating
This flag was technically used before this project and to ensure it will be unused and ready for safe use in this project and on v3 (if/when), we should blank the flags.
2022-07-21 16:21:22 +01:00
rlaphoenix
fe21bfe88c Fix migrate cmd's error handling, missing ValueError catching 2022-07-21 16:20:22 +01:00
rlaphoenix
93f70f73c2 Device: Fix header structure, should not be a constant 1 2022-07-21 16:19:52 +01:00
rlaphoenix
1442c945cc Move Migration Code to Device.migrate()
Also now more effectively migrates using the v1 Structure data.

Also fixes the migration error of possibly leaving behind VMP data. Will warn you if VMP data is already in the Client ID (if its different).
2022-07-21 16:10:42 +01:00
rlaphoenix
a729648a34 Device: Reference Structures class within Device class 2022-07-21 15:49:17 +01:00
rlaphoenix
3d6ddb8dcd Device: Remove explicit deprecated key control flag
This flag was used before this project was made, never after. Therefore I do not need to actually take this flag slot as deprecated from the get-go.
2022-07-21 15:48:45 +01:00
rlaphoenix
b41f09bee4 Device: Add v1 Structure for Migration 2022-07-21 15:42:17 +01:00
rlaphoenix
db80776ac0 Device: Move the structure under a Structures class 2022-07-21 15:40:46 +01:00
rlaphoenix
02ca1b00c9 Fix year on release dates in Changelog 2022-07-21 14:08:07 +01:00
rlaphoenix
7b06a3c053 Add cmd to migrate older .wvd files to v2 2022-07-21 13:58:41 +01:00
rlaphoenix
14126c67b1 Remove doc-string about non-existent name argument 2022-07-21 13:34:12 +01:00
rlaphoenix
5e93d6321d Add cmd to create a new .wvd device file
It even adds VMP data to the Client ID blob directly (instead of storing possibly duplicated). It will warn you if the Client ID already had VMP data there.

The filename is generated from client id information and has a crc32 checksum to help avoid with conflicts.
The output directory is the current working directory. You can set the directory with -o/--output.
2022-07-21 13:32:13 +01:00
rlaphoenix
1f389dbab9 Device: Fix typo on type_ in dump() 2022-07-21 13:09:53 +01:00
rlaphoenix
ac4c8affb0 deps: Add unidecode 2022-07-21 13:09:33 +01:00
rlaphoenix
f34018afe9 Bump to v1.0.1 2022-07-21 01:38:02 +01:00
rlaphoenix
8298703601 Add Changelog 2022-07-21 01:37:25 +01:00
rlaphoenix
e20f251aae Cdm: Simplify Session ID
The whole Session ID based on some weird half buggy reverse engineering is completely unnecessary.
2022-07-21 01:33:09 +01:00
rlaphoenix
a55aeb8cce docs: Remove outdated comment about Sessions
Sessions are technically implemented in a non-singleton approach, and the issue that provoked this comment has since been fixed (context data <-> license mismatch).
2022-07-21 01:19:10 +01:00
rlaphoenix
23165f92de Cdm: Fix bug where context data may not correspond to the right license
We are using a trick with the request_id to be able to add an identifier between get_license_challenge() and parse_license() without any middleman data needing to be passed by the user.

Otherwise the user would need to either create the context data themselves after get_license_challenge() and pass it to get_license(), or something that is similar at its core to that.
2022-07-21 01:12:28 +01:00
rlaphoenix
68db728bf0 Cdm: Only store license request's context data
This reduces the amount of data needing to be stored, but also simplifies the key derivation.
2022-07-20 22:25:57 +01:00
rlaphoenix
53f7c1dd62 Cdm: Fix size and Improve code of Android Session IDs 2022-07-20 21:40:40 +01:00
rlaphoenix
e9e65e5760 Update license cmd to move license type from Cdm to get_license_challenge() 2022-07-20 21:11:01 +01:00
rlaphoenix
909e83c199 Cdm: Return a DecodeError in set_service_certificate
This is to match with the doc-string stating it returns a DecodeError on failure.
2022-07-20 20:55:08 +01:00
rlaphoenix
2bb5c9e0b5 Cdm: Remove unnecessary raw class instance variable 2022-07-20 20:37:04 +01:00
rlaphoenix
7f60844ee1 Cdm: Move License Type from constructor to get_license_challenge() 2022-07-20 20:36:17 +01:00
rlaphoenix
59615dd804 Add more information to poetry in pyproject 2022-07-20 15:03:37 +01:00
8 changed files with 384 additions and 103 deletions

66
CHANGELOG.md Normal file
View File

@@ -0,0 +1,66 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.1.0] - 2022-07-21
### Added
- Added support for setting a Service Certificate in SignedDrmCertificate form as well as raw DrmCertificate form.
However, It's unlikely for the service to provide the certificate in raw DrmCertificate form without a signature.
- Added a CLI command `create-device` to create Widevine Device (`.wvd`) files from RSA PEM/DER Private Keys and
Client ID blobs. You can also provide VMP (FileHashes) data which will be merged into the Client ID blob.
- Added a CLI command `migrate` that uses `Device.migrate()` and `dump()` to migrate older v1 Widevine Device files
to v2.
- Added the v1 Structure of Widevine Devices for migration use.
- Added `Device.migrate()` class method that effectively loads older format WVD data. You can then use `dumps()` to
get back the WVD data in the latest supported format.
- Added ability to use Privacy mode on the test command.
### Changed
- Set Service Certificates are now stored as the raw underlying DrmCertificate as the signature data is unused by
the CDM.
- Moved all Widevine Device structures under a Structures class.
- I removed the `send_key_control_nonce` flag from all Structures even though it was technically used.
This is because the flag was never used as of this project, and I do not want to take up the flag slot.
### Fixed
- Devices `dump()` function now uses the correct `type_` parameter when building the struct.
- Fixed release date year of v1.0.0 and v1.0.1 in the changelog.
## [1.0.1] - 2022-07-21
### Added
- More information to the PyPI meta information, e.g., classifiers, readme, some URLs.
### Changed
- Moved the License Type parameter from the Cdm constructor to `get_license_challenge()`.
- The Session ID is no longer used as the Request ID which could help with blocks or replay checks due
to it being the same Session ID for each request. It's now a random 16 byte value each time.
- Only the Context Data of each license request is now stored instead of the full message.
### Removed
- Removed unnecessary and unused `raw` Cdm class instance variable.
### Fixed
- CDMs `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
- Context Data will now always match to their corresponding License Responses. This fixes an issue where creating
a second challenge would overwrite the context data of the first challenge. Parsing the first challenge after
would result in either a key decrypt error, or garbage key data.
## [1.0.0] - 2022-07-20
Initial Release.
[1.1.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.0
[1.0.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.1
[1.0.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.0

View File

@@ -17,9 +17,6 @@ Widevine CDM (Content Decryption Module) implementation in Python.
*Credit*: w3.org *Credit*: w3.org
Almost the entire Widevine Protocol seen above is implemented except for Sessions which I will be implementing soon.
Currently, only one session can be made and used at a time or problems will happen.
### Web Server ### Web Server
This may be an API/Server in front of a License Server. For example, Netflix's Custom MSL-based API front. This may be an API/Server in front of a License Server. For example, Netflix's Custom MSL-based API front.

14
poetry.lock generated
View File

@@ -137,6 +137,14 @@ category = "main"
optional = false optional = false
python-versions = ">=3.7" python-versions = ">=3.7"
[[package]]
name = "unidecode"
version = "1.3.4"
description = "ASCII transliterations of Unicode text"
category = "main"
optional = false
python-versions = ">=3.5"
[[package]] [[package]]
name = "urllib3" name = "urllib3"
version = "1.26.10" version = "1.26.10"
@@ -165,7 +173,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = ">=3.7,<3.11" python-versions = ">=3.7,<3.11"
content-hash = "9c6a76629e0f0a4e98b6c47707899519f930debd70312d2e909eb42f94cd212f" content-hash = "0720732d3b990a11e3b35f4604103364ae5654d1ac6e31ead17c03e566a016d0"
[metadata.files] [metadata.files]
certifi = [] certifi = []
@@ -195,5 +203,9 @@ pycryptodome = []
pymp4 = [] pymp4 = []
requests = [] requests = []
typing-extensions = [] typing-extensions = []
unidecode = [
{file = "Unidecode-1.3.4-py3-none-any.whl", hash = "sha256:afa04efcdd818a93237574791be9b2817d7077c25a068b00f8cff7baa4e59257"},
{file = "Unidecode-1.3.4.tar.gz", hash = "sha256:8e4352fb93d5a735c788110d2e7ac8e8031eb06ccbfe8d324ab71735015f9342"},
]
urllib3 = [] urllib3 = []
zipp = [] zipp = []

View File

@@ -4,10 +4,26 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry] [tool.poetry]
name = "pywidevine" name = "pywidevine"
version = "1.0.0" version = "1.1.0"
description = "Widevine CDM (Content Decryption Module) implementation in Python." description = "Widevine CDM (Content Decryption Module) implementation in Python."
authors = ["rlaphoenix <rlaphoenix@pm.me>"] authors = ["rlaphoenix <rlaphoenix@pm.me>"]
license = "GPL-3.0-only" license = "GPL-3.0-only"
readme = "README.md"
repository = "https://github.com/rlaphoenix/pywidevine"
keywords = ["widevine", "drm", "google"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Intended Audience :: End Users/Desktop",
"Natural Language :: English",
"Operating System :: OS Independent",
"Topic :: Multimedia :: Video",
"Topic :: Security :: Cryptography"
]
[tool.poetry.urls]
"Bug Tracker" = "https://github.com/rlaphoenix/pywidevine/issues"
"Forums" = "https://github.com/rlaphoenix/pywidevine/discussions"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.7,<3.11" python = ">=3.7,<3.11"
@@ -17,6 +33,7 @@ pycryptodome = "^3.15.0"
click = "^8.1.3" click = "^8.1.3"
requests = "^2.28.1" requests = "^2.28.1"
lxml = "^4.9.1" lxml = "^4.9.1"
Unidecode = "^1.3.4"
[tool.poetry.scripts] [tool.poetry.scripts]
pywidevine = "pywidevine.main:main" pywidevine = "pywidevine.main:main"

View File

@@ -1 +1 @@
__version__ = "1.0.0" __version__ = "1.1.0"

View File

@@ -45,13 +45,7 @@ class Cdm:
NUM_OF_SESSIONS = 0 NUM_OF_SESSIONS = 0
MAX_NUM_OF_SESSIONS = 50 # most common limit MAX_NUM_OF_SESSIONS = 50 # most common limit
def __init__( def __init__(self, device: Device, pssh: Union[Container, bytes, str], raw: bool = False):
self,
device: Device,
pssh: Union[Container, bytes, str],
license_type: LicenseType = LicenseType.STREAMING,
raw: bool = False
):
""" """
Open a Widevine Content Decryption Module (CDM) session. Open a Widevine Content Decryption Module (CDM) session.
@@ -60,8 +54,6 @@ class Cdm:
more device-specific information. more device-specific information.
pssh: Protection System Specific Header Box or Init Data. This should be a pssh: Protection System Specific Header Box or Init Data. This should be a
compliant mp4 pssh box, or just the init data (Widevine Cenc Header). compliant mp4 pssh box, or just the init data (Widevine Cenc Header).
license_type: Type of License you wish to exchange, often `STREAMING`.
The `OFFLINE` Licenses are for Offline licensing of Downloaded content.
raw: This should be set to True if the PSSH data provided is arbitrary data. raw: This should be set to True if the PSSH data provided is arbitrary data.
E.g., a PSSH Box where the init data is not a Widevine Cenc Header, or E.g., a PSSH Box where the init data is not a Widevine Cenc Header, or
is simply arbitrary data. is simply arbitrary data.
@@ -70,10 +62,6 @@ class Cdm:
The limit is different for each device and security level, most commonly 50. The limit is different for each device and security level, most commonly 50.
This limit is handled by the OEM Crypto API. Multiple sessions can be open at This limit is handled by the OEM Crypto API. Multiple sessions can be open at
a time and sessions should be closed when no longer needed. a time and sessions should be closed when no longer needed.
If an API or System requests a Widevine Session ID, it is best to provide it
the real Session ID created here (self.session_id) instead of an arbitrary or
random value.
""" """
if not device: if not device:
raise ValueError("A Widevine Device must be provided.") raise ValueError("A Widevine Device must be provided.")
@@ -90,18 +78,16 @@ class Cdm:
self.device = device self.device = device
self.init_data = pssh self.init_data = pssh
self.license_type = license_type
self.raw = raw
if not self.raw: if not raw:
# we only want the init_data of the pssh box # we only want the init_data of the pssh box
self.init_data = PSSH.get_as_box(pssh).init_data self.init_data = PSSH.get_as_box(pssh).init_data
self.session_id = self.create_session_id(self.device) self.session_id = get_random_bytes(16)
self.service_certificate: Optional[SignedMessage] = None self.service_certificate: Optional[DrmCertificate] = None
self.license_request: Optional[SignedMessage] = None self.context: dict[bytes, tuple[bytes, bytes]] = {}
def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage: def set_service_certificate(self, certificate: Union[bytes, str]) -> DrmCertificate:
""" """
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended) Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
@@ -110,7 +96,7 @@ class Cdm:
Some services have their own, but most use the common privacy cert, Some services have their own, but most use the common privacy cert,
(common_privacy_cert). (common_privacy_cert).
Returns the parsed Signed Message if successful, otherwise raises a DecodeError. Returns the parsed Drm Certificate if successful, otherwise raises a DecodeError.
The Service Certificate is used to encrypt Client IDs in Licenses. This is also The Service Certificate is used to encrypt Client IDs in Licenses. This is also
known as Privacy Mode and may be required for some services or for some devices. known as Privacy Mode and may be required for some services or for some devices.
@@ -120,25 +106,64 @@ class Cdm:
certificate = base64.b64decode(certificate) # assuming base64 certificate = base64.b64decode(certificate) # assuming base64
signed_message = SignedMessage() signed_message = SignedMessage()
try: signed_drm_certificate = SignedDrmCertificate()
drm_certificate = DrmCertificate()
# Note: A secure CDM would likely reject any Service Certificate that is
# not either a SignedMessage or a SignedDrmCertificate. This is because
# the DrmCertificate itself is not signed. This CDM does not verify the
# signatures as I'm not sure what HMAC key is used. At this stage of the
# CDM flow, we wouldn't have any mac_keys, and those might not work for
# verifying service certificate signatures (likely not).
# All these 3 schemas can sort of parse each other in a minimal buggy way,
# so we have to parse down the full chain instead of relaying each step
try: # SignedMessage input
signed_message.ParseFromString(certificate) signed_message.ParseFromString(certificate)
signed_drm_certificate.ParseFromString(signed_message.msg)
if not signed_drm_certificate.drm_certificate:
raise DecodeError()
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
self.service_certificate = drm_certificate
return self.service_certificate
except DecodeError: except DecodeError:
raise ValueError("Could not parse certificate as a Signed Message.") pass
self.service_certificate = signed_message try: # SignedDrmCertificate input
return signed_message signed_drm_certificate.ParseFromString(certificate)
if not signed_drm_certificate.drm_certificate:
raise DecodeError()
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
self.service_certificate = drm_certificate
return self.service_certificate
except DecodeError:
pass
def get_license_challenge(self, privacy_mode: bool = True) -> bytes: try: # DrmCertificate input
drm_certificate.ParseFromString(certificate)
self.service_certificate = drm_certificate
return self.service_certificate
except DecodeError:
pass
raise DecodeError("Could not parse certificate as a Service Certificate")
def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes:
""" """
Get a License Challenge to send to a License Server. Get a License Challenge to send to a License Server.
Parameters: Parameters:
type_: Type of License you wish to exchange, often `STREAMING`.
The `OFFLINE` Licenses are for Offline licensing of Downloaded content.
privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the
privacy certificate is not set yet, this does nothing. privacy certificate is not set yet, this does nothing.
Returns a SignedMessage containing a LicenseRequest message. It's signed with Returns a SignedMessage containing a LicenseRequest message. It's signed with
the Private Key of the device provision. the Private Key of the device provision.
""" """
request_id = get_random_bytes(16)
license_request = LicenseRequest() license_request = LicenseRequest()
license_request.type = LicenseRequest.RequestType.Value("NEW") license_request.type = LicenseRequest.RequestType.Value("NEW")
license_request.request_time = int(time.time()) license_request.request_time = int(time.time())
@@ -146,8 +171,8 @@ class Cdm:
license_request.key_control_nonce = random.randrange(1, 2 ** 31) license_request.key_control_nonce = random.randrange(1, 2 ** 31)
license_request.content_id.widevine_pssh_data.pssh_data.append(self.init_data) license_request.content_id.widevine_pssh_data.pssh_data.append(self.init_data)
license_request.content_id.widevine_pssh_data.license_type = self.license_type license_request.content_id.widevine_pssh_data.license_type = type_
license_request.content_id.widevine_pssh_data.request_id = self.session_id license_request.content_id.widevine_pssh_data.request_id = request_id
if self.service_certificate and privacy_mode: if self.service_certificate and privacy_mode:
# encrypt the client id for privacy mode # encrypt the client id for privacy mode
@@ -166,17 +191,11 @@ class Cdm:
new(self.device.private_key). \ new(self.device.private_key). \
sign(SHA1.new(license_message.msg)) sign(SHA1.new(license_message.msg))
# store it for later, we need it for deriving keys when parsing a license self.context[request_id] = self.derive_context(license_message.msg)
self.license_request = license_message
return license_message.SerializeToString() return license_message.SerializeToString()
def parse_license(self, license_message: Union[bytes, str]) -> list[Key]: def parse_license(self, license_message: Union[bytes, str]) -> list[Key]:
# TODO: What if the CDM generates a 2nd challenge, overwriting the previous making it mismatch
# for deriving keys? We need to make self.license_request always match the license_message
if not self.license_request:
raise ValueError("Cannot parse a license message without first making a license request")
if not license_message: if not license_message:
raise ValueError("Cannot parse an empty license_message as a SignedMessage") raise ValueError("Cannot parse an empty license_message as a SignedMessage")
@@ -195,11 +214,15 @@ class Cdm:
licence = License() licence = License()
licence.ParseFromString(license_message.msg) licence.ParseFromString(license_message.msg)
context = self.context[licence.id.request_id]
if not context:
raise ValueError("Cannot parse a license message without first making a license request")
session_key = PKCS1_OAEP. \ session_key = PKCS1_OAEP. \
new(self.device.private_key). \ new(self.device.private_key). \
decrypt(license_message.session_key) decrypt(license_message.session_key)
enc_key, mac_key_server, mac_key_client = self.derive_keys(self.license_request.msg, session_key) enc_key, mac_key_server, mac_key_client = self.derive_keys(*context, session_key)
license_signature = HMAC. \ license_signature = HMAC. \
new(mac_key_server, digestmod=SHA256). \ new(mac_key_server, digestmod=SHA256). \
@@ -262,25 +285,10 @@ class Cdm:
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
raise subprocess.SubprocessError(f"Failed to Decrypt! Shaka Packager Error: {e}") raise subprocess.SubprocessError(f"Failed to Decrypt! Shaka Packager Error: {e}")
def create_session_id(self, device: Device) -> bytes:
"""Create a Session ID based on OEM Crypto API session values."""
if device.type == device.Types.ANDROID:
session_id = "{hex:16X}{counter}".format(
hex=random.getrandbits(64),
counter=f"{self.NUM_OF_SESSIONS:02}"
)
session_id.ljust(32, "0")
return session_id.encode("ascii")
if device.type == device.Types.CHROME:
return get_random_bytes(16)
raise ValueError(f"Device Type {device.type.name} is not implemented")
@staticmethod @staticmethod
def encrypt_client_id( def encrypt_client_id(
client_id: ClientIdentification, client_id: ClientIdentification,
service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate], service_certificate: DrmCertificate,
key: bytes = None, key: bytes = None,
iv: bytes = None iv: bytes = None
) -> EncryptedClientIdentification: ) -> EncryptedClientIdentification:
@@ -288,18 +296,8 @@ class Cdm:
privacy_key = key or get_random_bytes(16) privacy_key = key or get_random_bytes(16)
privacy_iv = iv or get_random_bytes(16) privacy_iv = iv or get_random_bytes(16)
if isinstance(service_certificate, SignedMessage):
signed_service_certificate = SignedDrmCertificate()
signed_service_certificate.ParseFromString(service_certificate.msg)
service_certificate = signed_service_certificate
if isinstance(service_certificate, SignedDrmCertificate):
service_service_drm_certificate = DrmCertificate()
service_service_drm_certificate.ParseFromString(service_certificate.drm_certificate)
service_certificate = service_service_drm_certificate
if not isinstance(service_certificate, DrmCertificate): if not isinstance(service_certificate, DrmCertificate):
raise ValueError(f"Service Certificate is in an unexpected type {service_certificate!r}") raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}")
enc_client_id = EncryptedClientIdentification() enc_client_id = EncryptedClientIdentification()
enc_client_id.provider_id = service_certificate.provider_id enc_client_id.provider_id = service_certificate.provider_id
@@ -317,7 +315,23 @@ class Cdm:
return enc_client_id return enc_client_id
@staticmethod @staticmethod
def derive_keys(msg: bytes, key: bytes) -> tuple[bytes, bytes, bytes]: def derive_context(message: bytes) -> tuple[bytes, bytes]:
"""Returns 2 Context Data used for computing the AES Encryption and HMAC Keys."""
def _get_enc_context(msg: bytes) -> bytes:
label = b"ENCRYPTION"
key_size = 16 * 8 # 128-bit
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
def _get_mac_context(msg: bytes) -> bytes:
label = b"AUTHENTICATION"
key_size = 32 * 8 * 2 # 512-bit
return label + b"\x00" + msg + key_size.to_bytes(4, "big")
return _get_enc_context(message), _get_mac_context(message)
@staticmethod
def derive_keys(enc_context: bytes, mac_context: bytes, key: bytes) -> tuple[bytes, bytes, bytes]:
""" """
Returns 3 keys derived from the input message. Returns 3 keys derived from the input message.
Key can either be a pre-provision device aes key, provision key, or a session key. Key can either be a pre-provision device aes key, provision key, or a session key.
@@ -338,24 +352,11 @@ class Cdm:
keys and verify licenses. keys and verify licenses.
""" """
def get_enc_context(message: bytes) -> bytes:
label = b"ENCRYPTION"
key_size = 16 * 8 # 128-bit
return label + b"\x00" + message + key_size.to_bytes(4, "big")
def get_mac_context(message: bytes) -> bytes:
label = b"AUTHENTICATION"
key_size = 32 * 8 * 2 # 512-bit
return label + b"\x00" + message + key_size.to_bytes(4, "big")
def _derive(session_key: bytes, context: bytes, counter: int) -> bytes: def _derive(session_key: bytes, context: bytes, counter: int) -> bytes:
return CMAC.new(session_key, ciphermod=AES). \ return CMAC.new(session_key, ciphermod=AES). \
update(counter.to_bytes(1, "big") + context). \ update(counter.to_bytes(1, "big") + context). \
digest() digest()
enc_context = get_enc_context(msg)
mac_context = get_mac_context(msg)
enc_key = _derive(key, enc_context, 1) enc_key = _derive(key, enc_context, 1)
mac_key_server = _derive(key, mac_context, 1) mac_key_server = _derive(key, mac_context, 1)
mac_key_server += _derive(key, mac_context, 2) mac_key_server += _derive(key, mac_context, 2)

View File

@@ -1,13 +1,14 @@
from __future__ import annotations from __future__ import annotations
import base64 import base64
import logging
from enum import Enum from enum import Enum
from pathlib import Path from pathlib import Path
from typing import Any, Optional, Union from typing import Any, Optional, Union
from construct import BitStruct, Bytes, Const from construct import BitStruct, Bytes, Const, ConstructError, Container
from construct import Enum as CEnum from construct import Enum as CEnum
from construct import Flag, Int8ub, Int16ub from construct import Int8ub, Int16ub
from construct import Optional as COptional from construct import Optional as COptional
from construct import Padded, Padding, Struct, this from construct import Padded, Padding, Struct, this
from Crypto.PublicKey import RSA from Crypto.PublicKey import RSA
@@ -21,12 +22,16 @@ class _Types(Enum):
ANDROID = 2 ANDROID = 2
class Device: class _Structures:
# needed so bin_format can enumerate the types magic = Const(b"WVD")
Types = _Types
bin_format = Struct( header = Struct(
"signature" / Const(b"WVD"), "signature" / magic,
"version" / Int8ub
)
v2 = Struct(
"signature" / magic,
"version" / Const(Int8ub, 2), "version" / Const(Int8ub, 2),
"type_" / CEnum( "type_" / CEnum(
Int8ub, Int8ub,
@@ -34,8 +39,8 @@ class Device:
), ),
"security_level" / Int8ub, "security_level" / Int8ub,
"flags" / Padded(1, COptional(BitStruct( "flags" / Padded(1, COptional(BitStruct(
Padding(7), # no per-device flags yet
"send_key_control_nonce" / Flag # deprecated, do not use Padding(8)
))), ))),
"private_key_len" / Int16ub, "private_key_len" / Int16ub,
"private_key" / Bytes(this.private_key_len), "private_key" / Bytes(this.private_key_len),
@@ -43,6 +48,32 @@ class Device:
"client_id" / Bytes(this.client_id_len) "client_id" / Bytes(this.client_id_len)
) )
v1 = Struct(
"signature" / magic,
"version" / Const(Int8ub, 1),
"type_" / CEnum(
Int8ub,
**{t.name: t.value for t in _Types}
),
"security_level" / Int8ub,
"flags" / Padded(1, COptional(BitStruct(
# no per-device flags yet
Padding(8)
))),
"private_key_len" / Int16ub,
"private_key" / Bytes(this.private_key_len),
"client_id_len" / Int16ub,
"client_id" / Bytes(this.client_id_len),
"vmp_len" / Int16ub,
"vmp" / Bytes(this.vmp_len)
)
class Device:
Types = _Types
Structures = _Structures
supported_structure = Structures.v2
# == Bin Format Revisions == # # == Bin Format Revisions == #
# Version 2: Removed vmp and vmp_len as it should already be within the Client ID # Version 2: Removed vmp and vmp_len as it should already be within the Client ID
# Version 1: Removed system_id as it can be retrieved from the Client ID's DRM Certificate # Version 1: Removed system_id as it can be retrieved from the Client ID's DRM Certificate
@@ -109,20 +140,20 @@ class Device:
data = base64.b64decode(data) data = base64.b64decode(data)
if not isinstance(data, bytes): if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}") raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
return cls(**cls.bin_format.parse(data)) return cls(**cls.supported_structure.parse(data))
@classmethod @classmethod
def load(cls, path: Union[Path, str]) -> Device: def load(cls, path: Union[Path, str]) -> Device:
if not isinstance(path, (Path, str)): if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}") raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f: with Path(path).open(mode="rb") as f:
return cls(**cls.bin_format.parse_stream(f)) return cls(**cls.supported_structure.parse_stream(f))
def dumps(self) -> bytes: def dumps(self) -> bytes:
private_key = self.private_key.export_key("DER") if self.private_key else None private_key = self.private_key.export_key("DER") if self.private_key else None
return self.bin_format.build(dict( return self.supported_structure.build(dict(
version=2, version=2,
type=self.type.value, type_=self.type.value,
security_level=self.security_level, security_level=self.security_level,
flags=self.flags, flags=self.flags,
private_key_len=len(private_key) if private_key else 0, private_key_len=len(private_key) if private_key else 0,
@@ -138,5 +169,54 @@ class Device:
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps()) path.write_bytes(self.dumps())
@classmethod
def migrate(cls, data: Union[bytes, str]) -> Device:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
header = _Structures.header.parse(data)
if header.version == 2:
raise ValueError("Device Data is already migrated to the latest version.")
if header.version == 0 or header.version > 2:
# we have never used version 0, likely data that just so happened to use the WVD magic
raise ValueError("Device Data does not seem to be a WVD file (v0).")
if header.version == 1: # v1 to v2
data = _Structures.v1.parse(data)
data.version = 2 # update version to 2 to allow loading
data.flags = Container() # blank flags that may have been used in v1
vmp = FileHashes()
if data.vmp:
try:
vmp.ParseFromString(data.vmp)
except DecodeError as e:
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
data.vmp = vmp
client_id = ClientIdentification()
try:
client_id.ParseFromString(data.client_id)
except DecodeError as e:
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
new_vmp_data = data.vmp.SerializeToString()
if client_id.vmp_data and client_id.vmp_data != new_vmp_data:
logging.getLogger("migrate").warning("Client ID already has Verified Media Path data")
client_id.vmp_data = new_vmp_data
data.client_id = client_id.SerializeToString()
try:
data = _Structures.v2.build(data)
except ConstructError as e:
raise ValueError(f"Migration failed, {e}")
try:
return cls.loads(data)
except ConstructError as e:
raise ValueError(f"Device Data seems to be corrupt or invalid, or migration failed, {e}")
__ALL__ = (Device,) __ALL__ = (Device,)

View File

@@ -1,14 +1,18 @@
import logging import logging
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Optional
from zlib import crc32
import click import click
import requests import requests
from construct import ConstructError
from unidecode import unidecode, UnidecodeError
from pywidevine import __version__ from pywidevine import __version__
from pywidevine.cdm import Cdm from pywidevine.cdm import Cdm
from pywidevine.device import Device from pywidevine.device import Device
from pywidevine.license_protocol_pb2 import LicenseType from pywidevine.license_protocol_pb2 import LicenseType, FileHashes
@click.group(invoke_without_command=True) @click.group(invoke_without_command=True)
@@ -60,15 +64,13 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
""" """
log = logging.getLogger("license") log = logging.getLogger("license")
type_ = LicenseType.Value(type_)
# load device # load device
device = Device.load(device) device = Device.load(device)
log.info(f"[+] Loaded Device ({device.system_id} L{device.security_level})") log.info(f"[+] Loaded Device ({device.system_id} L{device.security_level})")
log.debug(device) log.debug(device)
# load cdm # load cdm
cdm = Cdm(device, pssh, type_, raw) cdm = Cdm(device, pssh, raw)
log.info(f"[+] Loaded CDM with PSSH: {pssh}") log.info(f"[+] Loaded CDM with PSSH: {pssh}")
log.debug(cdm) log.debug(cdm)
@@ -87,7 +89,8 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
log.debug(service_cert) log.debug(service_cert)
# get license challenge # get license challenge
challenge = cdm.get_license_challenge(privacy_mode=True) license_type = LicenseType.Value(type_)
challenge = cdm.get_license_challenge(license_type, privacy_mode=True)
log.info("[+] Created License Request Message (Challenge)") log.info("[+] Created License Request Message (Challenge)")
log.debug(challenge) log.debug(challenge)
@@ -114,8 +117,10 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
@main.command() @main.command()
@click.argument("device", type=Path) @click.argument("device", type=Path)
@click.option("-p", "--privacy", is_flag=True, default=False,
help="Use Privacy Mode, off by default.")
@click.pass_context @click.pass_context
def test(ctx: click.Context, device: Path): def test(ctx: click.Context, device: Path, privacy: bool):
""" """
Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example. Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example.
https://bitmovin.com/demos/drm https://bitmovin.com/demos/drm
@@ -149,5 +154,108 @@ def test(ctx: click.Context, device: Path):
pssh=pssh, pssh=pssh,
server=license_server, server=license_server,
type_=LicenseType.Name(license_type), type_=LicenseType.Name(license_type),
raw=raw raw=raw,
privacy=privacy
) )
@main.command()
@click.option("-t", "--type", "type_", type=click.Choice([x.name for x in Device.Types], case_sensitive=False),
required=True, help="Device Type")
@click.option("-l", "--level", type=click.IntRange(1, 3), required=True, help="Device Security Level")
@click.option("-k", "--key", type=Path, required=True, help="Device RSA Private Key in PEM or DER format")
@click.option("-c", "--client_id", type=Path, required=True, help="Widevine ClientIdentification Blob file")
@click.option("-v", "--vmp", type=Path, required=True, help="Widevine FileHashes Blob file")
@click.option("-o", "--output", type=Path, default=None, help="Output Directory")
@click.pass_context
def create_device(
ctx: click.Context,
type_: str,
level: int,
key: Path,
client_id: Path,
vmp: Optional[Path] = None,
output: Optional[Path] = None
) -> None:
"""
Create a Widevine Device (.wvd) file from an RSA Private Key (PEM or DER) and Client ID Blob.
Optionally also a VMP (Verified Media Path) Blob, which will be stored in the Client ID.
"""
if not key.is_file():
raise click.UsageError("key: Not a path to a file, or it doesn't exist.", ctx)
if not client_id.is_file():
raise click.UsageError("client_id: Not a path to a file, or it doesn't exist.", ctx)
if vmp and not vmp.is_file():
raise click.UsageError("vmp: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("create-device")
device = Device(
type_=Device.Types[type_.upper()],
security_level=level,
flags=None,
private_key=key.read_bytes(),
client_id=client_id.read_bytes()
)
if vmp:
new_vmp_data = vmp.read_bytes()
if device.client_id.vmp_data and device.client_id.vmp_data != new_vmp_data:
log.warning("Client ID already has Verified Media Path data")
device.client_id.vmp_data = new_vmp_data
client_info = {}
for entry in device.client_id.client_info:
client_info[entry.name] = entry.value
wvd_bin = device.dumps()
name = f"{client_info['company_name']} {client_info['model_name']}"
if client_info.get("widevine_cdm_version"):
name += f" {client_info['widevine_cdm_version']}"
name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
try:
name = unidecode(name.strip().lower().replace(" ", "_"))
except UnidecodeError as e:
raise click.ClickException(f"Failed to sanitize name, {e}")
out_path = (output or Path.cwd()) / f"{name}_{device.system_id}_l{device.security_level}.wvd"
out_path.write_bytes(wvd_bin)
log.info(f"Created Widevine Device (.wvd) file, {out_path.name}")
log.info(f" + Type: {device.type.name}")
log.info(f" + System ID: {device.system_id}")
log.info(f" + Security Level: {device.security_level}")
log.info(f" + Flags: {device.flags}")
log.info(f" + Private Key: {bool(device.private_key)} ({device.private_key.size_in_bits()} bit)")
log.info(f" + Client ID: {bool(device.client_id)} ({len(device.client_id.SerializeToString())} bytes)")
if device.client_id.vmp_data:
file_hashes_ = FileHashes()
file_hashes_.ParseFromString(device.client_id.vmp_data)
log.info(f" + VMP: True ({len(file_hashes_.signatures)} signatures)")
else:
log.info(f" + VMP: False")
log.info(f" + Saved to: {out_path.absolute()}")
@main.command()
@click.argument("device", type=Path)
@click.pass_context
def migrate(ctx: click.Context, device: Path) -> None:
"""Upgrade from earlier versions of the Widevine Device (.wvd) format."""
if not device.is_file():
raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("migrate")
try:
new_device = Device.migrate(device.read_bytes())
except (ConstructError, ValueError) as e:
raise click.UsageError(str(e), ctx)
# save
log.debug(new_device)
new_device.dump(device)
log.info("Successfully migrated the Widevine Device (.wvd) file!")