45 Commits

Author SHA1 Message Date
rlaphoenix
fc77f064ca Update Changelog for v1.4.0 2022-08-06 12:42:02 +01:00
rlaphoenix
f30ca45550 Bump to v1.4.0 2022-08-06 12:41:46 +01:00
rlaphoenix
576d7212d5 Cdm: Privatize the sessions map even harder
This is to further discourage direct access to the sessions directly
2022-08-06 12:36:48 +01:00
rlaphoenix
4f32b4b790 RemoteCdm: Increase minimum supported server to v1.4.0 2022-08-06 12:36:48 +01:00
rlaphoenix
2e2b5d528a RemoteCdm: Improve API error handling 2022-08-06 12:36:48 +01:00
rlaphoenix
2179987986 RemoteCdm: Remove all uses of Session()
This is now possible because everything relating to an underlying session is now finally fully remote thanks to the changes surrounding the new get_keys() method.

Any client code still getting keys by accessing `_sessions` manually should be updated to use the get_keys() method.
2022-08-06 12:36:48 +01:00
rlaphoenix
665b77bd24 serve: No longer return keys in /parse_license
/get_keys should now be used after /parse_license call is made.
2022-08-06 12:36:48 +01:00
rlaphoenix
3499c0cf4d RemoteCdm: Implement get_keys() 2022-08-06 12:36:48 +01:00
rlaphoenix
e4e109b9f3 RemoteCdm: Remove unnecessary parsing of license msg 2022-08-06 09:54:14 +01:00
rlaphoenix
1d606a9e54 Use Cdm.get_keys in license CLI command 2022-08-06 09:54:14 +01:00
rlaphoenix
f36977ef19 serve: Improve type hinting on Cdms gotten from app["cdms"]
For some reason on PyCharm typing doesnt work normally here even though the definition is provided in _startup().
2022-08-06 09:54:14 +01:00
rlaphoenix
dd1a355691 serve: Improve error handling on /parse_license 2022-08-06 09:54:14 +01:00
rlaphoenix
6eceaaf410 serve: Remote TODO that will not be done
We shouldn't really provide the derived context keys. There isn't any use to them outside of that specific license request and license response for which it was derived from. The only use to them would be to allow the client to decrypt the keys manually, which wont be necessary nor secure.
2022-08-06 09:54:14 +01:00
rlaphoenix
bd62b8d131 serve: Provide key_type to get_keys as-is
There's no need for serve code to handle parsing of it when the Cdm code will do so better.
2022-08-06 09:54:14 +01:00
rlaphoenix
11a2358002 serve: Improve error handling on /get_license_challenge 2022-08-06 09:54:14 +01:00
rlaphoenix
f2ed83205b serve: Provide license type to get_license_challenge as-is
There's no need for serve code to handle parsing of it when the Cdm code will do so better.
2022-08-06 09:54:14 +01:00
rlaphoenix
796cf7ffb0 serve: Improve error handling on /set_service_certificate 2022-08-06 09:54:14 +01:00
rlaphoenix
2c33af79df serve: Catch InvalidSession instead of manually ensuring session validity 2022-08-06 09:54:14 +01:00
rlaphoenix
93d9561fac serve: Use Cdm.get_keys() instead of accessing _sessions 2022-08-06 09:54:14 +01:00
rlaphoenix
c73078b7a9 serve: Add /get_keys endpoint 2022-08-06 09:54:14 +01:00
rlaphoenix
2445297ae8 serve: Match endpoints with Cdm class methods 2022-08-06 09:54:14 +01:00
rlaphoenix
01416f6513 Cdm: Add a method to get keys from loaded license 2022-08-06 09:54:14 +01:00
rlaphoenix
60e3ef0201 Remove unused Container import from Cdm and RemoteCdm 2022-08-06 08:27:19 +01:00
rlaphoenix
a1844fb195 gitignore: Exclude *.wvd for security 2022-08-06 08:21:30 +01:00
rlaphoenix
26d81a7bef PSSH: Allow crafting v0 boxes with just Key IDs
This is actually possible and in some cases necessary. While v0 boxes do not use key_IDs field of the PSSH Box, we can store the provided key_ids in the init data. E.g., Apple Music.
2022-08-05 08:31:14 +01:00
rlaphoenix
27a701aaea Cdm: Rework init_data param to expect PSSH object
A by product of this change is dropped support for providing a PSSH or init data directly in any form, that includes base64.

You must now provide it as a PSSH object, e.g., `cdm.get_license_challenge(session_id, PSSH("AAAAW3Bzc2...CSEQyAA=="))`

The idea behind this is to simplify the amount of places where parsing of PSSH and Init Data to a minimal amount. The codebase is getting quite annoying with the constant jumps and places where it needs to test for base64 strings, hex strings, bytes, and direct parsed PSSH boxes or WidevinePsshData. That's a ridiculous amount of code just to take in a pssh/init data, especially when the full pssh box will eventually be discarded/unused by the Cdm, as it just cares about the init data.

Client code should pass any PSSH value they get into a PSSH object appropriately, and then store it as such, instead of as a string or bytes. This makes it overall more powerful thanks to the ability to also access the underlying PSSH data more easily with this change.

It also helps to increase contrast between a compliant Widevine Cenc Header or PSSH Box, and arbitrary data (e.g., Netflix WidevineExchange's init data) because of how you initialize the PSSH.

It also allows the user to more accurately trace the underlying final parse of the PSSH value, instead of looking at it being pinged between multiple functions.

RemoteCdm now also sends the PSSH/init_data in full box form now, the serve API will be able to handle both scenarios but in edge cases providing the full box may be the difference between a working License Request and not.
2022-08-05 08:26:03 +01:00
rlaphoenix
2a87d55e20 PSSH: Add dump and dumps methods 2022-08-05 08:26:03 +01:00
rlaphoenix
76c7a402eb PSSH: Optimize how overwrite_key_ids works with the repeated field
We can clear a repeated field with `del field[:]` and overwrite an entire field with `field[:] = [b"123", b"456"]`. So we can reduce this down to a single call operation.
2022-08-05 08:26:03 +01:00
rlaphoenix
10fb954097 PSSH: Remove from_key_ids, use new() instead 2022-08-05 08:26:03 +01:00
rlaphoenix
9d7eaf4949 PSSH: Rework from_playready_pssh as a class method 2022-08-05 08:26:03 +01:00
rlaphoenix
0537c9666c PSSH: Add new() class method to craft boxes manually 2022-08-05 08:26:03 +01:00
rlaphoenix
fc47bbb436 PSSH: Merge get_as_box into the Constructor
Also improves the code of it overall including documentation.

The _box class instance variable has been removed and the raw box is no longer kept.
2022-08-05 05:33:13 +01:00
rlaphoenix
1ea57865ad PSSH: Fix usage of WidevinePsshData's key_ids field 2022-08-04 09:46:30 +01:00
rlaphoenix
f09a06857a Update Changelog for v1.3.1 2022-08-04 08:39:50 +01:00
rlaphoenix
e4f6a23725 Bump to v1.3.1 2022-08-04 08:39:42 +01:00
rlaphoenix
f21a21712b RemoteCdm: Improve Server Version testing
Some systems like Caddy or Nginx will prefix their own word to the Server header, e.g., `Caddy, pywidevine server v1.2.3` so I had to change a fair bit of the code to have wider compatibility across some unknowns that may occur with the Serve header.
2022-08-04 08:33:33 +01:00
rlaphoenix
a1494a3742 Allow specification of Cdm device_type as string 2022-08-04 08:26:41 +01:00
rlaphoenix
5b13e1a689 serve: Don't require force_privacy_mode to be defined on config 2022-08-04 08:22:06 +01:00
rlaphoenix
c9288dc391 Update Changelog for v1.3.0 2022-08-04 05:56:47 +01:00
rlaphoenix
7640d6fcab Bump to v1.3.0 2022-08-04 05:56:38 +01:00
rlaphoenix
3d794ad659 RemoteCdm: Implement /set_service_certificate 2022-08-04 05:54:15 +01:00
rlaphoenix
5788dde7b1 serve: Implement /set_service_certificate
Removed service certificate setting related code from /challenge.
2022-08-04 05:54:15 +01:00
rlaphoenix
ddf755f82f Cdm: Add ability to unset certificate via set_service_certificate()
To unset, just provide `None` as the certificate param.
2022-08-04 05:43:10 +01:00
rlaphoenix
e8785fcd84 Create RemoteCdm class as Client code for the serve feature
This can be considered the Client-side code for the `serve` feature.

The RemoteCdm object can be used with the same underlying interface as the normal `Cdm` object. Including stuff like .open(), .get_license_challenge(), .decrypt(), even same access to data like `cdm.system_id`, or even `cdm._sessions` just like normal.

However, since we don't have any private key and client ID, we spoof the super construction with dummy data. You wont have access to any data that uses the underlying Client ID and Private Key like the signer or decrypter. Any Cdm code trying to access them on RemoteCdm will fail.
2022-08-04 05:43:10 +01:00
rlaphoenix
c969d80931 Cdm: Change construction interface to allow manual creation
This is so you can construct a Cdm object without using `.wvd` files (nor the Device class). It also improves enforcement of some required data from the Device. The underlying Device object is discarded for it's data as it won't be required.

Note that the Client ID and Private Key related variables are now stored as private `__var` variables to further amplify their private nature and to really discourage manual read write. This is not impossible to workaround in Python but further discourages manual read/writes to the variable that could cause serious issues.

The RSA Key is also no longer stored as-is. It is now stored as PSS and PKCS1_OAEP objects, as they will be used like so. This makes it even more annoying to directly read/write the RSA key (but not impossible).
2022-08-04 04:52:26 +01:00
10 changed files with 834 additions and 198 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
# pywidevine
*.wvd
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

View File

@@ -5,6 +5,86 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.4.0] - 2022-08-06
This release is a face-lift for the PSSH class with a moderate amount of Cdm and Serve interface changes.
You will likely need to make a moderate amount of changes in your client code, please study the changelog.
Please note that while it was always privatized as `_sessions`, accessing the Session directly for any purpose was
never recommended or supported. With v1.4.0, there will be drastic problems if you continue to do so. One of the
few reasons to do that was to get the license keys which is no longer required with CDMs new `get_keys()` method.
RemoteCdm minimum supported Serve API version is now v1.4.0.
### Added
- The PSSH class now has a `new()` method to craft a new PSSH box. The box can be crafted from arbitrary init_data
and/or key_ids. If only key_ids is supplied a new Widevine Cenc Header will be created and the key IDs will be put
into it. This allows you to make compliant v0 or v1 boxes with as little data as just a Key ID.
- The PSSH class now has `dump()` and `dumps()` methods to serialize the data as binary or base64 respectively. It will
be serialized as a pymp4 PSSH box, ready to be used in an MP4 file.
- Cdm now has a method `get_keys()` to get the keys of the loaded license. This is the alternative to manually
accessing the keys by navigating the `_sessions` class instance variable.
- Serve API now also has a `/get_keys` endpoint to call the `get_keys()` method of the underlying Cdm session.
### Changed
- Cdm and RemoteCdm now expect a PSSH object as the `init_data` param for `get_license_challenge`. You can no longer
provide it anything else, that includes base64 or bytes form. It must be a PSSH object.
- Serve no longer returns license keys in the response of the `/keys` endpoint.
- Serve has changed the endpoint `/challenge` to `/get_license_challenge` and `/keys` to `/parse_license`. This is to
be consistent with the method names of the underlying Cdm class.
- The PSSH class has been reworked from being a static helper class to a proper PSSH class.
- PSSH.from_playready_pssh is now a class method and returns as a PSSH object.
### Removed
- PSSH.get_as_box has been removed and merged into the PSSH constructor.
- PSSH.from_key_ids has been removed entirely, you should now use `PSSH.new(key_ids=...)` instead.
- All uses of a local Session() object has been removed from RemoteCdm. The session is now fully controlled by the
remote API and de-synchronization by external alteration or unexpected exceptions is no longer a possibility.
### Fixed
- Various uses of the `key_ids` field of WidevinePsshData proto has been fixed in the PSSH class.
- Fixed a few Serve API crashes in edge cases with improved error handling on Cdm method calls.
## [1.3.1] - 2022-08-04
### Added
- Cdm and RemoteCdm can now be supplied a string value for `device_type` for scenarios where providing it as a string
is more convenient (e.g., from Config files).
### Fixed
- The `force_privacy_mode` key no longer needs to be defined at all in the configuration file. This was previously
crashing serve APIs if it wasn't set before starting.
- RemoteCdm's Server version check will no longer fail under certain serving conditions e.g., Caddy prepending `Caddy`
to the Server header value. It also fixes case sensitivity and removed the full url from the header.
## [1.3.0] - 2022-08-04
### Added
- New RemoteCdm class to be used as Client code for the `serve` Remote CDM API server. The RemoteCdm should be used
entirely separately from the normal Cdm class. All serve APIs must update to v1.3.0 to be compatible. The RemoteCdm
verifies the server version to ensure compatibility. Changes to the serve API schema will be immediately reflected in
the RemoteCdm code in the future.
- Implemented `/set_service_certificate` endpoint in serve schema as an improved way of setting the service certificate
than passing it to `/challenge`.
- You can now unset the service certificate by providing an empty service certificate value (or None or null). This
includes support for doing so even in serve API and the new RemoteCdm.
### Changed
- The Construction of the Cdm object has changed. You can now initialize it with more direct values if you don't want
to use the Device class or don't want to use `.wvd` files. To use Device classes, you must now use the
`Cdm.from_device()` class method.
- The ability to pass the certificate to `/challenge` has been removed. Please use the new `/set_service_certificate`
endpoint before calling `/challenge`. You do not need to set it every time. Once per session is enough unless you
now want to use a different certificate.
## [1.2.1] - 2022-08-02
This release is primarily a maintenance release for `serve` functionality but some Cdm fixes are also present.
@@ -143,6 +223,8 @@ This release is primarily a maintenance release for `serve` functionality but so
Initial Release.
[1.3.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.1
[1.3.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.0
[1.2.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.1
[1.2.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.0
[1.1.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.1

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pywidevine"
version = "1.2.1"
version = "1.4.0"
description = "Widevine CDM (Content Decryption Module) implementation in Python."
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
license = "GPL-3.0-only"

View File

@@ -1 +1 @@
__version__ = "1.2.1"
__version__ = "1.4.0"

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import base64
import binascii
import random
@@ -5,7 +7,7 @@ import subprocess
import sys
import time
from pathlib import Path
from typing import Union, Container, Optional
from typing import Union, Optional
from uuid import UUID
from Crypto.Cipher import AES, PKCS1_OAEP
@@ -62,13 +64,62 @@ class Cdm:
MAX_NUM_OF_SESSIONS = 50 # most common limit
def __init__(self, device: Device):
def __init__(
self,
device_type: Union[Device.Types, str],
system_id: int,
security_level: int,
client_id: ClientIdentification,
rsa_key: RSA.RsaKey
):
"""Initialize a Widevine Content Decryption Module (CDM)."""
if not device:
raise ValueError("A Widevine Device must be provided.")
self.device = device
if not device_type:
raise ValueError("Device Type must be provided")
if isinstance(device_type, str):
device_type = Device.Types[device_type]
if not isinstance(device_type, Device.Types):
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
self._sessions: dict[bytes, Session] = {}
if not system_id:
raise ValueError("System ID must be provided")
if not isinstance(system_id, int):
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
if not security_level:
raise ValueError("Security Level must be provided")
if not isinstance(security_level, int):
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
if not client_id:
raise ValueError("Client ID must be provided")
if not isinstance(client_id, ClientIdentification):
raise TypeError(f"Expected client_id to be a {ClientIdentification} not {client_id!r}")
if not rsa_key:
raise ValueError("RSA Key must be provided")
if not isinstance(rsa_key, RSA.RsaKey):
raise TypeError(f"Expected rsa_key to be a {RSA.RsaKey} not {rsa_key!r}")
self.device_type = device_type
self.system_id = system_id
self.security_level = security_level
self.__client_id = client_id
self.__signer = pss.new(rsa_key)
self.__decrypter = PKCS1_OAEP.new(rsa_key)
self.__sessions: dict[bytes, Session] = {}
@classmethod
def from_device(cls, device: Device) -> Cdm:
"""Initialize a Widevine CDM from a Widevine Device (.wvd) file."""
return cls(
device_type=device.type,
system_id=device.system_id,
security_level=device.security_level,
client_id=device.client_id,
rsa_key=device.private_key
)
def open(self) -> bytes:
"""
@@ -77,11 +128,11 @@ class Cdm:
Raises:
TooManySessions: If the session cannot be opened as limit has been reached.
"""
if len(self._sessions) > self.MAX_NUM_OF_SESSIONS:
if len(self.__sessions) > self.MAX_NUM_OF_SESSIONS:
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
session = Session()
self._sessions[session.id] = session
self.__sessions[session.id] = session
return session.id
@@ -95,12 +146,12 @@ class Cdm:
Raises:
InvalidSession: If the Session identifier is invalid.
"""
session = self._sessions.get(session_id)
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
del self._sessions[session_id]
del self.__sessions[session_id]
def set_service_certificate(self, session_id: bytes, certificate: Union[bytes, str]) -> str:
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
"""
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
@@ -116,7 +167,8 @@ class Cdm:
session_id: Session identifier.
certificate: SignedDrmCertificate (or SignedMessage containing one) in Base64
or Bytes form obtained from the Service. Some services have their own,
but most use the common privacy cert, (common_privacy_cert).
but most use the common privacy cert, (common_privacy_cert). If None, it
will remove the current certificate.
Raises:
InvalidSession: If the Session identifier is invalid.
@@ -126,11 +178,18 @@ class Cdm:
match the underlying DrmCertificate.
Returns the Service Provider ID of the verified DrmCertificate if successful.
If certificate is None, it will return the now unset certificate's Provider ID.
"""
session = self._sessions.get(session_id)
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if certificate is None:
drm_certificate = DrmCertificate()
drm_certificate.ParseFromString(session.service_certificate.drm_certificate)
session.service_certificate = None
return drm_certificate.provider_id
if isinstance(certificate, str):
try:
certificate = base64.b64decode(certificate) # assuming base64
@@ -176,7 +235,7 @@ class Cdm:
def get_license_challenge(
self,
session_id: bytes,
init_data: Union[Container, bytes, str],
pssh: PSSH,
type_: Union[int, str] = LicenseType.STREAMING,
privacy_mode: bool = True
) -> bytes:
@@ -185,8 +244,7 @@ class Cdm:
Parameters:
session_id: Session identifier.
init_data: Widevine Cenc Header (Init Data) or a Protection System Specific
Header Box to take the init data from.
pssh: PSSH Object to get the init data from.
type_: Type of License you wish to exchange, often `STREAMING`. The `OFFLINE`
Licenses are for Offline licensing of Downloaded content.
privacy_mode: Encrypt the Client ID using the Privacy Certificate. If the
@@ -202,16 +260,14 @@ class Cdm:
Returns a SignedMessage containing a LicenseRequest message. It's signed with
the Private Key of the device provision.
"""
session = self._sessions.get(session_id)
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if not init_data:
raise InvalidInitData("The init_data must not be empty.")
try:
init_data = PSSH.get_as_box(init_data).init_data
except (ValueError, binascii.Error, DecodeError) as e:
raise InvalidInitData(str(e))
if not pssh:
raise InvalidInitData("A pssh must be provided.")
if not isinstance(pssh, PSSH):
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
try:
if isinstance(type_, int):
@@ -231,26 +287,25 @@ class Cdm:
license_request.protocol_version = ProtocolVersion.Value("VERSION_2_1")
license_request.key_control_nonce = random.randrange(1, 2 ** 31)
license_request.content_id.widevine_pssh_data.pssh_data.append(init_data)
# pssh_data may be either a WidevineCencHeader or custom data
# we have to assume the pssh.init_data value is valid, we cannot test
license_request.content_id.widevine_pssh_data.pssh_data.append(pssh.init_data)
license_request.content_id.widevine_pssh_data.license_type = type_
license_request.content_id.widevine_pssh_data.request_id = request_id
if session.service_certificate and privacy_mode:
# encrypt the client id for privacy mode
license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id(
client_id=self.device.client_id,
client_id=self.__client_id,
service_certificate=session.service_certificate
))
else:
license_request.client_id.CopyFrom(self.device.client_id)
license_request.client_id.CopyFrom(self.__client_id)
license_message = SignedMessage()
license_message.type = SignedMessage.MessageType.LICENSE_REQUEST
license_message.msg = license_request.SerializeToString()
license_message.signature = pss. \
new(self.device.private_key). \
sign(SHA1.new(license_message.msg))
license_message.signature = self.__signer.sign(SHA1.new(license_message.msg))
session.context[request_id] = self.derive_context(license_message.msg)
@@ -278,7 +333,7 @@ class Cdm:
SignatureMismatch: If the Signature of the License SignedMessage does not
match the underlying License.
"""
session = self._sessions.get(session_id)
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
@@ -315,11 +370,10 @@ class Cdm:
if not context:
raise InvalidContext("Cannot parse a license message without first making a license request")
session_key = PKCS1_OAEP. \
new(self.device.private_key). \
decrypt(license_message.session_key)
enc_key, mac_key_server, _ = self.derive_keys(*context, session_key)
enc_key, mac_key_server, _ = self.derive_keys(
*context,
key=self.__decrypter.decrypt(license_message.session_key)
)
computed_signature = HMAC. \
new(mac_key_server, digestmod=SHA256). \
@@ -336,6 +390,39 @@ class Cdm:
del session.context[licence.id.request_id]
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
"""
Get Keys from the loaded License message.
Parameters:
session_id: Session identifier.
type_: (optional) Key Type to filter by and return.
Raises:
InvalidSession: If the Session identifier is invalid.
TypeError: If the provided type_ is an unexpected value type.
ValueError: If the provided type_ is not a valid Key Type.
"""
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
try:
if isinstance(type_, str):
type_ = License.KeyContainer.KeyType.Value(type_)
elif isinstance(type_, int):
License.KeyContainer.KeyType.Name(type_) # only test
elif type_ is not None:
raise TypeError(f"Expected type_ to be a {License.KeyContainer.KeyType} or int, not {type_!r}")
except ValueError as e:
raise ValueError(f"Could not parse type_ as a {License.KeyContainer.KeyType}, {e}")
return [
key
for key in session.keys
if not type_ or key.type == License.KeyContainer.KeyType.Name(type_)
]
def decrypt(
self,
session_id: bytes,
@@ -388,7 +475,7 @@ class Cdm:
if output_file.is_file() and not exists_ok:
raise FileExistsError(f"Output file already exists, {output_file}")
session = self._sessions.get(session_id)
session = self.__sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")

View File

@@ -32,3 +32,7 @@ class SignatureMismatch(PyWidevineException):
class NoKeysLoaded(PyWidevineException):
"""No License was parsed for this Session, No Keys available."""
class DeviceMismatch(PyWidevineException):
"""The Remote CDMs Device information and the APIs Device information did not match."""

View File

@@ -13,6 +13,7 @@ from pywidevine import __version__
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.license_protocol_pb2 import LicenseType, FileHashes
from pywidevine.pssh import PSSH
@click.group(invoke_without_command=True)
@@ -62,13 +63,16 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool):
"""
log = logging.getLogger("license")
# prepare pssh
pssh = PSSH(pssh)
# load device
device = Device.load(device)
log.info(f"[+] Loaded Device ({device.system_id} L{device.security_level})")
log.debug(device)
# load cdm
cdm = Cdm(device)
cdm = Cdm.from_device(device)
log.info(f"[+] Loaded CDM")
log.debug(cdm)
@@ -113,9 +117,7 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool):
log.info("[+] License Parsed Successfully")
# print keys
# Note: This showcases how insecure a Python CDM implementation is
# The keys should not be given to the user, but we cannot prevent this
for key in cdm._sessions[session_id].keys:
for key in cdm.get_keys(session_id):
log.info(f"[{key.type}] {key.kid.hex}:{key.key.hex()}")
# close session, disposes of session data

View File

@@ -2,7 +2,8 @@ from __future__ import annotations
import base64
import binascii
from typing import Union
import string
from typing import Union, Optional
from uuid import UUID
import construct
@@ -21,17 +22,172 @@ class PSSH:
Widevine = UUID(bytes=b"\xed\xef\x8b\xa9\x79\xd6\x4a\xce\xa3\xc8\x27\xdc\xd5\x1d\x21\xed")
PlayReady = UUID(bytes=b"\x9a\x04\xf0\x79\x98\x40\x42\x86\xab\x92\xe6\x5b\xe0\x88\x5f\x95")
def __init__(self, box: Container):
self._box = box
@staticmethod
def from_playready_pssh(box: Container) -> Container:
def __init__(self, data: Union[Container, str, bytes], strict: bool = False):
"""
Convert a PlayReady PSSH to a Widevine PSSH.
Load a PSSH box or Widevine Cenc Header data as a new v0 PSSH box.
[Strict mode (strict=True)]
Supports the following forms of input data in either Base64 or Bytes form:
- Full PSSH mp4 boxes (as defined by pymp4 Box).
- Full Widevine Cenc Headers (as defined by WidevinePsshData proto).
[Lenient mode (strict=False, default)]
If the data is not supported in Strict mode, and is assumed not to be corrupt or
parsed incorrectly, the License Server likely accepts a custom init_data value
during a License Request call. This is uncommon behavior but not out of realm of
possibilities. For example, Netflix does this with it's MSL WidevineExchange
scheme.
Lenient mode will craft a new v0 PSSH box with the init_data field set to
the provided data as-is. The data will first be base64 decoded. This behavior
may not work in your scenario and if that's the case please manually craft
your own PSSH box with the init_data field to be used in License Requests.
Raises:
ValueError: If the data is empty.
TypeError: If the data is an unexpected type.
binascii.Error: If the data could not be decoded as Base64 if provided as a
string.
DecodeError: If the data could not be parsed as a PSSH mp4 box nor a Widevine
Cenc Header and strict mode is enabled.
"""
if not data:
raise ValueError("Data must not be empty.")
if isinstance(data, Container):
box = data
else:
if isinstance(data, str):
try:
data = base64.b64decode(data)
except (binascii.Error, binascii.Incomplete) as e:
raise binascii.Error(f"Could not decode data as Base64, {e}")
if not isinstance(data, bytes):
raise TypeError(f"Expected data to be a {Container}, bytes, or base64, not {data!r}")
try:
box = Box.parse(data)
except (IOError, construct.ConstructError): # not a box
try:
cenc_header = WidevinePsshData()
cenc_header.ParseFromString(data)
cenc_header = cenc_header.SerializeToString()
if cenc_header != data: # not actually a WidevinePsshData
raise DecodeError()
except DecodeError: # not a widevine cenc header
if strict:
raise DecodeError(f"Could not parse data as a {Container} nor a {WidevinePsshData}.")
# Data is not a Widevine Cenc Header, it's something custom.
# The license server likely has something custom to parse it.
# See doc-string about Lenient mode for more information.
cenc_header = data
box = Box.parse(Box.build(dict(
type=b"pssh",
version=0,
flags=0,
system_ID=PSSH.SystemId.Widevine,
init_data=cenc_header
)))
self.version = box.version
self.flags = box.flags
self.system_id = box.system_ID
self.key_ids = box.key_IDs
self.init_data = box.init_data
@classmethod
def new(
cls,
key_ids: Optional[list[Union[UUID, str, bytes]]] = None,
init_data: Optional[Union[WidevinePsshData, str, bytes]] = None,
version: int = 0,
flags: int = 0
) -> PSSH:
"""Craft a new version 0 or 1 PSSH Box."""
if key_ids is not None:
if not isinstance(key_ids, list):
raise TypeError(f"Expected key_ids to be a list not {key_ids!r}")
if init_data is not None:
if not isinstance(init_data, (WidevinePsshData, str, bytes)):
raise TypeError(f"Expected init_data to be a {WidevinePsshData}, base64, or bytes, not {init_data!r}")
if not isinstance(version, int):
raise TypeError(f"Expected version to be an int not {version!r}")
if version not in (0, 1):
raise ValueError(f"Invalid version, must be either 0 or 1, not {version}.")
if not isinstance(flags, int):
raise TypeError(f"Expected flags to be an int not {flags!r}")
if flags < 0:
raise ValueError(f"Invalid flags, cannot be less than 0.")
if version == 0 and key_ids is not None and init_data is not None:
# v0 boxes use only init_data in the pssh field, but we can use the key_ids within the init_data
raise ValueError("Version 0 PSSH boxes must use only init_data, not init_data and key_ids.")
elif version == 1:
# TODO: I cannot tell if they need either init_data or key_ids exclusively, or both is fine
# So for now I will just make sure at least one is supplied
if init_data is None and key_ids is None:
raise ValueError("Version 1 PSSH boxes must use either init_data or key_ids but neither were provided")
if key_ids is not None:
# ensure key_ids are bytes, supports hex, base64, and bytes
key_ids = [
(
x.bytes if isinstance(x, UUID) else
bytes.fromhex(x) if all(c in string.hexdigits for c in x) else
base64.b64decode(x) if isinstance(x, str) else
x
)
for x in key_ids
]
if not all(isinstance(x, bytes) for x in key_ids):
not_bytes = [x for x in key_ids if not isinstance(x, bytes)]
raise TypeError(
"Expected all of key_ids to be a UUID, hex, base64, or bytes, but one or more are not, "
f"{not_bytes!r}"
)
if init_data is not None:
if isinstance(init_data, WidevinePsshData):
init_data = init_data.SerializeToString()
elif isinstance(init_data, str):
if all(c in string.hexdigits for c in init_data):
init_data = bytes.fromhex(init_data)
else:
init_data = base64.b64decode(init_data)
elif not isinstance(init_data, bytes):
raise TypeError(
f"Expecting init_data to be {WidevinePsshData}, hex, base64, or bytes, not {init_data!r}"
)
box = Box.parse(Box.build(dict(
type=b"pssh",
version=version,
flags=flags,
system_ID=PSSH.SystemId.Widevine,
key_ids=[key_ids, b""][key_ids is None],
init_data=[init_data, b""][init_data is None]
)))
if key_ids and version == 0:
PSSH.overwrite_key_ids(box, [UUID(bytes=x) for x in key_ids])
return cls(box)
@classmethod
def from_playready_pssh(cls, box: Container) -> PSSH:
"""
Convert a PlayReady PSSH Box to a Widevine PSSH Box.
Note: The resulting Widevine PSSH will likely not be usable for Licensing. This
is because there is some data for a Widevine CENC Header that is not going to be
listed in a PlayReady PSSH.
is because there is some data for a Widevine Cenc Header that is missing from a
PlayReady PSSH Box.
This converted PSSH will only be useful for it's Key IDs, so realistically only
for matching Key IDs with a Track. As a fallback.
@@ -47,7 +203,7 @@ class PSSH:
cenc_header.algorithm = 1 # 0=Clear, 1=AES-CTR
for key_id in key_ids:
cenc_header.key_id.append(key_id.bytes)
cenc_header.key_ids.append(key_id.bytes)
if box.version == 1:
# ensure both cenc header and box has same Key IDs
# v1 uses both this and within init data for basically no reason
@@ -56,83 +212,22 @@ class PSSH:
box.init_data = cenc_header.SerializeToString()
box.system_ID = PSSH.SystemId.Widevine
return box
return cls(box)
@staticmethod
def from_key_ids(key_ids: list[UUID]) -> Container:
"""
Craft a new PSSH Box from just Key IDs.
This should only be used as a very last measure.
"""
cenc_header = WidevinePsshData()
for key_id in key_ids:
cenc_header.key_id.append(key_id.bytes)
cenc_header.algorithm = 1 # 0=Clear, 1=AES-CTR
box = Box.parse(Box.build(dict(
def dump(self) -> bytes:
"""Export the PSSH object as a full PSSH box in bytes form."""
return Box.build(dict(
type=b"pssh",
version=0,
flags=0,
system_ID=PSSH.SystemId.Widevine,
init_data=cenc_header.SerializeToString()
)))
version=self.version,
flags=self.flags,
system_ID=self.system_id,
key_IDs=self.key_ids,
init_data=self.init_data
))
return box
@staticmethod
def get_as_box(data: Union[Container, bytes, str], strict: bool = False) -> Container:
"""
Get possibly arbitrary data as a parsed PSSH mp4 box.
Parameters:
data: PSSH mp4 box, Widevine Cenc Header (init data), or arbitrary data to
parse or craft into a PSSH mp4 box.
strict: Do not return a PSSH box for arbitrary data. Require the data to be
at least a PSSH mp4 box, or a Widevine Cenc Header.
Raises:
ValueError: If the data is empty, or an unexpected type.
binascii.Error: If the data could not be decoded as Base64 if provided
as a string.
DecodeError: If the data could not be parsed as a PSSH mp4 box
nor a Widevine Cenc Header while strict=True.
"""
if not data:
raise ValueError("Data must not be empty.")
if isinstance(data, Container):
return data
if isinstance(data, str):
try:
data = base64.b64decode(data)
except (binascii.Error, binascii.Incomplete) as e:
raise binascii.Error(f"Could not decode data as Base64, {e}")
if isinstance(data, bytes):
try:
data = Box.parse(data)
except (IOError, construct.ConstructError):
if strict:
try:
cenc_header = WidevinePsshData()
if cenc_header.MergeFromString(data) < len(data):
raise DecodeError()
except DecodeError:
raise DecodeError(f"Could not parse data as a PSSH mp4 box nor a Widevine Cenc Header.")
else:
data = cenc_header.SerializeToString()
data = Box.parse(Box.build(dict(
type=b"pssh",
version=0,
flags=0,
system_ID=PSSH.SystemId.Widevine,
init_data=data
)))
else:
raise ValueError(f"Data is an unexpected type, expected bytes got {data!r}.")
return data
def dumps(self) -> str:
"""Export the PSSH object as a full PSSH box in base64 form."""
return base64.b64encode(self.dump()).decode()
@staticmethod
def get_key_ids(box: Container) -> list[UUID]:
@@ -153,7 +248,7 @@ class PSSH:
return [
# the key_ids value may or may not be hex underlying
UUID(bytes=key_id) if len(key_id) == 16 else UUID(hex=key_id.decode())
for key_id in init.key_id
for key_id in init.key_ids
]
if box.system_ID == PSSH.SystemId.PlayReady:
@@ -191,13 +286,10 @@ class PSSH:
init = WidevinePsshData()
init.ParseFromString(box.init_data)
# TODO: Is there a better way to clear the Key IDs?
for _ in range(len(init.key_id or [])):
init.key_id.pop(0)
# TODO: Is there a .extend or a way to add all without a loop?
for key_id in key_ids:
init.key_id.append(key_id.bytes)
init.key_ids[:] = [
key_id.bytes
for key_id in key_ids
]
box.init_data = init.SerializeToString()

256
pywidevine/remotecdm.py Normal file
View File

@@ -0,0 +1,256 @@
from __future__ import annotations
import base64
import binascii
import re
from typing import Union, Optional
import requests
from Crypto.PublicKey import RSA
from google.protobuf.message import DecodeError
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.exceptions import InvalidInitData, InvalidLicenseType, InvalidLicenseMessage, DeviceMismatch
from pywidevine.key import Key
from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification
from pywidevine.pssh import PSSH
class RemoteCdm(Cdm):
"""Remote Accessible CDM using pywidevine's serve schema."""
def __init__(
self,
device_type: Union[Device.Types, str],
system_id: int,
security_level: int,
host: str,
secret: str,
device_name: str
):
"""Initialize a Widevine Content Decryption Module (CDM)."""
if not device_type:
raise ValueError("Device Type must be provided")
if isinstance(device_type, str):
device_type = Device.Types[device_type]
if not isinstance(device_type, Device.Types):
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
if not system_id:
raise ValueError("System ID must be provided")
if not isinstance(system_id, int):
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
if not security_level:
raise ValueError("Security Level must be provided")
if not isinstance(security_level, int):
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
if not host:
raise ValueError("API Host must be provided")
if not isinstance(host, str):
raise TypeError(f"Expected host to be a {str} not {host!r}")
if not secret:
raise ValueError("API Secret must be provided")
if not isinstance(secret, str):
raise TypeError(f"Expected secret to be a {str} not {secret!r}")
if not device_name:
raise ValueError("API Device name must be provided")
if not isinstance(device_name, str):
raise TypeError(f"Expected device_name to be a {str} not {device_name!r}")
self.device_type = device_type
self.system_id = system_id
self.security_level = security_level
self.host = host
self.device_name = device_name
# spoof client_id and rsa_key just so we can construct via super call
super().__init__(device_type, system_id, security_level, ClientIdentification(), RSA.generate(2048))
self.__session = requests.Session()
self.__session.headers.update({
"X-Secret-Key": secret
})
r = requests.head(self.host)
if r.status_code != 200:
raise ValueError(f"Could not test Remote API version [{r.status_code}]")
server = r.headers.get("Server")
if not server or "pywidevine serve" not in server.lower():
raise ValueError(f"This Remote CDM API does not seem to be a pywidevine serve API ({server}).")
server_version = re.search(r"pywidevine serve v([\d.]+)", server, re.IGNORECASE)
if not server_version:
raise ValueError(f"The pywidevine server API is not stating the version correctly, cannot continue.")
server_version = server_version.group(1)
if server_version < "1.4.0":
raise ValueError(f"This pywidevine serve API version ({server_version}) is not supported.")
@classmethod
def from_device(cls, device: Device) -> RemoteCdm:
raise NotImplementedError("You cannot load a RemoteCdm from a local Device file.")
def open(self) -> bytes:
r = self.__session.get(
url=f"{self.host}/{self.device_name}/open"
).json()
if r['status'] != 200:
raise ValueError(f"Cannot Open CDM Session, {r['message']} [{r['status']}]")
r = r["data"]
if int(r["device"]["system_id"]) != self.system_id:
raise DeviceMismatch("The System ID specified does not match the one specified in the API response.")
if int(r["device"]["security_level"]) != self.security_level:
raise DeviceMismatch("The Security Level specified does not match the one specified in the API response.")
return bytes.fromhex(r["session_id"])
def close(self, session_id: bytes) -> None:
r = self.__session.get(
url=f"{self.host}/{self.device_name}/close/{session_id.hex()}"
).json()
if r["status"] != 200:
raise ValueError(f"Cannot Close CDM Session, {r['message']} [{r['status']}]")
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
if certificate is None:
certificate_b64 = None
elif isinstance(certificate, str):
certificate_b64 = certificate # assuming base64
elif isinstance(certificate, bytes):
certificate_b64 = base64.b64encode(certificate).decode()
else:
raise DecodeError(f"Expecting Certificate to be base64 or bytes, not {certificate!r}")
r = self.__session.post(
url=f"{self.host}/{self.device_name}/set_service_certificate",
json={
"session_id": session_id.hex(),
"certificate": certificate_b64
}
).json()
if r["status"] != 200:
raise ValueError(f"Cannot Set CDMs Service Certificate, {r['message']} [{r['status']}]")
r = r["data"]
return r["provider_id"]
def get_license_challenge(
self,
session_id: bytes,
pssh: PSSH,
type_: Union[int, str] = LicenseType.STREAMING,
privacy_mode: bool = True
) -> bytes:
if not pssh:
raise InvalidInitData("A pssh must be provided.")
if not isinstance(pssh, PSSH):
raise InvalidInitData(f"Expected pssh to be a {PSSH}, not {pssh!r}")
try:
if isinstance(type_, int):
type_ = LicenseType.Name(int(type_))
elif isinstance(type_, str):
type_ = LicenseType.Name(LicenseType.Value(type_))
elif isinstance(type_, LicenseType):
type_ = LicenseType.Name(type_)
else:
raise InvalidLicenseType()
except ValueError:
raise InvalidLicenseType(f"License Type {type_!r} is invalid")
r = self.__session.post(
url=f"{self.host}/{self.device_name}/get_license_challenge/{type_}",
json={
"session_id": session_id.hex(),
"init_data": pssh.dumps()
}
).json()
if r["status"] != 200:
raise ValueError(f"Cannot get Challenge, {r['message']} [{r['status']}]")
r = r["data"]
try:
license_message = SignedMessage()
license_message.ParseFromString(base64.b64decode(r["challenge_b64"]))
except DecodeError as e:
raise InvalidLicenseMessage(f"Failed to parse license request, {e}")
return license_message.SerializeToString()
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
if not license_message:
raise InvalidLicenseMessage("Cannot parse an empty license_message")
if isinstance(license_message, str):
try:
license_message = base64.b64decode(license_message)
except (binascii.Error, binascii.Incomplete) as e:
raise InvalidLicenseMessage(f"Could not decode license_message as Base64, {e}")
if isinstance(license_message, bytes):
signed_message = SignedMessage()
try:
signed_message.ParseFromString(license_message)
except DecodeError as e:
raise InvalidLicenseMessage(f"Could not parse license_message as a SignedMessage, {e}")
license_message = signed_message
if not isinstance(license_message, SignedMessage):
raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}")
if license_message.type != SignedMessage.MessageType.LICENSE:
raise InvalidLicenseMessage(
f"Expecting a LICENSE message, not a "
f"'{SignedMessage.MessageType.Name(license_message.type)}' message."
)
r = self.__session.post(
url=f"{self.host}/{self.device_name}/parse_license",
json={
"session_id": session_id.hex(),
"license_message": base64.b64encode(license_message.SerializeToString()).decode()
}
).json()
if r["status"] != 200:
raise ValueError(f"Cannot parse License, {r['message']} [{r['status']}]")
def get_keys(self, session_id: bytes, type_: Optional[Union[int, str]] = None) -> list[Key]:
try:
if isinstance(type_, str):
License.KeyContainer.KeyType.Value(type_) # only test
elif isinstance(type_, int):
type_ = License.KeyContainer.KeyType.Name(type_)
elif type_ is None:
type_ = "ALL"
else:
raise TypeError(f"Expected type_ to be a {License.KeyContainer.KeyType} or int, not {type_!r}")
except ValueError as e:
raise ValueError(f"Could not parse type_ as a {License.KeyContainer.KeyType}, {e}")
r = self.__session.post(
url=f"{self.host}/{self.device_name}/get_keys/{type_}",
json={
"session_id": session_id.hex()
}
).json()
if r["status"] != 200:
raise ValueError(f"Could not get {type_} Keys, {r['message']} [{r['status']}]")
r = r["data"]
return [
Key(
type_=key["type"],
kid=Key.kid_to_uuid(bytes.fromhex(key["key_id"])),
key=bytes.fromhex(key["key"]),
permissions=key["permissions"]
)
for key in r["keys"]
]
__ALL__ = (RemoteCdm,)

View File

@@ -3,6 +3,10 @@ import sys
from pathlib import Path
from typing import Optional, Union
from google.protobuf.message import DecodeError
from pywidevine.pssh import PSSH
try:
from aiohttp import web
except ImportError:
@@ -16,8 +20,8 @@ except ImportError:
from pywidevine import __version__
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.exceptions import TooManySessions, InvalidSession
from pywidevine.license_protocol_pb2 import LicenseType, License
from pywidevine.exceptions import TooManySessions, InvalidSession, SignatureMismatch, InvalidInitData, \
InvalidLicenseType, InvalidLicenseMessage, InvalidContext
routes = web.RouteTableDef()
@@ -63,10 +67,10 @@ async def open(request: web.Request) -> web.Response:
"message": f"Device '{device_name}' is not found or you are not authorized to use it."
}, status=403)
cdm = request.app["cdms"].get((secret_key, device_name))
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
if not cdm:
device = Device.load(request.app["config"]["devices"][device_name])
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm(device)
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm.from_device(device)
try:
session_id = cdm.open()
@@ -82,8 +86,8 @@ async def open(request: web.Request) -> web.Response:
"data": {
"session_id": session_id.hex(),
"device": {
"system_id": cdm.device.system_id,
"security_level": cdm.device.security_level
"system_id": cdm.system_id,
"security_level": cdm.security_level
}
}
})
@@ -95,7 +99,7 @@ async def close(request: web.Request) -> web.Response:
device_name = request.match_info["device"]
session_id = bytes.fromhex(request.match_info["session_id"])
cdm = request.app["cdms"].get((secret_key, device_name))
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
if not cdm:
return web.json_response({
"status": 400,
@@ -104,10 +108,10 @@ async def close(request: web.Request) -> web.Response:
try:
cdm.close(session_id)
except InvalidSession as e:
except InvalidSession:
return web.json_response({
"status": 400,
"message": str(e)
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}, status=400)
return web.json_response({
@@ -116,11 +120,69 @@ async def close(request: web.Request) -> web.Response:
})
@routes.post("/{device}/challenge/{license_type}")
async def challenge(request: web.Request) -> web.Response:
@routes.post("/{device}/set_service_certificate")
async def set_service_certificate(request: web.Request) -> web.Response:
secret_key = request.headers["X-Secret-Key"]
device_name = request.match_info["device"]
body = await request.json()
for required_field in ("session_id", "certificate"):
if required_field == "certificate":
has_field = required_field in body # it needs the key, but can be empty/null
else:
has_field = body.get(required_field)
if not has_field:
return web.json_response({
"status": 400,
"message": f"Missing required field '{required_field}' in JSON body."
}, status=400)
# get session id
session_id = bytes.fromhex(body["session_id"])
# get cdm
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
if not cdm:
return web.json_response({
"status": 400,
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
}, status=400)
# set service certificate
certificate = body.get("certificate")
try:
provider_id = cdm.set_service_certificate(session_id, certificate)
except InvalidSession:
return web.json_response({
"status": 400,
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}, status=400)
except DecodeError as e:
return web.json_response({
"status": 400,
"message": f"Invalid Service Certificate, {e}"
}, status=400)
except SignatureMismatch:
return web.json_response({
"status": 400,
"message": "Signature Validation failed on the Service Certificate, rejecting."
}, status=400)
return web.json_response({
"status": 200,
"message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
"data": {
"provider_id": provider_id
}
})
@routes.post("/{device}/get_license_challenge/{license_type}")
async def get_license_challenge(request: web.Request) -> web.Response:
secret_key = request.headers["X-Secret-Key"]
device_name = request.match_info["device"]
license_type = request.match_info["license_type"]
body = await request.json()
for required_field in ("session_id", "init_data"):
if not body.get(required_field):
@@ -133,41 +195,47 @@ async def challenge(request: web.Request) -> web.Response:
session_id = bytes.fromhex(body["session_id"])
# get cdm
cdm = request.app["cdms"].get((secret_key, device_name))
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
if not cdm:
return web.json_response({
"status": 400,
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
}, status=400)
if session_id not in cdm._sessions:
# This can happen if:
# - API server gets shutdown/restarted,
# - The user calls /challenge before /open,
# - The user called /open on a different IP Address
# - The user closed the session
return web.json_response({
"status": 400,
"message": "Invalid Session ID. Session ID may have Expired."
}, status=400)
# set service certificate
service_certificate = body.get("service_certificate")
if request.app["config"]["force_privacy_mode"] and not service_certificate:
# enforce service certificate (opt-in)
# TODO: Add a way to check if there's a service certificate set properly
if request.app["config"].get("force_privacy_mode") and not cdm._Cdm__sessions[session_id].service_certificate:
return web.json_response({
"status": 403,
"message": "No Service Certificate provided but Privacy Mode is Enforced."
"message": "No Service Certificate set but Privacy Mode is Enforced."
}, status=403)
if service_certificate:
cdm.set_service_certificate(session_id, service_certificate)
# get init data
init_data = PSSH(body["init_data"])
# get challenge
license_request = cdm.get_license_challenge(
session_id=session_id,
init_data=body["init_data"],
type_=LicenseType.Value(request.match_info["license_type"]),
privacy_mode=True
)
try:
license_request = cdm.get_license_challenge(
session_id=session_id,
pssh=init_data,
type_=license_type,
privacy_mode=True
)
except InvalidSession:
return web.json_response({
"status": 400,
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}, status=400)
except InvalidInitData as e:
return web.json_response({
"status": 400,
"message": f"Invalid Init Data, {e}"
}, status=400)
except InvalidLicenseType:
return web.json_response({
"status": 400,
"message": f"Invalid License Type '{license_type}'"
}, status=400)
return web.json_response({
"status": 200,
@@ -178,8 +246,8 @@ async def challenge(request: web.Request) -> web.Response:
}, status=200)
@routes.post("/{device}/keys/{key_type}")
async def keys(request: web.Request) -> web.Response:
@routes.post("/{device}/parse_license")
async def parse_license(request: web.Request) -> web.Response:
secret_key = request.headers["X-Secret-Key"]
device_name = request.match_info["device"]
@@ -194,21 +262,64 @@ async def keys(request: web.Request) -> web.Response:
# get session id
session_id = bytes.fromhex(body["session_id"])
# get cdm
cdm: Optional[Cdm] = request.app["cdms"].get((secret_key, device_name))
if not cdm:
return web.json_response({
"status": 400,
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
}, status=400)
# parse the license message
try:
cdm.parse_license(session_id, body["license_message"])
except InvalidSession:
return web.json_response({
"status": 400,
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}, status=400)
except InvalidLicenseMessage as e:
return web.json_response({
"status": 400,
"message": f"Invalid License Message, {e}"
}, status=400)
except InvalidContext as e:
return web.json_response({
"status": 400,
"message": f"Invalid Context, {e}"
}, status=400)
except SignatureMismatch:
return web.json_response({
"status": 400,
"message": "Signature Validation failed on the License Message, rejecting."
}, status=400)
return web.json_response({
"status": 200,
"message": "Successfully parsed and loaded the Keys from the License message."
})
@routes.post("/{device}/get_keys/{key_type}")
async def get_keys(request: web.Request) -> web.Response:
secret_key = request.headers["X-Secret-Key"]
device_name = request.match_info["device"]
body = await request.json()
for required_field in ("session_id",):
if not body.get(required_field):
return web.json_response({
"status": 400,
"message": f"Missing required field '{required_field}' in JSON body."
}, status=400)
# get session id
session_id = bytes.fromhex(body["session_id"])
# get key type
key_type = request.match_info["key_type"]
if key_type == "ALL":
key_type = None
else:
try:
if key_type.isdigit():
key_type = License.KeyContainer.KeyType.Name(int(key_type))
else:
License.KeyContainer.KeyType.Value(key_type) # only test
except ValueError as e:
return web.json_response({
"status": 400,
"message": f"The Key Type value is invalid, {e}"
}, status=400)
# get cdm
cdm = request.app["cdms"].get((secret_key, device_name))
@@ -218,29 +329,29 @@ async def keys(request: web.Request) -> web.Response:
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
}, status=400)
if session_id not in cdm._sessions:
# This can happen if:
# - API server gets shutdown/restarted,
# - The user calls /challenge before /open,
# - The user called /open on a different IP Address
# - The user closed the session
# get keys
try:
keys = cdm.get_keys(session_id, key_type)
except InvalidSession:
return web.json_response({
"status": 400,
"message": "Invalid Session ID. Session ID may have Expired."
"message": f"Invalid Session ID '{session_id.hex()}', it may have expired."
}, status=400)
except ValueError as e:
return web.json_response({
"status": 400,
"message": f"The Key Type value '{key_type}' is invalid, {e}"
}, status=400)
# parse the license message
cdm.parse_license(session_id, body["license_message"])
# prepare the keys
license_keys = [
# get the keys in json form
keys_json = [
{
"key_id": key.kid.hex,
"key": key.key.hex(),
"type": key.type,
"permissions": key.permissions,
}
for key in cdm._sessions[session_id].keys
for key in keys
if not key_type or key.type == key_type
]
@@ -248,8 +359,7 @@ async def keys(request: web.Request) -> web.Response:
"status": 200,
"message": "Success",
"data": {
# TODO: Add derived context keys like enc/mac[client]/mac[server]
"keys": license_keys
"keys": keys_json
}
})