Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f09a06857a | ||
|
|
e4f6a23725 | ||
|
|
f21a21712b | ||
|
|
a1494a3742 | ||
|
|
5b13e1a689 | ||
|
|
c9288dc391 | ||
|
|
7640d6fcab | ||
|
|
3d794ad659 | ||
|
|
5788dde7b1 | ||
|
|
ddf755f82f | ||
|
|
e8785fcd84 | ||
|
|
c969d80931 |
38
CHANGELOG.md
38
CHANGELOG.md
@@ -5,6 +5,42 @@ All notable changes to this project will be documented in this file.
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.3.1] - 2022-08-04
|
||||
|
||||
### Added
|
||||
|
||||
- Cdm and RemoteCdm can now be supplied a string value for `device_type` for scenarios where providing it as a string
|
||||
is more convenient (e.g., from Config files).
|
||||
|
||||
### Fixed
|
||||
|
||||
- The `force_privacy_mode` key no longer needs to be defined at all in the configuration file. This was previously
|
||||
crashing serve APIs if it wasn't set before starting.
|
||||
- RemoteCdm's Server version check will no longer fail under certain serving conditions e.g., Caddy prepending `Caddy`
|
||||
to the Server header value. It also fixes case sensitivity and removed the full url from the header.
|
||||
|
||||
## [1.3.0] - 2022-08-04
|
||||
|
||||
### Added
|
||||
|
||||
- New RemoteCdm class to be used as Client code for the `serve` Remote CDM API server. The RemoteCdm should be used
|
||||
entirely separately from the normal Cdm class. All serve APIs must update to v1.3.0 to be compatible. The RemoteCdm
|
||||
verifies the server version to ensure compatibility. Changes to the serve API schema will be immediately reflected in
|
||||
the RemoteCdm code in the future.
|
||||
- Implemented `/set_service_certificate` endpoint in serve schema as an improved way of setting the service certificate
|
||||
than passing it to `/challenge`.
|
||||
- You can now unset the service certificate by providing an empty service certificate value (or None or null). This
|
||||
includes support for doing so even in serve API and the new RemoteCdm.
|
||||
|
||||
### Changed
|
||||
|
||||
- The Construction of the Cdm object has changed. You can now initialize it with more direct values if you don't want
|
||||
to use the Device class or don't want to use `.wvd` files. To use Device classes, you must now use the
|
||||
`Cdm.from_device()` class method.
|
||||
- The ability to pass the certificate to `/challenge` has been removed. Please use the new `/set_service_certificate`
|
||||
endpoint before calling `/challenge`. You do not need to set it every time. Once per session is enough unless you
|
||||
now want to use a different certificate.
|
||||
|
||||
## [1.2.1] - 2022-08-02
|
||||
|
||||
This release is primarily a maintenance release for `serve` functionality but some Cdm fixes are also present.
|
||||
@@ -143,6 +179,8 @@ This release is primarily a maintenance release for `serve` functionality but so
|
||||
|
||||
Initial Release.
|
||||
|
||||
[1.3.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.1
|
||||
[1.3.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.0
|
||||
[1.2.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.1
|
||||
[1.2.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.0
|
||||
[1.1.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.1
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.poetry]
|
||||
name = "pywidevine"
|
||||
version = "1.2.1"
|
||||
version = "1.3.1"
|
||||
description = "Widevine CDM (Content Decryption Module) implementation in Python."
|
||||
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
|
||||
license = "GPL-3.0-only"
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "1.2.1"
|
||||
__version__ = "1.3.1"
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import random
|
||||
@@ -62,14 +64,63 @@ class Cdm:
|
||||
|
||||
MAX_NUM_OF_SESSIONS = 50 # most common limit
|
||||
|
||||
def __init__(self, device: Device):
|
||||
def __init__(
|
||||
self,
|
||||
device_type: Union[Device.Types, str],
|
||||
system_id: int,
|
||||
security_level: int,
|
||||
client_id: ClientIdentification,
|
||||
rsa_key: RSA.RsaKey
|
||||
):
|
||||
"""Initialize a Widevine Content Decryption Module (CDM)."""
|
||||
if not device:
|
||||
raise ValueError("A Widevine Device must be provided.")
|
||||
self.device = device
|
||||
if not device_type:
|
||||
raise ValueError("Device Type must be provided")
|
||||
if isinstance(device_type, str):
|
||||
device_type = Device.Types[device_type]
|
||||
if not isinstance(device_type, Device.Types):
|
||||
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
|
||||
|
||||
if not system_id:
|
||||
raise ValueError("System ID must be provided")
|
||||
if not isinstance(system_id, int):
|
||||
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
|
||||
|
||||
if not security_level:
|
||||
raise ValueError("Security Level must be provided")
|
||||
if not isinstance(security_level, int):
|
||||
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
|
||||
|
||||
if not client_id:
|
||||
raise ValueError("Client ID must be provided")
|
||||
if not isinstance(client_id, ClientIdentification):
|
||||
raise TypeError(f"Expected client_id to be a {ClientIdentification} not {client_id!r}")
|
||||
|
||||
if not rsa_key:
|
||||
raise ValueError("RSA Key must be provided")
|
||||
if not isinstance(rsa_key, RSA.RsaKey):
|
||||
raise TypeError(f"Expected rsa_key to be a {RSA.RsaKey} not {rsa_key!r}")
|
||||
|
||||
self.device_type = device_type
|
||||
self.system_id = system_id
|
||||
self.security_level = security_level
|
||||
self.__client_id = client_id
|
||||
|
||||
self.__signer = pss.new(rsa_key)
|
||||
self.__decrypter = PKCS1_OAEP.new(rsa_key)
|
||||
|
||||
self._sessions: dict[bytes, Session] = {}
|
||||
|
||||
@classmethod
|
||||
def from_device(cls, device: Device) -> Cdm:
|
||||
"""Initialize a Widevine CDM from a Widevine Device (.wvd) file."""
|
||||
return cls(
|
||||
device_type=device.type,
|
||||
system_id=device.system_id,
|
||||
security_level=device.security_level,
|
||||
client_id=device.client_id,
|
||||
rsa_key=device.private_key
|
||||
)
|
||||
|
||||
def open(self) -> bytes:
|
||||
"""
|
||||
Open a Widevine Content Decryption Module (CDM) session.
|
||||
@@ -100,7 +151,7 @@ class Cdm:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
del self._sessions[session_id]
|
||||
|
||||
def set_service_certificate(self, session_id: bytes, certificate: Union[bytes, str]) -> str:
|
||||
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
|
||||
"""
|
||||
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
|
||||
|
||||
@@ -116,7 +167,8 @@ class Cdm:
|
||||
session_id: Session identifier.
|
||||
certificate: SignedDrmCertificate (or SignedMessage containing one) in Base64
|
||||
or Bytes form obtained from the Service. Some services have their own,
|
||||
but most use the common privacy cert, (common_privacy_cert).
|
||||
but most use the common privacy cert, (common_privacy_cert). If None, it
|
||||
will remove the current certificate.
|
||||
|
||||
Raises:
|
||||
InvalidSession: If the Session identifier is invalid.
|
||||
@@ -126,11 +178,18 @@ class Cdm:
|
||||
match the underlying DrmCertificate.
|
||||
|
||||
Returns the Service Provider ID of the verified DrmCertificate if successful.
|
||||
If certificate is None, it will return the now unset certificate's Provider ID.
|
||||
"""
|
||||
session = self._sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
if certificate is None:
|
||||
drm_certificate = DrmCertificate()
|
||||
drm_certificate.ParseFromString(session.service_certificate.drm_certificate)
|
||||
session.service_certificate = None
|
||||
return drm_certificate.provider_id
|
||||
|
||||
if isinstance(certificate, str):
|
||||
try:
|
||||
certificate = base64.b64decode(certificate) # assuming base64
|
||||
@@ -238,19 +297,16 @@ class Cdm:
|
||||
if session.service_certificate and privacy_mode:
|
||||
# encrypt the client id for privacy mode
|
||||
license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id(
|
||||
client_id=self.device.client_id,
|
||||
client_id=self.__client_id,
|
||||
service_certificate=session.service_certificate
|
||||
))
|
||||
else:
|
||||
license_request.client_id.CopyFrom(self.device.client_id)
|
||||
license_request.client_id.CopyFrom(self.__client_id)
|
||||
|
||||
license_message = SignedMessage()
|
||||
license_message.type = SignedMessage.MessageType.LICENSE_REQUEST
|
||||
license_message.msg = license_request.SerializeToString()
|
||||
|
||||
license_message.signature = pss. \
|
||||
new(self.device.private_key). \
|
||||
sign(SHA1.new(license_message.msg))
|
||||
license_message.signature = self.__signer.sign(SHA1.new(license_message.msg))
|
||||
|
||||
session.context[request_id] = self.derive_context(license_message.msg)
|
||||
|
||||
@@ -315,11 +371,10 @@ class Cdm:
|
||||
if not context:
|
||||
raise InvalidContext("Cannot parse a license message without first making a license request")
|
||||
|
||||
session_key = PKCS1_OAEP. \
|
||||
new(self.device.private_key). \
|
||||
decrypt(license_message.session_key)
|
||||
|
||||
enc_key, mac_key_server, _ = self.derive_keys(*context, session_key)
|
||||
enc_key, mac_key_server, _ = self.derive_keys(
|
||||
*context,
|
||||
key=self.__decrypter.decrypt(license_message.session_key)
|
||||
)
|
||||
|
||||
computed_signature = HMAC. \
|
||||
new(mac_key_server, digestmod=SHA256). \
|
||||
|
||||
@@ -32,3 +32,7 @@ class SignatureMismatch(PyWidevineException):
|
||||
|
||||
class NoKeysLoaded(PyWidevineException):
|
||||
"""No License was parsed for this Session, No Keys available."""
|
||||
|
||||
|
||||
class DeviceMismatch(PyWidevineException):
|
||||
"""The Remote CDMs Device information and the APIs Device information did not match."""
|
||||
|
||||
@@ -68,7 +68,7 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool):
|
||||
log.debug(device)
|
||||
|
||||
# load cdm
|
||||
cdm = Cdm(device)
|
||||
cdm = Cdm.from_device(device)
|
||||
log.info(f"[+] Loaded CDM")
|
||||
log.debug(cdm)
|
||||
|
||||
|
||||
258
pywidevine/remotecdm.py
Normal file
258
pywidevine/remotecdm.py
Normal file
@@ -0,0 +1,258 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import re
|
||||
from typing import Union, Optional
|
||||
|
||||
import requests
|
||||
from Crypto.PublicKey import RSA
|
||||
from construct import Container
|
||||
from google.protobuf.message import DecodeError
|
||||
from pywidevine.cdm import Cdm
|
||||
from pywidevine.device import Device
|
||||
from pywidevine.exceptions import InvalidSession, InvalidInitData, InvalidLicenseType, TooManySessions, \
|
||||
InvalidLicenseMessage, DeviceMismatch
|
||||
from pywidevine.key import Key
|
||||
|
||||
from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification
|
||||
from pywidevine.pssh import PSSH
|
||||
from pywidevine.session import Session
|
||||
|
||||
|
||||
class RemoteCdm(Cdm):
|
||||
"""Remote Accessible CDM using pywidevine's serve schema."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device_type: Union[Device.Types, str],
|
||||
system_id: int,
|
||||
security_level: int,
|
||||
host: str,
|
||||
secret: str,
|
||||
device_name: str
|
||||
):
|
||||
"""Initialize a Widevine Content Decryption Module (CDM)."""
|
||||
if not device_type:
|
||||
raise ValueError("Device Type must be provided")
|
||||
if isinstance(device_type, str):
|
||||
device_type = Device.Types[device_type]
|
||||
if not isinstance(device_type, Device.Types):
|
||||
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
|
||||
|
||||
if not system_id:
|
||||
raise ValueError("System ID must be provided")
|
||||
if not isinstance(system_id, int):
|
||||
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
|
||||
|
||||
if not security_level:
|
||||
raise ValueError("Security Level must be provided")
|
||||
if not isinstance(security_level, int):
|
||||
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
|
||||
|
||||
if not host:
|
||||
raise ValueError("API Host must be provided")
|
||||
if not isinstance(host, str):
|
||||
raise TypeError(f"Expected host to be a {str} not {host!r}")
|
||||
|
||||
if not secret:
|
||||
raise ValueError("API Secret must be provided")
|
||||
if not isinstance(secret, str):
|
||||
raise TypeError(f"Expected secret to be a {str} not {secret!r}")
|
||||
|
||||
if not device_name:
|
||||
raise ValueError("API Device name must be provided")
|
||||
if not isinstance(device_name, str):
|
||||
raise TypeError(f"Expected device_name to be a {str} not {device_name!r}")
|
||||
|
||||
self.device_type = device_type
|
||||
self.system_id = system_id
|
||||
self.security_level = security_level
|
||||
self.host = host
|
||||
self.device_name = device_name
|
||||
|
||||
# spoof client_id and rsa_key just so we can construct via super call
|
||||
super().__init__(device_type, system_id, security_level, ClientIdentification(), RSA.generate(2048))
|
||||
|
||||
self.__session = requests.Session()
|
||||
self.__session.headers.update({
|
||||
"X-Secret-Key": secret
|
||||
})
|
||||
|
||||
r = requests.head(self.host)
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"Could not test Remote API version [{r.status_code}]")
|
||||
server = r.headers.get("Server")
|
||||
if not server or "pywidevine serve" not in server.lower():
|
||||
raise ValueError(f"This Remote CDM API does not seem to be a pywidevine serve API ({server}).")
|
||||
server_version = re.search(r"pywidevine serve v([\d.]+)", server, re.IGNORECASE)
|
||||
if not server_version:
|
||||
raise ValueError(f"The pywidevine server API is not stating the version correctly, cannot continue.")
|
||||
server_version = server_version.group(1)
|
||||
if server_version < "1.3.0":
|
||||
raise ValueError(f"This pywidevine serve API version ({server_version}) is not supported.")
|
||||
|
||||
@classmethod
|
||||
def from_device(cls, device: Device) -> RemoteCdm:
|
||||
raise NotImplementedError("You cannot load a RemoteCdm from a local Device file.")
|
||||
|
||||
def open(self) -> bytes:
|
||||
if len(self._sessions) > self.MAX_NUM_OF_SESSIONS:
|
||||
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
|
||||
|
||||
r = self.__session.get(f"{self.host}/{self.device_name}/open")
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"Cannot Open CDM Session, {r.text} [{r.status_code}]")
|
||||
r = r.json()["data"]
|
||||
|
||||
session = Session()
|
||||
session.id = bytes.fromhex(r["session_id"])
|
||||
self._sessions[session.id] = session
|
||||
|
||||
if int(r["device"]["system_id"]) != self.system_id:
|
||||
raise DeviceMismatch("The System ID specified does not match the one specified in the API response.")
|
||||
|
||||
if int(r["device"]["security_level"]) != self.security_level:
|
||||
raise DeviceMismatch("The Security Level specified does not match the one specified in the API response.")
|
||||
|
||||
return session.id
|
||||
|
||||
def close(self, session_id: bytes) -> None:
|
||||
r = self.__session.get(f"{self.host}/{self.device_name}/close/{session_id.hex()}")
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"Cannot Close CDM Session, {r.text} [{r.status_code}]")
|
||||
del self._sessions[session_id]
|
||||
|
||||
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
|
||||
session = self._sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
if certificate is None:
|
||||
certificate_b64 = None
|
||||
elif isinstance(certificate, str):
|
||||
certificate_b64 = certificate # assuming base64
|
||||
elif isinstance(certificate, bytes):
|
||||
certificate_b64 = base64.b64encode(certificate).decode()
|
||||
else:
|
||||
raise DecodeError(f"Expecting Certificate to be base64 or bytes, not {certificate!r}")
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/set_service_certificate",
|
||||
json={
|
||||
"session_id": session_id.hex(),
|
||||
"certificate": certificate_b64
|
||||
}
|
||||
)
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"Cannot Set CDMs Service Certificate, {r.text} [{r.status_code}]")
|
||||
r = r.json()["data"]
|
||||
|
||||
return r["provider_id"]
|
||||
|
||||
def get_license_challenge(
|
||||
self,
|
||||
session_id: bytes,
|
||||
init_data: Union[Container, bytes, str],
|
||||
type_: Union[int, str] = LicenseType.STREAMING,
|
||||
privacy_mode: bool = True
|
||||
) -> bytes:
|
||||
session = self._sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
if not init_data:
|
||||
raise InvalidInitData("The init_data must not be empty.")
|
||||
try:
|
||||
init_data = PSSH.get_as_box(init_data).init_data
|
||||
except (ValueError, binascii.Error, DecodeError) as e:
|
||||
raise InvalidInitData(str(e))
|
||||
|
||||
try:
|
||||
if isinstance(type_, int):
|
||||
type_ = LicenseType.Name(int(type_))
|
||||
elif isinstance(type_, str):
|
||||
type_ = LicenseType.Name(LicenseType.Value(type_))
|
||||
elif isinstance(type_, LicenseType):
|
||||
type_ = LicenseType.Name(type_)
|
||||
else:
|
||||
raise InvalidLicenseType()
|
||||
except ValueError:
|
||||
raise InvalidLicenseType(f"License Type {type_!r} is invalid")
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/challenge/{type_}",
|
||||
json={
|
||||
"session_id": session_id.hex(),
|
||||
"init_data": base64.b64encode(init_data).decode()
|
||||
}
|
||||
)
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"Cannot get Challenge, {r.text} [{r.status_code}]")
|
||||
r = r.json()["data"]
|
||||
|
||||
try:
|
||||
license_message = SignedMessage()
|
||||
license_message.ParseFromString(base64.b64decode(r["challenge_b64"]))
|
||||
except DecodeError as e:
|
||||
raise InvalidLicenseMessage(f"Failed to parse license request, {e}")
|
||||
|
||||
return license_message.SerializeToString()
|
||||
|
||||
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
|
||||
session = self._sessions.get(session_id)
|
||||
if not session:
|
||||
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
|
||||
|
||||
if not license_message:
|
||||
raise InvalidLicenseMessage("Cannot parse an empty license_message")
|
||||
|
||||
if isinstance(license_message, str):
|
||||
try:
|
||||
license_message = base64.b64decode(license_message)
|
||||
except (binascii.Error, binascii.Incomplete) as e:
|
||||
raise InvalidLicenseMessage(f"Could not decode license_message as Base64, {e}")
|
||||
|
||||
if isinstance(license_message, bytes):
|
||||
signed_message = SignedMessage()
|
||||
try:
|
||||
signed_message.ParseFromString(license_message)
|
||||
except DecodeError as e:
|
||||
raise InvalidLicenseMessage(f"Could not parse license_message as a SignedMessage, {e}")
|
||||
license_message = signed_message
|
||||
|
||||
if not isinstance(license_message, SignedMessage):
|
||||
raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}")
|
||||
|
||||
if license_message.type != SignedMessage.MessageType.LICENSE:
|
||||
raise InvalidLicenseMessage(
|
||||
f"Expecting a LICENSE message, not a "
|
||||
f"'{SignedMessage.MessageType.Name(license_message.type)}' message."
|
||||
)
|
||||
|
||||
licence = License()
|
||||
licence.ParseFromString(license_message.msg)
|
||||
|
||||
r = self.__session.post(
|
||||
url=f"{self.host}/{self.device_name}/keys/ALL",
|
||||
json={
|
||||
"session_id": session_id.hex(),
|
||||
"license_message": base64.b64encode(license_message.SerializeToString()).decode()
|
||||
}
|
||||
)
|
||||
if r.status_code != 200:
|
||||
raise ValueError(f"Cannot parse License, {r.text} [{r.status_code}]")
|
||||
r = r.json()["data"]
|
||||
|
||||
session.keys = [
|
||||
Key(
|
||||
type_=key["type"],
|
||||
kid=Key.kid_to_uuid(bytes.fromhex(key["key_id"])),
|
||||
key=bytes.fromhex(key["key"]),
|
||||
permissions=key["permissions"]
|
||||
)
|
||||
for key in r["keys"]
|
||||
]
|
||||
|
||||
|
||||
__ALL__ = (RemoteCdm,)
|
||||
@@ -66,7 +66,7 @@ async def open(request: web.Request) -> web.Response:
|
||||
cdm = request.app["cdms"].get((secret_key, device_name))
|
||||
if not cdm:
|
||||
device = Device.load(request.app["config"]["devices"][device_name])
|
||||
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm(device)
|
||||
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm.from_device(device)
|
||||
|
||||
try:
|
||||
session_id = cdm.open()
|
||||
@@ -82,8 +82,8 @@ async def open(request: web.Request) -> web.Response:
|
||||
"data": {
|
||||
"session_id": session_id.hex(),
|
||||
"device": {
|
||||
"system_id": cdm.device.system_id,
|
||||
"security_level": cdm.device.security_level
|
||||
"system_id": cdm.system_id,
|
||||
"security_level": cdm.security_level
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -116,6 +116,58 @@ async def close(request: web.Request) -> web.Response:
|
||||
})
|
||||
|
||||
|
||||
@routes.post("/{device}/set_service_certificate")
|
||||
async def set_service_certificate(request: web.Request) -> web.Response:
|
||||
secret_key = request.headers["X-Secret-Key"]
|
||||
device_name = request.match_info["device"]
|
||||
|
||||
body = await request.json()
|
||||
for required_field in ("session_id", "certificate"):
|
||||
if required_field == "certificate":
|
||||
has_field = required_field in body # it needs the key, but can be empty/null
|
||||
else:
|
||||
has_field = body.get(required_field)
|
||||
if not has_field:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"Missing required field '{required_field}' in JSON body."
|
||||
}, status=400)
|
||||
|
||||
# get session id
|
||||
session_id = bytes.fromhex(body["session_id"])
|
||||
|
||||
# get cdm
|
||||
cdm = request.app["cdms"].get((secret_key, device_name))
|
||||
if not cdm:
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
|
||||
}, status=400)
|
||||
|
||||
if session_id not in cdm._sessions:
|
||||
# This can happen if:
|
||||
# - API server gets shutdown/restarted,
|
||||
# - The user calls /challenge before /open,
|
||||
# - The user called /open on a different IP Address
|
||||
# - The user closed the session
|
||||
return web.json_response({
|
||||
"status": 400,
|
||||
"message": "Invalid Session ID. Session ID may have Expired."
|
||||
}, status=400)
|
||||
|
||||
# set service certificate
|
||||
certificate = body.get("certificate")
|
||||
provider_id = cdm.set_service_certificate(session_id, certificate)
|
||||
|
||||
return web.json_response({
|
||||
"status": 200,
|
||||
"message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
|
||||
"data": {
|
||||
"provider_id": provider_id
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
@routes.post("/{device}/challenge/{license_type}")
|
||||
async def challenge(request: web.Request) -> web.Response:
|
||||
secret_key = request.headers["X-Secret-Key"]
|
||||
@@ -151,15 +203,12 @@ async def challenge(request: web.Request) -> web.Response:
|
||||
"message": "Invalid Session ID. Session ID may have Expired."
|
||||
}, status=400)
|
||||
|
||||
# set service certificate
|
||||
service_certificate = body.get("service_certificate")
|
||||
if request.app["config"]["force_privacy_mode"] and not service_certificate:
|
||||
# enforce service certificate (opt-in)
|
||||
if request.app["config"].get("force_privacy_mode") and not cdm._sessions[session_id].service_certificate:
|
||||
return web.json_response({
|
||||
"status": 403,
|
||||
"message": "No Service Certificate provided but Privacy Mode is Enforced."
|
||||
"message": "No Service Certificate set but Privacy Mode is Enforced."
|
||||
}, status=403)
|
||||
if service_certificate:
|
||||
cdm.set_service_certificate(session_id, service_certificate)
|
||||
|
||||
# get challenge
|
||||
license_request = cdm.get_license_challenge(
|
||||
|
||||
Reference in New Issue
Block a user