Coverage for backend/django/Economics/costing/capital/generated_lines.py: 86%

278 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Generated capital-line synchronization for capital costing. 

2 

3This module materializes generated ``CapitalCostLine`` rows before 

4fingerprinting and result calculation. Rows are created as soon as an equipment 

5mapping exists so work-capable units can expose editable peak demand even before 

6the user has selected a cost curve. 

7""" 

8 

9from __future__ import annotations 

10 

11from decimal import Decimal, DecimalException 

12from typing import Any, TypedDict 

13 

14from core.auxiliary.enums import ConType 

15from Economics.costing.models import CapitalCostLine, CostCurve, CostDriver, EquipmentMapping 

16 

17from Economics.shared.choices import CostBasis, CostDriverSource 

18 

19from Economics.studies.models import EconomicsStudy 

20from Economics.costing.capital.capital_line_sources import GENERATED_CAPITAL_LINE_SOURCE 

21from Economics.costing.cost_curves.driver_specs import ( 

22 CapitalCostDriverInput, 

23 CapitalCostDriverInputsPayload, 

24 CostCurveDriverSpec, 

25 CostCurveDriverSpecPayload, 

26 default_driver_inputs_payload, 

27 driver_specs_payload, 

28 normalize_capital_cost_driver_inputs, 

29 parse_required_driver_specs, 

30) 

31from Economics.costing.cost_curves.evaluation import ( 

32 CostCurveEvaluationError, 

33 cost_curve_units_compatible, 

34 evaluate_cost_curve, 

35 normalize_economics_unit_notation, 

36) 

37from Economics.costing.capital.electrical_upgrade import unit_work_peak_demand_kw 

38from Economics.costing.cost_curves.driver_properties import validate_cost_driver_property 

39from Economics.formulas.builders.capital import build_generated_capital_line_formula 

40from Economics.formulas.engine.core import FormulaError 

41from Economics.shared.payloads import json_ready, result_amount, warning_record 

42from idaes_factory.unit_conversion.unit_conversion import convert_value 

43from pint.errors import PintError 

44 

45 

46class ResolvedDriverInput(TypedDict): 

47 value: Decimal | float | int | str 

48 unit: str 

49 

50 

51def _sync_generated_capital_lines(study: EconomicsStudy) -> bool: 

52 """Materialize generated capital lines before result fingerprinting. 

53 

54 Fingerprints include capital line rows, so generated rows have to be updated 

55 first or the dependency comparison would describe stale generated costs. 

56 """ 

57 changed = False 

58 mappings = ( 

59 EquipmentMapping.objects.filter( 

60 flowsheet=study.flowsheet, 

61 costable_item__study=study, 

62 costable_item__simulation_object__is_deleted=False, 

63 ) 

64 .select_related( 

65 "costable_item", 

66 "costable_item__cost_driver", 

67 "costable_item__cost_driver__property_info", 

68 "cost_curve", 

69 ) 

70 .order_by("pk") 

71 ) 

72 active_costable_item_ids = set() 

73 for mapping in mappings: 

74 active_costable_item_ids.add(mapping.costable_item_id) 

75 changed = _sync_generated_capital_line(mapping) or changed 

76 stale_lines = CapitalCostLine.objects.filter( 

77 flowsheet=study.flowsheet, 

78 study=study, 

79 source=GENERATED_CAPITAL_LINE_SOURCE, 

80 ) 

81 if active_costable_item_ids: 

82 stale_lines = stale_lines.exclude(costable_item_id__in=active_costable_item_ids) 

83 deleted_count, _ = stale_lines.delete() 

84 return changed or deleted_count > 0 

85 

86 

87def _sync_generated_capital_line(mapping: EquipmentMapping) -> bool: 

88 """Create or update the single generated capital line for one equipment mapping.""" 

89 driver = getattr(mapping.costable_item, "cost_driver", None) 

90 generated_lines = list( 

91 CapitalCostLine.objects.filter( 

92 flowsheet=mapping.flowsheet, 

93 study=mapping.costable_item.study, 

94 costable_item=mapping.costable_item, 

95 source=GENERATED_CAPITAL_LINE_SOURCE, 

96 ).order_by("pk") 

97 ) 

98 line = generated_lines[0] if generated_lines else None 

99 changed = False 

100 for duplicate in generated_lines[1:]: 100 ↛ 101line 100 didn't jump to line 101 because the loop on line 100 never started

101 duplicate.delete() 

102 changed = True 

103 

104 fields = _generated_capital_line_fields(mapping=mapping, driver=driver, existing_line=line) 

105 if line is None: 

106 fields["peak_demand_kw"] = fields.get("minimum_peak_demand_kw") 

107 CapitalCostLine.objects.create( 

108 flowsheet=mapping.flowsheet, 

109 study=mapping.costable_item.study, 

110 costable_item=mapping.costable_item, 

111 **fields, 

112 ) 

113 return True 

114 

115 minimum_peak_demand_kw = fields.get("minimum_peak_demand_kw") 

116 selected_peak_demand_kw = line.peak_demand_kw 

117 previous_minimum_peak_demand_kw = line.minimum_peak_demand_kw 

118 selected_peak_demand_is_auto = selected_peak_demand_kw == previous_minimum_peak_demand_kw 

119 if minimum_peak_demand_kw is None: 

120 fields["peak_demand_kw"] = None 

121 elif ( 

122 selected_peak_demand_kw is None 

123 or selected_peak_demand_is_auto 

124 or selected_peak_demand_kw < minimum_peak_demand_kw 

125 ): 

126 fields["peak_demand_kw"] = minimum_peak_demand_kw 

127 

128 changed_fields = [] 

129 for field_name, value in fields.items(): 

130 if getattr(line, field_name) != value: 

131 setattr(line, field_name, value) 

132 changed_fields.append(field_name) 

133 if changed_fields: 

134 line.save(update_fields=[*changed_fields, "updated_at"]) 

135 changed = True 

136 return changed 

137 

138 

139def sync_generated_capital_lines_for_property(property_info) -> list[EconomicsStudy]: 

140 """Refresh generated economics rows affected by an edited flowsheet property.""" 

141 property_set = getattr(property_info, "set", None) 

142 simulation_object = getattr(property_set, "simulationObject", None) 

143 changed_studies = [] 

144 study_ids: set[int] = set() 

145 flowsheet_studies = EconomicsStudy.objects.filter(flowsheet=property_info.flowsheet) 

146 if simulation_object is not None: 

147 study_ids.update( 

148 flowsheet_studies.filter( 

149 costable_items__simulation_object=simulation_object, 

150 ).values_list("pk", flat=True) 

151 ) 

152 connected_unit_ids = simulation_object.connectedPorts.filter( 

153 unitOp__isnull=False, 

154 ).values_list("unitOp_id", flat=True) 

155 study_ids.update( 

156 flowsheet_studies.filter( 

157 costable_items__simulation_object_id__in=connected_unit_ids, 

158 ).values_list("pk", flat=True) 

159 ) 

160 referenced_study_ids = set() 

161 for line in CapitalCostLine.objects.filter( 

162 flowsheet=property_info.flowsheet, 

163 source=GENERATED_CAPITAL_LINE_SOURCE, 

164 ).only("study_id", "driver_inputs"): 

165 if _driver_inputs_reference_property(line.driver_inputs, property_info.pk): 

166 referenced_study_ids.add(line.study_id) 

167 input_studies = EconomicsStudy.objects.filter( 

168 pk__in=referenced_study_ids, 

169 flowsheet=property_info.flowsheet, 

170 ) 

171 study_ids.update(input_studies.values_list("pk", flat=True)) 

172 for study in flowsheet_studies.filter(pk__in=study_ids).order_by("pk"): 

173 if _sync_generated_capital_lines(study): 173 ↛ 172line 173 didn't jump to line 172 because the condition on line 173 was always true

174 changed_studies.append(study) 

175 return changed_studies 

176 

177 

178def _generated_capital_line_fields( 

179 *, 

180 mapping: EquipmentMapping, 

181 driver: CostDriver | None, 

182 existing_line: CapitalCostLine | None = None, 

183) -> dict[str, Any]: 

184 """Return persisted fields for the generated capital line tied to one mapping.""" 

185 curve = mapping.cost_curve 

186 minimum_peak_demand_kw = unit_work_peak_demand_kw(mapping.costable_item) 

187 if curve is None: 

188 return _pending_generated_capital_line_fields( 

189 mapping=mapping, 

190 minimum_peak_demand_kw=minimum_peak_demand_kw, 

191 ) 

192 

193 warning_payload: dict[str, Any] = { 

194 "calculation_method": "cost_curve", 

195 "cost_curve_id": curve.pk, 

196 "cost_curve_key": curve.curve_key, 

197 "cost_basis": curve.cost_basis, 

198 "required_driver_specs": _curve_required_driver_specs_payload(curve), 

199 "minimum_peak_demand_kw": None if minimum_peak_demand_kw is None else str(minimum_peak_demand_kw), 

200 "warnings": [], 

201 } 

202 amount = None 

203 confidence = "calculated" 

204 try: 

205 driver_inputs = _reconciled_driver_inputs( 

206 curve=curve, 

207 existing_inputs=( 

208 normalize_capital_cost_driver_inputs(existing_line.driver_inputs) 

209 if existing_line is not None and existing_line.driver_inputs 

210 else {} 

211 ), 

212 ) 

213 resolved_inputs = _resolved_driver_inputs( 

214 curve=curve, 

215 driver_inputs=driver_inputs, 

216 mapping=mapping, 

217 ) 

218 generated_formula = build_generated_capital_line_formula( 

219 mapping, 

220 driver=driver, 

221 existing_line=existing_line, 

222 ) 

223 evaluation = evaluate_cost_curve( 

224 curve, 

225 inputs_by_key=resolved_inputs, 

226 apply_installation_factor=generated_formula.applies_lang_factor, 

227 ) 

228 base_amount = evaluation.amount 

229 if base_amount < 0: 

230 raise CostCurveEvaluationError( 

231 "negative_cost_output", 

232 "Cost curve evaluated to a negative cost.", 

233 context={"curve_key": curve.curve_key, "amount": str(base_amount), "output_unit": curve.output_unit}, 

234 ) 

235 factor_rows = [ 

236 _capital_factor_row( 

237 kind="base_curve_cost", 

238 label="Curve base cost", 

239 amount=base_amount, 

240 detail=_base_curve_detail(evaluation), 

241 ) 

242 ] 

243 indexed_amount = base_amount * generated_formula.index_adjustment.factor 

244 purchase_basis_amount = indexed_amount if curve.cost_basis == CostBasis.PURCHASE else Decimal("0") 

245 factor_rows.append( 

246 _capital_factor_row( 

247 kind="index_adjustment", 

248 label="CPI/index adjustment", 

249 amount=indexed_amount, 

250 factor=generated_formula.index_adjustment.factor, 

251 detail=generated_formula.index_adjustment.detail, 

252 ) 

253 ) 

254 installed_basis_amount = indexed_amount if curve.cost_basis == CostBasis.INSTALLED else Decimal("0") 

255 uplift_base_amount = indexed_amount 

256 if generated_formula.applies_lang_factor and generated_formula.lang_factor is not None: 256 ↛ 268line 256 didn't jump to line 268 because the condition on line 256 was always true

257 uplift_base_amount = indexed_amount * generated_formula.lang_factor 

258 installed_basis_amount = uplift_base_amount 

259 factor_rows.append( 

260 _capital_factor_row( 

261 kind="lang_factor", 

262 label="Lang factor", 

263 amount=uplift_base_amount, 

264 factor=generated_formula.lang_factor, 

265 detail=generated_formula.lang_factor_source, 

266 ) 

267 ) 

268 contingency_percent = generated_formula.contingency_percent 

269 contingency_amount = uplift_base_amount * (contingency_percent / Decimal("100")) 

270 amount = uplift_base_amount * generated_formula.contingency_factor 

271 factor_rows.append( 

272 _capital_factor_row( 

273 kind="contingency", 

274 label="Contingency", 

275 amount=amount, 

276 factor=generated_formula.contingency_factor, 

277 percent=contingency_percent, 

278 ) 

279 ) 

280 amount = result_amount(amount) 

281 warning_payload.update( 

282 { 

283 "input_value": None if evaluation.input_value is None else str(evaluation.input_value), 

284 "input_unit": evaluation.input_unit, 

285 "normalized_inputs": evaluation.normalized_inputs, 

286 "selected_variant": evaluation.selected_variant, 

287 "selector_diagnostics": list(evaluation.selector_diagnostics), 

288 "base_amount": str(result_amount(evaluation.amount)), 

289 "index_factor": str(generated_formula.index_adjustment.factor), 

290 "lang_factor": ( 

291 None 

292 if generated_formula.lang_factor is None 

293 else str(generated_formula.lang_factor) 

294 ), 

295 "lang_factor_source": generated_formula.lang_factor_source, 

296 "purchase_basis_amount": str(result_amount(purchase_basis_amount)), 

297 "installed_basis_amount": str(result_amount(installed_basis_amount)), 

298 "contingency_percent": str(contingency_percent), 

299 "contingency_amount": str(result_amount(contingency_amount)), 

300 "amount": None if amount is None else str(amount), 

301 "output_unit": evaluation.output_unit, 

302 "capital_factors": factor_rows, 

303 } 

304 ) 

305 warning_payload["warnings"].extend(evaluation.warnings_payload()) 

306 warning_payload["warnings"].extend( 

307 _flow_capacity_warnings( 

308 mapping=mapping, 

309 curve=curve, 

310 driver_inputs=driver_inputs, 

311 resolved_inputs=resolved_inputs, 

312 ) 

313 ) 

314 except (CostCurveEvaluationError, FormulaError, ValueError) as exc: 

315 warning_payload["warnings"].append(_cost_curve_error_warning(exc, mapping=mapping, driver=driver)) 

316 confidence = "blocked" 

317 

318 fields: dict[str, Any] = { 

319 "cost_curve": curve, 

320 "label": f"{mapping.costable_item.name} capital cost", 

321 "line_type": "equipment_capital", 

322 "amount": amount, 

323 "currency": curve.currency or curve.output_unit or "NZD", 

324 "included": mapping.costable_item.included, 

325 "manual": False, 

326 "source": GENERATED_CAPITAL_LINE_SOURCE, 

327 "confidence": confidence, 

328 "minimum_peak_demand_kw": minimum_peak_demand_kw, 

329 "warning_payload": json_ready(warning_payload), 

330 } 

331 # Driver inputs are user-editable on generated capital lines. Populate the 

332 # declarative defaults only for a new or still-empty row so recalculation 

333 # cannot erase the user's property/manual selections. 

334 reconciled_inputs = _reconciled_driver_inputs( 

335 curve=curve, 

336 existing_inputs=( 

337 normalize_capital_cost_driver_inputs(existing_line.driver_inputs) 

338 if existing_line is not None and existing_line.driver_inputs 

339 else {} 

340 ), 

341 ) 

342 if existing_line is None or existing_line.driver_inputs != reconciled_inputs: 

343 fields["driver_inputs"] = reconciled_inputs 

344 return fields 

345 

346 

347def _pending_generated_capital_line_fields( 

348 *, 

349 mapping: EquipmentMapping, 

350 minimum_peak_demand_kw: Decimal | None, 

351) -> dict[str, Any]: 

352 """Return a generated capital line that is configurable but not costed yet.""" 

353 warning_payload = { 

354 "calculation_method": "cost_curve", 

355 "cost_curve_id": None, 

356 "cost_curve_key": "", 

357 "cost_basis": mapping.cost_basis, 

358 "minimum_peak_demand_kw": None if minimum_peak_demand_kw is None else str(minimum_peak_demand_kw), 

359 "warnings": [], 

360 } 

361 return { 

362 "cost_curve": None, 

363 "label": f"{mapping.costable_item.name} capital cost", 

364 "line_type": "equipment_capital", 

365 "amount": None, 

366 "currency": "NZD", 

367 "included": mapping.costable_item.included, 

368 "manual": False, 

369 "source": GENERATED_CAPITAL_LINE_SOURCE, 

370 "driver_inputs": {}, 

371 "confidence": "blocked", 

372 "minimum_peak_demand_kw": minimum_peak_demand_kw, 

373 "warning_payload": json_ready(warning_payload), 

374 } 

375 

376 

377def _curve_required_driver_specs(curve: CostCurve) -> tuple[CostCurveDriverSpec, ...]: 

378 """Return the selected curve's validated driver spec models.""" 

379 return parse_required_driver_specs(curve.required_driver_specs) 

380 

381 

382def _curve_required_driver_specs_payload(curve: CostCurve) -> list[CostCurveDriverSpecPayload]: 

383 """Return the selected curve's JSON-ready driver spec payload.""" 

384 return driver_specs_payload(_curve_required_driver_specs(curve)) 

385 

386 

387def _default_driver_inputs(curve: CostCurve) -> CapitalCostDriverInputsPayload: 

388 """Create JSON-ready per-spec capital-line input rows from typed specs.""" 

389 return default_driver_inputs_payload(_curve_required_driver_specs(curve)) 

390 

391 

392def _reconciled_driver_inputs( 

393 *, 

394 curve: CostCurve, 

395 existing_inputs: CapitalCostDriverInputsPayload, 

396) -> CapitalCostDriverInputsPayload: 

397 """Preserve matching driver inputs and reset keys that no longer match the curve.""" 

398 defaults = _default_driver_inputs(curve) 

399 reconciled: CapitalCostDriverInputsPayload = {} 

400 specs_by_key = {spec.key: spec for spec in _curve_required_driver_specs(curve)} 

401 for key, default_input in defaults.items(): 

402 existing_input = existing_inputs.get(key) 

403 spec = specs_by_key[key] 

404 reconciled[key] = ( 

405 existing_input 

406 if _driver_input_matches_spec(existing_input, spec) 

407 else default_input 

408 ) 

409 return reconciled 

410 

411 

412def _driver_input_matches_spec(driver_input: Any, spec: CostCurveDriverSpec) -> bool: 

413 if not isinstance(driver_input, dict): 

414 return False 

415 try: 

416 parsed_input = CapitalCostDriverInput.model_validate(driver_input) 

417 except ValueError: 

418 return False 

419 if parsed_input.source and parsed_input.source not in spec.source_options: 

420 return False 

421 return cost_curve_units_compatible(parsed_input.unit, spec.unit) 

422 

423 

424def _driver_inputs_reference_property(driver_inputs: Any, property_info_id: int) -> bool: 

425 """Return whether a spec-keyed driver-input JSON payload references a property.""" 

426 if not isinstance(driver_inputs, dict): 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true

427 return False 

428 return any( 

429 isinstance(driver_input, dict) 

430 and driver_input.get("source") == "property" 

431 and driver_input.get("property_info") == property_info_id 

432 for driver_input in driver_inputs.values() 

433 ) 

434 

435 

436def _capital_factor_row( 

437 *, 

438 kind: str, 

439 label: str, 

440 amount: Decimal, 

441 factor: Decimal | None = None, 

442 percent: Decimal | None = None, 

443 detail: str = "", 

444) -> dict[str, Any]: 

445 """Serialize one capital factor step for generated-line audit payloads.""" 

446 return { 

447 "kind": kind, 

448 "label": label, 

449 "factor": None if factor is None else str(factor), 

450 "percent": None if percent is None else str(percent), 

451 "amount": str(result_amount(amount)), 

452 "detail": detail, 

453 } 

454 

455 

456def _resolved_driver_inputs( 

457 *, 

458 curve: CostCurve, 

459 driver_inputs: CapitalCostDriverInputsPayload, 

460 mapping: EquipmentMapping, 

461) -> dict[str, ResolvedDriverInput]: 

462 """Resolve persisted driver-input selections into evaluator-ready values.""" 

463 resolved: dict[str, ResolvedDriverInput] = {} 

464 for spec in _curve_required_driver_specs(curve): 

465 driver_input_payload = driver_inputs.get(spec.key) 

466 if driver_input_payload is None: 466 ↛ 467line 466 didn't jump to line 467 because the condition on line 466 was never true

467 continue 

468 driver_input = CapitalCostDriverInput.model_validate(driver_input_payload) 

469 if driver_input.source == "property": 

470 property_info_id = driver_input.property_info 

471 if property_info_id is None: 471 ↛ 472line 471 didn't jump to line 472 because the condition on line 471 was never true

472 raise CostCurveEvaluationError( 

473 "missing_cost_curve_input", 

474 "Property-backed cost curve input has no selected property.", 

475 context={"curve_key": curve.curve_key, "input_key": spec.key}, 

476 ) 

477 property_info = spec_property_info(mapping=mapping, property_info_id=property_info_id) 

478 value = property_info.get_value() 

479 if value in (None, ""): 

480 raise CostCurveEvaluationError( 

481 "missing_cost_curve_input", 

482 "Selected cost curve input property has no value.", 

483 context={ 

484 "curve_key": curve.curve_key, 

485 "input_key": spec.key, 

486 "property_info_id": property_info_id, 

487 }, 

488 ) 

489 resolved[spec.key] = { 

490 "value": value, 

491 "unit": property_info.unit or driver_input.unit or spec.unit, 

492 } 

493 elif driver_input.source == "manual": 

494 if driver_input.manual_value == "": 494 ↛ 495line 494 didn't jump to line 495 because the condition on line 494 was never true

495 raise CostCurveEvaluationError( 

496 "missing_cost_curve_input", 

497 "Manual cost curve input has no value.", 

498 context={"curve_key": curve.curve_key, "input_key": spec.key}, 

499 ) 

500 resolved[spec.key] = { 

501 "value": driver_input.manual_value, 

502 "unit": driver_input.unit or spec.unit, 

503 } 

504 elif spec.required: 504 ↛ 464line 504 didn't jump to line 464 because the condition on line 504 was always true

505 raise CostCurveEvaluationError( 

506 "missing_cost_curve_input", 

507 "Cost curve input has no selected source.", 

508 context={"curve_key": curve.curve_key, "input_key": spec.key}, 

509 ) 

510 return resolved 

511 

512 

513def spec_property_info(*, mapping: EquipmentMapping, property_info_id: int): 

514 from core.auxiliary.models import PropertyInfo 

515 

516 try: 

517 property_info = PropertyInfo.objects.get(pk=property_info_id, flowsheet=mapping.flowsheet) 

518 except PropertyInfo.DoesNotExist as exc: 

519 raise CostCurveEvaluationError( 

520 "invalid_cost_curve_input_property", 

521 "Selected cost curve input property does not belong to this flowsheet.", 

522 context={"property_info_id": property_info_id}, 

523 ) from exc 

524 driver = getattr(mapping.costable_item, "cost_driver", None) 

525 if driver is not None: 

526 try: 

527 validate_cost_driver_property(driver, property_info) 

528 except ValueError as exc: 

529 raise CostCurveEvaluationError( 

530 "invalid_cost_curve_input_property", 

531 str(exc), 

532 context={"property_info_id": property_info_id, "costable_item_id": mapping.costable_item_id}, 

533 ) from exc 

534 return property_info 

535 

536 

537def _flow_capacity_warnings( 

538 *, 

539 mapping: EquipmentMapping, 

540 curve: CostCurve, 

541 driver_inputs: CapitalCostDriverInputsPayload, 

542 resolved_inputs: dict[str, ResolvedDriverInput], 

543) -> list[dict[str, Any]]: 

544 """Warn when manual HX flow capacity is smaller than inlet stream flow.""" 

545 warnings: list[dict[str, Any]] = [] 

546 for spec in _curve_required_driver_specs(curve): 

547 flow_property_key = _flow_capacity_property_key(spec) 

548 if flow_property_key is None: 

549 continue 

550 driver_input_payload = driver_inputs.get(spec.key) 

551 if not isinstance(driver_input_payload, dict): 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true

552 continue 

553 driver_input = CapitalCostDriverInput.model_validate(driver_input_payload) 

554 if driver_input.source != "manual": 554 ↛ 555line 554 didn't jump to line 555 because the condition on line 554 was never true

555 continue 

556 resolved_input = resolved_inputs.get(spec.key) 

557 if resolved_input is None: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true

558 continue 

559 capacity = _decimal_or_none(resolved_input.get("value")) 

560 if capacity is None: 560 ↛ 561line 560 didn't jump to line 561 because the condition on line 560 was never true

561 continue 

562 capacity_unit = str(resolved_input.get("unit") or spec.unit) 

563 capacity_overage = _inlet_flow_capacity_overage( 

564 mapping=mapping, 

565 property_key=flow_property_key, 

566 capacity=capacity, 

567 capacity_unit=capacity_unit, 

568 ) 

569 if capacity_overage is None: 

570 continue 

571 warnings.append( 

572 warning_record( 

573 code="cost_curve_flow_capacity_exceeded", 

574 severity="warning", 

575 message=( 

576 "Combined input stream flow exceeds the selected " 

577 f"{spec.label.lower()} for this cost curve." 

578 ), 

579 context={ 

580 "curve_key": curve.curve_key, 

581 "input_key": spec.key, 

582 "label": spec.label, 

583 "capacity_value": str(capacity), 

584 "capacity_unit": capacity_unit, 

585 **capacity_overage, 

586 }, 

587 ) 

588 ) 

589 return warnings 

590 

591 

592def _flow_capacity_property_key(spec: CostCurveDriverSpec) -> str | None: 

593 if spec.role != "discrete_selector": 

594 return None 

595 if spec.key == "volumetric_flow": 595 ↛ 597line 595 didn't jump to line 597 because the condition on line 595 was always true

596 return "flow_vol" 

597 if spec.key == "mass_flow": 

598 return "flow_mass" 

599 return None 

600 

601 

602def _inlet_flow_capacity_overage( 

603 *, 

604 mapping: EquipmentMapping, 

605 property_key: str, 

606 capacity: Decimal, 

607 capacity_unit: str, 

608) -> dict[str, Any] | None: 

609 simulation_object = mapping.costable_item.simulation_object 

610 if simulation_object is None: 610 ↛ 611line 610 didn't jump to line 611 because the condition on line 610 was never true

611 return None 

612 total_flow = Decimal("0") 

613 contributing_streams: list[dict[str, str]] = [] 

614 ports = ( 

615 simulation_object.ports.filter(direction=ConType.Inlet) 

616 .select_related("stream") 

617 .order_by("index", "pk") 

618 ) 

619 for port in ports: 

620 stream = port.stream 

621 if stream is None or getattr(stream, "properties", None) is None: 621 ↛ 622line 621 didn't jump to line 622 because the condition on line 621 was never true

622 continue 

623 property_info = ( 

624 stream.properties.containedProperties.filter(key=property_key, type="numeric") 

625 .prefetch_related("values") 

626 .first() 

627 ) 

628 if property_info is None: 628 ↛ 629line 628 didn't jump to line 629 because the condition on line 628 was never true

629 continue 

630 stream_value = _decimal_or_none(property_info.get_value()) 

631 if stream_value is None: 631 ↛ 632line 631 didn't jump to line 632 because the condition on line 631 was never true

632 continue 

633 stream_unit = property_info.unit or capacity_unit 

634 if not cost_curve_units_compatible(stream_unit, capacity_unit): 634 ↛ 635line 634 didn't jump to line 635 because the condition on line 634 was never true

635 continue 

636 stream_value_in_capacity_unit = _convert_decimal_value( 

637 value=stream_value, 

638 source_unit=stream_unit, 

639 target_unit=capacity_unit, 

640 ) 

641 if stream_value_in_capacity_unit is None: 641 ↛ 642line 641 didn't jump to line 642 because the condition on line 641 was never true

642 continue 

643 total_flow += stream_value_in_capacity_unit 

644 contributing_streams.append( 

645 { 

646 "stream_id": str(stream.pk), 

647 "stream_name": stream.componentName or f"Stream {stream.pk}", 

648 "port_name": port.displayName, 

649 "property_id": str(property_info.pk), 

650 "property_key": property_info.key, 

651 "value": str(stream_value_in_capacity_unit), 

652 "unit": normalize_economics_unit_notation(capacity_unit), 

653 "selected_capacity": str(capacity), 

654 } 

655 ) 

656 if not contributing_streams or total_flow <= capacity: 

657 return None 

658 return { 

659 "total_value": str(total_flow), 

660 "total_unit": normalize_economics_unit_notation(capacity_unit), 

661 "streams": contributing_streams, 

662 } 

663 

664 

665def _convert_decimal_value(*, value: Decimal, source_unit: str, target_unit: str) -> Decimal | None: 

666 source_unit = normalize_economics_unit_notation(source_unit) 

667 target_unit = normalize_economics_unit_notation(target_unit) 

668 if source_unit == target_unit: 668 ↛ 670line 668 didn't jump to line 670 because the condition on line 668 was always true

669 return value 

670 try: 

671 return Decimal(str(convert_value(value, from_unit=source_unit, to_unit=target_unit))) 

672 except (ValueError, DecimalException, PintError): 

673 return None 

674 

675 

676def _decimal_or_none(value: Any) -> Decimal | None: 

677 if value in (None, ""): 677 ↛ 678line 677 didn't jump to line 678 because the condition on line 677 was never true

678 return None 

679 try: 

680 return Decimal(str(value)) 

681 except (DecimalException, ValueError): 

682 return None 

683 

684 

685def _base_curve_detail(evaluation) -> str: 

686 if evaluation.selected_variant: 

687 return f"{evaluation.selected_variant['label']} selected from lowest-cost candidate" 

688 return f"{evaluation.input_value} {evaluation.input_unit}" 

689 

690 

691def _cost_curve_error_warning(exc: Exception, *, mapping: EquipmentMapping, driver: CostDriver | None) -> dict[str, Any]: 

692 """Convert curve/driver failures into the generated-line warning contract.""" 

693 if isinstance(exc, CostCurveEvaluationError | FormulaError): 693 ↛ 700line 693 didn't jump to line 700 because the condition on line 693 was always true

694 return warning_record( 

695 code=exc.code, 

696 severity="error", 

697 message=exc.message, 

698 context=exc.context, 

699 ) 

700 return warning_record( 

701 code="invalid_cost_driver_value", 

702 severity="error", 

703 message=str(exc), 

704 context={ 

705 "costable_item_id": mapping.costable_item_id, 

706 "cost_driver_id": None if driver is None else driver.pk, 

707 "property_info_id": None if driver is None else driver.property_info_id, 

708 }, 

709 )