Commit 2d5debdb authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Add a lot of tests

parent bcbed68b
......@@ -4,6 +4,7 @@ import json
from urllib.request import Request, urlopen
from django.db import models as db_models
from django.core.exceptions import ImproperlyConfigured
from django_mysql.models import JSONField
from oic.oic import Client
from oic.oic.message import RegistrationResponse
......@@ -16,7 +17,7 @@ LOGGER = logging.getLogger(__name__)
OIDC_CLIENT = {}
def scopes_default():
return []
return ['openid', 'profile', 'email', 'credentials']
class OIDCConfig(db_models.Model):
......@@ -76,7 +77,10 @@ class OIDCConfig(db_models.Model):
def default_idp():
return OIDCConfig.objects.filter(enabled=True).first()
available_idps = OIDCConfig.objects.filter(enabled=True)
if not available_idps.exists():
raise ImproperlyConfigured('No OIDCConfig available')
return available_idps.first()
class OIDCTokenAuthBackend(object):
......
import logging
from django.core.exceptions import ImproperlyConfigured
from django.test import TestCase
from feudal.backend.auth.v1.models import default_idp
LOGGER = logging.getLogger(__name__)
class AuthModelsTest(TestCase):
# without setup there is no OIDConfig
def test_default_idp(self):
with self.assertRaises(ImproperlyConfigured):
default_idp()
......@@ -142,5 +142,5 @@ class AuthInfo(generics.RetrieveAPIView):
idps = OIDCConfig.objects.filter(enabled=True)
return {
'idps': idps,
'default': idps.first().id,
'default': default_idp().id,
}
......@@ -157,7 +157,7 @@ class NewDeployment(models.Model):
state_target = models.CharField(
max_length=50,
choices=STATE_CHOICES,
default=NOT_DEPLOYED,
default=DEPLOYED,
)
is_active = models.BooleanField(
......@@ -285,6 +285,14 @@ class NewDeployment(models.Model):
self.publish_to_client()
# each state item publishes its state to the user
def user_credential_removed(self, key):
# TODO implement
pass
def user_credential_added(self, key):
# TODO implement
pass
def service_added(self, service):
LOGGER.debug(self.msg('Adding service {}'.format(service)))
for site in service.site.all():
......
......@@ -130,7 +130,7 @@ def teardown_fixture():
class DeploymentTest(TestCase):
@classmethod
def setUpTestData(cls):
def setUp(cls):
setup_fixture()
delayed_service = models.Service(name='DELAYED_SERVICE')
delayed_service.save()
......@@ -150,8 +150,10 @@ class DeploymentTest(TestCase):
self.assertIsNotNone(deployment)
self.assertEqual(len(deployment.services), service_count)
self.assertEqual(deployment.state_items.count(), service_count)
self.assertEqual(deployment.state_target, models.DEPLOYED)
self.assertEqual(deployment.state, models.NOT_DEPLOYED)
if service_count > 0:
self.assertEqual(deployment.state, models.NOT_DEPLOYED)
deployment.user_deploy()
......
......@@ -156,21 +156,18 @@ class User(AbstractUser):
return self._is_active
def __str__(self):
prefix = ''
if self.is_superuser:
return 'ADMIN {}'.format(self.username)
prefix = 'ADMIN '
elif self.user_type == 'oidcuser':
if not self.is_active:
return 'DEACTIVATED USER {}'.format(self.username)
return 'USER {}'.format(self.username)
prefix = 'DEACTIVATED USER '
else:
prefix = 'USER '
elif self.user_type == 'apiclient':
from . import Site
try:
return 'APICLIENT {}@{}'.format(self.username, self.site)
except Site.DoesNotExist:
return 'APICLIENT {}'.format(self.username)
prefix = 'APICLIENT '
else:
raise Exception()
return prefix+self.username
def msg(self, msg):
return '[{}] {}'.format(self, msg)
......@@ -243,6 +240,8 @@ class User(AbstractUser):
group = Group(name=group_name)
group.save()
self.groups.add(group)
for dep in self.deployments.filter(group=group):
dep.activate()
# include the ssh key from unity
......
......@@ -2,6 +2,7 @@
import logging
from django.contrib.auth.models import Group
from django.core.exceptions import ImproperlyConfigured
from rest_framework import generics, views
from rest_framework.authentication import BasicAuthentication
from rest_framework.response import Response
......@@ -31,6 +32,14 @@ class ConfigurationView(views.APIView):
authentication_classes = AUTHENTICATION_CLASSES
def put(self, request):
# the site where client is located
client_site = None
try:
client_site = request.user.site
except models.Site.DoesNotExist:
raise ImproperlyConfigured("client has no site")
#services = request.data.get('services', None)
group_to_services = request.data.get('group_to_services', None)
......@@ -68,14 +77,14 @@ class ConfigurationView(views.APIView):
except models.Service.DoesNotExist:
LOGGER.info("Site %s pushed new service %s", request.user.site, name)
LOGGER.info("Site %s pushed new service %s", client_site, name)
service = models.Service(
name=name,
description=description,
)
service.save()
service.groups.add(group)
service.site.add(request.user.site)
service.site.add(client_site)
service.handle_group_deployments()
# TODO deactivate vanished services
......
import logging
from django.contrib import auth
from django.contrib.auth.models import Group
from django.test import Client
from django.test import TestCase
from ..models import test_models
from feudal.backend import models
from feudal.backend.models import test_models
LOGGER = logging.getLogger(__name__)
class ViewTest(TestCase):
def setUp(self):
test_models.setup_fixture()
def tearDown(self):
test_models.teardown_fixture()
def get_user():
return auth.authenticate(
username=test_models.TEST_NAME,
password=test_models.TEST_PASSWORD,
)
def get_client(user):
client = Client()
client.force_login(
user=user
)
return client
def test_view(self):
client = Client()
user = auth.authenticate(
username=test_models.TEST_NAME,
password=test_models.TEST_PASSWORD,
)
client.force_login(
user=user
)
response = client.get(
'/backend/api/state/',
)
class StateViewTest(TestCase):
@classmethod
def setUpTestData(cls):
test_models.setup_fixture()
def test_api_state(self):
user = get_user()
client = get_client(user)
response = client.get('/backend/api/state')
self.assertTrue('services' in response.json())
self.assertTrue('user_state' in response.json())
self.assertTrue('user' in response.json())
self.assertEqual(response.json()['user']['userinfo']['email'], test_models.TEST_EMAIL)
class SSHPublicKeyViewTest(TestCase):
@classmethod
def setUpTestData(cls):
test_models.setup_fixture()
def test_error(self):
user = get_user()
client = get_client(user)
response = client.post(
'/backend/api/sshkey',
)
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()['error'], 'malformed request')
def test_add_success(self):
user = get_user()
client = get_client(user)
response = client.post(
'/backend/api/sshkey',
data={
'type': 'add',
'key': {
'name': 'key_name',
'key': 'key_value'
}
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
try:
key = user.ssh_keys.get(name='key_name')
self.assertIsNotNone(key)
self.assertEqual(key.name, 'key_name')
self.assertEqual(key.key, 'key_value')
except models.SSHPublicKey.DoesNotExist:
self.fail('User did not receive the key')
def test_remove_success(self):
user = get_user()
sshkey = models.SSHPublicKey(name='key_name', key='key_value', user=user)
sshkey.save()
client = get_client(user)
response = client.post(
'/backend/api/sshkey',
data={
'type': 'remove',
'id': sshkey.id
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
with self.assertRaises(models.SSHPublicKey.DoesNotExist):
user.ssh_keys.get(name='key_name')
class DeploymentsViewTest(TestCase):
@classmethod
def setUpTestData(cls):
test_models.setup_fixture()
group = Group(name='test_group')
group.save()
get_user().groups.add(group)
def test_error(self):
user = get_user()
client = get_client(user)
response = client.post(
'/backend/api/deployments',
)
self.assertEqual(response.status_code, 400)
self.assertEqual(response.json()['error'], 'malformed request')
def test_group_add_success(self):
user = get_user()
client = get_client(user)
group = Group.objects.get(name='test_group')
response = client.post(
'/backend/api/deployments',
data={
'type': 'add',
'group': group.id,
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
LOGGER.debug('%s', response.json())
try:
dep = user.deployments.get(group=group)
self.assertEqual(dep.state_target, models.DEPLOYED)
except models.NewDeployment.DoesNotExist:
self.fail('the user did not get the group deployment')
def test_group_remove_success(self):
user = get_user()
client = get_client(user)
group = Group.objects.get(name='test_group')
response = client.post(
'/backend/api/deployments',
data={
'type': 'remove',
'group': group.id
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
LOGGER.debug('%s', response.json())
try:
dep = user.deployments.get(group=group)
self.assertEqual(dep.state_target, models.NOT_DEPLOYED)
except models.NewDeployment.DoesNotExist:
self.fail('the user did not get the group deployment')
def test_service_add_success(self):
user = get_user()
client = get_client(user)
service = models.Service.objects.get(name='test_service')
response = client.post(
'/backend/api/deployments',
data={
'type': 'add',
'service': service.id,
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
LOGGER.debug('%s', response.json())
try:
dep = user.deployments.get(service=service)
self.assertEqual(dep.state_target, models.DEPLOYED)
except models.NewDeployment.DoesNotExist:
self.fail('the user did not get the service deployment')
def test_service_remove_success(self):
user = get_user()
client = get_client(user)
service = models.Service.objects.get(name='test_service')
response = client.post(
'/backend/api/deployments',
data={
'type': 'remove',
'service': service.id
},
content_type='application/json',
)
self.assertEqual(response.status_code, 200)
LOGGER.debug('%s', response.json())
try:
dep = user.deployments.get(service=service)
self.assertEqual(dep.state_target, models.NOT_DEPLOYED)
except models.NewDeployment.DoesNotExist:
self.fail('the user did not get the service deployment')
......@@ -99,7 +99,7 @@ class SSHPublicKeyView(views.APIView):
).data
)
LOGGER.error('SSHPublicKeyView: malformed request %s', request)
LOGGER.error('SSHPublicKeyView: malformed request %s', request.data)
return _api_error_response("malformed request")
......@@ -107,21 +107,17 @@ class DeploymentView(views.APIView):
def post(self, request):
if (
'type' not in request.data or
'key' not in request.data or
(
'service' not in request.data and
'group' not in request.data
)
):
LOGGER.error('Deployment api got invalid request %s', request.data)
LOGGER.error("Deployment api got malformed request %s: request misses fields (should have: 'type', and ('service' or 'group'))", request.data)
return _api_error_response(
"request misses fields (should have: 'type', 'key', and ('service' or 'group'))"
'malformed request'
)
request_type = request.data['type']
request_key = models.SSHPublicKey.objects.get(
id=request.data['key'],
)
deployment = None
if 'group' in request.data:
......@@ -137,15 +133,15 @@ class DeploymentView(views.APIView):
if request_type == 'add':
deployment.deploy_key(request_key)
deployment.user_deploy()
elif request_type == 'remove':
deployment.remove_key(request_key)
deployment.user_remove()
else:
return _api_error_response("invalid value of field 'type'")
deployment.save()
return Response(
serializers.DeploymentSerializer(deployment).data
serializers.NewDeploymentSerializer(deployment).data
)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment