"""A package to accelerate REST API development"""
import getopt
import logging
import os
import sys
import time
from dataclasses import fields
from functools import wraps
from flask import jsonify, make_response
from flask.wrappers import Response
from nova_api import baseapi
from nova_api.dao import GenericDAO
# Authorization schemas
JWT = 0
AUTHENTICATION_SCHEMAS = {"JWT": 0}
possible_level = {"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL}
logger = logging.getLogger(__name__)
JWT_SECRET = os.environ.get('JWT_SECRET', "1234567890a")
[docs]def default_response(success: bool, status_code: int,
message: str, data: dict) -> Response:
""" Send a flask response with json payload in a default format
Example:
JSON response ::
{
"sucess": True,
"message": "API call OK",
"data": {
"num": "123"
}
}
:param success: bool that represents if the request was successfully \
processed.
:param status_code: integer that represents the http status code of the \
response.
:param message: summary string for the response.
:param data: dictionary (json valid) with data to be sent in the response
:return: a flask response with headers and status codes set
"""
json_content = jsonify({"success": success,
"message": message,
"data": data})
logger.info("Sending message: %s with status code %s and success %s",
str(json_content),
str(status_code),
success)
return make_response(
json_content,
status_code,
{"Content-type": "application/json"}
)
[docs]def error_response(status_code: int = 500, message: str = "Error",
data: dict = None) -> Response:
"""Wrapper of default_response for error responses.
Calls default_response with status_code=500, message=Error
and success=false passing data. Other status code and messages
may be passed.
:param status_code: Integer that represents the http status code of the \
response.
:param message: Summary string for the response.
:param data: Dictionary (json valid) with data to be sent in the response
:return: Default response with success=false
"""
if data is None:
data = dict()
return default_response(success=False, status_code=status_code,
message=message, data=data)
[docs]def success_response(status_code: int = 200, message: str = "OK",
data: dict = None) -> Response:
"""Wrapper of default_response for success responses.
Calls default_response with status_code=200, message=OK
and success=true passing data. Other status code and messages
may be passed.
:param status_code: Integer that represents the http status code of \
the response.
:param message: Summary string for the response.
:param data: Dictionary (json valid) with data to be sent in the \
response
:return: Default response with success=true
"""
if data is None:
data = dict()
return default_response(success=True, status_code=status_code,
message=message, data=data)
[docs]def use_dao(dao_class: GenericDAO,
error_message: str = "Erro",
dao_parameters: dict = None,
retry_delay: float = float(os.environ.get("NOVAAPI_RETRY_DELAY",
"1.0")),
retries: int = int(os.environ.get("NOVAAPI_RETRIES", "3")), ):
"""Decorator to handle database access in an API call
This decorator instantiates the DAO specified in `dao_class` within a try \
except block. If a exception is raised during the API call or database \
access, it generates an `error_response` with `message=error_message` and \
with the exception description in data. The DAO instance is passed to the \
decorated function as a keyword argument `dao`.
:param dao_class: DAO to instantiate and pass to the decorated function
:param error_message: Default error message to send in the error_response \
if an exception is thrown
:param dao_parameters: Parameters to add to the call to the DAO \
constructor. They'll be added to the call with the expansion \
`**dao_parameters`.
:param retries: Number of times to retry connection with database. \
Defaults to 3. May be set through the env variable NOVAAPI_RETRIES
:param retry_delay: Seconds to wait before retrying to connect to \
database. Defaults to 1.0. May be set through the env variable \
NOVAAPI_RETRY_DELAY.
:return: The decorated function
"""
if dao_parameters is None:
dao_parameters = dict()
def make_call(function):
@wraps(function)
def wrapper(*args, **kwargs):
dao = None
try:
logger.info(
"API call to %s with dao %s and args: %s, kwargs: %s",
function,
dao_class,
args,
kwargs
)
attempted_retries = retries
while attempted_retries:
try:
dao = dao_class(**dao_parameters)
break
except ConnectionError as con_error:
print("Connection failed, will retry "
"%s times" % attempted_retries)
time.sleep(retry_delay)
if attempted_retries == 1:
raise con_error
finally:
attempted_retries -= 1
return function(dao=dao, *args, **kwargs)
# pylint: disable=W0703
except Exception as exception:
logger.error(
"Unable to generate api response due to an error.",
exc_info=True)
return error_response(message=error_message,
data={"error": str(exception)})
finally:
if dao:
dao.close()
return wrapper
return make_call
[docs]def generate_api():
"""CLI interface for generate_nova_api. Generates API files.
Must be called at least with -e <Entity>. Generates the api files \
with the arguments. Accepts the following arguments:
* *-e*: Name of the Entity, which must be the same of the file that \
contains it
* *-d*: Name of the Entity DAO class, which must be the same of the \
file that contains it. Only needs to be specified if it's not EntityDAO. \
(Where Entity is the name of the entity passed to -e)
* *-v*: API version string. Will be used in the base path before the \
entity name
* *-a*: Authentication Schema, type of authentication which will be \
applied to endpoints in the generated API.
:return: None.
"""
entity = ''
version = ''
dao_class = ''
auth = None
overwrite = False
usage = 'Usage: %s generate_api -e entity ' \
'[-d entity_dao -v api_version -a auth_schema -o overwrite]'
try:
options, _ = getopt.getopt(sys.argv[1:], "e:d:v:a:o")
for option, value in options:
if option == '-e':
entity = value
elif option == '-d':
dao_class = value
elif option == '-v':
version = value
elif option == '-a':
auth = value.strip()
elif option == '-o':
overwrite = True
except getopt.GetoptError:
logger.error("Error while reading options passed to generate_api. "
"Options received: %s", sys.argv,
exc_info=True)
print(usage % (sys.argv[0]))
sys.exit(os.EX_DATAERR)
if entity == '':
logger.critical("Entity not passed in the arguments! Call: %s",
sys.argv)
print(usage % (sys.argv[0]))
sys.exit(os.EX_USAGE)
try:
sys.path.insert(0, '')
mod = __import__(entity, fromlist=[entity])
ent = getattr(mod, entity)
logger.debug("Entity found and successfully imported. Entity: %s", ent)
if dao_class == '':
dao_class = entity + 'DAO'
logger.debug("DAO class name not passed, infering as %s",
dao_class)
mod = __import__(dao_class, fromlist=[dao_class])
dao = getattr(mod, dao_class)
logger.debug("DAO class found and successfully imported. DAO: %s",
dao_class)
except ModuleNotFoundError:
print("You should run the script in the same folder as your entity and"
" it's DAO class. You must inform the entity name with -e and "
"the DAO name with -d. You may inform the version with -v.")
logger.critical("Not able to import entity and dao class.",
exc_info=True)
sys.exit(os.EX_IOERR)
if auth and auth not in AUTHENTICATION_SCHEMAS.keys():
print(("Schema %s not supported! The supported schemas "
"are: " % auth) + ', '.join(
AUTHENTICATION_SCHEMAS.keys()))
sys.exit(os.EX_DATAERR)
try:
create_api_files(ent, dao, version, overwrite=overwrite,
auth_schema=AUTHENTICATION_SCHEMAS.get(auth, None))
except (OSError, EOFError) as err:
print("Something went wrong while creating the API files...", err)
sys.exit(os.EX_CANTCREAT)
def get_auth_schema_yml(schema: int = None):
if schema is None:
return None
return baseapi.SECURITY_DEFINITIONS[schema]
def create_api_files(entity, dao_class, version,
overwrite=False, auth_schema=None):
entity_lower = entity.__name__.lower()
if os.path.isfile(
"{entity_lower}_api.py".format(entity_lower=entity_lower)) \
and not overwrite:
logger.debug("API already exists. Skipping generation...")
else:
with open("{entity_lower}_api.py".format(entity_lower=entity_lower),
'w+') as api_implementation:
logger.info("Writing api implementation for entity %s...",
entity_lower)
api_implementation.write(baseapi.BASE_API.format(
DAO_CLASS=dao_class.__name__,
ENTITY=entity.__name__,
ENTITY_LOWER=entity_lower))
logger.info("Done writing api for entity %s.", entity_lower)
if version == '':
version = '1'
logger.info("Version for api is %s", version)
parameters = list()
for field in fields(entity):
if not field.metadata.get("database", True):
continue
parameters.append(
baseapi.PARAMETER.format(parameter_name=field.name,
parameter_location='query',
parameter_type='string'))
parameters = '\n'.join(parameters)
if os.path.isfile(
"{entity_lower}_api.yml".format(entity_lower=entity_lower)) \
and not overwrite:
logger.debug(
"API documentation already exists. Skipping generation...")
else:
with open("{entity_lower}_api.yml".format(entity_lower=entity_lower),
'w+') as api_documentation:
logger.info("Writing api documentation for entity %s...",
entity_lower)
api_documentation.write(baseapi.API_SWAGGER.format(
ENTITY=entity.__name__,
ENTITY_LOWER=entity_lower,
VERSION=version,
PARAMETERS=parameters,
SECURITY=baseapi.SECURITY_PARAMETERS[auth_schema]
if auth_schema is not None
else ""))
if auth_schema is not None:
api_documentation.write(get_auth_schema_yml(auth_schema))
logger.info("Done writing api documentation for entity %s.",
entity_lower)