Coverage for backend/django/idaes_factory/endpoints.py: 85%
506 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1import json
2import logging
3import os
4from typing import TypedDict
5from common.models.general import TaskPayload
6import requests
7import traceback
9from django.db import IntegrityError, transaction
10from django.utils import timezone
11from opentelemetry import trace
12from rest_framework.exceptions import ValidationError
13from rest_framework.response import Response
14from pydantic import JsonValue
16from CoreRoot import settings
17from authentication.user.models import User
18from ahuora_builder_types.payloads.build_state_request_schema import (
19 BuildStateCompletionPayload,
20 BuildStateRequestContext,
21 BuildStateRequestSchema,
22)
23from ahuora_builder_types.payloads.solve_request_schema import IdaesSolveRequestPayload, IdaesSolveCompletionPayload, \
24 MultiSolvePayload
25from common.models.idaes.payloads.solve_request_schema import CompletionStatus
26from ahuora_builder_types.payloads.ml_request_schema import MLTrainRequestPayload, MLTrainingCompletionPayload
27from core.auxiliary.models.MLModel import MLModel
28from core.auxiliary.models.UploadSession import UploadSessionPurpose
29from common.models.notifications.payloads import (
30 BuildStateCompletedPayload,
31 TaskCompletedPayload,
32 NotificationServiceMessageType,
33 NotificationServiceMessage,
34)
35from core.auxiliary.enums.generalEnums import TaskStatus
36from core.auxiliary.models.Task import Task, TaskType
37from core.auxiliary.models.BuildStateRequestVersion import BuildStateRequestVersion
38from core.auxiliary.models.PropertySet import PropertySet
39from core.auxiliary.models.PropertyValue import PropertyValue
40from core.auxiliary.models.Flowsheet import Flowsheet
41from core.auxiliary.serializers import TaskSerializer
42from common.services.task_cancellation_state import cache_cancelled_tasks
43from core.auxiliary.services.csv_lifecycle import completed_csv_lifecycle_ttl, expires_at_from_ttl
44from core.auxiliary.services.object_storage.s3 import schedule_object_expiration
45from core.auxiliary.services.solve_completion_email import queue_solve_completion_email_for_task
46from core.auxiliary.services.parameter_sweep import validate_parameter_sweep_solve_ready
47from core.exceptions import DetailedException
48from .idaes_factory import IdaesFactory, save_all_initial_values, store_properties_schema
49from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
50from .adapters.stream_properties import serialise_stream
51from .adapters.property_package_adapter import PropertyPackageAdapter
52from .idaes_factory_context import IdaesFactoryContext
53from core.auxiliary.models.Scenario import (
54 Scenario,
55 ScenarioTabTypeEnum,
56 SOLVE_TIMEOUT_DEFAULT_SECONDS,
57)
58from common.services import messaging
59from diagnostics.methods.update_diagnostic_result import update_diagnostics_results
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# OpenTelemetry tracer used to wrap outbound solve requests in spans.
tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)
# While dispatching multi-solve children, the parent task's status is re-read
# from the database once per this many children (and again on the last child)
# so a cancellation request can stop the dispatch loop.
PARENT_TASK_CANCEL_CHECK_BATCH_SIZE = 25
def _resolve_solve_timeout_seconds(
    scenario: Scenario | None,
    *,
    parent_task_id: int | None,
) -> int | None:
    """Pick the solve timeout that is forwarded with one IDAES solve request.

    MSS parent tasks only orchestrate and are never sent to the IDAES service
    as solves themselves; their child solves inherit the scenario timeout.
    Direct solves without a selected scenario use the platform default.

    Args:
        scenario: Scenario the solve belongs to, or ``None`` for a direct solve.
        parent_task_id: Identifier of the parent task, if any. Accepted as a
            keyword-only argument but not consulted when choosing the timeout.

    Returns:
        ``SOLVE_TIMEOUT_DEFAULT_SECONDS`` when no scenario is given, otherwise
        the scenario's own ``solve_timeout_seconds``.
    """
    return (
        SOLVE_TIMEOUT_DEFAULT_SECONDS
        if scenario is None
        else scenario.solve_timeout_seconds
    )
class IdaesServiceRequestException(Exception):
    """Error raised when a request to the IDAES service does not succeed.

    The message is kept on ``self.message`` in addition to being passed to
    ``Exception`` so callers can read it without inspecting ``args``.
    """

    def __init__(self, message: str) -> None:
        self.message = message
        super().__init__(message)
class SolveFlowsheetError(DetailedException):
    """Raised when a flowsheet solve request cannot be built or queued."""
class ResponseType(TypedDict, total=False):
    """Typed dictionary describing a response envelope.

    Declared with ``total=False``, so every key below is optional.
    """

    # Status string of the response.
    status: str
    # Error details, or None.
    error: dict | None
    # Log text, or None.
    log: str | None
    # Debug information, or None.
    debug: dict | None
def idaes_service_request(endpoint: str, data: JsonValue) -> JsonValue:
    """Send a JSON payload to the configured IDAES service endpoint.

    The base URL is read from the ``IDAES_SERVICE_URL`` environment variable
    and falls back to ``http://localhost:8080`` when it is unset or empty.

    Args:
        endpoint: Relative path of the IDAES service endpoint to call.
        data: Serialised payload that conforms to the endpoint schema.

    Returns:
        Parsed JSON response returned by the IDAES service.

    Raises:
        IdaesServiceRequestException: If the service responds with a non-200
            status. The exception carries the parsed JSON error body when one
            is available, otherwise the raw response text (or a short
            description of the HTTP status when the body is empty).
    """
    base_url = os.getenv('IDAES_SERVICE_URL') or "http://localhost:8080"
    url = base_url + "/" + endpoint
    # NOTE(review): no timeout is passed, so this call blocks until the
    # service answers. Confirm that is intended for long-running requests.
    result = requests.post(url, json=data)
    if result.status_code != 200:
        try:
            error_body = result.json()
        except ValueError:
            # Proxies and crashed workers can answer with HTML or an empty
            # body. Still raise the documented exception type rather than
            # leaking a JSON decoding error to the caller.
            error_body = (
                result.text
                or f"IDAES service returned HTTP {result.status_code}"
            )
        raise IdaesServiceRequestException(error_body)

    return result.json()
@tracer.start_as_current_span("send_flowsheet_solve_request")
def _solve_flowsheet_request(
    task_id: int,
    built_factory: IdaesFactory,
    parent_task_id: int | None = None,
    perform_diagnostics: bool = False,
    high_priority: bool = False
):
    """Queue an IDAES solve request for the provided flowsheet build.

    Args:
        task_id: Identifier of the task tracking the solve request.
        built_factory: Fully built factory containing flowsheet data to solve.
        parent_task_id: Identifier of the parent task when this solve is a
            child of another task; forwarded in the payload as-is.
        perform_diagnostics: Whether to request diagnostic output from IDAES.
        high_priority: Whether the message should be prioritised over normal solves.

    Raises:
        SolveFlowsheetError: If the payload cannot be built or the message
            cannot be dispatched to the queue. The original exception is
            attached as the explicit cause.
    """

    try:
        solve_timeout_seconds = _resolve_solve_timeout_seconds(
            built_factory.scenario,
            parent_task_id=parent_task_id,
        )
        idaes_payload = IdaesSolveRequestPayload(
            flowsheet=built_factory.flowsheet,
            solve_index=built_factory.solve_index,
            scenario_id=(
                built_factory.scenario.id if built_factory.scenario else None),
            task_id=task_id,
            parent_task_id=parent_task_id,
            perform_diagnostics=perform_diagnostics,
            solve_timeout_seconds=solve_timeout_seconds,
        )

        messaging.send_idaes_solve_message(
            idaes_payload, high_priority=high_priority)
    except Exception as e:
        # Chain explicitly so the traceback shows the wrapped error as the
        # direct cause rather than as an incidental "during handling" context.
        raise SolveFlowsheetError(e, "idaes_factory_solve_message") from e
def _mark_task_dispatched_to_idaes(task: Task) -> None:
    """Record on the task that its solve request was queued to IDAES.

    Sets ``debug["idaes_dispatched"] = True`` while keeping any existing debug
    entries, then saves only the ``debug`` field.
    """
    updated_debug = dict(task.debug or {})
    updated_debug["idaes_dispatched"] = True
    task.debug = updated_debug
    task.save(update_fields=["debug"])
def start_flowsheet_solve_event(
    flowsheet_id: int,
    group_id: int,
    user: User,
    scenario: Scenario | None = None,
    perform_diagnostics: bool = False
) -> Response:
    """Start a single solve for the given flowsheet and return the tracking task.

    Args:
        flowsheet_id: Identifier of the flowsheet that should be solved.
        group_id: Identifier of the group passed to the `IdaesFactory` build.
        user: User initiating the solve request.
        scenario: Optional scenario providing context for the solve.
        perform_diagnostics: Whether the solve should run with diagnostic output enabled.

    Returns:
        REST response containing the serialised `Task` used to track the solve,
        or a 400 response when a non-steady-state scenario has no data rows.
        A `DetailedException` raised while building or dispatching is recorded
        on the task as a failure and the task is still returned with status 200.
    """
    if (
        scenario
        and scenario.state_name != ScenarioTabTypeEnum.SteadyState
        and not scenario.dataRows.exists()
    ):
        # if not steady state solves and no data rows exist, cannot proceed
        return Response(status=400, data=f"No data was provided for {scenario.state_name} scenario.")

    if scenario is not None:
        # Remove all previously created DynamicResults for this scenario.
        # Deleting an empty queryset is a no-op, so only the "no scenario"
        # case needs guarding; genuine database errors are no longer hidden.
        scenario.solutions.all().delete()

    solve_task = Task.create(
        user, flowsheet_id, status=TaskStatus.Pending, save=True)
    # Persist the user's intent alongside the task so downstream consumers
    # can tell whether this solve was launched with diagnostics enabled.
    debug: dict[str, JsonValue] = {
        **(solve_task.debug or {}),
        "perform_diagnostics": bool(perform_diagnostics),
        "solve_timeout_seconds": (
            scenario.solve_timeout_seconds
            if scenario is not None
            else SOLVE_TIMEOUT_DEFAULT_SECONDS
        ),
    }
    if scenario is not None:
        debug["scenario_id"] = scenario.id
    solve_task.debug = debug
    solve_task.save(update_fields=["debug"])

    try:
        factory = IdaesFactory(
            group_id=group_id,
            scenario=scenario
        )
        factory.build()

        # We send single solve requests as high priority to ensure
        # they are not blocked by large multi-solve requests.
        _solve_flowsheet_request(
            solve_task.id,
            factory,
            perform_diagnostics=perform_diagnostics,
            high_priority=True
        )
        _mark_task_dispatched_to_idaes(solve_task)
    except DetailedException as e:
        solve_task.set_failure_with_exception(e, save=True)

    task_serializer = TaskSerializer(solve_task)

    return Response(task_serializer.data, status=200)
def start_multi_steady_state_solve_event(flowsheet_id: int, user: User, scenario: Scenario) -> Response:
    """Begin a multi steady-state solve and hand back its parent task.

    One pending child task is created per scenario data row; the actual
    building and dispatching of each solve is delegated through a
    multi-solve dispatch message.

    Args:
        flowsheet_id: Identifier of the flowsheet being solved.
        user: User who requested the multi-solve.
        scenario: Scenario containing the steady-state configurations to solve.

    Returns:
        REST response containing the parent `Task` that aggregates child solves,
        or a 400 response when the scenario has no data rows or fails the
        parameter-sweep readiness validation.
    """
    has_rows = scenario.dataRows.exists()
    if not has_rows:  # empty rows
        return Response(status=400, data="No data was provided for multi steady-state scenario.")

    try:
        validate_parameter_sweep_solve_ready(scenario)
    except ValidationError as exc:
        return Response(status=400, data=exc.detail)

    # Drop every DynamicResult previously stored for this scenario.
    scenario.solutions.all().delete()
    solve_iterations = scenario.dataRows.count()

    parent_task = Task.create_parent_task(
        creator=user,
        flowsheet_id=flowsheet_id,
        scheduled_tasks=solve_iterations,
        status=TaskStatus.Running,
    )
    parent_task.debug = {"scenario_id": scenario.id}
    parent_task.save(update_fields=["debug"])

    # Build the children in memory first, then insert them in one statement.
    pending_children: list[Task] = []
    for _ in range(solve_iterations):
        pending_children.append(
            Task.create(
                user,
                flowsheet_id,
                parent=parent_task,
                status=TaskStatus.Pending,
            )
        )
    Task.objects.bulk_create(pending_children)

    dispatch_payload = MultiSolvePayload(
        task_id=parent_task.id,
        scenario_id=scenario.id,
    )
    messaging.send_dispatch_multi_solve_message(dispatch_payload)

    serialized_parent = TaskSerializer(parent_task).data
    return Response(serialized_parent, status=200)
def dispatch_multi_solves(parent_task_id: int, scenario_id: int):
    """Build and queue one steady-state solve per child of a multi-solve task.

    Children are processed in ``start_time`` order. The parent's status is
    re-read periodically and dispatching stops as soon as the parent is
    cancelling or cancelled. A child whose build or dispatch raises a
    `DetailedException` is marked as failed, the parent is updated from it,
    and both are announced to flowsheet subscribers.

    Args:
        parent_task_id: Identifier of the parent multi-solve task.
        scenario_id: Scenario containing the data rows to iterate through.
    """
    parent_task = Task.objects.get(id=parent_task_id)
    scenario = Scenario.objects.get(id=scenario_id)

    root_group = scenario.flowsheet.rootGrouping
    factory = IdaesFactory(group_id=root_group.id, scenario=scenario)

    children = list(parent_task.children.order_by('start_time'))
    final_index = len(children) - 1
    stop_statuses = (TaskStatus.Cancelling, TaskStatus.Cancelled)

    for solve_index, child in enumerate(children):
        # Only hit the database for the parent status at batch boundaries
        # and on the very last child.
        at_checkpoint = (
            solve_index == final_index
            or solve_index % PARENT_TASK_CANCEL_CHECK_BATCH_SIZE == 0
        )
        if at_checkpoint:
            parent_task.refresh_from_db(fields=["status"])
            if parent_task.status in stop_statuses:
                logger.info(
                    "Stopping multi-solve dispatch for parent task %s because it is now %s.",
                    parent_task.id,
                    parent_task.status,
                )
                break

        try:
            factory.clear_flowsheet()
            factory.use_with_solve_index(solve_index)
            factory.build()

            _solve_flowsheet_request(
                child.id,
                factory,
                parent_task_id=parent_task.id,
            )
            _mark_task_dispatched_to_idaes(child)
        except DetailedException as e:
            child.set_failure_with_exception(exception=e, save=True)
            if parent_task.update_status_from_child(child):
                queue_solve_completion_email_for_task(parent_task, scenario_id=scenario.id)

            # Announce the failed child first, then the refreshed parent.
            flowsheet_messages = []
            for changed_task in (child, parent_task):
                flowsheet_messages.append(
                    NotificationServiceMessage(
                        data=TaskSerializer(changed_task).data,
                        message_type=NotificationServiceMessageType.TASK_UPDATED,
                    )
                )

            messaging.send_flowsheet_notification_messages(
                parent_task.flowsheet_id, flowsheet_messages)
def start_ml_training_event(
    csv_bucket: str,
    csv_key: str,
    csv_delimiter: str | None,
    input_labels: list[str],
    output_labels: list[str],
    user: User,
    flowsheet_id: int,
    model_id: int | None = None,
):
    """Create a training task and queue an asynchronous ML training job.

    Args:
        csv_bucket: Bucket containing the training CSV.
        csv_key: Object key containing the training CSV.
        csv_delimiter: Optional delimiter for the CSV object.
        input_labels: Names of the input features.
        output_labels: Names of the predicted outputs.
        user: User requesting the training run.
        flowsheet_id: Flowsheet the training run is associated with.
        model_id: Optional identifier of the `MLModel` row; it is stored in
            the task's debug data, and the model's ``progress`` is set to 4
            if queuing fails.

    Returns:
        REST response containing the serialised `Task` for the training job.
    """
    training_task = Task.create(
        user,
        flowsheet_id,
        task_type=TaskType.ML_TRAINING,
        status=TaskStatus.Pending,
        save=True,
    )
    training_task.debug = {"model_id": model_id}
    training_task.save(update_fields=["debug"])

    try:
        messaging.send_ml_training_message(
            MLTrainRequestPayload(
                csv_bucket=csv_bucket,
                csv_key=csv_key,
                csv_delimiter=csv_delimiter,
                input_labels=input_labels,
                output_labels=output_labels,
                task_id=training_task.id,
            )
        )
    except DetailedException as e:
        # Queuing failed: record it on the task, flag the model, tell clients.
        training_task.set_failure_with_exception(e, save=True)
        if model_id is not None:
            MLModel.objects.filter(id=model_id).update(progress=4)
        _send_task_notifications(training_task)

    return Response(TaskSerializer(training_task).data, status=200)
def _send_task_notifications(task: Task, scenario_id: int | None = None):
    """Push a task's new state (and its parent's, if any) to flowsheet clients.

    For a child task the parent is first updated from the child; the parent is
    then announced as completed or merely updated depending on its status.
    Solve-completion emails are queued for IDAES solve tasks: for the parent
    once it completes, or for the task itself when it has no parent.

    Args:
        task: Task whose status change should be pushed to subscribers.
        scenario_id: Optional scenario identifier forwarded to the
            solve-completion email queue.
    """
    flowsheet_messages = []

    if not task.parent:
        if task.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(task, scenario_id=scenario_id)
    else:
        # Child task: roll its outcome up into the parent before reporting.
        parent_completed = task.parent.update_status_from_child(task)
        if parent_completed and task.parent.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(task.parent, scenario_id=scenario_id)

        if task.parent.status == TaskStatus.Completed:
            parent_message_type = NotificationServiceMessageType.TASK_COMPLETED
        else:
            parent_message_type = NotificationServiceMessageType.TASK_UPDATED

        flowsheet_messages.append(NotificationServiceMessage(
            data=TaskSerializer(task.parent).data,
            message_type=parent_message_type,
        ))

    flowsheet_messages.append(NotificationServiceMessage(
        data=TaskSerializer(task).data,
        message_type=NotificationServiceMessageType.TASK_COMPLETED,
    ))

    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, flowsheet_messages)
def _send_task_cancelled_notification(task: Task):
    """Tell flowsheet subscribers that the given task is now cancelled."""
    cancelled_message = NotificationServiceMessage(
        data=TaskSerializer(task).data,
        message_type=NotificationServiceMessageType.TASK_CANCELLED,
    )
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id,
        [cancelled_message],
    )
def process_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Persist the outcome of a completed IDAES solve and notify listeners.

    The task row is locked for the duration of the transaction. Payloads for
    missing tasks, or for tasks already completed or cancelled, are dropped.
    A task that is currently cancelling is settled as cancelled instead of
    being recorded as completed or failed.

    Args:
        solve_response: Payload describing the finished solve result.
    """
    # Use a transaction to ensure that either everything succeeds or nothing does
    should_mark_parent_cancelled = False
    with transaction.atomic():
        try:
            task = (
                Task.objects
                .select_related("flowsheet")  # eager load FK/OneToOne relations
                .select_for_update()  # lock rows in transaction
                .get(id=solve_response.task_id)
            )
        except Task.DoesNotExist:
            logger.info(
                "Discarding solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # Silently ignore if the task has already been marked as completed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a finished task once).
        if task.status == TaskStatus.Completed or task.status == TaskStatus.Cancelled:
            return

        # If cancellation won the race before the completion payload arrived,
        # keep the task cancelled rather than reviving it back to completed/failed.
        if task.status == TaskStatus.Cancelling:
            task.status = TaskStatus.Cancelled
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.debug = {
                **(task.debug or {}),
                "timing": solve_response.timing or {},
            }
            task.save(update_fields=["status", "completed_time", "log", "debug"])
            # Child tasks bump the parent's cancelled-children counter.
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
            # Defer the cancelled notification and parent resolution until
            # after the transaction block.
            should_mark_parent_cancelled = True
            # Stand-alone tasks queue their completion email here.
            if not task.parent_id:
                queue_solve_completion_email_for_task(task, scenario_id=solve_response.scenario_id)
        else:
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.debug = {
                **(task.debug or {}),
                "timing": solve_response.timing or {},
            }

            if solve_response.status == CompletionStatus.SUCCESS:
                task.status = TaskStatus.Completed
            else:
                task.status = TaskStatus.Failed
                task.error = {
                    "message": solve_response.error["message"],
                    "cause": "idaes_service_request",
                    "traceback": solve_response.traceback
                }

            task.save(update_fields=[
                "status", "completed_time", "log", "debug", "error"])

            # Save the solved flowsheet values
            if task.status == TaskStatus.Completed:
                store_properties_schema(solve_response.flowsheet.properties, task.flowsheet_id,
                                        solve_response.scenario_id, solve_response.solve_index)

                # For now, only save initial values for single and dynamic solves, not MSS
                # In future, we may need some more complex logic to handle MSS initial values
                if solve_response.solve_index is None:
                    save_all_initial_values(
                        solve_response.flowsheet.initial_values)
                # NOTE(review): nesting of this call was reconstructed from a
                # whitespace-mangled source; confirm against the repository.
                update_diagnostics_results(task.flowsheet,solve_response.unit_diagnostics)

            _send_task_notifications(task, scenario_id=solve_response.scenario_id)

    # Only reached with the flag set when the task was settled as cancelled above.
    if should_mark_parent_cancelled:
        _send_task_cancelled_notification(task)

        if solve_response.scenario_id is None:
            mark_parent_cancelled(task)
        else:
            mark_parent_cancelled(task, scenario_id=solve_response.scenario_id)
def mark_parent_cancelled(task: Task, scenario_id: int | None = None):
    """Settle a task (or its parent) as ``cancelled`` once cancellation has won.

    The target is the task's parent when it has one, otherwise the task
    itself. It is resolved to cancelled only when none of its children are
    still running, pending or cancelling, and either at least one child is
    cancelled or the target is already ``cancelling`` and has children. The
    second case covers children that finished before their kill signals
    landed, so none of them is explicitly cancelled.

    Args:
        task: Task whose parent (or which itself) should be evaluated.
        scenario_id: Optional scenario identifier forwarded to the
            solve-completion email queue.
    """
    with transaction.atomic():
        locked_parent = None
        if task.parent_id:
            try:
                locked_parent = Task.objects.select_for_update().get(id=task.parent_id)
            except Task.DoesNotExist:
                logger.info(
                    "Discarding parent cancellation update for missing parent task_id=%s.",
                    task.parent_id,
                )
                return

        parent = task if locked_parent is None else locked_parent

        in_flight_statuses = [
            TaskStatus.Running,
            TaskStatus.Pending,
            TaskStatus.Cancelling,
        ]
        has_children = parent.children.exists()
        has_in_flight_children = parent.children.filter(
            status__in=in_flight_statuses
        ).exists()
        has_cancelled_children = parent.children.filter(
            status=TaskStatus.Cancelled
        ).exists()

        # Cancellation applies if a child was explicitly cancelled, or if the
        # parent itself is cancelling and actually has children to wait for.
        cancellation_applies = has_cancelled_children or (
            has_children and parent.status == TaskStatus.Cancelling
        )
        if has_in_flight_children or not cancellation_applies:
            return

        parent.status = TaskStatus.Cancelled
        parent.completed_time = timezone.now()
        parent.save(update_fields=["status", "completed_time"])

        messaging.send_flowsheet_notification_message(
            parent.flowsheet_id,
            TaskSerializer(parent).data,
            NotificationServiceMessageType.TASK_CANCELLED,
        )
        queue_solve_completion_email_for_task(parent, scenario_id=scenario_id)
def process_failed_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Record the final failure of a solve whose completion could not be processed.

    Payloads for missing tasks, or for tasks already failed or cancelled, are
    dropped. A cancelling task is settled as cancelled; any other task is
    marked failed with an internal-error message and subscribers are notified.

    Args:
        solve_response: Completion payload received from the dead-letter queue.
    """
    # The payload may not carry a scenario id; read it defensively once.
    scenario_id = getattr(solve_response, "scenario_id", None)

    # Everything below either fully succeeds or is rolled back.
    should_mark_parent_cancelled = False
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=solve_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding failed solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # The dead letter queue delivers "at least once"; skipping tasks that
        # are already settled makes processing effectively exactly-once.
        if task.status in (TaskStatus.Failed, TaskStatus.Cancelled):
            return

        if task.status != TaskStatus.Cancelling:
            task.completed_time = timezone.now()
            task.log = solve_response.log

            task.status = TaskStatus.Failed
            task.error = {
                "message": f"Internal server error: several attempts to process finished solve failed. {json.dumps(solve_response.error)}"
            }
            task.save()

            _send_task_notifications(task, scenario_id=scenario_id)
        else:
            task.status = TaskStatus.Cancelled
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.save(update_fields=["status", "completed_time", "log"])
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
            should_mark_parent_cancelled = True
            if not task.parent_id:
                queue_solve_completion_email_for_task(
                    task,
                    scenario_id=scenario_id,
                )

    if not should_mark_parent_cancelled:
        return

    _send_task_cancelled_notification(task)
    if scenario_id is None:
        mark_parent_cancelled(task)
    else:
        mark_parent_cancelled(task, scenario_id=scenario_id)
def process_ml_training_response(
    ml_training_response: MLTrainingCompletionPayload
):
    """Store the outcome of an ML training job and notify subscribers.

    On success the task is completed and, when the task's debug data names a
    model, that `MLModel` row receives the training artefacts and
    ``progress=3``. On failure the task is marked failed and the model's
    ``progress`` is set to 4.

    Args:
        ml_training_response: Completion payload returned by the ML service.

    Raises:
        ValueError: If a successful payload carries no ``json_response``.
    """
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=ml_training_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding ML training completion for missing task_id=%s.",
                ml_training_response.task_id,
            )
            return

        # Already-settled tasks are skipped so a redelivered payload is only
        # ever applied once.
        if task.status in (TaskStatus.Completed, TaskStatus.Cancelled):
            return

        task.completed_time = timezone.now()
        task.log = ml_training_response.log
        existing_debug = task.debug or {}
        model_id = existing_debug.get("model_id")
        has_model = model_id is not None

        if ml_training_response.status == "success":
            result = ml_training_response.json_response
            if result is None:
                raise ValueError("Successful ML training payloads must include json_response.")
            task.status = TaskStatus.Completed
            task.debug = {
                **existing_debug,
                "timing": result.timing,
            }
        else:
            task.status = TaskStatus.Failed
            task.error = {
                "message": ml_training_response.error,
                "traceback": ml_training_response.traceback,
            }
            if has_model:
                MLModel.objects.filter(id=model_id).update(progress=4)

        task.save(
            update_fields=["status", "completed_time", "log", "debug", "error"]
        )

        if task.status == TaskStatus.Completed and has_model:
            chart_dumps = [chart.model_dump() for chart in result.charts]
            metric_dumps = [metric.model_dump() for metric in result.metrics]
            MLModel.objects.filter(id=model_id).update(
                surrogate_model=result.surrogate_model,
                charts=chart_dumps,
                metrics=metric_dumps,
                test_results_bucket=result.test_results_bucket,
                test_results_key=result.test_results_key,
                progress=3,
            )
            if result.test_results_bucket and result.test_results_key:
                # Give the stored test results an expiry derived from the
                # ML-training CSV lifecycle TTL, counted from completion time.
                lifecycle_ttl = completed_csv_lifecycle_ttl(
                    UploadSessionPurpose.ML_TRAINING_CSV
                )
                schedule_object_expiration(
                    bucket=result.test_results_bucket,
                    key=result.test_results_key,
                    expires_at=expires_at_from_ttl(
                        lifecycle_ttl,
                        reference_time=task.completed_time,
                    ),
                )
        _send_task_notifications(task)
def cancel_idaes_solve(task_id: int):
    """Move an in-flight task to ``cancelling`` and announce the request.

    Requests for tasks that are already completed, failed, cancelled or
    cancelling are ignored. For a parent task with children (MSS scenarios)
    the children are updated with queryset-level UPDATEs, so the number of
    queries does not grow with the child count, and the frontend receives a
    single ``TASK_CHILDREN_CANCELLED`` notification instead of one message
    per child.

    Args:
        task_id: Identifier of the `Task` being cancelled.
    """
    settled_or_cancelling = (
        TaskStatus.Completed,
        TaskStatus.Failed,
        TaskStatus.Cancelled,
        TaskStatus.Cancelling,
    )
    to_cache_cancel: set[int] = set()
    to_send_cancel: list[int] = []
    has_children = False
    cancelled_at = timezone.now()

    with transaction.atomic():
        task = Task.objects.select_for_update().get(id=task_id)

        # Nothing to do once a final status (or a pending cancellation) is set.
        if task.status in settled_or_cancelling:
            return

        has_children = task.parent is None and task.children.exists()
        if has_children:
            # Work on querysets rather than instances: there may be hundreds
            # of thousands of children, and each UPDATE applies its WHERE
            # clause atomically at execution time.
            open_children = task.children.exclude(
                status__in=settled_or_cancelling
            )
            # Pending children were never dispatched, so they are cancelled
            # outright.
            cancelled_pending_child_count = open_children.filter(
                status=TaskStatus.Pending
            ).update(status=TaskStatus.Cancelled, completed_time=cancelled_at)
            # Whatever is left (e.g. Running) has to be interrupted first.
            open_children.exclude(status=TaskStatus.Pending).update(
                status=TaskStatus.Cancelling
            )

            Task.increment_cancelled_children_for_parent(task.id, cancelled_pending_child_count)

        to_cache_cancel.add(task.id)
        to_send_cancel.append(task.id)
        task.status = TaskStatus.Cancelling
        task.save()

    cache_cancelled_tasks(to_cache_cancel)
    # For MSS the parent ID alone suffices: each child solve checks the shared
    # cancellation state for its own ID and its parent's before starting work.
    for cancel_task_id in to_send_cancel:
        messaging.send_task_cancel_message(cancel_task_id)

    # One parent-level message; parents with children also get an event that
    # makes the frontend drop its cached child pages and refetch on demand.
    message_types = [NotificationServiceMessageType.TASK_CANCELLING]
    if has_children:
        message_types.append(NotificationServiceMessageType.TASK_CHILDREN_CANCELLED)
    notification_messages = [
        NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=message_type,
        )
        for message_type in message_types
    ]
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, notification_messages,
    )
    mark_parent_cancelled(task)
def process_cancel_solve_response(cancel_response: TaskPayload):
    """Apply a cancellation acknowledgement received from the IDAES service.

    The service only sends this after it has interrupted the active solver,
    so the task can be settled as ``cancelled`` and the parent can then be
    checked for finalisation. Acknowledgements for missing tasks, or for
    tasks in a status that does not permit the transition, are dropped.
    """
    notification_message: NotificationServiceMessage | None = None
    # All database changes below commit together or not at all.
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=cancel_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding cancel acknowledgement for missing task_id=%s.",
                cancel_response.task_id,
            )
            return

        # A timeout can expire before Django has moved the task to
        # `Cancelling`, so timeouts are also accepted for pending/running tasks.
        if cancel_response.timed_out:
            allowed_statuses = {
                TaskStatus.Cancelling,
                TaskStatus.Pending,
                TaskStatus.Running,
            }
        else:
            allowed_statuses = {TaskStatus.Cancelling}

        if task.status not in allowed_statuses:
            return

        task.status = TaskStatus.Cancelled
        task.completed_time = timezone.now()

        # Keep the timeout that governed the solve on the debug payload so
        # later notifications report the right value.
        previous_debug = task.debug or {}
        solve_timeout_seconds = previous_debug.get(
            "solve_timeout_seconds",
            SOLVE_TIMEOUT_DEFAULT_SECONDS,
        )
        task.debug = {
            **previous_debug,
            "timed_out": bool(cancel_response.timed_out),
            "solve_timeout_seconds": solve_timeout_seconds,
        }

        if cancel_response.timed_out:
            timeout_message = (
                f"Solve timed out after {solve_timeout_seconds} seconds."
            )
            if task.log:
                task.log = f"{task.log}\n{timeout_message}"
            else:
                task.log = timeout_message

        task.save(update_fields=["status", "completed_time", "debug", "log"])
        if task.parent_id:
            Task.increment_cancelled_children_for_parent(task.parent_id)
        notification_message = NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=NotificationServiceMessageType.TASK_CANCELLED,
        )

    if notification_message is not None:
        messaging.send_flowsheet_notification_messages(
            task.flowsheet_id, [notification_message]
        )
        queue_solve_completion_email_for_task(task)

    mark_parent_cancelled(task)
def process_build_state_response(build_state_response: BuildStateCompletionPayload):
    """Handle an asynchronous build-state completion event from IDAES.

    The tracking task is settled first. If it settles and the response still
    belongs to the latest request for its property set, a successful payload
    has its properties stored for the originating flowsheet, while a failed
    payload is only logged (no partial data is applied). In both cases a
    completion notification is then broadcast to flowsheet subscribers.

    Args:
        build_state_response: Completion payload; responses without a
            ``context`` are logged and dropped.
    """
    context = build_state_response.context
    if context is None:
        logger.warning(
            "Discarding build-state response without request context."
        )
        return

    # Order matters: the task is always settled before the staleness check,
    # and the staleness check is skipped when settling says to stop.
    should_apply = (
        _settle_build_state_task_from_completion(build_state_response)
        and _is_current_build_state_response(context)
    )
    if not should_apply:
        return

    if build_state_response.status == CompletionStatus.SUCCESS:
        store_properties_schema(
            build_state_response.properties,
            context.flowsheet_id,
        )
    else:
        logger.warning(
            "Build-state request failed for flowsheet_id=%s stream_id=%s "
            "property_set_id=%s: %s",
            context.flowsheet_id,
            context.stream_id,
            context.property_set_id,
            build_state_response.error,
        )

    # Success and failure are both surfaced to subscribers.
    _send_build_state_completion_notification(build_state_response)
def _settle_build_state_task_from_completion(
    build_state_response: BuildStateCompletionPayload,
) -> bool:
    """Persist the final status of the task tracking a build-state request.

    Returns:
        ``True`` when the caller should continue processing the payload: the
        payload has no context, or the task was settled as completed/failed.
        ``False`` when the task does not exist, was already in a terminal
        state (so duplicate deliveries stay idempotent), or was settled as
        cancelled because it was in ``Cancelling`` state.
    """
    context = build_state_response.context
    if context is None:
        return True

    already_terminal = (
        TaskStatus.Completed,
        TaskStatus.Failed,
        TaskStatus.Cancelled,
    )

    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=context.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding build-state completion for missing task_id=%s.",
                context.task_id,
            )
            return False

        if task.status in already_terminal:
            return False

        if task.status == TaskStatus.Cancelling:
            # A pending cancellation wins over whatever IDAES reported.
            task.status = TaskStatus.Cancelled
        elif build_state_response.status == CompletionStatus.SUCCESS:
            task.status = TaskStatus.Completed
        else:
            # Failure: restore the pre-request property values and record
            # the error reported by IDAES.
            _revert_build_state_task_values(task)
            task.status = TaskStatus.Failed
            task.error = {
                "message": build_state_response.error,
                "cause": "build_state_request",
                "traceback": build_state_response.traceback,
            }

        task.completed_time = timezone.now()
        task.log = build_state_response.log
        task.debug = {
            **(task.debug or {}),
            "context": context.model_dump(mode="json"),
        }
        task.save(update_fields=["status", "completed_time", "log", "debug", "error"])

    _send_task_notifications(task)
    return task.status != TaskStatus.Cancelled
def _is_current_build_state_response(context: BuildStateRequestContext) -> bool:
    """Tell whether a build-state response belongs to the latest request.

    Thin wrapper around ``_is_current_build_state_context`` that labels the
    event as a response in any log output.
    """
    event_label = "build-state response"
    return _is_current_build_state_context(context, event_label)
def _is_current_build_state_context(
    context: BuildStateRequestContext,
    event_label: str,
) -> bool:
    """Tell whether a build-state event still matches the latest request.

    Args:
        context: Request context carried by the event; its
            ``request_version`` is compared with the version stored for
            ``property_set_id``.
        event_label: Human-readable event name used in log messages.

    Returns:
        ``True`` only when a version tracker exists for the property set and
        its version equals the context's ``request_version``.
    """
    try:
        tracker = BuildStateRequestVersion.objects.only("version").get(
            property_set_id=context.property_set_id,
        )
    except BuildStateRequestVersion.DoesNotExist:
        logger.warning(
            "Discarding %s because no request version tracker exists "
            "for flowsheet_id=%s stream_id=%s property_set_id=%s request_version=%s.",
            event_label,
            context.flowsheet_id,
            context.stream_id,
            context.property_set_id,
            context.request_version,
        )
        return False

    latest_request_version = tracker.version
    if latest_request_version == context.request_version:
        return True

    # A different stored version means this event has been superseded.
    logger.info(
        "Discarding stale %s for flowsheet_id=%s stream_id=%s "
        "property_set_id=%s request_version=%s latest_request_version=%s.",
        event_label,
        context.flowsheet_id,
        context.stream_id,
        context.property_set_id,
        context.request_version,
        latest_request_version,
    )
    return False
def _send_build_state_completion_notification(
    build_state_response: BuildStateCompletionPayload,
):
    """Publish a ``BUILD_STATE_COMPLETED`` message for a completion payload.

    Does nothing when the payload has no context. Payloads with status
    ``ERROR`` are tagged with ``failure_kind="idaes-error"``; all others carry
    no failure kind.
    """
    context = build_state_response.context
    if context is None:
        return

    failure_kind = None
    if build_state_response.status == CompletionStatus.ERROR:
        failure_kind = "idaes-error"

    completed = BuildStateCompletedPayload(
        context=context,
        status=build_state_response.status,
        properties=build_state_response.properties,
        error=build_state_response.error,
        failure_kind=failure_kind,
    )
    messaging.send_flowsheet_notification_message(
        context.flowsheet_id,
        completed.model_dump(mode="json"),
        NotificationServiceMessageType.BUILD_STATE_COMPLETED,
    )
def process_build_state_request_dead_letter(build_state_request: BuildStateRequestSchema):
    """Handle a build-state request that ended up on the dead-letter topic.

    A dead letter is a transport-level failure rather than an IDAES solve
    failure, so no property data is applied. The tracking task is settled as
    failed (or cancelled), and a diagnostic notification is emitted only when
    the dead-lettered request is still the latest one for its property set.

    Args:
        build_state_request: The original request; requests without a
            ``context`` are logged and dropped.
    """
    context = build_state_request.context
    if context is None:
        logger.warning(
            "Build-state request reached the dead-letter topic without request context."
        )
        return

    # The task is always settled first; the staleness check only runs when
    # settling reports there is more to do.
    still_relevant = (
        _settle_build_state_task_from_delivery_failure(context)
        and _is_current_build_state_context(context, "build-state request dead letter")
    )
    if not still_relevant:
        return

    logger.warning(
        "Build-state request delivery failed for flowsheet_id=%s stream_id=%s "
        "property_set_id=%s request_version=%s.",
        context.flowsheet_id,
        context.stream_id,
        context.property_set_id,
        context.request_version,
    )
    _send_build_state_delivery_failure_notification(context)
def _settle_build_state_task_from_delivery_failure(
    context: BuildStateRequestContext,
) -> bool:
    """Settle the tracking task after broker delivery of its request failed.

    A task in ``Cancelling`` state becomes ``Cancelled``; any other
    non-terminal task becomes ``Failed`` and has its pre-request property
    values restored. The delivery-failure error is recorded in both cases.

    Returns:
        ``False`` when the task is missing, was already terminal, or ended up
        cancelled; ``True`` when it was settled as failed.
    """
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=context.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding build-state delivery failure for missing task_id=%s.",
                context.task_id,
            )
            return False

        terminal_statuses = (
            TaskStatus.Completed,
            TaskStatus.Failed,
            TaskStatus.Cancelled,
        )
        if task.status in terminal_statuses:
            return False

        if task.status == TaskStatus.Cancelling:
            task.status = TaskStatus.Cancelled
        else:
            task.status = TaskStatus.Failed
            # Only a failed request rolls the property values back.
            _revert_build_state_task_values(task)

        task.completed_time = timezone.now()
        task.error = {
            "message": "Build-state request delivery failed.",
            "cause": "message_delivery",
            "traceback": None,
        }
        task.debug = {
            **(task.debug or {}),
            "context": context.model_dump(mode="json"),
        }
        task.save(update_fields=["status", "completed_time", "error", "debug"])

    _send_task_notifications(task)
    return task.status != TaskStatus.Cancelled
def _send_build_state_delivery_failure_notification(
    context: BuildStateRequestContext,
):
    """Publish a ``BUILD_STATE_COMPLETED`` message for a delivery failure.

    The payload has status ``"error"``, no properties, no error text and
    ``failure_kind="delivery-failure"``.
    NOTE(review): the missing error text presumably keeps this event silent
    for end users — confirm against the frontend subscriber.
    """
    delivery_failure = BuildStateCompletedPayload(
        context=context,
        status="error",
        properties=None,
        error=None,
        failure_kind="delivery-failure",
    )
    serialised_payload = delivery_failure.model_dump(mode="json")
    messaging.send_flowsheet_notification_message(
        context.flowsheet_id,
        serialised_payload,
        NotificationServiceMessageType.BUILD_STATE_COMPLETED,
    )
def generate_IDAES_python_request(flowsheet_id: int) -> Response:
    """Return the locally built flowsheet JSON for ``flowsheet_id``.

    Python source generation (previously done by forwarding the flowsheet to
    the IDAES service) is no longer supported; the name is historical and the
    function now only builds the flowsheet and returns its JSON form.

    Args:
        flowsheet_id: Primary key of the flowsheet to build.

    Returns:
        A 200 response whose body is the flowsheet serialised with
        ``model_dump_json``, or a 400 response carrying an error payload with
        the exception message and traceback when building fails.
    """
    flowsheet = Flowsheet.objects.get(id=flowsheet_id)
    factory = IdaesFactory(
        group_id=flowsheet.rootGrouping.id,
        scenario=None,
        require_variables_fixed=False,
    )
    try:
        factory.build()
        # Serialise with pydantic's model_dump_json so it handles fields that
        # Django's own serialisation logic would error out on.
        return Response(factory.flowsheet.model_dump_json(), status=200)
    except Exception as e:
        failure = ResponseType(
            status="error",
            error={
                "message": str(e),
                "traceback": traceback.format_exc()
            },
            log=None,
            debug=None
        )
        return Response(failure, status=400)
class BuildStateSolveError(Exception):
    """Raised when a build-state request cannot be prepared or queued."""
def _next_build_state_request_version(property_set: PropertySet) -> int:
    """Bump and return the build-state request version of a property set.

    The locked increment is attempted once and retried exactly once on
    ``IntegrityError``; a second ``IntegrityError`` propagates to the caller.
    """
    try:
        return _next_build_state_request_version_locked(property_set)
    except IntegrityError:
        # Two concurrent first requests can race inside get_or_create: the
        # loser hits the uniqueness constraint. By now the winner's row
        # exists, so a single retry can lock and increment it.
        return _next_build_state_request_version_locked(property_set)
def _next_build_state_request_version_locked(property_set: PropertySet) -> int:
    """Increment the version tracker of a property set under a row lock.

    The tracker row is fetched with ``select_for_update`` (and created on
    first use) inside a transaction, incremented by one and saved.

    Returns:
        The new version number.
    """
    with transaction.atomic():
        locked_trackers = BuildStateRequestVersion.objects.select_for_update()
        tracker, _created = locked_trackers.get_or_create(
            property_set_id=property_set.id,
            defaults={"flowsheet_id": property_set.flowsheet_id},
        )
        tracker.version = tracker.version + 1
        tracker.save(update_fields=["version", "updated_at"])
        return tracker.version
def state_request_build(
    stream: SimulationObject,
    *,
    task_id: int,
) -> BuildStateRequestSchema:
    """Build the IDAES service payload for a stream state request.

    Args:
        stream: Stream whose inlet-connected unit operation determines the
            property package and whose properties are serialised.
        task_id: Id of the tracking task, embedded in the request context.

    Returns:
        The request schema, including a context that carries a freshly
        incremented request version for the stream's property set.

    Raises:
        ValueError: If none of the unit operation's property-package port
            groups contains the stream's inlet port.
    """
    ctx = IdaesFactoryContext(stream.flowsheet.rootGrouping.id)
    property_set = stream.properties

    port = stream.connectedPorts.get(direction="inlet")
    unitop: SimulationObject = port.unitOp

    # Find the property-package key whose port group contains this port.
    matching_keys = [
        key
        for key, port_list in unitop.schema.propertyPackagePorts.items()
        if port.key in port_list
    ]
    if not matching_keys:
        # Without this guard the lookup result would be unbound and surface
        # as an opaque UnboundLocalError/IndexError further down.
        raise ValueError(
            f"No property package port group of unit operation {unitop.id} "
            f"contains inlet port {port.key!r} of stream {stream.id}."
        )
    # If several groups list the port, the last one wins (as before).
    property_package_key = matching_keys[-1]
    PropertyPackageAdapter(property_package_key).serialise(ctx, unitop)

    # The request version is bumped last, so a failure while serialising the
    # property package or stream does not consume a version number.
    return BuildStateRequestSchema(
        property_package=ctx.property_packages[0],
        properties=serialise_stream(ctx, stream, is_inlet=True),
        context=BuildStateRequestContext(
            flowsheet_id=stream.flowsheet_id,
            stream_id=stream.id,
            task_id=task_id,
            property_set_id=property_set.id,
            request_version=_next_build_state_request_version(property_set),
        ),
    )
def _create_build_state_task(
    stream: SimulationObject,
    user: User | int,
    rollback_values: dict[int, float] | None = None,
) -> Task:
    """Create and persist the pending task that tracks a build-state request.

    Args:
        stream: Stream the request is for; its id and property-set id are
            recorded on the task's debug payload.
        user: Passed through to ``Task.create`` as the task's user.
        rollback_values: Optional values keyed by property id, stored (with
            string keys) so they can be restored later.

    Returns:
        The saved task with its debug payload populated.
    """
    task = Task.create(
        user,
        stream.flowsheet_id,
        task_type=TaskType.BUILD_STATE,
        status=TaskStatus.Pending,
        save=True,
    )

    # Keys are stringified so the mapping has the same shape before and after
    # a JSON round trip.
    stored_rollback = {
        str(prop_id): value
        for prop_id, value in (rollback_values or {}).items()
    }
    task.debug = {
        "stream_id": stream.id,
        "property_set_id": stream.properties.id,
        "rollback_values": stored_rollback,
    }
    task.save(update_fields=["debug"])
    return task
def _revert_build_state_task_values(task: Task) -> None:
    """Write back the property values saved on the task's debug payload.

    Reads ``task.debug["rollback_values"]`` (string property id -> value) and
    bulk-updates the matching ``PropertyValue`` rows. Does nothing when no
    rollback values were recorded.
    """
    saved_values = (task.debug or {}).get("rollback_values")
    if not saved_values:
        return

    property_ids = [int(prop_id) for prop_id in saved_values]
    restored = []
    for property_value in PropertyValue.objects.filter(id__in=property_ids):
        # Keys were stored as strings, so look the value up by str(id).
        property_value.value = saved_values[str(property_value.id)]
        restored.append(property_value)

    PropertyValue.objects.bulk_update(restored, ["value"])
def _fail_build_state_task_before_dispatch(
    task: Task,
    exception: Exception,
    *,
    cause: str,
) -> None:
    """Record a setup/dispatch failure on a build-state task.

    Restores the saved property values, stores the exception on the task
    (wrapped in ``DetailedException`` with ``cause`` as its source) and sends
    task notifications. Re-raising is left to the caller.
    """
    _revert_build_state_task_values(task)

    detailed = DetailedException(exception, source=cause)
    task.set_failure_with_exception(
        detailed,
        save=True,
    )

    _send_task_notifications(task)
def _mark_build_state_task_dispatched(
    task: Task,
    context: BuildStateRequestContext | None,
) -> None:
    """Record on the task's debug payload that the request was dispatched.

    Existing debug entries are kept; ``idaes_dispatched`` is set to ``True``
    and ``context`` to the JSON dump of the request context (or ``None``).
    Only the ``debug`` field is saved.
    """
    serialised_context = None
    if context is not None:
        serialised_context = context.model_dump(mode="json")

    task.debug = {
        **(task.debug or {}),
        "idaes_dispatched": True,
        "context": serialised_context,
    }
    task.save(update_fields=["debug"])
def build_state_request(
    stream: SimulationObject,
    user: User | int,
    rollback_values: dict[int, float] | None = None,
):
    """Queue a state build request for the provided stream.

    Args:
        stream: Stream object whose inlet properties should be used for the build.
        user: User requesting the property update that triggered the build.
        rollback_values: Optional values keyed by property id, stored on the
            tracking task so they can be restored if the request fails.

    Returns:
        REST response (HTTP 202) containing the tracking task. Built
        properties are applied later by ``process_build_state_response`` when
        IDAES publishes the completion event.

    Raises:
        BuildStateSolveError: If preparing, queueing or recording the request
            fails for any reason. The original exception is attached as
            ``__cause__``.
    """
    task = _create_build_state_task(
        stream,
        user,
        rollback_values=rollback_values,
    )
    try:
        data = state_request_build(stream, task_id=task.id)
        messaging.send_idaes_build_state_request_message(data)
        _mark_build_state_task_dispatched(task, data.context)
        return Response(TaskSerializer(task).data, status=202)
    except Exception as e:
        # Deliberately broad: whatever goes wrong, the tracking task must be
        # settled as failed (and values rolled back) before the error leaves.
        _fail_build_state_task_before_dispatch(
            task,
            e,
            cause="build_state_dispatch",
        )
        # Chain explicitly so the original error and traceback stay attached
        # to the domain exception.
        raise BuildStateSolveError(str(e)) from e