Coverage for backend/django/idaes_factory/endpoints.py: 86%
493 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import logging
2import os
3from typing import TypedDict
4from common.models.general import TaskPayload
5import requests
6import traceback
8from django.db import IntegrityError, transaction
9from django.utils import timezone
10from opentelemetry import trace
11from rest_framework.response import Response
12from pydantic import JsonValue
14from CoreRoot import settings
15from authentication.user.models import User
16from ahuora_builder_types.payloads.build_state_request_schema import (
17 BuildStateCompletionPayload,
18 BuildStateRequestContext,
19 BuildStateRequestSchema,
20)
21from ahuora_builder_types.payloads.solve_request_schema import IdaesSolveRequestPayload, IdaesSolveCompletionPayload, \
22 MultiSolvePayload
23from common.models.idaes.payloads.solve_request_schema import CompletionStatus
24from ahuora_builder_types.payloads.ml_request_schema import MLTrainRequestPayload, MLTrainingCompletionPayload
25from core.auxiliary.models.MLModel import MLModel
26from core.auxiliary.models.UploadSession import UploadSessionPurpose
27from common.models.notifications.payloads import (
28 BuildStateCompletedPayload,
29 TaskCompletedPayload,
30 NotificationServiceMessageType,
31 NotificationServiceMessage,
32)
33from core.auxiliary.enums.generalEnums import TaskStatus
34from core.auxiliary.models.Task import Task, TaskType
35from core.auxiliary.models.BuildStateRequestVersion import BuildStateRequestVersion
36from core.auxiliary.models.PropertySet import PropertySet
37from core.auxiliary.models.PropertyValue import PropertyValue
38from core.auxiliary.models.Flowsheet import Flowsheet
39from core.auxiliary.serializers import TaskSerializer
40from common.services.task_cancellation_state import cache_cancelled_tasks
41from core.auxiliary.services.csv_lifecycle import completed_csv_lifecycle_ttl, expires_at_from_ttl
42from core.auxiliary.services.object_storage.s3 import schedule_object_expiration
43from core.auxiliary.services.solve_completion_email import queue_solve_completion_email_for_task
44from core.exceptions import DetailedException
45from .idaes_factory import IdaesFactory, save_all_initial_values, store_properties_schema
46from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
47from .adapters.stream_properties import serialise_stream
48from .adapters.property_package_adapter import PropertyPackageAdapter
49from .idaes_factory_context import IdaesFactoryContext
50from core.auxiliary.models.Scenario import (
51 Scenario,
52 ScenarioTabTypeEnum,
53 SOLVE_TIMEOUT_DEFAULT_SECONDS,
54)
55from common.services import messaging
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)

# How often (measured in child tasks) the multi-solve dispatch loop re-reads
# the parent task's status to notice a cancellation. The check also always
# runs on the final child, so cancellation is detected without one DB read
# per child.
PARENT_TASK_CANCEL_CHECK_BATCH_SIZE = 25
def _resolve_solve_timeout_seconds(
    scenario: Scenario | None,
    *,
    parent_task_id: int | None,
) -> int | None:
    """Pick the solve timeout to forward with a single IDAES solve request.

    MSS parent tasks are orchestration-only and never reach the IDAES service
    as solve requests themselves; their child solves inherit the scenario's
    configured timeout. A direct solve with no scenario selected uses the
    platform default instead.
    """
    if scenario is not None:
        return scenario.solve_timeout_seconds
    return SOLVE_TIMEOUT_DEFAULT_SECONDS
class IdaesServiceRequestException(Exception):
    """Raised when the IDAES service answers an HTTP request with a non-200 status."""

    def __init__(self, message: str) -> None:
        # Expose the message as an attribute so handlers can read it without
        # string-parsing the exception.
        self.message = message
        super().__init__(message)
class SolveFlowsheetError(DetailedException):
    """Raised when a flowsheet solve request cannot be built or queued."""
    pass
class ResponseType(TypedDict, total=False):
    """Loose shape of a task/solve response payload; every key is optional."""

    # Overall outcome indicator for the request.
    status: str
    # Structured error details when the request failed, otherwise None.
    error: dict | None
    # Raw service/solver log output, if any was captured.
    log: str | None
    # Free-form diagnostic metadata attached to the task.
    debug: dict | None
def idaes_service_request(endpoint: str, data: JsonValue) -> JsonValue:
    """Send a JSON payload to the configured IDAES service endpoint.

    Args:
        endpoint: Relative path of the IDAES service endpoint to call.
        data: Serialised payload that conforms to the endpoint schema.

    Returns:
        Parsed JSON response returned by the IDAES service.

    Raises:
        IdaesServiceRequestException: If the service responds with a non-200 status.
    """
    base_url = os.getenv('IDAES_SERVICE_URL') or "http://localhost:8080"
    url = base_url + "/" + endpoint
    # A request with no timeout can block a worker forever if the IDAES
    # service hangs: cap connect/read waits instead of trusting the remote
    # side. Solves can be long-running, hence the generous read timeout.
    result = requests.post(url, json=data, timeout=(10, 600))
    # Parse the body once; it is needed on both the error and success paths.
    body = result.json()
    if result.status_code != 200:
        raise IdaesServiceRequestException(body)

    return body
@tracer.start_as_current_span("send_flowsheet_solve_request")
def _solve_flowsheet_request(
    task_id: int,
    built_factory: IdaesFactory,
    parent_task_id: int | None = None,
    perform_diagnostics: bool = False,
    high_priority: bool = False
):
    """Queue an IDAES solve request for the provided flowsheet build.

    Args:
        task_id: Identifier of the task tracking the solve request.
        built_factory: Fully built factory containing flowsheet data to solve.
        parent_task_id: Identifier of the parent (MSS) task when this solve is
            one child of a multi steady-state run; None for direct solves.
        perform_diagnostics: Whether to request diagnostic output from IDAES.
        high_priority: Whether the message should be prioritised over normal solves.

    Raises:
        SolveFlowsheetError: If the payload cannot be built or the message
            cannot be dispatched to the queue.
    """
    try:
        solve_timeout_seconds = _resolve_solve_timeout_seconds(
            built_factory.scenario,
            parent_task_id=parent_task_id,
        )
        idaes_payload = IdaesSolveRequestPayload(
            flowsheet=built_factory.flowsheet,
            solve_index=built_factory.solve_index,
            scenario_id=(
                built_factory.scenario.id if built_factory.scenario else None),
            task_id=task_id,
            parent_task_id=parent_task_id,
            perform_diagnostics=perform_diagnostics,
            solve_timeout_seconds=solve_timeout_seconds,
        )

        messaging.send_idaes_solve_message(
            idaes_payload, high_priority=high_priority)
    except Exception as e:
        # Wrap any failure so callers can record it on the task uniformly.
        raise SolveFlowsheetError(e, "idaes_factory_solve_message")
def _mark_task_dispatched_to_idaes(task: Task) -> None:
    """Persist that a solve task was successfully queued to the IDAES service."""
    # Copy the existing debug payload before flagging dispatch, so unrelated
    # keys already stored on the task are preserved.
    debug = dict(task.debug or {})
    debug["idaes_dispatched"] = True
    task.debug = debug
    task.save(update_fields=["debug"])
def start_flowsheet_solve_event(
    flowsheet_id: int,
    group_id: int,
    user: User,
    scenario: Scenario | None = None,
    perform_diagnostics: bool = False
) -> Response:
    """Start a single solve for the given flowsheet and return the tracking task.

    Args:
        flowsheet_id: Identifier of the flowsheet that should be solved.
        group_id: Identifier of the grouping used to build the IDAES factory.
        user: User initiating the solve request.
        scenario: Optional scenario providing context for the solve.
        perform_diagnostics: Whether the solve should run with diagnostic output enabled.

    Returns:
        REST response containing the serialised `Task` used to track the solve.
    """
    if scenario and scenario.state_name != ScenarioTabTypeEnum.SteadyState and not scenario.dataRows.exists():
        # if not steady state solves and no data rows exist, cannot proceed
        return Response(status=400, data=f"No data was provided for {scenario.state_name} scenario.")

    # The previous bare `except:` also hid the AttributeError raised when no
    # scenario was selected; make the None case explicit and only swallow
    # genuine "nothing to delete" failures.
    if scenario is not None:
        try:
            # Remove all previously created DynamicResults for this scenario.
            scenario.solutions.all().delete()
        except Exception:
            # There's no solution yet; that's fine, but leave a trace.
            logger.debug(
                "No prior solutions cleared for scenario %s.", scenario.id)

    solve_task = Task.create(
        user, flowsheet_id, status=TaskStatus.Pending, save=True)
    # Persist the user's intent alongside the task so downstream consumers
    # can tell whether this solve was launched with diagnostics enabled.
    debug: dict[str, JsonValue] = {
        **(solve_task.debug or {}),
        "perform_diagnostics": bool(perform_diagnostics),
        "solve_timeout_seconds": (
            scenario.solve_timeout_seconds
            if scenario is not None
            else SOLVE_TIMEOUT_DEFAULT_SECONDS
        ),
    }
    if scenario is not None:
        debug["scenario_id"] = scenario.id
    solve_task.debug = debug
    solve_task.save(update_fields=["debug"])

    try:
        factory = IdaesFactory(
            group_id=group_id,
            scenario=scenario
        )
        factory.build()

        # We send single solve requests as high priority to ensure
        # they are not blocked by large multi-solve requests.
        _solve_flowsheet_request(
            solve_task.id,
            factory,
            perform_diagnostics=perform_diagnostics,
            high_priority=True
        )
        _mark_task_dispatched_to_idaes(solve_task)
    except DetailedException as e:
        # Record the failure on the task; the caller still gets a 200 with the
        # failed task so the frontend can surface the error state.
        solve_task.set_failure_with_exception(e, save=True)

    task_serializer = TaskSerializer(solve_task)

    return Response(task_serializer.data, status=200)
def start_multi_steady_state_solve_event(flowsheet_id: int, user: User, scenario: Scenario) -> Response:
    """Kick off a multi steady-state solve and return the parent tracking task.

    Args:
        flowsheet_id: Identifier of the flowsheet being solved.
        user: User who requested the multi-solve.
        scenario: Scenario containing the steady-state configurations to solve.

    Returns:
        REST response containing the parent `Task` that aggregates child solves.
    """
    if not scenario.dataRows.exists():  # empty rows
        return Response(status=400, data="No data was provided for multi steady-state scenario.")

    # Drop DynamicResults left over from any previous run of this scenario.
    scenario.solutions.all().delete()

    row_count = scenario.dataRows.count()
    parent_task = Task.create_parent_task(
        creator=user,
        flowsheet_id=flowsheet_id,
        scheduled_tasks=row_count,
        status=TaskStatus.Running
    )
    parent_task.debug = {"scenario_id": scenario.id}
    parent_task.save(update_fields=["debug"])

    # One pending child per data row, inserted in a single bulk statement.
    pending_children = [
        Task.create(
            user,
            flowsheet_id,
            parent=parent_task,
            status=TaskStatus.Pending
        )
        for _ in range(row_count)
    ]
    Task.objects.bulk_create(pending_children)

    messaging.send_dispatch_multi_solve_message(MultiSolvePayload(
        task_id=parent_task.id,
        scenario_id=scenario.id)
    )

    return Response(TaskSerializer(parent_task).data, status=200)
def dispatch_multi_solves(parent_task_id: int, scenario_id: int):
    """Build and dispatch queued steady-state solves for each child task.

    Args:
        parent_task_id: Identifier of the parent multi-solve task.
        scenario_id: Scenario containing the data rows to iterate through.
    """
    parent_task = Task.objects.get(id=parent_task_id)
    scenario = Scenario.objects.get(id=scenario_id)

    rootGroup = scenario.flowsheet.rootGrouping
    factory = IdaesFactory(
        group_id=rootGroup.id,
        scenario=scenario,
    )

    # Dispatch order follows creation order so solve_index matches data rows.
    child_tasks = list(parent_task.children.order_by('start_time'))
    last_child_index = len(child_tasks) - 1

    for solve_index, task in enumerate(child_tasks):
        # Re-read the parent status periodically (and always on the final
        # child) so a cancellation issued mid-dispatch stops the loop without
        # paying a DB round trip per child.
        should_check_parent_status = (
            solve_index % PARENT_TASK_CANCEL_CHECK_BATCH_SIZE == 0
            or solve_index == last_child_index
        )
        if should_check_parent_status:
            parent_task.refresh_from_db(fields=["status"])
            if parent_task.status in (TaskStatus.Cancelling, TaskStatus.Cancelled):
                logger.info(
                    "Stopping multi-solve dispatch for parent task %s because it is now %s.",
                    parent_task.id,
                    parent_task.status,
                )
                break

        try:
            # Rebuild the factory for this data row before dispatching.
            factory.clear_flowsheet()
            factory.use_with_solve_index(solve_index)
            factory.build()

            _solve_flowsheet_request(
                task.id,
                factory,
                parent_task_id=parent_task.id,
            )
            _mark_task_dispatched_to_idaes(task)
        except DetailedException as e:
            # A failed build/dispatch fails only this child; the parent status
            # is rolled up and subscribers are told about both tasks.
            task.set_failure_with_exception(exception=e, save=True)
            parent_completed = parent_task.update_status_from_child(task)
            if parent_completed:
                queue_solve_completion_email_for_task(parent_task, scenario_id=scenario.id)

            flowsheet_messages = [
                NotificationServiceMessage(
                    data=TaskSerializer(task).data,
                    message_type=NotificationServiceMessageType.TASK_UPDATED
                ) for task in [task, parent_task]
            ]

            messaging.send_flowsheet_notification_messages(
                parent_task.flowsheet_id, flowsheet_messages)
def start_ml_training_event(
    csv_bucket: str,
    csv_key: str,
    csv_delimiter: str | None,
    input_labels: list[str],
    output_labels: list[str],
    user: User,
    flowsheet_id: int,
    model_id: int | None = None,
):
    """Queue an asynchronous machine-learning training job for the given dataset.

    Args:
        csv_bucket: Bucket containing the training CSV.
        csv_key: Object key containing the training CSV.
        csv_delimiter: Optional delimiter for the CSV object.
        input_labels: Names of the input features.
        output_labels: Names of the predicted outputs.
        user: User requesting the training run.
        flowsheet_id: Flowsheet the training run is associated with.
        model_id: Optional `MLModel` id stored in the task debug payload so the
            completion handler knows which model to update.

    Returns:
        REST response containing the serialised `Task` for the training job.
    """
    training_task = Task.create(
        user,
        flowsheet_id,
        task_type=TaskType.ML_TRAINING,
        status=TaskStatus.Pending,
        save=True
    )
    # Record which model this training run belongs to for the completion path.
    training_task.debug = {"model_id": model_id}
    training_task.save(update_fields=["debug"])

    try:
        payload = MLTrainRequestPayload(
            csv_bucket=csv_bucket,
            csv_key=csv_key,
            csv_delimiter=csv_delimiter,
            input_labels=input_labels,
            output_labels=output_labels,
            task_id=training_task.id,
        )
        messaging.send_ml_training_message(payload)

    except DetailedException as e:
        training_task.set_failure_with_exception(e, save=True)
        if model_id is not None:
            # NOTE(review): magic number — progress=4 presumably marks the
            # model's training as failed; confirm against MLModel's progress
            # semantics.
            MLModel.objects.filter(id=model_id).update(progress=4)
        _send_task_notifications(training_task)

    task_serializer = TaskSerializer(training_task)
    return Response(task_serializer.data, status=200)
def _send_task_notifications(task: Task, scenario_id: int | None = None):
    """Broadcast task completion or status updates to interested flowsheet clients.

    Args:
        task: Task whose status change should be pushed to subscribers.
        scenario_id: Optional scenario forwarded to the solve-completion email.
    """
    flowsheet_messages = []

    # If this is a child task, update the parent task status
    if task.parent:
        parent_completed = task.parent.update_status_from_child(task)
        if parent_completed and task.parent.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(task.parent, scenario_id=scenario_id)

        message_type = (NotificationServiceMessageType.TASK_COMPLETED
                        if task.parent.status == TaskStatus.Completed else
                        NotificationServiceMessageType.TASK_UPDATED)

        # Only the parent is broadcast for child updates; subscribers follow
        # aggregate progress through the parent task.
        flowsheet_messages.append(NotificationServiceMessage(
            data=TaskSerializer(task.parent).data,
            message_type=message_type
        ))
    else:
        if task.task_type == TaskType.IDAES_SOLVE:
            queue_solve_completion_email_for_task(task, scenario_id=scenario_id)

        flowsheet_messages.append(NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=NotificationServiceMessageType.TASK_COMPLETED
        ))

    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, flowsheet_messages)
def _send_task_cancelled_notification(task: Task):
    """Broadcast that a task settled in the cancelled state."""
    cancelled_message = NotificationServiceMessage(
        data=TaskSerializer(task).data,
        message_type=NotificationServiceMessageType.TASK_CANCELLED,
    )
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, [cancelled_message])
def process_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Persist the outcome of a completed IDAES solve and notify listeners.

    Args:
        solve_response: Payload describing the finished solve result.
    """
    # Use a transaction to ensure that either everything succeeds or nothing does
    should_mark_parent_cancelled = False
    with transaction.atomic():
        try:
            task = (
                Task.objects
                .select_related("flowsheet")  # eager load FK/OneToOne relations
                .select_for_update()  # lock rows in transaction
                .get(id=solve_response.task_id)
            )
        except Task.DoesNotExist:
            logger.info(
                "Discarding solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # Silently ignore if the task has already been marked as completed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a finished task once).
        if task.status == TaskStatus.Completed or task.status == TaskStatus.Cancelled:
            return

        # If cancellation won the race before the completion payload arrived,
        # keep the task cancelled rather than reviving it back to completed/failed.
        if task.status == TaskStatus.Cancelling:
            task.status = TaskStatus.Cancelled
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.debug = {
                **(task.debug or {}),
                "timing": solve_response.timing or {},
            }
            task.save(update_fields=["status", "completed_time", "log", "debug"])
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
                should_mark_parent_cancelled = True
            if not task.parent_id:
                queue_solve_completion_email_for_task(task, scenario_id=solve_response.scenario_id)
        else:
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.debug = {
                **(task.debug or {}),
                "timing": solve_response.timing or {},
            }

            if solve_response.status == CompletionStatus.SUCCESS:
                task.status = TaskStatus.Completed
            else:
                task.status = TaskStatus.Failed
                # NOTE(review): assumes `solve_response.error` is a mapping
                # with a "message" key on every non-success payload — confirm
                # against IdaesSolveCompletionPayload.
                task.error = {
                    "message": solve_response.error["message"],
                    "cause": "idaes_service_request",
                    "traceback": solve_response.traceback
                }

            task.save(update_fields=[
                "status", "completed_time", "log", "debug", "error"])

            # Save the solved flowsheet values
            if task.status == TaskStatus.Completed:
                store_properties_schema(solve_response.flowsheet.properties, task.flowsheet_id,
                                        solve_response.scenario_id, solve_response.solve_index)

                # For now, only save initial values for single and dynamic solves, not MSS
                # In future, we may need some more complex logic to handle MSS initial values
                if solve_response.solve_index is None:
                    save_all_initial_values(
                        solve_response.flowsheet.initial_values)

            _send_task_notifications(task, scenario_id=solve_response.scenario_id)

    # Outside the transaction: notify and roll up the parent only when this
    # completion actually settled a cancellation.
    if should_mark_parent_cancelled:
        _send_task_cancelled_notification(task)

        # NOTE(review): mark_parent_cancelled defaults scenario_id to None, so
        # both branches are behaviourally equivalent.
        if solve_response.scenario_id is None:
            mark_parent_cancelled(task)
        else:
            mark_parent_cancelled(task, scenario_id=solve_response.scenario_id)
def mark_parent_cancelled(task: Task, scenario_id: int | None = None):
    """Resolve a task or parent task to ``cancelled`` once cancellation has won.

    A parent may legitimately end with zero explicitly cancelled children if the
    last in-flight solves complete before their kill signals land. In that case
    an already-``cancelling`` parent is still resolved to ``cancelled`` once all
    children have reached any terminal state.

    Args:
        task: Task whose parent (or itself, when it has no parent) should be
            checked for final cancellation.
        scenario_id: Optional scenario forwarded to the solve-completion email.
    """
    with transaction.atomic():
        try:
            # Lock the parent row when this task is a child; otherwise operate
            # on the task itself.
            parent = (
                Task.objects.select_for_update().get(id=task.parent_id)
                if task.parent_id
                else None
            )
        except Task.DoesNotExist:
            logger.info(
                "Discarding parent cancellation update for missing parent task_id=%s.",
                task.parent_id,
            )
            return

        if parent is None:
            parent = task

        has_children = parent.children.exists()
        has_in_flight_children = parent.children.filter(
            status__in=[
                TaskStatus.Running,
                TaskStatus.Pending,
                TaskStatus.Cancelling,
            ]
        ).exists()

        has_cancelled_children = parent.children.filter(
            status=TaskStatus.Cancelled
        ).exists()

        # When a cancelling parent has children, resolve it to cancelled once all
        # children have reached any terminal state. This covers races where child
        # solves finish before their kill signals land, so none end up explicitly
        # marked as cancelled even though the parent cancellation should still win.
        should_cancel_parent = (
            not has_in_flight_children
            and (
                has_cancelled_children
                or (has_children and parent.status == TaskStatus.Cancelling)
            )
        )

        if should_cancel_parent:
            parent.status = TaskStatus.Cancelled
            parent.completed_time = timezone.now()
            parent.save(update_fields=["status", "completed_time"])

            messaging.send_flowsheet_notification_message(
                parent.flowsheet_id,
                TaskSerializer(parent).data,
                NotificationServiceMessageType.TASK_CANCELLED
            )
            queue_solve_completion_email_for_task(parent, scenario_id=scenario_id)
def process_failed_idaes_solve_response(solve_response: IdaesSolveCompletionPayload):
    """Handle final failure notifications for solves that could not be processed.

    Args:
        solve_response: Completion payload received from the dead-letter queue.
    """
    # Use a transaction to ensure that either everything succeeds or nothing does
    should_mark_parent_cancelled = False
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=solve_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding failed solve completion for missing task_id=%s.",
                solve_response.task_id,
            )
            return

        # Silently ignore if the task has already been marked as failed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a failed task once). Our dead letter queue is configured in "at least once" delivery mode.
        if task.status == TaskStatus.Failed or task.status == TaskStatus.Cancelled:
            return

        # Cancellation beat the dead-letter delivery: settle as cancelled
        # rather than flipping the task back to failed.
        if task.status == TaskStatus.Cancelling:
            task.status = TaskStatus.Cancelled
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.save(update_fields=["status", "completed_time", "log"])
            if task.parent_id:
                Task.increment_cancelled_children_for_parent(task.parent_id)
                should_mark_parent_cancelled = True
            if not task.parent_id:
                queue_solve_completion_email_for_task(
                    task,
                    scenario_id=getattr(solve_response, "scenario_id", None),
                )
        else:
            task.completed_time = timezone.now()
            task.log = solve_response.log
            task.status = TaskStatus.Failed
            task.error = {
                "message": "Internal server error: several attempts to process finished solve failed."
            }
            task.save()

            _send_task_notifications(
                task,
                scenario_id=getattr(solve_response, "scenario_id", None),
            )

    # Outside the transaction: notify and roll up the parent only when this
    # payload actually settled a cancellation.
    if should_mark_parent_cancelled:
        _send_task_cancelled_notification(task)
        scenario_id = getattr(solve_response, "scenario_id", None)
        # NOTE(review): mark_parent_cancelled defaults scenario_id to None, so
        # both branches are behaviourally equivalent.
        if scenario_id is None:
            mark_parent_cancelled(task)
        else:
            mark_parent_cancelled(task, scenario_id=scenario_id)
def process_ml_training_response(
    ml_training_response: MLTrainingCompletionPayload
):
    """Persist the result of a machine-learning training job and send updates.

    Args:
        ml_training_response: Completion payload returned by the ML service.
    """
    with transaction.atomic():
        # NOTE(review): unlike the solve handlers, Task.DoesNotExist is not
        # caught here — a completion for a deleted task will raise. Confirm
        # whether that is intended.
        task = Task.objects.select_for_update().get(id=ml_training_response.task_id)

        # Silently ignore if the task has already been marked as completed.
        # This allows us to simulate exactly-once delivery semantics (only process
        # a finished task once).
        if task.status == TaskStatus.Completed or task.status == TaskStatus.Cancelled:
            return

        task.completed_time = timezone.now()
        task.log = ml_training_response.log
        existing_debug = task.debug or {}
        # The model to update was recorded on the task at dispatch time.
        model_id = existing_debug.get("model_id")

        if ml_training_response.status == "success":
            if ml_training_response.json_response is None:
                raise ValueError("Successful ML training payloads must include json_response.")
            result = ml_training_response.json_response
            task.status = TaskStatus.Completed
            task.debug = {
                **existing_debug,
                "timing": result.timing,
            }
        else:
            task.status = TaskStatus.Failed
            task.error = {
                "message": ml_training_response.error,
                "traceback": ml_training_response.traceback
            }
            if model_id is not None:
                # NOTE(review): magic number — progress=4 presumably marks the
                # model's training as failed; confirm against MLModel.
                MLModel.objects.filter(id=model_id).update(progress=4)

        task.save(
            update_fields=["status", "completed_time", "log", "debug", "error"]
        )

        if task.status == TaskStatus.Completed and model_id is not None:
            # Attach the trained artefacts and advance the model's progress.
            MLModel.objects.filter(id=model_id).update(
                surrogate_model=result.surrogate_model,
                charts=[chart.model_dump() for chart in result.charts],
                metrics=[metric.model_dump() for metric in result.metrics],
                test_results_bucket=result.test_results_bucket,
                test_results_key=result.test_results_key,
                progress=3,
            )
            if result.test_results_bucket and result.test_results_key:
                # Test-result CSVs are transient: expire them from object
                # storage after the configured lifecycle TTL.
                schedule_object_expiration(
                    bucket=result.test_results_bucket,
                    key=result.test_results_key,
                    expires_at=expires_at_from_ttl(
                        completed_csv_lifecycle_ttl(UploadSessionPurpose.ML_TRAINING_CSV),
                        reference_time=task.completed_time,
                    ),
                )
        _send_task_notifications(task)
def cancel_idaes_solve(task_id: int):
    """Mark an in-flight solve task as cancelled and notify subscribers.

    For parent tasks with children (MSS scenarios), child status updates use
    queryset-level UPDATEs (O(1) queries regardless of child count) and the
    frontend is told to invalidate its child-task cache with a single
    ``TASK_CHILDREN_CANCELLED`` notification instead of one message per child.

    Args:
        task_id: Identifier of the `Task` being cancelled.
    """
    to_cache_cancel: set[int] = set()
    to_send_cancel: list[int] = []
    has_children = False
    cancelled_at = timezone.now()
    with transaction.atomic():
        task = Task.objects.select_for_update().get(id=task_id)

        # Ignore cancellation request if a final status (e.g. completed or failed) has already been set
        if task.status in (
            TaskStatus.Completed,
            TaskStatus.Failed,
            TaskStatus.Cancelled,
            TaskStatus.Cancelling
        ):
            return

        if task.parent is None and task.children.exists():
            has_children = True

            # Use queryset-level updates to avoid loading potentially
            # hundreds of thousands of child Task objects into memory.
            # PostgreSQL guarantees atomicity of each UPDATE statement,
            # so the WHERE clause filters at execution time.
            non_terminal_children = task.children.exclude(
                status__in=[
                    TaskStatus.Completed,
                    TaskStatus.Failed,
                    TaskStatus.Cancelled,
                    TaskStatus.Cancelling,
                ]
            )
            # Immediately cancel pending children; they haven't been dispatched yet.
            cancelled_pending_child_count = (
                non_terminal_children.filter(status=TaskStatus.Pending)
                .update(status=TaskStatus.Cancelled, completed_time=cancelled_at)
            )
            # Mark remaining non-terminal children (e.g. Running) as Cancelling.
            non_terminal_children.exclude(status=TaskStatus.Pending).update(
                status=TaskStatus.Cancelling
            )

            Task.increment_cancelled_children_for_parent(task.id, cancelled_pending_child_count)

            to_cache_cancel.add(task.id)
            to_send_cancel.append(task.id)
            task.status = TaskStatus.Cancelling
        else:
            task.status = TaskStatus.Cancelling
            to_cache_cancel.add(task.id)
            to_send_cancel.append(task.id)

        task.save()

    # Outside the transaction: share cancellation state and fan out messages.
    cache_cancelled_tasks(to_cache_cancel)
    for cancel_task_id in to_send_cancel:
        # For MSS, broadcasting just the parent task ID is enough: every child
        # solve consults shared cancellation state for both its own ID and its
        # parent ID before starting work.
        messaging.send_task_cancel_message(cancel_task_id)

    # Send a single parent-level notification; for parent tasks with children,
    # include a children-invalidated event so the frontend drops its cached
    # child pages and refetches on demand rather than processing one message
    # per child task.
    notification_messages = [
        NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=NotificationServiceMessageType.TASK_CANCELLING,
        )
    ]
    if has_children:
        notification_messages.append(
            NotificationServiceMessage(
                data=TaskSerializer(task).data,
                message_type=NotificationServiceMessageType.TASK_CHILDREN_CANCELLED,
            )
        )
    messaging.send_flowsheet_notification_messages(
        task.flowsheet_id, notification_messages,
    )
    # Resolve the parent immediately in case all children were already terminal.
    mark_parent_cancelled(task)
def process_cancel_solve_response(cancel_response: TaskPayload):
    """Persist a remote cancellation acknowledgement from the IDAES service.

    The IDAES service emits this only after it has successfully interrupted the
    active solver, so Django can safely settle the task to a terminal
    ``cancelled`` state and then check whether the parent can also be finalised.

    Args:
        cancel_response: Acknowledgement payload; ``timed_out`` marks solves
            killed by timeout expiry rather than by a user request.
    """
    notification_message: NotificationServiceMessage | None = None
    # Use a transaction to ensure that either everything succeeds or nothing does
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=cancel_response.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding cancel acknowledgement for missing task_id=%s.",
                cancel_response.task_id,
            )
            return

        # Timeout expiry can arrive before Django transitions a task to
        # `Cancelling`, so accept pending/running states for timeout paths.
        allowed_statuses = {TaskStatus.Cancelling}
        if cancel_response.timed_out:
            allowed_statuses.update((TaskStatus.Pending, TaskStatus.Running))

        if task.status not in allowed_statuses:
            return

        task.status = TaskStatus.Cancelled
        task.completed_time = timezone.now()
        # Preserve timeout metadata on the task debug payload so downstream
        # notifications have the value that actually governed the solve.
        solve_timeout_seconds = (task.debug or {}).get(
            "solve_timeout_seconds",
            SOLVE_TIMEOUT_DEFAULT_SECONDS,
        )
        task.debug = {
            **(task.debug or {}),
            "timed_out": bool(cancel_response.timed_out),
            "solve_timeout_seconds": solve_timeout_seconds,
        }
        if cancel_response.timed_out:
            timeout_message = (
                f"Solve timed out after {solve_timeout_seconds} seconds."
            )
            # Append to any existing solver log rather than replacing it.
            task.log = (
                f"{task.log}\n{timeout_message}" if task.log else timeout_message
            )
        task.save(update_fields=["status", "completed_time", "debug", "log"])
        if task.parent_id:
            Task.increment_cancelled_children_for_parent(task.parent_id)
        notification_message = NotificationServiceMessage(
            data=TaskSerializer(task).data,
            message_type=NotificationServiceMessageType.TASK_CANCELLED
        )

    # Outside the transaction: broadcast and email only when the task actually
    # transitioned above (notification_message stays None on early returns).
    if notification_message is not None:
        messaging.send_flowsheet_notification_messages(
            task.flowsheet_id, [notification_message]
        )
        queue_solve_completion_email_for_task(task)

    mark_parent_cancelled(task)
def process_build_state_response(build_state_response: BuildStateCompletionPayload):
    """Handle an asynchronous build-state completion event published by IDAES.

    The tracking task is settled first; duplicate or stale deliveries are then
    dropped. A successful payload has its properties stored against the source
    flowsheet, while an app-level failure is only logged — partial data is
    never applied. Both outcomes end with a completion notification to
    frontend subscribers.
    """
    context = build_state_response.context
    if context is None:
        logger.warning(
            "Discarding build-state response without request context."
        )
        return

    # A False result means the task was missing, already terminal, or
    # cancelled — this delivery must not be applied.
    if not _settle_build_state_task_from_completion(build_state_response):
        return

    # A newer request for the same property set supersedes this response.
    if not _is_current_build_state_response(context):
        return

    succeeded = build_state_response.status == CompletionStatus.SUCCESS
    if succeeded:
        store_properties_schema(
            build_state_response.properties,
            context.flowsheet_id,
        )
    else:
        logger.warning(
            "Build-state request failed for flowsheet_id=%s stream_id=%s "
            "property_set_id=%s: %s",
            context.flowsheet_id,
            context.stream_id,
            context.property_set_id,
            build_state_response.error,
        )
    _send_build_state_completion_notification(build_state_response)
def _settle_build_state_task_from_completion(
    build_state_response: BuildStateCompletionPayload,
) -> bool:
    """Record the final status for a tracked build-state request.

    Returns ``False`` when a terminal task already exists for this payload. That
    lets duplicate completion deliveries remain idempotent, matching solve and
    ML task handling. Also returns ``False`` when the task ends up Cancelled,
    so a cancelled result is never applied by the caller.
    """
    context = build_state_response.context
    if context is None:
        # Without a context there is no task to settle; let the caller proceed.
        return True

    with transaction.atomic():
        try:
            # The row lock serialises concurrent completion deliveries (and
            # cancellation handling) for the same task.
            task = Task.objects.select_for_update().get(id=context.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding build-state completion for missing task_id=%s.",
                context.task_id,
            )
            return False

        # A terminal status means this delivery is a duplicate; drop it.
        if task.status in (
            TaskStatus.Completed,
            TaskStatus.Failed,
            TaskStatus.Cancelled,
        ):
            return False

        if task.status == TaskStatus.Cancelling:
            # A cancel was requested before the completion arrived; honour it.
            task.status = TaskStatus.Cancelled
        elif build_state_response.status == CompletionStatus.SUCCESS:
            task.status = TaskStatus.Completed
        else:
            # App-level failure: restore the pre-request property values and
            # record the error details reported by IDAES.
            _revert_build_state_task_values(task)
            task.status = TaskStatus.Failed
            task.error = {
                "message": build_state_response.error,
                "cause": "build_state_request",
                "traceback": build_state_response.traceback,
            }

        task.completed_time = timezone.now()
        task.log = build_state_response.log
        task.debug = {
            **(task.debug or {}),
            "context": context.model_dump(mode="json"),
        }
        # "error" is persisted on every branch, not just the failure one.
        task.save(update_fields=["status", "completed_time", "log", "debug", "error"])

    # Notify subscribers after the transaction commits the final status.
    _send_task_notifications(task)
    return task.status != TaskStatus.Cancelled
def _is_current_build_state_response(context: BuildStateRequestContext) -> bool:
    """Return True when *context* still matches the latest build-state request."""
    event_label = "build-state response"
    return _is_current_build_state_context(context, event_label)
def _is_current_build_state_context(
    context: BuildStateRequestContext,
    event_label: str,
) -> bool:
    """Return whether a build-state event context still matches the latest request.

    The ``BuildStateRequestVersion`` row for the property set is the source of
    truth; a missing tracker or a version mismatch marks the event as stale
    and it is logged and discarded.
    """
    tracker_qs = BuildStateRequestVersion.objects.only("version")
    try:
        latest_request_version = tracker_qs.get(
            property_set_id=context.property_set_id,
        ).version
    except BuildStateRequestVersion.DoesNotExist:
        logger.warning(
            "Discarding %s because no request version tracker exists "
            "for flowsheet_id=%s stream_id=%s property_set_id=%s request_version=%s.",
            event_label,
            context.flowsheet_id,
            context.stream_id,
            context.property_set_id,
            context.request_version,
        )
        return False

    if latest_request_version == context.request_version:
        return True

    logger.info(
        "Discarding stale %s for flowsheet_id=%s stream_id=%s "
        "property_set_id=%s request_version=%s latest_request_version=%s.",
        event_label,
        context.flowsheet_id,
        context.stream_id,
        context.property_set_id,
        context.request_version,
        latest_request_version,
    )
    return False
def _send_build_state_completion_notification(
    build_state_response: BuildStateCompletionPayload,
):
    """Broadcast a build-state completion update to flowsheet subscribers."""
    context = build_state_response.context
    if context is None:
        return

    # Only IDAES-reported errors carry a failure kind; successes send None.
    failed = build_state_response.status == CompletionStatus.ERROR
    payload = BuildStateCompletedPayload(
        context=context,
        status=build_state_response.status,
        properties=build_state_response.properties,
        error=build_state_response.error,
        failure_kind="idaes-error" if failed else None,
    )
    messaging.send_flowsheet_notification_message(
        context.flowsheet_id,
        payload.model_dump(mode="json"),
        NotificationServiceMessageType.BUILD_STATE_COMPLETED,
    )
def process_build_state_request_dead_letter(build_state_request: BuildStateRequestSchema):
    """Record that a build-state request failed broker delivery.

    Dead letters are transport-level failures, not IDAES state-block solve
    failures, so no property data is applied. A diagnostic notification is
    emitted only when the dead-lettered request is still the latest one for
    its property set.
    """
    context = build_state_request.context
    if context is None:
        logger.warning(
            "Build-state request reached the dead-letter topic without request context."
        )
        return

    settled = _settle_build_state_task_from_delivery_failure(context)
    if not settled:
        return

    if not _is_current_build_state_context(context, "build-state request dead letter"):
        return

    logger.warning(
        "Build-state request delivery failed for flowsheet_id=%s stream_id=%s "
        "property_set_id=%s request_version=%s.",
        context.flowsheet_id,
        context.stream_id,
        context.property_set_id,
        context.request_version,
    )
    _send_build_state_delivery_failure_notification(context)
def _settle_build_state_task_from_delivery_failure(
    context: BuildStateRequestContext,
) -> bool:
    """Mark a tracked build-state request as failed when broker delivery fails.

    Returns ``False`` when the task is missing, already terminal, or ends up
    Cancelled — in each of those cases the caller should not emit a delivery
    failure notification.
    """
    with transaction.atomic():
        try:
            # Row lock serialises this handler against concurrent completion
            # or cancellation updates for the same task.
            task = Task.objects.select_for_update().get(id=context.task_id)
        except Task.DoesNotExist:
            logger.info(
                "Discarding build-state delivery failure for missing task_id=%s.",
                context.task_id,
            )
            return False

        # A terminal status means this dead letter duplicates an outcome that
        # was already recorded; drop it.
        if task.status in (
            TaskStatus.Completed,
            TaskStatus.Failed,
            TaskStatus.Cancelled,
        ):
            return False

        # A pending cancel takes precedence over the delivery failure.
        task.status = (
            TaskStatus.Cancelled
            if task.status == TaskStatus.Cancelling
            else TaskStatus.Failed
        )
        if task.status == TaskStatus.Failed:
            # Restore the pre-request property values captured at task creation.
            _revert_build_state_task_values(task)
        task.completed_time = timezone.now()
        task.error = {
            "message": "Build-state request delivery failed.",
            "cause": "message_delivery",
            "traceback": None,
        }
        task.debug = {
            **(task.debug or {}),
            "context": context.model_dump(mode="json"),
        }
        task.save(update_fields=["status", "completed_time", "error", "debug"])

    # Notify subscribers only after the transaction has committed.
    _send_task_notifications(task)
    return task.status != TaskStatus.Cancelled
def _send_build_state_delivery_failure_notification(
    context: BuildStateRequestContext,
):
    """Broadcast a silent transport-level build-state request failure event."""
    failure_payload = BuildStateCompletedPayload(
        context=context,
        status="error",
        properties=None,
        error=None,
        failure_kind="delivery-failure",
    )
    messaging.send_flowsheet_notification_message(
        context.flowsheet_id,
        failure_payload.model_dump(mode="json"),
        NotificationServiceMessageType.BUILD_STATE_COMPLETED,
    )
def generate_IDAES_python_request(flowsheet_id: int) -> Response:
    """Return the flowsheet JSON for the given flowsheet.

    The previous behaviour attempted to generate Python source by forwarding
    the flowsheet to the IDAES service. Python generation is no longer
    supported; this function now always returns the local flowsheet JSON.

    Args:
        flowsheet_id: Primary key of the flowsheet to serialise.

    Returns:
        200 response with the flowsheet JSON on success, or a 400 response
        carrying the error message and traceback when the build fails.
    """
    flowsheet = Flowsheet.objects.get(id=flowsheet_id)
    factory = IdaesFactory(
        group_id=flowsheet.rootGrouping.id,
        scenario=None,
        require_variables_fixed=False,
    )
    try:
        factory.build()
        data = factory.flowsheet
        # use model_dump_json so that pydantic can handle any non-serializable
        # fields instead of django's logic which would error out.
        return Response(data.model_dump_json(), status=200)
    except Exception as e:
        # Build the error envelope only on failure; the success path above
        # returns the serialised flowsheet directly.
        response_data = ResponseType(
            status="error",
            error={
                "message": str(e),
                "traceback": traceback.format_exc(),
            },
            log=None,
            debug=None,
        )
        return Response(response_data, status=400)
class BuildStateSolveError(Exception):
    """Raised when a build-state request cannot be queued for IDAES."""
def _next_build_state_request_version(property_set: PropertySet) -> int:
    """Increment and return the latest build-state request version for a property set.

    Retries exactly once on ``IntegrityError`` to absorb the race where two
    first requests try to create the tracker row concurrently.
    """
    try:
        return _next_build_state_request_version_locked(property_set)
    except IntegrityError:
        # A concurrent first request may have created the tracker between the
        # get and create inside get_or_create. Retry once and lock the new row.
        return _next_build_state_request_version_locked(property_set)
def _next_build_state_request_version_locked(property_set: PropertySet) -> int:
    """Bump the request-version counter while holding a row lock for this property set."""
    with transaction.atomic():
        # Lock (or create) the tracker row so concurrent requests serialise.
        locked_qs = BuildStateRequestVersion.objects.select_for_update()
        tracker, _created = locked_qs.get_or_create(
            property_set_id=property_set.id,
            defaults={"flowsheet_id": property_set.flowsheet_id},
        )
        tracker.version += 1
        tracker.save(update_fields=["version", "updated_at"])
        return tracker.version
def state_request_build(
    stream: SimulationObject,
    *,
    task_id: int,
) -> BuildStateRequestSchema:
    """Build the IDAES service payload for a stream state request.

    Args:
        stream: Stream whose inlet-port property package and properties are
            serialised into the request.
        task_id: Id of the tracking task to embed in the request context.

    Returns:
        A ``BuildStateRequestSchema`` carrying the serialised property package,
        the stream's properties, and a versioned request context.
    """
    ctx = IdaesFactoryContext(stream.flowsheet.rootGrouping.id)
    property_set = stream.properties

    port = stream.connectedPorts.get(direction="inlet")
    unitop: SimulationObject = port.unitOp
    # find the property package key for this port (we have the value, the key of this port)
    property_package_ports = unitop.schema.propertyPackagePorts
    for key, port_list in property_package_ports.items():
        if port.key in port_list:
            property_package_key = key
            # NOTE(review): if no port list contains port.key, nothing is
            # serialised and ctx.property_packages[0] below raises IndexError;
            # if several match, serialise runs once per match. Confirm both
            # cases are ruled out by how the schema is constructed.
            PropertyPackageAdapter(
                property_package_key).serialise(ctx, unitop)

    return BuildStateRequestSchema(
        property_package=ctx.property_packages[0],
        properties=serialise_stream(ctx, stream, is_inlet=True),
        context=BuildStateRequestContext(
            flowsheet_id=stream.flowsheet_id,
            stream_id=stream.id,
            task_id=task_id,
            property_set_id=property_set.id,
            # The version lets stale completions be discarded later.
            request_version=_next_build_state_request_version(property_set),
        ),
    )
def _create_build_state_task(
    stream: SimulationObject,
    user: User | int,
    rollback_values: dict[int, float] | None = None,
) -> Task:
    """Create the tracking task used for a build-state request."""
    task = Task.create(
        user,
        stream.flowsheet_id,
        task_type=TaskType.BUILD_STATE,
        status=TaskStatus.Pending,
        save=True,
    )
    # Keys are stringified so the debug payload survives JSON round-trips.
    serialised_rollback = (
        {str(prop_id): value for prop_id, value in rollback_values.items()}
        if rollback_values
        else {}
    )
    task.debug = {
        "stream_id": stream.id,
        "property_set_id": stream.properties.id,
        "rollback_values": serialised_rollback,
    }
    task.save(update_fields=["debug"])
    return task
def _revert_build_state_task_values(task: Task) -> None:
    """Restore raw property values captured before the build-state request."""
    rollback_values = (task.debug or {}).get("rollback_values")
    if not rollback_values:
        return

    rollback_ids = [int(prop_id) for prop_id in rollback_values]
    to_restore = list(PropertyValue.objects.filter(id__in=rollback_ids))
    for prop in to_restore:
        # Debug payload keys were stringified on capture, so look up by str(id).
        prop.value = rollback_values[str(prop.id)]

    PropertyValue.objects.bulk_update(to_restore, ["value"])
def _fail_build_state_task_before_dispatch(
    task: Task,
    exception: Exception,
    *,
    cause: str,
) -> None:
    """Persist a build-state setup/dispatch failure before re-raising it."""
    # Undo any property edits captured for rollback before marking failure.
    _revert_build_state_task_values(task)
    detailed = DetailedException(exception, source=cause)
    task.set_failure_with_exception(detailed, save=True)
    _send_task_notifications(task)
def _mark_build_state_task_dispatched(
    task: Task,
    context: BuildStateRequestContext | None,
) -> None:
    """Persist dispatch metadata for a queued build-state task."""
    context_payload = None
    if context is not None:
        context_payload = context.model_dump(mode="json")
    task.debug = {
        **(task.debug or {}),
        "idaes_dispatched": True,
        "context": context_payload,
    }
    task.save(update_fields=["debug"])
def build_state_request(
    stream: SimulationObject,
    user: User | int,
    rollback_values: dict[int, float] | None = None,
):
    """Queue a state build request for the provided stream.

    Args:
        stream: Stream object whose inlet properties should be used for the build.
        user: User requesting the property update that triggered the build.
        rollback_values: Raw property values (keyed by PropertyValue id) to
            restore if the request fails before dispatch or completion.

    Returns:
        REST response containing the tracking task. Built properties are applied
        later by ``process_build_state_response`` when IDAES publishes the
        completion event.

    Raises:
        BuildStateSolveError: If the payload cannot be prepared or the request
            cannot be queued; the original exception is attached as the cause.
    """
    task = _create_build_state_task(
        stream,
        user,
        rollback_values=rollback_values,
    )
    try:
        data = state_request_build(stream, task_id=task.id)
        messaging.send_idaes_build_state_request_message(data)
        _mark_build_state_task_dispatched(task, data.context)
        return Response(TaskSerializer(task).data, status=202)
    except Exception as e:
        # Mark the task failed (and roll back captured values) before
        # surfacing the error to the caller.
        _fail_build_state_task_before_dispatch(
            task,
            e,
            cause="build_state_dispatch",
        )
        # Chain the original exception so the real cause and traceback are
        # preserved for debugging instead of being flattened to a string.
        raise BuildStateSolveError(str(e)) from e