Coverage for backend/django/idaes_factory/endpoints.py: 86%

493 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import logging 

2import os 

3from typing import TypedDict 

4from common.models.general import TaskPayload 

5import requests 

6import traceback 

7 

8from django.db import IntegrityError, transaction 

9from django.utils import timezone 

10from opentelemetry import trace 

11from rest_framework.response import Response 

12from pydantic import JsonValue 

13 

14from CoreRoot import settings 

15from authentication.user.models import User 

16from ahuora_builder_types.payloads.build_state_request_schema import ( 

17 BuildStateCompletionPayload, 

18 BuildStateRequestContext, 

19 BuildStateRequestSchema, 

20) 

21from ahuora_builder_types.payloads.solve_request_schema import IdaesSolveRequestPayload, IdaesSolveCompletionPayload, \ 

22 MultiSolvePayload 

23from common.models.idaes.payloads.solve_request_schema import CompletionStatus 

24from ahuora_builder_types.payloads.ml_request_schema import MLTrainRequestPayload, MLTrainingCompletionPayload 

25from core.auxiliary.models.MLModel import MLModel 

26from core.auxiliary.models.UploadSession import UploadSessionPurpose 

27from common.models.notifications.payloads import ( 

28 BuildStateCompletedPayload, 

29 TaskCompletedPayload, 

30 NotificationServiceMessageType, 

31 NotificationServiceMessage, 

32) 

33from core.auxiliary.enums.generalEnums import TaskStatus 

34from core.auxiliary.models.Task import Task, TaskType 

35from core.auxiliary.models.BuildStateRequestVersion import BuildStateRequestVersion 

36from core.auxiliary.models.PropertySet import PropertySet 

37from core.auxiliary.models.PropertyValue import PropertyValue 

38from core.auxiliary.models.Flowsheet import Flowsheet 

39from core.auxiliary.serializers import TaskSerializer 

40from common.services.task_cancellation_state import cache_cancelled_tasks 

41from core.auxiliary.services.csv_lifecycle import completed_csv_lifecycle_ttl, expires_at_from_ttl 

42from core.auxiliary.services.object_storage.s3 import schedule_object_expiration 

43from core.auxiliary.services.solve_completion_email import queue_solve_completion_email_for_task 

44from core.exceptions import DetailedException 

45from .idaes_factory import IdaesFactory, save_all_initial_values, store_properties_schema 

46from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

47from .adapters.stream_properties import serialise_stream 

48from .adapters.property_package_adapter import PropertyPackageAdapter 

49from .idaes_factory_context import IdaesFactoryContext 

50from core.auxiliary.models.Scenario import ( 

51 Scenario, 

52 ScenarioTabTypeEnum, 

53 SOLVE_TIMEOUT_DEFAULT_SECONDS, 

54) 

55from common.services import messaging 

56 

# Module-level logger and OpenTelemetry tracer shared by all endpoint helpers.
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)
# Every N child solves, dispatch_multi_solves re-reads the parent task status
# so an in-flight cancellation can stop further dispatching promptly without
# hitting the database on every single iteration.
PARENT_TASK_CANCEL_CHECK_BATCH_SIZE = 25

60 

61 

def _resolve_solve_timeout_seconds(
    scenario: Scenario | None,
    *,
    parent_task_id: int | None,
) -> int | None:
    """Pick the solve timeout to forward with a single IDAES solve request.

    MSS parent tasks are orchestration-only and never reach the IDAES service
    as solve requests, so ``parent_task_id`` is accepted for call-site clarity.
    Child solves inherit the scenario timeout; a direct solve with no scenario
    falls back to the platform default.
    """
    return (
        SOLVE_TIMEOUT_DEFAULT_SECONDS
        if scenario is None
        else scenario.solve_timeout_seconds
    )

77 

78 

class IdaesServiceRequestException(Exception):
    """Raised when the IDAES service answers with a non-200 HTTP status."""

    def __init__(self, message: str) -> None:
        # Keep the raw service response on the instance so callers can
        # surface it without parsing str(exc).
        self.message = message
        super().__init__(message)

83 

84 

class SolveFlowsheetError(DetailedException):
    """Raised when a flowsheet solve request cannot be dispatched to the IDAES queue."""
    pass

87 

88 

class ResponseType(TypedDict, total=False):
    """Loose shape of an IDAES service response; every key is optional."""

    status: str
    # Structured error details when the request failed, else None.
    error: dict | None
    # Raw solver/service log text, if any was produced.
    log: str | None
    # Extra diagnostic payload; schema depends on the endpoint — TODO confirm.
    debug: dict | None

94 

95 

def idaes_service_request(
    endpoint: str,
    data: JsonValue,
    *,
    timeout_seconds: float | None = None,
) -> JsonValue:
    """Send a JSON payload to the configured IDAES service endpoint.

    Args:
        endpoint: Relative path of the IDAES service endpoint to call.
        data: Serialised payload that conforms to the endpoint schema.
        timeout_seconds: Optional socket timeout for the HTTP request.
            ``None`` (the default) preserves the historical behaviour of
            waiting indefinitely; pass a value to bound the call.

    Returns:
        Parsed JSON response returned by the IDAES service.

    Raises:
        IdaesServiceRequestException: If the service responds with a non-200 status.
    """
    # `or` (rather than a getenv default) also covers an *empty* env var.
    base_url = os.getenv('IDAES_SERVICE_URL') or "http://localhost:8080"
    result = requests.post(
        base_url + "/" + endpoint, json=data, timeout=timeout_seconds)
    if result.status_code != 200:
        raise IdaesServiceRequestException(result.json())

    return result.json()

116 

117 

@tracer.start_as_current_span("send_flowsheet_solve_request")
def _solve_flowsheet_request(
    task_id: int,
    built_factory: IdaesFactory,
    parent_task_id: int | None = None,
    perform_diagnostics: bool = False,
    high_priority: bool = False
):
    """Queue an IDAES solve request for the provided flowsheet build.

    Args:
        task_id: Identifier of the task tracking the solve request.
        built_factory: Fully built factory containing flowsheet data to solve.
        parent_task_id: Identifier of the parent (MSS) task when this solve is
            one child of a multi steady-state run; None for direct solves.
        perform_diagnostics: Whether to request diagnostic output from IDAES.
        high_priority: Whether the message should be prioritised over normal solves.

    Raises:
        SolveFlowsheetError: If the message cannot be dispatched to the queue.
    """

    try:
        solve_timeout_seconds = _resolve_solve_timeout_seconds(
            built_factory.scenario,
            parent_task_id=parent_task_id,
        )
        idaes_payload = IdaesSolveRequestPayload(
            flowsheet=built_factory.flowsheet,
            solve_index=built_factory.solve_index,
            scenario_id=(
                built_factory.scenario.id if built_factory.scenario else None),
            task_id=task_id,
            parent_task_id=parent_task_id,
            perform_diagnostics=perform_diagnostics,
            solve_timeout_seconds=solve_timeout_seconds,
        )

        messaging.send_idaes_solve_message(
            idaes_payload, high_priority=high_priority)
    except Exception as e:
        # Chain the original exception so the real cause survives in
        # tracebacks instead of being flattened into the wrapper.
        raise SolveFlowsheetError(e, "idaes_factory_solve_message") from e

158 

159 

def _mark_task_dispatched_to_idaes(task: Task) -> None:
    """Record on the task's debug payload that it was queued to the IDAES service."""
    merged_debug = dict(task.debug or {})
    merged_debug["idaes_dispatched"] = True
    task.debug = merged_debug
    task.save(update_fields=["debug"])

167 

168 

def start_flowsheet_solve_event(
    flowsheet_id: int,
    group_id: int,
    user: User,
    scenario: Scenario | None = None,
    perform_diagnostics: bool = False
) -> Response:
    """Start a single solve for the given flowsheet and return the tracking task.

    Args:
        flowsheet_id: Identifier of the flowsheet that should be solved.
        group_id: Identifier of the grouping used to build the IDAES factory.
        user: User initiating the solve request.
        scenario: Optional scenario providing context for the solve.
        perform_diagnostics: Whether the solve should run with diagnostic output enabled.

    Returns:
        REST response containing the serialised `Task` used to track the solve.
    """
    if scenario and scenario.state_name != ScenarioTabTypeEnum.SteadyState and not scenario.dataRows.exists():
        # if not steady state solves and no data rows exist, cannot proceed
        return Response(status=400, data=f"No data was provided for {scenario.state_name} scenario.")

    # Remove all previously created DynamicResults for this scenario.
    # (Previously a bare `except:` tolerated `scenario` being None; guard
    # explicitly instead so genuine database errors are no longer swallowed.
    # Deleting an empty queryset is a no-op, so no "no solution yet" case
    # needs special handling.)
    if scenario is not None:
        scenario.solutions.all().delete()

    solve_task = Task.create(
        user, flowsheet_id, status=TaskStatus.Pending, save=True)
    # Persist the user's intent alongside the task so downstream consumers
    # can tell whether this solve was launched with diagnostics enabled.
    debug: dict[str, JsonValue] = {
        **(solve_task.debug or {}),
        "perform_diagnostics": bool(perform_diagnostics),
        "solve_timeout_seconds": (
            scenario.solve_timeout_seconds
            if scenario is not None
            else SOLVE_TIMEOUT_DEFAULT_SECONDS
        ),
    }
    if scenario is not None:
        debug["scenario_id"] = scenario.id
    solve_task.debug = debug
    solve_task.save(update_fields=["debug"])

    try:
        factory = IdaesFactory(
            group_id=group_id,
            scenario=scenario
        )
        factory.build()

        # We send single solve requests as high priority to ensure
        # they are not blocked by large multi-solve requests.
        _solve_flowsheet_request(
            solve_task.id,
            factory,
            perform_diagnostics=perform_diagnostics,
            high_priority=True
        )
        _mark_task_dispatched_to_idaes(solve_task)
    except DetailedException as e:
        # Build/dispatch failed: settle the task so the frontend sees it.
        solve_task.set_failure_with_exception(e, save=True)

    task_serializer = TaskSerializer(solve_task)

    return Response(task_serializer.data, status=200)

238 

239 

def start_multi_steady_state_solve_event(flowsheet_id: int, user: User, scenario: Scenario) -> Response:
    """Kick off a multi steady-state solve and return the parent tracking task.

    Args:
        flowsheet_id: Identifier of the flowsheet being solved.
        user: User who requested the multi-solve.
        scenario: Scenario containing the steady-state configurations to solve.

    Returns:
        REST response containing the parent `Task` that aggregates child solves.
    """
    if not scenario.dataRows.exists():
        # Nothing to iterate over, so refuse the request up front.
        return Response(status=400, data="No data was provided for multi steady-state scenario.")

    # Drop any DynamicResults left over from a previous run of this scenario.
    scenario.solutions.all().delete()
    row_count = scenario.dataRows.count()

    parent_task = Task.create_parent_task(
        creator=user,
        flowsheet_id=flowsheet_id,
        scheduled_tasks=row_count,
        status=TaskStatus.Running
    )
    parent_task.debug = {"scenario_id": scenario.id}
    parent_task.save(update_fields=["debug"])

    # One pending child task per data row; persisted in a single bulk insert.
    pending_children: list[Task] = []
    for _ in range(row_count):
        pending_children.append(
            Task.create(
                user,
                flowsheet_id,
                parent=parent_task,
                status=TaskStatus.Pending
            )
        )
    Task.objects.bulk_create(pending_children)

    messaging.send_dispatch_multi_solve_message(MultiSolvePayload(
        task_id=parent_task.id,
        scenario_id=scenario.id)
    )

    return Response(TaskSerializer(parent_task).data, status=200)

283 

284 

def dispatch_multi_solves(parent_task_id: int, scenario_id: int):
    """Build and dispatch queued steady-state solves for each child task.

    Args:
        parent_task_id: Identifier of the parent multi-solve task.
        scenario_id: Scenario containing the data rows to iterate through.
    """
    parent_task = Task.objects.get(id=parent_task_id)
    scenario = Scenario.objects.get(id=scenario_id)

    # One factory instance is reused across all child solves; only the
    # flowsheet state and solve index change per iteration.
    rootGroup = scenario.flowsheet.rootGrouping
    factory = IdaesFactory(
        group_id=rootGroup.id,
        scenario=scenario,
    )

    child_tasks = list(parent_task.children.order_by('start_time'))
    last_child_index = len(child_tasks) - 1

    for solve_index, task in enumerate(child_tasks):
        # Re-check the parent every PARENT_TASK_CANCEL_CHECK_BATCH_SIZE solves
        # (and on the final one) so a cancellation stops dispatching without
        # a per-iteration database hit.
        should_check_parent_status = (
            solve_index % PARENT_TASK_CANCEL_CHECK_BATCH_SIZE == 0
            or solve_index == last_child_index
        )
        if should_check_parent_status:
            parent_task.refresh_from_db(fields=["status"])
            if parent_task.status in (TaskStatus.Cancelling, TaskStatus.Cancelled):
                logger.info(
                    "Stopping multi-solve dispatch for parent task %s because it is now %s.",
                    parent_task.id,
                    parent_task.status,
                )
                break

        try:
            factory.clear_flowsheet()
            factory.use_with_solve_index(solve_index)
            factory.build()

            _solve_flowsheet_request(
                task.id,
                factory,
                parent_task_id=parent_task.id,
            )
            _mark_task_dispatched_to_idaes(task)
        except DetailedException as e:
            # Build/dispatch failed locally: settle this child and roll its
            # failure up into the parent immediately (successful children are
            # settled later by the solve-completion handler instead).
            task.set_failure_with_exception(exception=e, save=True)
            parent_completed = parent_task.update_status_from_child(task)
            if parent_completed:
                queue_solve_completion_email_for_task(parent_task, scenario_id=scenario.id)

            # Notify subscribers about both the failed child and the parent.
            # NOTE(review): the comprehension variable `task` shadows the loop
            # variable; harmless in Python 3 (comprehensions have their own
            # scope) but worth renaming at some point.
            flowsheet_messages = [
                NotificationServiceMessage(
                    data=TaskSerializer(task).data,
                    message_type=NotificationServiceMessageType.TASK_UPDATED
                ) for task in [task, parent_task]
            ]

            messaging.send_flowsheet_notification_messages(
                parent_task.flowsheet_id, flowsheet_messages)

345 

346 

def start_ml_training_event(
    csv_bucket: str,
    csv_key: str,
    csv_delimiter: str | None,
    input_labels: list[str],
    output_labels: list[str],
    user: User,
    flowsheet_id: int,
    model_id: int | None = None,
):
    """Queue an asynchronous machine-learning training job for the given dataset.

    Args:
        csv_bucket: Bucket containing the training CSV.
        csv_key: Object key containing the training CSV.
        csv_delimiter: Optional delimiter for the CSV object.
        input_labels: Names of the input features.
        output_labels: Names of the predicted outputs.
        user: User requesting the training run.
        flowsheet_id: Flowsheet the training run is associated with.
        model_id: Optional MLModel the run belongs to; recorded on the task
            debug payload and flagged failed if dispatch does not succeed.

    Returns:
        REST response containing the serialised `Task` for the training job.
    """
    training_task = Task.create(
        user,
        flowsheet_id,
        task_type=TaskType.ML_TRAINING,
        status=TaskStatus.Pending,
        save=True
    )
    training_task.debug = {"model_id": model_id}
    training_task.save(update_fields=["debug"])

    try:
        request_payload = MLTrainRequestPayload(
            csv_bucket=csv_bucket,
            csv_key=csv_key,
            csv_delimiter=csv_delimiter,
            input_labels=input_labels,
            output_labels=output_labels,
            task_id=training_task.id,
        )
        messaging.send_ml_training_message(request_payload)

    except DetailedException as e:
        training_task.set_failure_with_exception(e, save=True)
        if model_id is not None:
            # progress=4 mirrors the failure path in process_ml_training_response.
            MLModel.objects.filter(id=model_id).update(progress=4)
        _send_task_notifications(training_task)

    return Response(TaskSerializer(training_task).data, status=200)

400 

401 

def _send_task_notifications(task: Task, scenario_id: int | None = None):
    """Broadcast task completion or status updates to interested flowsheet clients.

    Args:
        task: Task whose status change should be pushed to subscribers.
        scenario_id: Optional scenario forwarded when queueing completion emails.
    """
    parent = task.parent

    if parent:
        # Child task: roll its status up into the parent first, and email
        # once the parent solve group has fully completed.
        parent_completed = parent.update_status_from_child(task)
        if parent_completed and parent.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(parent, scenario_id=scenario_id)

        if parent.status == TaskStatus.Completed:
            parent_message_type = NotificationServiceMessageType.TASK_COMPLETED
        else:
            parent_message_type = NotificationServiceMessageType.TASK_UPDATED

        outgoing = [
            NotificationServiceMessage(
                data=TaskSerializer(parent).data,
                message_type=parent_message_type
            )
        ]
    else:
        # Standalone task: it is its own terminal unit of work.
        if task.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(task, scenario_id=scenario_id)

        outgoing = [
            NotificationServiceMessage(
                data=TaskSerializer(task).data,
                message_type=NotificationServiceMessageType.TASK_COMPLETED
            )
        ]

    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, outgoing)

435 

436 

def _send_task_cancelled_notification(task: Task):
    """Tell flowsheet subscribers that the task settled in the cancelled state."""
    cancelled_message = NotificationServiceMessage(
        data=TaskSerializer(task).data,
        message_type=NotificationServiceMessageType.TASK_CANCELLED,
    )
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id,
        [cancelled_message],
    )

448 

449 

def process_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Persist the outcome of a completed IDAES solve and notify listeners.

    Args:
        solve_response: Payload describing the finished solve result.
    """
    # Use a transaction to ensure that either everything succeeds or nothing does
    should_mark_parent_cancelled = False
    with transaction.atomic():
        try:
            task = (
                Task.objects
                .select_related("flowsheet")  # eager load FK/OneToOne relations
                .select_for_update()  # lock rows in transaction
                .get(id=solve_response.task_id)
            )
        except Task.DoesNotExist:
            logger.info(
                "Discarding solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # Silently ignore if the task has already been marked as completed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a finished task once).
        if task.status == TaskStatus.Completed or task.status == TaskStatus.Cancelled:
            return

        # If cancellation won the race before the completion payload arrived,
        # keep the task cancelled rather than reviving it back to completed/failed.
        if task.status == TaskStatus.Cancelling:
            task.status = TaskStatus.Cancelled
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.debug = {
                **(task.debug or {}),
                "timing": solve_response.timing or {},
            }
            task.save(update_fields=["status", "completed_time", "log", "debug"])
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
            should_mark_parent_cancelled = True
            # Direct (non-MSS) solves email immediately; child solves defer to
            # the parent's completion handling.
            if not task.parent_id:
                queue_solve_completion_email_for_task(task, scenario_id=solve_response.scenario_id)
        else:
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.debug = {
                **(task.debug or {}),
                "timing": solve_response.timing or {},
            }

            if solve_response.status == CompletionStatus.SUCCESS:
                task.status = TaskStatus.Completed
            else:
                task.status = TaskStatus.Failed
                task.error = {
                    "message": solve_response.error["message"],
                    "cause": "idaes_service_request",
                    "traceback": solve_response.traceback
                }

            # NOTE(review): "error" is always in update_fields, so a pre-existing
            # task.error value is (re)written even on success — confirm intended.
            task.save(update_fields=[
                "status", "completed_time", "log", "debug", "error"])

            # Save the solved flowsheet values
            if task.status == TaskStatus.Completed:
                store_properties_schema(solve_response.flowsheet.properties, task.flowsheet_id,
                                        solve_response.scenario_id, solve_response.solve_index)

                # For now, only save initial values for single and dynamic solves, not MSS
                # In future, we may need some more complex logic to handle MSS initial values
                if solve_response.solve_index is None:
                    save_all_initial_values(
                        solve_response.flowsheet.initial_values)

            _send_task_notifications(task, scenario_id=solve_response.scenario_id)

    # Cancelled path: the cancelled notification and parent resolution run
    # after the transaction above has committed.
    if should_mark_parent_cancelled:
        _send_task_cancelled_notification(task)

        if solve_response.scenario_id is None:
            mark_parent_cancelled(task)
        else:
            mark_parent_cancelled(task, scenario_id=solve_response.scenario_id)

537 

538 

def mark_parent_cancelled(task: Task, scenario_id: int | None = None):
    """Resolve a task or parent task to ``cancelled`` once cancellation has won.

    A parent may legitimately end with zero explicitly cancelled children if the
    last in-flight solves complete before their kill signals land. In that case
    an already-``cancelling`` parent is still resolved to ``cancelled`` once all
    children have reached any terminal state.

    Args:
        task: The task (or a child of the parent) that triggered the check.
        scenario_id: Optional scenario forwarded when queueing the email.
    """
    with transaction.atomic():
        try:
            # Lock the parent row if this task has one; otherwise the task
            # itself is treated as the parent below.
            parent = (
                Task.objects.select_for_update().get(id=task.parent_id)
                if task.parent_id
                else None
            )
        except Task.DoesNotExist:
            logger.info(
                "Discarding parent cancellation update for missing parent task_id=%s.",
                task.parent_id,
            )
            return

        if parent is None:
            parent = task

        has_children = parent.children.exists()
        has_in_flight_children = parent.children.filter(
            status__in=[
                TaskStatus.Running,
                TaskStatus.Pending,
                TaskStatus.Cancelling,
            ]
        ).exists()

        has_cancelled_children = parent.children.filter(
            status=TaskStatus.Cancelled
        ).exists()

        # When a cancelling parent has children, resolve it to cancelled once all
        # children have reached any terminal state. This covers races where child
        # solves finish before their kill signals land, so none end up explicitly
        # marked as cancelled even though the parent cancellation should still win.
        should_cancel_parent = (
            not has_in_flight_children
            and (
                has_cancelled_children
                or (has_children and parent.status == TaskStatus.Cancelling)
            )
        )

        if should_cancel_parent:
            parent.status = TaskStatus.Cancelled
            parent.completed_time = timezone.now()
            parent.save(update_fields=["status", "completed_time"])

            messaging.send_flowsheet_notification_message(
                parent.flowsheet_id,
                TaskSerializer(parent).data,
                NotificationServiceMessageType.TASK_CANCELLED
            )
            queue_solve_completion_email_for_task(parent, scenario_id=scenario_id)

600 

601 

def process_failed_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Handle final failure notifications for solves that could not be processed.

    Args:
        solve_response: Completion payload received from the dead-letter queue.
    """
    # Use a transaction to ensure that either everything succeeds or nothing does
    should_mark_parent_cancelled = False
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=solve_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding failed solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # Silently ignore if the task has already been marked as failed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a failed task once). Our dead letter queue is configured in "at least once" delivery mode.
        if task.status == TaskStatus.Failed or task.status == TaskStatus.Cancelled:
            return

        # Cancellation won the race: keep the cancelled state rather than
        # downgrading to a generic failure.
        if task.status == TaskStatus.Cancelling:
            task.status = TaskStatus.Cancelled
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.save(update_fields=["status", "completed_time", "log"])
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
            should_mark_parent_cancelled = True
            if not task.parent_id:
                queue_solve_completion_email_for_task(
                    task,
                    # getattr: tolerate payload versions without scenario_id.
                    scenario_id=getattr(solve_response, "scenario_id", None),
                )
        else:
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.status = TaskStatus.Failed
            task.error = {
                "message": "Internal server error: several attempts to process finished solve failed."
            }
            task.save()

            _send_task_notifications(
                task,
                scenario_id=getattr(solve_response, "scenario_id", None),
            )

    # Cancelled path: notify and resolve the parent after the commit above.
    if should_mark_parent_cancelled:
        _send_task_cancelled_notification(task)
        scenario_id = getattr(solve_response, "scenario_id", None)
        if scenario_id is None:
            mark_parent_cancelled(task)
        else:
            mark_parent_cancelled(task, scenario_id=scenario_id)

660 

661 

def process_ml_training_response(
    ml_training_response: MLTrainingCompletionPayload
):
    """Persist the result of a machine-learning training job and send updates.

    Args:
        ml_training_response: Completion payload returned by the ML service.
    """
    with transaction.atomic():
        task = Task.objects.select_for_update().get(id=ml_training_response.task_id)

        # Silently ignore if the task has already been marked as completed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a finished task once).
        if task.status == TaskStatus.Completed or task.status == TaskStatus.Cancelled:
            return

        task.completed_time = timezone.now()
        task.log = ml_training_response.log
        existing_debug = task.debug or {}
        # model_id was stashed on the task debug payload at dispatch time.
        model_id = existing_debug.get("model_id")

        if ml_training_response.status == "success":
            if ml_training_response.json_response is None:
                raise ValueError("Successful ML training payloads must include json_response.")
            result = ml_training_response.json_response
            task.status = TaskStatus.Completed
            task.debug = {
                **existing_debug,
                "timing": result.timing,
            }
        else:
            task.status = TaskStatus.Failed
            task.error = {
                "message": ml_training_response.error,
                "traceback": ml_training_response.traceback
            }
            if model_id is not None:
                # progress=4 marks the model's training run as failed.
                MLModel.objects.filter(id=model_id).update(progress=4)

        task.save(
            update_fields=["status", "completed_time", "log", "debug", "error"]
        )

        # On success, copy the training artefacts onto the MLModel record and
        # schedule cleanup of the temporary test-results object.
        if task.status == TaskStatus.Completed and model_id is not None:
            MLModel.objects.filter(id=model_id).update(
                surrogate_model=result.surrogate_model,
                charts=[chart.model_dump() for chart in result.charts],
                metrics=[metric.model_dump() for metric in result.metrics],
                test_results_bucket=result.test_results_bucket,
                test_results_key=result.test_results_key,
                progress=3,
            )
            if result.test_results_bucket and result.test_results_key:
                schedule_object_expiration(
                    bucket=result.test_results_bucket,
                    key=result.test_results_key,
                    expires_at=expires_at_from_ttl(
                        completed_csv_lifecycle_ttl(UploadSessionPurpose.ML_TRAINING_CSV),
                        reference_time=task.completed_time,
                    ),
                )
        _send_task_notifications(task)

725 

726 

727 

def cancel_idaes_solve(task_id: int):
    """Mark an in-flight solve task as cancelled and notify subscribers.

    For parent tasks with children (MSS scenarios), child status updates use
    queryset-level UPDATEs (O(1) queries regardless of child count) and the
    frontend is told to invalidate its child-task cache with a single
    ``TASK_CHILDREN_CANCELLED`` notification instead of one message per child.

    Args:
        task_id: Identifier of the `Task` being cancelled.
    """
    # Collected inside the transaction, acted on after commit.
    to_cache_cancel: set[int] = set()
    to_send_cancel: list[int] = []
    has_children = False
    cancelled_at = timezone.now()
    with transaction.atomic():
        task = Task.objects.select_for_update().get(id=task_id)

        # Ignore cancellation request if a final status (e.g. completed or failed) has already been set
        if task.status in (
            TaskStatus.Completed,
            TaskStatus.Failed,
            TaskStatus.Cancelled,
            TaskStatus.Cancelling
        ):
            return

        if task.parent is None and task.children.exists():
            has_children = True

            # Use queryset-level updates to avoid loading potentially
            # hundreds of thousands of child Task objects into memory.
            # PostgreSQL guarantees atomicity of each UPDATE statement,
            # so the WHERE clause filters at execution time.
            non_terminal_children = task.children.exclude(
                status__in=[
                    TaskStatus.Completed,
                    TaskStatus.Failed,
                    TaskStatus.Cancelled,
                    TaskStatus.Cancelling,
                ]
            )
            # Immediately cancel pending children; they haven't been dispatched yet.
            cancelled_pending_child_count = (
                non_terminal_children.filter(status=TaskStatus.Pending)
                .update(status=TaskStatus.Cancelled, completed_time=cancelled_at)
            )
            # Mark remaining non-terminal children (e.g. Running) as Cancelling.
            non_terminal_children.exclude(status=TaskStatus.Pending).update(
                status=TaskStatus.Cancelling
            )

            Task.increment_cancelled_children_for_parent(task.id, cancelled_pending_child_count)

            to_cache_cancel.add(task.id)
            to_send_cancel.append(task.id)
            task.status = TaskStatus.Cancelling
        else:
            task.status = TaskStatus.Cancelling
            to_cache_cancel.add(task.id)
            to_send_cancel.append(task.id)

        task.save()

    cache_cancelled_tasks(to_cache_cancel)
    for cancel_task_id in to_send_cancel:
        # For MSS, broadcasting just the parent task ID is enough: every child
        # solve consults shared cancellation state for both its own ID and its
        # parent ID before starting work.
        messaging.send_task_cancel_message(cancel_task_id)

    # Send a single parent-level notification; for parent tasks with children,
    # include a children-invalidated event so the frontend drops its cached
    # child pages and refetches on demand rather than processing one message
    # per child task.
    notification_messages = [
        NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=NotificationServiceMessageType.TASK_CANCELLING,
        )
    ]
    if has_children:
        notification_messages.append(
            NotificationServiceMessage(
                data=TaskSerializer(task).data,
                message_type=NotificationServiceMessageType.TASK_CHILDREN_CANCELLED,
            )
        )
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, notification_messages,
    )
    # If all children were already terminal, the parent can settle right away.
    mark_parent_cancelled(task)

820 

821 

def process_cancel_solve_response(cancel_response: TaskPayload):
    """Persist a remote cancellation acknowledgement from the IDAES service.

    The IDAES service emits this only after it has successfully interrupted the
    active solver, so Django can safely settle the task to a terminal
    ``cancelled`` state and then check whether the parent can also be finalised.
    """
    cancelled_notice: NotificationServiceMessage | None = None
    # Row-lock the task inside a single transaction so the settle is all-or-nothing.
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=cancel_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding cancel acknowledgement for missing task_id=%s.",
                cancel_response.task_id,
            )
            return

        timed_out = cancel_response.timed_out
        # Timeout expiry can arrive before Django transitions a task to
        # `Cancelling`, so accept pending/running states for timeout paths.
        settleable = (
            {TaskStatus.Cancelling, TaskStatus.Pending, TaskStatus.Running}
            if timed_out
            else {TaskStatus.Cancelling}
        )
        if task.status not in settleable:
            return

        task.status = TaskStatus.Cancelled
        task.completed_time = timezone.now()
        # Preserve timeout metadata on the task debug payload so downstream
        # notifications have the value that actually governed the solve.
        existing_debug = task.debug or {}
        solve_timeout_seconds = existing_debug.get(
            "solve_timeout_seconds",
            SOLVE_TIMEOUT_DEFAULT_SECONDS,
        )
        task.debug = dict(
            existing_debug,
            timed_out=bool(timed_out),
            solve_timeout_seconds=solve_timeout_seconds,
        )
        if timed_out:
            timeout_note = f"Solve timed out after {solve_timeout_seconds} seconds."
            task.log = f"{task.log}\n{timeout_note}" if task.log else timeout_note
        task.save(update_fields=["status", "completed_time", "debug", "log"])

        if task.parent_id:
            Task.increment_cancelled_children_for_parent(task.parent_id)
        cancelled_notice = NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=NotificationServiceMessageType.TASK_CANCELLED,
        )

    # Outside the transaction: notify subscribers and queue the email only
    # after the terminal state has been committed.
    if cancelled_notice is not None:
        messaging.send_flowsheet_notification_messages(
            task.flowsheet_id, [cancelled_notice]
        )
        queue_solve_completion_email_for_task(task)

    mark_parent_cancelled(task)

885 

886 

def process_build_state_response(build_state_response: BuildStateCompletionPayload):
    """
    Apply an asynchronous build-state completion payload from IDAES.

    Successful responses update property values for the originating flowsheet.
    App-level failures are intentionally acknowledged without applying partial
    data and then surfaced to frontend subscribers.
    """
    context = build_state_response.context
    if context is None:
        logger.warning(
            "Discarding build-state response without request context."
        )
        return

    # Drop duplicate completions and responses that no longer match the
    # latest request for this property set.
    if not _settle_build_state_task_from_completion(build_state_response):
        return
    if not _is_current_build_state_response(context):
        return

    if build_state_response.status == CompletionStatus.SUCCESS:
        store_properties_schema(
            build_state_response.properties,
            context.flowsheet_id,
        )
    else:
        logger.warning(
            "Build-state request failed for flowsheet_id=%s stream_id=%s "
            "property_set_id=%s: %s",
            context.flowsheet_id,
            context.stream_id,
            context.property_set_id,
            build_state_response.error,
        )
    # Both outcomes notify subscribers; failures simply skip the data apply.
    _send_build_state_completion_notification(build_state_response)

925 

926 

def _settle_build_state_task_from_completion(
    build_state_response: BuildStateCompletionPayload,
) -> bool:
    """Record the final status for a tracked build-state request.

    Returns ``False`` when a terminal task already exists for this payload. That
    lets duplicate completion deliveries remain idempotent, matching solve and
    ML task handling.
    """
    context = build_state_response.context
    if context is None:
        return True

    terminal_statuses = (
        TaskStatus.Completed,
        TaskStatus.Failed,
        TaskStatus.Cancelled,
    )
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=context.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding build-state completion for missing task_id=%s.",
                context.task_id,
            )
            return False

        if task.status in terminal_statuses:
            # Already settled by an earlier delivery; nothing to do.
            return False

        if task.status == TaskStatus.Cancelling:
            # A cancel won the race; honour it over the completion result.
            task.status = TaskStatus.Cancelled
        elif build_state_response.status == CompletionStatus.SUCCESS:
            task.status = TaskStatus.Completed
        else:
            # Failed builds roll property values back before recording the error.
            _revert_build_state_task_values(task)
            task.status = TaskStatus.Failed
            task.error = {
                "message": build_state_response.error,
                "cause": "build_state_request",
                "traceback": build_state_response.traceback,
            }

        task.completed_time = timezone.now()
        task.log = build_state_response.log
        task.debug = dict(task.debug or {}, context=context.model_dump(mode="json"))
        task.save(update_fields=["status", "completed_time", "log", "debug", "error"])

    _send_task_notifications(task)
    return task.status != TaskStatus.Cancelled

980 

981 

def _is_current_build_state_response(context: BuildStateRequestContext) -> bool:
    """Return whether a build-state response still matches the latest request."""
    event_label = "build-state response"
    return _is_current_build_state_context(context, event_label)

985 

986 

def _is_current_build_state_context(
    context: BuildStateRequestContext,
    event_label: str,
) -> bool:
    """Return whether a build-state event context still matches the latest request."""
    try:
        tracker = BuildStateRequestVersion.objects.only("version").get(
            property_set_id=context.property_set_id,
        )
    except BuildStateRequestVersion.DoesNotExist:
        # No tracker means no request was ever recorded for this property set.
        logger.warning(
            "Discarding %s because no request version tracker exists "
            "for flowsheet_id=%s stream_id=%s property_set_id=%s request_version=%s.",
            event_label,
            context.flowsheet_id,
            context.stream_id,
            context.property_set_id,
            context.request_version,
        )
        return False

    if tracker.version == context.request_version:
        return True

    # A newer request superseded this one; drop the stale event quietly.
    logger.info(
        "Discarding stale %s for flowsheet_id=%s stream_id=%s "
        "property_set_id=%s request_version=%s latest_request_version=%s.",
        event_label,
        context.flowsheet_id,
        context.stream_id,
        context.property_set_id,
        context.request_version,
        tracker.version,
    )
    return False

1022 

1023 

def _send_build_state_completion_notification(
    build_state_response: BuildStateCompletionPayload,
):
    """Broadcast a build-state completion update to flowsheet subscribers."""
    context = build_state_response.context
    if context is None:
        return

    # Tag IDAES-level failures so the frontend can distinguish them from
    # transport-level delivery failures.
    failure_kind = None
    if build_state_response.status == CompletionStatus.ERROR:
        failure_kind = "idaes-error"

    payload = BuildStateCompletedPayload(
        context=context,
        status=build_state_response.status,
        properties=build_state_response.properties,
        error=build_state_response.error,
        failure_kind=failure_kind,
    )
    messaging.send_flowsheet_notification_message(
        context.flowsheet_id,
        payload.model_dump(mode="json"),
        NotificationServiceMessageType.BUILD_STATE_COMPLETED,
    )

1048 

1049 

def process_build_state_request_dead_letter(build_state_request: BuildStateRequestSchema):
    """Record that a build-state request failed broker delivery.

    Dead letters are transport-level failures, not IDAES state-block solve
    failures. This handler avoids applying property data and only emits a
    diagnostic notification when the dead-lettered request is still the latest
    request.
    """
    context = build_state_request.context
    if context is None:
        logger.warning(
            "Build-state request reached the dead-letter topic without request context."
        )
        return

    settled = _settle_build_state_task_from_delivery_failure(context)
    if not settled:
        return
    if not _is_current_build_state_context(context, "build-state request dead letter"):
        return

    logger.warning(
        "Build-state request delivery failed for flowsheet_id=%s stream_id=%s "
        "property_set_id=%s request_version=%s.",
        context.flowsheet_id,
        context.stream_id,
        context.property_set_id,
        context.request_version,
    )
    _send_build_state_delivery_failure_notification(context)

1080 

1081 

def _settle_build_state_task_from_delivery_failure(
    context: BuildStateRequestContext,
) -> bool:
    """Mark a tracked build-state request as failed when broker delivery fails."""
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=context.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding build-state delivery failure for missing task_id=%s.",
                context.task_id,
            )
            return False

        if task.status in (
            TaskStatus.Completed,
            TaskStatus.Failed,
            TaskStatus.Cancelled,
        ):
            # Already terminal; keep duplicate dead-letter deliveries idempotent.
            return False

        if task.status == TaskStatus.Cancelling:
            # A cancel was already in flight; settle it as cancelled instead.
            task.status = TaskStatus.Cancelled
        else:
            task.status = TaskStatus.Failed
            # Only failed requests roll their property values back.
            _revert_build_state_task_values(task)

        task.completed_time = timezone.now()
        task.error = {
            "message": "Build-state request delivery failed.",
            "cause": "message_delivery",
            "traceback": None,
        }
        task.debug = dict(task.debug or {}, context=context.model_dump(mode="json"))
        task.save(update_fields=["status", "completed_time", "error", "debug"])

    _send_task_notifications(task)
    return task.status != TaskStatus.Cancelled

1124 

1125 

def _send_build_state_delivery_failure_notification(
    context: BuildStateRequestContext,
):
    """Broadcast a silent transport-level build-state request failure event."""
    failure_event = BuildStateCompletedPayload(
        context=context,
        status="error",
        properties=None,
        error=None,
        failure_kind="delivery-failure",
    )
    body = failure_event.model_dump(mode="json")
    messaging.send_flowsheet_notification_message(
        context.flowsheet_id,
        body,
        NotificationServiceMessageType.BUILD_STATE_COMPLETED,
    )

1142 

1143 

def generate_IDAES_python_request(flowsheet_id: int) -> Response:
    """Return the flowsheet JSON for the given flowsheet.

    The previous behaviour attempted to generate Python source by forwarding
    the flowsheet to the IDAES service. Python generation is no longer
    supported; this function now always returns the local flowsheet JSON.

    Args:
        flowsheet_id: Primary key of the flowsheet to serialise.

    Returns:
        A 200 response with the flowsheet JSON on success, or a 400 response
        carrying ``status``/``error`` details when the factory build fails.
    """
    flowsheet = Flowsheet.objects.get(id=flowsheet_id)
    factory = IdaesFactory(
        group_id=flowsheet.rootGrouping.id,
        scenario=None,
        require_variables_fixed=False,
    )
    try:
        factory.build()
        data = factory.flowsheet
        # use model_dump_json so that pydantic can handle any non-serializable
        # fields instead of django's logic which would error out.
        return Response(data.model_dump_json(), status=200)
    except Exception as e:
        # Build the error payload only on this path; the success path never
        # used the eagerly-constructed response object.
        response_data = ResponseType(
            status="error",
            error={
                "message": str(e),
                "traceback": traceback.format_exc(),
            },
            log=None,
            debug=None,
        )
        return Response(response_data, status=400)

1176 

1177 

class BuildStateSolveError(Exception):
    """Raised when a build-state request cannot be prepared or queued for IDAES."""
    pass

1180 

1181 

def _next_build_state_request_version(property_set: PropertySet) -> int:
    """Increment and return the latest build-state request version for a property set."""
    # A concurrent first request may create the tracker between the get and
    # create inside get_or_create, raising IntegrityError. Retry exactly once;
    # the retry then locks the row the competitor just created.
    for attempts_left in (1, 0):
        try:
            return _next_build_state_request_version_locked(property_set)
        except IntegrityError:
            if not attempts_left:
                raise

1190 

1191 

def _next_build_state_request_version_locked(property_set: PropertySet) -> int:
    """Increment the version while holding a row lock for this property set."""
    with transaction.atomic():
        locked_rows = BuildStateRequestVersion.objects.select_for_update()
        tracker, _created = locked_rows.get_or_create(
            property_set_id=property_set.id,
            defaults={"flowsheet_id": property_set.flowsheet_id},
        )
        next_version = tracker.version + 1
        tracker.version = next_version
        tracker.save(update_fields=["version", "updated_at"])
        return next_version

1202 

1203 

def state_request_build(
    stream: SimulationObject,
    *,
    task_id: int,
) -> BuildStateRequestSchema:
    """Build the IDAES service payload for a stream state request.

    Args:
        stream: Stream whose inlet-side properties are serialised.
        task_id: Tracking task id recorded in the request context.

    Returns:
        The request schema ready to publish to the IDAES service.

    Raises:
        ValueError: If the inlet port is not listed in any property-package
            port group on the connected unit operation.
    """
    ctx = IdaesFactoryContext(stream.flowsheet.rootGrouping.id)
    property_set = stream.properties

    port = stream.connectedPorts.get(direction="inlet")
    unitop: SimulationObject = port.unitOp
    # find the property package key for this port (we have the value, the key of this port)
    property_package_ports = unitop.schema.propertyPackagePorts
    property_package_key = None
    for key, port_list in property_package_ports.items():
        if port.key in port_list:
            property_package_key = key
    if property_package_key is None:
        # Previously this fell through to a NameError when no group matched;
        # raise a clear error so dispatch-failure handling records a useful message.
        raise ValueError(
            f"Inlet port {port.key!r} is not in any property package port group."
        )
    PropertyPackageAdapter(
        property_package_key).serialise(ctx, unitop)

    return BuildStateRequestSchema(
        property_package=ctx.property_packages[0],
        properties=serialise_stream(ctx, stream, is_inlet=True),
        context=BuildStateRequestContext(
            flowsheet_id=stream.flowsheet_id,
            stream_id=stream.id,
            task_id=task_id,
            property_set_id=property_set.id,
            request_version=_next_build_state_request_version(property_set),
        ),
    )

1234 

1235 

def _create_build_state_task(
    stream: SimulationObject,
    user: User | int,
    rollback_values: dict[int, float] | None = None,
) -> Task:
    """Create the tracking task used for a build-state request."""
    task = Task.create(
        user,
        stream.flowsheet_id,
        task_type=TaskType.BUILD_STATE,
        status=TaskStatus.Pending,
        save=True,
    )
    # Rollback values are keyed by property-value id as strings so the JSON
    # debug payload round-trips cleanly.
    serialised_rollback: dict[str, float] = {}
    if rollback_values:
        serialised_rollback = {
            str(prop_id): value for prop_id, value in rollback_values.items()
        }
    task.debug = {
        "stream_id": stream.id,
        "property_set_id": stream.properties.id,
        "rollback_values": serialised_rollback,
    }
    task.save(update_fields=["debug"])
    return task

1260 

1261 

def _revert_build_state_task_values(task: Task) -> None:
    """Restore raw property values captured before the build-state request."""
    rollback_values = (task.debug or {}).get("rollback_values")
    if not rollback_values:
        return

    # Keys were serialised as strings for the JSON debug payload.
    ids = [int(prop_id) for prop_id in rollback_values]
    restored = []
    for property_value in PropertyValue.objects.filter(id__in=ids):
        property_value.value = rollback_values[str(property_value.id)]
        restored.append(property_value)

    PropertyValue.objects.bulk_update(restored, ["value"])

1275 

1276 

def _fail_build_state_task_before_dispatch(
    task: Task,
    exception: Exception,
    *,
    cause: str,
) -> None:
    """Persist a build-state setup/dispatch failure before re-raising it."""
    # Undo any property edits applied ahead of the request, then record the
    # failure and tell subscribers.
    _revert_build_state_task_values(task)
    detailed = DetailedException(exception, source=cause)
    task.set_failure_with_exception(detailed, save=True)
    _send_task_notifications(task)

1290 

1291 

def _mark_build_state_task_dispatched(
    task: Task,
    context: BuildStateRequestContext | None,
) -> None:
    """Persist dispatch metadata for a queued build-state task."""
    debug = dict(task.debug or {})
    debug["idaes_dispatched"] = True
    debug["context"] = None if context is None else context.model_dump(mode="json")
    task.debug = debug
    task.save(update_fields=["debug"])

1305 

1306 

def build_state_request(
    stream: SimulationObject,
    user: User | int,
    rollback_values: dict[int, float] | None = None,
):
    """Queue a state build request for the provided stream.

    Args:
        stream: Stream object whose inlet properties should be used for the build.
        user: User requesting the property update that triggered the build.
        rollback_values: Raw property values captured before the request so a
            failed build can be reverted.

    Returns:
        REST response containing the tracking task. Built properties are applied
        later by ``process_build_state_response`` when IDAES publishes the
        completion event.

    Raises:
        BuildStateSolveError: If the request cannot be prepared or queued; the
            original exception is attached as ``__cause__``.
    """
    task = _create_build_state_task(
        stream,
        user,
        rollback_values=rollback_values,
    )
    try:
        data = state_request_build(stream, task_id=task.id)
        messaging.send_idaes_build_state_request_message(data)
        _mark_build_state_task_dispatched(task, data.context)
        return Response(TaskSerializer(task).data, status=202)
    except Exception as e:
        # Record the failure on the tracking task (and revert any applied
        # property values) before surfacing a domain-specific error.
        _fail_build_state_task_before_dispatch(
            task,
            e,
            cause="build_state_dispatch",
        )
        # Chain the original exception explicitly so tracebacks show the cause.
        raise BuildStateSolveError(str(e)) from e