Coverage for backend/django/core/managers.py: 94%
122 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from dataclasses import dataclass
2from django.db import models
3from authentication.user.models import User
4from core.auxiliary.enums.FlowsheetTemplateType import FlowsheetTemplateType
5from core.auxiliary.models.Flowsheet import Flowsheet
6from core.validation import get_current_flowsheet, cache_access_result
7from authentication.user.AccessTable import AccessTable
8from rest_framework.exceptions import PermissionDenied
9from typing import TypeVar
10t = TypeVar('t', bound=models.Model, covariant=True)
13@dataclass(frozen=True)
14class FlowsheetAccessState:
15 """
16 Normalized access result shared between the manager and view-layer helpers.
17 """
18 has_read_access: bool
19 has_write_access: bool
22def get_flowsheet_access(user: User, flowsheet_id: int) -> FlowsheetAccessState:
23 """
24 Compute read/write access for a user on a flowsheet and cache it in request context.
26 :param user: User object
27 :param flowsheet_id: Flowsheet ID to check access for
29 :return: read/write access state
30 """
31 direct_access = AccessTable.objects.filter(
32 user_id=user.id,
33 flowsheet_id=flowsheet_id
34 ).values("read_only").first()
35 user_has_direct_access = direct_access is not None
36 is_read_only_share = bool(direct_access["read_only"]) if direct_access else False
38 user_can_edit_template = False
40 # Check if this is a public flowsheet template that the user can edit (only admins can edit public templates)
41 if user.is_staff and not user_has_direct_access:
42 flowsheet: Flowsheet | None = Flowsheet.objects.filter(id=flowsheet_id).only("flowsheet_template_type").first()
43 user_can_edit_template = (
44 flowsheet is not None
45 and flowsheet.flowsheet_template_type == FlowsheetTemplateType.PublicTemplate
46 )
48 has_read_access = user_has_direct_access or user_can_edit_template
49 has_write_access = user_can_edit_template or (user_has_direct_access and not is_read_only_share)
51 ctx = get_current_flowsheet() or {}
52 if str(ctx.get("flowsheet")) == str(flowsheet_id):
53 # Reuse the computed values for the rest of the request so queryset-level
54 # guards do not need to repeat the same AccessTable lookup.
55 cache_access_result(has_read_access=has_read_access, has_write_access=has_write_access)
57 return FlowsheetAccessState(
58 has_read_access=has_read_access,
59 has_write_access=has_write_access,
60 )
63def has_flowsheet_read_access(user: User, flowsheet_id: int) -> bool:
64 """
65 Convenience helper for read access checks.
66 """
67 return get_flowsheet_access(user=user, flowsheet_id=flowsheet_id).has_read_access
70def has_flowsheet_write_access(user: User, flowsheet_id: int) -> bool:
71 """
72 Convenience helper for write access checks.
73 """
74 return get_flowsheet_access(user=user, flowsheet_id=flowsheet_id).has_write_access
77def _get_or_compute_access_state(ctx: dict) -> FlowsheetAccessState | None:
78 """
79 Read access state from the active request context, computing it lazily when
80 the manager/queryset is the first layer to ask for it.
81 """
82 flowsheet = ctx.get("flowsheet")
83 user = ctx.get("user")
85 if flowsheet is None or user is None:
86 return None
88 has_read_access = ctx.get("has_read_access")
89 has_write_access = ctx.get("has_write_access")
91 if has_read_access is None or has_write_access is None:
92 return get_flowsheet_access(user=user, flowsheet_id=flowsheet)
94 return FlowsheetAccessState(
95 has_read_access=has_read_access,
96 has_write_access=has_write_access,
97 )
100class AccessControlQuerySet(models.QuerySet):
101 """
102 QuerySet-level write guard for mutating bulk operations that bypass serializer logic.
103 """
104 def _require_write_access(self):
105 ctx = get_current_flowsheet() or {}
106 if ctx.get("bypass_write_checks"):
107 # Used by internal flows such as copy, where the caller has already
108 # validated the source flowsheet and is creating rows for a new owner.
109 return
111 access_state = _get_or_compute_access_state(ctx)
112 flowsheet = ctx.get("flowsheet")
113 user = ctx.get("user")
115 # This is only the case when tests use objects directly without going through a view.
116 if flowsheet is None or user is None:
117 return
119 if access_state is None or not access_state.has_write_access: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true
120 raise PermissionDenied("User does not have write access to this flowsheet.")
122 def update(self, **kwargs):
123 self._require_write_access()
124 return super().update(**kwargs)
126 def delete(self):
127 self._require_write_access()
128 return super().delete()
130 def bulk_update(self, objs, fields, batch_size=None):
131 self._require_write_access()
132 return super().bulk_update(objs, fields, batch_size=batch_size)
135class AccessControlManager(models.Manager.from_queryset(AccessControlQuerySet)):
136 """
137 Custom object manager to enforce access control based on the current flowsheet and user context.
138 This manager overrides the default queryset to filter objects based on the flowsheet.
140 By default, object modification methods (update, delete, etc.) will be scoped only to the
141 active flowsheet that the user has access to. Creation methods check explicitly if the user has
142 access to the flowsheet before allowing creation.
143 """
145 def __init__(self):
146 super().__init__()
147 self.flowsheet_related_name = "flowsheet"
149 def _require_write_access(self, ctx: dict):
150 if ctx.get("bypass_write_checks"): 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true
151 return
153 access_state = _get_or_compute_access_state(ctx)
154 if access_state is None: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true
155 return
156 if not access_state.has_write_access: 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true
157 raise PermissionDenied("User does not have write access to this flowsheet.")
159 def create(self, **kwargs):
160 ctx = get_current_flowsheet() or {}
162 flowsheet = ctx.get("flowsheet")
163 user = ctx.get("user")
165 # This is only the case when tests uses the objects directly without going through a view
166 if flowsheet == None or user == None:
167 return super().create(**kwargs)
169 # Always pin the created object to the flowsheet from request context so
170 # callers cannot create rows under some other flowsheet id in the payload.
171 kwargs[f"{self.flowsheet_related_name}_id"] = flowsheet
172 self._require_write_access(ctx)
174 return super().create(**kwargs)
176 def bulk_create(self, objs, **kwargs):
177 ctx = get_current_flowsheet() or {}
179 flowsheet = ctx.get("flowsheet")
180 user = ctx.get("user")
182 # This is only the case when tests uses the objects directly without going through a view
183 if flowsheet == None or user == None:
184 return super().bulk_create(objs, **kwargs)
186 # Force each bulk-created object onto the active flowsheet for the same
187 # reason as create(): request context, not payload, is the source of truth.
188 for obj in objs:
189 setattr(obj, f"{self.flowsheet_related_name}_id", flowsheet)
191 self._require_write_access(ctx)
192 return super().bulk_create(objs, **kwargs)
194 def get_queryset(self):
195 """
196 Override the get_queryset method to filter objects based on the current flowsheet and user access.
197 """
198 qs = super().get_queryset()
199 ctx = get_current_flowsheet() or {}
201 flowsheet = ctx.get("flowsheet")
202 user = ctx.get("user")
203 write_intent = bool(ctx.get("write_intent"))
205 # This is only the case when tests uses the objects directly without going through a view
206 if flowsheet == None or user == None:
207 return qs
209 access_state = _get_or_compute_access_state(ctx)
210 if access_state is None: 210 ↛ 211line 210 didn't jump to line 211 because the condition on line 210 was never true
211 return qs.none()
213 filter_kwargs = {f"{self.flowsheet_related_name}_id": flowsheet}
215 # For users with read access but no write access (read-only share),
216 # fail mutating viewset actions with an explicit 403.
217 if write_intent and access_state.has_read_access and not access_state.has_write_access:
218 raise PermissionDenied("This flowsheet is shared with read-only access.")
220 if access_state.has_read_access:
221 return qs.filter(**filter_kwargs)
222 else:
223 return qs.none()
225class SoftDeleteManager(AccessControlManager):
226 def __init__(self):
227 super().__init__()
228 def get_queryset(self):
229 # filter out deleted objects
230 return super().get_queryset().filter(is_deleted=False)
232 def include_deleted(self):
233 # include deleted objects
234 return super().get_queryset()
238def include_soft_deleted[t](objects: models.Manager[t]) -> models.QuerySet[t]:
239 """
240 In some operations, including copying a flowsheet,
241 we need to include the soft deleted objects
242 Example usage:
243 include_soft_deleted(SimulationObject.objects).filter(flowsheet_id=1)
244 """
245 if objects is not None and isinstance(objects, SoftDeleteManager):
246 return objects.include_deleted().all()
247 else:
248 return objects.all()