Compare commits
22 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d72607b080 | ||
|
|
60bb779c59 | ||
|
|
e1532b1451 | ||
|
|
e1951d20d0 | ||
|
|
35abd2962f | ||
|
|
b262e115d3 | ||
|
|
95982725c3 | ||
|
|
70e79825b3 | ||
|
|
f2174dfa72 | ||
|
|
fe21bfe88c | ||
|
|
93f70f73c2 | ||
|
|
1442c945cc | ||
|
|
a729648a34 | ||
|
|
3d6ddb8dcd | ||
|
|
b41f09bee4 | ||
|
|
db80776ac0 | ||
|
|
02ca1b00c9 | ||
|
|
7b06a3c053 | ||
|
|
14126c67b1 | ||
|
|
5e93d6321d | ||
|
|
1f389dbab9 | ||
|
|
ac4c8affb0 |
42
CHANGELOG.md
42
CHANGELOG.md
@@ -5,7 +5,41 @@ All notable changes to this project will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.0.1] - 2021-07-21
|
||||
## [1.1.1] - 2022-07-22
|
||||
|
||||
### Fixed
|
||||
|
||||
- The --vmp argument of the create-device command is now optional.
|
||||
|
||||
## [1.1.0] - 2022-07-21
|
||||
|
||||
### Added
|
||||
|
||||
- Added support for setting a Service Certificate in SignedDrmCertificate form as well as raw DrmCertificate form.
|
||||
However, It's unlikely for the service to provide the certificate in raw DrmCertificate form without a signature.
|
||||
- Added a CLI command `create-device` to create Widevine Device (`.wvd`) files from RSA PEM/DER Private Keys and
|
||||
Client ID blobs. You can also provide VMP (FileHashes) data which will be merged into the Client ID blob.
|
||||
- Added a CLI command `migrate` that uses `Device.migrate()` and `dump()` to migrate older v1 Widevine Device files
|
||||
to v2.
|
||||
- Added the v1 Structure of Widevine Devices for migration use.
|
||||
- Added `Device.migrate()` class method that effectively loads older format WVD data. You can then use `dumps()` to
|
||||
get back the WVD data in the latest supported format.
|
||||
- Added ability to use Privacy mode on the test command.
|
||||
|
||||
### Changed
|
||||
|
||||
- Set Service Certificates are now stored as the raw underlying DrmCertificate as the signature data is unused by
|
||||
the CDM.
|
||||
- Moved all Widevine Device structures under a Structures class.
|
||||
- I removed the `send_key_control_nonce` flag from all Structures even though it was technically used.
|
||||
This is because the flag was never used as of this project, and I do not want to take up the flag slot.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Devices `dump()` function now uses the correct `type_` parameter when building the struct.
|
||||
- Fixed release date year of v1.0.0 and v1.0.1 in the changelog.
|
||||
|
||||
## [1.0.1] - 2022-07-21
|
||||
|
||||
### Added
|
||||
|
||||
@@ -24,14 +58,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
### Fixed
|
||||
|
||||
- Cdm's `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
|
||||
- CDMs `set_service_certificate()` now correctly raises a DecodeError on Decode Error instead of a ValueError.
|
||||
- Context Data will now always match to their corresponding License Responses. This fixes an issue where creating
|
||||
a second challenge would overwrite the context data of the first challenge. Parsing the first challenge after
|
||||
would result in either a key decrypt error, or garbage key data.
|
||||
|
||||
## [1.0.0] - 2021-07-20
|
||||
## [1.0.0] - 2022-07-20
|
||||
|
||||
Initial Release.
|
||||
|
||||
[1.1.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.1
|
||||
[1.1.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.0
|
||||
[1.0.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.1
|
||||
[1.0.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.0.0
|
||||
|
||||
14
poetry.lock
generated
14
poetry.lock
generated
@@ -137,6 +137,14 @@ category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
|
||||
[[package]]
|
||||
name = "unidecode"
|
||||
version = "1.3.4"
|
||||
description = "ASCII transliterations of Unicode text"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.5"
|
||||
|
||||
[[package]]
|
||||
name = "urllib3"
|
||||
version = "1.26.10"
|
||||
@@ -165,7 +173,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
|
||||
[metadata]
|
||||
lock-version = "1.1"
|
||||
python-versions = ">=3.7,<3.11"
|
||||
content-hash = "9c6a76629e0f0a4e98b6c47707899519f930debd70312d2e909eb42f94cd212f"
|
||||
content-hash = "0720732d3b990a11e3b35f4604103364ae5654d1ac6e31ead17c03e566a016d0"
|
||||
|
||||
[metadata.files]
|
||||
certifi = []
|
||||
@@ -195,5 +203,9 @@ pycryptodome = []
|
||||
pymp4 = []
|
||||
requests = []
|
||||
typing-extensions = []
|
||||
unidecode = [
|
||||
{file = "Unidecode-1.3.4-py3-none-any.whl", hash = "sha256:afa04efcdd818a93237574791be9b2817d7077c25a068b00f8cff7baa4e59257"},
|
||||
{file = "Unidecode-1.3.4.tar.gz", hash = "sha256:8e4352fb93d5a735c788110d2e7ac8e8031eb06ccbfe8d324ab71735015f9342"},
|
||||
]
|
||||
urllib3 = []
|
||||
zipp = []
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry]
|
||||
name = "pywidevine"
|
||||
version = "1.0.1"
|
||||
version = "1.1.1"
|
||||
description = "Widevine CDM (Content Decryption Module) implementation in Python."
|
||||
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
|
||||
license = "GPL-3.0-only"
|
||||
@@ -33,6 +33,7 @@ pycryptodome = "^3.15.0"
|
||||
click = "^8.1.3"
|
||||
requests = "^2.28.1"
|
||||
lxml = "^4.9.1"
|
||||
Unidecode = "^1.3.4"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
pywidevine = "pywidevine.main:main"
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.0.1"
|
||||
__version__ = "1.1.1"
|
||||
|
||||
@@ -84,10 +84,10 @@ class Cdm:
|
||||
self.init_data = PSSH.get_as_box(pssh).init_data
|
||||
|
||||
self.session_id = get_random_bytes(16)
|
||||
self.service_certificate: Optional[SignedMessage] = None
|
||||
self.service_certificate: Optional[DrmCertificate] = None
|
||||
self.context: dict[bytes, tuple[bytes, bytes]] = {}
|
||||
|
||||
def set_service_certificate(self, certificate: Union[bytes, str]) -> SignedMessage:
|
||||
def set_service_certificate(self, certificate: Union[bytes, str]) -> DrmCertificate:
|
||||
"""
|
||||
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
|
||||
|
||||
@@ -96,7 +96,7 @@ class Cdm:
|
||||
Some services have their own, but most use the common privacy cert,
|
||||
(common_privacy_cert).
|
||||
|
||||
Returns the parsed Signed Message if successful, otherwise raises a DecodeError.
|
||||
Returns the parsed Drm Certificate if successful, otherwise raises a DecodeError.
|
||||
|
||||
The Service Certificate is used to encrypt Client IDs in Licenses. This is also
|
||||
known as Privacy Mode and may be required for some services or for some devices.
|
||||
@@ -106,13 +106,48 @@ class Cdm:
|
||||
certificate = base64.b64decode(certificate) # assuming base64
|
||||
|
||||
signed_message = SignedMessage()
|
||||
try:
|
||||
signed_message.ParseFromString(certificate)
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Could not parse certificate as a Signed Message: {e}")
|
||||
signed_drm_certificate = SignedDrmCertificate()
|
||||
drm_certificate = DrmCertificate()
|
||||
|
||||
self.service_certificate = signed_message
|
||||
return signed_message
|
||||
# Note: A secure CDM would likely reject any Service Certificate that is
|
||||
# not either a SignedMessage or a SignedDrmCertificate. This is because
|
||||
# the DrmCertificate itself is not signed. This CDM does not verify the
|
||||
# signatures as I'm not sure what HMAC key is used. At this stage of the
|
||||
# CDM flow, we wouldn't have any mac_keys, and those might not work for
|
||||
# verifying service certificate signatures (likely not).
|
||||
|
||||
# All these 3 schemas can sort of parse each other in a minimal buggy way,
|
||||
# so we have to parse down the full chain instead of relaying each step
|
||||
|
||||
try: # SignedMessage input
|
||||
signed_message.ParseFromString(certificate)
|
||||
signed_drm_certificate.ParseFromString(signed_message.msg)
|
||||
if not signed_drm_certificate.drm_certificate:
|
||||
raise DecodeError()
|
||||
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
|
||||
self.service_certificate = drm_certificate
|
||||
return self.service_certificate
|
||||
except DecodeError:
|
||||
pass
|
||||
|
||||
try: # SignedDrmCertificate input
|
||||
signed_drm_certificate.ParseFromString(certificate)
|
||||
if not signed_drm_certificate.drm_certificate:
|
||||
raise DecodeError()
|
||||
drm_certificate.ParseFromString(signed_drm_certificate.drm_certificate)
|
||||
self.service_certificate = drm_certificate
|
||||
return self.service_certificate
|
||||
except DecodeError:
|
||||
pass
|
||||
|
||||
try: # DrmCertificate input
|
||||
drm_certificate.ParseFromString(certificate)
|
||||
self.service_certificate = drm_certificate
|
||||
return self.service_certificate
|
||||
except DecodeError:
|
||||
pass
|
||||
|
||||
raise DecodeError("Could not parse certificate as a Service Certificate")
|
||||
|
||||
def get_license_challenge(self, type_: LicenseType = LicenseType.STREAMING, privacy_mode: bool = True) -> bytes:
|
||||
"""
|
||||
@@ -253,7 +288,7 @@ class Cdm:
|
||||
@staticmethod
|
||||
def encrypt_client_id(
|
||||
client_id: ClientIdentification,
|
||||
service_certificate: Union[SignedMessage, SignedDrmCertificate, DrmCertificate],
|
||||
service_certificate: DrmCertificate,
|
||||
key: bytes = None,
|
||||
iv: bytes = None
|
||||
) -> EncryptedClientIdentification:
|
||||
@@ -261,18 +296,8 @@ class Cdm:
|
||||
privacy_key = key or get_random_bytes(16)
|
||||
privacy_iv = iv or get_random_bytes(16)
|
||||
|
||||
if isinstance(service_certificate, SignedMessage):
|
||||
signed_service_certificate = SignedDrmCertificate()
|
||||
signed_service_certificate.ParseFromString(service_certificate.msg)
|
||||
service_certificate = signed_service_certificate
|
||||
|
||||
if isinstance(service_certificate, SignedDrmCertificate):
|
||||
service_service_drm_certificate = DrmCertificate()
|
||||
service_service_drm_certificate.ParseFromString(service_certificate.drm_certificate)
|
||||
service_certificate = service_service_drm_certificate
|
||||
|
||||
if not isinstance(service_certificate, DrmCertificate):
|
||||
raise ValueError(f"Service Certificate is in an unexpected type {service_certificate!r}")
|
||||
raise ValueError(f"Expecting Service Certificate to be a DrmCertificate, not {service_certificate!r}")
|
||||
|
||||
enc_client_id = EncryptedClientIdentification()
|
||||
enc_client_id.provider_id = service_certificate.provider_id
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from construct import BitStruct, Bytes, Const
|
||||
from construct import BitStruct, Bytes, Const, ConstructError, Container
|
||||
from construct import Enum as CEnum
|
||||
from construct import Flag, Int8ub, Int16ub
|
||||
from construct import Int8ub, Int16ub
|
||||
from construct import Optional as COptional
|
||||
from construct import Padded, Padding, Struct, this
|
||||
from Crypto.PublicKey import RSA
|
||||
@@ -21,12 +22,16 @@ class _Types(Enum):
|
||||
ANDROID = 2
|
||||
|
||||
|
||||
class Device:
|
||||
# needed so bin_format can enumerate the types
|
||||
Types = _Types
|
||||
class _Structures:
|
||||
magic = Const(b"WVD")
|
||||
|
||||
bin_format = Struct(
|
||||
"signature" / Const(b"WVD"),
|
||||
header = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Int8ub
|
||||
)
|
||||
|
||||
v2 = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Const(Int8ub, 2),
|
||||
"type_" / CEnum(
|
||||
Int8ub,
|
||||
@@ -34,8 +39,8 @@ class Device:
|
||||
),
|
||||
"security_level" / Int8ub,
|
||||
"flags" / Padded(1, COptional(BitStruct(
|
||||
Padding(7),
|
||||
"send_key_control_nonce" / Flag # deprecated, do not use
|
||||
# no per-device flags yet
|
||||
Padding(8)
|
||||
))),
|
||||
"private_key_len" / Int16ub,
|
||||
"private_key" / Bytes(this.private_key_len),
|
||||
@@ -43,6 +48,32 @@ class Device:
|
||||
"client_id" / Bytes(this.client_id_len)
|
||||
)
|
||||
|
||||
v1 = Struct(
|
||||
"signature" / magic,
|
||||
"version" / Const(Int8ub, 1),
|
||||
"type_" / CEnum(
|
||||
Int8ub,
|
||||
**{t.name: t.value for t in _Types}
|
||||
),
|
||||
"security_level" / Int8ub,
|
||||
"flags" / Padded(1, COptional(BitStruct(
|
||||
# no per-device flags yet
|
||||
Padding(8)
|
||||
))),
|
||||
"private_key_len" / Int16ub,
|
||||
"private_key" / Bytes(this.private_key_len),
|
||||
"client_id_len" / Int16ub,
|
||||
"client_id" / Bytes(this.client_id_len),
|
||||
"vmp_len" / Int16ub,
|
||||
"vmp" / Bytes(this.vmp_len)
|
||||
)
|
||||
|
||||
|
||||
class Device:
|
||||
Types = _Types
|
||||
Structures = _Structures
|
||||
supported_structure = Structures.v2
|
||||
|
||||
# == Bin Format Revisions == #
|
||||
# Version 2: Removed vmp and vmp_len as it should already be within the Client ID
|
||||
# Version 1: Removed system_id as it can be retrieved from the Client ID's DRM Certificate
|
||||
@@ -109,20 +140,20 @@ class Device:
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
return cls(**cls.bin_format.parse(data))
|
||||
return cls(**cls.supported_structure.parse(data))
|
||||
|
||||
@classmethod
|
||||
def load(cls, path: Union[Path, str]) -> Device:
|
||||
if not isinstance(path, (Path, str)):
|
||||
raise ValueError(f"Expecting Path object or path string, got {path!r}")
|
||||
with Path(path).open(mode="rb") as f:
|
||||
return cls(**cls.bin_format.parse_stream(f))
|
||||
return cls(**cls.supported_structure.parse_stream(f))
|
||||
|
||||
def dumps(self) -> bytes:
|
||||
private_key = self.private_key.export_key("DER") if self.private_key else None
|
||||
return self.bin_format.build(dict(
|
||||
return self.supported_structure.build(dict(
|
||||
version=2,
|
||||
type=self.type.value,
|
||||
type_=self.type.value,
|
||||
security_level=self.security_level,
|
||||
flags=self.flags,
|
||||
private_key_len=len(private_key) if private_key else 0,
|
||||
@@ -138,5 +169,54 @@ class Device:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(self.dumps())
|
||||
|
||||
@classmethod
|
||||
def migrate(cls, data: Union[bytes, str]) -> Device:
|
||||
if isinstance(data, str):
|
||||
data = base64.b64decode(data)
|
||||
if not isinstance(data, bytes):
|
||||
raise ValueError(f"Expecting Bytes or Base64 input, got {data!r}")
|
||||
|
||||
header = _Structures.header.parse(data)
|
||||
if header.version == 2:
|
||||
raise ValueError("Device Data is already migrated to the latest version.")
|
||||
if header.version == 0 or header.version > 2:
|
||||
# we have never used version 0, likely data that just so happened to use the WVD magic
|
||||
raise ValueError("Device Data does not seem to be a WVD file (v0).")
|
||||
|
||||
if header.version == 1: # v1 to v2
|
||||
data = _Structures.v1.parse(data)
|
||||
data.version = 2 # update version to 2 to allow loading
|
||||
data.flags = Container() # blank flags that may have been used in v1
|
||||
|
||||
vmp = FileHashes()
|
||||
if data.vmp:
|
||||
try:
|
||||
vmp.ParseFromString(data.vmp)
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||
data.vmp = vmp
|
||||
|
||||
client_id = ClientIdentification()
|
||||
try:
|
||||
client_id.ParseFromString(data.client_id)
|
||||
except DecodeError as e:
|
||||
raise DecodeError(f"Failed to parse VMP data as FileHashes, {e}")
|
||||
|
||||
new_vmp_data = data.vmp.SerializeToString()
|
||||
if client_id.vmp_data and client_id.vmp_data != new_vmp_data:
|
||||
logging.getLogger("migrate").warning("Client ID already has Verified Media Path data")
|
||||
client_id.vmp_data = new_vmp_data
|
||||
data.client_id = client_id.SerializeToString()
|
||||
|
||||
try:
|
||||
data = _Structures.v2.build(data)
|
||||
except ConstructError as e:
|
||||
raise ValueError(f"Migration failed, {e}")
|
||||
|
||||
try:
|
||||
return cls.loads(data)
|
||||
except ConstructError as e:
|
||||
raise ValueError(f"Device Data seems to be corrupt or invalid, or migration failed, {e}")
|
||||
|
||||
|
||||
__ALL__ = (Device,)
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from zlib import crc32
|
||||
|
||||
import click
|
||||
import requests
|
||||
from construct import ConstructError
|
||||
from unidecode import unidecode, UnidecodeError
|
||||
|
||||
from pywidevine import __version__
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.license_protocol_pb2 import LicenseType
|
||||
from pywidevine.license_protocol_pb2 import LicenseType, FileHashes
|
||||
|
||||
|
||||
@click.group(invoke_without_command=True)
|
||||
@@ -113,8 +117,10 @@ def license_(device: Path, pssh: str, server: str, type_: str, raw: bool, privac
|
||||
|
||||
@main.command()
|
||||
@click.argument("device", type=Path)
|
||||
@click.option("-p", "--privacy", is_flag=True, default=False,
|
||||
help="Use Privacy Mode, off by default.")
|
||||
@click.pass_context
|
||||
def test(ctx: click.Context, device: Path):
|
||||
def test(ctx: click.Context, device: Path, privacy: bool):
|
||||
"""
|
||||
Test the CDM code by getting Content Keys for Bitmovin's Art of Motion example.
|
||||
https://bitmovin.com/demos/drm
|
||||
@@ -148,5 +154,108 @@ def test(ctx: click.Context, device: Path):
|
||||
pssh=pssh,
|
||||
server=license_server,
|
||||
type_=LicenseType.Name(license_type),
|
||||
raw=raw
|
||||
raw=raw,
|
||||
privacy=privacy
|
||||
)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.option("-t", "--type", "type_", type=click.Choice([x.name for x in Device.Types], case_sensitive=False),
|
||||
required=True, help="Device Type")
|
||||
@click.option("-l", "--level", type=click.IntRange(1, 3), required=True, help="Device Security Level")
|
||||
@click.option("-k", "--key", type=Path, required=True, help="Device RSA Private Key in PEM or DER format")
|
||||
@click.option("-c", "--client_id", type=Path, required=True, help="Widevine ClientIdentification Blob file")
|
||||
@click.option("-v", "--vmp", type=Path, default=None, help="Widevine FileHashes Blob file")
|
||||
@click.option("-o", "--output", type=Path, default=None, help="Output Directory")
|
||||
@click.pass_context
|
||||
def create_device(
|
||||
ctx: click.Context,
|
||||
type_: str,
|
||||
level: int,
|
||||
key: Path,
|
||||
client_id: Path,
|
||||
vmp: Optional[Path] = None,
|
||||
output: Optional[Path] = None
|
||||
) -> None:
|
||||
"""
|
||||
Create a Widevine Device (.wvd) file from an RSA Private Key (PEM or DER) and Client ID Blob.
|
||||
Optionally also a VMP (Verified Media Path) Blob, which will be stored in the Client ID.
|
||||
"""
|
||||
if not key.is_file():
|
||||
raise click.UsageError("key: Not a path to a file, or it doesn't exist.", ctx)
|
||||
if not client_id.is_file():
|
||||
raise click.UsageError("client_id: Not a path to a file, or it doesn't exist.", ctx)
|
||||
if vmp and not vmp.is_file():
|
||||
raise click.UsageError("vmp: Not a path to a file, or it doesn't exist.", ctx)
|
||||
|
||||
log = logging.getLogger("create-device")
|
||||
|
||||
device = Device(
|
||||
type_=Device.Types[type_.upper()],
|
||||
security_level=level,
|
||||
flags=None,
|
||||
private_key=key.read_bytes(),
|
||||
client_id=client_id.read_bytes()
|
||||
)
|
||||
|
||||
if vmp:
|
||||
new_vmp_data = vmp.read_bytes()
|
||||
if device.client_id.vmp_data and device.client_id.vmp_data != new_vmp_data:
|
||||
log.warning("Client ID already has Verified Media Path data")
|
||||
device.client_id.vmp_data = new_vmp_data
|
||||
|
||||
client_info = {}
|
||||
for entry in device.client_id.client_info:
|
||||
client_info[entry.name] = entry.value
|
||||
|
||||
wvd_bin = device.dumps()
|
||||
|
||||
name = f"{client_info['company_name']} {client_info['model_name']}"
|
||||
if client_info.get("widevine_cdm_version"):
|
||||
name += f" {client_info['widevine_cdm_version']}"
|
||||
name += f" {crc32(wvd_bin).to_bytes(4, 'big').hex()}"
|
||||
|
||||
try:
|
||||
name = unidecode(name.strip().lower().replace(" ", "_"))
|
||||
except UnidecodeError as e:
|
||||
raise click.ClickException(f"Failed to sanitize name, {e}")
|
||||
|
||||
out_path = (output or Path.cwd()) / f"{name}_{device.system_id}_l{device.security_level}.wvd"
|
||||
out_path.write_bytes(wvd_bin)
|
||||
|
||||
log.info(f"Created Widevine Device (.wvd) file, {out_path.name}")
|
||||
log.info(f" + Type: {device.type.name}")
|
||||
log.info(f" + System ID: {device.system_id}")
|
||||
log.info(f" + Security Level: {device.security_level}")
|
||||
log.info(f" + Flags: {device.flags}")
|
||||
log.info(f" + Private Key: {bool(device.private_key)} ({device.private_key.size_in_bits()} bit)")
|
||||
log.info(f" + Client ID: {bool(device.client_id)} ({len(device.client_id.SerializeToString())} bytes)")
|
||||
if device.client_id.vmp_data:
|
||||
file_hashes_ = FileHashes()
|
||||
file_hashes_.ParseFromString(device.client_id.vmp_data)
|
||||
log.info(f" + VMP: True ({len(file_hashes_.signatures)} signatures)")
|
||||
else:
|
||||
log.info(f" + VMP: False")
|
||||
log.info(f" + Saved to: {out_path.absolute()}")
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.argument("device", type=Path)
|
||||
@click.pass_context
|
||||
def migrate(ctx: click.Context, device: Path) -> None:
|
||||
"""Upgrade from earlier versions of the Widevine Device (.wvd) format."""
|
||||
if not device.is_file():
|
||||
raise click.UsageError("device: Not a path to a file, or it doesn't exist.", ctx)
|
||||
|
||||
log = logging.getLogger("migrate")
|
||||
|
||||
try:
|
||||
new_device = Device.migrate(device.read_bytes())
|
||||
except (ConstructError, ValueError) as e:
|
||||
raise click.UsageError(str(e), ctx)
|
||||
|
||||
# save
|
||||
log.debug(new_device)
|
||||
new_device.dump(device)
|
||||
|
||||
log.info("Successfully migrated the Widevine Device (.wvd) file!")
|
||||
|
||||
Reference in New Issue
Block a user