Commit 725bda0d authored by Jacek Lebioda's avatar Jacek Lebioda
Browse files

Refactor

parent 0a664454
......@@ -7,6 +7,9 @@ The basic idea is to pass all the required values in **kwargs by their names.
"""
import copy
import re
from beacon import helpers
def inject_required(what_to_inject, from_what, to_what):
......@@ -228,3 +231,51 @@ def create_beacon_dataset(**kwargs):
'sampleCount', 'externalUrl', 'info']
return populate_fields({}, fields_required, fields_optional, **kwargs)
def validate_input(**kwargs):
"""
Validates query parameters. If there's a violation,
an `APIArgumentError` exception is raised
"""
def wrap(name, message, regexp):
re_regexp = re.compile(regexp)
if not helpers.check_input(kwargs, name, re_regexp):
arg = kwargs.get(name) if kwargs.get(name) is not None else '<<None>>'
raise APIArgumentError("Invalid {0} ({1}) - {2}".format(name, arg, message))
wrap('referenceName',
'accepted ones are: 1-22, X, Y, M, MT',
r'^([1-9]|1\d|2[012]|[Xx]|[Yy]|[Mm][Tt]?)$')
wrap('start',
'should be a positive number',
r'^\d+$')
wrap('assemblyId',
'accepted ones are: GRCh38, GRCh37 and NCBI36',
r'^([Gg][Rr][Cc][Hh](38|37)|[Nn][Cc][Bb][Ii]36)$')
wrap('referenceBases',
'should contain only A, C, T, G, D',
r'^[AaCcTtGgDd]*$')
wrap('alternateBases',
'should contain only A, C, T, G, D',
r'^[AaCcTtGgDd]*$')
whitelist = [True, False, 'true', 'false', 'True', 'False', '1']
if 'includeDatasetResponses' in kwargs and kwargs['includeDatasetResponses'] not in whitelist:
raise APIArgumentError("Invalid {0} ({1}) - {2}".format("includeDatasetResponses",
kwargs['includeDatasetResponses'],
'a boolean was expected'))
return True
class APIArgumentError(ValueError):
"""
Exception thrown if there's something wrong with the argument
provided to API query
"""
pass
......@@ -9,6 +9,9 @@ import json
import os
import urllib.request
from beacon.data_types import APIArgumentError
from beacon.middleware import build_query
def make_http_request(uri):
"""
......@@ -92,3 +95,20 @@ def load_json(path_to_file):
return the_object
except ValueError as e:
raise ValueError("It is not a correct JSON! {0}".format(str(e.args)))
def build_urls(instances, **kwargs):
"""
Returns list of all URLs to query.
"""
the_query = build_query(**kwargs)
if not len(instances):
message = "The beacon have no datasets or the specified datasets do not exist!"
raise APIArgumentError(message)
urls = [
uri['endpoint'] + the_query for uri in instances
]
return urls
......@@ -10,15 +10,9 @@ import re
from flask import jsonify
from beacon import data_types, helpers, liftover
from beacon.settings_loader import DATASET_CONTROLLED, DATASET_PUBLIC
class APIArgumentError(ValueError):
"""
Exception thrown if there's something wrong with the argument
provided to API query
"""
pass
from beacon.data_types import validate_input, APIArgumentError
from beacon.helpers import build_urls
from beacon.settings import restrict_access, raise_error_if_not_sufficient_permissions
def query(configuration, request, user_session_data):
......@@ -33,10 +27,12 @@ def query(configuration, request, user_session_data):
try:
validate_input(**request)
data_sets = request.get('datasetIds', [])
instances = configuration.get_databases(data_sets)
instances, filtered_instances = restrict_access(instances, user_session_data)
raise_error_if_not_sufficient_permissions(filtered_instances, **request)
urls = build_urls(instances, **request)
variantdb_responses = helpers.make_http_requests(urls)
......@@ -58,63 +54,6 @@ def query(configuration, request, user_session_data):
return jsonify(response)
def validate_input(**kwargs):
"""
Validates query parameters. If there's a violation,
an `APIArgumentError` exception is raised
"""
def wrap(name, message, regexp):
re_regexp = re.compile(regexp)
if not helpers.check_input(kwargs, name, re_regexp):
arg = kwargs.get(name) if kwargs.get(name) is not None else '<<None>>'
raise APIArgumentError("Invalid {0} ({1}) - {2}".format(name, arg, message))
wrap('referenceName',
'accepted ones are: 1-22, X, Y, M, MT',
r'^([1-9]|1\d|2[012]|[Xx]|[Yy]|[Mm][Tt]?)$')
wrap('start',
'should be a positive number',
r'^\d+$')
wrap('assemblyId',
'accepted ones are: GRCh38, GRCh37 and NCBI36',
r'^([Gg][Rr][Cc][Hh](38|37)|[Nn][Cc][Bb][Ii]36)$')
wrap('referenceBases',
'should contain only A, C, T, G, D',
r'^[AaCcTtGgDd]*$')
wrap('alternateBases',
'should contain only A, C, T, G, D',
r'^[AaCcTtGgDd]*$')
whitelist = [True, False, 'true', 'false', 'True', 'False']
if 'includeDatasetResponses' in kwargs and kwargs['includeDatasetResponses'] not in whitelist:
raise APIArgumentError("Invalid {0} ({1}) - {2}".format("includeDatasetResponses",
kwargs['includeDatasetResponses'],
'a boolean was expected'))
return True
def build_urls(instances, **kwargs):
"""
Returns list of all URLs to query.
"""
the_query = build_query(**kwargs)
if not len(instances):
message = "The beacon have no datasets or the specified datasets do not exist!"
raise APIArgumentError(message)
urls = [
uri['endpoint'] + the_query for uri in instances
]
return urls
def build_query(**kwargs):
"""
Translates Beacon API query to an URL query to the VariantDB API
......@@ -164,58 +103,3 @@ def include_dataset_responses(where_to_inject, responses_to_inject, databases):
# This is a place where obtaining more information would happen
def get_configuration_for_user(configuration, user_data):
"""Returns information about the configuration with respect to the user's privileges"""
all_datasets = configuration['datasets']
configuration['datasets'] = []
for dataset in all_datasets:
if should_be_accessible_by(dataset, user_data):
dataset['info']['authorized'] = "true"
else:
dataset['info']['authorized'] = "false"
configuration['datasets'].append(dataset)
return configuration
def should_be_accessible_by(dataset, user_data):
"""Checks, if the dataset provided in the argument should be accessible by the user"""
if is_dataset_public(dataset):
return True
if dataset['name'] in user_data:
return True
return False
def is_dataset_public(dataset):
"""Checks, if the dataset provided in the argument is marked as public"""
try:
if dataset['info'].get('accessType', DATASET_PUBLIC) != DATASET_CONTROLLED:
return True
except KeyError:
return True
return False
def restrict_access(instances, user_session_data):
"""
Removes from the list of instances those, which user has no access to.
:param instances: the list of instances metadata
:param user_session_data: the list of user permissions
:return: tuple (filtered list of instances, list of restricted instances)
"""
accessible_instances = []
filtered_instances = []
for instance in instances:
if instance.get('protected', '') == 'aai':
if instance.get('aaiResourceName', 'name_not_specified') in user_session_data:
accessible_instances.append(instance)
else:
filtered_instances.append(instance)
else:
accessible_instances.append(instance)
return accessible_instances, filtered_instances
def raise_error_if_not_sufficient_permissions(filtered_instances, **request):
pass
......@@ -37,7 +37,10 @@ def register_extensions(app):
logger.debug('')
def create_app_and_configuration(config_object={}):
def create_app_and_configuration(config_object=None):
if config_object is None:
config_object = {}
logger.debug('Creating application:')
# The main Flask application
......
......@@ -11,6 +11,8 @@ import os
import sys
from beacon import settings_loader
from beacon.data_types import APIArgumentError
from beacon.settings_loader import DATASET_PUBLIC, DATASET_CONTROLLED
API_PREFIX = u'/v1'
......@@ -42,9 +44,12 @@ class Settings:
self.Beacon = {}
self.DatasetConnections = {}
def get_databases(self, data_sets=[]):
def get_databases(self, data_sets=None):
"""Provides a way to filter database connection strings"""
if data_sets is None:
data_sets = []
if not self.correct:
raise ValueError("Settings were not correctly loaded!")
......@@ -102,3 +107,70 @@ class Settings:
def _try_to_load_from_file(self, filename=FALLBACK_CONFIG_FILE_LOCATION):
self._try_to_load_from_source(filename, "config file")
def get_configuration_for_user(configuration, user_data):
"""Returns information about the configuration with respect to the user's privileges"""
all_datasets = configuration['datasets']
configuration['datasets'] = []
for dataset in all_datasets:
if should_be_accessible_by(dataset, user_data):
dataset['info']['authorized'] = "true"
else:
dataset['info']['authorized'] = "false"
configuration['datasets'].append(dataset)
return configuration
def should_be_accessible_by(dataset, user_data):
"""Checks, if the dataset provided in the argument should be accessible by the user"""
if is_dataset_public(dataset):
return True
if dataset['name'] in user_data:
return True
return False
def is_dataset_public(dataset):
"""Checks, if the dataset provided in the argument is marked as public"""
try:
if dataset['info'].get('accessType', DATASET_PUBLIC) != DATASET_CONTROLLED:
return True
except KeyError:
return True
return False
def restrict_access(all_instances, user_session_data):
"""
Removes from the list of instances those, which user has no access to.
:param all_instances: the list of instances' metadata
:param user_session_data: the list of user permissions
:return: tuple (filtered list of instances, list of restricted instances)
"""
accessible_instances = []
filtered_instances = []
for instance in all_instances:
if instance.get('protected', '') == 'aai':
if instance.get('aaiResourceName', 'name_not_specified') in user_session_data:
accessible_instances.append(instance)
else:
filtered_instances.append(instance)
else:
accessible_instances.append(instance)
return accessible_instances, filtered_instances
def raise_error_if_not_sufficient_permissions(filtered_instances, **request):
"""
Raises APIArgumentError, if client requested access to a dataset she doesn't have permissions to
:param filtered_instances: which instances are forbidden to the user
:param request: dictionary of what user queried for
:return: This function doesn't return anything, it just can raise an exception
"""
restricted_ids = [instance.get('id', None) for instance in filtered_instances]
for requested_dataset_id in request.get('datasetIds', []):
if requested_dataset_id in restricted_ids:
raise APIArgumentError('Insufficient permissions to access dataset of ID={}'.format(requested_dataset_id))
......@@ -63,7 +63,7 @@ def register_views(app, configuration):
def api_beacon():
"""`/beacon/` API endpoint"""
user_session_data = session.get('permissions', [])
beacon_information = middleware.get_configuration_for_user(configuration.Beacon, user_session_data)
beacon_information = settings.get_configuration_for_user(configuration.Beacon, user_session_data)
return jsonify(beacon_information)
@app.route(settings.prefix('/beacon/query'),
......
......@@ -75,7 +75,8 @@ class DataTypesTestCase(MetaBeaconTestCase):
assert to_what[what_to_inject] == from_what__existent[what_to_inject], \
the_message + " inject the proper value"
def test__populate_fields(self):
@staticmethod
def test__populate_fields():
initial_dict = {'a': '123'}
params = {'b': '234'}
......
......@@ -4,7 +4,7 @@
"""
Unit/integration tests for the Beacon's middleware.
"""
import beacon.data_types
import beacon.helpers
import beacon.middleware
from beacon import data_types, middleware
......@@ -34,8 +34,8 @@ class MiddlewareTestCase(MetaBeaconTestCase):
the_message = "`validate_input` should fail: incorrect `{0}`"
try:
the_request = build_request(*arguments)
middleware.validate_input(**the_request)
except middleware.APIArgumentError:
beacon.data_types.validate_input(**the_request)
except beacon.data_types.APIArgumentError:
return
self.fail(the_message.format(argument_name)) # pragma: no cover
......@@ -44,7 +44,7 @@ class MiddlewareTestCase(MetaBeaconTestCase):
for name in correct_reference_names:
request['referenceName'] = name
message = "`validate_input` with correct arguments should work"
assert middleware.validate_input(**request), message
assert beacon.data_types.validate_input(**request), message
case(['x', '-1', 'a', 't', 'GRCh37'], 'start')
case(['x', '1', 'e', 't', 'GRCh37'], 'reference_bases')
......@@ -97,7 +97,7 @@ class MiddlewareTestCase(MetaBeaconTestCase):
alternateBases=alternate_bases,
assemblyId='GRCh37')
urls = beacon.middleware.build_urls(self.configuration.DatasetConnections, **request)
urls = beacon.helpers.build_urls(self.configuration.DatasetConnections, **request)
import urllib.parse
for url in urls:
......
......@@ -8,6 +8,7 @@ Unit/integration tests for the Beacon's server.
from json import loads
from urllib.error import URLError
import beacon.helpers
from beacon import helpers, middleware, data_types
from tests.test import MetaBeaconTestCase
......@@ -66,7 +67,7 @@ class BeaconTestCase(MetaBeaconTestCase):
def case(**kwargs):
params = data_types.create_beacon_allele_request(**kwargs)
urls = middleware.build_urls(self.configuration.DatasetConnections, **params)
urls = beacon.helpers.build_urls(self.configuration.DatasetConnections, **params)
url = urls[0]
try:
......@@ -77,7 +78,7 @@ class BeaconTestCase(MetaBeaconTestCase):
except NotImplementedError as _: # pragma: no cover
self.fail("/api/beacon/query is not fully implemented, see the source")
except URLError as error: # pragma: no cover
self.fail("/api/beacon/query experienced an URL error: " + error.args)
self.fail("/api/beacon/query experienced an URL error: " + str(error.args))
r = "VariantDB should reply to the query"
assert 'start' in data['fields'], r
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment