Commit 28593f0f authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Rework some tests

Adapt some tests to the refactoring

Add unit test for one auth endpoint

Make tests more verbose

Fix deployment tests
parent d5045fd8
......@@ -2,9 +2,12 @@
import logging
from feudal.backend import models
from feudal.backend.tests import BaseTestCase
from feudal.backend.models import deployments
from feudal.backend.models import Service
from feudal.backend.models.deployments import (
Deployment, get_deployment,
DEPLOYED, NOT_DEPLOYED, DEPLOYMENT_PENDING, REMOVAL_PENDING,
)
LOGGER = logging.getLogger(__name__)
......@@ -12,55 +15,59 @@ LOGGER = logging.getLogger(__name__)
class DeploymentTest(BaseTestCase):
def tearDown(self):
deployments.Deployment.objects.all().delete()
Deployment.objects.all().delete()
def check_new_deployment(self, deployment, service_count):
self.assertIsNotNone(deployment)
self.assertEqual(deployment.state, deployments.NOT_DEPLOYED)
self.assertEqual(deployment.state_target, deployments.NOT_DEPLOYED)
self.assertEqual(deployment.state, NOT_DEPLOYED)
self.assertEqual(deployment.state_target, NOT_DEPLOYED)
self.assertEqual(len(deployment.services), service_count)
self.assertEqual(deployment.states.count(), service_count)
def deployment_run(self, deployment, service_count):
self.check_new_deployment(deployment, service_count)
deployment.state_target = deployments.DEPLOYED
deployment.state_target = DEPLOYED
deployment.save()
deployment.target_changed()
if service_count > 0:
self.assertEqual(deployment.state, deployments.DEPLOYMENT_PENDING)
self.assertEqual(deployment.state, DEPLOYMENT_PENDING)
self.assertFalse(deployment.target_reached)
else:
self.assertEqual(deployment.state, deployments.DEPLOYED)
self.assertEqual(deployment.state, DEPLOYED)
self.assertTrue(deployment.target_reached)
# execute the deployment
for state in deployment.states.all():
self.assertEqual(state.state, deployments.DEPLOYMENT_PENDING)
state.state = deployments.DEPLOYED
self.assertEqual(state.state, DEPLOYMENT_PENDING)
state.state = DEPLOYED
state.save()
state.state_changed()
self.assertEqual(deployment.state, deployments.DEPLOYED)
self.assertEqual(deployment.state, DEPLOYED)
self.assertTrue(deployment.target_reached)
# start removal
deployment.state_target = deployments.NOT_DEPLOYED
deployment.state_target = NOT_DEPLOYED
deployment.save()
deployment.target_changed()
if service_count > 0:
self.assertEqual(deployment.state, deployments.REMOVAL_PENDING)
self.assertEqual(deployment.state, REMOVAL_PENDING)
self.assertFalse(deployment.target_reached)
else:
self.assertEqual(deployment.state, deployments.NOT_DEPLOYED)
self.assertEqual(deployment.state, NOT_DEPLOYED)
self.assertTrue(deployment.target_reached)
# execute removals
for state in deployment.states.all():
self.assertEqual(state.state, deployments.REMOVAL_PENDING)
state.state = deployments.NOT_DEPLOYED
self.assertEqual(state.state, REMOVAL_PENDING)
state.state = NOT_DEPLOYED
state.save()
state.state_changed()
self.assertEqual(deployment.state, deployments.NOT_DEPLOYED)
self.assertEqual(deployment.state, NOT_DEPLOYED)
self.assertTrue(deployment.target_reached)
# service count: prior to delayed service
......@@ -68,15 +75,25 @@ class DeploymentTest(BaseTestCase):
self.check_new_deployment(deployment, service_count)
# start deployment
deployment.state_target = deployments.DEPLOYED
deployment.state_target = DEPLOYED
deployment.save()
deployment.target_changed()
if service_count == 0:
self.assertFalse(deployment.states.exists())
self.assertTrue(deployment.target_reached)
else:
# check constraints
self.assertEqual(deployment.states.count(), service_count)
self.assertFalse(deployment.target_reached)
for state in deployment.states.all():
self.assertEqual(state.state_target, DEPLOYED)
self.assertEqual(state.state, DEPLOYMENT_PENDING)
# add service
delayed_service = models.Service.get_service(
delayed_service = Service.get_service(
'DELAYED_SERVICE',
self.site,
vos=[vo],
......@@ -84,50 +101,52 @@ class DeploymentTest(BaseTestCase):
deployment.service_added(delayed_service)
for state in deployment.states.all():
self.assertEqual(state.state, deployments.DEPLOYMENT_PENDING)
self.assertEqual(state.state, DEPLOYMENT_PENDING)
self.assertEqual(state.state_target, DEPLOYED)
self.assertEqual(deployment.states.count(), service_count + 1)
self.assertEqual(deployment.state, deployments.DEPLOYMENT_PENDING)
self.assertEqual(deployment.state, DEPLOYMENT_PENDING)
# execute deployment
for state in deployment.states.all():
state.state = deployments.DEPLOYED
state.state = DEPLOYED
state.state_changed()
# the deployment was not done for new_service
self.assertEqual(deployment.state, deployments.DEPLOYED)
self.assertEqual(deployment.state, DEPLOYED)
# start removal
deployment.state_target = deployments.NOT_DEPLOYED
deployment.state_target = NOT_DEPLOYED
deployment.save()
deployment.target_changed()
self.assertEqual(deployment.state, deployments.REMOVAL_PENDING)
self.assertEqual(deployment.state, REMOVAL_PENDING)
# execute removals
for state in deployment.states.all():
self.assertEqual(state.state, deployments.REMOVAL_PENDING)
state.state = deployments.NOT_DEPLOYED
self.assertEqual(state.state, REMOVAL_PENDING)
state.state = NOT_DEPLOYED
state.state_changed()
self.assertEqual(deployment.state, deployments.NOT_DEPLOYED)
self.assertEqual(deployment.state, NOT_DEPLOYED)
def test_group_with_no_service(self):
deployment = deployments.get_deployment(self.user, self.group_none)
deployment = get_deployment(self.user, self.group_none)
self.deployment_run(deployment, 0)
def test_group_with_service(self):
deployment = deployments.get_deployment(self.user, vo=self.group_one)
deployment = get_deployment(self.user, vo=self.group_one)
self.deployment_run(deployment, 1)
def test_group_with_two_services(self):
deployment = deployments.get_deployment(self.user, vo=self.group_two)
deployment = get_deployment(self.user, vo=self.group_two)
self.deployment_run(deployment, 2)
# a vo with one service gets another service *after* the user requested the deployment
def test_group_with_delayed_service(self):
deployment = deployments.get_deployment(self.user, self.group_one)
deployment = get_deployment(self.user, self.group_one)
self.deployment_run_delayed_service(deployment, self.group_one, 1)
# a vo with two services loses one service *after* the user requested the deployment
def test_group_with_vanishing_service(self):
deployments.get_deployment(self.user, vo=self.group_two)
get_deployment(self.user, vo=self.group_two)
......@@ -3,7 +3,7 @@ import logging
from django.contrib.auth import authenticate
from feudal.backend import models
from feudal.backend.models.users import User
from feudal.backend.tests import BaseTestCase
LOGGER = logging.getLogger(__name__)
......@@ -12,7 +12,7 @@ LOGGER = logging.getLogger(__name__)
class UserTest(BaseTestCase):
def test_user_construction(self):
user = models.User.get_user(
user = User.get_user(
self.TEST_USERINFO,
self.idp,
)
......
......@@ -69,10 +69,8 @@ class BaseTestCase(TestCase):
cls.user.vos.add(cls.group_one)
cls.user.vos.add(cls.group_two)
cls.site = Site(name='test_site')
cls.site.save()
cls.site2 = Site(name='test_site_2')
cls.site2.save()
cls.site = Site.objects.create(name='test_site')
cls.site2 = Site.objects.create(name='test_site_2')
cls.service_one = Service.get_service(
'test_service_one',
......
from django.test import TestCase, override_settings
from django.test.client import Client
from feudal.backend.models.auth.vos import Group, Entitlement
from feudal.backend.models.brokers import RabbitMQInstance
from feudal.backend.models.users import User
from feudal.backend.models.auth import OIDCConfig
from feudal.backend.models import Site, Service
@override_settings(DEBUG=True, DEBUG_AUTH=True)
class AuthorizationTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.TEST_ISSUER = 'https://unity.test-federation.de/oauth2'
cls.idp = OIDCConfig.objects.create(
name='test_idp_name',
client_id='test_idp_client_id',
client_secret='test_idp_secret',
issuer_uri=cls.TEST_ISSUER,
)
cls.downstream_client = User.construct_client('downstream', 'aeuaoeuaoeuaoeua')
cls.downstream_client.save()
cls.group = Group.objects.create(name='_authorization_test_group')
cls.raw_entitlement = 'urn:geant:h-df.de:group:myExampleColab#unity.helmholtz-data-federation.de'
cls.entitlement = Entitlement.get_entitlement(cls.raw_entitlement, cls.idp)
cls.entitlement.save()
cls.site = Site.objects.create(
client=cls.downstream_client,
name='test_site',
)
cls.service = Service.objects.create(
name='test_service',
site=cls.site,
)
cls.service.vos.add(cls.group)
cls.service.vos.add(cls.entitlement)
cls.broker = RabbitMQInstance.load()
def setUp(self):
self.client = Client()
def test_topic_group_success(self):
# https://github.com/rabbitmq/rabbitmq-auth-backend-http#topic_path
response = self.client.post(
'/auth/client/topic',
{
'username': self.downstream_client.username,
'vhost': self.broker.vhost,
'resource': 'topic',
'name': 'groups', # exchange name
'routing_key': self.group.name,
}
)
self.assertEqual(response.content.decode('utf-8'), 'allow')
def test_topic_group_failure(self):
# https://github.com/rabbitmq/rabbitmq-auth-backend-http#topic_path
response = self.client.post(
'/auth/client/topic',
{
'username': self.downstream_client.username,
'vhost': self.broker.vhost,
'resource': 'topic',
'name': 'groups', # exchange name
'routing_key': 'non-existant-group',
}
)
self.assertEqual(response.content.decode('utf-8'), 'deny')
def test_topic_entitlement_success(self):
# https://github.com/rabbitmq/rabbitmq-auth-backend-http#topic_path
response = self.client.post(
'/auth/client/topic',
{
'username': self.downstream_client.username,
'vhost': self.broker.vhost,
'resource': 'topic',
'name': 'entitlements', # exchange name
'routing_key': self.raw_entitlement,
}
)
self.assertEqual(response.content.decode('utf-8'), 'allow')
def test_topic_entitlement_failure(self):
# https://github.com/rabbitmq/rabbitmq-auth-backend-http#topic_path
response = self.client.post(
'/auth/client/topic',
{
'username': self.downstream_client.username,
'vhost': self.broker.vhost,
'resource': 'topic',
'name': 'entitlements', # exchange name
'routing_key': 'non-existant-entitlement',
}
)
self.assertEqual(response.content.decode('utf-8'), 'deny')
......@@ -2,7 +2,7 @@
from django.urls import path, include
from django.views.decorators.csrf import csrf_exempt
from . import webpage, clients
from feudal.backend.views.auth import webpage, clients
clientpatterns = [
path('user', csrf_exempt(clients.user_endpoint)),
......
......@@ -4,7 +4,8 @@ import base64
from django.test import TestCase, Client
from feudal.backend import models
from feudal.backend.models import Site
from feudal.backend.models.users import User
LOGGER = logging.getLogger(__name__)
......@@ -18,7 +19,7 @@ class ClientViewTest(TestCase):
cls.CLIENT_NAME = 'TEST_DOWNSTREAM_CLIENT'
cls.CLIENT_PASSWORD = 'test1234'
cls.api_client = models.User.construct_client(
cls.api_client = User.construct_client(
cls.CLIENT_NAME,
cls.CLIENT_PASSWORD,
)
......@@ -26,13 +27,13 @@ class ClientViewTest(TestCase):
cls.SITE_NAME = 'TEST_SITE'
cls.site = models.Site(
cls.site = Site(
client=cls.api_client,
name=cls.SITE_NAME,
)
cls.site.save()
def setupUp(self):
def setUp(self):
self.client = Client()
def auth_headers(self):
......
import logging
from feudal.backend.models.users import SSHPublicKey
from feudal.backend.tests import LoggedInTest
from feudal.backend.models import deployments
from feudal.backend.models.users import SSHPublicKey
from feudal.backend.models.deployments import (
Deployment, VODeployment, ServiceDeployment, DEPLOYED, NOT_DEPLOYED, QUESTIONNAIRE
)
LOGGER = logging.getLogger(__name__)
......@@ -117,7 +119,7 @@ class DeploymentsViewTest(LoggedInTest):
def test_service_deployment(self):
service = self.service_one
for target in (deployments.DEPLOYED, deployments.NOT_DEPLOYED):
for target in (DEPLOYED, NOT_DEPLOYED):
response = self.client.patch(
make_path('deployment/service/{}', service.id),
data={
......@@ -131,13 +133,13 @@ class DeploymentsViewTest(LoggedInTest):
try:
dep = self.user.deployments.get(servicedeployment__service=service)
self.assertEqual(dep.state_target, target)
except deployments.Deployment.DoesNotExist:
except Deployment.DoesNotExist:
self.fail('User did not get the service deployment')
def test_vo_deployment(self):
vo = self.group_one
for target in (deployments.DEPLOYED, deployments.NOT_DEPLOYED):
for target in (DEPLOYED, NOT_DEPLOYED):
response = self.client.patch(
make_path('deployment/vo/{}', vo.id),
data={
......@@ -151,7 +153,7 @@ class DeploymentsViewTest(LoggedInTest):
try:
dep = self.user.deployments.get(vodeployment__vo=vo)
self.assertEqual(dep.state_target, target)
except deployments.Deployment.DoesNotExist:
except Deployment.DoesNotExist:
self.fail('User did not get the VODeployment')
# pylint misses the reverse accessor 'deployments' on user
......@@ -163,7 +165,7 @@ class DeploymentsViewTest(LoggedInTest):
self.assertEqual(response.status_code, 200)
vo_deps = response.json()
self.assertEqual(
self.user.deployments.instance_of(deployments.VODeployment).count(),
self.user.deployments.instance_of(VODeployment).count(),
len(vo_deps),
)
......@@ -176,7 +178,7 @@ class DeploymentsViewTest(LoggedInTest):
self.assertEqual(response.status_code, 200)
service_deps = response.json()
self.assertEqual(
self.user.deployments.instance_of(deployments.ServiceDeployment).count(),
self.user.deployments.instance_of(ServiceDeployment).count(),
len(service_deps),
)
......@@ -188,7 +190,7 @@ class DeploymentStateViewTest(LoggedInTest):
response = self.client.patch(
make_path('deployment/service/{}', service.id),
data={
'state_target': deployments.DEPLOYED,
'state_target': DEPLOYED,
},
content_type='application/json',
)
......@@ -200,7 +202,7 @@ class DeploymentStateViewTest(LoggedInTest):
state = self.user.states.get(id=state_id)
# mock client questionnaire response
state.state = deployments.QUESTIONNAIRE
state.state = QUESTIONNAIRE
state.questionnaire = {
'question_foo': 'description_foo',
'question_bar': 'description_bar',
......
......@@ -4,8 +4,8 @@ import base64
from django.test import Client
from feudal.backend.models import User
from feudal.backend.tests import BaseTestCase
from feudal.backend.models.users import User
LOGGER = logging.getLogger(__name__)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment