12 Commits

Author SHA1 Message Date
rlaphoenix
f09a06857a Update Changelog for v1.3.1 2022-08-04 08:39:50 +01:00
rlaphoenix
e4f6a23725 Bump to v1.3.1 2022-08-04 08:39:42 +01:00
rlaphoenix
f21a21712b RemoteCdm: Improve Server Version testing
Some systems like Caddy or Nginx will prefix their own word to the Server header, e.g., `Caddy, pywidevine server v1.2.3` so I had to change a fair bit of the code to have wider compatibility across some unknowns that may occur with the Serve header.
2022-08-04 08:33:33 +01:00
rlaphoenix
a1494a3742 Allow specification of Cdm device_type as string 2022-08-04 08:26:41 +01:00
rlaphoenix
5b13e1a689 serve: Don't require force_privacy_mode to be defined on config 2022-08-04 08:22:06 +01:00
rlaphoenix
c9288dc391 Update Changelog for v1.3.0 2022-08-04 05:56:47 +01:00
rlaphoenix
7640d6fcab Bump to v1.3.0 2022-08-04 05:56:38 +01:00
rlaphoenix
3d794ad659 RemoteCdm: Implement /set_service_certificate 2022-08-04 05:54:15 +01:00
rlaphoenix
5788dde7b1 serve: Implement /set_service_certificate
Removed service certificate setting related code from /challenge.
2022-08-04 05:54:15 +01:00
rlaphoenix
ddf755f82f Cdm: Add ability to unset certificate via set_service_certificate()
To unset, just provide `None` as the certificate param.
2022-08-04 05:43:10 +01:00
rlaphoenix
e8785fcd84 Create RemoteCdm class as Client code for the serve feature
This can be considered the Client-side code for the `serve` feature.

The RemoteCdm object can be used with the same underlying interface as the normal `Cdm` object. Including stuff like .open(), .get_license_challenge(), .decrypt(), even same access to data like `cdm.system_id`, or even `cdm._sessions` just like normal.

However, since we don't have any private key and client ID, we spoof the super construction with dummy data. You wont have access to any data that uses the underlying Client ID and Private Key like the signer or decrypter. Any Cdm code trying to access them on RemoteCdm will fail.
2022-08-04 05:43:10 +01:00
rlaphoenix
c969d80931 Cdm: Change construction interface to allow manual creation
This is so you can construct a Cdm object without using `.wvd` files (nor the Device class). It also improves enforcement of some required data from the Device. The underlying Device object is discarded for it's data as it won't be required.

Note that the Client ID and Private Key related variables are now stored as private `__var` variables to further amplify their private nature and to really discourage manual read write. This is not impossible to workaround in Python but further discourages manual read/writes to the variable that could cause serious issues.

The RSA Key is also no longer stored as-is. It is now stored as PSS and PKCS1_OAEP objects, as they will be used like so. This makes it even more annoying to directly read/write the RSA key (but not impossible).
2022-08-04 04:52:26 +01:00
8 changed files with 433 additions and 29 deletions

View File

@@ -5,6 +5,42 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.3.1] - 2022-08-04
### Added
- Cdm and RemoteCdm can now be supplied a string value for `device_type` for scenarios where providing it as a string
is more convenient (e.g., from Config files).
### Fixed
- The `force_privacy_mode` key no longer needs to be defined at all in the configuration file. This was previously
crashing serve APIs if it wasn't set before starting.
- RemoteCdm's Server version check will no longer fail under certain serving conditions e.g., Caddy prepending `Caddy`
to the Server header value. It also fixes case sensitivity and removed the full url from the header.
## [1.3.0] - 2022-08-04
### Added
- New RemoteCdm class to be used as Client code for the `serve` Remote CDM API server. The RemoteCdm should be used
entirely separately from the normal Cdm class. All serve APIs must update to v1.3.0 to be compatible. The RemoteCdm
verifies the server version to ensure compatibility. Changes to the serve API schema will be immediately reflected in
the RemoteCdm code in the future.
- Implemented `/set_service_certificate` endpoint in serve schema as an improved way of setting the service certificate
than passing it to `/challenge`.
- You can now unset the service certificate by providing an empty service certificate value (or None or null). This
includes support for doing so even in serve API and the new RemoteCdm.
### Changed
- The Construction of the Cdm object has changed. You can now initialize it with more direct values if you don't want
to use the Device class or don't want to use `.wvd` files. To use Device classes, you must now use the
`Cdm.from_device()` class method.
- The ability to pass the certificate to `/challenge` has been removed. Please use the new `/set_service_certificate`
endpoint before calling `/challenge`. You do not need to set it every time. Once per session is enough unless you
now want to use a different certificate.
## [1.2.1] - 2022-08-02
This release is primarily a maintenance release for `serve` functionality but some Cdm fixes are also present.
@@ -143,6 +179,8 @@ This release is primarily a maintenance release for `serve` functionality but so
Initial Release.
[1.3.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.1
[1.3.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.3.0
[1.2.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.1
[1.2.0]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.2.0
[1.1.1]: https://github.com/rlaphoenix/pywidevine/releases/tag/v1.1.1

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "pywidevine"
version = "1.2.1"
version = "1.3.1"
description = "Widevine CDM (Content Decryption Module) implementation in Python."
authors = ["rlaphoenix <rlaphoenix@pm.me>"]
license = "GPL-3.0-only"

View File

@@ -1 +1 @@
__version__ = "1.2.1"
__version__ = "1.3.1"

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import base64
import binascii
import random
@@ -62,14 +64,63 @@ class Cdm:
MAX_NUM_OF_SESSIONS = 50 # most common limit
def __init__(self, device: Device):
def __init__(
self,
device_type: Union[Device.Types, str],
system_id: int,
security_level: int,
client_id: ClientIdentification,
rsa_key: RSA.RsaKey
):
"""Initialize a Widevine Content Decryption Module (CDM)."""
if not device:
raise ValueError("A Widevine Device must be provided.")
self.device = device
if not device_type:
raise ValueError("Device Type must be provided")
if isinstance(device_type, str):
device_type = Device.Types[device_type]
if not isinstance(device_type, Device.Types):
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
if not system_id:
raise ValueError("System ID must be provided")
if not isinstance(system_id, int):
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
if not security_level:
raise ValueError("Security Level must be provided")
if not isinstance(security_level, int):
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
if not client_id:
raise ValueError("Client ID must be provided")
if not isinstance(client_id, ClientIdentification):
raise TypeError(f"Expected client_id to be a {ClientIdentification} not {client_id!r}")
if not rsa_key:
raise ValueError("RSA Key must be provided")
if not isinstance(rsa_key, RSA.RsaKey):
raise TypeError(f"Expected rsa_key to be a {RSA.RsaKey} not {rsa_key!r}")
self.device_type = device_type
self.system_id = system_id
self.security_level = security_level
self.__client_id = client_id
self.__signer = pss.new(rsa_key)
self.__decrypter = PKCS1_OAEP.new(rsa_key)
self._sessions: dict[bytes, Session] = {}
@classmethod
def from_device(cls, device: Device) -> Cdm:
"""Initialize a Widevine CDM from a Widevine Device (.wvd) file."""
return cls(
device_type=device.type,
system_id=device.system_id,
security_level=device.security_level,
client_id=device.client_id,
rsa_key=device.private_key
)
def open(self) -> bytes:
"""
Open a Widevine Content Decryption Module (CDM) session.
@@ -100,7 +151,7 @@ class Cdm:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
del self._sessions[session_id]
def set_service_certificate(self, session_id: bytes, certificate: Union[bytes, str]) -> str:
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
"""
Set a Service Privacy Certificate for Privacy Mode. (optional but recommended)
@@ -116,7 +167,8 @@ class Cdm:
session_id: Session identifier.
certificate: SignedDrmCertificate (or SignedMessage containing one) in Base64
or Bytes form obtained from the Service. Some services have their own,
but most use the common privacy cert, (common_privacy_cert).
but most use the common privacy cert, (common_privacy_cert). If None, it
will remove the current certificate.
Raises:
InvalidSession: If the Session identifier is invalid.
@@ -126,11 +178,18 @@ class Cdm:
match the underlying DrmCertificate.
Returns the Service Provider ID of the verified DrmCertificate if successful.
If certificate is None, it will return the now unset certificate's Provider ID.
"""
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if certificate is None:
drm_certificate = DrmCertificate()
drm_certificate.ParseFromString(session.service_certificate.drm_certificate)
session.service_certificate = None
return drm_certificate.provider_id
if isinstance(certificate, str):
try:
certificate = base64.b64decode(certificate) # assuming base64
@@ -238,19 +297,16 @@ class Cdm:
if session.service_certificate and privacy_mode:
# encrypt the client id for privacy mode
license_request.encrypted_client_id.CopyFrom(self.encrypt_client_id(
client_id=self.device.client_id,
client_id=self.__client_id,
service_certificate=session.service_certificate
))
else:
license_request.client_id.CopyFrom(self.device.client_id)
license_request.client_id.CopyFrom(self.__client_id)
license_message = SignedMessage()
license_message.type = SignedMessage.MessageType.LICENSE_REQUEST
license_message.msg = license_request.SerializeToString()
license_message.signature = pss. \
new(self.device.private_key). \
sign(SHA1.new(license_message.msg))
license_message.signature = self.__signer.sign(SHA1.new(license_message.msg))
session.context[request_id] = self.derive_context(license_message.msg)
@@ -315,11 +371,10 @@ class Cdm:
if not context:
raise InvalidContext("Cannot parse a license message without first making a license request")
session_key = PKCS1_OAEP. \
new(self.device.private_key). \
decrypt(license_message.session_key)
enc_key, mac_key_server, _ = self.derive_keys(*context, session_key)
enc_key, mac_key_server, _ = self.derive_keys(
*context,
key=self.__decrypter.decrypt(license_message.session_key)
)
computed_signature = HMAC. \
new(mac_key_server, digestmod=SHA256). \

View File

@@ -32,3 +32,7 @@ class SignatureMismatch(PyWidevineException):
class NoKeysLoaded(PyWidevineException):
"""No License was parsed for this Session, No Keys available."""
class DeviceMismatch(PyWidevineException):
"""The Remote CDMs Device information and the APIs Device information did not match."""

View File

@@ -68,7 +68,7 @@ def license_(device: Path, pssh: str, server: str, type_: str, privacy: bool):
log.debug(device)
# load cdm
cdm = Cdm(device)
cdm = Cdm.from_device(device)
log.info(f"[+] Loaded CDM")
log.debug(cdm)

258
pywidevine/remotecdm.py Normal file
View File

@@ -0,0 +1,258 @@
from __future__ import annotations
import base64
import binascii
import re
from typing import Union, Optional
import requests
from Crypto.PublicKey import RSA
from construct import Container
from google.protobuf.message import DecodeError
from pywidevine.cdm import Cdm
from pywidevine.device import Device
from pywidevine.exceptions import InvalidSession, InvalidInitData, InvalidLicenseType, TooManySessions, \
InvalidLicenseMessage, DeviceMismatch
from pywidevine.key import Key
from pywidevine.license_protocol_pb2 import LicenseType, SignedMessage, License, ClientIdentification
from pywidevine.pssh import PSSH
from pywidevine.session import Session
class RemoteCdm(Cdm):
"""Remote Accessible CDM using pywidevine's serve schema."""
def __init__(
self,
device_type: Union[Device.Types, str],
system_id: int,
security_level: int,
host: str,
secret: str,
device_name: str
):
"""Initialize a Widevine Content Decryption Module (CDM)."""
if not device_type:
raise ValueError("Device Type must be provided")
if isinstance(device_type, str):
device_type = Device.Types[device_type]
if not isinstance(device_type, Device.Types):
raise TypeError(f"Expected device_type to be a {Device.Types!r} not {device_type!r}")
if not system_id:
raise ValueError("System ID must be provided")
if not isinstance(system_id, int):
raise TypeError(f"Expected system_id to be a {int} not {system_id!r}")
if not security_level:
raise ValueError("Security Level must be provided")
if not isinstance(security_level, int):
raise TypeError(f"Expected security_level to be a {int} not {security_level!r}")
if not host:
raise ValueError("API Host must be provided")
if not isinstance(host, str):
raise TypeError(f"Expected host to be a {str} not {host!r}")
if not secret:
raise ValueError("API Secret must be provided")
if not isinstance(secret, str):
raise TypeError(f"Expected secret to be a {str} not {secret!r}")
if not device_name:
raise ValueError("API Device name must be provided")
if not isinstance(device_name, str):
raise TypeError(f"Expected device_name to be a {str} not {device_name!r}")
self.device_type = device_type
self.system_id = system_id
self.security_level = security_level
self.host = host
self.device_name = device_name
# spoof client_id and rsa_key just so we can construct via super call
super().__init__(device_type, system_id, security_level, ClientIdentification(), RSA.generate(2048))
self.__session = requests.Session()
self.__session.headers.update({
"X-Secret-Key": secret
})
r = requests.head(self.host)
if r.status_code != 200:
raise ValueError(f"Could not test Remote API version [{r.status_code}]")
server = r.headers.get("Server")
if not server or "pywidevine serve" not in server.lower():
raise ValueError(f"This Remote CDM API does not seem to be a pywidevine serve API ({server}).")
server_version = re.search(r"pywidevine serve v([\d.]+)", server, re.IGNORECASE)
if not server_version:
raise ValueError(f"The pywidevine server API is not stating the version correctly, cannot continue.")
server_version = server_version.group(1)
if server_version < "1.3.0":
raise ValueError(f"This pywidevine serve API version ({server_version}) is not supported.")
@classmethod
def from_device(cls, device: Device) -> RemoteCdm:
raise NotImplementedError("You cannot load a RemoteCdm from a local Device file.")
def open(self) -> bytes:
if len(self._sessions) > self.MAX_NUM_OF_SESSIONS:
raise TooManySessions(f"Too many Sessions open ({self.MAX_NUM_OF_SESSIONS}).")
r = self.__session.get(f"{self.host}/{self.device_name}/open")
if r.status_code != 200:
raise ValueError(f"Cannot Open CDM Session, {r.text} [{r.status_code}]")
r = r.json()["data"]
session = Session()
session.id = bytes.fromhex(r["session_id"])
self._sessions[session.id] = session
if int(r["device"]["system_id"]) != self.system_id:
raise DeviceMismatch("The System ID specified does not match the one specified in the API response.")
if int(r["device"]["security_level"]) != self.security_level:
raise DeviceMismatch("The Security Level specified does not match the one specified in the API response.")
return session.id
def close(self, session_id: bytes) -> None:
r = self.__session.get(f"{self.host}/{self.device_name}/close/{session_id.hex()}")
if r.status_code != 200:
raise ValueError(f"Cannot Close CDM Session, {r.text} [{r.status_code}]")
del self._sessions[session_id]
def set_service_certificate(self, session_id: bytes, certificate: Optional[Union[bytes, str]]) -> str:
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if certificate is None:
certificate_b64 = None
elif isinstance(certificate, str):
certificate_b64 = certificate # assuming base64
elif isinstance(certificate, bytes):
certificate_b64 = base64.b64encode(certificate).decode()
else:
raise DecodeError(f"Expecting Certificate to be base64 or bytes, not {certificate!r}")
r = self.__session.post(
url=f"{self.host}/{self.device_name}/set_service_certificate",
json={
"session_id": session_id.hex(),
"certificate": certificate_b64
}
)
if r.status_code != 200:
raise ValueError(f"Cannot Set CDMs Service Certificate, {r.text} [{r.status_code}]")
r = r.json()["data"]
return r["provider_id"]
def get_license_challenge(
self,
session_id: bytes,
init_data: Union[Container, bytes, str],
type_: Union[int, str] = LicenseType.STREAMING,
privacy_mode: bool = True
) -> bytes:
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if not init_data:
raise InvalidInitData("The init_data must not be empty.")
try:
init_data = PSSH.get_as_box(init_data).init_data
except (ValueError, binascii.Error, DecodeError) as e:
raise InvalidInitData(str(e))
try:
if isinstance(type_, int):
type_ = LicenseType.Name(int(type_))
elif isinstance(type_, str):
type_ = LicenseType.Name(LicenseType.Value(type_))
elif isinstance(type_, LicenseType):
type_ = LicenseType.Name(type_)
else:
raise InvalidLicenseType()
except ValueError:
raise InvalidLicenseType(f"License Type {type_!r} is invalid")
r = self.__session.post(
url=f"{self.host}/{self.device_name}/challenge/{type_}",
json={
"session_id": session_id.hex(),
"init_data": base64.b64encode(init_data).decode()
}
)
if r.status_code != 200:
raise ValueError(f"Cannot get Challenge, {r.text} [{r.status_code}]")
r = r.json()["data"]
try:
license_message = SignedMessage()
license_message.ParseFromString(base64.b64decode(r["challenge_b64"]))
except DecodeError as e:
raise InvalidLicenseMessage(f"Failed to parse license request, {e}")
return license_message.SerializeToString()
def parse_license(self, session_id: bytes, license_message: Union[SignedMessage, bytes, str]) -> None:
session = self._sessions.get(session_id)
if not session:
raise InvalidSession(f"Session identifier {session_id!r} is invalid.")
if not license_message:
raise InvalidLicenseMessage("Cannot parse an empty license_message")
if isinstance(license_message, str):
try:
license_message = base64.b64decode(license_message)
except (binascii.Error, binascii.Incomplete) as e:
raise InvalidLicenseMessage(f"Could not decode license_message as Base64, {e}")
if isinstance(license_message, bytes):
signed_message = SignedMessage()
try:
signed_message.ParseFromString(license_message)
except DecodeError as e:
raise InvalidLicenseMessage(f"Could not parse license_message as a SignedMessage, {e}")
license_message = signed_message
if not isinstance(license_message, SignedMessage):
raise InvalidLicenseMessage(f"Expecting license_response to be a SignedMessage, got {license_message!r}")
if license_message.type != SignedMessage.MessageType.LICENSE:
raise InvalidLicenseMessage(
f"Expecting a LICENSE message, not a "
f"'{SignedMessage.MessageType.Name(license_message.type)}' message."
)
licence = License()
licence.ParseFromString(license_message.msg)
r = self.__session.post(
url=f"{self.host}/{self.device_name}/keys/ALL",
json={
"session_id": session_id.hex(),
"license_message": base64.b64encode(license_message.SerializeToString()).decode()
}
)
if r.status_code != 200:
raise ValueError(f"Cannot parse License, {r.text} [{r.status_code}]")
r = r.json()["data"]
session.keys = [
Key(
type_=key["type"],
kid=Key.kid_to_uuid(bytes.fromhex(key["key_id"])),
key=bytes.fromhex(key["key"]),
permissions=key["permissions"]
)
for key in r["keys"]
]
__ALL__ = (RemoteCdm,)

View File

@@ -66,7 +66,7 @@ async def open(request: web.Request) -> web.Response:
cdm = request.app["cdms"].get((secret_key, device_name))
if not cdm:
device = Device.load(request.app["config"]["devices"][device_name])
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm(device)
cdm = request.app["cdms"][(secret_key, device_name)] = Cdm.from_device(device)
try:
session_id = cdm.open()
@@ -82,8 +82,8 @@ async def open(request: web.Request) -> web.Response:
"data": {
"session_id": session_id.hex(),
"device": {
"system_id": cdm.device.system_id,
"security_level": cdm.device.security_level
"system_id": cdm.system_id,
"security_level": cdm.security_level
}
}
})
@@ -116,6 +116,58 @@ async def close(request: web.Request) -> web.Response:
})
@routes.post("/{device}/set_service_certificate")
async def set_service_certificate(request: web.Request) -> web.Response:
secret_key = request.headers["X-Secret-Key"]
device_name = request.match_info["device"]
body = await request.json()
for required_field in ("session_id", "certificate"):
if required_field == "certificate":
has_field = required_field in body # it needs the key, but can be empty/null
else:
has_field = body.get(required_field)
if not has_field:
return web.json_response({
"status": 400,
"message": f"Missing required field '{required_field}' in JSON body."
}, status=400)
# get session id
session_id = bytes.fromhex(body["session_id"])
# get cdm
cdm = request.app["cdms"].get((secret_key, device_name))
if not cdm:
return web.json_response({
"status": 400,
"message": f"No Cdm session for {device_name} has been opened yet. No session to use."
}, status=400)
if session_id not in cdm._sessions:
# This can happen if:
# - API server gets shutdown/restarted,
# - The user calls /challenge before /open,
# - The user called /open on a different IP Address
# - The user closed the session
return web.json_response({
"status": 400,
"message": "Invalid Session ID. Session ID may have Expired."
}, status=400)
# set service certificate
certificate = body.get("certificate")
provider_id = cdm.set_service_certificate(session_id, certificate)
return web.json_response({
"status": 200,
"message": f"Successfully {['set', 'unset'][not certificate]} the Service Certificate.",
"data": {
"provider_id": provider_id
}
})
@routes.post("/{device}/challenge/{license_type}")
async def challenge(request: web.Request) -> web.Response:
secret_key = request.headers["X-Secret-Key"]
@@ -151,15 +203,12 @@ async def challenge(request: web.Request) -> web.Response:
"message": "Invalid Session ID. Session ID may have Expired."
}, status=400)
# set service certificate
service_certificate = body.get("service_certificate")
if request.app["config"]["force_privacy_mode"] and not service_certificate:
# enforce service certificate (opt-in)
if request.app["config"].get("force_privacy_mode") and not cdm._sessions[session_id].service_certificate:
return web.json_response({
"status": 403,
"message": "No Service Certificate provided but Privacy Mode is Enforced."
"message": "No Service Certificate set but Privacy Mode is Enforced."
}, status=403)
if service_certificate:
cdm.set_service_certificate(session_id, service_certificate)
# get challenge
license_request = cdm.get_license_challenge(