Skip to content

Commit

Permalink
Decode with PyJWK (#886)
Browse files Browse the repository at this point in the history
* Add `algorithm` property to `PyJWK`

* Allow `PyJWK` as a key when decoding JWTs.

* Use PyJWK algorithm by default when deocding JWTs.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Rename `PyJWK.algorithm` to `PyJWK.algorithm_name`

* Test `PyJWK.algorithm_name`

* Test decode with PyJWK object.

* Test implicit algorithm with PyJWK.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Flake8 fix.

* Refactor to fix a mypy error.

* Update sample usage.

* Fix test.

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
luhn and pre-commit-ci[bot] committed May 11, 2024
1 parent c0a071d commit ab8176a
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 17 deletions.
6 changes: 2 additions & 4 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,7 @@ Retrieve RSA signing keys from a JWKS endpoint
>>> signing_key = jwks_client.get_signing_key_from_jwt(token)
>>> data = jwt.decode(
... token,
... signing_key.key,
... algorithms=["RS256"],
... signing_key,
... audience="https://expenses-api",
... options={"verify_exp": False},
... )
Expand Down Expand Up @@ -356,8 +355,7 @@ is not built into pyjwt.
# now, decode_complete to get payload + header
data = jwt.decode_complete(
id_token,
key=signing_key.key,
algorithms=signing_algos,
key=signing_key,
audience=client_id,
)
payload, header = data["payload"], data["header"]
Expand Down
6 changes: 4 additions & 2 deletions jwt/api_jwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ def __init__(self, jwk_data: JWKDict, algorithm: str | None = None) -> None:
if not has_crypto and algorithm in requires_cryptography:
raise PyJWKError(f"{algorithm} requires 'cryptography' to be installed.")

self.Algorithm = self._algorithms.get(algorithm)
self.algorithm_name = algorithm

if not self.Algorithm:
if algorithm in self._algorithms:
self.Algorithm = self._algorithms[algorithm]
else:
raise PyJWKError(f"Unable to find an algorithm for key: {self._jwk_data}")

self.key = self.Algorithm.from_jwk(self._jwk_data)
Expand Down
25 changes: 16 additions & 9 deletions jwt/api_jws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
has_crypto,
requires_cryptography,
)
from .api_jwk import PyJWK
from .exceptions import (
DecodeError,
InvalidAlgorithmError,
Expand Down Expand Up @@ -172,7 +173,7 @@ def encode(
def decode_complete(
self,
jwt: str | bytes,
key: AllowedPublicKeys | str | bytes = "",
key: AllowedPublicKeys | PyJWK | str | bytes = "",
algorithms: list[str] | None = None,
options: dict[str, Any] | None = None,
detached_payload: bytes | None = None,
Expand All @@ -190,7 +191,7 @@ def decode_complete(
merged_options = {**self.options, **options}
verify_signature = merged_options["verify_signature"]

if verify_signature and not algorithms:
if verify_signature and not algorithms and not isinstance(key, PyJWK):
raise DecodeError(
'It is required that you pass in a value for the "algorithms" argument when calling decode().'
)
Expand All @@ -217,7 +218,7 @@ def decode_complete(
def decode(
self,
jwt: str | bytes,
key: AllowedPublicKeys | str | bytes = "",
key: AllowedPublicKeys | PyJWK | str | bytes = "",
algorithms: list[str] | None = None,
options: dict[str, Any] | None = None,
detached_payload: bytes | None = None,
Expand Down Expand Up @@ -289,9 +290,11 @@ def _verify_signature(
signing_input: bytes,
header: dict[str, Any],
signature: bytes,
key: AllowedPublicKeys | str | bytes = "",
key: AllowedPublicKeys | PyJWK | str | bytes = "",
algorithms: list[str] | None = None,
) -> None:
if algorithms is None and isinstance(key, PyJWK):
algorithms = [key.algorithm_name]
try:
alg = header["alg"]
except KeyError:
Expand All @@ -300,11 +303,15 @@ def _verify_signature(
if not alg or (algorithms is not None and alg not in algorithms):
raise InvalidAlgorithmError("The specified alg value is not allowed")

try:
alg_obj = self.get_algorithm_by_name(alg)
except NotImplementedError as e:
raise InvalidAlgorithmError("Algorithm not supported") from e
prepared_key = alg_obj.prepare_key(key)
if isinstance(key, PyJWK):
alg_obj = key.Algorithm
prepared_key = key.key
else:
try:
alg_obj = self.get_algorithm_by_name(alg)
except NotImplementedError as e:
raise InvalidAlgorithmError("Algorithm not supported") from e
prepared_key = alg_obj.prepare_key(key)

if not alg_obj.verify(signing_input, prepared_key, signature):
raise InvalidSignatureError("Signature verification failed")
Expand Down
5 changes: 3 additions & 2 deletions jwt/api_jwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

if TYPE_CHECKING:
from .algorithms import AllowedPrivateKeys, AllowedPublicKeys
from .api_jwk import PyJWK


class PyJWT:
Expand Down Expand Up @@ -100,7 +101,7 @@ def _encode_payload(
def decode_complete(
self,
jwt: str | bytes,
key: AllowedPublicKeys | str | bytes = "",
key: AllowedPublicKeys | PyJWK | str | bytes = "",
algorithms: list[str] | None = None,
options: dict[str, Any] | None = None,
# deprecated arg, remove in pyjwt3
Expand Down Expand Up @@ -185,7 +186,7 @@ def _decode_payload(self, decoded: dict[str, Any]) -> Any:
def decode(
self,
jwt: str | bytes,
key: AllowedPublicKeys | str | bytes = "",
key: AllowedPublicKeys | PyJWK | str | bytes = "",
algorithms: list[str] | None = None,
options: dict[str, Any] | None = None,
# deprecated arg, remove in pyjwt3
Expand Down
9 changes: 9 additions & 0 deletions tests/test_api_jwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def test_should_load_key_without_alg_from_dict(self):
assert jwk.key_type == "RSA"
assert isinstance(jwk.Algorithm, RSAAlgorithm)
assert jwk.Algorithm.hash_alg == RSAAlgorithm.SHA256
assert jwk.algorithm_name == "RS256"

@crypto_required
def test_should_load_key_from_dict_with_algorithm(self):
Expand All @@ -76,6 +77,7 @@ def test_should_load_key_from_dict_with_algorithm(self):
assert jwk.key_type == "RSA"
assert isinstance(jwk.Algorithm, RSAAlgorithm)
assert jwk.Algorithm.hash_alg == RSAAlgorithm.SHA256
assert jwk.algorithm_name == "RS256"

@crypto_required
def test_should_load_key_ec_p256_from_dict(self):
Expand All @@ -87,6 +89,7 @@ def test_should_load_key_ec_p256_from_dict(self):
assert jwk.key_type == "EC"
assert isinstance(jwk.Algorithm, ECAlgorithm)
assert jwk.Algorithm.hash_alg == ECAlgorithm.SHA256
assert jwk.algorithm_name == "ES256"

@crypto_required
def test_should_load_key_ec_p384_from_dict(self):
Expand All @@ -98,6 +101,7 @@ def test_should_load_key_ec_p384_from_dict(self):
assert jwk.key_type == "EC"
assert isinstance(jwk.Algorithm, ECAlgorithm)
assert jwk.Algorithm.hash_alg == ECAlgorithm.SHA384
assert jwk.algorithm_name == "ES384"

@crypto_required
def test_should_load_key_ec_p521_from_dict(self):
Expand All @@ -109,6 +113,7 @@ def test_should_load_key_ec_p521_from_dict(self):
assert jwk.key_type == "EC"
assert isinstance(jwk.Algorithm, ECAlgorithm)
assert jwk.Algorithm.hash_alg == ECAlgorithm.SHA512
assert jwk.algorithm_name == "ES512"

@crypto_required
def test_should_load_key_ec_secp256k1_from_dict(self):
Expand All @@ -120,6 +125,7 @@ def test_should_load_key_ec_secp256k1_from_dict(self):
assert jwk.key_type == "EC"
assert isinstance(jwk.Algorithm, ECAlgorithm)
assert jwk.Algorithm.hash_alg == ECAlgorithm.SHA256
assert jwk.algorithm_name == "ES256K"

@crypto_required
def test_should_load_key_hmac_from_dict(self):
Expand All @@ -131,6 +137,7 @@ def test_should_load_key_hmac_from_dict(self):
assert jwk.key_type == "oct"
assert isinstance(jwk.Algorithm, HMACAlgorithm)
assert jwk.Algorithm.hash_alg == HMACAlgorithm.SHA256
assert jwk.algorithm_name == "HS256"

@crypto_required
def test_should_load_key_hmac_without_alg_from_dict(self):
Expand All @@ -143,6 +150,7 @@ def test_should_load_key_hmac_without_alg_from_dict(self):
assert jwk.key_type == "oct"
assert isinstance(jwk.Algorithm, HMACAlgorithm)
assert jwk.Algorithm.hash_alg == HMACAlgorithm.SHA256
assert jwk.algorithm_name == "HS256"

@crypto_required
def test_should_load_key_okp_without_alg_from_dict(self):
Expand All @@ -153,6 +161,7 @@ def test_should_load_key_okp_without_alg_from_dict(self):

assert jwk.key_type == "OKP"
assert isinstance(jwk.Algorithm, OKPAlgorithm)
assert jwk.algorithm_name == "EdDSA"

@crypto_required
def test_from_dict_should_throw_exception_if_arg_is_invalid(self):
Expand Down
54 changes: 54 additions & 0 deletions tests/test_api_jws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from jwt.algorithms import NoneAlgorithm, has_crypto
from jwt.api_jwk import PyJWK
from jwt.api_jws import PyJWS
from jwt.exceptions import (
DecodeError,
Expand Down Expand Up @@ -250,6 +251,59 @@ def test_decodes_complete_valid_jws(self, jws, payload):
),
}

def test_decodes_with_jwk(self, jws, payload):
jwk = PyJWK(
{
"kty": "oct",
"alg": "HS256",
"k": "c2VjcmV0", # "secret"
}
)
example_jws = (
b"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
b"aGVsbG8gd29ybGQ."
b"gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM"
)

decoded_payload = jws.decode(example_jws, jwk, algorithms=["HS256"])

assert decoded_payload == payload

def test_decodes_with_jwk_and_no_algorithm(self, jws, payload):
jwk = PyJWK(
{
"kty": "oct",
"alg": "HS256",
"k": "c2VjcmV0", # "secret"
}
)
example_jws = (
b"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
b"aGVsbG8gd29ybGQ."
b"gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM"
)

decoded_payload = jws.decode(example_jws, jwk)

assert decoded_payload == payload

def test_decodes_with_jwk_and_mismatched_algorithm(self, jws, payload):
jwk = PyJWK(
{
"kty": "oct",
"alg": "HS512",
"k": "c2VjcmV0", # "secret"
}
)
example_jws = (
b"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9."
b"aGVsbG8gd29ybGQ."
b"gEW0pdU4kxPthjtehYdhxB9mMOGajt1xCKlGGXDJ8PM"
)

with pytest.raises(InvalidAlgorithmError):
jws.decode(example_jws, jwk)

# 'Control' Elliptic Curve jws created by another library.
# Used to test for regressions that could affect both
# encoding / decoding operations equally (causing tests
Expand Down

0 comments on commit ab8176a

Please sign in to comment.