Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d72607b080 | ||
|
|
60bb779c59 | ||
|
|
e1532b1451 | ||
|
|
e1951d20d0 | ||
|
|
35abd2962f | ||
|
|
b262e115d3 | ||
|
|
95982725c3 | ||
|
|
70e79825b3 | ||
|
|
f2174dfa72 | ||
|
|
fe21bfe88c | ||
|
|
93f70f73c2 | ||
|
|
1442c945cc | ||
|
|
a729648a34 | ||
|
|
3d6ddb8dcd | ||
|
|
b41f09bee4 | ||
|
|
db80776ac0 | ||
|
|
02ca1b00c9 | ||
|
|
7b06a3c053 | ||
|
|
14126c67b1 | ||
|
|
5e93d6321d | ||
|
|
1f389dbab9 | ||
|
|
ac4c8affb0 |
42
CHANGELOG.md
42
CHANGELOG.md
@@ -5,7 +5,41 @@ All notable changes to this project will be documented in this file.
|
|||||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||||
|
|
||||||
## [1.0.1] - 2021-07-21
|
## [1.1.1] - 2022-07-22
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- The --vmp argument of the create-device command is now optional.
|
||||||
|
|
||||||
|
## [1.1.0] - 2022-07-21
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- Added support for setting a Service Certificate in SignedDrmCertificate form as well as raw DrmCertificate form.
|
||||||
|
However, It's unlikely for the service to provide the certificate in raw DrmCertificate form without a signature.
|
||||||
|
- Added a CLI command `create-device` to create Widevine Device (`.wvd`) files from RSA PEM/DER Private Keys and
|
||||||
|
Client ID blobs. You can also provide VMP (FileHashes) data which will be merged into the Client ID blob.
|
||||||
|
- Added a CLI command `migrate` that uses `Device.migrate()` and `dump()` to migrate older v1 Widevine Device files
|
||||||
|
to v2.
|
||||||
|
- Added the v1 Structure of Widevine Devices for migration use.
|
||||||
|
- Added `Device.migrate()` class method that effectively loads older format WVD data. You can then use `dumps()` to
|
||||||
|
get back the WVD data in the latest supported format.
|
||||||
|
- Added ability to use Privacy mode on the test command.
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Set Service Certificates are now stored as the raw underlying DrmCertificate as the signature data is unused by
|
||||||
|
the CDM.
|
||||||
|
- Moved all Widevine Device structures under a Structures class.
|
||||||
|
- I removed the `send_key_control_nonce` flag from all Structures even though it was technically used.
|
||||||
|
This is because the flag was never used as of this project, and I do not want to take up the flag slot.
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- Devices `dump()` function now uses the correct `type_` parameter when building the struct.
|
||||||
|
- Fixed release date year of v1.0.0 and v1.0.1 in the changelog.
|
||||||
|
|
||||||
|
## [1.0.1] - 2022-07-21
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
@@ -24,14 +58,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- Cdm's `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
|
- CDMs `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
|
||||||
- Context Data will now always match to their corresponding License Responses. This fixes an issue where creating
|
- Context Data will now always match to their corresponding License Responses. This fixes an issue where creating
|
||||||
a second challenge would overwrite the context data of the first challenge. Parsing the first challenge after
|
a second challenge would overwrite the context data of the first challenge. Parsing the first challenge after
|
||||||
would result in either a key decrypt error, or garbage key data.
|
would result in either a key decrypt error, or garbage key data.
|
||||||
|
|
||||||
## [1.0.0] - 2021-07-20
|
## [1.0.0] - 2022-07-20
|
||||||
|
|
||||||
Initial Release.
|
Initial Release.
|
||||||
|
|
||||||
|
[1.1.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.1
|
||||||
|
[1.1.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.0
|
||||||
[1.0.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.1
|
[1.0.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.1
|
||||||
[1.0.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.0
|
[1.0.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.0
|
||||||
|
|||||||
14
poetry.lock
generated
14
poetry.lock
generated
@@ -137,6 +137,14 @@ category = "main"
|
|||||||
optional = false
|
optional = false
|
||||||
python-versions = ">=3.7"
|
python-versions = ">=3.7"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "unidecode"
|
||||||
|
version = "1.3.4"
|
||||||
|
description = "ASCII transliterations of Unicode text"
|
||||||
|
category = "main"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.5"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "urllib3"
|
name = "urllib3"
|
||||||
version = "1.26.10"
|
version = "1.26.10"
|
||||||
@@ -165,7 +173,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "1.1"
|
lock-version = "1.1"
|
||||||
python-versions = ">=3.7,<3.11"
|
python-versions = ">=3.7,<3.11"
|
||||||
content-hash = "9c6a76629e0f0a4e98b6c47707899519f930debd70312d2e909eb42f94cd212f"
|
content-hash = "0720732d3b990a11e3b35f4604103364ae5654d1ac6e31ead17c03e566a016d0"
|
||||||
|
|
||||||
[metadata.files]
|
[metadata.files]
|
||||||
certifi = []
|
certifi = []
|
||||||
@@ -195,5 +203,9 @@ pycryptodome = []
|
|||||||
pymp4 = []
|
pymp4 = []
|
||||||
requests = []
|
requests = []
|
||||||
typing-extensions = []
|
typing-extensions = []
|
||||||
|
unidecode = [
|
||||||
|
{file = "Unidecode-1.3.4-py3-none-any.whl", hash = "sha256:afa04efcdd818a93237574791be9b2817d7077c25a068b00f8cff7baa4e59257"},
|
||||||
|
{file = "Unidecode-1.3.4.tar.gz", hash = "sha256:8e4352fb93d5a735c788110d2e7ac8e8031eb06ccbfe8d324ab71735015f9342"},
|
||||||
|
]
|
||||||
urllib3 = []
|
urllib3 = []
|
||||||
zipp = []
|
zipp = []
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
|||||||
|
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "pywidevine"
|
name = "pywidevine"
|
||||||
version = "1.0.1"
|
version = "1.1.1"
|
||||||
description = "Widevine CDM (Content Decryption Module) implementation in Python."
|
description = "Widevine CDM (Content Decryption Module) implementation in Python."
|
||||||
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
|
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
|
||||||
license = "GPL-3.0-only"
|
license = "GPL-3.0-only"
|
||||||
@@ -33,6 +33,7 @@ pycryptodome = "^3.15.0"
|
|||||||
click = "^8.1.3"
|
click = "^8.1.3"
|
||||||
requests = "^2.28.1"
|
requests = "^2.28.1"
|
||||||
lxml = "^4.9.1"
|
lxml = "^4.9.1"
|
||||||
|
Unidecode = "^1.3.4"
|
||||||
|
|
||||||
[tool.poetry.scripts]
|
[tool.poetry.scripts]
|
||||||
pywidevine = "pywidevine.main:main"
|
pywidevine = "pywidevine.main:main"
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
__version__ = "1.0.1"
|
__version__ = "1.1.1"
|
||||||
|
|||||||
@@ -84,10 +84,10 @@ class Cdm:
|
|||||||
self.init_data = PSSH.get_as_box(pssh).init_data
|
self.init_data = PSSH.get_as_box(pssh).init_data
|
||||||
|
|
||||||
self.session_id = get_random_bytes(16)
|
self.session_id = get_random_bytes(16)
|
||||||
self.service_certificate: Optional[SignedMessage] = None
|
self.service_certificate: Optional[DrmCertificate] = None
|
||||||
self.context: dict[bytes, tuple[bytes, bytes]] = {}
|
self.context: dict[bytes, tuple[bytes, bytes]] = {}
|
||||||
|
|
||||||
def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage:
|
def set_service_certificate(self, certificate: Union[bytes, str]) -> DrmCertificate:
|
||||||
"""
|
"""
|
||||||
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
|
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
|
||||||
|
|
||||||
@@ -96,7 +96,7 @@ class Cdm:
|
|||||||
Some services have their own, but most use the common privacy cert,
|
Some services have their own, but most use the common privacy cert,
|
||||||
(common_privacy_cert).
|
(common_privacy_cert).
|
||||||
|
|
||||||
Returns the parsed Signed Message if successful, otherwise raises a DecodeError.
|
Returns the parsed Drm Certificate if successful, otherwise raises a DecodeError.
|
||||||
|
|
||||||
The Service Certificate is used to encrypt Client IDs in Licenses. This is also
|
The Service Certificate is used to encrypt Client IDs in Licenses. This is also
|
||||||
known as Privacy Mode and may be required for some services or for some devices.
|
known as Privacy Mode and may be required for some services or for some devices.
|
||||||
@@ -106,13 +106,48 @@ class Cdm:
|
|||||||
certificate = base64.b64decode(certificate) # assuming base64
|
certificate = base64.b64decode(certificate) # assuming base64
|
||||||
|
|
||||||
signed_message = SignedMessage()
|
signed_message = SignedMessage()
|
||||||
try:
|
signed_drm_certificate = SignedDrmCertificate()
|
||||||
signed_message.ParseFromString(certificate)
|
drm_certificate = DrmCertificate()
|
||||||
except DecodeError as e:
|
|
||||||
raise DecodeError(f"Could not parse certificate as a Signed Message: {e}")
|
|
||||||
|
|
||||||
self.service_certificate = signed_message
|
# Note: A secure CDM would likely reject any Service Certificate that is
|
||||||
return signed_message
|
# not either a SignedMessage or a SignedDrmCertificate. This is because
|
||||||
|
# the DrmCertificate itself is not signed. This CDM does not verify the
|
||||||
|
# signatures as I'm not sure what HMAC key is used. At this stage of the
|
||||||
|
# CDM flow, we wouldn't have any mac_keys, and those might not work for
|
||||||
|
# verifying service certificate signatures (likely not).
|
||||||
|
|
||||||
|
# All these 3 schemas can sort of parse each other in a minimal buggy way,
|
||||||
|
# so we have to parse down the full chain instead of relaying each step
|
||||||
|
|
||||||
|
try: # SignedMessage input
|
||||||
|
signed_message.ParseFromString(certificate)
|
||||||
|
signed_drm_certificate.ParseFromString(signed_message.msg)
|
||||||
|
if not signed_drm_certificate.drm_certificate:
|
||||||
|
raise DecodeError()
|
||||||
|
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
|
||||||
|
self.service_certificate = drm_certificate
|
||||||
|
return self.service_certificate
|
||||||
|
except DecodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try: # SignedDrmCertificate input
|
||||||
|
signed_drm_certificate.ParseFromString(certificate)
|
||||||
|
if not signed_drm_certificate.drm_certificate:
|
||||||
|
raise DecodeError()
|
||||||
|
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
|
||||||
|
self.service_certificate = drm_certificate
|
||||||
|
return self.service_certificate
|
||||||
|
except DecodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try: # DrmCertificate input
|
||||||
|
drm_certificate.ParseFromString(certificate)
|
||||||
|
self.service_certificate = drm_certificate
|
||||||
|
return self.service_certificate
|
||||||
|
except DecodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
raise DecodeError("Could not parse certificate as a Service Certificate")
|
||||||
|
|
||||||
def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes:
|
def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes:
|
||||||
"""
|
"""
|
||||||
@@ -253,7 +288,7 @@ class Cdm:
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def encrypt_client_id(
|
def encrypt_client_id(
|
||||||
client_id: ClientIdentification,
|
client_id: ClientIdentification,
|
||||||
service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate],
|
service_certificate: DrmCertificate,
|
||||||
key: bytes = None,
|
key: bytes = None,
|
||||||
iv: bytes = None
|
iv: bytes = None
|
||||||
) -> EncryptedClientIdentification:
|
) -> EncryptedClientIdentification:
|
||||||
@@ -261,18 +296,8 @@ class Cdm:
|
|||||||
privacy_key = key or get_random_bytes(16)
|
privacy_key = key or get_random_bytes(16)
|
||||||
privacy_iv = iv or get_random_bytes(16)
|
privacy_iv = iv or get_random_bytes(16)
|
||||||
|
|
||||||
if isinstance(service_certificate, SignedMessage):
|
|
||||||
signed_service_certificate = SignedDrmCertificate()
|
|
||||||
signed_service_certificate.ParseFromString(service_certificate.msg)
|
|
||||||
service_certificate = signed_service_certificate
|
|
||||||
|
|
||||||
if isinstance(service_certificate, SignedDrmCertificate):
|
|
||||||
service_service_drm_certificate = DrmCertificate()
|
|
||||||
service_service_drm_certificate.ParseFromString(service_certificate.drm_certificate)
|
|
||||||
service_certificate = service_service_drm_certificate
|
|
||||||
|
|
||||||
if not isinstance(service_certificate, DrmCertificate):
|
if not isinstance(service_certificate, DrmCertificate):
|
||||||
raise ValueError(f"Service Certificate is in an unexpected type {service_certificate!r}")
|
raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}")
|
||||||
|
|
||||||
enc_client_id = EncryptedClientIdentification()
|
enc_client_id = EncryptedClientIdentification()
|
||||||
enc_client_id.provider_id = service_certificate.provider_id
|
enc_client_id.provider_id = service_certificate.provider_id
|
||||||
|
|||||||
@@ -1,13 +1,14 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import base64
|
import base64
|
||||||
|
import logging
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Optional, Union
|
from typing import Any, Optional, Union
|
||||||
|
|
||||||
from construct import BitStruct, Bytes, Const
|
from construct import BitStruct, Bytes, Const, ConstructError, Container
|
||||||
from construct import Enum as CEnum
|
from construct import Enum as CEnum
|
||||||
from construct import Flag, Int8ub, Int16ub
|
from construct import Int8ub, Int16ub
|
||||||
from construct import Optional as COptional
|
from construct import Optional as COptional
|
||||||
from construct import Padded, Padding, Struct, this
|
from construct import Padded, Padding, Struct, this
|
||||||
from Crypto.PublicKey import RSA
|
from Crypto.PublicKey import RSA
|
||||||
@@ -21,12 +22,16 @@ class _Types(Enum):
|
|||||||
ANDROID = 2
|
ANDROID = 2
|
||||||
|
|
||||||
|
|
||||||
class Device:
|
class _Structures:
|
||||||
# needed so bin_format can enumerate the types
|
magic = Const(b"WVD")
|
||||||
Types = _Types
|
|
||||||
|
|
||||||
bin_format = Struct(
|
header = Struct(
|
||||||
"signature" / Const(b"WVD"),
|
"signature" / magic,
|
||||||
|
"version" / Int8ub
|
||||||
|
)
|
||||||
|
|
||||||
|
v2 = Struct(
|
||||||
|
"signature" / magic,
|
||||||
"version" / Const(Int8ub, 2),
|
"version" / Const(Int8ub, 2),
|
||||||
"type_" / CEnum(
|
"type_" / CEnum(
|
||||||
Int8ub,
|
Int8ub,
|
||||||
@@ -34,8 +39,8 @@ class Device:
|
|||||||
),
|
),
|
||||||
"security_level" / Int8ub,
|
"security_level" / Int8ub,
|
||||||
"flags" / Padded(1, COptional(BitStruct(
|
"flags" / Padded(1, COptional(BitStruct(
|
||||||
Padding(7),
|
# no per-device flags yet
|
||||||
"send_key_control_nonce" / Flag # deprecated, do not use
|
Padding(8)
|
||||||
))),
|
))),
|
||||||
"private_key_len" / Int16ub,
|
"private_key_len" / Int16ub,
|
||||||
"private_key" / Bytes(this.private_key_len),
|
"private_key" / Bytes(this.private_key_len),
|
||||||
@@ -43,6 +48,32 @@ class Device:
|
|||||||
"client_id" / Bytes(this.client_id_len)
|
"client_id" / Bytes(this.client_id_len)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
v1 = Struct(
|
||||||
|
"signature" / magic,
|
||||||
|
"version" / Const(Int8ub, 1),
|
||||||
|
"type_" / CEnum(
|
||||||
|
Int8ub,
|
||||||
|
**{t.name: t.value for t in _Types}
|
||||||
|
),
|
||||||
|
"security_level" / Int8ub,
|
||||||
|
"flags" / Padded(1, COptional(BitStruct(
|
||||||
|
# no per-device flags yet
|
||||||
|
Padding(8)
|
||||||
|
))),
|
||||||
|
"private_key_len" / Int16ub,
|
||||||
|
"private_key" / Bytes(this.private_key_len),
|
||||||
|
"client_id_len" / Int16ub,
|
||||||
|
"client_id" / Bytes(this.client_id_len),
|
||||||
|
"vmp_len" / Int16ub,
|
||||||
|
"vmp" / Bytes(this.vmp_len)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Device:
|
||||||
|
Types = _Types
|
||||||
|
Structures = _Structures
|
||||||
|
supported_structure = Structures.v2
|
||||||
|
|
||||||
# == Bin Format Revisions == #
|
# == Bin Format Revisions == #
|
||||||
# Version 2: Removed vmp and vmp_len as it should already be within the Client ID
|
# Version 2: Removed vmp and vmp_len as it should already be within the Client ID
|
||||||
# Version 1: Removed system_id as it can be retrieved from the Client ID's DRM Certificate
|
# Version 1: Removed system_id as it can be retrieved from the Client ID's DRM Certificate
|
||||||
@@ -109,20 +140,20 @@ class Device:
|
|||||||
data = base64.b64decode(data)
|
data = base64.b64decode(data)
|
||||||
if not isinstance(data, bytes):
|
if not isinstance(data, bytes):
|
||||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||||
return cls(**cls.bin_format.parse(data))
|
return cls(**cls.supported_structure.parse(data))
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls, path: Union[Path, str]) -> Device:
|
def load(cls, path: Union[Path, str]) -> Device:
|
||||||
if not isinstance(path, (Path, str)):
|
if not isinstance(path, (Path, str)):
|
||||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||||
with Path(path).open(mode="rb") as f:
|
with Path(path).open(mode="rb") as f:
|
||||||
return cls(**cls.bin_format.parse_stream(f))
|
return cls(**cls.supported_structure.parse_stream(f))
|
||||||
|
|
||||||
def dumps(self) -> bytes:
|
def dumps(self) -> bytes:
|
||||||
private_key = self.private_key.export_key("DER") if self.private_key else None
|
private_key = self.private_key.export_key("DER") if self.private_key else None
|
||||||
return self.bin_format.build(dict(
|
return self.supported_structure.build(dict(
|
||||||
version=2,
|
version=2,
|
||||||
type=self.type.value,
|
type_=self.type.value,
|
||||||
security_level=self.security_level,
|
security_level=self.security_level,
|
||||||
flags=self.flags,
|
flags=self.flags,
|
||||||
private_key_len=len(private_key) if private_key else 0,
|
private_key_len=len(private_key) if private_key else 0,
|
||||||
@@ -138,5 +169,54 @@ class Device:
|
|||||||
path.parent.mkdir(parents=True, exist_ok=True)
|
path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
path.write_bytes(self.dumps())
|
path.write_bytes(self.dumps())
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def migrate(cls, data: Union[bytes, str]) -> Device:
|
||||||
|
if isinstance(data, str):
|
||||||
|
data = base64.b64decode(data)
|
||||||
|
if not isinstance(data, bytes):
|
||||||
|
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||||
|
|
||||||
|
header = _Structures.header.parse(data)
|
||||||
|
if header.version == 2:
|
||||||
|
raise ValueError("Device Data is already migrated to the latest version.")
|
||||||
|
if header.version == 0 or header.version > 2:
|
||||||
|
# we have never used version 0, likely data that just so happened to use the WVD magic
|
||||||
|
raise ValueError("Device Data does not seem to be a WVD file (v0).")
|
||||||
|
|
||||||
|
if header.version == 1: # v1 to v2
|
||||||
|
data = _Structures.v1.parse(data)
|
||||||
|
data.version = 2 # update version to 2 to allow loading
|
||||||
|
data.flags = Container() # blank flags that may have been used in v1
|
||||||
|
|
||||||
|
vmp = FileHashes()
|
||||||
|
if data.vmp:
|
||||||
|
try:
|
||||||
|
vmp.ParseFromString(data.vmp)
|
||||||
|
except DecodeError as e:
|
||||||
|
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||||
|
data.vmp = vmp
|
||||||
|
|
||||||
|
client_id = ClientIdentification()
|
||||||
|
try:
|
||||||
|
client_id.ParseFromString(data.client_id)
|
||||||
|
except DecodeError as e:
|
||||||
|
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||||
|
|
||||||
|
new_vmp_data = data.vmp.SerializeToString()
|
||||||
|
if client_id.vmp_data and client_id.vmp_data != new_vmp_data:
|
||||||
|
logging.getLogger("migrate").warning("Client ID already has Verified Media Path data")
|
||||||
|
client_id.vmp_data = new_vmp_data
|
||||||
|
data.client_id = client_id.SerializeToString()
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = _Structures.v2.build(data)
|
||||||
|
except ConstructError as e:
|
||||||
|
raise ValueError(f"Migration failed, {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
return cls.loads(data)
|
||||||
|
except ConstructError as e:
|
||||||
|
raise ValueError(f"Device Data seems to be corrupt or invalid, or migration failed, {e}")
|
||||||
|
|
||||||
|
|
||||||
__ALL__ = (Device,)
|
__ALL__ = (Device,)
|
||||||
|
|||||||
@@ -1,14 +1,18 @@
|
|||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
from zlib import crc32
|
||||||
|
|
||||||
import click
|
import click
|
||||||
import requests
|
import requests
|
||||||
|
from construct import ConstructError
|
||||||
|
from unidecode import unidecode, UnidecodeError
|
||||||
|
|
||||||
from pywidevine import __version__
|
from pywidevine import __version__
|
||||||
from pywidevine.cdm import Cdm
|
from pywidevine.cdm import Cdm
|
||||||
from pywidevine.device import Device
|
from pywidevine.device import Device
|
||||||
from pywidevine.license_protocol_pb2 import LicenseType
|
from pywidevine.license_protocol_pb2 import LicenseType, FileHashes
|
||||||
|
|
||||||
|
|
||||||
@click.group(invoke_without_command=True)
|
@click.group(invoke_without_command=True)
|
||||||
@@ -113,8 +117,10 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
|
|||||||
|
|
||||||
@main.command()
|
@main.command()
|
||||||
@click.argument("device", type=Path)
|
@click.argument("device", type=Path)
|
||||||
|
@click.option("-p", "--privacy", is_flag=True, default=False,
|
||||||
|
help="Use Privacy Mode, off by default.")
|
||||||
@click.pass_context
|
@click.pass_context
|
||||||
def test(ctx: click.Context, device: Path):
|
def test(ctx: click.Context, device: Path, privacy: bool):
|
||||||
"""
|
"""
|
||||||
Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example.
|
Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example.
|
||||||
https://bitmovin.com/demos/drm
|
https://bitmovin.com/demos/drm
|
||||||
@@ -148,5 +154,108 @@ def test(ctx: click.Context, device: Path):
|
|||||||
pssh=pssh,
|
pssh=pssh,
|
||||||
server=license_server,
|
server=license_server,
|
||||||
type_=LicenseType.Name(license_type),
|
type_=LicenseType.Name(license_type),
|
||||||
raw=raw
|
raw=raw,
|
||||||
|
privacy=privacy
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.option("-t", "--type", "type_", type=click.Choice([x.name for x in Device.Types], case_sensitive=False),
|
||||||
|
required=True, help="Device Type")
|
||||||
|
@click.option("-l", "--level", type=click.IntRange(1, 3), required=True, help="Device Security Level")
|
||||||
|
@click.option("-k", "--key", type=Path, required=True, help="Device RSA Private Key in PEM or DER format")
|
||||||
|
@click.option("-c", "--client_id", type=Path, required=True, help="Widevine ClientIdentification Blob file")
|
||||||
|
@click.option("-v", "--vmp", type=Path, default=None, help="Widevine FileHashes Blob file")
|
||||||
|
@click.option("-o", "--output", type=Path, default=None, help="Output Directory")
|
||||||
|
@click.pass_context
|
||||||
|
def create_device(
|
||||||
|
ctx: click.Context,
|
||||||
|
type_: str,
|
||||||
|
level: int,
|
||||||
|
key: Path,
|
||||||
|
client_id: Path,
|
||||||
|
vmp: Optional[Path] = None,
|
||||||
|
output: Optional[Path] = None
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Create a Widevine Device (.wvd) file from an RSA Private Key (PEM or DER) and Client ID Blob.
|
||||||
|
Optionally also a VMP (Verified Media Path) Blob, which will be stored in the Client ID.
|
||||||
|
"""
|
||||||
|
if not key.is_file():
|
||||||
|
raise click.UsageError("key: Not a path to a file, or it doesn't exist.", ctx)
|
||||||
|
if not client_id.is_file():
|
||||||
|
raise click.UsageError("client_id: Not a path to a file, or it doesn't exist.", ctx)
|
||||||
|
if vmp and not vmp.is_file():
|
||||||
|
raise click.UsageError("vmp: Not a path to a file, or it doesn't exist.", ctx)
|
||||||
|
|
||||||
|
log = logging.getLogger("create-device")
|
||||||
|
|
||||||
|
device = Device(
|
||||||
|
type_=Device.Types[type_.upper()],
|
||||||
|
security_level=level,
|
||||||
|
flags=None,
|
||||||
|
private_key=key.read_bytes(),
|
||||||
|
client_id=client_id.read_bytes()
|
||||||
|
)
|
||||||
|
|
||||||
|
if vmp:
|
||||||
|
new_vmp_data = vmp.read_bytes()
|
||||||
|
if device.client_id.vmp_data and device.client_id.vmp_data != new_vmp_data:
|
||||||
|
log.warning("Client ID already has Verified Media Path data")
|
||||||
|
device.client_id.vmp_data = new_vmp_data
|
||||||
|
|
||||||
|
client_info = {}
|
||||||
|
for entry in device.client_id.client_info:
|
||||||
|
client_info[entry.name] = entry.value
|
||||||
|
|
||||||
|
wvd_bin = device.dumps()
|
||||||
|
|
||||||
|
name = f"{client_info['company_name']} {client_info['model_name']}"
|
||||||
|
if client_info.get("widevine_cdm_version"):
|
||||||
|
name += f" {client_info['widevine_cdm_version']}"
|
||||||
|
name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
|
||||||
|
|
||||||
|
try:
|
||||||
|
name = unidecode(name.strip().lower().replace(" ", "_"))
|
||||||
|
except UnidecodeError as e:
|
||||||
|
raise click.ClickException(f"Failed to sanitize name, {e}")
|
||||||
|
|
||||||
|
out_path = (output or Path.cwd()) / f"{name}_{device.system_id}_l{device.security_level}.wvd"
|
||||||
|
out_path.write_bytes(wvd_bin)
|
||||||
|
|
||||||
|
log.info(f"Created Widevine Device (.wvd) file, {out_path.name}")
|
||||||
|
log.info(f" + Type: {device.type.name}")
|
||||||
|
log.info(f" + System ID: {device.system_id}")
|
||||||
|
log.info(f" + Security Level: {device.security_level}")
|
||||||
|
log.info(f" + Flags: {device.flags}")
|
||||||
|
log.info(f" + Private Key: {bool(device.private_key)} ({device.private_key.size_in_bits()} bit)")
|
||||||
|
log.info(f" + Client ID: {bool(device.client_id)} ({len(device.client_id.SerializeToString())} bytes)")
|
||||||
|
if device.client_id.vmp_data:
|
||||||
|
file_hashes_ = FileHashes()
|
||||||
|
file_hashes_.ParseFromString(device.client_id.vmp_data)
|
||||||
|
log.info(f" + VMP: True ({len(file_hashes_.signatures)} signatures)")
|
||||||
|
else:
|
||||||
|
log.info(f" + VMP: False")
|
||||||
|
log.info(f" + Saved to: {out_path.absolute()}")
|
||||||
|
|
||||||
|
|
||||||
|
@main.command()
|
||||||
|
@click.argument("device", type=Path)
|
||||||
|
@click.pass_context
|
||||||
|
def migrate(ctx: click.Context, device: Path) -> None:
|
||||||
|
"""Upgrade from earlier versions of the Widevine Device (.wvd) format."""
|
||||||
|
if not device.is_file():
|
||||||
|
raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx)
|
||||||
|
|
||||||
|
log = logging.getLogger("migrate")
|
||||||
|
|
||||||
|
try:
|
||||||
|
new_device = Device.migrate(device.read_bytes())
|
||||||
|
except (ConstructError, ValueError) as e:
|
||||||
|
raise click.UsageError(str(e), ctx)
|
||||||
|
|
||||||
|
# save
|
||||||
|
log.debug(new_device)
|
||||||
|
new_device.dump(device)
|
||||||
|
|
||||||
|
log.info("Successfully migrated the Widevine Device (.wvd) file!")
|
||||||
|
|||||||
Reference in New Issue
Block a user