Coverage for backend/django/diagnostics/orchestrator.py: 63%

230 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-02-12 01:47 +0000

1""" 

2Diagnostics orchestration and parsing utilities. 

3 

4This module is responsible for turning various failure sources (Task failures, 

5IDAES solve payloads, DiagnosticsToolbox blobs) into a stable, deterministic 

6"diagnostic run" representation. 

7 

8Notes on design (the "why"): 

9- We keep a small set of deterministic rulesets/checkpoints that can block early 

10 on obvious structural/numerical issues. 

11- We persist a bounded snapshot into `DiagnosticRun.summary` so the UI can show 

12 evidence without relying on short-lived logs.

13""" 

14 

15from dataclasses import dataclass 

16 

17from django.db import transaction 

18from django.db.models import Prefetch 

19from django.utils import timezone 

20 

21from pydantic import BaseModel, Field, JsonValue 

22 

23from diagnostics.rules import evaluate_property_rules 

24from diagnostics.schemas import DiagnosticsFinding 

25from common.config_types import ObjectType 

26from flowsheetInternals.unitops.config.config_methods import get_object_schema 

27from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

28 

29from core.auxiliary.models.PropertyInfo import PropertyInfo 

30from core.auxiliary.models.PropertyValue import PropertyValue 

31 

32from diagnostics.models import DiagnosticRun 

33from diagnostics.constants import ( 

34 DiagnosticRunStatus, 

35 DiagnosticTrigger, 

36 MAX_VARIABLES_OUTSIDE_BOUNDS_IN_RULESET_DATA, 

37 TERMINAL_STATES, 

38 MAX_VARIABLES_OUTSIDE_BOUNDS_IN_SUMMARY, 

39) 

40from core.auxiliary.models.Task import Task 

41from diagnostics.failure_bundle import FailureBundle, VariableOutsideBounds 

42from diagnostics.parsers import extract_variables_outside_bounds 

43 

class FailureBundleSummary(BaseModel):
    """
    Bounded snapshot of failure evidence embedded in `DiagnosticRun.summary`.

    Kept intentionally small so the DB doesn't fill up with noisy, unstable
    upstream payloads.
    """

    # Identity / provenance of the failed solve.
    task_id: int
    flowsheet_id: int
    solve_index: int | None = None
    trigger: str
    scenario_id: int | None = None

    # Evidence sections, stored as JSON-compatible values.
    solver: JsonValue = Field(default_factory=dict)
    eval_errors: list[str] | None = None
    structural: JsonValue = Field(default_factory=dict)
    numerical: JsonValue = Field(default_factory=dict)

    # Only a flag is kept here; this model has no field for the raw text itself.
    has_diagnostics_raw_text: bool = False

64 

65 

class DiagnosticRunSummary(BaseModel):
    """
    Shape of the JSON document stored in `DiagnosticRun.summary`.

    The checkpoints are called "rulesets" because that reads more obviously
    in the code.
    """

    # IDs of the rulesets that ran without blocking, in execution order.
    rulesets_passed: list[str] = Field(default_factory=list)
    failure_bundle: FailureBundleSummary
    findings: list[DiagnosticsFinding] = Field(default_factory=list)

    # Populated only when a ruleset blocked the run.
    blocked_at: str | None = None
    blocked_reason: str | None = None

80 

81 

@dataclass
class RuleSetResult:
    """Outcome of running one ruleset against a failure bundle."""

    # True when the ruleset wants the run to stop here.
    blocked: bool
    # Short machine-readable reason for the block, if any.
    reason: str | None = None
    # Ruleset-specific evidence. Left JSON-ish because every ruleset attaches
    # a different payload for display in events.
    data: JsonValue | None = None

89 

90 

91 

def _ruleset1_eval_errors(bundle: FailureBundle) -> RuleSetResult:
    """
    Record evaluation errors as evidence without ever blocking.

    Blocking here was too aggressive for generic errors; the later rulesets
    (structural, outside-bounds) catch the specific actionable issues.
    """
    errors = bundle.eval_errors
    evidence = {"eval_errors": errors} if errors else None
    return RuleSetResult(blocked=False, data=evidence)

98 

99 

100# TODO: confirm this structural gate belongs in RULESET2 (or should move). 

def _ruleset2_structural(bundle: FailureBundle) -> RuleSetResult:
    """
    Block early on structural problems (non-zero DoF, inconsistent units).

    These are not fixable by changing a single value, so there is no point in
    running the later rulesets once one is detected.
    """
    structural = bundle.structural

    dof = structural.dof
    has_nonzero_dof = dof is not None and dof != 0
    if has_nonzero_dof:
        return RuleSetResult(
            blocked=True,
            reason="structural_dof_nonzero",
            data={"dof": dof},
        )

    # TODO: Over/under-constrained sets are printed as text by
    # DiagnosticsToolbox. Those lines are not parsed yet, so
    # `structural.overconstrained` / `structural.underconstrained` are
    # currently None. Plan: add a parser, then block here with
    # reason="structural_issue" and data={"over": ..., "under": ...} when
    # either set is non-empty.

    bad_units = structural.inconsistent_units
    if not bad_units:
        return RuleSetResult(blocked=False)
    return RuleSetResult(
        blocked=True,
        reason="inconsistent_units",
        data={"inconsistent_units": bad_units},
    )

121 

122 

def _ruleset_outside_bounds(bundle: FailureBundle) -> RuleSetResult:
    """
    Block as soon as the bundle lists any variables at/outside their bounds.

    Typical trigger:
        "model contains a trivially infeasible variable '...pressure'
        (fixed value ... outside bounds [...])"
    """
    flagged = bundle.numerical.variables_outside_bounds or []
    if not flagged:
        return RuleSetResult(blocked=False)

    # Only the first N variables go into the event payload so we never store
    # huge blobs; "truncated" tells the reader whether anything was dropped.
    payload = []
    for variable in flagged[:MAX_VARIABLES_OUTSIDE_BOUNDS_IN_RULESET_DATA]:
        payload.append(variable.model_dump(exclude_none=True))

    return RuleSetResult(
        blocked=True,
        reason="variables_outside_bounds",
        data={
            "variables_outside_bounds": payload,
            "truncated": len(flagged) > len(payload),
        },
    )

146 

147 

def _ruleset3_rule_violations(bundle: FailureBundle) -> RuleSetResult:
    """Placeholder for future "policy" enforcement (site / project rules); never blocks."""
    return RuleSetResult(blocked=False)

151 

152 

def _ruleset4_numerical(bundle: FailureBundle) -> RuleSetResult:
    """Numerical triage: never blocks, just attaches the numerical section as evidence."""
    evidence = {"numerical": bundle.numerical.model_dump()}
    return RuleSetResult(blocked=False, data=evidence)

156 

157 

158def guess_property_key_from_var_name(var_name: str) -> str | None: 

159 """Extract property key from IDAES variable names like '...properties_in[0.0].pressure' -> 'pressure'.""" 

160 # This is a best-effort heuristic; if the toolbox output changes format we 

161 # should update the tests and tweak this rather than making it "clever". 

162 if "." not in var_name: 

163 return None 

164 return var_name.split(".")[-1].strip() or None 

165 

166 

167def _is_container_object(obj: SimulationObject) -> bool: 

168 return obj.objectType in ("group", "Grouping") 

169 

170 

def _load_flowsheet_objects(flowsheet_id: int) -> list[SimulationObject]:
    """
    Load every non-deleted SimulationObject of a flowsheet with its related data.

    Prefetching is aggressive on purpose: rule evaluation walks
    objects -> properties -> values, and without it that walk turns into N+1
    queries.

    Important: when prefetching a one-to-many relation, any FK/OneToOne field
    accessed during evaluation must also be `select_related()` inside the
    Prefetch queryset (see SimulationObjectViewSet / GraphicObjectViewSet).
    """
    # Innermost level: property values plus the relations read from them.
    values_qs = PropertyValue.objects.select_related(
        "controlManipulated",
        "controlSetPoint",
    ).prefetch_related("indexedItems")

    # Middle level: property infos, each carrying its prefetched values.
    property_infos_qs = PropertyInfo.objects.select_related(
        "recycleConnection"
    ).prefetch_related(Prefetch("values", queryset=values_qs))

    objects_qs = (
        SimulationObject.objects.filter(flowsheet_id=flowsheet_id, is_deleted=False)
        .select_related("properties")
        .prefetch_related(
            Prefetch("properties__ContainedProperties", queryset=property_infos_qs),
            "ports__stream__connectedPorts__unitOp",
        )
    )
    return list(objects_qs)

197 

198 

199def _get_object_schema_cached(obj: SimulationObject) -> ObjectType | None: 

200 cached = getattr(obj, "_diagnostics_schema", None) 

201 if cached is not None: 

202 return cached 

203 try: 

204 schema = get_object_schema(obj) 

205 except Exception: 

206 schema = None 

207 setattr(obj, "_diagnostics_schema", schema) 

208 return schema 

209 

210 

def _allowed_property_keys(obj: SimulationObject) -> set[str]:
    """
    Return the property keys declared by the object's schema, memoised on `obj`.

    `get_object_schema` is not cheap, so the key set is stored on
    `obj._diagnostics_allowed_keys` to avoid repeated schema calls while
    iterating many variables/findings. An object without a schema gets an
    empty set.
    """
    keys = getattr(obj, "_diagnostics_allowed_keys", None)
    if keys is None:
        schema = _get_object_schema_cached(obj)
        if schema:
            keys = set(schema.properties.keys())
        else:
            keys = set()
        setattr(obj, "_diagnostics_allowed_keys", keys)
    return keys

221 

222 

def _build_rule_findings(objects: list[SimulationObject]) -> list[DiagnosticsFinding]:
    """
    Evaluate the property rules for every non-container object.

    The rules engine already produces the "frontend shape"; each result is
    still re-validated as a `DiagnosticsFinding` (via `model_dump()`, to avoid
    type mismatch issues) so the orchestrator never passes anonymous dicts
    around.
    """
    return [
        DiagnosticsFinding.model_validate(rule_finding.model_dump(exclude_none=True))
        for obj in objects
        if not _is_container_object(obj)
        for rule_finding in evaluate_property_rules(obj)
    ]

235 

236 

237def _match_object_for_var(var_name: str, objects: list[SimulationObject]) -> SimulationObject | None: 

238 # Another best-effort heuristic: 

239 # - sometimes the var string includes the componentName 

240 # - otherwise the DB id often shows up as `_{id}` 

241 if not var_name: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true

242 return None 

243 for obj in objects: 

244 if obj.componentName and obj.componentName in var_name: 244 ↛ 245line 244 didn't jump to line 245 because the condition on line 244 was never true

245 return obj 

246 if f"_{obj.id}" in var_name: 246 ↛ 247line 246 didn't jump to line 247 because the condition on line 246 was never true

247 return obj 

248 return None 

249 

250 

def _coerce_float(raw):
    """Return `raw` converted to float, or None when it cannot be converted."""
    try:
        return float(raw)
    except (TypeError, ValueError):
        return None


def _build_bounds_findings(
    objects: list[SimulationObject],
    variables: list[VariableOutsideBounds],
) -> list[DiagnosticsFinding]:
    """
    Turn out-of-bounds variables into UI-friendly findings.

    Each flagged variable is matched back to a SimulationObject/property and,
    when that succeeds, a finding with a suggested value (the violated bound)
    is emitted. A variable is skipped rather than shown as garbage when:
    - it has no name, or no non-container object with properties matches it;
    - no property key can be derived, or the key is not in the object's schema;
    - the object has no property with that key;
    - value/bounds are not numeric, or the value is not strictly outside them.

    Properties are read from `ContainedProperties.all()` so that a prefetch on
    that relation is reused; calling `.filter()` on the related manager would
    issue a fresh query per variable and also discard the nested prefetches.
    """
    if not variables:
        return []

    findings: list[DiagnosticsFinding] = []
    for var in variables:
        name = var.name
        if not name:
            continue

        # Try to figure out which object this variable belongs to. The IDAES
        # variable names are messy (e.g. "unit.properties_in[0.0].pressure"),
        # so this is best-effort heuristics.
        obj = _match_object_for_var(name, objects)
        if not obj or _is_container_object(obj) or not getattr(obj, "properties", None):
            continue

        # Extract the property key from the variable name (e.g. "pressure").
        prop_key = guess_property_key_from_var_name(name)
        if not prop_key:
            continue

        # Only show findings for properties that actually exist in the schema.
        # This avoids confusing the user with internal IDAES variables.
        if prop_key not in _allowed_property_keys(obj):
            continue

        # First property with this key, taken in the relation's iteration order.
        prop = next(
            (p for p in obj.properties.ContainedProperties.all() if p.key == prop_key),
            None,
        )
        if prop is None:
            continue

        # Figure out which bound was violated so the right value is suggested.
        value = _coerce_float(var.value)
        lower = _coerce_float(var.lower)
        upper = _coerce_float(var.upper)
        suggested = None
        if value is not None and lower is not None and value < lower:
            suggested = lower
        elif value is not None and upper is not None and value > upper:
            suggested = upper
        if suggested is None:
            continue

        # Formula-controlled or indexed properties change how the fix is
        # phrased: "set it" is not valid advice for a computed value.
        values = list(prop.values.all())
        has_formula = any(v.formula for v in values)
        is_scalar = len(values) == 1 and not values[0].indexedItems.exists()
        display_name = prop.displayName or prop_key

        finding = DiagnosticsFinding(
            id=f"IDAES_BOUNDS:{name}",
            severity="error",
            title="Value outside IDAES bounds",
            description=(
                f"{display_name} is fixed to {var.value} outside bounds "
                f"[{var.lower}, {var.upper}]."
            ),
            ruleReference="IDAES_BOUNDS_CHECK",
            componentName=obj.componentName,
            componentId=obj.id,
            propertyKey=prop_key,
            propertyId=prop.id,
            suggestedValue=suggested,
        )

        # Tailor the fix suggestion based on whether the property is editable.
        if has_formula:
            finding.suggestedFix = (
                f"{display_name} is formula-controlled. Update the formula to move toward {suggested}."
            )
        elif is_scalar:
            # Simple scalar properties get a one-click fix action.
            finding.suggestedFix = f"Set {display_name} to {suggested}"
            finding.fixAction = {
                "kind": "property_value",
                "propertyId": prop.id,
                "suggestedValue": suggested,
            }

        findings.append(finding)

    return findings

354 

355 

# Rulesets are deterministic checkpoints executed one after another once a
# solve has failed. The order mirrors the IDAES DiagnosticsToolbox workflow:
# structural checks first, numerical checks afterwards. The IDs are stable
# strings that get persisted into `DiagnosticRun.summary` and into events, so
# they must stay unchanged for backwards compatibility.
#
# RULESET1  - Evaluation errors. Records the error messages as evidence but
#             does NOT block; later rulesets (structural, OOB) catch the
#             specific actionable issues.
#
# RULESET2  - Structural issues. Verifies the model is "square" (zero degrees
#             of freedom); a non-zero DOF means the model is
#             over/under-constrained and can't be solved. Also catches unit
#             inconsistencies (e.g., mixing kg and lbs).
#
# RULESET2B - Variables outside bounds. Flags variables whose fixed value
#             violates IDAES property bounds (e.g., pressure fixed to -100
#             when bounds are [0, 1e8]). These cause "trivially infeasible"
#             errors and are often user input mistakes.
#
# RULESET3  - Policy/rule violations. Placeholder for site/project-specific
#             constraints (e.g., "temperature must be < 500K for this
#             equipment type").
#
# RULESET4  - Numerical triage. Catches remaining numerical warnings (e.g.,
#             poorly scaled variables, near-singular Jacobian) but doesn't
#             block; it just surfaces findings.
RULESETS = [
    ("RULESET1", _ruleset1_eval_errors),
    ("RULESET2", _ruleset2_structural),
    ("RULESET2B", _ruleset_outside_bounds),
    ("RULESET3", _ruleset3_rule_violations),
    ("RULESET4", _ruleset4_numerical),
]

383 

384 

def _transition(run: DiagnosticRun, state: str, data: JsonValue | None = None) -> None:
    """
    Move `run` to `state`, persist it, and record a "state_transition" event.

    Every transition is recorded as an event so the UI can show progress and
    we can debug what happened later. `started_at` is stamped on the first
    COLLECTING/GATING transition, `completed_at` on any terminal state. Extra
    `data` is merged into the event payload next to "state".
    """
    starting_states = (DiagnosticRunStatus.COLLECTING, DiagnosticRunStatus.GATING)

    run.diagnostic_status = state
    if run.started_at is None and state in starting_states:
        run.started_at = timezone.now()
    if state in TERMINAL_STATES:
        run.completed_at = timezone.now()
    run.save(update_fields=["diagnostic_status", "started_at", "completed_at", "updated_at"])

    extra = data or {}
    run.append_event("state_transition", {"state": state, **extra})

395 

396 

def _findings_generation_warning(finding_id: str, title: str, description: str) -> DiagnosticsFinding:
    """Build the warning finding shown when a findings generator itself failed."""
    return DiagnosticsFinding(
        id=finding_id,
        severity="warning",
        title=title,
        description=description,
        ruleReference="FINDINGS_GENERATION_ERROR",
    )


def run_diagnostics_for_task(task: Task, failure_bundle: FailureBundle, trigger: str = DiagnosticTrigger.SOLVE_FAILURE) -> DiagnosticRun:
    """
    Run deterministic rulesets for a task failure and persist DiagnosticRun/Events.

    Args:
        task: The failed task; its id/flowsheet override the ones in the bundle.
        failure_bundle: Already validated bundle, as built by
            `build_failure_bundle_from_payload()` / `build_failure_bundle_from_task()`.
        trigger: What caused this diagnostic run.

    Returns:
        The persisted DiagnosticRun, in BLOCKED state when a ruleset blocked
        and COMPLETE otherwise.
    """
    # The orchestrator stays model-first: every "finding" emitted here is
    # typed until the moment JSON is dumped into the DB/event stream.
    fb = failure_bundle.model_copy(update={"task_id": task.id, "flowsheet_id": task.flowsheet_id, "trigger": trigger})

    # NOTE(review): the completion steps at the end are kept inside this
    # atomic block so a run is persisted all-or-nothing — confirm this matches
    # the intended transaction scope.
    with transaction.atomic():
        run = DiagnosticRun.objects.create(
            flowsheet=task.flowsheet,
            task=task,
            diagnostic_status=DiagnosticRunStatus.CREATED,
            trigger=trigger,
        )
        run.append_event("created", {"trigger": trigger})

        # COLLECTING and GATING transition back-to-back with no work in between.
        # COLLECTING is a placeholder for future async data gathering (e.g.,
        # fetching historical solve logs, pulling in related flowsheet context).
        # It is kept to maintain state machine consistency and to provide a hook
        # for future enhancements without requiring migration changes.
        _transition(run, DiagnosticRunStatus.COLLECTING)
        _transition(run, DiagnosticRunStatus.GATING)

        scenario_id = None
        if isinstance(task.debug, dict):
            scenario_id = task.debug.get("scenario_id")

        objects = _load_flowsheet_objects(task.flowsheet_id)

        # "Base" findings are deterministic rule warnings/errors for objects/properties.
        # These are safe to show even when we later block early on a ruleset.
        base_findings: list[DiagnosticsFinding] = []
        # OOB findings are different: they are specific "variable is outside bounds"
        # items that are only attached when the outside-bounds ruleset blocks the run.
        oob_findings: list[DiagnosticsFinding] = []

        # Each best-effort step runs in its own nested atomic block (a
        # savepoint). Swallowing an exception directly inside the outer atomic
        # block would leave the transaction unusable if the failure was a
        # database error, and the saves below would then fail as well.
        try:
            with transaction.atomic():
                base_findings.extend(_build_rule_findings(objects))
        except Exception as exc:
            # Don't crash the whole run if findings generation breaks.
            # Keep going with empty findings, but surface a warning for debugging/UI.
            base_findings.append(
                _findings_generation_warning(
                    "FINDINGS_ERROR:base_findings",
                    "Couldn't generate diagnostic findings",
                    f"Base findings failed: {exc}",
                )
            )

        try:
            with transaction.atomic():
                oob_findings = _build_bounds_findings(objects, fb.numerical.variables_outside_bounds or [])
        except Exception as exc:
            # Bounds findings only show up for the outside-bounds ruleset; add a
            # warning to the main findings list so the failure is still visible
            # in the summary.
            base_findings.append(
                _findings_generation_warning(
                    "FINDINGS_ERROR:oob_findings",
                    "Couldn't generate bounds findings",
                    f"Bounds findings failed: {exc}",
                )
            )

        # Persist a stable, bounded snapshot of the failure bundle into summary
        # so the Diagnostics tab can show evidence without relying on Task.log.
        # Large lists are capped here to keep DB storage + API payload size
        # reasonable. The UI can scroll, but we still don't want to store/ship
        # huge blobs.
        numerical_summary = fb.numerical.model_dump()
        vars_oob = numerical_summary.get("variables_outside_bounds") or []
        if len(vars_oob) > MAX_VARIABLES_OUTSIDE_BOUNDS_IN_SUMMARY:
            numerical_summary["variables_outside_bounds"] = vars_oob[:MAX_VARIABLES_OUTSIDE_BOUNDS_IN_SUMMARY]
            numerical_summary["variables_outside_bounds_truncated"] = True
            numerical_summary["variables_outside_bounds_total"] = len(vars_oob)

        summary = DiagnosticRunSummary(
            rulesets_passed=[],
            failure_bundle=FailureBundleSummary(
                task_id=fb.task_id,
                flowsheet_id=fb.flowsheet_id,
                solve_index=fb.solve_index,
                trigger=fb.trigger,
                scenario_id=scenario_id,
                solver=fb.solver.model_dump(),
                eval_errors=fb.eval_errors,
                structural=fb.structural.model_dump(),
                numerical=numerical_summary,
                has_diagnostics_raw_text=bool(fb.diagnostics_raw_text),
            ),
            findings=base_findings,
        )

        run.summary = summary.model_dump(exclude_none=True)
        run.save(update_fields=["summary"])

        for ruleset_id, ruleset_fn in RULESETS:
            run.append_event("ruleset_start", {"ruleset": ruleset_id})
            result = ruleset_fn(fb)
            run.append_event(
                "ruleset_end",
                {"ruleset": ruleset_id, "blocked": result.blocked, "reason": result.reason, "data": result.data},
            )
            if result.blocked:
                # "Blocked" means: stop running further rulesets.
                # The UI still needs something clear to show about why we stopped.
                summary.blocked_at = ruleset_id
                summary.blocked_reason = result.reason
                blocked_findings = list(summary.findings or [])
                # "RULESET2B" = outside-bounds check; show actionable findings with suggested fixes
                if ruleset_id == "RULESET2B" and oob_findings:
                    blocked_findings.extend(oob_findings)
                else:
                    # For other rulesets there is no nicer, user-friendly finding
                    # yet, so show one generic "blocked at ..." item.
                    blocked_findings.append(
                        DiagnosticsFinding(
                            id=ruleset_id,
                            severity="error",
                            title=f"Blocked at {ruleset_id}",
                            description=result.reason or "Blocked by deterministic ruleset",
                            ruleReference=ruleset_id,
                        )
                    )
                summary.findings = blocked_findings
                _transition(run, DiagnosticRunStatus.BLOCKED, {"reason": result.reason})
                run.summary = summary.model_dump(exclude_none=True)
                run.save(update_fields=["diagnostic_status", "summary"])
                return run
            summary.rulesets_passed.append(ruleset_id)

        run.summary = summary.model_dump(exclude_none=True)
        run.save(update_fields=["summary"])

        _transition(run, DiagnosticRunStatus.SUMMARISING)
        _transition(run, DiagnosticRunStatus.COMPLETE)
        return run

544 

545 

546 

547 

def build_failure_bundle_from_payload(
    solve_payload,
    *,
    trigger: str = DiagnosticTrigger.SOLVE_FAILURE,
) -> FailureBundle:
    """
    Convert an IdaesSolveCompletionPayload into a failure bundle used by rulesets.

    `task_id` / `flowsheet_id` are set to the placeholder -1 here.
    """
    # The raw diagnostics text may live under either field name (backwards
    # compat); the first one that is not None wins.
    raw_text = None
    for attr_name in ("diagnostics_raw_text", "diagnostics_text"):
        raw_text = getattr(solve_payload, attr_name, None)
        if raw_text is not None:
            break

    # Pull the error message out, if the payload carries a dict-shaped error.
    error = getattr(solve_payload, "error", None) or {}
    if isinstance(error, dict):
        message = error.get("message")
    else:
        message = None

    # Text scanned for variables outside bounds: the diagnostics text, with
    # the error message as fallback.
    bounds_source = raw_text or message or ""
    outside_bounds = extract_variables_outside_bounds(bounds_source)

    return FailureBundle(
        task_id=-1,
        flowsheet_id=-1,
        solve_index=getattr(solve_payload, "solve_index", None),
        trigger=trigger,
        eval_errors=[message] if message else None,
        diagnostics_raw_text=raw_text,
        structural={},
        numerical={"variables_outside_bounds": outside_bounds},
    )

581 

582 

def build_failure_bundle_from_task(task: Task, solve_index: int | None = None) -> FailureBundle:
    """
    Build a failure bundle from a Task for diagnostics evaluation.

    Args:
        task: Task whose `debug` / `error` fields carry the failure evidence.
        solve_index: Optional index of the solve this failure belongs to.

    Returns:
        A FailureBundle with trigger SOLVE_FAILURE. `task.debug` and
        `task.error` are only read when they are dicts; any other shape is
        treated as "no information" instead of raising.
    """
    # Diagnostics raw text comes from the task debug info. Guard the shape the
    # same way `task.error` is guarded below: a truthy non-dict value would
    # otherwise raise AttributeError on `.get`.
    debug = task.debug if isinstance(task.debug, dict) else {}
    diagnostics_raw_text = debug.get("diagnostics_raw_text")

    # Error message, if there is one.
    error = task.error
    error_message = error.get("message") if isinstance(error, dict) else None

    # Text scanned for variables outside bounds: diagnostics text first, the
    # error message as fallback.
    text_to_parse = diagnostics_raw_text or error_message or ""

    return FailureBundle(
        task_id=task.id,
        flowsheet_id=task.flowsheet_id,
        solve_index=solve_index,
        trigger=DiagnosticTrigger.SOLVE_FAILURE,
        eval_errors=[error_message] if error_message else None,
        diagnostics_raw_text=diagnostics_raw_text,
        structural={},
        numerical={
            "variables_outside_bounds": extract_variables_outside_bounds(text_to_parse)
        },
    )