from ... import models from django.db.utils import OperationalError from django.core.exceptions import ObjectDoesNotExist from django.db import models as db_models from oic.oic import Client from oic.oic.message import RegistrationResponse from oic.utils.authn.client import CLIENT_AUTHN_METHOD from urllib.request import Request, urlopen import json import logging logger = logging.getLogger(__name__) oidc_client = {} class OIDCConfig(db_models.Model): client_id = db_models.CharField(max_length=200) client_secret = db_models.CharField(max_length=200) redirect_uri = db_models.CharField(max_length=200) issuer_uri = db_models.CharField(max_length=200) enabled = db_models.BooleanField(default=False) name = db_models.CharField(max_length=200) @property def registration_response(self): info = {"client_id": self.client_id, "client_secret": self.client_secret} return RegistrationResponse(**info) @property def oidc_client(self): global oidc_client if self.id not in oidc_client: new_oidc_client = Client(client_authn_method=CLIENT_AUTHN_METHOD) new_oidc_client.provider_config(self.issuer_uri) new_oidc_client.store_registration_info(self.registration_response) oidc_client[self.id] = new_oidc_client return oidc_client[self.id] @property def provider_info(self): return self.oidc_client.provider_info @property def name(self): return self.issuer_uri def get_auth_request(self, client, state): args = { 'client_id': self.client_id, 'response_type': 'code', 'scope': ['openid', 'profile', 'email'], 'redirect_uri': self.redirect_uri, 'state': state, } auth_req = client.construct_AuthorizationRequest( request_args=args) return auth_req.request(client.authorization_endpoint) def __str__(self): return self.name def default_idp(): return OIDCConfig.objects.filter(enabled=True).first() class OIDCTokenAuthBackend(object): def get_user_info(self, request, access_token, token_type='Bearer'): try: idp_id = request.session['idp_id'] except OperationalError: logger.error('OperationalError: Unable to get from session') raise oidc_config = OIDCConfig.objects.get(id=idp_id) q = Request(oidc_config.oidc_client.provider_info['userinfo_endpoint']) auth = (token_type + ' ' + access_token) q.add_header('Authorization', auth) userinfo_bytes = urlopen(q).read() return json.loads(userinfo_bytes.decode('UTF-8')) def authenticate(self, request, token=None): if token is None: return None user_info = self.get_user_info(request, token) try: user = models.User.objects.get( sub=user_info['sub']) return user except ObjectDoesNotExist: # if we do not know the user yet, we create him user = models.construct_user(user_info) user.save() return user return None def get_user(self, user_id): try: return models.User.objects.get(pk=user_id) except models.User.DoesNotExist: return None