"""OIDC identity provider configuration and token authentication backend.

``OIDCConfig`` stores the client registration data for one OpenID Connect
identity provider (IdP) and builds the matching ``oic`` client on demand.
``OIDCTokenAuthBackend`` resolves an access token to a user by querying the
userinfo endpoint of the IdP.
"""
import logging
import json
from urllib.request import Request, urlopen

from django.db import models as db_models
from django.core.exceptions import ImproperlyConfigured
from django_mysql.models import JSONField
from oic.oic import Client
from oic.oic.message import RegistrationResponse
from oic.utils.authn.client import CLIENT_AUTHN_METHOD

from .. import utils

LOGGER = logging.getLogger(__name__)

# Process-local cache of oic clients, keyed by the primary key of the
# OIDCConfig they were built for. Nothing in this module evicts entries.
OIDC_CLIENT = {}


def scopes_default():
    """Return a fresh list of the scopes requested by default."""
    return [
        'openid',
        'profile',
        'email',
        'credentials',
        'eduperson_entitlement',
    ]


class OIDCConfig(db_models.Model):
    """Client configuration for a single OpenID Connect identity provider."""

    client_id = db_models.CharField(max_length=200)
    client_secret = db_models.CharField(max_length=200)
    redirect_uri = db_models.CharField(max_length=200)
    issuer_uri = db_models.CharField(max_length=200)
    enabled = db_models.BooleanField(default=False)
    name = db_models.CharField(max_length=200)

    # scopes as a list of strings
    scopes = JSONField(
        default=scopes_default,
        editable=True,
        help_text='The scopes we request when requesting user infos',
    )

    # ENTITLEMENT CHANGES
    # Path in the group tree to the VO groups.
    # Can be empty if we use the root.
    vo_subtree_path = db_models.CharField(
        max_length=200,
        blank=True,
        null=True,
        help_text=(
            'If not emtpy: Operate with groups of the described subtree of '
            'group (or entitlements). For example: Let\'s say the groups '
            '[/,/foo,/bar] exist and you set vo_subtree_path to "/". In that '
            'case the VO-Groups would be /foo and /bar'
        ),
    )

    # If True we shall ignore subgroups of the VO groups
    # (VO groups are the groups on the path described by vo_subtree_path).
    ignore_subgroups = db_models.BooleanField(
        default=False,
        help_text=(
            'Ignore subgroups of VO describing groups. E.g. ignores the '
            'group :foo:bar if :foo exists.'
        ),
    )

    # The field in the userinfo (served by this IdP) that describes the
    # groups of the user.
    userinfo_field_groups = db_models.CharField(
        max_length=200,
        help_text=(
            "The field in the userinfo (served by this IdP) that contains "
            "groups of the user. Leave blank if you don't want to use groups "
            "of this IdP"
        ),
        default=None,
        blank=True,
        null=True,
    )

    # The field in the userinfo (served by this IdP) that describes the
    # entitlements of the user.
    userinfo_field_entitlements = db_models.CharField(
        max_length=200,
        help_text=(
            "The field in the userinfo (served by this IdP) that contains "
            "entitlements of the user. Leave blank if you don't want to use "
            "entitlements of this IdP"
        ),
        default=None,
        blank=True,
        null=True,
    )

    @property
    def registration_response(self):
        """Return a ``RegistrationResponse`` holding the client credentials."""
        info = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }
        return RegistrationResponse(**info)

    @property
    def oidc_client(self):
        """Return the ``oic`` client for this IdP, creating it on first use.

        The client is stored in the module-level ``OIDC_CLIENT`` dict under
        ``self.id`` and reused on later accesses.
        """
        # Create the client object if it does not exist yet.
        if self.id not in OIDC_CLIENT:
            new_oidc_client = Client(client_authn_method=CLIENT_AUTHN_METHOD)
            new_oidc_client.provider_config(self.issuer_uri)
            new_oidc_client.store_registration_info(
                self.registration_response,
            )
            OIDC_CLIENT[self.id] = new_oidc_client

        return OIDC_CLIENT[self.id]

    @property
    def provider_info(self):
        """Return the ``provider_info`` attribute of the cached client."""
        return self.oidc_client.provider_info

    def __str__(self):
        return self.name

    def get_auth_request(self, client, state):
        """Build the authorization request for this IdP.

        Args:
            client: Object providing ``construct_AuthorizationRequest`` and
                ``authorization_endpoint``.
            state: Value sent as the ``state`` request argument.

        Returns:
            The result of calling ``request`` on the constructed
            authorization request with the client's authorization endpoint.
        """
        args = {
            'client_id': self.client_id,
            'response_type': 'code',
            'scope': self.scopes,
            'redirect_uri': self.redirect_uri,
            'state': state,
        }
        auth_req = client.construct_AuthorizationRequest(
            request_args=args,
        )
        return auth_req.request(client.authorization_endpoint)


def default_idp():
    """Return the first enabled ``OIDCConfig``.

    Raises:
        ImproperlyConfigured: If no enabled ``OIDCConfig`` exists.
    """
    # A single first() call replaces the previous exists() + first() pair.
    idp = OIDCConfig.objects.filter(enabled=True).first()
    if idp is None:
        raise ImproperlyConfigured('No OIDCConfig available')
    return idp


class OIDCTokenAuthBackend(object):
    """Authentication backend that resolves an access token to a user."""

    # Pre-built exception instance; it is not raised within this module.
    AuthException = Exception('Unable to authenticate user')

    def get_userinfo(self, oidc_client, access_token=None):
        """Fetch the userinfo document for ``access_token``.

        Args:
            oidc_client: Object whose ``provider_info`` mapping contains the
                ``userinfo_endpoint`` and ``issuer`` entries.
            access_token: Bearer token sent in the ``Authorization`` header.

        Returns:
            The decoded JSON userinfo with an added ``iss`` entry, or
            ``None`` if ``access_token`` is ``None``. Errors raised by the
            HTTP request or by JSON decoding are not caught here.
        """
        if access_token is None:
            LOGGER.error('Invalid parameters for get_userinfo')
            return None

        req = Request(
            oidc_client.provider_info['userinfo_endpoint'],
        )
        req.add_header('Authorization', 'Bearer ' + access_token)

        # Use the response as a context manager so the connection is closed
        # even if reading fails.
        with urlopen(req) as response:
            userinfo_bytes = response.read()

        user_info = json.loads(userinfo_bytes.decode('UTF-8'))
        user_info['iss'] = oidc_client.provider_info['issuer']
        return user_info

    def authenticate(self, request, token=None, issuer_uri=None):
        """Authenticate a user by access token.

        The IdP is looked up by ``issuer_uri`` if given, otherwise by the
        ``idp_id`` value read via ``utils.get_session``.

        Args:
            request: The current request, passed to ``utils.get_session``.
            token: Access token; ``None`` makes this backend return ``None``.
            issuer_uri: Optional issuer URI selecting the IdP.

        Returns:
            The result of ``User.get_user`` for the fetched userinfo, or
            ``None`` if no token was given or no IdP could be determined.
        """
        # Imported at call time rather than at module level -- presumably to
        # avoid a circular import; confirm before moving it.
        from ....models.users import User

        if token is None:
            return None

        idp_id = utils.get_session(request, 'idp_id', None)

        if issuer_uri is not None:
            LOGGER.debug('Attempting to find IdP %s', issuer_uri)
            lookup = {'issuer_uri': issuer_uri}
        elif idp_id is not None:
            lookup = {'id': idp_id}
        else:
            LOGGER.error('Unable to determine IdP for authentication')
            return None

        try:
            idp_config = OIDCConfig.objects.get(**lookup)
        except OIDCConfig.DoesNotExist:
            LOGGER.error('Unable to determine IdP for authentication')
            return None

        # Get the user info from the IdP.
        userinfo = self.get_userinfo(
            idp_config,
            access_token=token,
        )

        return User.get_user(
            userinfo,
            idp_config,
        )

    def get_user(self, user_id):
        """Return the ``oidcuser`` with primary key ``user_id``.

        Returns:
            The matching user if exactly one exists, otherwise ``None``
            (after logging an error).
        """
        # Imported at call time rather than at module level -- presumably to
        # avoid a circular import; confirm before moving it.
        from ....models.users import User

        # Evaluate the queryset once instead of exists() + len() + first().
        matches = list(
            User.objects.filter(
                user_type='oidcuser',
                pk=user_id,
            )
        )

        if len(matches) == 1:
            return matches[0]

        if matches:
            LOGGER.error(
                'OIDCTokenAuthBackend: query for user id %s: %s results',
                user_id,
                len(matches),
            )
        else:
            LOGGER.error(
                'OIDCTokenAuthBackend: query for user id %s: no results',
                user_id,
            )
        return None