import logging
from inspect import Parameter, Signature, signature
from os import environ
from flask import abort
from jose import JWTError, jwt
from makefun import add_signature_parameters, wraps
from werkzeug.exceptions import HTTPException
logger = logging.getLogger("NovaAPILogger")
ALLOWED_JWT_ALGORITHMS = ["HS256", "HS384", "HS512",
"RS256", "RS384", "RS512",
"ES256", "ES384", "ES512"]
[docs]def read_decode_algorithms() -> list:
"""
Reads the `JWT_ALGORITHMS` environment variable and builds the list \
that should be used as the decoding algorithms for the JWT tokens.
This function also validates that the algorithm informed is valid \
and supported and ignores it otherwise.
:return: The algorithms list
"""
algorithms = environ.get("JWT_ALGORITHMS", "HS256").strip().split(',')
algorithms = [alg.strip() for alg in algorithms
if alg.strip() in ALLOWED_JWT_ALGORITHMS]
return algorithms
JWT_ALGORITHMS = read_decode_algorithms()
JWT_SECRET = environ.get('JWT_SECRET', "1234567890a")
# pylint: disable=R1710
[docs]def decode_jwt_token(token: str) -> dict:
"""Function to decode JWT token.
Decodes JWT Token using JWT_SECRET environment variable as key. During \
decoding the JWS(signature) is validated along with exp(expiry timestamp),\
iat(issued at timestamp) and nbf(not before) claims. No other claims are \
validated and they should be validated with the validate_jwt_claims \
decorator.
:param token: JWT token base64 encoded and signed
:return: Dictionary of token claims if JWT is valid or raises \
unauthorized if invalid
"""
try:
logger.info("Decoding Token %s", token)
return jwt.decode(
token, JWT_SECRET, algorithms=JWT_ALGORITHMS,
options={'verify_aud': False, 'verify_iss': False,
'verify_sub': False, 'verify_jti': False,
'verify_at_hash': False})
except JWTError:
logger.error("Unauthorized request!", exc_info=True)
unauthorize()
# pylint: disable=W0613
[docs]def unauthorize(*args, **kwargs) -> HTTPException:
"""Aborts API call with status code 401 and message "Unauthorized"
This functions takes any argument as it is used by the auth decorators.
:return: HTTPException with status code 401
"""
return abort(401, "Unauthorized")
[docs]def validate_jwt_claims(add_token_info: bool = False, claims=None):
"""Decorator to authenticate and authorize access to API endpoint
Checks if the received claims are present in token_info and if they match \
the necessary values. If the decorated function doesn't have a keyword \
argument `token_info`, it will be added by the decorator.
Example:
In the following example, if the `token_info` doesn't have the iss \
claim with value "novaweb", `my_endpoint` won't be called. ::
@validate_jwt_claims(claims = {"iss":"novaweb"})
def my_endpoint():
...
:param claims: Dictionary with the necessary claims to authorize the use\
of the endpoint
:param add_token_info: If set to true, token_info will be passed to \
decorated function as `token_info` keyword argument.
:return: Decorated function if token contains correct claims or \
unauthorize.
"""
if claims is None:
claims = {}
def make_call(function):
new_func_sig = _check_and_update_signature(signature(function))
@wraps(function, new_sig=new_func_sig)
def wrapper(*args, **kwargs):
token_info = kwargs.get('token_info', None)
if not _check_claims(token_info, claims):
return unauthorize()
logger.info("Validated claims on call to %s "
"with token %s and claims: %s",
function, token_info, kwargs)
if not add_token_info:
kwargs.pop("token_info")
return function(*args, **kwargs)
return wrapper
return make_call
def _check_and_update_signature(func_sig: Signature):
"""
Checks that the function signature includes token_info parameter \
and includes it if not.
:param func_sig: Original signature of the function.
:return: Signature of the function with the token_info parameter.
"""
new_sig = func_sig
if not func_sig.parameters.get('token_info', None):
token_info_param = Parameter('token_info',
kind=Parameter.KEYWORD_ONLY,
default=None)
new_sig = add_signature_parameters(func_sig,
last=[token_info_param])
return new_sig
def _check_claims(token_info: dict, claims: dict) -> bool:
"""
Checks that token_info is a dict of claims and that contains the \
claims in the claims dict.
:param token_info: Claims of the received token.
:param claims: Claims to check on the received token.
:return: True if claims are valid and False otherwise.
"""
if not token_info or not isinstance(token_info, dict):
logger.error("Token info not received in validate_jwt_claims!")
return False
for claim_name, claim_value in claims.items():
if token_info.get(claim_name, None) != claim_value:
logger.error("Token claim %s wih value "
"%s doesn't match expected "
"value %s!",
claim_name,
token_info.get(claim_name, None),
claim_value)
return False
return True