19 Commits

Author SHA1 Message Date
rlaphoenix
e1951d20d0 Update Changelog for v1.1.0 2022-07-21 17:32:14 +01:00
rlaphoenix
35abd2962f Bump to v1.1.0 2022-07-21 17:32:06 +01:00
rlaphoenix
b262e115d3 Add ability to use Privacy mode on test command 2022-07-21 17:28:04 +01:00
rlaphoenix
95982725c3 Cdm: Support providing Service Cert as any 3 schemas
Some service's might provide the Service Certificate as a SignedDrmCertificate instead of a SignedMessage so I added support for supplying such format certificates. I also added support for supplying a DrmCertificate directly, though it's unlikely for a service to provide it raw without a signature like that.

The Service Certificate is now also stored as just the DrmCertificate internally, as it will not be using the signature.
2022-07-21 17:26:14 +01:00
rlaphoenix
70e79825b3 Device: Re-use magic reference across Structures 2022-07-21 16:23:19 +01:00
rlaphoenix
f2174dfa72 Device: Blank flags on v1 WVDs when migrating
This flag was technically used before this project and to ensure it will be unused and ready for safe use in this project and on v3 (if/when), we should blank the flags.
2022-07-21 16:21:22 +01:00
rlaphoenix
fe21bfe88c Fix migrate cmd's error handling, missing ValueError catching 2022-07-21 16:20:22 +01:00
rlaphoenix
93f70f73c2 Device: Fix header structure, should not be a constant 1 2022-07-21 16:19:52 +01:00
rlaphoenix
1442c945cc Move Migration Code to Device.migrate()
Also now more effectively migrates using the v1 Structure data.

Also fixes the migration error of possibly leaving behind VMP data. Will warn you if VMP data is already in the Client ID (if its different).
2022-07-21 16:10:42 +01:00
rlaphoenix
a729648a34 Device: Reference Structures class within Device class 2022-07-21 15:49:17 +01:00
rlaphoenix
3d6ddb8dcd Device: Remove explicit deprecated key control flag
This flag was used before this project was made, never after. Therefore I do not need to actually take this flag slot as deprecated from the get-go.
2022-07-21 15:48:45 +01:00
rlaphoenix
b41f09bee4 Device: Add v1 Structure for Migration 2022-07-21 15:42:17 +01:00
rlaphoenix
db80776ac0 Device: Move the structure under a Structures class 2022-07-21 15:40:46 +01:00
rlaphoenix
02ca1b00c9 Fix year on release dates in Changelog 2022-07-21 14:08:07 +01:00
rlaphoenix
7b06a3c053 Add cmd to migrate older .wvd files to v2 2022-07-21 13:58:41 +01:00
rlaphoenix
14126c67b1 Remove doc-string about non-existent name argument 2022-07-21 13:34:12 +01:00
rlaphoenix
5e93d6321d Add cmd to create a new .wvd device file
It even adds VMP data to the Client ID blob directly (instead of storing possibly duplicated). It will warn you if the Client ID already had VMP data there.

The filename is generated from client id information and has a crc32 checksum to help avoid with conflicts.
The output directory is the current working directory. You can set the directory with -o/--output.
2022-07-21 13:32:13 +01:00
rlaphoenix
1f389dbab9 Device: Fix typo on type_ in dump() 2022-07-21 13:09:53 +01:00
rlaphoenix
ac4c8affb0 deps: Add unidecode 2022-07-21 13:09:33 +01:00
7 changed files with 299 additions and 43 deletions

View File

@@ -5,7 +5,35 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.0.1] - 2021-07-21
## [1.1.0] - 2022-07-21
### Added
- Added support for setting a Service Certificate in SignedDrmCertificate form as well as raw DrmCertificate form.
However, It's unlikely for the service to provide the certificate in raw DrmCertificate form without a signature.
- Added a CLI command `create-device` to create Widevine Device (`.wvd`) files from RSA PEM/DER Private Keys and
Client ID blobs. You can also provide VMP (FileHashes) data which will be merged into the Client ID blob.
- Added a CLI command `migrate` that uses `Device.migrate()` and `dump()` to migrate older v1 Widevine Device files
to v2.
- Added the v1 Structure of Widevine Devices for migration use.
- Added `Device.migrate()` class method that effectively loads older format WVD data. You can then use `dumps()` to
get back the WVD data in the latest supported format.
- Added ability to use Privacy mode on the test command.
### Changed
- Set Service Certificates are now stored as the raw underlying DrmCertificate as the signature data is unused by
the CDM.
- Moved all Widevine Device structures under a Structures class.
- I removed the `send_key_control_nonce` flag from all Structures even though it was technically used.
This is because the flag was never used as of this project, and I do not want to take up the flag slot.
### Fixed
- Devices `dump()` function now uses the correct `type_` parameter when building the struct.
- Fixed release date year of v1.0.0 and v1.0.1 in the changelog.
## [1.0.1] - 2022-07-21
### Added
@@ -24,14 +52,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Cdm's `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
- CDMs `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
- Context Data will now always match to their corresponding License Responses. This fixes an issue where creating
a second challenge would overwrite the context data of the first challenge. Parsing the first challenge after
would result in either a key decrypt error, or garbage key data.
## [1.0.0] - 2021-07-20
## [1.0.0] - 2022-07-20
Initial Release.
[1.1.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.0
[1.0.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.1
[1.0.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.0

14
poetry.lock generated
View File

@@ -137,6 +137,14 @@ category = "main"
optional = false
python-versions = ">=3.7"
[[package]]
name = "unidecode"
version = "1.3.4"
description = "ASCII transliterations of Unicode text"
category = "main"
optional = false
python-versions = ">=3.5"
[[package]]
name = "urllib3"
version = "1.26.10"
@@ -165,7 +173,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = ">=3.7,<3.11"
content-hash = "9c6a76629e0f0a4e98b6c47707899519f930debd70312d2e909eb42f94cd212f"
content-hash = "0720732d3b990a11e3b35f4604103364ae5654d1ac6e31ead17c03e566a016d0"
[metadata.files]
certifi = []
@@ -195,5 +203,9 @@ pycryptodome = []
pymp4 = []
requests = []
typing-extensions = []
unidecode = [
{file = "Unidecode-1.3.4-py3-none-any.whl", hash = "sha256:afa04efcdd818a93237574791be9b2817d7077c25a068b00f8cff7baa4e59257"},
{file = "Unidecode-1.3.4.tar.gz", hash = "sha256:8e4352fb93d5a735c788110d2e7ac8e8031eb06ccbfe8d324ab71735015f9342"},
]
urllib3 = []
zipp = []

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pywidevine"
version = "1.0.1"
version = "1.1.0"
description = "Widevine CDM (Content Decryption Module) implementation in Python."
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
license = "GPL-3.0-only"
@@ -33,6 +33,7 @@ pycryptodome = "^3.15.0"
click = "^8.1.3"
requests = "^2.28.1"
lxml = "^4.9.1"
Unidecode = "^1.3.4"
[tool.poetry.scripts]
pywidevine = "pywidevine.main:main"

View File

@@ -1 +1 @@
__version__ = "1.0.1"
__version__ = "1.1.0"

View File

@@ -84,10 +84,10 @@ class Cdm:
self.init_data = PSSH.get_as_box(pssh).init_data
self.session_id = get_random_bytes(16)
self.service_certificate: Optional[SignedMessage] = None
self.service_certificate: Optional[DrmCertificate] = None
self.context: dict[bytes, tuple[bytes, bytes]] = {}
def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage:
def set_service_certificate(self, certificate: Union[bytes, str]) -> DrmCertificate:
"""
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
@@ -96,7 +96,7 @@ class Cdm:
Some services have their own, but most use the common privacy cert,
(common_privacy_cert).
Returns the parsed Signed Message if successful, otherwise raises a DecodeError.
Returns the parsed Drm Certificate if successful, otherwise raises a DecodeError.
The Service Certificate is used to encrypt Client IDs in Licenses. This is also
known as Privacy Mode and may be required for some services or for some devices.
@@ -106,13 +106,48 @@ class Cdm:
certificate = base64.b64decode(certificate) # assuming base64
signed_message = SignedMessage()
try:
signed_message.ParseFromString(certificate)
except DecodeError as e:
raise DecodeError(f"Could not parse certificate as a Signed Message: {e}")
signed_drm_certificate = SignedDrmCertificate()
drm_certificate = DrmCertificate()
self.service_certificate = signed_message
return signed_message
# Note: A secure CDM would likely reject any Service Certificate that is
# not either a SignedMessage or a SignedDrmCertificate. This is because
# the DrmCertificate itself is not signed. This CDM does not verify the
# signatures as I'm not sure what HMAC key is used. At this stage of the
# CDM flow, we wouldn't have any mac_keys, and those might not work for
# verifying service certificate signatures (likely not).
# All these 3 schemas can sort of parse each other in a minimal buggy way,
# so we have to parse down the full chain instead of relaying each step
try: # SignedMessage input
signed_message.ParseFromString(certificate)
signed_drm_certificate.ParseFromString(signed_message.msg)
if not signed_drm_certificate.drm_certificate:
raise DecodeError()
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
self.service_certificate = drm_certificate
return self.service_certificate
except DecodeError:
pass
try: # SignedDrmCertificate input
signed_drm_certificate.ParseFromString(certificate)
if not signed_drm_certificate.drm_certificate:
raise DecodeError()
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
self.service_certificate = drm_certificate
return self.service_certificate
except DecodeError:
pass
try: # DrmCertificate input
drm_certificate.ParseFromString(certificate)
self.service_certificate = drm_certificate
return self.service_certificate
except DecodeError:
pass
raise DecodeError("Could not parse certificate as a Service Certificate")
def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes:
"""
@@ -253,7 +288,7 @@ class Cdm:
@staticmethod
def encrypt_client_id(
client_id: ClientIdentification,
service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate],
service_certificate: DrmCertificate,
key: bytes = None,
iv: bytes = None
) -> EncryptedClientIdentification:
@@ -261,18 +296,8 @@ class Cdm:
privacy_key = key or get_random_bytes(16)
privacy_iv = iv or get_random_bytes(16)
if isinstance(service_certificate, SignedMessage):
signed_service_certificate = SignedDrmCertificate()
signed_service_certificate.ParseFromString(service_certificate.msg)
service_certificate = signed_service_certificate
if isinstance(service_certificate, SignedDrmCertificate):
service_service_drm_certificate = DrmCertificate()
service_service_drm_certificate.ParseFromString(service_certificate.drm_certificate)
service_certificate = service_service_drm_certificate
if not isinstance(service_certificate, DrmCertificate):
raise ValueError(f"Service Certificate is in an unexpected type {service_certificate!r}")
raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}")
enc_client_id = EncryptedClientIdentification()
enc_client_id.provider_id = service_certificate.provider_id

View File

@@ -1,13 +1,14 @@
from __future__ import annotations
import base64
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Optional, Union
from construct import BitStruct, Bytes, Const
from construct import BitStruct, Bytes, Const, ConstructError, Container
from construct import Enum as CEnum
from construct import Flag, Int8ub, Int16ub
from construct import Int8ub, Int16ub
from construct import Optional as COptional
from construct import Padded, Padding, Struct, this
from Crypto.PublicKey import RSA
@@ -21,12 +22,16 @@ class _Types(Enum):
ANDROID = 2
class Device:
# needed so bin_format can enumerate the types
Types = _Types
class _Structures:
magic = Const(b"WVD")
bin_format = Struct(
"signature" / Const(b"WVD"),
header = Struct(
"signature" / magic,
"version" / Int8ub
)
v2 = Struct(
"signature" / magic,
"version" / Const(Int8ub, 2),
"type_" / CEnum(
Int8ub,
@@ -34,8 +39,8 @@ class Device:
),
"security_level" / Int8ub,
"flags" / Padded(1, COptional(BitStruct(
Padding(7),
"send_key_control_nonce" / Flag # deprecated, do not use
# no per-device flags yet
Padding(8)
))),
"private_key_len" / Int16ub,
"private_key" / Bytes(this.private_key_len),
@@ -43,6 +48,32 @@ class Device:
"client_id" / Bytes(this.client_id_len)
)
v1 = Struct(
"signature" / magic,
"version" / Const(Int8ub, 1),
"type_" / CEnum(
Int8ub,
**{t.name: t.value for t in _Types}
),
"security_level" / Int8ub,
"flags" / Padded(1, COptional(BitStruct(
# no per-device flags yet
Padding(8)
))),
"private_key_len" / Int16ub,
"private_key" / Bytes(this.private_key_len),
"client_id_len" / Int16ub,
"client_id" / Bytes(this.client_id_len),
"vmp_len" / Int16ub,
"vmp" / Bytes(this.vmp_len)
)
class Device:
Types = _Types
Structures = _Structures
supported_structure = Structures.v2
# == Bin Format Revisions == #
# Version 2: Removed vmp and vmp_len as it should already be within the Client ID
# Version 1: Removed system_id as it can be retrieved from the Client ID's DRM Certificate
@@ -109,20 +140,20 @@ class Device:
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
return cls(**cls.bin_format.parse(data))
return cls(**cls.supported_structure.parse(data))
@classmethod
def load(cls, path: Union[Path, str]) -> Device:
if not isinstance(path, (Path, str)):
raise ValueError(f"Expecting Path object or path string, got {path!r}")
with Path(path).open(mode="rb") as f:
return cls(**cls.bin_format.parse_stream(f))
return cls(**cls.supported_structure.parse_stream(f))
def dumps(self) -> bytes:
private_key = self.private_key.export_key("DER") if self.private_key else None
return self.bin_format.build(dict(
return self.supported_structure.build(dict(
version=2,
type=self.type.value,
type_=self.type.value,
security_level=self.security_level,
flags=self.flags,
private_key_len=len(private_key) if private_key else 0,
@@ -138,5 +169,54 @@ class Device:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(self.dumps())
@classmethod
def migrate(cls, data: Union[bytes, str]) -> Device:
if isinstance(data, str):
data = base64.b64decode(data)
if not isinstance(data, bytes):
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
header = _Structures.header.parse(data)
if header.version == 2:
raise ValueError("Device Data is already migrated to the latest version.")
if header.version == 0 or header.version > 2:
# we have never used version 0, likely data that just so happened to use the WVD magic
raise ValueError("Device Data does not seem to be a WVD file (v0).")
if header.version == 1: # v1 to v2
data = _Structures.v1.parse(data)
data.version = 2 # update version to 2 to allow loading
data.flags = Container() # blank flags that may have been used in v1
vmp = FileHashes()
if data.vmp:
try:
vmp.ParseFromString(data.vmp)
except DecodeError as e:
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
data.vmp = vmp
client_id = ClientIdentification()
try:
client_id.ParseFromString(data.client_id)
except DecodeError as e:
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
new_vmp_data = data.vmp.SerializeToString()
if client_id.vmp_data and client_id.vmp_data != new_vmp_data:
logging.getLogger("migrate").warning("Client ID already has Verified Media Path data")
client_id.vmp_data = new_vmp_data
data.client_id = client_id.SerializeToString()
try:
data = _Structures.v2.build(data)
except ConstructError as e:
raise ValueError(f"Migration failed, {e}")
try:
return cls.loads(data)
except ConstructError as e:
raise ValueError(f"Device Data seems to be corrupt or invalid, or migration failed, {e}")
__ALL__ = (Device,)

View File

@@ -1,14 +1,18 @@
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from zlib import crc32
import click
import requests
from construct import ConstructError
from unidecode import unidecode, UnidecodeError
from pywidevine import __version__
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.license_protocol_pb2 import LicenseType
from pywidevine.license_protocol_pb2 import LicenseType, FileHashes
@click.group(invoke_without_command=True)
@@ -113,8 +117,10 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
@main.command()
@click.argument("device", type=Path)
@click.option("-p", "--privacy", is_flag=True, default=False,
help="Use Privacy Mode, off by default.")
@click.pass_context
def test(ctx: click.Context, device: Path):
def test(ctx: click.Context, device: Path, privacy: bool):
"""
Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example.
https://bitmovin.com/demos/drm
@@ -148,5 +154,108 @@ def test(ctx: click.Context, device: Path):
pssh=pssh,
server=license_server,
type_=LicenseType.Name(license_type),
raw=raw
raw=raw,
privacy=privacy
)
@main.command()
@click.option("-t", "--type", "type_", type=click.Choice([x.name for x in Device.Types], case_sensitive=False),
required=True, help="Device Type")
@click.option("-l", "--level", type=click.IntRange(1, 3), required=True, help="Device Security Level")
@click.option("-k", "--key", type=Path, required=True, help="Device RSA Private Key in PEM or DER format")
@click.option("-c", "--client_id", type=Path, required=True, help="Widevine ClientIdentification Blob file")
@click.option("-v", "--vmp", type=Path, required=True, help="Widevine FileHashes Blob file")
@click.option("-o", "--output", type=Path, default=None, help="Output Directory")
@click.pass_context
def create_device(
ctx: click.Context,
type_: str,
level: int,
key: Path,
client_id: Path,
vmp: Optional[Path] = None,
output: Optional[Path] = None
) -> None:
"""
Create a Widevine Device (.wvd) file from an RSA Private Key (PEM or DER) and Client ID Blob.
Optionally also a VMP (Verified Media Path) Blob, which will be stored in the Client ID.
"""
if not key.is_file():
raise click.UsageError("key: Not a path to a file, or it doesn't exist.", ctx)
if not client_id.is_file():
raise click.UsageError("client_id: Not a path to a file, or it doesn't exist.", ctx)
if vmp and not vmp.is_file():
raise click.UsageError("vmp: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("create-device")
device = Device(
type_=Device.Types[type_.upper()],
security_level=level,
flags=None,
private_key=key.read_bytes(),
client_id=client_id.read_bytes()
)
if vmp:
new_vmp_data = vmp.read_bytes()
if device.client_id.vmp_data and device.client_id.vmp_data != new_vmp_data:
log.warning("Client ID already has Verified Media Path data")
device.client_id.vmp_data = new_vmp_data
client_info = {}
for entry in device.client_id.client_info:
client_info[entry.name] = entry.value
wvd_bin = device.dumps()
name = f"{client_info['company_name']} {client_info['model_name']}"
if client_info.get("widevine_cdm_version"):
name += f" {client_info['widevine_cdm_version']}"
name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
try:
name = unidecode(name.strip().lower().replace(" ", "_"))
except UnidecodeError as e:
raise click.ClickException(f"Failed to sanitize name, {e}")
out_path = (output or Path.cwd()) / f"{name}_{device.system_id}_l{device.security_level}.wvd"
out_path.write_bytes(wvd_bin)
log.info(f"Created Widevine Device (.wvd) file, {out_path.name}")
log.info(f" + Type: {device.type.name}")
log.info(f" + System ID: {device.system_id}")
log.info(f" + Security Level: {device.security_level}")
log.info(f" + Flags: {device.flags}")
log.info(f" + Private Key: {bool(device.private_key)} ({device.private_key.size_in_bits()} bit)")
log.info(f" + Client ID: {bool(device.client_id)} ({len(device.client_id.SerializeToString())} bytes)")
if device.client_id.vmp_data:
file_hashes_ = FileHashes()
file_hashes_.ParseFromString(device.client_id.vmp_data)
log.info(f" + VMP: True ({len(file_hashes_.signatures)} signatures)")
else:
log.info(f" + VMP: False")
log.info(f" + Saved to: {out_path.absolute()}")
@main.command()
@click.argument("device", type=Path)
@click.pass_context
def migrate(ctx: click.Context, device: Path) -> None:
"""Upgrade from earlier versions of the Widevine Device (.wvd) format."""
if not device.is_file():
raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx)
log = logging.getLogger("migrate")
try:
new_device = Device.migrate(device.read_bytes())
except (ConstructError, ValueError) as e:
raise click.UsageError(str(e), ctx)
# save
log.debug(new_device)
new_device.dump(device)
log.info("Successfully migrated the Widevine Device (.wvd) file!")