Coverage for backend/django/diagnostics/orchestrator.py: 63%
230 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-11 21:43 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-02-11 21:43 +0000
1"""
2Diagnostics orchestration and parsing utilities.
4This module is responsible for turning various failure sources (Task failures,
5IDAES solve payloads, DiagnosticsToolbox blobs) into a stable, deterministic
6"diagnostic run" representation.
8Notes on design (the "why"):
9- We keep a small set of deterministic rulesets/checkpoints that can block early
10 on obvious structural/numerical issues.
11- We persist a bounded snapshot into `DiagnosticRun.summary` so the UI can show
12 evidence without relying on short lived logs.
13"""
15from dataclasses import dataclass
17from django.db import transaction
18from django.db.models import Prefetch
19from django.utils import timezone
21from pydantic import BaseModel, Field, JsonValue
23from diagnostics.rules import evaluate_property_rules
24from diagnostics.schemas import DiagnosticsFinding
25from common.config_types import ObjectType
26from flowsheetInternals.unitops.config.config_methods import get_object_schema
27from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
29from core.auxiliary.models.PropertyInfo import PropertyInfo
30from core.auxiliary.models.PropertyValue import PropertyValue
32from diagnostics.models import DiagnosticRun
33from diagnostics.constants import (
34 DiagnosticRunStatus,
35 DiagnosticTrigger,
36 MAX_VARIABLES_OUTSIDE_BOUNDS_IN_RULESET_DATA,
37 TERMINAL_STATES,
38 MAX_VARIABLES_OUTSIDE_BOUNDS_IN_SUMMARY,
39)
40from core.auxiliary.models.Task import Task
41from diagnostics.failure_bundle import FailureBundle, VariableOutsideBounds
42from diagnostics.parsers import extract_variables_outside_bounds
class FailureBundleSummary(BaseModel):
    """
    Snapshot of failure evidence stored in `DiagnosticRun.summary`.

    I keep this intentionally small so the DB doesn't fill up with noisy,
    unstable upstream payloads.
    """

    # Identity of the failure this snapshot describes.
    task_id: int
    flowsheet_id: int
    solve_index: int | None = None
    trigger: str
    # Copied from `task.debug["scenario_id"]` by the orchestrator when present.
    scenario_id: int | None = None

    # JSON dumps of the corresponding FailureBundle sections.
    solver: JsonValue = Field(default_factory=dict)
    eval_errors: list[str] | None = None
    structural: JsonValue = Field(default_factory=dict)
    # The orchestrator may cap `variables_outside_bounds` here and add
    # `variables_outside_bounds_truncated` / `variables_outside_bounds_total`.
    numerical: JsonValue = Field(default_factory=dict)

    # The raw DiagnosticsToolbox text itself is not persisted; only whether
    # the bundle had any.
    has_diagnostics_raw_text: bool = False
class DiagnosticRunSummary(BaseModel):
    """
    The JSON stored in `DiagnosticRun.summary`.

    I use "rulesets" because it's more obvious what this is when you read the
    code.
    """

    # Ruleset IDs (e.g. "RULESET1") that ran without blocking, in run order.
    rulesets_passed: list[str] = Field(default_factory=list)
    failure_bundle: FailureBundleSummary
    # UI-facing findings; extended with blocking findings when a ruleset blocks.
    findings: list[DiagnosticsFinding] = Field(default_factory=list)

    # Set together when a ruleset blocks: the ruleset ID and its reason code.
    blocked_at: str | None = None
    blocked_reason: str | None = None
@dataclass
class RuleSetResult:
    """
    Outcome of one ruleset checkpoint.

    When `blocked` is True the orchestrator stops running further rulesets and
    moves the run to BLOCKED.
    """

    blocked: bool
    # Short machine-readable reason code (e.g. "structural_dof_nonzero"),
    # supplied by rulesets when they block.
    reason: str | None = None
    # Ruleset-specific metadata. We keep this as JSON-ish because each
    # ruleset can attach different evidence to show in events.
    data: JsonValue | None = None
def _ruleset1_eval_errors(bundle: FailureBundle) -> RuleSetResult:
    """
    Record evaluation errors as evidence; never blocks.

    Later rulesets (structural, outside-bounds) pick up the specific actionable
    problems. Blocking on generic eval errors here proved too aggressive.
    """
    errors = bundle.eval_errors
    if not errors:
        return RuleSetResult(blocked=False)
    return RuleSetResult(blocked=False, data={"eval_errors": errors})
# TODO: confirm this structural gate belongs in RULESET2 (or should move).
def _ruleset2_structural(bundle: FailureBundle) -> RuleSetResult:
    """
    Block on structural problems: non-zero degrees of freedom or inconsistent units.

    These cannot be fixed by changing a single value, so stopping early gives
    the user the most useful signal.

    TODO: DiagnosticsToolbox prints over/under-constrained sets as text that we
    don't parse yet, so `structural.overconstrained` / `underconstrained` are
    currently None. Once a parser exists, block on them here with reason
    "structural_issue" and data {"over": ..., "under": ...}.
    """
    structural = bundle.structural

    dof = structural.dof
    if dof is not None and dof != 0:
        return RuleSetResult(True, reason="structural_dof_nonzero", data={"dof": dof})

    units = structural.inconsistent_units
    if units:
        return RuleSetResult(
            True,
            reason="inconsistent_units",
            data={"inconsistent_units": units},
        )

    return RuleSetResult(False)
def _ruleset_outside_bounds(bundle: FailureBundle) -> RuleSetResult:
    """
    Block as soon as DiagnosticsToolbox reports variables at/outside their bounds.

    Typical trigger: "model contains a trivially infeasible variable '...pressure'
    (fixed value ... outside bounds [...])". The event payload lists at most
    MAX_VARIABLES_OUTSIDE_BOUNDS_IN_RULESET_DATA variables and says whether
    more were dropped.
    """
    flagged = bundle.numerical.variables_outside_bounds or []
    if not flagged:
        return RuleSetResult(False)

    # Cap the payload so ruleset events never carry huge blobs.
    head = flagged[:MAX_VARIABLES_OUTSIDE_BOUNDS_IN_RULESET_DATA]
    dumped = [item.model_dump(exclude_none=True) for item in head]
    payload = {
        "variables_outside_bounds": dumped,
        "truncated": len(flagged) > len(dumped),
    }
    return RuleSetResult(True, reason="variables_outside_bounds", data=payload)
def _ruleset3_rule_violations(bundle: FailureBundle) -> RuleSetResult:
    """Placeholder for future site/project "policy" rules; currently always passes."""
    return RuleSetResult(blocked=False)
def _ruleset4_numerical(bundle: FailureBundle) -> RuleSetResult:
    """
    Numerical triage: never blocks.

    Attaches a dump of the numerical section as event data so later tooling
    can turn it into findings.
    """
    snapshot = bundle.numerical.model_dump()
    return RuleSetResult(blocked=False, data={"numerical": snapshot})
158def guess_property_key_from_var_name(var_name: str) -> str | None:
159 """Extract property key from IDAES variable names like '...properties_in[0.0].pressure' -> 'pressure'."""
160 # This is a best-effort heuristic; if the toolbox output changes format we
161 # should update the tests and tweak this rather than making it "clever".
162 if "." not in var_name:
163 return None
164 return var_name.split(".")[-1].strip() or None
def _is_container_object(obj: SimulationObject) -> bool:
    """Return True when ``obj`` is a group/"Grouping" container; diagnostics skip these."""
    kind = obj.objectType
    return kind == "group" or kind == "Grouping"
def _load_flowsheet_objects(flowsheet_id: int) -> list[SimulationObject]:
    """
    Load all non-deleted SimulationObjects of a flowsheet with their relations prefetched.

    Rule evaluation walks objects -> properties -> values (and port/stream
    connections), so everything it touches is fetched up front to avoid N+1
    queries.

    Important: when prefetching a one-to-many relation, any FK/OneToOne fields
    accessed during evaluation must also be `select_related()` inside the
    Prefetch queryset (see SimulationObjectViewSet / GraphicObjectViewSet).
    """
    value_queryset = PropertyValue.objects.select_related(
        "controlManipulated",
        "controlSetPoint",
    ).prefetch_related("indexedItems")

    property_queryset = PropertyInfo.objects.select_related("recycleConnection").prefetch_related(
        Prefetch("values", queryset=value_queryset),
    )

    objects_queryset = (
        SimulationObject.objects.filter(flowsheet_id=flowsheet_id, is_deleted=False)
        .select_related("properties")
        .prefetch_related(
            Prefetch("properties__ContainedProperties", queryset=property_queryset),
            "ports__stream__connectedPorts__unitOp",
        )
    )
    return list(objects_queryset)
# Sentinel distinguishing "not looked up yet" from "looked up, no schema (None)".
_SCHEMA_NOT_CACHED = object()


def _get_object_schema_cached(obj: SimulationObject) -> ObjectType | None:
    """
    Return the config schema for ``obj``, memoised on the instance.

    The result is stored on ``obj._diagnostics_schema``, so repeated calls for
    the same object only call ``get_object_schema`` once. A failed lookup is
    cached as ``None`` too. Previously a ``None`` result was indistinguishable
    from "not cached yet", so objects without a schema re-ran (and re-failed)
    the lookup on every call.

    Returns:
        The schema, or None if ``get_object_schema`` raised.
    """
    cached = getattr(obj, "_diagnostics_schema", _SCHEMA_NOT_CACHED)
    if cached is not _SCHEMA_NOT_CACHED:
        return cached
    try:
        schema = get_object_schema(obj)
    except Exception:
        # Best-effort: a missing or broken schema must not abort diagnostics.
        # `_allowed_property_keys` treats None as "no allowed keys".
        schema = None
    setattr(obj, "_diagnostics_schema", schema)
    return schema
def _allowed_property_keys(obj: SimulationObject) -> set[str]:
    """
    Return the property keys declared by ``obj``'s schema (empty if no schema).

    `get_object_schema` is not cheap, so the resulting set is memoised on
    ``obj._diagnostics_allowed_keys`` for reuse across many variables/findings.
    """
    memo = getattr(obj, "_diagnostics_allowed_keys", None)
    if memo is None:
        schema = _get_object_schema_cached(obj)
        memo = set(schema.properties.keys()) if schema else set()
        setattr(obj, "_diagnostics_allowed_keys", memo)
    return memo
def _build_rule_findings(objects: list[SimulationObject]) -> list[DiagnosticsFinding]:
    """
    Evaluate property rules on every non-container object and return typed findings.

    The rules engine already produces the frontend shape. Each RuleFinding is
    re-validated as a DiagnosticsFinding (via ``model_dump``) so the
    orchestrator never passes around anonymous dicts or mismatched model types.
    """
    return [
        DiagnosticsFinding.model_validate(rule_finding.model_dump(exclude_none=True))
        for sim_obj in objects
        if not _is_container_object(sim_obj)
        for rule_finding in evaluate_property_rules(sim_obj)
    ]
def _has_id_token(var_name: str, obj_id: object) -> bool:
    """Return True if ``_{obj_id}`` occurs in ``var_name`` without another digit right after it."""
    token = f"_{obj_id}"
    start = var_name.find(token)
    while start != -1:
        end = start + len(token)
        # "_1" must not match inside "_12" / "_100": require a non-digit (or end) after it.
        if end >= len(var_name) or not var_name[end].isdigit():
            return True
        start = var_name.find(token, start + 1)
    return False


def _match_object_for_var(var_name: str, objects: list[SimulationObject]) -> SimulationObject | None:
    """
    Best-effort mapping from an IDAES variable name back to a SimulationObject.

    Objects are checked in list order. The first object wins if either:
    - its ``componentName`` appears in ``var_name``, or
    - its DB id appears as an ``_{id}`` token that is not followed by another
      digit. Previously a plain substring test let object 1 claim variables
      belonging to object 12.

    Returns None for an empty name or when nothing matches.
    """
    # NOTE(review): the componentName test is still a plain substring check,
    # so "Pump1" would also match a name containing "Pump10" — confirm whether
    # component names can prefix each other.
    if not var_name:
        return None
    for obj in objects:
        if obj.componentName and obj.componentName in var_name:
            return obj
        if _has_id_token(var_name, obj.id):
            return obj
    return None
def _coerce_float(raw: object) -> float | None:
    """Parse a toolbox-reported number, returning None when it isn't numeric."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def _find_contained_property(obj: SimulationObject, prop_key: str) -> PropertyInfo | None:
    """Return the first of ``obj``'s contained properties whose key is ``prop_key``."""
    # Iterate the prefetched relation instead of calling `.filter(...).first()`.
    # That form bypasses the Prefetch set up in `_load_flowsheet_objects` and
    # issues a fresh query per flagged variable.
    # NOTE(review): assumes property keys are unique per object; `.first()`
    # would have ordered by pk if duplicates existed.
    for prop in obj.properties.ContainedProperties.all():
        if prop.key == prop_key:
            return prop
    return None


def _build_bounds_findings(
    objects: list[SimulationObject],
    variables: list[VariableOutsideBounds],
) -> list[DiagnosticsFinding]:
    """
    Turn out-of-bounds variables into UI-friendly findings.

    I iterate through each variable the toolbox flagged, try to match it back
    to a SimulationObject/property, and emit a finding with a suggested fix.
    If I can't confidently match, I skip it rather than showing garbage.

    A variable is skipped when:
    - it has no name;
    - no non-container object with properties matches it;
    - its property key isn't in the object's schema or contained properties;
    - its value/bounds don't parse as floats; or
    - its value is not strictly below the lower or above the upper bound.
    """
    if not variables:
        return []

    findings: list[DiagnosticsFinding] = []
    for var in variables:
        name = var.name
        if not name:
            continue

        # Try to figure out which object this variable belongs to. The IDAES
        # variable names are messy (e.g. "unit.properties_in[0.0].pressure"),
        # so this is best-effort heuristics.
        obj = _match_object_for_var(name, objects)
        if not obj or _is_container_object(obj) or not getattr(obj, "properties", None):
            continue

        # Extract the property key from the variable name (e.g. "pressure").
        prop_key = guess_property_key_from_var_name(name)
        if not prop_key:
            continue

        # Only show findings for properties that actually exist in the schema.
        # This avoids confusing the user with internal IDAES variables.
        if prop_key not in _allowed_property_keys(obj):
            continue

        prop = _find_contained_property(obj, prop_key)
        if not prop:
            continue

        # Figure out which bound was violated so I can suggest the right value.
        value = _coerce_float(var.value)
        lower = _coerce_float(var.lower)
        upper = _coerce_float(var.upper)
        suggested = None
        if value is not None:
            if lower is not None and value < lower:
                suggested = lower
            elif upper is not None and value > upper:
                suggested = upper
        if suggested is None:
            continue

        # Check if the property is formula-controlled or indexed. This affects
        # how I phrase the suggested fix — I can't just say "set it" if it's
        # computed from a formula. Both reads use the prefetched relations.
        values = list(prop.values.all())
        has_formula = any(v.formula for v in values)
        is_scalar = len(values) == 1 and not values[0].indexedItems.exists()
        display_name = prop.displayName or prop_key

        finding = DiagnosticsFinding(
            id=f"IDAES_BOUNDS:{name}",
            severity="error",
            title="Value outside IDAES bounds",
            description=(
                f"{display_name} is fixed to {var.value} outside bounds "
                f"[{var.lower}, {var.upper}]."
            ),
            ruleReference="IDAES_BOUNDS_CHECK",
            componentName=obj.componentName,
            componentId=obj.id,
            propertyKey=prop_key,
            propertyId=prop.id,
            suggestedValue=suggested,
        )

        # Tailor the fix suggestion based on whether the property is editable.
        if has_formula:
            finding.suggestedFix = (
                f"{display_name} is formula-controlled. Update the formula to move toward {suggested}."
            )
        elif is_scalar:
            # For simple scalar properties, I can offer a one-click fix action.
            finding.suggestedFix = f"Set {display_name} to {suggested}"
            finding.fixAction = {
                "kind": "property_value",
                "propertyId": prop.id,
                "suggestedValue": suggested,
            }

        findings.append(finding)

    return findings
# Rulesets are deterministic checkpoints that run sequentially after a solve failure.
# They mirror the IDAES DiagnosticsToolbox workflow: structural checks first, then numerical.
# IDs are stable strings persisted into `DiagnosticRun.summary` and events for backwards compat.
# Order matters: `run_diagnostics_for_task` runs them in list order and stops at the
# first result with `blocked=True`. It also matches the ID "RULESET2B" literally to
# attach outside-bounds findings, so don't rename IDs casually.
#
# RULESET1: Evaluation errors - records error messages as evidence but does NOT block.
# Later rulesets (structural, OOB) catch specific actionable issues.
#
# RULESET2: Structural issues - blocks when degrees of freedom are non-zero (the model
# is over/under-constrained and can't be solved) or when the bundle reports
# inconsistent units (e.g., mixing kg and lbs).
#
# RULESET2B: Variables outside bounds - flags variables where the fixed value violates
# IDAES property bounds (e.g., pressure fixed to -100 when bounds are [0, 1e8]).
# These cause "trivially infeasible" errors and are often user input mistakes.
#
# RULESET3: Policy/rule violations - placeholder for site/project-specific constraints
# (e.g., "temperature must be < 500K for this equipment type"). Currently always passes.
#
# RULESET4: Numerical triage - never blocks. It currently only attaches the bundle's
# numerical section (e.g., poorly scaled variables, near-singular Jacobian evidence)
# as event data; it does not yet create findings itself.
RULESETS = [
    ("RULESET1", _ruleset1_eval_errors),
    ("RULESET2", _ruleset2_structural),
    ("RULESET2B", _ruleset_outside_bounds),
    ("RULESET3", _ruleset3_rule_violations),
    ("RULESET4", _ruleset4_numerical),
]
def _transition(run: DiagnosticRun, state: str, data: JsonValue | None = None) -> None:
    """
    Move ``run`` to ``state``, persist it, and record a "state_transition" event.

    - ``started_at`` is stamped the first time the run enters COLLECTING or GATING.
    - ``completed_at`` is stamped whenever ``state`` is terminal.
    - ``data`` must be a mapping (or None). Its keys are merged into the event
      payload after "state", so a "state" key in ``data`` would override it.

    Every transition becomes an event so the UI can show progress and we can
    debug runs after the fact.
    """
    run.diagnostic_status = state

    entering_active = state in (DiagnosticRunStatus.COLLECTING, DiagnosticRunStatus.GATING)
    if entering_active and run.started_at is None:
        run.started_at = timezone.now()
    if state in TERMINAL_STATES:
        run.completed_at = timezone.now()

    run.save(update_fields=["diagnostic_status", "started_at", "completed_at", "updated_at"])

    event_payload = {"state": state, **(data or {})}
    run.append_event("state_transition", event_payload)
def run_diagnostics_for_task(task: Task, failure_bundle: FailureBundle, trigger: str = DiagnosticTrigger.SOLVE_FAILURE) -> DiagnosticRun:
    """
    Run deterministic rulesets for a task failure and persist DiagnosticRun/Events.

    Args:
        task: The failed Task. Its id and flowsheet_id are stamped onto a copy of
            the bundle. ``task.debug["scenario_id"]`` (only when ``debug`` is a
            dict) is copied into the summary.
        failure_bundle: Pre-validated evidence, typically from
            `build_failure_bundle_from_payload()` or `build_failure_bundle_from_task()`.
            It is not mutated; a copy is used.
        trigger: Why diagnostics ran; stored on the run, the bundle copy and events.

    Returns:
        The DiagnosticRun. Its state is BLOCKED if a ruleset blocked (with
        ``summary.blocked_at`` / ``blocked_reason`` set), otherwise COMPLETE.

    Findings-generation failures are caught and reported as warning findings.
    Other exceptions (e.g. from `_load_flowsheet_objects` or a ruleset) propagate.
    """
    # I keep the orchestrator model-first:
    # `failure_bundle` is already validated, and every "finding" we emit is typed
    # until the moment we dump JSON into the DB/event stream.
    # `failure_bundle` comes from `build_failure_bundle_from_payload()` /
    # `build_failure_bundle_from_task()` and is already validated.
    fb = failure_bundle.model_copy(update={"task_id": task.id, "flowsheet_id": task.flowsheet_id, "trigger": trigger})

    summary: DiagnosticRunSummary | None = None

    # NOTE(review): everything up to and including the ruleset loop runs inside
    # this transaction. The final summary save and the SUMMARISING/COMPLETE
    # transitions happen after it commits — confirm this split is intended.
    with transaction.atomic():
        run = DiagnosticRun.objects.create(
            flowsheet=task.flowsheet,
            task=task,
            diagnostic_status=DiagnosticRunStatus.CREATED,
            trigger=trigger,
        )
        run.append_event("created", {"trigger": trigger})

        # COLLECTING and GATING states transition back-to-back with no work in between.
        # COLLECTING is a placeholder for future async data gathering (e.g., fetching
        # historical solve logs, pulling in related flowsheet context).
        # For now, it's kept to maintain state machine consistency and provide a hook
        # for future enhancements without requiring migration changes.
        _transition(run, DiagnosticRunStatus.COLLECTING)
        _transition(run, DiagnosticRunStatus.GATING)

        # `task.debug` may not be a dict, so guard before reading it.
        scenario_id = None
        if isinstance(task.debug, dict):
            scenario_id = task.debug.get("scenario_id")

        objects = _load_flowsheet_objects(task.flowsheet_id)

        # "Base" findings are deterministic rule warnings/errors for objects/properties.
        # These are safe to show even when we later block early on a ruleset.
        base_findings: list[DiagnosticsFinding] = []
        # OOB findings are different: they are specific "variable is outside bounds" items
        # that we only attach when the outside-bounds ruleset is what blocks the run.
        oob_findings: list[DiagnosticsFinding] = []
        try:
            base_findings.extend(_build_rule_findings(objects))
        except Exception as exc:
            # Don't crash the whole run if findings generation breaks.
            # Keep going with empty findings, but surface a warning for debugging/UI.
            base_findings.append(
                DiagnosticsFinding(
                    id="FINDINGS_ERROR:base_findings",
                    severity="warning",
                    title="Couldn't generate diagnostic findings",
                    description=f"Base findings failed: {exc}",
                    ruleReference="FINDINGS_GENERATION_ERROR",
                )
            )

        try:
            oob_findings = _build_bounds_findings(objects, fb.numerical.variables_outside_bounds or [])
        except Exception as exc:
            # Bounds findings only show up for the outside-bounds ruleset; add a warning to the
            # main findings list so the failure is still visible in the summary.
            base_findings.append(
                DiagnosticsFinding(
                    id="FINDINGS_ERROR:oob_findings",
                    severity="warning",
                    title="Couldn't generate bounds findings",
                    description=f"Bounds findings failed: {exc}",
                    ruleReference="FINDINGS_GENERATION_ERROR",
                )
            )

        # Persist a stable, bounded snapshot of the failure bundle into summary
        # so the Diagnostics tab can show evidence without relying on Task.log.
        # We cap large lists here to keep DB storage + API payload size reasonable.
        # The UI can scroll, but we still don't want to store/ship huge blobs.
        numerical_summary = fb.numerical.model_dump()
        vars_oob = numerical_summary.get("variables_outside_bounds") or []
        if len(vars_oob) > MAX_VARIABLES_OUTSIDE_BOUNDS_IN_SUMMARY:
            numerical_summary["variables_outside_bounds"] = vars_oob[:MAX_VARIABLES_OUTSIDE_BOUNDS_IN_SUMMARY]
            numerical_summary["variables_outside_bounds_truncated"] = True
            numerical_summary["variables_outside_bounds_total"] = len(vars_oob)

        summary = DiagnosticRunSummary(
            rulesets_passed=[],
            failure_bundle=FailureBundleSummary(
                task_id=fb.task_id,
                flowsheet_id=fb.flowsheet_id,
                solve_index=fb.solve_index,
                trigger=fb.trigger,
                scenario_id=scenario_id,
                solver=fb.solver.model_dump(),
                eval_errors=fb.eval_errors,
                structural=fb.structural.model_dump(),
                numerical=numerical_summary,
                has_diagnostics_raw_text=bool(fb.diagnostics_raw_text),
            ),
            findings=base_findings,
        )

        # Persist the pre-ruleset snapshot so evidence exists even if a ruleset raises.
        run.summary = summary.model_dump(exclude_none=True)
        run.save(update_fields=["summary"])

        for ruleset_id, ruleset_fn in RULESETS:
            run.append_event("ruleset_start", {"ruleset": ruleset_id})
            result = ruleset_fn(fb)
            run.append_event(
                "ruleset_end",
                {"ruleset": ruleset_id, "blocked": result.blocked, "reason": result.reason, "data": result.data},
            )
            if result.blocked:
                # "Blocked" means: stop running further rulesets.
                # We still want the UI to have something clear to show the user about why we stopped.
                summary.blocked_at = ruleset_id
                summary.blocked_reason = result.reason
                blocked_findings = list(summary.findings or [])
                # "RULESET2B" = outside-bounds check; show actionable findings with suggested fixes
                if ruleset_id == "RULESET2B" and oob_findings:
                    blocked_findings.extend(oob_findings)
                else:
                    # For other rulesets we don't have a nicer, user-friendly finding yet,
                    # so show one generic "blocked at ..." item.
                    blocked_findings.append(
                        DiagnosticsFinding(
                            id=ruleset_id,
                            severity="error",
                            title=f"Blocked at {ruleset_id}",
                            description=result.reason or "Blocked by deterministic ruleset",
                            ruleReference=ruleset_id,
                        )
                    )
                summary.findings = blocked_findings
                _transition(run, DiagnosticRunStatus.BLOCKED, {"reason": result.reason})
                run.summary = summary.model_dump(exclude_none=True)
                # diagnostic_status was already saved by _transition; re-saving it is harmless.
                run.save(update_fields=["diagnostic_status", "summary"])
                return run
            summary.rulesets_passed.append(ruleset_id)

    # No ruleset blocked: persist the final summary (now listing every ruleset as passed).
    run.summary = summary.model_dump(exclude_none=True)
    run.save(update_fields=["summary"])

    _transition(run, DiagnosticRunStatus.SUMMARISING)
    _transition(run, DiagnosticRunStatus.COMPLETE)
    return run
def build_failure_bundle_from_payload(
    solve_payload,
    *,
    trigger: str = DiagnosticTrigger.SOLVE_FAILURE,
) -> FailureBundle:
    """
    Convert an IdaesSolveCompletionPayload into a failure bundle used by rulesets.

    Task/flowsheet ids are left as -1 placeholders; `run_diagnostics_for_task`
    stamps the real values on its copy. Out-of-bounds variables are parsed from
    the diagnostics text when present, otherwise from the error message.
    """
    # `diagnostics_text` is the legacy field name, kept for backwards compat.
    raw_text = getattr(solve_payload, "diagnostics_raw_text", None)
    if raw_text is None:
        raw_text = getattr(solve_payload, "diagnostics_text", None)

    error = getattr(solve_payload, "error", None)
    message = error.get("message") if isinstance(error, dict) else None

    bounds_source = raw_text or message or ""

    return FailureBundle(
        task_id=-1,
        flowsheet_id=-1,
        solve_index=getattr(solve_payload, "solve_index", None),
        trigger=trigger,
        eval_errors=[message] if message else None,
        diagnostics_raw_text=raw_text,
        structural={},
        numerical={"variables_outside_bounds": extract_variables_outside_bounds(bounds_source)},
    )
def build_failure_bundle_from_task(task: Task, solve_index: int | None = None) -> FailureBundle:
    """
    Build a failure bundle from a Task for diagnostics evaluation.

    Reads ``task.debug["diagnostics_raw_text"]`` and ``task.error["message"]``
    when those fields are dicts; any other shape is treated as absent.
    Out-of-bounds variables are parsed from the diagnostics text when present,
    otherwise from the error message. The trigger is always SOLVE_FAILURE.
    """
    # Guard with isinstance (as run_diagnostics_for_task does for scenario_id):
    # `task.debug or {}` would raise AttributeError on `.get` for a truthy
    # non-dict debug blob.
    debug = task.debug if isinstance(task.debug, dict) else {}
    diagnostics_raw_text = debug.get("diagnostics_raw_text")

    error = task.error
    error_message = error.get("message") if isinstance(error, dict) else None

    # Build a simple text to parse for variables outside bounds
    text_to_parse = diagnostics_raw_text or error_message or ""

    return FailureBundle(
        task_id=task.id,
        flowsheet_id=task.flowsheet_id,
        solve_index=solve_index,
        trigger=DiagnosticTrigger.SOLVE_FAILURE,
        eval_errors=[error_message] if error_message else None,
        diagnostics_raw_text=diagnostics_raw_text,
        structural={},
        numerical={
            "variables_outside_bounds": extract_variables_outside_bounds(text_to_parse)
        },
    )
609 )