Coverage for backend/django/idaes_factory/endpoints.py: 85%

506 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1import json 

2import logging 

3import os 

4from typing import TypedDict 

5from common.models.general import TaskPayload 

6import requests 

7import traceback 

8 

9from django.db import IntegrityError, transaction 

10from django.utils import timezone 

11from opentelemetry import trace 

12from rest_framework.exceptions import ValidationError 

13from rest_framework.response import Response 

14from pydantic import JsonValue 

15 

16from CoreRoot import settings 

17from authentication.user.models import User 

18from ahuora_builder_types.payloads.build_state_request_schema import ( 

19 BuildStateCompletionPayload, 

20 BuildStateRequestContext, 

21 BuildStateRequestSchema, 

22) 

23from ahuora_builder_types.payloads.solve_request_schema import IdaesSolveRequestPayload, IdaesSolveCompletionPayload, \ 

24 MultiSolvePayload 

25from common.models.idaes.payloads.solve_request_schema import CompletionStatus 

26from ahuora_builder_types.payloads.ml_request_schema import MLTrainRequestPayload, MLTrainingCompletionPayload 

27from core.auxiliary.models.MLModel import MLModel 

28from core.auxiliary.models.UploadSession import UploadSessionPurpose 

29from common.models.notifications.payloads import ( 

30 BuildStateCompletedPayload, 

31 TaskCompletedPayload, 

32 NotificationServiceMessageType, 

33 NotificationServiceMessage, 

34) 

35from core.auxiliary.enums.generalEnums import TaskStatus 

36from core.auxiliary.models.Task import Task, TaskType 

37from core.auxiliary.models.BuildStateRequestVersion import BuildStateRequestVersion 

38from core.auxiliary.models.PropertySet import PropertySet 

39from core.auxiliary.models.PropertyValue import PropertyValue 

40from core.auxiliary.models.Flowsheet import Flowsheet 

41from core.auxiliary.serializers import TaskSerializer 

42from common.services.task_cancellation_state import cache_cancelled_tasks 

43from core.auxiliary.services.csv_lifecycle import completed_csv_lifecycle_ttl, expires_at_from_ttl 

44from core.auxiliary.services.object_storage.s3 import schedule_object_expiration 

45from core.auxiliary.services.solve_completion_email import queue_solve_completion_email_for_task 

46from core.auxiliary.services.parameter_sweep import validate_parameter_sweep_solve_ready 

47from core.exceptions import DetailedException 

48from .idaes_factory import IdaesFactory, save_all_initial_values, store_properties_schema 

49from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

50from .adapters.stream_properties import serialise_stream 

51from .adapters.property_package_adapter import PropertyPackageAdapter 

52from .idaes_factory_context import IdaesFactoryContext 

53from core.auxiliary.models.Scenario import ( 

54 Scenario, 

55 ScenarioTabTypeEnum, 

56 SOLVE_TIMEOUT_DEFAULT_SECONDS, 

57) 

58from common.services import messaging 

59from diagnostics.methods.update_diagnostic_result import update_diagnostics_results 

60 

# How many child solves ``dispatch_multi_solves`` dispatches between re-reads
# of the parent task's status. Batching keeps cancellation reasonably prompt
# without a database round-trip for every child.
PARENT_TASK_CANCEL_CHECK_BATCH_SIZE = 25

logger = logging.getLogger(__name__)
tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)

64 

65 

def _resolve_solve_timeout_seconds(
    scenario: Scenario | None,
    *,
    parent_task_id: int | None,
) -> int | None:
    """Pick the timeout, in seconds, to attach to one IDAES solve request.

    Multi steady-state parent tasks only orchestrate their children and are
    never sent to the IDAES service as solve requests. As a result,
    ``parent_task_id`` does not change the answer: a child solve uses its
    scenario's timeout exactly like a direct solve does. When no scenario is
    supplied, the platform default ``SOLVE_TIMEOUT_DEFAULT_SECONDS`` is used.

    Args:
        scenario: Scenario being solved, or ``None`` for a plain flowsheet solve.
        parent_task_id: Parent multi-solve task, if any. Not read by the current
            implementation; it is keyword-only so callers state it explicitly.

    Returns:
        ``scenario.solve_timeout_seconds`` when a scenario is given, otherwise
        the platform default.
    """
    if scenario is not None:
        return scenario.solve_timeout_seconds
    return SOLVE_TIMEOUT_DEFAULT_SECONDS

81 

82 

class IdaesServiceRequestException(Exception):
    """Raised when the IDAES service answers a request with a non-200 status.

    ``message`` carries the error body reported by the service. This is
    typically a decoded JSON payload (for example a dict), so it is not
    necessarily a ``str``. The same object is also stored as the exception's
    sole argument.
    """

    def __init__(self, message: object) -> None:
        super().__init__(message)
        # Keep the raw payload so callers can inspect structured error details.
        self.message = message

87 

88 

class SolveFlowsheetError(DetailedException):
    """Raised when a flowsheet solve request cannot be built or queued.

    Adds no behaviour of its own; construction and formatting are inherited
    from ``DetailedException``.
    """

91 

92 

93class ResponseType(TypedDict, total=False): 

94 status: str 

95 error: dict | None 

96 log: str | None 

97 debug: dict | None 

98 

99 

def idaes_service_request(
    endpoint: str,
    data: JsonValue,
    *,
    timeout: float | None = None,
) -> JsonValue:
    """Send a JSON payload to the configured IDAES service endpoint.

    The base URL comes from the ``IDAES_SERVICE_URL`` environment variable and
    falls back to ``http://localhost:8080`` when it is unset or empty.

    Args:
        endpoint: Relative path of the IDAES service endpoint to call.
        data: Serialised payload that conforms to the endpoint schema.
        timeout: Optional ``requests`` timeout in seconds. The default ``None``
            waits indefinitely, which matches the historical behaviour.

    Returns:
        Parsed JSON response returned by the IDAES service.

    Raises:
        IdaesServiceRequestException: If the service responds with a non-200
            status. The exception carries the decoded JSON error body, or the
            raw response text when the body is not valid JSON.
    """
    base_url = os.getenv("IDAES_SERVICE_URL") or "http://localhost:8080"
    url = f"{base_url}/{endpoint}"
    result = requests.post(url, json=data, timeout=timeout)
    if result.status_code != 200:
        # Error responses (proxy pages, crashes) are not guaranteed to be JSON.
        # Without this fallback a JSON decoding error would escape instead of
        # the documented IdaesServiceRequestException, hiding the real failure.
        # requests' JSON decode errors subclass ValueError.
        try:
            error_body = result.json()
        except ValueError:
            error_body = result.text
        raise IdaesServiceRequestException(error_body)

    return result.json()

120 

121 

@tracer.start_as_current_span("send_flowsheet_solve_request")
def _solve_flowsheet_request(
    task_id: int,
    built_factory: IdaesFactory,
    parent_task_id: int | None = None,
    perform_diagnostics: bool = False,
    high_priority: bool = False
):
    """Queue an IDAES solve request for the provided flowsheet build.

    Args:
        task_id: Identifier of the task tracking the solve request.
        built_factory: Fully built factory containing flowsheet data to solve.
        parent_task_id: Identifier of the multi-solve parent task, or ``None``
            for a standalone solve.
        perform_diagnostics: Whether to request diagnostic output from IDAES.
        high_priority: Whether the message should be prioritised over normal solves.

    Raises:
        SolveFlowsheetError: If the payload cannot be built or the message
            cannot be dispatched to the queue. The original error is chained
            as ``__cause__``.
    """
    try:
        solve_timeout_seconds = _resolve_solve_timeout_seconds(
            built_factory.scenario,
            parent_task_id=parent_task_id,
        )
        scenario_id = built_factory.scenario.id if built_factory.scenario else None
        idaes_payload = IdaesSolveRequestPayload(
            flowsheet=built_factory.flowsheet,
            solve_index=built_factory.solve_index,
            scenario_id=scenario_id,
            task_id=task_id,
            parent_task_id=parent_task_id,
            perform_diagnostics=perform_diagnostics,
            solve_timeout_seconds=solve_timeout_seconds,
        )

        messaging.send_idaes_solve_message(
            idaes_payload, high_priority=high_priority)
    except Exception as e:
        # Chain explicitly so the underlying failure is reported as the cause.
        raise SolveFlowsheetError(e, "idaes_factory_solve_message") from e

162 

163 

def _mark_task_dispatched_to_idaes(task: Task) -> None:
    """Record on ``task.debug`` that the solve was handed to the IDAES queue.

    Existing debug entries are preserved. Only the ``idaes_dispatched`` flag
    is added or overwritten, and only the ``debug`` column is written back.
    """
    updated_debug = dict(task.debug or {})
    updated_debug["idaes_dispatched"] = True
    task.debug = updated_debug
    task.save(update_fields=["debug"])

171 

172 

def start_flowsheet_solve_event(
    flowsheet_id: int,
    group_id: int,
    user: User,
    scenario: Scenario | None = None,
    perform_diagnostics: bool = False
) -> Response:
    """Start a single solve for the given flowsheet and return the tracking task.

    Args:
        flowsheet_id: Identifier of the flowsheet that should be solved.
        group_id: Identifier of the grouping passed to ``IdaesFactory`` to
            build the model.
        user: User initiating the solve request.
        scenario: Optional scenario providing context for the solve.
        perform_diagnostics: Whether the solve should run with diagnostic output enabled.

    Returns:
        A 400 response when a non-steady-state scenario has no data rows.
        Otherwise, a REST response containing the serialised `Task` used to
        track the solve. ``DetailedException`` failures while building or
        queueing the solve are recorded on the task rather than raised.
    """
    if (
        scenario is not None
        and scenario.state_name != ScenarioTabTypeEnum.SteadyState
        and not scenario.dataRows.exists()
    ):
        # Non-steady-state solves need data rows; there is nothing to solve.
        return Response(status=400, data=f"No data was provided for {scenario.state_name} scenario.")

    if scenario is not None:
        # Remove all previously created results for this scenario. Deleting an
        # empty queryset is a no-op, so only the "no scenario" case needs
        # guarding. A blanket except here previously also hid real DB errors.
        scenario.solutions.all().delete()

    solve_task = Task.create(
        user, flowsheet_id, status=TaskStatus.Pending, save=True)
    # Persist the user's intent alongside the task so downstream consumers
    # can tell whether this solve was launched with diagnostics enabled.
    debug: dict[str, JsonValue] = {
        **(solve_task.debug or {}),
        "perform_diagnostics": bool(perform_diagnostics),
        # Use the same resolution rule as the dispatched request, so the stored
        # value always matches what the IDAES service is told.
        "solve_timeout_seconds": _resolve_solve_timeout_seconds(
            scenario, parent_task_id=None
        ),
    }
    if scenario is not None:
        debug["scenario_id"] = scenario.id
    solve_task.debug = debug
    solve_task.save(update_fields=["debug"])

    try:
        factory = IdaesFactory(
            group_id=group_id,
            scenario=scenario
        )
        factory.build()

        # We send single solve requests as high priority to ensure
        # they are not blocked by large multi-solve requests.
        _solve_flowsheet_request(
            solve_task.id,
            factory,
            perform_diagnostics=perform_diagnostics,
            high_priority=True
        )
        _mark_task_dispatched_to_idaes(solve_task)
    except DetailedException as e:
        solve_task.set_failure_with_exception(e, save=True)

    task_serializer = TaskSerializer(solve_task)

    return Response(task_serializer.data, status=200)

242 

243 

def start_multi_steady_state_solve_event(flowsheet_id: int, user: User, scenario: Scenario) -> Response:
    """Kick off a multi steady-state solve and return the parent tracking task.

    Creates one running parent task plus one pending child task per scenario
    data row. It then queues a single dispatch message for the parent; the
    children are presumably built and sent by ``dispatch_multi_solves`` when
    that message is consumed (TODO confirm the consumer).

    Args:
        flowsheet_id: Identifier of the flowsheet being solved.
        user: User who requested the multi-solve.
        scenario: Scenario containing the steady-state configurations to solve.

    Returns:
        A 400 response when the scenario has no data rows or fails parameter
        sweep validation. Otherwise, a REST response containing the parent
        `Task` that aggregates child solves.
    """
    # Guard: nothing to iterate over without data rows.
    if not scenario.dataRows.exists():
        return Response(status=400, data="No data was provided for multi steady-state scenario.")
    try:
        validate_parameter_sweep_solve_ready(scenario)
    except ValidationError as exc:
        return Response(status=400, data=exc.detail)

    # Clear results left over from an earlier run of this scenario.
    scenario.solutions.all().delete()
    row_count = scenario.dataRows.count()

    parent_task = Task.create_parent_task(
        creator=user,
        flowsheet_id=flowsheet_id,
        scheduled_tasks=row_count,
        status=TaskStatus.Running,
    )
    parent_task.debug = {"scenario_id": scenario.id}
    parent_task.save(update_fields=["debug"])

    # Build the children unsaved and insert them in one bulk query.
    pending_children: list[Task] = [
        Task.create(user, flowsheet_id, parent=parent_task, status=TaskStatus.Pending)
        for _ in range(row_count)
    ]
    Task.objects.bulk_create(pending_children)

    dispatch_payload = MultiSolvePayload(task_id=parent_task.id, scenario_id=scenario.id)
    messaging.send_dispatch_multi_solve_message(dispatch_payload)

    parent_data = TaskSerializer(parent_task).data
    return Response(parent_data, status=200)

291 

292 

def dispatch_multi_solves(parent_task_id: int, scenario_id: int):
    """Build and dispatch queued steady-state solves for each child task.

    Children are visited in ``start_time`` order, with ties broken by ``id``.
    The child at position ``i`` is solved with ``solve_index == i``. The
    parent's status is re-read every ``PARENT_TASK_CANCEL_CHECK_BATCH_SIZE``
    children and before the last one. Dispatch stops as soon as the parent is
    seen cancelling or cancelled.

    When building or queueing a child raises ``DetailedException``:
    - the child is marked failed;
    - the parent's aggregate status is updated (queueing the completion email
      if that finishes the parent);
    - both tasks are broadcast as ``TASK_UPDATED``.

    Args:
        parent_task_id: Identifier of the parent multi-solve task.
        scenario_id: Scenario containing the data rows to iterate through.
    """
    parent_task = Task.objects.get(id=parent_task_id)
    scenario = Scenario.objects.get(id=scenario_id)

    root_group = scenario.flowsheet.rootGrouping
    factory = IdaesFactory(
        group_id=root_group.id,
        scenario=scenario,
    )

    # Children are bulk-created together, so their start_time values can tie.
    # Without the id tiebreak the database may return tied rows in any order,
    # making the child -> solve_index mapping nondeterministic between runs.
    child_tasks = list(parent_task.children.order_by("start_time", "id"))
    last_child_index = len(child_tasks) - 1

    for solve_index, task in enumerate(child_tasks):
        # Re-read the parent status only periodically to limit DB round-trips.
        should_check_parent_status = (
            solve_index % PARENT_TASK_CANCEL_CHECK_BATCH_SIZE == 0
            or solve_index == last_child_index
        )
        if should_check_parent_status:
            parent_task.refresh_from_db(fields=["status"])
            if parent_task.status in (TaskStatus.Cancelling, TaskStatus.Cancelled):
                logger.info(
                    "Stopping multi-solve dispatch for parent task %s because it is now %s.",
                    parent_task.id,
                    parent_task.status,
                )
                break

        try:
            # The factory is reused across children: reset it, then rebuild
            # for this data row.
            factory.clear_flowsheet()
            factory.use_with_solve_index(solve_index)
            factory.build()

            _solve_flowsheet_request(
                task.id,
                factory,
                parent_task_id=parent_task.id,
            )
            _mark_task_dispatched_to_idaes(task)
        except DetailedException as e:
            task.set_failure_with_exception(exception=e, save=True)
            parent_completed = parent_task.update_status_from_child(task)
            if parent_completed:
                queue_solve_completion_email_for_task(parent_task, scenario_id=scenario.id)

            flowsheet_messages = [
                NotificationServiceMessage(
                    data=TaskSerializer(updated_task).data,
                    message_type=NotificationServiceMessageType.TASK_UPDATED
                ) for updated_task in (task, parent_task)
            ]

            messaging.send_flowsheet_notification_messages(
                parent_task.flowsheet_id, flowsheet_messages)

353 

354 

def start_ml_training_event(
    csv_bucket: str,
    csv_key: str,
    csv_delimiter: str | None,
    input_labels: list[str],
    output_labels: list[str],
    user: User,
    flowsheet_id: int,
    model_id: int | None = None,
):
    """Queue an asynchronous machine-learning training job for a CSV dataset.

    A pending ``ML_TRAINING`` task is created, and ``model_id`` is stored in
    its debug payload. A training message is then sent. If sending raises
    ``DetailedException``:
    - the task is marked failed;
    - the linked ``MLModel`` (if any) gets ``progress=4``;
    - subscribers are notified.

    Args:
        csv_bucket: Bucket containing the training CSV.
        csv_key: Object key containing the training CSV.
        csv_delimiter: Optional delimiter for the CSV object.
        input_labels: Names of the input features.
        output_labels: Names of the predicted outputs.
        user: User requesting the training run.
        flowsheet_id: Flowsheet the training run is associated with.
        model_id: Optional ``MLModel`` the run belongs to.

    Returns:
        REST response containing the serialised `Task` for the training job.
    """
    training_task = Task.create(
        user,
        flowsheet_id,
        task_type=TaskType.ML_TRAINING,
        status=TaskStatus.Pending,
        save=True,
    )
    # The completion handler reads model_id back from here.
    training_task.debug = {"model_id": model_id}
    training_task.save(update_fields=["debug"])

    try:
        messaging.send_ml_training_message(
            MLTrainRequestPayload(
                csv_bucket=csv_bucket,
                csv_key=csv_key,
                csv_delimiter=csv_delimiter,
                input_labels=input_labels,
                output_labels=output_labels,
                task_id=training_task.id,
            )
        )
    except DetailedException as e:
        training_task.set_failure_with_exception(e, save=True)
        if model_id is not None:
            # Same progress code process_ml_training_response writes for a
            # failed training run.
            MLModel.objects.filter(id=model_id).update(progress=4)
        _send_task_notifications(training_task)

    return Response(TaskSerializer(training_task).data, status=200)

408 

409 

def _send_task_notifications(task: Task, scenario_id: int | None = None):
    """Push a task's latest state to clients subscribed to its flowsheet.

    Child task:
    - its outcome is first folded into the parent via
      ``update_status_from_child``;
    - only the parent is broadcast, as ``TASK_COMPLETED`` when the parent's
      status is completed and ``TASK_UPDATED`` otherwise.

    Top-level task: the task itself is broadcast as ``TASK_COMPLETED``.

    For IDAES solves, a completion email is queued:
    - for a top-level task, always;
    - for a child, when ``update_status_from_child`` reports that the parent
      has completed.

    Args:
        task: Task whose status change should be pushed to subscribers.
        scenario_id: Scenario forwarded to the completion-email helper.
    """
    parent = task.parent
    if parent is None:
        if task.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(task, scenario_id=scenario_id)
        subject = task
        message_type = NotificationServiceMessageType.TASK_COMPLETED
    else:
        parent_completed = parent.update_status_from_child(task)
        if parent_completed and parent.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(parent, scenario_id=scenario_id)
        subject = parent
        if parent.status == TaskStatus.Completed:
            message_type = NotificationServiceMessageType.TASK_COMPLETED
        else:
            message_type = NotificationServiceMessageType.TASK_UPDATED

    outgoing = NotificationServiceMessage(
        data=TaskSerializer(subject).data,
        message_type=message_type,
    )
    messaging.send_flowsheet_notification_messages(task.flowsheet_id, [outgoing])

443 

444 

def _send_task_cancelled_notification(task: Task):
    """Tell the task's flowsheet subscribers that it settled as cancelled."""
    cancelled_message = NotificationServiceMessage(
        data=TaskSerializer(task).data,
        message_type=NotificationServiceMessageType.TASK_CANCELLED,
    )
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id,
        [cancelled_message],
    )

456 

457 

def process_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Persist the outcome of a completed IDAES solve and notify listeners.

    The task row is locked for the duration of the update. Completions for
    missing tasks, and for tasks already completed or cancelled, are ignored.

    If a cancellation won the race (task is ``cancelling``), the task is
    settled as ``cancelled`` instead of completed/failed:
    - for a child task, the parent's cancelled-children counter is bumped and,
      after the transaction, a cancellation notification is sent and the
      parent is re-evaluated;
    - for a top-level task, the completion email is queued.

    Otherwise the task becomes ``completed`` or ``failed``. Solved property
    values (and, for non-MSS solves, initial values) are stored on success,
    diagnostics are updated, and subscribers are notified.

    Args:
        solve_response: Payload describing the finished solve result.
    """
    should_mark_parent_cancelled = False
    # Use a transaction to ensure that either everything succeeds or nothing does
    with transaction.atomic():
        try:
            task = (
                Task.objects
                .select_related("flowsheet")  # eager load FK/OneToOne relations
                .select_for_update()  # lock rows in transaction
                .get(id=solve_response.task_id)
            )
        except Task.DoesNotExist:
            logger.info(
                "Discarding solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # Silently ignore if the task has already been marked as completed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a finished task once).
        if task.status in (TaskStatus.Completed, TaskStatus.Cancelled):
            return

        # Fields recorded regardless of how the task settles.
        task.completed_time = timezone.now()
        task.log = solve_response.log
        task.debug = {
            **(task.debug or {}),
            "timing": solve_response.timing or {},
        }

        if task.status == TaskStatus.Cancelling:
            # Cancellation won the race before the completion payload arrived:
            # keep the task cancelled rather than reviving it to completed/failed.
            task.status = TaskStatus.Cancelled
            task.save(update_fields=["status", "completed_time", "log", "debug"])
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
                should_mark_parent_cancelled = True
            else:
                queue_solve_completion_email_for_task(task, scenario_id=solve_response.scenario_id)
        else:
            if solve_response.status == CompletionStatus.SUCCESS:
                task.status = TaskStatus.Completed
            else:
                task.status = TaskStatus.Failed
                # A failure payload without a usable error dict used to raise
                # here and roll back the whole completion; record a fallback
                # message instead so the task still settles as failed.
                error = solve_response.error if isinstance(solve_response.error, dict) else {}
                task.error = {
                    "message": error.get(
                        "message", "IDAES solve failed without an error message."
                    ),
                    "cause": "idaes_service_request",
                    "traceback": solve_response.traceback
                }

            task.save(update_fields=[
                "status", "completed_time", "log", "debug", "error"])

            # Save the solved flowsheet values
            if task.status == TaskStatus.Completed:
                store_properties_schema(solve_response.flowsheet.properties, task.flowsheet_id,
                                        solve_response.scenario_id, solve_response.solve_index)

                # For now, only save initial values for single and dynamic solves, not MSS
                # In future, we may need some more complex logic to handle MSS initial values
                if solve_response.solve_index is None:
                    save_all_initial_values(
                        solve_response.flowsheet.initial_values)
            update_diagnostics_results(task.flowsheet, solve_response.unit_diagnostics)

            _send_task_notifications(task, scenario_id=solve_response.scenario_id)

    if should_mark_parent_cancelled:
        _send_task_cancelled_notification(task)

        if solve_response.scenario_id is None:
            mark_parent_cancelled(task)
        else:
            mark_parent_cancelled(task, scenario_id=solve_response.scenario_id)

547 

548 

def mark_parent_cancelled(task: Task, scenario_id: int | None = None):
    """Resolve a task or parent task to ``cancelled`` once cancellation has won.

    Which row is examined:
    - if ``task`` has a parent, the parent, locked with ``select_for_update``;
    - otherwise ``task`` itself.

    The row becomes ``cancelled`` when none of its children are still running,
    pending or cancelling, and either:
    - at least one child is cancelled, or
    - it has children and is itself ``cancelling``.

    A parent may legitimately end with zero explicitly cancelled children if the
    last in-flight solves complete before their kill signals land. In that case
    an already-``cancelling`` parent is still resolved to ``cancelled`` once all
    children have reached any terminal state.

    The call is a no-op for a row that is already ``cancelled``. Concurrent
    sibling completions or redelivered messages therefore do not re-save it,
    re-send the cancellation notification, or queue a second completion email.

    Args:
        task: Task whose parent (or itself, when it has none) should be checked.
        scenario_id: Scenario forwarded to the completion-email helper.
    """
    with transaction.atomic():
        if task.parent_id:
            try:
                parent = Task.objects.select_for_update().get(id=task.parent_id)
            except Task.DoesNotExist:
                logger.info(
                    "Discarding parent cancellation update for missing parent task_id=%s.",
                    task.parent_id,
                )
                return
        else:
            parent = task

        # Two sibling children can both observe "no in-flight children" once
        # their transactions serialise on the parent lock. Without this guard
        # the second one would cancel the parent again and duplicate the
        # notification and email.
        if parent.status == TaskStatus.Cancelled:
            return

        has_children = parent.children.exists()
        has_in_flight_children = parent.children.filter(
            status__in=[
                TaskStatus.Running,
                TaskStatus.Pending,
                TaskStatus.Cancelling,
            ]
        ).exists()

        has_cancelled_children = parent.children.filter(
            status=TaskStatus.Cancelled
        ).exists()

        # When a cancelling parent has children, resolve it to cancelled once all
        # children have reached any terminal state. This covers races where child
        # solves finish before their kill signals land, so none end up explicitly
        # marked as cancelled even though the parent cancellation should still win.
        should_cancel_parent = (
            not has_in_flight_children
            and (
                has_cancelled_children
                or (has_children and parent.status == TaskStatus.Cancelling)
            )
        )

        if should_cancel_parent:
            parent.status = TaskStatus.Cancelled
            parent.completed_time = timezone.now()
            parent.save(update_fields=["status", "completed_time"])

            messaging.send_flowsheet_notification_message(
                parent.flowsheet_id,
                TaskSerializer(parent).data,
                NotificationServiceMessageType.TASK_CANCELLED
            )
            queue_solve_completion_email_for_task(parent, scenario_id=scenario_id)

610 

611 

def process_failed_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Settle a solve whose completion payload could not be processed.

    Called with payloads from the dead-letter queue. The task row is locked;
    missing tasks and tasks already failed or cancelled are ignored.

    Task already ``cancelling``: it becomes ``cancelled``.
    - A child bumps its parent's cancelled-children counter; after the
      transaction it sends a cancellation notification and has its parent
      re-evaluated.
    - A top-level task queues its completion email.

    Any other task: it becomes ``failed`` with an internal-error message that
    embeds the JSON-encoded payload error, and subscribers are notified.

    Args:
        solve_response: Completion payload received from the dead-letter queue.
    """
    scenario_id = getattr(solve_response, "scenario_id", None)
    settled_as_cancelled_child = False
    # Use a transaction to ensure that either everything succeeds or nothing does
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=solve_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding failed solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # The dead-letter queue delivers "at least once": skip tasks that an
        # earlier delivery (or a cancellation) already settled.
        if task.status in (TaskStatus.Failed, TaskStatus.Cancelled):
            return

        task.completed_time = timezone.now()
        task.log = solve_response.log

        if task.status != TaskStatus.Cancelling:
            task.status = TaskStatus.Failed
            task.error = {
                "message": f"Internal server error: several attempts to process finished solve failed. {json.dumps(solve_response.error)}"
            }
            task.save()

            _send_task_notifications(task, scenario_id=scenario_id)
        else:
            # Cancellation already won: settle as cancelled rather than failed.
            task.status = TaskStatus.Cancelled
            task.save(update_fields=["status", "completed_time", "log"])
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
                settled_as_cancelled_child = True
            else:
                queue_solve_completion_email_for_task(task, scenario_id=scenario_id)

    if settled_as_cancelled_child:
        _send_task_cancelled_notification(task)
        if scenario_id is None:
            mark_parent_cancelled(task)
        else:
            mark_parent_cancelled(task, scenario_id=scenario_id)

671 

672 

def process_ml_training_response(
    ml_training_response: MLTrainingCompletionPayload
):
    """Persist the result of a machine-learning training job and send updates.

    The task row is locked; missing tasks and tasks already completed or
    cancelled are ignored. ``model_id`` is read back from the task's debug
    payload.

    On ``"success"``:
    - the task is completed and its debug payload gains ``timing``;
    - the linked ``MLModel`` receives the trained artefacts with
      ``progress=3``;
    - expiry of the test-results object is scheduled when one was produced.

    On any other status: the task is failed and the ``MLModel`` gets
    ``progress=4``.

    Args:
        ml_training_response: Completion payload returned by the ML service.

    Raises:
        ValueError: If a successful payload has no ``json_response``.
    """
    saved_fields = ["status", "completed_time", "log", "debug", "error"]
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=ml_training_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding ML training completion for missing task_id=%s.",
                ml_training_response.task_id,
            )
            return

        # Exactly-once simulation: a finished task is only processed once.
        if task.status in (TaskStatus.Completed, TaskStatus.Cancelled):
            return

        task.completed_time = timezone.now()
        task.log = ml_training_response.log
        existing_debug = task.debug or {}
        model_id = existing_debug.get("model_id")

        if ml_training_response.status == "success":
            training_result = ml_training_response.json_response
            if training_result is None:
                raise ValueError("Successful ML training payloads must include json_response.")
            task.status = TaskStatus.Completed
            task.debug = {**existing_debug, "timing": training_result.timing}
            task.save(update_fields=saved_fields)

            if model_id is not None:
                MLModel.objects.filter(id=model_id).update(
                    surrogate_model=training_result.surrogate_model,
                    charts=[chart.model_dump() for chart in training_result.charts],
                    metrics=[metric.model_dump() for metric in training_result.metrics],
                    test_results_bucket=training_result.test_results_bucket,
                    test_results_key=training_result.test_results_key,
                    progress=3,
                )
                if training_result.test_results_bucket and training_result.test_results_key:
                    expiry = expires_at_from_ttl(
                        completed_csv_lifecycle_ttl(UploadSessionPurpose.ML_TRAINING_CSV),
                        reference_time=task.completed_time,
                    )
                    schedule_object_expiration(
                        bucket=training_result.test_results_bucket,
                        key=training_result.test_results_key,
                        expires_at=expiry,
                    )
        else:
            task.status = TaskStatus.Failed
            task.error = {
                "message": ml_training_response.error,
                "traceback": ml_training_response.traceback
            }
            if model_id is not None:
                MLModel.objects.filter(id=model_id).update(progress=4)
            task.save(update_fields=saved_fields)

        _send_task_notifications(task)

743 

744 

745 

def cancel_idaes_solve(task_id: int):
    """Mark an in-flight solve task as cancelling and notify subscribers.

    Tasks already completed, failed, cancelled or cancelling are left alone.

    For parent tasks with children (MSS scenarios), child status updates use
    queryset-level UPDATEs (O(1) queries regardless of child count):
    - pending children are cancelled immediately and counted on the parent;
    - other non-terminal children become ``cancelling``.

    The frontend is told to invalidate its child-task cache with a single
    ``TASK_CHILDREN_CANCELLED`` notification instead of one message per child.
    The task id is added to the shared cancellation cache and broadcast as a
    cancel message. Finally, ``mark_parent_cancelled`` checks whether the
    cancellation can already be resolved.

    Args:
        task_id: Identifier of the `Task` being cancelled.

    Raises:
        Task.DoesNotExist: If no task with ``task_id`` exists.
    """
    has_children = False
    cancelled_at = timezone.now()
    with transaction.atomic():
        task = Task.objects.select_for_update().get(id=task_id)

        # Ignore cancellation request if a final status (e.g. completed or failed) has already been set
        if task.status in (
            TaskStatus.Completed,
            TaskStatus.Failed,
            TaskStatus.Cancelled,
            TaskStatus.Cancelling
        ):
            return

        # Checking parent_id avoids a query to load the parent row just to
        # test whether this is a top-level task.
        if task.parent_id is None and task.children.exists():
            has_children = True

            # Use queryset-level updates to avoid loading potentially
            # hundreds of thousands of child Task objects into memory.
            # PostgreSQL guarantees atomicity of each UPDATE statement,
            # so the WHERE clause filters at execution time.
            non_terminal_children = task.children.exclude(
                status__in=[
                    TaskStatus.Completed,
                    TaskStatus.Failed,
                    TaskStatus.Cancelled,
                    TaskStatus.Cancelling,
                ]
            )
            # Immediately cancel pending children; they haven't been dispatched yet.
            cancelled_pending_child_count = (
                non_terminal_children.filter(status=TaskStatus.Pending)
                .update(status=TaskStatus.Cancelled, completed_time=cancelled_at)
            )
            # Mark remaining non-terminal children (e.g. Running) as Cancelling.
            non_terminal_children.exclude(status=TaskStatus.Pending).update(
                status=TaskStatus.Cancelling
            )

            Task.increment_cancelled_children_for_parent(task.id, cancelled_pending_child_count)

        task.status = TaskStatus.Cancelling
        task.save()

    cache_cancelled_tasks({task.id})
    # For MSS, broadcasting just the parent task ID is enough: every child
    # solve consults shared cancellation state for both its own ID and its
    # parent ID before starting work.
    messaging.send_task_cancel_message(task.id)

    # Send a single parent-level notification; for parent tasks with children,
    # include a children-invalidated event so the frontend drops its cached
    # child pages and refetches on demand rather than processing one message
    # per child task.
    notification_messages = [
        NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=NotificationServiceMessageType.TASK_CANCELLING,
        )
    ]
    if has_children:
        notification_messages.append(
            NotificationServiceMessage(
                data=TaskSerializer(task).data,
                message_type=NotificationServiceMessageType.TASK_CHILDREN_CANCELLED,
            )
        )
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, notification_messages,
    )
    mark_parent_cancelled(task)

838 

839 

840def process_cancel_solve_response(cancel_response: TaskPayload): 

841 """Persist a remote cancellation acknowledgement from the IDAES service. 

842 

843 The IDAES service emits this only after it has successfully interrupted the 

844 active solver, so Django can safely settle the task to a terminal 

845 ``cancelled`` state and then check whether the parent can also be finalised. 

846 """ 

847 notification_message: NotificationServiceMessage | None = None 

848 # Use a transaction to ensure that either everything succeeds or nothing does 

849 with transaction.atomic(): 

850 try: 

851 task = Task.objects.select_for_update().get(id=cancel_response.task_id) 

852 except Task.DoesNotExist: 

853 logger.info( 

854 "Discarding cancel acknowledgement for missing task_id=%s.", 

855 cancel_response.task_id, 

856 ) 

857 return 

858 

859 # Timeout expiry can arrive before Django transitions a task to 

860 # `Cancelling`, so accept pending/running states for timeout paths. 

861 allowed_statuses = {TaskStatus.Cancelling} 

862 if cancel_response.timed_out: 

863 allowed_statuses.update((TaskStatus.Pending, TaskStatus.Running)) 

864 

865 if task.status not in allowed_statuses: 

866 return 

867 

868 task.status = TaskStatus.Cancelled 

869 task.completed_time = timezone.now() 

870 # Preserve timeout metadata on the task debug payload so downstream 

871 # notifications have the value that actually governed the solve. 

872 solve_timeout_seconds = (task.debug or {}).get( 

873 "solve_timeout_seconds", 

874 SOLVE_TIMEOUT_DEFAULT_SECONDS, 

875 ) 

876 task.debug = { 

877 **(task.debug or {}), 

878 "timed_out": bool(cancel_response.timed_out), 

879 "solve_timeout_seconds": solve_timeout_seconds, 

880 } 

881 if cancel_response.timed_out: 

882 timeout_message = ( 

883 f"Solve timed out after {solve_timeout_seconds} seconds." 

884 ) 

885 task.log = ( 

886 f"{task.log}\n{timeout_message}" if task.log else timeout_message 

887 ) 

888 task.save(update_fields=["status", "completed_time", "debug", "log"]) 

889 if task.parent_id: 

890 Task.increment_cancelled_children_for_parent(task.parent_id) 

891 notification_message = NotificationServiceMessage( 

892 data=TaskSerializer(task).data, 

893 message_type=NotificationServiceMessageType.TASK_CANCELLED 

894 ) 

895 

896 if notification_message is not None: 896 ↛ 902line 896 didn't jump to line 902 because the condition on line 896 was always true

897 messaging.send_flowsheet_notification_messages( 

898 task.flowsheet_id, [notification_message] 

899 ) 

900 queue_solve_completion_email_for_task(task) 

901 

902 mark_parent_cancelled(task) 

903 

904 

905def process_build_state_response(build_state_response: BuildStateCompletionPayload): 

906 """ 

907 Apply an asynchronous build-state completion payload from IDAES. 

908 

909 Successful responses update property values for the originating flowsheet. 

910 App-level failures are intentionally acknowledged without applying partial 

911 data and then surfaced to frontend subscribers. 

912 """ 

913 context = build_state_response.context 

914 if context is None: 

915 logger.warning( 

916 "Discarding build-state response without request context." 

917 ) 

918 return 

919 

920 if not _settle_build_state_task_from_completion(build_state_response): 

921 return 

922 

923 if not _is_current_build_state_response(context): 

924 return 

925 

926 if build_state_response.status == CompletionStatus.SUCCESS: 

927 store_properties_schema( 

928 build_state_response.properties, 

929 context.flowsheet_id, 

930 ) 

931 _send_build_state_completion_notification(build_state_response) 

932 return 

933 

934 logger.warning( 

935 "Build-state request failed for flowsheet_id=%s stream_id=%s " 

936 "property_set_id=%s: %s", 

937 context.flowsheet_id, 

938 context.stream_id, 

939 context.property_set_id, 

940 build_state_response.error, 

941 ) 

942 _send_build_state_completion_notification(build_state_response) 

943 

944 

945def _settle_build_state_task_from_completion( 

946 build_state_response: BuildStateCompletionPayload, 

947) -> bool: 

948 """Record the final status for a tracked build-state request. 

949 

950 Returns ``False`` when a terminal task already exists for this payload. That 

951 lets duplicate completion deliveries remain idempotent, matching solve and 

952 ML task handling. 

953 """ 

954 context = build_state_response.context 

955 if context is None: 955 ↛ 956line 955 didn't jump to line 956 because the condition on line 955 was never true

956 return True 

957 

958 with transaction.atomic(): 

959 try: 

960 task = Task.objects.select_for_update().get(id=context.task_id) 

961 except Task.DoesNotExist: 

962 logger.info( 

963 "Discarding build-state completion for missing task_id=%s.", 

964 context.task_id, 

965 ) 

966 return False 

967 

968 if task.status in ( 968 ↛ 973line 968 didn't jump to line 973 because the condition on line 968 was never true

969 TaskStatus.Completed, 

970 TaskStatus.Failed, 

971 TaskStatus.Cancelled, 

972 ): 

973 return False 

974 

975 if task.status == TaskStatus.Cancelling: 975 ↛ 976line 975 didn't jump to line 976 because the condition on line 975 was never true

976 task.status = TaskStatus.Cancelled 

977 elif build_state_response.status == CompletionStatus.SUCCESS: 

978 task.status = TaskStatus.Completed 

979 else: 

980 _revert_build_state_task_values(task) 

981 task.status = TaskStatus.Failed 

982 task.error = { 

983 "message": build_state_response.error, 

984 "cause": "build_state_request", 

985 "traceback": build_state_response.traceback, 

986 } 

987 

988 task.completed_time = timezone.now() 

989 task.log = build_state_response.log 

990 task.debug = { 

991 **(task.debug or {}), 

992 "context": context.model_dump(mode="json"), 

993 } 

994 task.save(update_fields=["status", "completed_time", "log", "debug", "error"]) 

995 

996 _send_task_notifications(task) 

997 return task.status != TaskStatus.Cancelled 

998 

999 

1000def _is_current_build_state_response(context: BuildStateRequestContext) -> bool: 

1001 """Return whether a build-state response still matches the latest request.""" 

1002 return _is_current_build_state_context(context, "build-state response") 

1003 

1004 

1005def _is_current_build_state_context( 

1006 context: BuildStateRequestContext, 

1007 event_label: str, 

1008) -> bool: 

1009 """Return whether a build-state event context still matches the latest request.""" 

1010 try: 

1011 latest_request_version = BuildStateRequestVersion.objects.only("version").get( 

1012 property_set_id=context.property_set_id, 

1013 ).version 

1014 except BuildStateRequestVersion.DoesNotExist: 

1015 logger.warning( 

1016 "Discarding %s because no request version tracker exists " 

1017 "for flowsheet_id=%s stream_id=%s property_set_id=%s request_version=%s.", 

1018 event_label, 

1019 context.flowsheet_id, 

1020 context.stream_id, 

1021 context.property_set_id, 

1022 context.request_version, 

1023 ) 

1024 return False 

1025 

1026 if latest_request_version != context.request_version: 

1027 logger.info( 

1028 "Discarding stale %s for flowsheet_id=%s stream_id=%s " 

1029 "property_set_id=%s request_version=%s latest_request_version=%s.", 

1030 event_label, 

1031 context.flowsheet_id, 

1032 context.stream_id, 

1033 context.property_set_id, 

1034 context.request_version, 

1035 latest_request_version, 

1036 ) 

1037 return False 

1038 

1039 return True 

1040 

1041 

1042def _send_build_state_completion_notification( 

1043 build_state_response: BuildStateCompletionPayload, 

1044): 

1045 """Broadcast a build-state completion update to flowsheet subscribers.""" 

1046 context = build_state_response.context 

1047 if context is None: 1047 ↛ 1048line 1047 didn't jump to line 1048 because the condition on line 1047 was never true

1048 return 

1049 

1050 payload = BuildStateCompletedPayload( 

1051 context=context, 

1052 status=build_state_response.status, 

1053 properties=build_state_response.properties, 

1054 error=build_state_response.error, 

1055 failure_kind=( 

1056 "idaes-error" 

1057 if build_state_response.status == CompletionStatus.ERROR 

1058 else None 

1059 ), 

1060 ) 

1061 messaging.send_flowsheet_notification_message( 

1062 context.flowsheet_id, 

1063 payload.model_dump(mode="json"), 

1064 NotificationServiceMessageType.BUILD_STATE_COMPLETED, 

1065 ) 

1066 

1067 

1068def process_build_state_request_dead_letter(build_state_request: BuildStateRequestSchema): 

1069 """Record that a build-state request failed broker delivery. 

1070 

1071 Dead letters are transport-level failures, not IDAES state-block solve 

1072 failures. This handler avoids applying property data and only emits a 

1073 diagnostic notification when the dead-lettered request is still the latest 

1074 request. 

1075 """ 

1076 context = build_state_request.context 

1077 if context is None: 

1078 logger.warning( 

1079 "Build-state request reached the dead-letter topic without request context." 

1080 ) 

1081 return 

1082 

1083 if not _settle_build_state_task_from_delivery_failure(context): 

1084 return 

1085 

1086 if not _is_current_build_state_context(context, "build-state request dead letter"): 1086 ↛ 1087line 1086 didn't jump to line 1087 because the condition on line 1086 was never true

1087 return 

1088 

1089 logger.warning( 

1090 "Build-state request delivery failed for flowsheet_id=%s stream_id=%s " 

1091 "property_set_id=%s request_version=%s.", 

1092 context.flowsheet_id, 

1093 context.stream_id, 

1094 context.property_set_id, 

1095 context.request_version, 

1096 ) 

1097 _send_build_state_delivery_failure_notification(context) 

1098 

1099 

1100def _settle_build_state_task_from_delivery_failure( 

1101 context: BuildStateRequestContext, 

1102) -> bool: 

1103 """Mark a tracked build-state request as failed when broker delivery fails.""" 

1104 with transaction.atomic(): 

1105 try: 

1106 task = Task.objects.select_for_update().get(id=context.task_id) 

1107 except Task.DoesNotExist: 

1108 logger.info( 

1109 "Discarding build-state delivery failure for missing task_id=%s.", 

1110 context.task_id, 

1111 ) 

1112 return False 

1113 

1114 if task.status in ( 1114 ↛ 1119line 1114 didn't jump to line 1119 because the condition on line 1114 was never true

1115 TaskStatus.Completed, 

1116 TaskStatus.Failed, 

1117 TaskStatus.Cancelled, 

1118 ): 

1119 return False 

1120 

1121 task.status = ( 

1122 TaskStatus.Cancelled 

1123 if task.status == TaskStatus.Cancelling 

1124 else TaskStatus.Failed 

1125 ) 

1126 if task.status == TaskStatus.Failed: 1126 ↛ 1128line 1126 didn't jump to line 1128 because the condition on line 1126 was always true

1127 _revert_build_state_task_values(task) 

1128 task.completed_time = timezone.now() 

1129 task.error = { 

1130 "message": "Build-state request delivery failed.", 

1131 "cause": "message_delivery", 

1132 "traceback": None, 

1133 } 

1134 task.debug = { 

1135 **(task.debug or {}), 

1136 "context": context.model_dump(mode="json"), 

1137 } 

1138 task.save(update_fields=["status", "completed_time", "error", "debug"]) 

1139 

1140 _send_task_notifications(task) 

1141 return task.status != TaskStatus.Cancelled 

1142 

1143 

1144def _send_build_state_delivery_failure_notification( 

1145 context: BuildStateRequestContext, 

1146): 

1147 """Broadcast a silent transport-level build-state request failure event.""" 

1148 payload = BuildStateCompletedPayload( 

1149 context=context, 

1150 status="error", 

1151 properties=None, 

1152 error=None, 

1153 failure_kind="delivery-failure", 

1154 ) 

1155 messaging.send_flowsheet_notification_message( 

1156 context.flowsheet_id, 

1157 payload.model_dump(mode="json"), 

1158 NotificationServiceMessageType.BUILD_STATE_COMPLETED, 

1159 ) 

1160 

1161 

1162def generate_IDAES_python_request(flowsheet_id: int) -> Response: 

1163 """Return the flowsheet JSON for the given flowsheet. 

1164 

1165 The previous behaviour attempted to generate Python source by forwarding 

1166 the flowsheet to the IDAES service. Python generation is no longer 

1167 supported; this function now always returns the local flowsheet JSON. 

1168 """ 

1169 scenario = None 

1170 flowsheet = Flowsheet.objects.get(id=flowsheet_id) 

1171 factory = IdaesFactory(group_id=flowsheet.rootGrouping.id, scenario=scenario, require_variables_fixed=False) 

1172 response_data = ResponseType( 

1173 status="success", 

1174 error=None, 

1175 log=None, 

1176 debug=None 

1177 ) 

1178 try: 

1179 factory.build() 

1180 data = factory.flowsheet 

1181 # use model_dump_json so that pydantic can handle any non-serializable fields instead 

1182 # of django's logic which would error out. 

1183 return Response(data.model_dump_json(), status=200) 

1184 except Exception as e: 

1185 response_data["status"] = "error" 

1186 response_data["error"] = { 

1187 "message": str(e), 

1188 "traceback": traceback.format_exc() 

1189 } 

1190 return Response(response_data, status=400) 

1191 # NOTE: Python generation used to be implemented by forwarding the 

1192 # flowsheet to the IDAES service; that pathway has been removed. We 

1193 # have already returned above with the JSON representation. 

1194 

1195 

1196class BuildStateSolveError(Exception): 

1197 pass 

1198 

1199 

1200def _next_build_state_request_version(property_set: PropertySet) -> int: 

1201 """Increment and return the latest build-state request version for a property set.""" 

1202 try: 

1203 return _next_build_state_request_version_locked(property_set) 

1204 except IntegrityError: 

1205 # A concurrent first request may have created the tracker between the 

1206 # get and create inside get_or_create. Retry once and lock the new row. 

1207 return _next_build_state_request_version_locked(property_set) 

1208 

1209 

1210def _next_build_state_request_version_locked(property_set: PropertySet) -> int: 

1211 """Increment the version while holding a row lock for this property set.""" 

1212 with transaction.atomic(): 

1213 tracker, _ = BuildStateRequestVersion.objects.select_for_update().get_or_create( 

1214 property_set_id=property_set.id, 

1215 defaults={"flowsheet_id": property_set.flowsheet_id}, 

1216 ) 

1217 tracker.version += 1 

1218 tracker.save(update_fields=["version", "updated_at"]) 

1219 return tracker.version 

1220 

1221 

1222def state_request_build( 

1223 stream: SimulationObject, 

1224 *, 

1225 task_id: int, 

1226) -> BuildStateRequestSchema: 

1227 """Build the IDAES service payload for a stream state request.""" 

1228 ctx = IdaesFactoryContext(stream.flowsheet.rootGrouping.id) 

1229 property_set = stream.properties 

1230 

1231 port = stream.connectedPorts.get(direction="inlet") 

1232 unitop: SimulationObject = port.unitOp 

1233 # find the property package key for this port (we have the value, the key of this port) 

1234 property_package_ports = unitop.schema.propertyPackagePorts 

1235 for key, port_list in property_package_ports.items(): 

1236 if port.key in port_list: 

1237 property_package_key = key 

1238 PropertyPackageAdapter( 

1239 property_package_key).serialise(ctx, unitop) 

1240 

1241 return BuildStateRequestSchema( 

1242 property_package=ctx.property_packages[0], 

1243 properties=serialise_stream(ctx, stream, is_inlet=True), 

1244 context=BuildStateRequestContext( 

1245 flowsheet_id=stream.flowsheet_id, 

1246 stream_id=stream.id, 

1247 task_id=task_id, 

1248 property_set_id=property_set.id, 

1249 request_version=_next_build_state_request_version(property_set), 

1250 ), 

1251 ) 

1252 

1253 

1254def _create_build_state_task( 

1255 stream: SimulationObject, 

1256 user: User | int, 

1257 rollback_values: dict[int, float] | None = None, 

1258) -> Task: 

1259 """Create the tracking task used for a build-state request.""" 

1260 task = Task.create( 

1261 user, 

1262 stream.flowsheet_id, 

1263 task_type=TaskType.BUILD_STATE, 

1264 status=TaskStatus.Pending, 

1265 save=True, 

1266 ) 

1267 task.debug = { 

1268 "stream_id": stream.id, 

1269 "property_set_id": stream.properties.id, 

1270 "rollback_values": ( 

1271 {str(prop_id): value for prop_id, value in rollback_values.items()} 

1272 if rollback_values 

1273 else {} 

1274 ), 

1275 } 

1276 task.save(update_fields=["debug"]) 

1277 return task 

1278 

1279 

1280def _revert_build_state_task_values(task: Task) -> None: 

1281 """Restore raw property values captured before the build-state request.""" 

1282 rollback_values = (task.debug or {}).get("rollback_values") 

1283 if not rollback_values: 

1284 return 

1285 

1286 property_values = list( 

1287 PropertyValue.objects.filter(id__in=[int(prop_id) for prop_id in rollback_values]) 

1288 ) 

1289 for property_value in property_values: 

1290 property_value.value = rollback_values[str(property_value.id)] 

1291 

1292 PropertyValue.objects.bulk_update(property_values, ["value"]) 

1293 

1294 

1295def _fail_build_state_task_before_dispatch( 

1296 task: Task, 

1297 exception: Exception, 

1298 *, 

1299 cause: str, 

1300) -> None: 

1301 """Persist a build-state setup/dispatch failure before re-raising it.""" 

1302 _revert_build_state_task_values(task) 

1303 task.set_failure_with_exception( 

1304 DetailedException(exception, source=cause), 

1305 save=True, 

1306 ) 

1307 _send_task_notifications(task) 

1308 

1309 

1310def _mark_build_state_task_dispatched( 

1311 task: Task, 

1312 context: BuildStateRequestContext | None, 

1313) -> None: 

1314 """Persist dispatch metadata for a queued build-state task.""" 

1315 task.debug = { 

1316 **(task.debug or {}), 

1317 "idaes_dispatched": True, 

1318 "context": ( 

1319 context.model_dump(mode="json") if context is not None else None 

1320 ), 

1321 } 

1322 task.save(update_fields=["debug"]) 

1323 

1324 

1325def build_state_request( 

1326 stream: SimulationObject, 

1327 user: User | int, 

1328 rollback_values: dict[int, float] | None = None, 

1329): 

1330 """Queue a state build request for the provided stream. 

1331 

1332 Args: 

1333 stream: Stream object whose inlet properties should be used for the build. 

1334 user: User requesting the property update that triggered the build. 

1335 

1336 Returns: 

1337 REST response containing the tracking task. Built properties are applied 

1338 later by ``process_build_state_response`` when IDAES publishes the 

1339 completion event. 

1340 

1341 Raises: 

1342 BuildStateSolveError: If the request cannot be queued. 

1343 Exception: If preparing the payload fails for any reason. 

1344 """ 

1345 task = _create_build_state_task( 

1346 stream, 

1347 user, 

1348 rollback_values=rollback_values, 

1349 ) 

1350 try: 

1351 data = state_request_build(stream, task_id=task.id) 

1352 messaging.send_idaes_build_state_request_message(data) 

1353 _mark_build_state_task_dispatched(task, data.context) 

1354 return Response(TaskSerializer(task).data, status=202) 

1355 except Exception as e: 

1356 _fail_build_state_task_before_dispatch( 

1357 task, 

1358 e, 

1359 cause="build_state_dispatch", 

1360 ) 

1361 raise BuildStateSolveError(str(e))