Source code for nova_api.auth

import logging
from inspect import signature, Parameter

from makefun import wraps, add_signature_parameters
from flask import abort
from jose import jwt, JWTError
from werkzeug.exceptions import HTTPException

from nova_api import JWT_SECRET

logger = logging.getLogger(__name__)


[docs]def decode_jwt_token(token: str) -> dict: """Function to decode JWT token. Decodes JWT Token using JWT_SECRET environment variable as key. During \ decoding the JWS(signature) is validated along with exp(expiry timestamp),\ iat(issued at timestamp) and nbf(not before) claims. No other claims are \ validated and they should be validated with the validate_jwt_claims \ decorator. :param token: JWT token base64 encoded and signed :return: Dictionary of token claims if JWT is valid or raises \ unauthorized if invalid """ try: logger.info(f"Decoding Token {token}") return jwt.decode( token, JWT_SECRET, algorithms=['HS256'], options={'verify_aud': False, 'verify_iss': False, 'verify_sub': False, 'verify_jti': False, 'verify_at_hash': False}) except JWTError: logger.error("Unauthorized request!", exc_info=True) unauthorize()
# pylint: disable=W0613
[docs]def unauthorize(*args, **kwargs) -> HTTPException: """Aborts API call with status code 401 and message "Unauthorized" This functions takes any argument as it is used by the auth decorators. :return: HTTPException with status code 401 """ return abort(401, "Unauthorized")
[docs]def validate_jwt_claims(add_token_info: bool = False, claims={}): """Decorator to authenticate and authorize access to API endpoint Checks if the received claims are present in token_info and if they match \ the necessary values. If the decorated function doesn't have a keyword \ argument `token_info`, it will be added by the decorator. Example: In the following example, if the `token_info` doesn't have the iss \ claim with value "novaweb", `my_endpoint` won't be called. :: @validate_jwt_claims(claims = {iss="novaweb"}) def my_endpoint(): ... :param claims: Dictionary with the necessary claims to authorize the use\ of the endpoint :param add_token_info: If set to true, token_info will be passed to \ decorated function as `token_info` keyword argument. :return: Decorated function if token contains correct claims or \ unauthorize. """ def make_call(function): func_sig = signature(function) new_sig = func_sig if not func_sig.parameters.get('token_info', None): token_info_param = Parameter('token_info', kind=Parameter.KEYWORD_ONLY, default=None) new_sig = add_signature_parameters(func_sig, last=[token_info_param]) @wraps(function, new_sig=new_sig) def wrapper(*args, **kwargs): token_info = kwargs.get('token_info', None) if not token_info: logger.error("Token info not received in validate_jwt_claims!") return unauthorize() for claim_name, claim_value in claims.items(): if token_info.get(claim_name, None) != claim_value: logger.error(f"Token claim {claim_name} wih value " f"{claim_value} doesn't match expected " f"value!") return unauthorize() logger.info( "Validated claims on call to %s with token %s and claims: %s", function, token_info, kwargs) if not add_token_info: kwargs.pop("token_info") return function(*args, **kwargs) return wrapper return make_call