Compare commits
45 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc77f064ca | ||
|
|
f30ca45550 | ||
|
|
576d7212d5 | ||
|
|
4f32b4b790 | ||
|
|
2e2b5d528a | ||
|
|
2179987986 | ||
|
|
665b77bd24 | ||
|
|
3499c0cf4d | ||
|
|
e4e109b9f3 | ||
|
|
1d606a9e54 | ||
|
|
f36977ef19 | ||
|
|
dd1a355691 | ||
|
|
6eceaaf410 | ||
|
|
bd62b8d131 | ||
|
|
11a2358002 | ||
|
|
f2ed83205b | ||
|
|
796cf7ffb0 | ||
|
|
2c33af79df | ||
|
|
93d9561fac | ||
|
|
c73078b7a9 | ||
|
|
2445297ae8 | ||
|
|
01416f6513 | ||
|
|
60e3ef0201 | ||
|
|
a1844fb195 | ||
|
|
26d81a7bef | ||
|
|
27a701aaea | ||
|
|
2a87d55e20 | ||
|
|
76c7a402eb | ||
|
|
10fb954097 | ||
|
|
9d7eaf4949 | ||
|
|
0537c9666c | ||
|
|
fc47bbb436 | ||
|
|
1ea57865ad | ||
|
|
f09a06857a | ||
|
|
e4f6a23725 | ||
|
|
f21a21712b | ||
|
|
a1494a3742 | ||
|
|
5b13e1a689 | ||
|
|
c9288dc391 | ||
|
|
7640d6fcab | ||
|
|
3d794ad659 | ||
|
|
5788dde7b1 | ||
|
|
ddf755f82f | ||
|
|
e8785fcd84 | ||
|
|
c969d80931 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,3 +1,6 @@
|
||||
# pywidevine
|
||||
*.wvd
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
|
||||
82
CHANGELOG.md
82
CHANGELOG.md
@@ -5,6 +5,86 @@ All notable changes to this project will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.4.0] - 2022-08-06
|
||||
|
||||
This release is a face-lift for the PSSH class with a moderate amount of Cdm and Serve interface changes.
|
||||
You will likely need to make a moderate amount of changes in your client code, please study the changelog.
|
||||
|
||||
Please note that while it was always privatized as `_sessions`, accessing the Session directly for any purpose was
|
||||
never recommended or supported. With v1.4.0, there will be drastic problems if you continue to do so. One of the
|
||||
few reasons to do that was to get the license keys which is no longer required with CDMs new `get_keys()` method.
|
||||
|
||||
RemoteCdm minimum supported Serve API version is now v1.4.0.
|
||||
|
||||
### Added
|
||||
|
||||
- The PSSH class now has a `new()` method to craft a new PSSH box. The box can be crafted from arbitrary init_data
|
||||
and/or key_ids. If only key_ids is supplied a new Widevine Cenc Header will be created and the key IDs will be put
|
||||
into it. This allows you to make compliant v0 or v1 boxes with as little data as just a Key ID.
|
||||
- The PSSH class now has `dump()` and `dumps()` methods to serialize the data as binary or base64 respectively. It will
|
||||
be serialized as a pymp4 PSSH box, ready to be used in an MP4 file.
|
||||
- Cdm now has a method `get_keys()` to get the keys of the loaded license. This is the alternative to manually
|
||||
accessing the keys by navigating the `_sessions` class instance variable.
|
||||
- Serve API now also has a `/get_keys` endpoint to call the `get_keys()` method of the underlying Cdm session.
|
||||
|
||||
### Changed
|
||||
|
||||
- Cdm and RemoteCdm now expect a PSSH object as the `init_data` param for `get_license_challenge`. You can no longer
|
||||
provide it anything else, that includes base64 or bytes form. It must be a PSSH object.
|
||||
- Serve no longer returns license keys in the response of the `/keys` endpoint.
|
||||
- Serve has changed the endpoint `/challenge` to `/get_license_challenge` and `/keys` to `/parse_license`. This is to
|
||||
be consistent with the method names of the underlying Cdm class.
|
||||
- The PSSH class has been reworked from being a static helper class to a proper PSSH class.
|
||||
- PSSH.from_playready_pssh is now a class method and returns as a PSSH object.
|
||||
|
||||
### Removed
|
||||
|
||||
- PSSH.get_as_box has been removed and merged into the PSSH constructor.
|
||||
- PSSH.from_key_ids has been removed entirely, you should now use `PSSH.new(key_ids=...)` instead.
|
||||
- All uses of a local Session() object has been removed from RemoteCdm. The session is now fully controlled by the
|
||||
remote API and de-synchronization by external alteration or unexpected exceptions is no longer a possibility.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Various uses of the `key_ids` field of WidevinePsshData proto has been fixed in the PSSH class.
|
||||
- Fixed a few Serve API crashes in edge cases with improved error handling on Cdm method calls.
|
||||
|
||||
## [1.3.1] - 2022-08-04
|
||||
|
||||
### Added
|
||||
|
||||
- Cdm and RemoteCdm can now be supplied a string value for `device_type` for scenarios where providing it as a string
|
||||
is more convenient (e.g., from Config files).
|
||||
|
||||
### Fixed
|
||||
|
||||
- The `force_privacy_mode` key no longer needs to be defined at all in the configuration file. This was previously
|
||||
crashing serve APIs if it wasn't set before starting.
|
||||
- RemoteCdm's Server version check will no longer fail under certain serving conditions e.g., Caddy prepending `Caddy`
|
||||
to the Server header value. It also fixes case sensitivity and removed the full url from the header.
|
||||
|
||||
## [1.3.0] - 2022-08-04
|
||||
|
||||
### Added
|
||||
|
||||
- New RemoteCdm class to be used as Client code for the `serve` Remote CDM API server. The RemoteCdm should be used
|
||||
entirely separately from the normal Cdm class. All serve APIs must update to v1.3.0 to be compatible. The RemoteCdm
|
||||
verifies the server version to ensure compatibility. Changes to the serve API schema will be immediately reflected in
|
||||
the RemoteCdm code in the future.
|
||||
- Implemented `/set_service_certificate` endpoint in serve schema as an improved way of setting the service certificate
|
||||
than passing it to `/challenge`.
|
||||
- You can now unset the service certificate by providing an empty service certificate value (or None or null). This
|
||||
includes support for doing so even in serve API and the new RemoteCdm.
|
||||
|
||||
### Changed
|
||||
|
||||
- The Construction of the Cdm object has changed. You can now initialize it with more direct values if you don't want
|
||||
to use the Device class or don't want to use `.wvd` files. To use Device classes, you must now use the
|
||||
`Cdm.from_device()` class method.
|
||||
- The ability to pass the certificate to `/challenge` has been removed. Please use the new `/set_service_certificate`
|
||||
endpoint before calling `/challenge`. You do not need to set it every time. Once per session is enough unless you
|
||||
now want to use a different certificate.
|
||||
|
||||
## [1.2.1] - 2022-08-02
|
||||
|
||||
This release is primarily a maintenance release for `serve` functionality but some Cdm fixes are also present.
|
||||
@@ -143,6 +223,8 @@ This release is primarily a maintenance release for `serve` functionality but so
|
||||
|
||||
Initial Release.
|
||||
|
||||
[1.3.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.1
|
||||
[1.3.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.0
|
||||
[1.2.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.1
|
||||
[1.2.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.0
|
||||
[1.1.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.1
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry]
|
||||
name = "pywidevine"
|
||||
version = "1.2.1"
|
||||
version = "1.4.0"
|
||||
description = "Widevine CDM (Content Decryption Module) implementation in Python."
|
||||
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
|
||||
license = "GPL-3.0-only"
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.2.1"
|
||||
__version__ = "1.4.0"
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import random
|
||||
@@ -5,7 +7,7 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Union, Container, Optional
|
||||
from typing import Union, Optional
|
||||
from uuid import UUID
|
||||
|
||||
from Crypto.Cipher import AES, PKCS1_OAEP
|
||||
@@ -62,13 +64,62 @@ class Cdm:
|
||||
|
||||
MAX_NUM_OF_SESSIONS = 50 # most common limit
|
||||
|
||||
def __init__(self, device: Device):
|
||||
def __init__(
|
||||
self,
|
||||
device_type: Union[Device.Types, str],
|
||||
system_id: int,
|
||||
security_level: int,
|
||||
client_id: ClientIdentification,
|
||||
rsa_key: RSA.RsaKey
|
||||
):
|
||||
"""Initialize a Widevine Content Decryption Module (CDM)."""
|
||||
if not device:
|
||||
raise ValueError("A Widevine Device must be provided.")
|
||||
self.device = device
|
||||
if not device_type:
|
||||
raise ValueError("Device Type must be provided")
|
||||
if isinstance(device_type, str):
|
||||
device_type = Device.Types[device_type]
|
||||
if not isinstance(device_type, Device.Types):
|
||||
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
|
||||
|
||||
self._sessions: dict[bytes, Session] = {}
|
||||
if not system_id:
|
||||
raise ValueError("System ID must be provided")
|
||||
if not isinstance(system_id, int):
|
||||
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
|
||||
|
||||
if not security_level:
|
||||
raise ValueError("Security Level must be provided")
|
||||
if not isinstance(security_level, int):
|
||||
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
|
||||
|
||||
if not client_id:
|
||||
raise ValueError("Client ID must be provided")
|
||||
if not isinstance(client_id, ClientIdentification):
|
||||
raise TypeError(f"Expected client_id to be a {ClientIdentification} not {client_id!r}")
|
||||
|
||||
if not rsa_key:
|
||||
raise ValueError("RSA Key must be provided")
|
||||
if not isinstance(rsa_key, RSA.RsaKey):
|
||||
raise TypeError(f"Expected rsa_key to be a {RSA.RsaKey} not {rsa_key!r}")
|
||||
|
||||
self.device_type = device_type
|
||||
self.system_id = system_id
|
||||
self.security_level = security_level
|
||||
self.__client_id = client_id
|
||||
|
||||
self.__signer = pss.new(rsa_key)
|
||||
self.__decrypter = PKCS1_OAEP.new(rsa_key)
|
||||
|
||||
self.__sessions: dict[bytes, Session] = {}
|
||||
|
||||
@classmethod
|
||||
def from_device(cls, device: Device) -> Cdm:
|
||||
"""Initialize a Widevine CDM from a Widevine Device (.wvd) file."""
|
||||
return cls(
|
||||
device_type=device.type,
|
||||
system_id=device.system_id,
|
||||
security_level=device.security_level,
|
||||
client_id=device.client_id,
|
||||
rsa_key=device.private_key
|
||||
)
|
||||
|
||||
def open(self) -> bytes:
|
||||
"""
|
||||
@@ -77,11 +128,11 @@ class Cdm:
|
||||
Raises:
|
||||
TooManySessions: If the session cannot be opened as limit has been reached.
|
||||
"""
|
||||
if len(self._sessions) > self.MAX_NUM_OF_SESSIONS:
|
||||
if len(self.__sessions) > self.MAX_NUM_OF_SESSIONS:
|
||||
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
|
||||
|
||||
session = Session()
|
||||
self._sessions[session.id] = session
|
||||
self.__sessions[session.id] = session
|
||||
|
||||
return session.id
|
||||
|
||||
@@ -95,12 +146,12 @@ class Cdm:
|
||||
Raises:
|
||||
InvalidSession: If the Session identifier is invalid.
|
||||
"""
|
||||
session = self._sessions.get(session_id)
|
||||
session = self.__sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
del self._sessions[session_id]
|
||||
del self.__sessions[session_id]
|
||||
|
||||
def set_service_certificate(self, session_id: bytes, certificate: Union[bytes, str]) -> str:
|
||||
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
|
||||
"""
|
||||
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
|
||||
|
||||
@@ -116,7 +167,8 @@ class Cdm:
|
||||
session_id: Session identifier.
|
||||
certificate: SignedDrmCertificate (or SignedMessage containing one) in Base64
|
||||
or Bytes form obtained from the Service. Some services have their own,
|
||||
but most use the common privacy cert, (common_privacy_cert).
|
||||
but most use the common privacy cert, (common_privacy_cert). If None, it
|
||||
will remove the current certificate.
|
||||
|
||||
Raises:
|
||||
InvalidSession: If the Session identifier is invalid.
|
||||
@@ -126,11 +178,18 @@ class Cdm:
|
||||
match the underlying DrmCertificate.
|
||||
|
||||
Returns the Service Provider ID of the verified DrmCertificate if successful.
|
||||
If certificate is None, it will return the now unset certificate's Provider ID.
|
||||
"""
|
||||
session = self._sessions.get(session_id)
|
||||
session = self.__sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
if certificate is None:
|
||||
drm_certificate = DrmCertificate()
|
||||
drm_certificate.ParseFromString(session.service_certificate.drm_certificate)
|
||||
session.service_certificate = None
|
||||
return drm_certificate.provider_id
|
||||
|
||||
if isinstance(certificate, str):
|
||||
try:
|
||||
certificate = base64.b64decode(certificate) # assuming base64
|
||||
@@ -176,7 +235,7 @@ class Cdm:
|
||||
def get_license_challenge(
|
||||
self,
|
||||
session_id: bytes,
|
||||
init_data: Union[Container, bytes, str],
|
||||
pssh: PSSH,
|
||||
type_: Union[int, str] = LicenseType.STREAMING,
|
||||
privacy_mode: bool = True
|
||||
) -> bytes:
|
||||
@@ -185,8 +244,7 @@ class Cdm:
|
||||
|
||||
Parameters:
|
||||
session_id: Session identifier.
|
||||
init_data: Widevine Cenc Header (Init Data) or a Protection System Specific
|
||||
Header Box to take the init data from.
|
||||
pssh: PSSH Object to get the init data from.
|
||||
type_: Type of License you wish to exchange, often `STREAMING`. The `OFFLINE`
|
||||
Licenses are for Offline licensing of Downloaded content.
|
||||
privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the
|
||||
@@ -202,16 +260,14 @@ class Cdm:
|
||||
Returns a SignedMessage containing a LicenseRequest message. It's signed with
|
||||
the Private Key of the device provision.
|
||||
"""
|
||||
session = self._sessions.get(session_id)
|
||||
session = self.__sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
if not init_data:
|
||||
raise InvalidInitData("The init_data must not be empty.")
|
||||
try:
|
||||
init_data = PSSH.get_as_box(init_data).init_data
|
||||
except (ValueError, binascii.Error, DecodeError) as e:
|
||||
raise InvalidInitData(str(e))
|
||||
if not pssh:
|
||||
raise InvalidInitData("A pssh must be provided.")
|
||||
if not isinstance(pssh, PSSH):
|
||||
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
|
||||
|
||||
try:
|
||||
if isinstance(type_, int):
|
||||
@@ -231,26 +287,25 @@ class Cdm:
|
||||
license_request.protocol_version = ProtocolVersion.Value("VERSION_2_1")
|
||||
license_request.key_control_nonce = random.randrange(1, 2 ** 31)
|
||||
|
||||
license_request.content_id.widevine_pssh_data.pssh_data.append(init_data)
|
||||
# pssh_data may be either a WidevineCencHeader or custom data
|
||||
# we have to assume the pssh.init_data value is valid, we cannot test
|
||||
license_request.content_id.widevine_pssh_data.pssh_data.append(pssh.init_data)
|
||||
license_request.content_id.widevine_pssh_data.license_type = type_
|
||||
license_request.content_id.widevine_pssh_data.request_id = request_id
|
||||
|
||||
if session.service_certificate and privacy_mode:
|
||||
# encrypt the client id for privacy mode
|
||||
license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id(
|
||||
client_id=self.device.client_id,
|
||||
client_id=self.__client_id,
|
||||
service_certificate=session.service_certificate
|
||||
))
|
||||
else:
|
||||
license_request.client_id.CopyFrom(self.device.client_id)
|
||||
license_request.client_id.CopyFrom(self.__client_id)
|
||||
|
||||
license_message = SignedMessage()
|
||||
license_message.type = SignedMessage.MessageType.LICENSE_REQUEST
|
||||
license_message.msg = license_request.SerializeToString()
|
||||
|
||||
license_message.signature = pss. \
|
||||
new(self.device.private_key). \
|
||||
sign(SHA1.new(license_message.msg))
|
||||
license_message.signature = self.__signer.sign(SHA1.new(license_message.msg))
|
||||
|
||||
session.context[request_id] = self.derive_context(license_message.msg)
|
||||
|
||||
@@ -278,7 +333,7 @@ class Cdm:
|
||||
SignatureMismatch: If the Signature of the License SignedMessage does not
|
||||
match the underlying License.
|
||||
"""
|
||||
session = self._sessions.get(session_id)
|
||||
session = self.__sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
@@ -315,11 +370,10 @@ class Cdm:
|
||||
if not context:
|
||||
raise InvalidContext("Cannot parse a license message without first making a license request")
|
||||
|
||||
session_key = PKCS1_OAEP. \
|
||||
new(self.device.private_key). \
|
||||
decrypt(license_message.session_key)
|
||||
|
||||
enc_key, mac_key_server, _ = self.derive_keys(*context, session_key)
|
||||
enc_key, mac_key_server, _ = self.derive_keys(
|
||||
*context,
|
||||
key=self.__decrypter.decrypt(license_message.session_key)
|
||||
)
|
||||
|
||||
computed_signature = HMAC. \
|
||||
new(mac_key_server, digestmod=SHA256). \
|
||||
@@ -336,6 +390,39 @@ class Cdm:
|
||||
|
||||
del session.context[licence.id.request_id]
|
||||
|
||||
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
|
||||
"""
|
||||
Get Keys from the loaded License message.
|
||||
|
||||
Parameters:
|
||||
session_id: Session identifier.
|
||||
type_: (optional) Key Type to filter by and return.
|
||||
|
||||
Raises:
|
||||
InvalidSession: If the Session identifier is invalid.
|
||||
TypeError: If the provided type_ is an unexpected value type.
|
||||
ValueError: If the provided type_ is not a valid Key Type.
|
||||
"""
|
||||
session = self.__sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
try:
|
||||
if isinstance(type_, str):
|
||||
type_ = License.KeyContainer.KeyType.Value(type_)
|
||||
elif isinstance(type_, int):
|
||||
License.KeyContainer.KeyType.Name(type_) # only test
|
||||
elif type_ is not None:
|
||||
raise TypeError(f"Expected type_ to be a {License.KeyContainer.KeyType} or int, not {type_!r}")
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Could not parse type_ as a {License.KeyContainer.KeyType}, {e}")
|
||||
|
||||
return [
|
||||
key
|
||||
for key in session.keys
|
||||
if not type_ or key.type == License.KeyContainer.KeyType.Name(type_)
|
||||
]
|
||||
|
||||
def decrypt(
|
||||
self,
|
||||
session_id: bytes,
|
||||
@@ -388,7 +475,7 @@ class Cdm:
|
||||
if output_file.is_file() and not exists_ok:
|
||||
raise FileExistsError(f"Output file already exists, {output_file}")
|
||||
|
||||
session = self._sessions.get(session_id)
|
||||
session = self.__sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
|
||||
@@ -32,3 +32,7 @@ class SignatureMismatch(PyWidevineException):
|
||||
|
||||
class NoKeysLoaded(PyWidevineException):
|
||||
"""No License was parsed for this Session, No Keys available."""
|
||||
|
||||
|
||||
class DeviceMismatch(PyWidevineException):
|
||||
"""The Remote CDMs Device information and the APIs Device information did not match."""
|
||||
|
||||
@@ -13,6 +13,7 @@ from pywidevine import __version__
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.license_protocol_pb2 import LicenseType, FileHashes
|
||||
from pywidevine.pssh import PSSH
|
||||
|
||||
|
||||
@click.group(invoke_without_command=True)
|
||||
@@ -62,13 +63,16 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool):
|
||||
"""
|
||||
log = logging.getLogger("license")
|
||||
|
||||
# prepare pssh
|
||||
pssh = PSSH(pssh)
|
||||
|
||||
# load device
|
||||
device = Device.load(device)
|
||||
log.info(f"[+] Loaded Device ({device.system_id} L{device.security_level})")
|
||||
log.debug(device)
|
||||
|
||||
# load cdm
|
||||
cdm = Cdm(device)
|
||||
cdm = Cdm.from_device(device)
|
||||
log.info(f"[+] Loaded CDM")
|
||||
log.debug(cdm)
|
||||
|
||||
@@ -113,9 +117,7 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool):
|
||||
log.info("[+] License Parsed Successfully")
|
||||
|
||||
# print keys
|
||||
# Note: This showcases how insecure a Python CDM implementation is
|
||||
# The keys should not be given to the user, but we cannot prevent this
|
||||
for key in cdm._sessions[session_id].keys:
|
||||
for key in cdm.get_keys(session_id):
|
||||
log.info(f"[{key.type}] {key.kid.hex}:{key.key.hex()}")
|
||||
|
||||
# close session, disposes of session data
|
||||
|
||||
@@ -2,7 +2,8 @@ from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
from typing import Union
|
||||
import string
|
||||
from typing import Union, Optional
|
||||
from uuid import UUID
|
||||
|
||||
import construct
|
||||
@@ -21,17 +22,172 @@ class PSSH:
|
||||
Widevine = UUID(bytes=b"\xed\xef\x8b\xa9\x79\xd6\x4a\xce\xa3\xc8\x27\xdc\xd5\x1d\x21\xed")
|
||||
PlayReady = UUID(bytes=b"\x9a\x04\xf0\x79\x98\x40\x42\x86\xab\x92\xe6\x5b\xe0\x88\x5f\x95")
|
||||
|
||||
def __init__(self, box: Container):
|
||||
self._box = box
|
||||
|
||||
@staticmethod
|
||||
def from_playready_pssh(box: Container) -> Container:
|
||||
def __init__(self, data: Union[Container, str, bytes], strict: bool = False):
|
||||
"""
|
||||
Convert a PlayReady PSSH to a Widevine PSSH.
|
||||
Load a PSSH box or Widevine Cenc Header data as a new v0 PSSH box.
|
||||
|
||||
[Strict mode (strict=True)]
|
||||
|
||||
Supports the following forms of input data in either Base64 or Bytes form:
|
||||
- Full PSSH mp4 boxes (as defined by pymp4 Box).
|
||||
- Full Widevine Cenc Headers (as defined by WidevinePsshData proto).
|
||||
|
||||
[Lenient mode (strict=False, default)]
|
||||
|
||||
If the data is not supported in Strict mode, and is assumed not to be corrupt or
|
||||
parsed incorrectly, the License Server likely accepts a custom init_data value
|
||||
during a License Request call. This is uncommon behavior but not out of realm of
|
||||
possibilities. For example, Netflix does this with it's MSL WidevineExchange
|
||||
scheme.
|
||||
|
||||
Lenient mode will craft a new v0 PSSH box with the init_data field set to
|
||||
the provided data as-is. The data will first be base64 decoded. This behavior
|
||||
may not work in your scenario and if that's the case please manually craft
|
||||
your own PSSH box with the init_data field to be used in License Requests.
|
||||
|
||||
Raises:
|
||||
ValueError: If the data is empty.
|
||||
TypeError: If the data is an unexpected type.
|
||||
binascii.Error: If the data could not be decoded as Base64 if provided as a
|
||||
string.
|
||||
DecodeError: If the data could not be parsed as a PSSH mp4 box nor a Widevine
|
||||
Cenc Header and strict mode is enabled.
|
||||
"""
|
||||
if not data:
|
||||
raise ValueError("Data must not be empty.")
|
||||
|
||||
if isinstance(data, Container):
|
||||
box = data
|
||||
else:
|
||||
if isinstance(data, str):
|
||||
try:
|
||||
data = base64.b64decode(data)
|
||||
except (binascii.Error, binascii.Incomplete) as e:
|
||||
raise binascii.Error(f"Could not decode data as Base64, {e}")
|
||||
|
||||
if not isinstance(data, bytes):
|
||||
raise TypeError(f"Expected data to be a {Container}, bytes, or base64, not {data!r}")
|
||||
|
||||
try:
|
||||
box = Box.parse(data)
|
||||
except (IOError, construct.ConstructError): # not a box
|
||||
try:
|
||||
cenc_header = WidevinePsshData()
|
||||
cenc_header.ParseFromString(data)
|
||||
cenc_header = cenc_header.SerializeToString()
|
||||
if cenc_header != data: # not actually a WidevinePsshData
|
||||
raise DecodeError()
|
||||
except DecodeError: # not a widevine cenc header
|
||||
if strict:
|
||||
raise DecodeError(f"Could not parse data as a {Container} nor a {WidevinePsshData}.")
|
||||
# Data is not a Widevine Cenc Header, it's something custom.
|
||||
# The license server likely has something custom to parse it.
|
||||
# See doc-string about Lenient mode for more information.
|
||||
cenc_header = data
|
||||
|
||||
box = Box.parse(Box.build(dict(
|
||||
type=b"pssh",
|
||||
version=0,
|
||||
flags=0,
|
||||
system_ID=PSSH.SystemId.Widevine,
|
||||
init_data=cenc_header
|
||||
)))
|
||||
|
||||
self.version = box.version
|
||||
self.flags = box.flags
|
||||
self.system_id = box.system_ID
|
||||
self.key_ids = box.key_IDs
|
||||
self.init_data = box.init_data
|
||||
|
||||
@classmethod
|
||||
def new(
|
||||
cls,
|
||||
key_ids: Optional[list[Union[UUID, str, bytes]]] = None,
|
||||
init_data: Optional[Union[WidevinePsshData, str, bytes]] = None,
|
||||
version: int = 0,
|
||||
flags: int = 0
|
||||
) -> PSSH:
|
||||
"""Craft a new version 0 or 1 PSSH Box."""
|
||||
if key_ids is not None:
|
||||
if not isinstance(key_ids, list):
|
||||
raise TypeError(f"Expected key_ids to be a list not {key_ids!r}")
|
||||
|
||||
if init_data is not None:
|
||||
if not isinstance(init_data, (WidevinePsshData, str, bytes)):
|
||||
raise TypeError(f"Expected init_data to be a {WidevinePsshData}, base64, or bytes, not {init_data!r}")
|
||||
|
||||
if not isinstance(version, int):
|
||||
raise TypeError(f"Expected version to be an int not {version!r}")
|
||||
if version not in (0, 1):
|
||||
raise ValueError(f"Invalid version, must be either 0 or 1, not {version}.")
|
||||
|
||||
if not isinstance(flags, int):
|
||||
raise TypeError(f"Expected flags to be an int not {flags!r}")
|
||||
if flags < 0:
|
||||
raise ValueError(f"Invalid flags, cannot be less than 0.")
|
||||
|
||||
if version == 0 and key_ids is not None and init_data is not None:
|
||||
# v0 boxes use only init_data in the pssh field, but we can use the key_ids within the init_data
|
||||
raise ValueError("Version 0 PSSH boxes must use only init_data, not init_data and key_ids.")
|
||||
elif version == 1:
|
||||
# TODO: I cannot tell if they need either init_data or key_ids exclusively, or both is fine
|
||||
# So for now I will just make sure at least one is supplied
|
||||
if init_data is None and key_ids is None:
|
||||
raise ValueError("Version 1 PSSH boxes must use either init_data or key_ids but neither were provided")
|
||||
|
||||
if key_ids is not None:
|
||||
# ensure key_ids are bytes, supports hex, base64, and bytes
|
||||
key_ids = [
|
||||
(
|
||||
x.bytes if isinstance(x, UUID) else
|
||||
bytes.fromhex(x) if all(c in string.hexdigits for c in x) else
|
||||
base64.b64decode(x) if isinstance(x, str) else
|
||||
x
|
||||
)
|
||||
for x in key_ids
|
||||
]
|
||||
if not all(isinstance(x, bytes) for x in key_ids):
|
||||
not_bytes = [x for x in key_ids if not isinstance(x, bytes)]
|
||||
raise TypeError(
|
||||
"Expected all of key_ids to be a UUID, hex, base64, or bytes, but one or more are not, "
|
||||
f"{not_bytes!r}"
|
||||
)
|
||||
|
||||
if init_data is not None:
|
||||
if isinstance(init_data, WidevinePsshData):
|
||||
init_data = init_data.SerializeToString()
|
||||
elif isinstance(init_data, str):
|
||||
if all(c in string.hexdigits for c in init_data):
|
||||
init_data = bytes.fromhex(init_data)
|
||||
else:
|
||||
init_data = base64.b64decode(init_data)
|
||||
elif not isinstance(init_data, bytes):
|
||||
raise TypeError(
|
||||
f"Expecting init_data to be {WidevinePsshData}, hex, base64, or bytes, not {init_data!r}"
|
||||
)
|
||||
|
||||
box = Box.parse(Box.build(dict(
|
||||
type=b"pssh",
|
||||
version=version,
|
||||
flags=flags,
|
||||
system_ID=PSSH.SystemId.Widevine,
|
||||
key_ids=[key_ids, b""][key_ids is None],
|
||||
init_data=[init_data, b""][init_data is None]
|
||||
)))
|
||||
|
||||
if key_ids and version == 0:
|
||||
PSSH.overwrite_key_ids(box, [UUID(bytes=x) for x in key_ids])
|
||||
|
||||
return cls(box)
|
||||
|
||||
@classmethod
|
||||
def from_playready_pssh(cls, box: Container) -> PSSH:
|
||||
"""
|
||||
Convert a PlayReady PSSH Box to a Widevine PSSH Box.
|
||||
|
||||
Note: The resulting Widevine PSSH will likely not be usable for Licensing. This
|
||||
is because there is some data for a Widevine CENC Header that is not going to be
|
||||
listed in a PlayReady PSSH.
|
||||
is because there is some data for a Widevine Cenc Header that is missing from a
|
||||
PlayReady PSSH Box.
|
||||
|
||||
This converted PSSH will only be useful for it's Key IDs, so realistically only
|
||||
for matching Key IDs with a Track. As a fallback.
|
||||
@@ -47,7 +203,7 @@ class PSSH:
|
||||
cenc_header.algorithm = 1 # 0=Clear, 1=AES-CTR
|
||||
|
||||
for key_id in key_ids:
|
||||
cenc_header.key_id.append(key_id.bytes)
|
||||
cenc_header.key_ids.append(key_id.bytes)
|
||||
if box.version == 1:
|
||||
# ensure both cenc header and box has same Key IDs
|
||||
# v1 uses both this and within init data for basically no reason
|
||||
@@ -56,83 +212,22 @@ class PSSH:
|
||||
box.init_data = cenc_header.SerializeToString()
|
||||
box.system_ID = PSSH.SystemId.Widevine
|
||||
|
||||
return box
|
||||
return cls(box)
|
||||
|
||||
@staticmethod
|
||||
def from_key_ids(key_ids: list[UUID]) -> Container:
|
||||
"""
|
||||
Craft a new PSSH Box from just Key IDs.
|
||||
This should only be used as a very last measure.
|
||||
"""
|
||||
cenc_header = WidevinePsshData()
|
||||
for key_id in key_ids:
|
||||
cenc_header.key_id.append(key_id.bytes)
|
||||
cenc_header.algorithm = 1 # 0=Clear, 1=AES-CTR
|
||||
|
||||
box = Box.parse(Box.build(dict(
|
||||
def dump(self) -> bytes:
|
||||
"""Export the PSSH object as a full PSSH box in bytes form."""
|
||||
return Box.build(dict(
|
||||
type=b"pssh",
|
||||
version=0,
|
||||
flags=0,
|
||||
system_ID=PSSH.SystemId.Widevine,
|
||||
init_data=cenc_header.SerializeToString()
|
||||
)))
|
||||
version=self.version,
|
||||
flags=self.flags,
|
||||
system_ID=self.system_id,
|
||||
key_IDs=self.key_ids,
|
||||
init_data=self.init_data
|
||||
))
|
||||
|
||||
return box
|
||||
|
||||
@staticmethod
|
||||
def get_as_box(data: Union[Container, bytes, str], strict: bool = False) -> Container:
|
||||
"""
|
||||
Get possibly arbitrary data as a parsed PSSH mp4 box.
|
||||
|
||||
Parameters:
|
||||
data: PSSH mp4 box, Widevine Cenc Header (init data), or arbitrary data to
|
||||
parse or craft into a PSSH mp4 box.
|
||||
strict: Do not return a PSSH box for arbitrary data. Require the data to be
|
||||
at least a PSSH mp4 box, or a Widevine Cenc Header.
|
||||
|
||||
Raises:
|
||||
ValueError: If the data is empty, or an unexpected type.
|
||||
binascii.Error: If the data could not be decoded as Base64 if provided
|
||||
as a string.
|
||||
DecodeError: If the data could not be parsed as a PSSH mp4 box
|
||||
nor a Widevine Cenc Header while strict=True.
|
||||
"""
|
||||
if not data:
|
||||
raise ValueError("Data must not be empty.")
|
||||
|
||||
if isinstance(data, Container):
|
||||
return data
|
||||
|
||||
if isinstance(data, str):
|
||||
try:
|
||||
data = base64.b64decode(data)
|
||||
except (binascii.Error, binascii.Incomplete) as e:
|
||||
raise binascii.Error(f"Could not decode data as Base64, {e}")
|
||||
|
||||
if isinstance(data, bytes):
|
||||
try:
|
||||
data = Box.parse(data)
|
||||
except (IOError, construct.ConstructError):
|
||||
if strict:
|
||||
try:
|
||||
cenc_header = WidevinePsshData()
|
||||
if cenc_header.MergeFromString(data) < len(data):
|
||||
raise DecodeError()
|
||||
except DecodeError:
|
||||
raise DecodeError(f"Could not parse data as a PSSH mp4 box nor a Widevine Cenc Header.")
|
||||
else:
|
||||
data = cenc_header.SerializeToString()
|
||||
data = Box.parse(Box.build(dict(
|
||||
type=b"pssh",
|
||||
version=0,
|
||||
flags=0,
|
||||
system_ID=PSSH.SystemId.Widevine,
|
||||
init_data=data
|
||||
)))
|
||||
else:
|
||||
raise ValueError(f"Data is an unexpected type, expected bytes got {data!r}.")
|
||||
|
||||
return data
|
||||
def dumps(self) -> str:
|
||||
"""Export the PSSH object as a full PSSH box in base64 form."""
|
||||
return base64.b64encode(self.dump()).decode()
|
||||
|
||||
@staticmethod
|
||||
def get_key_ids(box: Container) -> list[UUID]:
|
||||
@@ -153,7 +248,7 @@ class PSSH:
|
||||
return [
|
||||
# the key_ids value may or may not be hex underlying
|
||||
UUID(bytes=key_id) if len(key_id) == 16 else UUID(hex=key_id.decode())
|
||||
for key_id in init.key_id
|
||||
for key_id in init.key_ids
|
||||
]
|
||||
|
||||
if box.system_ID == PSSH.SystemId.PlayReady:
|
||||
@@ -191,13 +286,10 @@ class PSSH:
|
||||
init = WidevinePsshData()
|
||||
init.ParseFromString(box.init_data)
|
||||
|
||||
# TODO: Is there a better way to clear the Key IDs?
|
||||
for _ in range(len(init.key_id or [])):
|
||||
init.key_id.pop(0)
|
||||
|
||||
# TODO: Is there a .extend or a way to add all without a loop?
|
||||
for key_id in key_ids:
|
||||
init.key_id.append(key_id.bytes)
|
||||
init.key_ids[:] = [
|
||||
key_id.bytes
|
||||
for key_id in key_ids
|
||||
]
|
||||
|
||||
box.init_data = init.SerializeToString()
|
||||
|
||||
|
||||
256
pywidevine/remotecdm.py
Normal file
256
pywidevine/remotecdm.py
Normal file
@@ -0,0 +1,256 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import re
|
||||
from typing import Union, Optional
|
||||
|
||||
import requests
|
||||
from Crypto.PublicKey import RSA
|
||||
from google.protobuf.message import DecodeError
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.exceptions import InvalidInitData, InvalidLicenseType, InvalidLicenseMessage, DeviceMismatch
|
||||
from pywidevine.key import Key
|
||||
|
||||
from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification
|
||||
from pywidevine.pssh import PSSH
|
||||
|
||||
|
||||
class RemoteCdm(Cdm):
|
||||
"""Remote Accessible CDM using pywidevine's serve schema."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device_type: Union[Device.Types, str],
|
||||
system_id: int,
|
||||
security_level: int,
|
||||
host: str,
|
||||
secret: str,
|
||||
device_name: str
|
||||
):
|
||||
"""Initialize a Widevine Content Decryption Module (CDM)."""
|
||||
if not device_type:
|
||||
raise ValueError("Device Type must be provided")
|
||||
if isinstance(device_type, str):
|
||||
device_type = Device.Types[device_type]
|
||||
if not isinstance(device_type, Device.Types):
|
||||
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
|
||||
|
||||
if not system_id:
|
||||
raise ValueError("System ID must be provided")
|
||||
if not isinstance(system_id, int):
|
||||
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
|
||||
|
||||
if not security_level:
|
||||
raise ValueError("Security Level must be provided")
|
||||
if not isinstance(security_level, int):
|
||||
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
|
||||
|
||||
if not host:
|
||||
raise ValueError("API Host must be provided")
|
||||
if not isinstance(host, str):
|
||||
raise TypeError(f"Expected host to be a {str} not {host!r}")
|
||||
|
||||
if not secret:
|
||||
raise ValueError("API Secret must be provided")
|
||||
if not isinstance(secret, str):
|
||||
raise TypeError(f"Expected secret to be a {str} not {secret!r}")
|
||||
|
||||
if not device_name:
|
||||
raise ValueError("API Device name must be provided")
|
||||
if not isinstance(device_name, str):
|
||||
raise TypeError(f"Expected device_name to be a {str} not {device_name!r}")
|
||||
|
||||
self.device_type = device_type
|
||||
self.system_id = system_id
|
||||
self.security_level = security_level
|
||||
self.host = host
|
||||
self.device_name = device_name
|
||||
|
||||
# spoof client_id and rsa_key just so we can construct via super call
|
||||
super().__init__(device_type, system_id, security_level, ClientIdentification(), RSA.generate(2048))
|
||||
|
||||
self.__session = requests.Session()
|
||||
self.__session.headers.update({
|
||||
"X-Secret-Key": secret
|
||||
})
|
||||
|
||||
r = requests.head(self.host)
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"Could not test Remote API version [{r.status_code}]")
|
||||
server = r.headers.get("Server")
|
||||
if not server or "pywidevine serve" not in server.lower():
|
||||
raise ValueError(f"This Remote CDM API does not seem to be a pywidevine serve API ({server}).")
|
||||
server_version = re.search(r"pywidevine serve v([\d.]+)", server, re.IGNORECASE)
|
||||
if not server_version:
|
||||
raise ValueError(f"The pywidevine server API is not stating the version correctly, cannot continue.")
|
||||
server_version = server_version.group(1)
|
||||
if server_version < "1.4.0":
|
||||
raise ValueError(f"This pywidevine serve API version ({server_version}) is not supported.")
|
||||
|
||||
@classmethod
|
||||
def from_device(cls, device: Device) -> RemoteCdm:
|
||||
raise NotImplementedError("You cannot load a RemoteCdm from a local Device file.")
|
||||
|
||||
def open(self) -> bytes:
|
||||
r = self.__session.get(
|
||||
url=f"{self.host}/{self.device_name}/open"
|
||||
).json()
|
||||
if r['status'] != 200:
|
||||
raise ValueError(f"Cannot Open CDM Session, {r['message']} [{r['status']}]")
|
||||
r = r["data"]
|
||||
|
||||
if int(r["device"]["system_id"]) != self.system_id:
|
||||
raise DeviceMismatch("The System ID specified does not match the one specified in the API response.")
|
||||
|
||||
if int(r["device"]["security_level"]) != self.security_level:
|
||||
raise DeviceMismatch("The Security Level specified does not match the one specified in the API response.")
|
||||
|
||||
return bytes.fromhex(r["session_id"])
|
||||
|
||||
def close(self, session_id: bytes) -> None:
|
||||
r = self.__session.get(
|
||||
url=f"{self.host}/{self.device_name}/close/{session_id.hex()}"
|
||||
).json()
|
||||
if r["status"] != 200:
|
||||
raise ValueError(f"Cannot Close CDM Session, {r['message']} [{r['status']}]")
|
||||
|
||||
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
|
||||
if certificate is None:
|
||||
certificate_b64 = None
|
||||
elif isinstance(certificate, str):
|
||||
certificate_b64 = certificate # assuming base64
|
||||
elif isinstance(certificate, bytes):
|
||||
certificate_b64 = base64.b64encode(certificate).decode()
|
||||
else:
|
||||
raise DecodeError(f"Expecting Certificate to be base64 or bytes, not {certificate!r}")
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/set_service_certificate",
|
||||
json={
|
||||
"session_id": session_id.hex(),
|
||||
"certificate": certificate_b64
|
||||
}
|
||||
).json()
|
||||
if r["status"] != 200:
|
||||
raise ValueError(f"Cannot Set CDMs Service Certificate, {r['message']} [{r['status']}]")
|
||||
r = r["data"]
|
||||
|
||||
return r["provider_id"]
|
||||
|
||||
def get_license_challenge(
|
||||
self,
|
||||
session_id: bytes,
|
||||
pssh: PSSH,
|
||||
type_: Union[int, str] = LicenseType.STREAMING,
|
||||
privacy_mode: bool = True
|
||||
) -> bytes:
|
||||
if not pssh:
|
||||
raise InvalidInitData("A pssh must be provided.")
|
||||
if not isinstance(pssh, PSSH):
|
||||
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
|
||||
|
||||
try:
|
||||
if isinstance(type_, int):
|
||||
type_ = LicenseType.Name(int(type_))
|
||||
elif isinstance(type_, str):
|
||||
type_ = LicenseType.Name(LicenseType.Value(type_))
|
||||
elif isinstance(type_, LicenseType):
|
||||
type_ = LicenseType.Name(type_)
|
||||
else:
|
||||
raise InvalidLicenseType()
|
||||
except ValueError:
|
||||
raise InvalidLicenseType(f"License Type {type_!r} is invalid")
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/get_license_challenge/{type_}",
|
||||
json={
|
||||
"session_id": session_id.hex(),
|
||||
"init_data": pssh.dumps()
|
||||
}
|
||||
).json()
|
||||
if r["status"] != 200:
|
||||
raise ValueError(f"Cannot get Challenge, {r['message']} [{r['status']}]")
|
||||
r = r["data"]
|
||||
|
||||
try:
|
||||
license_message = SignedMessage()
|
||||
license_message.ParseFromString(base64.b64decode(r["challenge_b64"]))
|
||||
except DecodeError as e:
|
||||
raise InvalidLicenseMessage(f"Failed to parse license request, {e}")
|
||||
|
||||
return license_message.SerializeToString()
|
||||
|
||||
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
|
||||
if not license_message:
|
||||
raise InvalidLicenseMessage("Cannot parse an empty license_message")
|
||||
|
||||
if isinstance(license_message, str):
|
||||
try:
|
||||
license_message = base64.b64decode(license_message)
|
||||
except (binascii.Error, binascii.Incomplete) as e:
|
||||
raise InvalidLicenseMessage(f"Could not decode license_message as Base64, {e}")
|
||||
|
||||
if isinstance(license_message, bytes):
|
||||
signed_message = SignedMessage()
|
||||
try:
|
||||
signed_message.ParseFromString(license_message)
|
||||
except DecodeError as e:
|
||||
raise InvalidLicenseMessage(f"Could not parse license_message as a SignedMessage, {e}")
|
||||
license_message = signed_message
|
||||
|
||||
if not isinstance(license_message, SignedMessage):
|
||||
raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}")
|
||||
|
||||
if license_message.type != SignedMessage.MessageType.LICENSE:
|
||||
raise InvalidLicenseMessage(
|
||||
f"Expecting a LICENSE message, not a "
|
||||
f"'{SignedMessage.MessageType.Name(license_message.type)}' message."
|
||||
)
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/parse_license",
|
||||
json={
|
||||
"session_id": session_id.hex(),
|
||||
"license_message": base64.b64encode(license_message.SerializeToString()).decode()
|
||||
}
|
||||
).json()
|
||||
if r["status"] != 200:
|
||||
raise ValueError(f"Cannot parse License, {r['message']} [{r['status']}]")
|
||||
|
||||
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
|
||||
try:
|
||||
if isinstance(type_, str):
|
||||
License.KeyContainer.KeyType.Value(type_) # only test
|
||||
elif isinstance(type_, int):
|
||||
type_ = License.KeyContainer.KeyType.Name(type_)
|
||||
elif type_ is None:
|
||||
type_ = "ALL"
|
||||
else:
|
||||
raise TypeError(f"Expected type_ to be a {License.KeyContainer.KeyType} or int, not {type_!r}")
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Could not parse type_ as a {License.KeyContainer.KeyType}, {e}")
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/get_keys/{type_}",
|
||||
json={
|
||||
"session_id": session_id.hex()
|
||||
}
|
||||
).json()
|
||||
if r["status"] != 200:
|
||||
raise ValueError(f"Could not get {type_} Keys, {r['message']} [{r['status']}]")
|
||||
r = r["data"]
|
||||
|
||||
return [
|
||||
Key(
|
||||
type_=key["type"],
|
||||
kid=Key.kid_to_uuid(bytes.fromhex(key["key_id"])),
|
||||
key=bytes.fromhex(key["key"]),
|
||||
permissions=key["permissions"]
|
||||
)
|
||||
for key in r["keys"]
|
||||
]
|
||||
|
||||
|
||||
__ALL__ = (RemoteCdm,)
|
||||
@@ -3,6 +3,10 @@ import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union
|
||||
|
||||
from google.protobuf.message import DecodeError
|
||||
|
||||
from pywidevine.pssh import PSSH
|
||||
|
||||
try:
|
||||
from aiohttp import web
|
||||
except ImportError:
|
||||
@@ -16,8 +20,8 @@ except ImportError:
|
||||
from pywidevine import __version__
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.exceptions import TooManySessions, InvalidSession
|
||||
from pywidevine.license_protocol_pb2 import LicenseType, License
|
||||
from pywidevine.exceptions import TooManySessions, InvalidSession, SignatureMismatch, InvalidInitData, \
|
||||
InvalidLicenseType, InvalidLicenseMessage, InvalidContext
|
||||
|
||||
routes = web.RouteTableDef()
|
||||
|
||||
@@ -63,10 +67,10 @@ async def open(request: web.Request) -> web.Response:
|
||||
"message": f"Device '{device_name}' is not found or you are not authorized to use it."
|
||||
}, status=403)
|
||||
|
||||
cdm = request.app["cdms"].get((secret_key, device_name))
|
||||
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
|
||||
if not cdm:
|
||||
device = Device.load(request.app["config"]["devices"][device_name])
|
||||
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm(device)
|
||||
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm.from_device(device)
|
||||
|
||||
try:
|
||||
session_id = cdm.open()
|
||||
@@ -82,8 +86,8 @@ async def open(request: web.Request) -> web.Response:
|
||||
"data": {
|
||||
"session_id": session_id.hex(),
|
||||
"device": {
|
||||
"system_id": cdm.device.system_id,
|
||||
"security_level": cdm.device.security_level
|
||||
"system_id": cdm.system_id,
|
||||
"security_level": cdm.security_level
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -95,7 +99,7 @@ async def close(request: web.Request) -> web.Response:
|
||||
device_name = request.match_info["device"]
|
||||
session_id = bytes.fromhex(request.match_info["session_id"])
|
||||
|
||||
cdm = request.app["cdms"].get((secret_key, device_name))
|
||||
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
|
||||
if not cdm:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
@@ -104,10 +108,10 @@ async def close(request: web.Request) -> web.Response:
|
||||
|
||||
try:
|
||||
cdm.close(session_id)
|
||||
except InvalidSession as e:
|
||||
except InvalidSession:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": str(e)
|
||||
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
|
||||
}, status=400)
|
||||
|
||||
return web.json_response({
|
||||
@@ -116,11 +120,69 @@ async def close(request: web.Request) -> web.Response:
|
||||
})
|
||||
|
||||
|
||||
@routes.post("/{device}/challenge/{license_type}")
|
||||
async def challenge(request: web.Request) -> web.Response:
|
||||
@routes.post("/{device}/set_service_certificate")
|
||||
async def set_service_certificate(request: web.Request) -> web.Response:
|
||||
secret_key = request.headers["X-Secret-Key"]
|
||||
device_name = request.match_info["device"]
|
||||
|
||||
body = await request.json()
|
||||
for required_field in ("session_id", "certificate"):
|
||||
if required_field == "certificate":
|
||||
has_field = required_field in body # it needs the key, but can be empty/null
|
||||
else:
|
||||
has_field = body.get(required_field)
|
||||
if not has_field:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Missing required field '{required_field}' in JSON body."
|
||||
}, status=400)
|
||||
|
||||
# get session id
|
||||
session_id = bytes.fromhex(body["session_id"])
|
||||
|
||||
# get cdm
|
||||
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
|
||||
if not cdm:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
|
||||
}, status=400)
|
||||
|
||||
# set service certificate
|
||||
certificate = body.get("certificate")
|
||||
try:
|
||||
provider_id = cdm.set_service_certificate(session_id, certificate)
|
||||
except InvalidSession:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
|
||||
}, status=400)
|
||||
except DecodeError as e:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid Service Certificate, {e}"
|
||||
}, status=400)
|
||||
except SignatureMismatch:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": "Signature Validation failed on the Service Certificate, rejecting."
|
||||
}, status=400)
|
||||
|
||||
return web.json_response({
|
||||
"status": 200,
|
||||
"message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
|
||||
"data": {
|
||||
"provider_id": provider_id
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@routes.post("/{device}/get_license_challenge/{license_type}")
|
||||
async def get_license_challenge(request: web.Request) -> web.Response:
|
||||
secret_key = request.headers["X-Secret-Key"]
|
||||
device_name = request.match_info["device"]
|
||||
license_type = request.match_info["license_type"]
|
||||
|
||||
body = await request.json()
|
||||
for required_field in ("session_id", "init_data"):
|
||||
if not body.get(required_field):
|
||||
@@ -133,41 +195,47 @@ async def challenge(request: web.Request) -> web.Response:
|
||||
session_id = bytes.fromhex(body["session_id"])
|
||||
|
||||
# get cdm
|
||||
cdm = request.app["cdms"].get((secret_key, device_name))
|
||||
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
|
||||
if not cdm:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
|
||||
}, status=400)
|
||||
|
||||
if session_id not in cdm._sessions:
|
||||
# This can happen if:
|
||||
# - API server gets shutdown/restarted,
|
||||
# - The user calls /challenge before /open,
|
||||
# - The user called /open on a different IP Address
|
||||
# - The user closed the session
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": "Invalid Session ID. Session ID may have Expired."
|
||||
}, status=400)
|
||||
|
||||
# set service certificate
|
||||
service_certificate = body.get("service_certificate")
|
||||
if request.app["config"]["force_privacy_mode"] and not service_certificate:
|
||||
# enforce service certificate (opt-in)
|
||||
# TODO: Add a way to check if there's a service certificate set properly
|
||||
if request.app["config"].get("force_privacy_mode") and not cdm._Cdm__sessions[session_id].service_certificate:
|
||||
return web.json_response({
|
||||
"status": 403,
|
||||
"message": "No Service Certificate provided but Privacy Mode is Enforced."
|
||||
"message": "No Service Certificate set but Privacy Mode is Enforced."
|
||||
}, status=403)
|
||||
if service_certificate:
|
||||
cdm.set_service_certificate(session_id, service_certificate)
|
||||
|
||||
# get init data
|
||||
init_data = PSSH(body["init_data"])
|
||||
|
||||
# get challenge
|
||||
license_request = cdm.get_license_challenge(
|
||||
session_id=session_id,
|
||||
init_data=body["init_data"],
|
||||
type_=LicenseType.Value(request.match_info["license_type"]),
|
||||
privacy_mode=True
|
||||
)
|
||||
try:
|
||||
license_request = cdm.get_license_challenge(
|
||||
session_id=session_id,
|
||||
pssh=init_data,
|
||||
type_=license_type,
|
||||
privacy_mode=True
|
||||
)
|
||||
except InvalidSession:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
|
||||
}, status=400)
|
||||
except InvalidInitData as e:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid Init Data, {e}"
|
||||
}, status=400)
|
||||
except InvalidLicenseType:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid License Type '{license_type}'"
|
||||
}, status=400)
|
||||
|
||||
return web.json_response({
|
||||
"status": 200,
|
||||
@@ -178,8 +246,8 @@ async def challenge(request: web.Request) -> web.Response:
|
||||
}, status=200)
|
||||
|
||||
|
||||
@routes.post("/{device}/keys/{key_type}")
|
||||
async def keys(request: web.Request) -> web.Response:
|
||||
@routes.post("/{device}/parse_license")
|
||||
async def parse_license(request: web.Request) -> web.Response:
|
||||
secret_key = request.headers["X-Secret-Key"]
|
||||
device_name = request.match_info["device"]
|
||||
|
||||
@@ -194,21 +262,64 @@ async def keys(request: web.Request) -> web.Response:
|
||||
# get session id
|
||||
session_id = bytes.fromhex(body["session_id"])
|
||||
|
||||
# get cdm
|
||||
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
|
||||
if not cdm:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
|
||||
}, status=400)
|
||||
|
||||
# parse the license message
|
||||
try:
|
||||
cdm.parse_license(session_id, body["license_message"])
|
||||
except InvalidSession:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
|
||||
}, status=400)
|
||||
except InvalidLicenseMessage as e:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid License Message, {e}"
|
||||
}, status=400)
|
||||
except InvalidContext as e:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Invalid Context, {e}"
|
||||
}, status=400)
|
||||
except SignatureMismatch:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": "Signature Validation failed on the License Message, rejecting."
|
||||
}, status=400)
|
||||
|
||||
return web.json_response({
|
||||
"status": 200,
|
||||
"message": "Successfully parsed and loaded the Keys from the License message."
|
||||
})
|
||||
|
||||
|
||||
@routes.post("/{device}/get_keys/{key_type}")
|
||||
async def get_keys(request: web.Request) -> web.Response:
|
||||
secret_key = request.headers["X-Secret-Key"]
|
||||
device_name = request.match_info["device"]
|
||||
|
||||
body = await request.json()
|
||||
for required_field in ("session_id",):
|
||||
if not body.get(required_field):
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Missing required field '{required_field}' in JSON body."
|
||||
}, status=400)
|
||||
|
||||
# get session id
|
||||
session_id = bytes.fromhex(body["session_id"])
|
||||
|
||||
# get key type
|
||||
key_type = request.match_info["key_type"]
|
||||
if key_type == "ALL":
|
||||
key_type = None
|
||||
else:
|
||||
try:
|
||||
if key_type.isdigit():
|
||||
key_type = License.KeyContainer.KeyType.Name(int(key_type))
|
||||
else:
|
||||
License.KeyContainer.KeyType.Value(key_type) # only test
|
||||
except ValueError as e:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"The Key Type value is invalid, {e}"
|
||||
}, status=400)
|
||||
|
||||
# get cdm
|
||||
cdm = request.app["cdms"].get((secret_key, device_name))
|
||||
@@ -218,29 +329,29 @@ async def keys(request: web.Request) -> web.Response:
|
||||
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
|
||||
}, status=400)
|
||||
|
||||
if session_id not in cdm._sessions:
|
||||
# This can happen if:
|
||||
# - API server gets shutdown/restarted,
|
||||
# - The user calls /challenge before /open,
|
||||
# - The user called /open on a different IP Address
|
||||
# - The user closed the session
|
||||
# get keys
|
||||
try:
|
||||
keys = cdm.get_keys(session_id, key_type)
|
||||
except InvalidSession:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": "Invalid Session ID. Session ID may have Expired."
|
||||
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
|
||||
}, status=400)
|
||||
except ValueError as e:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"The Key Type value '{key_type}' is invalid, {e}"
|
||||
}, status=400)
|
||||
|
||||
# parse the license message
|
||||
cdm.parse_license(session_id, body["license_message"])
|
||||
|
||||
# prepare the keys
|
||||
license_keys = [
|
||||
# get the keys in json form
|
||||
keys_json = [
|
||||
{
|
||||
"key_id": key.kid.hex,
|
||||
"key": key.key.hex(),
|
||||
"type": key.type,
|
||||
"permissions": key.permissions,
|
||||
}
|
||||
for key in cdm._sessions[session_id].keys
|
||||
for key in keys
|
||||
if not key_type or key.type == key_type
|
||||
]
|
||||
|
||||
@@ -248,8 +359,7 @@ async def keys(request: web.Request) -> web.Response:
|
||||
"status": 200,
|
||||
"message": "Success",
|
||||
"data": {
|
||||
# TODO: Add derived context keys like enc/mac[client]/mac[server]
|
||||
"keys": license_keys
|
||||
"keys": keys_json
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user