Coverage for backend/django/core/validation.py: 85%

142 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-12-18 04:00 +0000

1import contextvars 

2from contextlib import contextmanager 

3from django.apps import apps 

4from functools import wraps 

5from authentication.user.models import User 

6from core.viewset import ModelViewSet, ReadOnlyModelViewSet 

7from django.urls.resolvers import URLPattern, URLResolver 

8from rest_framework.exceptions import ValidationError 

9from rest_framework.request import Request 

10from rest_framework.viewsets import ModelViewSet as MDVS 

11 

12 

13def sanitize_flowsheet_id(flowsheet_id): 

14 """ 

15 Ensure the client has provided a valid flowsheet id (positive integer) 

16 """ 

17 try: 

18 flowsheet_id = int(flowsheet_id) 

19 if not flowsheet_id: 

20 raise ValidationError("Invalid flowsheet id") 

21 if flowsheet_id < 1: 

22 raise ValidationError("Invalid flowsheet id") 

23 except: 

24 raise ValidationError("Invalid flowsheet id") 

25 

26 

27def api_view_validate(view_func): 

28 """ 

29 Decorator for every api_view to enforce access control 

30 """ 

31 

32 @wraps(view_func) 

33 def _wrapped_view(request: Request, *args, **kwargs): 

34 flowsheet_id = request.GET.get("flowsheet") 

35 user = request.user 

36 sanitize_flowsheet_id(flowsheet_id) 

37 

38 with flowsheet_context(flowsheet_id, user): 

39 return view_func(request, *args, **kwargs) 

40 _wrapped_view._is_api_view_validated = True 

41 return _wrapped_view 

42 

43def api_view_ignore_access_control(view_func): 

44 """ 

45 Decorator for api_view to ignore access control. This is not for 

46 general use. Attach only to internal endpoints/handlers (such as 

47 for Dapr to invoke). 

48 """ 

49 

50 @wraps(view_func) 

51 def _wrapped_view(request: Request, *args, **kwargs): 

52 return view_func(request, *args, **kwargs) 

53 

54 _wrapped_view.ignore_access_control = True 

55 return _wrapped_view 

56 

57################## Context manager ################## 

58flowsheet_ctx = contextvars.ContextVar("flowsheet", default=None) 

59 

60@contextmanager 

61def flowsheet_context(flowsheet: int, user: User): 

62 """ 

63 Sanitize and inject flowsheet and user id as context for each view 

64 """ 

65 

66 data = { 

67 "flowsheet": flowsheet, 

68 "user": user, 

69 "has_access": None 

70 } 

71 token = flowsheet_ctx.set(data) 

72 

73 try: 

74 # Allow the view to execute with the context set 

75 yield 

76 finally: 

77 try: 

78 flowsheet_ctx.reset(token) 

79 except ValueError: 

80 # This is to reset the context if the server throws an error while processing the request 

81 # So that it doesn't leak to the next request 

82 flowsheet_ctx.set(None) 

83 

84def get_current_flowsheet(): 

85 """ 

86 Get the current flowsheet and user id from the context in format 

87 { 

88 "flowsheet": flowsheet_id, 

89 "user": User, 

90 "has_access": has_access 

91 } 

92 """ 

93 return flowsheet_ctx.get() 

94 

95 

96def cache_result(has_access: bool = False): 

97 """ 

98 Cache the result of the flowsheet context 

99 """ 

100 data = flowsheet_ctx.get() 

101 data["has_access"] = has_access 

102 flowsheet_ctx.set(data) 

103 

104################## Router and urlpattern validation ################## 

105 

106 

107def validate_router(router): 

108 """ 

109 Validate that every registered viewset is a subclass of ModelViewSet 

110 """ 

111 exclude_list = ["flowsheets", "flowsheetTemplates", "compounds"] 

112 viewsets = [ModelViewSet, ReadOnlyModelViewSet] 

113 for (prefix, viewset, basename) in router.registry: 

114 if prefix in exclude_list: 

115 continue 

116 

117 if not any(issubclass(viewset, vs) for vs in viewsets): 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 raise Exception(f"ModelViewSet (from core.viewset) is not being inherited at {prefix}!") 

119 

120 if getattr(viewset, "get_queryset", None) == getattr(MDVS, "get_queryset"): 120 ↛ 121line 120 didn't jump to line 121 because the condition on line 120 was never true

121 raise Exception(f"get_queryset is not being overridden at {prefix}! Please override get_queryset method to provide the queryset") 

122 

123 if getattr(viewset, "queryset", None) != None: 123 ↛ 124line 123 didn't jump to line 124 because the condition on line 123 was never true

124 raise Exception(f"Please remove queryset from {prefix} viewset and create a get_queryset method instead. \ 

125 Avoid creating the queryset attribute to enforce access control.") 

126 

127 

128def validate_urlpatterns(urlpatterns): 

129 """ 

130 Validate that every view in urlpatterns is decorated with api_view_validate 

131 """ 

132 all_views = extract_views_from_urlpatterns(urlpatterns) 

133 for path, view in all_views: 

134 if not hasattr(view, '_is_api_view_validated'): 134 ↛ 135line 134 didn't jump to line 135 because the condition on line 134 was never true

135 raise Exception(f"api_view_validate decorator (from core.validation) is not being used at {path}!!!") 

136 

137 

138def extract_views_from_urlpatterns(urlpatterns, base_path=''): 

139 """ 

140 Recursively extracts views from urlpatterns. 

141 """ 

142 views = [] 

143 for pattern in urlpatterns: 

144 if isinstance(pattern, URLPattern): 144 ↛ 147line 144 didn't jump to line 147 because the condition on line 144 was always true

145 path = base_path + str(pattern.pattern) 

146 views.append((path, pattern.callback)) 

147 elif isinstance(pattern, URLResolver): # nested patterns (like routers) 

148 nested_path = base_path + str(pattern.pattern) 

149 views.extend(extract_views_from_urlpatterns( 

150 pattern.url_patterns, nested_path)) 

151 return views 

152 

153 

154# Check that all models have flowsheet attribute and use AccessControlManager or SoftDeleteManager 

155def validate_models(): 

156 from core.managers import AccessControlManager, SoftDeleteManager 

157 exclude_models = [ 

158 'User', 

159 'Permission', 

160 'Group', 

161 'ContentType', 

162 'Flowsheet', 

163 'AccessTable', 

164 'Session', 

165 'TaskMeta' 

166 ] 

167 

168 models = apps.get_models() 

169 

170 

171 for model in models: 

172 model_name = model.__name__ 

173 module_name = model.__module__ 

174 # Skip silk models 

175 if module_name.startswith('silk'): 175 ↛ 176line 175 didn't jump to line 176 because the condition on line 175 was never true

176 continue 

177 objects = model.objects 

178 if model_name in exclude_models: 

179 continue 

180 

181 if 'flowsheetOwner' in [field.name for field in model._meta.get_fields()]: 181 ↛ 182line 181 didn't jump to line 182 because the condition on line 181 was never true

182 raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead") 

183 

184 has_flowsheet = False 

185 

186 assert ( 

187 isinstance(objects, (AccessControlManager, SoftDeleteManager)) 

188 ), f"Model {model_name} doesn't have AccessControlManager or SoftDeleteManager (from core.managers). Either is required to enforce access control." 

189 

190 # Check if the object has a flowsheet or flowsheet attribute 

191 if hasattr(model, 'flowsheet'): 191 ↛ 194line 191 didn't jump to line 194 because the condition on line 191 was always true

192 has_flowsheet = True 

193 

194 if hasattr(model, 'flowsheetOwner'): 194 ↛ 195line 194 didn't jump to line 195 because the condition on line 194 was never true

195 raise ValueError("To enforce access control, `flowsheetOwner` should not be used. Please rename to `flowsheet` instead") 

196 

197 assert has_flowsheet, f"Models should have flowsheet or flowsheet attribute for handling access control, model: {model_name} doesn't" 

198 

199def validate_routers(): 

200 import os 

201 import ast 

202 

203 IGNORED_DIRS = {"site-packages"} 

204 IGNORED_FILES = { 

205 os.path.normpath("./authentication/routers.py") 

206 } 

207 

208 def is_ignored(path: str) -> bool: 

209 norm = os.path.normpath(path) 

210 parts = norm.split(os.sep) 

211 

212 # Ignore specific directories 

213 if any(part in IGNORED_DIRS for part in parts): 

214 return True 

215 

216 # Ignore specific file paths 

217 if norm in IGNORED_FILES: 

218 return True 

219 

220 return False 

221 

222 issues = [] 

223 

224 for root, _, files in os.walk("./"): 

225 if is_ignored(root): 

226 continue 

227 

228 for file in files: 

229 if file == "routers.py": 

230 path = os.path.join(root, file) 

231 

232 if is_ignored(path): 

233 continue 

234 

235 with open(path, "r", encoding="utf-8") as f: 

236 source = f.read() 

237 

238 tree = ast.parse(source) 

239 

240 found = False 

241 

242 # inspect AST for validate_router(...) calls 

243 for node in ast.walk(tree): 243 ↛ 259line 243 didn't jump to line 259 because the loop on line 243 didn't complete

244 if isinstance(node, ast.Call): 

245 func = node.func 

246 

247 # Extract function name 

248 if isinstance(func, ast.Name): 

249 name = func.id 

250 elif isinstance(func, ast.Attribute): 250 ↛ 253line 250 didn't jump to line 253 because the condition on line 250 was always true

251 name = func.attr 

252 else: 

253 continue 

254 

255 if name == "validate_router": 

256 found = True 

257 break 

258 

259 if not found: 259 ↛ 260line 259 didn't jump to line 260 because the condition on line 259 was never true

260 issues.append(path) 

261 

262 

263 if issues: 263 ↛ 264line 263 didn't jump to line 264 because the condition on line 263 was never true

264 issue_list = "\n".join(issues) 

265 raise Exception(f"The following routers.py files are missing validate_router(...) calls:\n{issue_list}")