Coverage for backend/django/core/validation.py: 85%

147 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-03-26 20:57 +0000

1import contextvars 

2from contextlib import contextmanager 

3from django.apps import apps 

4from functools import wraps 

5from authentication.user.models import User 

6from core.viewset import ModelViewSet, ReadOnlyModelViewSet 

7from django.urls.resolvers import URLPattern, URLResolver 

8from rest_framework.exceptions import ValidationError 

9from rest_framework.request import Request 

10from rest_framework.viewsets import ModelViewSet as MDVS 

11from typing import TypedDict 

12 

def sanitize_flowsheet_id(flowsheet_id):
    """
    Ensure the client has provided a valid flowsheet id (positive integer).

    Args:
        flowsheet_id: Raw value supplied by the client; anything ``int()``
            accepts, e.g. the query-string text ``"12"``.

    Returns:
        The id converted to ``int``. Callers that only need the validation
        may ignore it.

    Raises:
        ValidationError: If the value cannot be converted to an integer or
            is smaller than 1.
    """
    try:
        parsed_id = int(flowsheet_id)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "bad input". A bare ``except`` would
        # also swallow KeyboardInterrupt / SystemExit.
        raise ValidationError("Invalid flowsheet id") from None

    # Zero and negative ids are both rejected by this single comparison.
    if parsed_id < 1:
        raise ValidationError("Invalid flowsheet id")
    return parsed_id

25 

26 

def api_view_validate(view_func):
    """
    Decorator for every api_view to enforce access control.

    The wrapper reads the ``flowsheet`` query parameter, validates it with
    ``sanitize_flowsheet_id`` and runs the view inside ``flowsheet_context``,
    so the flowsheet id and ``request.user`` can be read through
    ``get_current_flowsheet()`` while the view executes.

    The returned wrapper carries ``_is_api_view_validated = True``, the
    marker that ``validate_urlpatterns`` looks for.

    Raises (from the wrapper):
        ValidationError: If ``flowsheet`` is missing or is not a positive
            integer.
    """

    @wraps(view_func)
    def _wrapped_view(request: Request, *args, **kwargs):
        raw_flowsheet_id = request.GET.get("flowsheet")
        user = request.user
        sanitize_flowsheet_id(raw_flowsheet_id)

        # The validation above guarantees int() succeeds. Store the
        # normalised integer rather than the raw query text so the context
        # matches FlowsheetContext.flowsheet (int) and inputs such as "05"
        # or " 5 " do not reach downstream code in non-canonical form.
        flowsheet_id = int(raw_flowsheet_id)

        with flowsheet_context(flowsheet_id, user):
            return view_func(request, *args, **kwargs)

    _wrapped_view._is_api_view_validated = True
    return _wrapped_view

42 

def api_view_ignore_access_control(view_func):
    """
    Mark an api_view as exempt from access control.

    Not for general use: attach only to internal endpoints/handlers (for
    example ones that Dapr invokes). The view is wrapped in a plain
    pass-through function and that wrapper is tagged with
    ``ignore_access_control = True``.
    """

    def _passthrough(request: Request, *args, **kwargs):
        # No validation and no flowsheet context: call straight through.
        return view_func(request, *args, **kwargs)

    exempt_view = wraps(view_func)(_passthrough)
    exempt_view.ignore_access_control = True
    return exempt_view

56 

57 

58 

59################## Context manager ################## 

class FlowsheetContext(TypedDict):
    """Shape of the per-request value stored in ``flowsheet_ctx``."""

    # Id of the flowsheet the current request operates on.
    flowsheet: int
    # User that issued the request.
    user: User
    # Access-check result; starts as None and is filled in by cache_result().
    has_access: bool | None

64 

# Context variable holding the FlowsheetContext of the code currently running;
# it defaults to None when nothing has been set.
flowsheet_ctx: contextvars.ContextVar[FlowsheetContext | None] = contextvars.ContextVar(
    "flowsheet", default=None
)

66 

@contextmanager
def flowsheet_context(flowsheet: int, user: User):
    """
    Expose ``flowsheet`` and ``user`` through ``flowsheet_ctx`` for the
    duration of the ``with`` block.

    The stored context starts with ``has_access`` set to ``None``. When the
    block exits, normally or through an exception, ``flowsheet_ctx`` is reset
    to the value it had before; if that reset is rejected with ``ValueError``
    the variable is set to ``None`` instead.
    """
    token = flowsheet_ctx.set(
        FlowsheetContext(flowsheet=flowsheet, user=user, has_access=None)
    )

    try:
        # Hand control to the body of the ``with`` block.
        yield
    finally:
        try:
            flowsheet_ctx.reset(token)
        except ValueError:
            # The token could not be used to reset the variable. Clear it
            # explicitly so this request's context does not leak into the
            # next request.
            flowsheet_ctx.set(None)

90 

def get_current_flowsheet():
    """
    Return whatever is currently stored in ``flowsheet_ctx``.

    Inside a ``flowsheet_context`` block this is a dict of the form
        {
            "flowsheet": flowsheet_id,
            "user": User,
            "has_access": has_access
        }
    and it is ``None`` (the variable's default) when nothing has been set.
    """
    current = flowsheet_ctx.get()
    return current

101 

102 

def cache_result(has_access: bool = False):
    """
    Cache the result of an access check on the active flowsheet context.

    Args:
        has_access: Value to store under the context's ``has_access`` key.

    Does nothing when no flowsheet context is active, i.e. when
    ``flowsheet_ctx`` holds ``None``; the result is simply not cached.
    """
    data = flowsheet_ctx.get()
    if data is None:
        # Without this guard the item assignment below raises
        # "TypeError: 'NoneType' object does not support item assignment".
        return

    # The dict returned by get() is the very object stored in the context
    # variable, so mutating it in place is enough; calling set() again with
    # the same object would change nothing.
    data["has_access"] = has_access

110 

111################## Router and urlpattern validation ################## 

112 

113 

def validate_router(router):
    """
    Validate every viewset registered on ``router``.

    For each ``(prefix, viewset, basename)`` entry in ``router.registry``
    whose prefix is not exempt, the viewset must:

      * subclass ModelViewSet or ReadOnlyModelViewSet from core.viewset,
      * override ``get_queryset`` (it must not be the implementation that
        rest_framework's ModelViewSet provides), and
      * not define a ``queryset`` attribute.

    Raises:
        Exception: For the first registered viewset that breaks a rule.
    """
    exempt_prefixes = {"flowsheets", "flowsheetTemplates", "compounds"}
    required_bases = (ModelViewSet, ReadOnlyModelViewSet)

    for prefix, viewset, _basename in router.registry:
        if prefix in exempt_prefixes:
            continue

        if not issubclass(viewset, required_bases):
            raise Exception(f"ModelViewSet (from core.viewset) is not being inherited at {prefix}!")

        if getattr(viewset, "get_queryset", None) == MDVS.get_queryset:
            raise Exception(f"get_queryset is not being overridden at {prefix}! Please override get_queryset method to provide the queryset")

        # Identity test: ``!= None`` would defer to a custom __eq__/__ne__.
        if getattr(viewset, "queryset", None) is not None:
            # Adjacent literals instead of a backslash continuation, which
            # embedded the next line's indentation in the message.
            raise Exception(
                f"Please remove queryset from {prefix} viewset and create a get_queryset method instead. "
                "Avoid creating the queryset attribute to enforce access control."
            )

133 

134 

def validate_urlpatterns(urlpatterns):
    """
    Check that every view reachable from ``urlpatterns`` was decorated with
    api_view_validate.

    A view counts as validated when it has the ``_is_api_view_validated``
    attribute. An ``Exception`` naming the path is raised for the first view
    that lacks it.
    """
    for route, callback in extract_views_from_urlpatterns(urlpatterns):
        if hasattr(callback, '_is_api_view_validated'):
            continue
        raise Exception(f"api_view_validate decorator (from core.validation) is not being used at {route}!!!")

143 

144 

def extract_views_from_urlpatterns(urlpatterns, base_path=''):
    """
    Recursively collect ``(path, view)`` pairs from ``urlpatterns``.

    ``base_path`` is prepended to each pattern's string form. A URLPattern
    contributes its callback; a URLResolver (nested patterns, e.g. routers)
    is expanded using its own path as the new base. Entries of any other
    type are skipped.
    """
    collected = []
    for entry in urlpatterns:
        if not isinstance(entry, (URLPattern, URLResolver)):
            continue

        full_path = base_path + str(entry.pattern)
        if isinstance(entry, URLPattern):
            collected.append((full_path, entry.callback))
        else:
            # Resolver: descend into the nested patterns.
            collected += extract_views_from_urlpatterns(entry.url_patterns, full_path)
    return collected

159 

160 

def validate_models():
    """
    Check that all models have a flowsheet attribute and use
    AccessControlManager or SoftDeleteManager.

    Models from modules whose name starts with ``silk`` and models whose
    class name is listed in ``exclude_models`` are skipped. Every other model
    returned by ``apps.get_models()`` must:

      * not have a field or attribute named ``flowsheetOwner``,
      * expose an AccessControlManager or SoftDeleteManager as ``objects``,
      * have a ``flowsheet`` attribute.

    Raises:
        ValueError: If a model uses ``flowsheetOwner``.
        AssertionError: If the manager or the ``flowsheet`` attribute is
            missing. Raised explicitly rather than through ``assert`` so the
            checks still run under ``python -O``.
    """
    from core.managers import AccessControlManager, SoftDeleteManager

    exclude_models = frozenset({
        'User',
        'Permission',
        'Group',
        'ContentType',
        'Flowsheet',
        'AccessTable',
        'Session',
        'TaskMeta',
    })

    for model in apps.get_models():
        model_name = model.__name__

        # Skip silk models
        if model.__module__.startswith('silk'):
            continue
        if model_name in exclude_models:
            continue

        if any(field.name == 'flowsheetOwner' for field in model._meta.get_fields()):
            raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead")

        # ``objects`` is read only for models that are actually validated, so
        # a skipped model without that attribute cannot break the check.
        if not isinstance(model.objects, (AccessControlManager, SoftDeleteManager)):
            raise AssertionError(
                f"Model {model_name} doesn't have AccessControlManager or SoftDeleteManager (from core.managers). Either is required to enforce access control."
            )

        # Also catches a non-field attribute with the discouraged name.
        if hasattr(model, 'flowsheetOwner'):
            raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead")

        if not hasattr(model, 'flowsheet'):
            raise AssertionError(
                f"Models should have flowsheet or flowsheet attribute for handling access control, model: {model_name} doesn't"
            )

205 

def _source_calls_validate_router(source):
    """Return True if ``source`` contains a call to something named ``validate_router``."""
    import ast

    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue

        func = node.func
        # Accept both ``validate_router(...)`` and ``module.validate_router(...)``.
        if isinstance(func, ast.Name):
            name = func.id
        elif isinstance(func, ast.Attribute):
            name = func.attr
        else:
            continue

        if name == "validate_router":
            return True
    return False


def validate_routers():
    """
    Ensure every ``routers.py`` below the current working directory calls
    ``validate_router(...)``.

    The tree is walked from ``./``. Directories named in ``IGNORED_DIRS`` are
    not descended into and files listed in ``IGNORED_FILES`` are skipped.
    Each remaining ``routers.py`` is read and parsed with ``ast`` (it is not
    imported) and searched for a call whose function or attribute name is
    ``validate_router``.

    Raises:
        Exception: Listing every ``routers.py`` that has no such call.
    """
    import os

    IGNORED_DIRS = {"site-packages"}
    IGNORED_FILES = {
        os.path.normpath("./authentication/routers.py")
    }

    def is_ignored(path: str) -> bool:
        norm = os.path.normpath(path)

        # Ignore specific file paths
        if norm in IGNORED_FILES:
            return True

        # Ignore anything located inside an ignored directory
        return any(part in IGNORED_DIRS for part in norm.split(os.sep))

    issues = []

    for root, dirs, files in os.walk("./"):
        # Prune in place so os.walk never descends into ignored directories;
        # skipping the root alone would still traverse everything below it.
        dirs[:] = [name for name in dirs if name not in IGNORED_DIRS]

        if "routers.py" not in files:
            continue

        path = os.path.join(root, "routers.py")
        if is_ignored(path):
            continue

        with open(path, "r", encoding="utf-8") as f:
            source = f.read()

        if not _source_calls_validate_router(source):
            issues.append(path)

    if issues:
        issue_list = "\n".join(issues)
        raise Exception(f"The following routers.py files are missing validate_router(...) calls:\n{issue_list}")

272 raise Exception(f"The following routers.py files are missing validate_router(...) calls:\n{issue_list}")