Coverage for backend/django/core/managers.py: 94%

122 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1from dataclasses import dataclass 

2from django.db import models 

3from authentication.user.models import User 

4from core.auxiliary.enums.FlowsheetTemplateType import FlowsheetTemplateType 

5from core.auxiliary.models.Flowsheet import Flowsheet 

6from core.validation import get_current_flowsheet, cache_access_result 

7from authentication.user.AccessTable import AccessTable 

8from rest_framework.exceptions import PermissionDenied 

9from typing import TypeVar 

10t = TypeVar('t', bound=models.Model, covariant=True) 

11 

12 

@dataclass(frozen=True)
class FlowsheetAccessState:
    """
    Immutable pair of read/write access flags for a user on a flowsheet.

    Used as the normalized access result shared between the manager and the
    view-layer helpers.
    """
    # Whether the user may read the flowsheet.
    has_read_access: bool
    # Whether the user may modify the flowsheet.
    has_write_access: bool

20 

21 

def get_flowsheet_access(user: User, flowsheet_id: int) -> FlowsheetAccessState:
    """
    Compute read/write access for a user on a flowsheet and cache it in request context.

    Access is granted through one of two routes:

    * a direct ``AccessTable`` row for (user, flowsheet): always grants read
      access, and grants write access unless the row is marked ``read_only``;
    * a staff user *without* a direct row gets read and write access when the
      flowsheet is a public template.

    The result is cached on the request context only when that context is for
    the same flowsheet and is not bound to a different user, so a check made on
    behalf of another user cannot overwrite the requesting user's cached flags.

    :param user: User object
    :param flowsheet_id: Flowsheet ID to check access for

    :return: read/write access state
    """
    share = AccessTable.objects.filter(
        user_id=user.id,
        flowsheet_id=flowsheet_id,
    ).values("read_only").first()
    user_has_direct_access = share is not None
    is_read_only_share = bool(share["read_only"]) if share is not None else False

    user_can_edit_template = False

    # Check if this is a public flowsheet template that the user can edit
    # (only admins can edit public templates). Skipped when a direct share
    # exists, in which case the share alone decides the access level.
    if user.is_staff and not user_has_direct_access:
        flowsheet: Flowsheet | None = (
            Flowsheet.objects.filter(id=flowsheet_id)
            .only("flowsheet_template_type")
            .first()
        )
        user_can_edit_template = (
            flowsheet is not None
            and flowsheet.flowsheet_template_type == FlowsheetTemplateType.PublicTemplate
        )

    has_read_access = user_has_direct_access or user_can_edit_template
    has_write_access = user_can_edit_template or (
        user_has_direct_access and not is_read_only_share
    )

    ctx = get_current_flowsheet() or {}

    # Ids are compared as strings because the context value and the argument
    # may differ in type (e.g. "12" vs 12).
    same_flowsheet = str(ctx.get("flowsheet")) == str(flowsheet_id)

    # The cached flags are later read back for the context's user, so never
    # store flags computed for somebody else. A context without an identifiable
    # user id keeps the previous behaviour (cache on flowsheet match alone).
    ctx_user_id = getattr(ctx.get("user"), "id", None)
    same_user = ctx_user_id is None or ctx_user_id == user.id

    if same_flowsheet and same_user:
        # Reuse the computed values for the rest of the request so queryset-level
        # guards do not need to repeat the same AccessTable lookup.
        cache_access_result(has_read_access=has_read_access, has_write_access=has_write_access)

    return FlowsheetAccessState(
        has_read_access=has_read_access,
        has_write_access=has_write_access,
    )

61 

62 

def has_flowsheet_read_access(user: User, flowsheet_id: int) -> bool:
    """
    Return the read flag of get_flowsheet_access() for this user and flowsheet.
    """
    access_state = get_flowsheet_access(user=user, flowsheet_id=flowsheet_id)
    return access_state.has_read_access

68 

69 

def has_flowsheet_write_access(user: User, flowsheet_id: int) -> bool:
    """
    Return the write flag of get_flowsheet_access() for this user and flowsheet.
    """
    access_state = get_flowsheet_access(user=user, flowsheet_id=flowsheet_id)
    return access_state.has_write_access

75 

76 

def _get_or_compute_access_state(ctx: dict) -> FlowsheetAccessState | None:
    """
    Resolve the access state for the request context ``ctx``.

    Returns None when the context has no flowsheet or no user. Otherwise the
    flags already stored in the context are returned; if either flag is
    missing, the state is computed via get_flowsheet_access(), i.e. lazily when
    the manager/queryset is the first layer to ask for it.
    """
    active_flowsheet = ctx.get("flowsheet")
    active_user = ctx.get("user")
    if active_flowsheet is None or active_user is None:
        return None

    cached_read = ctx.get("has_read_access")
    cached_write = ctx.get("has_write_access")

    # Both flags must be present for the cached result to be usable.
    if cached_read is not None and cached_write is not None:
        return FlowsheetAccessState(
            has_read_access=cached_read,
            has_write_access=cached_write,
        )

    return get_flowsheet_access(user=active_user, flowsheet_id=active_flowsheet)

98 

99 

class AccessControlQuerySet(models.QuerySet):
    """
    QuerySet-level write guard for mutating bulk operations that bypass serializer logic.

    ``update``, ``delete`` and ``bulk_update`` each check write access against
    the current request context before delegating to Django's implementation.
    """

    def _require_write_access(self):
        """
        Raise PermissionDenied unless the current context allows writes.

        The check is skipped when the context sets ``bypass_write_checks`` or
        when it has no flowsheet/user.

        :raises PermissionDenied: when the context user lacks write access
        """
        ctx = get_current_flowsheet() or {}
        if ctx.get("bypass_write_checks"):
            # Used by internal flows such as copy, where the caller has already
            # validated the source flowsheet and is creating rows for a new owner.
            return

        # This is only the case when tests use objects directly without going through a view.
        if ctx.get("flowsheet") is None or ctx.get("user") is None:
            return

        access_state = _get_or_compute_access_state(ctx)
        if access_state is None or not access_state.has_write_access:
            raise PermissionDenied("User does not have write access to this flowsheet.")

    def update(self, **kwargs):
        """Check write access, then perform the bulk UPDATE."""
        self._require_write_access()
        return super().update(**kwargs)

    # Overriding a method drops the marker Django sets on the base
    # implementation; restore it so templates never invoke it implicitly.
    update.alters_data = True

    def delete(self):
        """Check write access, then delete the rows in this queryset."""
        self._require_write_access()
        return super().delete()

    delete.alters_data = True
    # NOTE(review): Django's QuerySet.delete also sets ``queryset_only = True``,
    # which keeps Manager.from_queryset() from exposing ``Model.objects.delete()``.
    # This override does not set it, so the manager built from this class gets a
    # ``delete`` method. Left unchanged in case callers rely on it - confirm and
    # consider adding ``delete.queryset_only = True``.

    def bulk_update(self, objs, fields, batch_size=None):
        """Check write access, then bulk-update ``fields`` on ``objs``."""
        self._require_write_access()
        return super().bulk_update(objs, fields, batch_size=batch_size)

    bulk_update.alters_data = True

133 

134 

class AccessControlManager(models.Manager.from_queryset(AccessControlQuerySet)):
    """
    Custom object manager to enforce access control based on the current flowsheet and user context.
    This manager overrides the default queryset to filter objects based on the flowsheet.

    By default, object modification methods (update, delete, etc.) will be scoped only to the
    active flowsheet that the user has access to. Creation methods check explicitly if the user has
    access to the flowsheet before allowing creation.
    """

    def __init__(self):
        super().__init__()
        # Name of the model field pointing at the flowsheet; "<name>_id" is used
        # for filtering and for pinning newly created rows.
        self.flowsheet_related_name = "flowsheet"

    def _require_write_access(self, ctx: dict):
        """
        Raise PermissionDenied when ``ctx`` resolves to a state without write access.

        Returns silently when the context sets ``bypass_write_checks`` or when
        no access state can be resolved (context without flowsheet or user).

        :raises PermissionDenied: when the context user lacks write access
        """
        if ctx.get("bypass_write_checks"):
            return

        access_state = _get_or_compute_access_state(ctx)
        if access_state is None:
            return
        if not access_state.has_write_access:
            raise PermissionDenied("User does not have write access to this flowsheet.")

    def create(self, **kwargs):
        """
        Create one object, pinned to the flowsheet of the current context.

        :raises PermissionDenied: when the context user lacks write access
        """
        ctx = get_current_flowsheet() or {}

        flowsheet = ctx.get("flowsheet")
        user = ctx.get("user")

        # This is only the case when tests use the objects directly without going through a view
        if flowsheet is None or user is None:
            return super().create(**kwargs)

        self._require_write_access(ctx)

        # Always pin the created object to the flowsheet from request context so
        # callers cannot create rows under some other flowsheet id in the payload.
        kwargs[f"{self.flowsheet_related_name}_id"] = flowsheet

        return super().create(**kwargs)

    def bulk_create(self, objs, **kwargs):
        """
        Bulk-create ``objs``, each pinned to the flowsheet of the current context.

        :raises PermissionDenied: when the context user lacks write access
        """
        ctx = get_current_flowsheet() or {}

        flowsheet = ctx.get("flowsheet")
        user = ctx.get("user")

        # This is only the case when tests use the objects directly without going through a view
        if flowsheet is None or user is None:
            return super().bulk_create(objs, **kwargs)

        # Check before touching the objects so a denied call leaves them unmodified.
        self._require_write_access(ctx)

        # ``objs`` may be a one-shot iterable (e.g. a generator). Materialize it
        # once; otherwise the loop below would exhaust it and the delegated
        # bulk_create would receive nothing to insert.
        objs = list(objs)

        # Force each bulk-created object onto the active flowsheet for the same
        # reason as create(): request context, not payload, is the source of truth.
        flowsheet_attr = f"{self.flowsheet_related_name}_id"
        for obj in objs:
            setattr(obj, flowsheet_attr, flowsheet)

        return super().bulk_create(objs, **kwargs)

    def get_queryset(self):
        """
        Override the get_queryset method to filter objects based on the current flowsheet and user access.

        Returns the unfiltered queryset when the context has no flowsheet or
        user, the rows of the active flowsheet when the user has read access,
        and an empty queryset otherwise.

        :raises PermissionDenied: when the context has ``write_intent`` set and
            the user has read access but no write access
        """
        qs = super().get_queryset()
        ctx = get_current_flowsheet() or {}

        flowsheet = ctx.get("flowsheet")
        user = ctx.get("user")
        write_intent = bool(ctx.get("write_intent"))

        # This is only the case when tests use the objects directly without going through a view
        if flowsheet is None or user is None:
            return qs

        access_state = _get_or_compute_access_state(ctx)
        if access_state is None:
            return qs.none()

        # For users with read access but no write access (read-only share),
        # fail mutating viewset actions with an explicit 403.
        if write_intent and access_state.has_read_access and not access_state.has_write_access:
            raise PermissionDenied("This flowsheet is shared with read-only access.")

        if not access_state.has_read_access:
            return qs.none()

        return qs.filter(**{f"{self.flowsheet_related_name}_id": flowsheet})

224 

class SoftDeleteManager(AccessControlManager):
    """
    Access-controlled manager that hides rows flagged ``is_deleted`` by default.
    """

    def __init__(self):
        super().__init__()

    def get_queryset(self):
        """Return the parent manager's queryset without soft-deleted rows."""
        base_queryset = super().get_queryset()
        return base_queryset.filter(is_deleted=False)

    def include_deleted(self):
        """Return the parent manager's queryset, soft-deleted rows included."""
        return super().get_queryset()

235 

236 

237 

238def include_soft_deleted[t](objects: models.Manager[t]) -> models.QuerySet[t]: 

239 """ 

240 In some operations, including copying a flowsheet, 

241 we need to include the soft deleted objects 

242 Example usage: 

243 include_soft_deleted(SimulationObject.objects).filter(flowsheet_id=1) 

244 """ 

245 if objects is not None and isinstance(objects, SoftDeleteManager): 

246 return objects.include_deleted().all() 

247 else: 

248 return objects.all() 

249 

250