Coverage for backend/authentication/remote_user_backend.py: 94%
51 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1import logging
2from typing import Any
4import jwt
5from django.conf import settings
6from django.contrib.auth.backends import RemoteUserBackend
7from authentication.user.models import User
def _set_email_from_header(request, user: User) -> bool:
    """Copy the email address from the remote-user header onto ``user``.

    The header name comes from ``settings.REMOTE_USER_EMAIL_HEADER`` and is
    looked up in ``request.META``. The user is mutated in place but not saved
    here; the return value tells the caller whether a save is needed.

    Args:
        request: Incoming request whose ``META`` carries the remote-user headers.
        user: User instance to update.

    Returns:
        True if ``user.email`` was changed and should be persisted. False if
        the header was missing or empty, or already matched the stored email.
    """
    header_email = request.META.get(settings.REMOTE_USER_EMAIL_HEADER)

    if header_email:
        # Leave the user untouched when the stored address is already current.
        if user.email == header_email:
            return False

        user.email = header_email
        return True

    logging.warning(f"Email header not found in request for user {user.get_username()}. User email will not be updated.")
    return False
def _set_is_admin_from_header(request, user: User) -> bool:
    """Update the user's staff flag based on remote group membership headers.

    The header named by ``settings.REMOTE_USER_GROUPS_HEADER`` is read from
    ``request.META`` and treated as a comma-separated list of group names.
    The user is staff exactly when ``settings.PLATFORM_ADMINISTRATORS_GROUP``
    appears in that list. The user is mutated in place but not saved here.

    Args:
        request: Incoming request containing group membership information.
        user: User instance being updated.

    Returns:
        True if the staff flag changed, otherwise False (including when the
        groups header is missing or empty).
    """
    raw_groups = request.META.get(settings.REMOTE_USER_GROUPS_HEADER)

    # NOTE(review): a missing or empty header leaves an existing staff flag
    # untouched, so a user who no longer has any groups is not demoted here.
    # Confirm this is the intended behaviour.
    if not raw_groups:
        return False

    # Comma-separated header lists may carry optional whitespace around each
    # element ("a, b"); strip it so the admin group still matches.
    groups = {group.strip() for group in raw_groups.split(",")}
    is_platform_admin = settings.PLATFORM_ADMINISTRATORS_GROUP in groups

    if user.is_staff == is_platform_admin:
        return False

    user.is_staff = is_platform_admin
    return True
def _set_name_from_header(request, user: User) -> bool:
    """Populate first and last names from the remote access token claims.

    The token is read from the header named by
    ``settings.REMOTE_USER_ACCESS_TOKEN_HEADER`` and decoded as a JWT without
    signature verification. The ``given_name`` and ``family_name`` claims are
    copied onto the user when present and different from the stored values.
    The user is mutated in place but not saved here.

    Args:
        request: Incoming request containing the access token header.
        user: User instance being updated.

    Returns:
        True if either the first or last name fields changed, otherwise False
        (including when the token is missing, undecodable, or has no claims).
    """
    raw_token = request.META.get(settings.REMOTE_USER_ACCESS_TOKEN_HEADER)

    if not raw_token:
        return False

    # The access token is passed by OAuth2 Proxy and cannot be set directly by the user.
    # This is safe from external manipulation. This is only vulnerable to internal cluster traffic.
    # We should do public key verification in future.
    try:
        claims: dict[str, Any] = jwt.decode(raw_token, options={"verify_signature": False})
    except jwt.InvalidTokenError as exc:
        # Name sync is best-effort: a malformed token must not abort the
        # whole authentication, so log it and skip the update. The token
        # itself is deliberately not logged.
        logging.warning(
            "Access token for user %s could not be decoded (%s). User name will not be updated.",
            user.get_username(),
            exc,
        )
        return False

    if not claims:
        return False

    first_name = claims.get("given_name")
    last_name = claims.get("family_name")

    name_has_changed = False

    # Absent or empty claims never overwrite an existing name.
    if first_name and user.first_name != first_name:
        user.first_name = first_name
        name_has_changed = True

    if last_name and user.last_name != last_name:
        user.last_name = last_name
        name_has_changed = True

    return name_has_changed
class RemoteUserBackendWithEmail(RemoteUserBackend):
    """
    A custom RemoteUserBackend that synchronises user fields from the request.

    The email address, the staff flag and the first/last name are each
    refreshed by a module-level helper that reads the incoming request.
    """

    def configure_user(self, request, user: User, created=True) -> User:
        """Apply remote-header derived fields to the authenticated user.

        Args:
            request: Incoming request passed through to each sync helper.
            user: User instance to update in place.
            created: Accepted as part of the method signature; not used here.

        Returns:
            The same user instance, saved only if at least one helper
            reported a change.
        """
        # Build a full list (no short-circuiting) so every helper runs, in
        # this order, before deciding whether a save is required.
        sync_results = [
            sync(request, user)
            for sync in (
                _set_email_from_header,
                _set_is_admin_from_header,
                _set_name_from_header,
            )
        ]

        if any(sync_results):
            user.save()

        return user