Coverage for backend/django/core/auxiliary/views/SolveView.py: 87%

145 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import logging 

2import traceback 

3from django.views.decorators.csrf import csrf_exempt 

4from pydantic import ValidationError as PydanticValidationError 

5from authentication.custom_drf_authentication import DaprApiTokenAuthentication 

6from common.models.idaes.payloads.solve_request_schema import ( 

7 IdaesSolveCompletionEvent, 

8 DispatchMultiSolveEvent, 

9) 

10from common.models.idaes.payloads.build_state_request_schema import ( 

11 BuildStateCompletionEvent, 

12 BuildStateRequestEvent, 

13) 

14from common.models.solve_completion_email import SolveCompletionEmailRequestEvent 

15from core.auxiliary.serializers import TaskSerializer 

16from idaes_factory import endpoints 

17from pgraph_factory.pg_sheet import PgProcess 

18from drf_spectacular.utils import extend_schema 

19from rest_framework.response import Response 

20from rest_framework.decorators import api_view, authentication_classes, permission_classes 

21from rest_framework.exceptions import NotFound 

22from rest_framework.permissions import IsAuthenticated 

23from rest_framework import serializers, status 

24from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

25from core.auxiliary.models.Flowsheet import Flowsheet 

26from core.auxiliary.models import Task 

27from idaes_factory.endpoints import ( 

28 cancel_idaes_solve, 

29 process_idaes_solve_response, 

30 start_flowsheet_solve_event, 

31 start_multi_steady_state_solve_event, 

32 process_failed_idaes_solve_response, 

33 process_build_state_response, 

34 process_build_state_request_dead_letter, 

35) 

36from idaes_factory.idaes_factory_context import LiveSolveParams 

37from core.auxiliary.services.solve_completion_email import deliver_solve_completion_email 

38from core.validation import api_view_ignore_access_control, api_view_validate 

39from core.auxiliary.models.Scenario import Scenario, ScenarioTabTypeEnum 

40from core.auxiliary.models.Task import Task 

41from core.managers import get_flowsheet_access, has_flowsheet_write_access 

42 

43logger = logging.getLogger(__name__) 

44 

45 

46class SolveRequestSerializer(serializers.Serializer): 

47 group_id = serializers.IntegerField(required=True) 

48 debug = serializers.BooleanField(required=False) 

49 require_variables_fixed = serializers.BooleanField(required=False) 

50 scenario_number = serializers.IntegerField(required=False, allow_null=True) 

51 perform_diagnostics = serializers.BooleanField( 

52 required=False, default=False 

53 ) # currently, this doesn't do anything on MSS solves. 

54 

55 

56def create_error(message, cause) -> Response: 

57 """Build a standardised error response payload for solve requests. 

58 

59 Args: 

60 message: Human-readable description of the failure. 

61 cause: Short identifier describing which phase failed. 

62 

63 Returns: 

64 REST response with a 400 status code and diagnostic metadata. 

65 """ 

66 return Response( 

67 status=400, 

68 data={ 

69 "status": "error", 

70 "error": { 

71 "message": message, 

72 "cause": cause, 

73 "traceback": traceback.format_exc(), 

74 }, 

75 "log": None, 

76 "debug": {"input_flowsheet": None, "output_flowsheet": None, "timing": {}}, 

77 }, 

78 ) 

79 

80 

81@api_view_validate 

82@extend_schema(request=SolveRequestSerializer, responses=TaskSerializer) 

83@api_view(["POST"]) 

84def solve_idaes(request) -> Response: 

85 """Dispatch a solve request to either IDAES or the process-graph solver.""" 

86 # Validate the request data 

87 try: 

88 serializer = SolveRequestSerializer(data=request.data) 

89 serializer.is_valid(raise_exception=True) 

90 validated_data = serializer.validated_data 

91 

92 flowsheet_id = request.query_params.get('flowsheet') 

93 group_id = validated_data.get('group_id') 

94 scenario_number: int = validated_data.get('scenario_number') 

95 perform_diagnostics: bool = validated_data.get( 

96 'perform_diagnostics', False) 

97 

98 except Exception as e: 

99 return create_error("Invalid request data", "validation") 

100 

101 # Create the factory 

102 # This is where the flowsheet should be loaded from the database 

103 try: 

104 access_state = get_flowsheet_access(request.user, flowsheet_id) 

105 if access_state.has_read_access and not access_state.has_write_access: 

106 return Response( 

107 {"error": "This flowsheet is shared with read-only access."}, 

108 status=status.HTTP_403_FORBIDDEN, 

109 ) 

110 

111 # get the optimisation that matches the flowsheet 

112 scenario = Scenario.objects.filter(id=scenario_number).first() 

113 

114 if scenario and scenario.state_name == ScenarioTabTypeEnum.MultiSteadyState: 

115 return start_multi_steady_state_solve_event( 

116 flowsheet_id, request.user, scenario 

117 ) 

118 

119 # TODO: Start using is_optimization to determine if to use the optimisation or not. 

120 # Stop sending multiple optimisations to the solver, just the scenario one. 

121 

122 # Check the existence of a decision node 

123 if ( 123 ↛ 132line 123 didn't jump to line 132 because the condition on line 123 was always true

124 SimulationObject.objects.filter( 

125 objectType="decisionNode", flowsheet=flowsheet_id 

126 ).count() 

127 == 0 

128 ): 

129 # Run as normal 

130 return start_flowsheet_solve_event(flowsheet_id, group_id, request.user, scenario, perform_diagnostics=perform_diagnostics) 

131 else: 

132 pgraph_factory = PgProcess(flowsheet_id) 

133 pgraph_factory.solve() 

134 pgraph_factory.create_process_paths() 

135 

136 return Response( 

137 status=200, 

138 data=[ 

139 [block.componentName for block in solution] 

140 for solution in pgraph_factory.solutions 

141 ], 

142 ) 

143 except Exception as e: 

144 return create_error(str(e), "idaes_factory_run") 

145 

146 

147@extend_schema(exclude=True) 

148@api_view(["POST"]) 

149@authentication_classes([DaprApiTokenAuthentication]) 

150@permission_classes([IsAuthenticated]) 

151@csrf_exempt 

152def process_idaes_solve_completion_event(request) -> Response: 

153 """Handle a solve completion event (sent by Dapr) from the IDAES service.""" 

154 solve_response = IdaesSolveCompletionEvent.model_validate(request.data) 

155 solve_data = solve_response.data 

156 

157 process_idaes_solve_response(solve_data) 

158 

159 return Response(status=200) 

160 

161 

162@extend_schema(exclude=True) 

163@api_view(["POST"]) 

164@authentication_classes([DaprApiTokenAuthentication]) 

165@permission_classes([IsAuthenticated]) 

166@csrf_exempt 

167def process_failed_idaes_solve_event(request) -> Response: 

168 """ 

169 This endpoint is used to process solve completion events that were not received or processed 

170 by Django correctly. Errors could be due to crashes, reaching the message TTL, concurrency issues, etc. 

171 This will allow unprocessed solve tasks to be marked as failed and notify the user. 

172 """ 

173 solve_response = IdaesSolveCompletionEvent.model_validate(request.data) 

174 solve_data = solve_response.data 

175 

176 process_failed_idaes_solve_response(solve_data) 

177 

178 return Response(status=200) 

179 

180 

181@extend_schema(exclude=True) 

182@api_view(["POST"]) 

183@authentication_classes([DaprApiTokenAuthentication]) 

184@permission_classes([IsAuthenticated]) 

185@csrf_exempt 

186def process_build_state_response_event(request) -> Response: 

187 """Handle Dapr-delivered build-state completion events from IDAES.""" 

188 build_state_response = BuildStateCompletionEvent.model_validate(request.data) 

189 process_build_state_response(build_state_response.data) 

190 

191 return Response(status=200) 

192 

193 

194@extend_schema(exclude=True) 

195@api_view(["POST"]) 

196@authentication_classes([DaprApiTokenAuthentication]) 

197@permission_classes([IsAuthenticated]) 

198@csrf_exempt 

199def process_build_state_request_dead_letter_event(request) -> Response: 

200 """Handle dead-lettered build-state requests that never reached IDAES.""" 

201 try: 

202 build_state_request = BuildStateRequestEvent.model_validate(request.data) 

203 except PydanticValidationError: 

204 logger.warning( 

205 "Discarding malformed build-state request dead-letter event.", 

206 exc_info=True, 

207 ) 

208 return Response(status=200) 

209 

210 process_build_state_request_dead_letter(build_state_request.data) 

211 return Response(status=200) 

212 

213 

214@extend_schema(exclude=True) 

215@api_view(["POST"]) 

216@authentication_classes([DaprApiTokenAuthentication]) 

217@permission_classes([IsAuthenticated]) 

218@csrf_exempt 

219def process_dispatch_multi_solve(request) -> Response: 

220 """ 

221 This endpoint is used to process dispatch multi-solve events sent via the primary 

222 solve endpoint when the scenario is a multi-steady state scenario. 

223 """ 

224 

225 dispatch_request = DispatchMultiSolveEvent.model_validate(request.data) 

226 multi_solve_payload = dispatch_request.data 

227 

228 endpoints.dispatch_multi_solves( 

229 multi_solve_payload.task_id, multi_solve_payload.scenario_id 

230 ) 

231 

232 return Response(status=200) 

233 

234 

235@extend_schema(exclude=True) 

236@api_view(["POST"]) 

237@authentication_classes([DaprApiTokenAuthentication]) 

238@permission_classes([IsAuthenticated]) 

239@csrf_exempt 

240@api_view_ignore_access_control 

241def process_solve_completion_email_event(request) -> Response: 

242 """Handle Dapr-delivered solve completion email requests.""" 

243 email_event = SolveCompletionEmailRequestEvent.model_validate(request.data) 

244 deliver_solve_completion_email(email_event.data) 

245 return Response(status=200) 

246 

247 

248class CancelTaskRequestSerializer(serializers.Serializer): 

249 task_id = serializers.IntegerField() 

250 

251 

252@extend_schema(request=CancelTaskRequestSerializer) 

253@api_view_validate 

254@api_view(["POST"]) 

255def cancel_idaes_solve_handler(request) -> Response: 

256 """Accept a client request to cancel a pending or running solve task.""" 

257 cancel_request_serializer = CancelTaskRequestSerializer(data=request.data) 

258 cancel_request_serializer.is_valid(raise_exception=True) 

259 cancel_request = cancel_request_serializer.validated_data 

260 

261 task_id = cancel_request.get("task_id") 

262 

263 flowsheet_id = int(request.query_params.get("flowsheet")) 

264 if not has_flowsheet_write_access(request.user, flowsheet_id): 264 ↛ 265line 264 didn't jump to line 265 because the condition on line 264 was never true

265 return Response( 

266 {"error": "You do not have write access to this flowsheet."}, 

267 status=status.HTTP_403_FORBIDDEN, 

268 ) 

269 

270 task = ( 

271 Task.objects 

272 .filter(id=task_id, flowsheet_id=flowsheet_id) 

273 .first() 

274 ) 

275 if task is None: 

276 return Response( 

277 {"error": "Task not found for this flowsheet."}, 

278 status=status.HTTP_404_NOT_FOUND, 

279 ) 

280 

281 # Need to add task_id query parameter 

282 cancel_idaes_solve(task_id) 

283 

284 return Response(status=200)