Coverage for backend/django/Economics/costing/cost_curves/driver_properties.py: 83%

377 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from __future__ import annotations 

2 

3from dataclasses import dataclass 

4from typing import Any 

5 

6from core.auxiliary.enums import ConType 

7from core.auxiliary.models.PropertyInfo import PropertyInfo 

8from django.core.exceptions import ObjectDoesNotExist 

9from Economics.costing.capital.capital_line_sources import GENERATED_CAPITAL_LINE_SOURCE 

10from Economics.costing.models import CapitalCostLine, CostCurve, CostDriver, EquipmentMapping 

11from Economics.shared.choices import CostDriverSource 

12from Economics.costing.cost_curves.driver_specs import ( 

13 CapitalCostDriverInput, 

14 CapitalCostDriverInputsPayload, 

15 CostCurveDriverSpec, 

16 capital_cost_driver_input_payload, 

17 normalize_capital_cost_driver_inputs, 

18 parse_required_driver_specs, 

19) 

20from Economics.costing.cost_curves.registry import COST_DRIVER_RULES, CostDriverRule, PreferredProperty 

21from Economics.costing.cost_curves.evaluation import cost_curve_units_compatible 

22 

23 

# Per-input bulk setup, keyed by driver-spec key. Each inner dict is read for
# a "mode" entry and, in manual mode, a "manual_value" entry.
BulkDriverInputSetup = dict[str, dict[str, str]]

25 

26 

@dataclass(frozen=True)
class CostDriverPropertyOption:
    """Immutable description of one property offered as a cost-driver source.

    Instances are built by ``_option`` from a ``PropertyInfo`` row and the
    simulation object (unit or connected stream) that owns it.
    """

    # Primary key of the PropertyInfo row.
    property_info: int
    # Where the property lives: "unit", "input_stream" or "output_stream".
    scope: str
    # Primary key of the owning simulation object.
    object_id: int
    # Component name of the owning object, or a "Unit <pk>" fallback.
    object_name: str
    # objectType of the owning simulation object.
    object_type: str
    # PropertyInfo.key.
    property_key: str
    # PropertyInfo.displayName.
    display_name: str
    # PropertyInfo.unit, or "" when unset.
    unit: str
    # PropertyInfo.unitType, or "" when unset.
    unit_type: str
    # String form of the current value, or "" when there is none.
    value_preview: str
    # Result of PropertyInfo.has_value().
    has_value: bool
    # True when a registry rule recommends this property for sizing.
    recommended: bool = False
    # Human-readable recommendation text; "" when not recommended.
    recommendation_label: str = ""

42 

43 

@dataclass(frozen=True)
class BulkEquipmentSetupResult:
    """Immutable per-mapping outcome of a bulk equipment setup preview/apply."""

    # Primary key of the EquipmentMapping.
    mapping: int
    # Primary key of the mapping's costable item.
    costable_item: int
    # Name of the costable item.
    unit_name: str
    # Primary key of the reported cost curve, if any.
    cost_curve: int | None
    # Name of the reported cost curve, or "".
    cost_curve_name: str
    # "not_requested", "unchanged" or "ready".
    curve_status: str
    # True when applying would replace a different, already-assigned curve.
    curve_overwrite: bool
    # Primary key of the cost driver, if one exists.
    cost_driver: int | None
    # Status of the sizing-property recommendation.
    sizing_status: str
    # Details of the recommended sizing property; blank when none was chosen.
    sizing_property: int | None = None
    sizing_property_label: str = ""
    sizing_property_unit: str = ""
    sizing_property_scope: str = ""
    sizing_property_object_name: str = ""
    # True when the recommendation would replace existing sizing.
    sizing_overwrite: bool = False
    # Per driver-spec key results for the curve's required driver inputs.
    driver_input_results: dict[str, "BulkEquipmentDriverInputResult"] | None = None
    # Message accompanying the sizing status.
    message: str = ""

63 

64 

@dataclass(frozen=True)
class BulkEquipmentDriverInputResult:
    """Immutable outcome of the bulk setup decision for one driver input."""

    # Driver-spec key, label and unit, copied from the spec.
    key: str
    label: str
    unit: str
    # Setup mode that was evaluated, e.g. "keep", "manual" or "auto_property".
    mode: str
    # Outcome such as "ready", "preserved", "missing", "unsupported",
    # "no_driver" or "no_recommendation".
    status: str
    # Details of the selected property; blank unless a property was chosen.
    property: int | None = None
    property_label: str = ""
    property_unit: str = ""
    property_scope: str = ""
    property_object_name: str = ""
    # Manual value supplied by the setup; only filled in manual mode.
    manual_value: str = ""
    # True when applying would replace an existing driver input.
    overwrite: bool = False
    # Explanation shown with non-ready statuses.
    message: str = ""

80 

81 

def cost_driver_property_options(driver: CostDriver) -> list[CostDriverPropertyOption]:
    """List the scalar numeric properties selectable for ``driver``.

    Public entry point; the equipment category is resolved from the driver
    itself rather than supplied by the caller.
    """
    options = _cost_driver_property_options(driver)
    return options

85 

86 

def _cost_driver_property_options(
    driver: CostDriver,
    *,
    equipment_category: str | None = None,
) -> list[CostDriverPropertyOption]:
    """Collect sizing-property options from the unit and its connected streams.

    ``equipment_category`` overrides the category used for recommendations;
    when it is ``None`` the category is resolved from the driver. Options are
    returned unit first, then inlet streams, then outlet streams, keeping only
    the first occurrence of each property.
    """
    unit_object = driver.costable_item.simulation_object
    if unit_object is None:
        return []

    if equipment_category is None:
        equipment_category = _driver_equipment_category(driver)

    candidates: list[CostDriverPropertyOption] = list(
        _property_options_for_object(
            driver,
            unit_object,
            scope="unit",
            equipment_category=equipment_category,
        )
    )
    stream_scopes = (
        ("input_stream", ConType.Inlet),
        ("output_stream", ConType.Outlet),
    )
    for scope, direction in stream_scopes:
        for stream in _connected_streams(unit_object, direction=direction):
            candidates.extend(
                _property_options_for_object(
                    driver,
                    stream,
                    scope=scope,
                    equipment_category=equipment_category,
                )
            )

    # A dict keeps insertion order, and setdefault keeps the first option seen
    # for each property primary key.
    first_by_property: dict[int, CostDriverPropertyOption] = {}
    for candidate in candidates:
        first_by_property.setdefault(candidate.property_info, candidate)
    return list(first_by_property.values())

138 

139 

def preview_or_apply_bulk_equipment_setup(
    *,
    mappings: list[EquipmentMapping],
    cost_curve: CostCurve | None,
    dry_run: bool,
    apply_cost_curve: bool,
    apply_recommended_sizing_property: bool,
    overwrite_sizing_property: bool,
    driver_inputs: BulkDriverInputSetup | None = None,
) -> list[BulkEquipmentSetupResult]:
    """Evaluate one bulk setup decision for every mapping, optionally saving it.

    For each mapping this works out the curve status, the recommended sizing
    property and the per-input driver results. When ``dry_run`` is false it
    also saves the curve assignment on the mapping and the recommended
    property on the cost driver. One result is returned per mapping, in input
    order.
    """
    curve_requested = apply_cost_curve and cost_curve is not None
    requested_inputs = driver_inputs or {}
    # Existing driver inputs are kept only when the caller neither supplied an
    # explicit setup nor asked to overwrite sizing.
    keep_existing_inputs = not (bool(driver_inputs) or overwrite_sizing_property)

    outcomes: list[BulkEquipmentSetupResult] = []
    for mapping in mappings:
        driver = getattr(mapping.costable_item, "cost_driver", None)
        curve_status = _curve_status(mapping, cost_curve, apply_cost_curve)
        previous_curve_id = mapping.cost_curve_id
        effective_curve = cost_curve if curve_requested else mapping.cost_curve
        category = _bulk_equipment_category(
            mapping=mapping,
            cost_curve=cost_curve,
            apply_cost_curve=apply_cost_curve,
        )
        curve_overwrite = bool(
            curve_requested
            and previous_curve_id
            and previous_curve_id != cost_curve.pk
        )
        # NOTE(review): the sizing recommendation receives the passed
        # ``cost_curve`` while the driver inputs below receive the effective
        # curve -- confirm this asymmetry is intended.
        sizing = _recommended_sizing_result(
            driver=driver,
            cost_curve=cost_curve,
            equipment_category=category,
            apply_recommended_sizing_property=apply_recommended_sizing_property,
            overwrite_sizing_property=overwrite_sizing_property,
        )
        input_results = _driver_input_results(
            mapping=mapping,
            driver=driver,
            cost_curve=effective_curve,
            equipment_category=category,
            driver_inputs=requested_inputs,
            apply_recommended_sizing_property=apply_recommended_sizing_property,
            preserve_existing=keep_existing_inputs,
        )
        option = sizing.option

        if not dry_run:
            changed_fields: set[str] = set()
            if curve_requested and mapping.cost_curve_id != cost_curve.pk:
                mapping.cost_curve = cost_curve
                # The mapping follows the curve's category/subtype only when
                # the curve defines them.
                if cost_curve.equipment_category:
                    mapping.equipment_category = cost_curve.equipment_category
                    changed_fields.add("equipment_category")
                if cost_curve.equipment_subtype:
                    mapping.equipment_subtype = cost_curve.equipment_subtype
                    changed_fields.add("equipment_subtype")
                changed_fields.update(("cost_curve", "updated_at"))
            if changed_fields:
                mapping.save(update_fields=sorted(changed_fields))
            if driver is not None and option is not None:
                selected_property = PropertyInfo.objects.get(pk=option.property_info)
                driver_updates = normalize_property_cost_driver(driver, selected_property)
                for field_name, value in driver_updates.items():
                    setattr(driver, field_name, value)
                driver.save(update_fields=[*driver_updates.keys(), "updated_at"])

        # The reported curve is the passed curve whenever one was given, even
        # if it was not applied; otherwise it is the mapping's current curve.
        if cost_curve is not None:
            reported_curve_id = cost_curve.pk
            reported_curve_name = cost_curve.name
        else:
            reported_curve_id = mapping.cost_curve_id
            reported_curve_name = mapping.cost_curve.name if mapping.cost_curve else ""

        if option is None:
            sizing_fields = {
                "sizing_property": None,
                "sizing_property_label": "",
                "sizing_property_unit": "",
                "sizing_property_scope": "",
                "sizing_property_object_name": "",
            }
        else:
            sizing_fields = {
                "sizing_property": option.property_info,
                "sizing_property_label": option.display_name,
                "sizing_property_unit": option.unit,
                "sizing_property_scope": option.scope,
                "sizing_property_object_name": option.object_name,
            }

        outcomes.append(
            BulkEquipmentSetupResult(
                mapping=mapping.pk,
                costable_item=mapping.costable_item_id,
                unit_name=mapping.costable_item.name,
                cost_curve=reported_curve_id,
                cost_curve_name=reported_curve_name,
                curve_status=curve_status,
                curve_overwrite=curve_overwrite,
                cost_driver=driver.pk if driver is not None else None,
                sizing_status=sizing.status,
                sizing_overwrite=sizing.overwrite,
                driver_input_results=input_results,
                message=sizing.message,
                **sizing_fields,
            )
        )
    return outcomes

228 

229 

@dataclass(frozen=True)
class _RecommendedSizingResult:
    """Immutable outcome of looking up a recommended sizing property."""

    # "not_requested", "no_driver", "preserved", "no_recommendation" or "ready".
    status: str
    # The recommended option; only set when status is "ready".
    option: CostDriverPropertyOption | None = None
    # True when the recommendation would replace existing sizing.
    overwrite: bool = False
    # Explanation for non-ready statuses.
    message: str = ""

236 

237 

def _curve_status(mapping: EquipmentMapping, cost_curve: CostCurve | None, apply_cost_curve: bool) -> str:
    """Classify what assigning ``cost_curve`` to ``mapping`` would do.

    Returns "not_requested" when no curve is being applied, "unchanged" when
    the mapping already points at the curve, and "ready" otherwise.
    """
    if apply_cost_curve and cost_curve is not None:
        already_assigned = mapping.cost_curve_id == cost_curve.pk
        return "unchanged" if already_assigned else "ready"
    return "not_requested"

244 

245 

def _recommended_sizing_result(
    *,
    driver: CostDriver | None,
    cost_curve: CostCurve | None,
    equipment_category: str,
    apply_recommended_sizing_property: bool,
    overwrite_sizing_property: bool,
) -> _RecommendedSizingResult:
    """Decide which sizing property, if any, should be set on ``driver``.

    Existing sizing (a linked property or a design value) is preserved unless
    ``overwrite_sizing_property`` is set. Otherwise the recommended option for
    the curve and category is looked up.
    """
    if not apply_recommended_sizing_property:
        return _RecommendedSizingResult(status="not_requested")
    if driver is None:
        return _RecommendedSizingResult(status="no_driver", message="No cost driver is available.")

    already_sized = bool(driver.property_info_id) or driver.design_value is not None
    if already_sized and not overwrite_sizing_property:
        return _RecommendedSizingResult(
            status="preserved",
            message="Existing sizing property or custom value will be preserved.",
        )

    recommended = _recommended_property_option(
        driver,
        cost_curve,
        equipment_category=equipment_category,
    )
    if recommended is not None:
        return _RecommendedSizingResult(
            status="ready",
            option=recommended,
            overwrite=already_sized,
        )
    return _RecommendedSizingResult(
        status="no_recommendation",
        message="No compatible sizing property was found.",
    )

280 

281 

def _driver_input_results(
    *,
    mapping: EquipmentMapping,
    driver: CostDriver | None,
    cost_curve: CostCurve | None,
    equipment_category: str,
    driver_inputs: BulkDriverInputSetup,
    apply_recommended_sizing_property: bool,
    preserve_existing: bool,
) -> dict[str, BulkEquipmentDriverInputResult]:
    """Evaluate the setup for every required driver input of ``cost_curve``.

    Returns a dict keyed by driver-spec key. It is empty when there is no
    curve or when the curve's required driver specs fail to parse.
    """
    if cost_curve is None:
        return {}
    try:
        required_specs = parse_required_driver_specs(cost_curve.required_driver_specs)
    except ValueError:
        return {}

    current_inputs = _generated_line_driver_inputs(mapping)
    per_input: dict[str, BulkEquipmentDriverInputResult] = {}
    for spec in required_specs:
        per_input[spec.key] = _driver_input_result(
            mapping=mapping,
            driver=driver,
            spec=spec,
            equipment_category=equipment_category,
            setup=driver_inputs.get(spec.key),
            existing_input=current_inputs.get(spec.key),
            apply_recommended_sizing_property=apply_recommended_sizing_property,
            preserve_existing=preserve_existing,
        )
    return per_input

312 

313 

def _driver_input_result(
    *,
    mapping: EquipmentMapping,
    driver: CostDriver | None,
    spec: CostCurveDriverSpec,
    equipment_category: str,
    setup: dict[str, str] | None,
    existing_input: dict[str, Any] | None,
    apply_recommended_sizing_property: bool,
    preserve_existing: bool,
) -> BulkEquipmentDriverInputResult:
    """Evaluate the bulk setup decision for a single driver input.

    The mode comes from ``setup["mode"]`` when given, otherwise from the
    default for the spec. ``mapping`` is accepted for signature parity with
    the caller but is not read here.

    Modes:
        * "keep" -- report the existing input as preserved, or missing.
        * "manual" -- use ``setup["manual_value"]`` when the spec allows
          manual values.
        * "auto_property" -- pick the recommended compatible property unless
          an existing input is to be preserved.
        * anything else -- reported as unsupported.
    """
    requested = setup or {}
    mode = requested.get("mode") or _default_bulk_driver_input_mode(
        spec,
        apply_recommended_sizing_property=apply_recommended_sizing_property,
    )
    existing_has_value = _driver_input_has_value(existing_input)

    def outcome(status: str, **details: Any) -> BulkEquipmentDriverInputResult:
        # Every result shares the spec identity and the evaluated mode.
        return BulkEquipmentDriverInputResult(
            key=spec.key,
            label=spec.label,
            unit=spec.unit,
            mode=mode,
            status=status,
            **details,
        )

    if mode == "keep":
        if existing_has_value:
            return outcome("preserved", message="")
        return outcome("missing", message="No existing driver input is selected.")

    if mode == "manual":
        if "manual" not in spec.source_options:
            return outcome(
                "unsupported",
                message="Manual values are not allowed for this driver input.",
            )
        # Coerce defensively: a payload may carry None or a bare number here,
        # and calling .strip() on either would raise.
        raw_manual_value = requested.get("manual_value")
        manual_value = "" if raw_manual_value is None else str(raw_manual_value).strip()
        return outcome(
            "ready" if manual_value else "missing",
            manual_value=manual_value,
            overwrite=existing_has_value,
            message="" if manual_value else "Enter a manual value.",
        )

    if mode != "auto_property":
        return outcome("unsupported", message="Unsupported driver input setup mode.")
    if "property" not in spec.source_options:
        return outcome(
            "unsupported",
            message="Properties are not allowed for this driver input.",
        )
    if existing_has_value and preserve_existing:
        return outcome(
            "preserved",
            overwrite=False,
            message="Existing driver input will be preserved.",
        )
    if driver is None:
        return outcome("no_driver", message="No cost driver is available.")

    option = _recommended_property_option_for_unit(
        driver,
        unit=spec.unit,
        equipment_category=equipment_category,
    )
    if option is None:
        return outcome("no_recommendation", message="No compatible property was found.")
    return outcome(
        "ready",
        property=option.property_info,
        property_label=option.display_name,
        property_unit=option.unit,
        property_scope=option.scope,
        property_object_name=option.object_name,
        overwrite=existing_has_value,
    )

424 

425 

def apply_bulk_driver_inputs(
    *,
    mappings: list[EquipmentMapping],
    cost_curve: CostCurve | None,
    apply_cost_curve: bool,
    apply_recommended_sizing_property: bool,
    overwrite_sizing_property: bool,
    driver_inputs: BulkDriverInputSetup | None,
) -> bool:
    """Write the bulk driver-input decisions onto generated capital lines.

    Mappings without an effective curve, without a generated capital line, or
    whose curve yields no driver-input results are skipped. Returns ``True``
    when at least one capital line was saved.
    """
    requested_inputs = driver_inputs or {}
    # Existing inputs are kept only when no explicit setup was supplied and
    # overwriting was not requested.
    keep_existing = not (bool(driver_inputs) or overwrite_sizing_property)

    any_line_saved = False
    for mapping in mappings:
        driver = getattr(mapping.costable_item, "cost_driver", None)
        if apply_cost_curve and cost_curve is not None:
            effective_curve = cost_curve
        else:
            effective_curve = mapping.cost_curve
        if effective_curve is None:
            continue

        line = _generated_line_for_mapping(mapping)
        if line is None:
            continue

        category = _bulk_equipment_category(
            mapping=mapping,
            cost_curve=cost_curve,
            apply_cost_curve=apply_cost_curve,
        )
        input_results = _driver_input_results(
            mapping=mapping,
            driver=driver,
            cost_curve=effective_curve,
            equipment_category=category,
            driver_inputs=requested_inputs,
            apply_recommended_sizing_property=apply_recommended_sizing_property,
            preserve_existing=keep_existing,
        )
        if not input_results:
            continue

        updated_inputs = _driver_inputs_payload_for_results(
            effective_curve,
            line.driver_inputs,
            input_results,
        )
        if line.driver_inputs == updated_inputs:
            # Nothing to write for this line.
            continue
        line.driver_inputs = updated_inputs
        line.save(update_fields=["driver_inputs", "updated_at"])
        any_line_saved = True
    return any_line_saved

467 

468 

def _driver_inputs_payload_for_results(
    cost_curve: CostCurve,
    existing_inputs: Any,
    results: dict[str, BulkEquipmentDriverInputResult],
) -> CapitalCostDriverInputsPayload:
    """Build the driver-inputs payload implied by ``results``.

    One entry is produced per required driver spec of ``cost_curve``:

    * no result, or a "preserved" result -- the existing entry, or a blank one;
    * a "ready" result carrying a property -- a property-sourced entry;
    * a "ready" manual result -- a manual entry;
    * anything else -- a blank entry.

    Existing inputs that fail to normalise are treated as empty.
    """
    try:
        normalized_existing = normalize_capital_cost_driver_inputs(existing_inputs or {})
    except ValueError:
        normalized_existing = {}

    payload: CapitalCostDriverInputsPayload = {}
    for spec in parse_required_driver_specs(cost_curve.required_driver_specs):
        previous_entry = normalized_existing.get(spec.key)
        result = results.get(spec.key)

        if result is None or result.status == "preserved":
            entry = previous_entry or _blank_driver_input(spec)
        elif result.status != "ready":
            entry = _blank_driver_input(spec)
        elif result.property is not None:
            entry = capital_cost_driver_input_payload(
                CapitalCostDriverInput(
                    source="property",
                    property_info=result.property,
                    manual_value="",
                    unit=spec.unit,
                )
            )
        elif result.mode == "manual":
            entry = capital_cost_driver_input_payload(
                CapitalCostDriverInput(
                    source="manual",
                    property_info=None,
                    manual_value=result.manual_value,
                    unit=spec.unit,
                )
            )
        else:
            entry = _blank_driver_input(spec)
        payload[spec.key] = entry
    return payload

505 

506 

def _blank_driver_input(spec: CostCurveDriverSpec):
    """Return the payload for an unset driver input that keeps the spec's unit."""
    unset_input = CapitalCostDriverInput(
        source="",
        property_info=None,
        manual_value="",
        unit=spec.unit,
    )
    return capital_cost_driver_input_payload(unset_input)

511 

512 

def _default_bulk_driver_input_mode(
    spec: CostCurveDriverSpec,
    *,
    apply_recommended_sizing_property: bool,
) -> str:
    """Pick the setup mode for a driver input that has no explicit mode.

    Returns "keep" when recommendations are not being applied. Otherwise it
    prefers "auto_property", then "manual", according to the sources the spec
    allows, and falls back to "keep".
    """
    if not apply_recommended_sizing_property:
        return "keep"
    # Ordered by preference: a property source wins over a manual one.
    mode_by_source = (("property", "auto_property"), ("manual", "manual"))
    for source, mode in mode_by_source:
        if source in spec.source_options:
            return mode
    return "keep"

525 

526 

def _recommended_property_option(
    driver: CostDriver,
    cost_curve: CostCurve | None,
    *,
    equipment_category: str,
) -> CostDriverPropertyOption | None:
    """Return the recommended sizing option for ``cost_curve``'s primary input.

    The unit of the curve's primary formula input, when one can be found,
    restricts the candidates to unit-compatible properties.
    """
    return _recommended_property_option_for_unit(
        driver,
        unit=_primary_formula_input_unit(cost_curve),
        equipment_category=equipment_category,
    )

539 

540 

def _recommended_property_option_for_unit(
    driver: CostDriver,
    *,
    unit: str | None,
    equipment_category: str,
) -> CostDriverPropertyOption | None:
    """Choose the best property option for a driver input measured in ``unit``.

    Options whose unit is incompatible with ``unit`` are skipped; no filtering
    happens when ``unit`` is empty. The first recommended option wins. If none
    is recommended, the first compatible option is returned, or ``None`` when
    nothing is compatible.
    """
    first_compatible: CostDriverPropertyOption | None = None
    candidates = _cost_driver_property_options(
        driver,
        equipment_category=equipment_category,
    )
    for candidate in candidates:
        if unit and not cost_curve_units_compatible(candidate.unit, unit):
            continue
        if candidate.recommended:
            return candidate
        if first_compatible is None:
            first_compatible = candidate
    return first_compatible

560 

561 

def _generated_line_for_mapping(mapping: EquipmentMapping) -> CapitalCostLine | None:
    """Return the generated capital cost line for ``mapping``'s costable item.

    The lookup matches the mapping's flowsheet and the costable item's study.
    When several lines match, the one with the lowest primary key is returned;
    ``None`` when there is none.
    """
    generated_lines = CapitalCostLine.objects.filter(
        flowsheet=mapping.flowsheet,
        study=mapping.costable_item.study,
        costable_item=mapping.costable_item,
        source=GENERATED_CAPITAL_LINE_SOURCE,
    )
    return generated_lines.order_by("pk").first()

573 

574 

def _generated_line_driver_inputs(mapping: EquipmentMapping) -> CapitalCostDriverInputsPayload:
    """Return the normalised driver inputs stored on the generated capital line.

    An empty dict is returned when the mapping has no generated line or when
    the stored inputs fail to normalise.
    """
    generated_line = _generated_line_for_mapping(mapping)
    if generated_line is None:
        return {}
    stored_inputs = generated_line.driver_inputs or {}
    try:
        return normalize_capital_cost_driver_inputs(stored_inputs)
    except ValueError:
        return {}

583 

584 

def _driver_input_has_value(driver_input: dict[str, Any] | None) -> bool:
    """Tell whether a driver-input entry actually selects something.

    A "property" entry needs a non-``None`` ``property_info``; a "manual"
    entry needs a non-blank ``manual_value``. Anything else, including a
    missing or empty entry, counts as having no value.
    """
    if driver_input:
        source = driver_input.get("source")
        if source == "manual":
            manual_text = str(driver_input.get("manual_value") or "")
            return bool(manual_text.strip())
        if source == "property":
            return driver_input.get("property_info") is not None
    return False

594 

595 

def _primary_formula_input_unit(cost_curve: CostCurve | None) -> str | None:
    """Return the unit of the curve's primary formula-input driver spec.

    ``None`` is returned when there is no curve, when its required driver
    specs fail to parse, or when no spec is both a formula input and primary.
    """
    if cost_curve is None:
        return None
    try:
        required_specs = parse_required_driver_specs(cost_curve.required_driver_specs)
    except ValueError:
        return None
    for spec in required_specs:
        if spec.role == "formula_input" and spec.primary:
            return spec.unit
    return None

606 

607 

def _bulk_equipment_category(
    *,
    mapping: EquipmentMapping,
    cost_curve: CostCurve | None,
    apply_cost_curve: bool,
) -> str:
    """Resolve the equipment category a bulk setup should assume for ``mapping``.

    The category of the curve being applied wins when it has one. Otherwise
    the mapping's own category is used, falling back to the category derived
    from the cost driver when the mapping's is empty and a driver exists.
    """
    curve_being_applied = apply_cost_curve and cost_curve is not None
    if curve_being_applied and cost_curve.equipment_category:
        return cost_curve.equipment_category

    driver = getattr(mapping.costable_item, "cost_driver", None)
    if driver is not None:
        return mapping.equipment_category or _driver_equipment_category(driver)
    return mapping.equipment_category

620 

621 

def validate_cost_driver_property(driver: CostDriver, property_info: PropertyInfo) -> None:
    """Ensure ``property_info`` is one of the options selectable for ``driver``.

    Raises:
        ValueError: if the property is not among the driver's property options.
    """
    selectable_options = cost_driver_property_options(driver)
    is_selectable = any(
        option.property_info == property_info.pk for option in selectable_options
    )
    if not is_selectable:
        raise ValueError(
            "Cost driver property must belong to the unit operation, an input stream, or an output stream."
        )

628 

629 

def normalize_property_cost_driver(driver: CostDriver, property_info: PropertyInfo | None) -> dict[str, Any]:
    """Compute the driver field updates implied by selecting ``property_info``.

    With no property selected, the driver becomes a manual override when it
    has a design value and unresolved otherwise. With a property selected,
    the driver switches to property sizing: the design value and unresolved
    reason are cleared and the selection is recorded in the warning payload.
    The driver itself is not modified here.
    """
    if property_info is None:
        has_design_value = driver.design_value is not None
        return {
            "source": (
                CostDriverSource.MANUAL_OVERRIDE
                if has_design_value
                else CostDriverSource.UNRESOLVED
            ),
            "property_info": None,
            "sizing_mode": "manual" if has_design_value else "",
        }

    selection_details = {
        "driver_property_id": property_info.pk,
        "driver_property_key": property_info.key,
        "driver_property_name": property_info.displayName,
        "driver_property_unit": property_info.unit,
        "warnings": [],
        "design_value_basis": "user_selected_unit_or_connected_stream_property",
    }
    selection_details.update(_registry_payload_for_property(driver, property_info))

    # Start from a copy so the driver's stored payload is not mutated.
    warning_payload = dict(driver.warning_payload or {})
    warning_payload.update(selection_details)
    return {
        "source": CostDriverSource.PROPERTY,
        "property_info": property_info,
        "canonical_unit": property_info.unit or "",
        "design_value": None,
        "sizing_mode": "property",
        "unresolved_reason_code": "",
        "warning_payload": warning_payload,
    }

666 

667 

def apply_recommended_property_for_mapping(mapping: EquipmentMapping) -> CostDriver | None:
    """Assign the first recommended property with a value to the mapping's driver.

    Nothing happens, and ``None`` is returned, when the costable item has no
    driver, when the driver already has a property or a design value, or when
    no recommended option with a value exists. Otherwise the driver is saved
    and returned.
    """
    driver = (
        CostDriver.objects.filter(costable_item=mapping.costable_item)
        .select_related("costable_item", "costable_item__simulation_object")
        .first()
    )
    if driver is None:
        return None
    if driver.property_info_id is not None or driver.design_value is not None:
        # Existing sizing is never replaced here.
        return None

    chosen = next(
        (
            option
            for option in cost_driver_property_options(driver)
            if option.recommended and option.has_value
        ),
        None,
    )
    if chosen is None:
        return None

    selected_property = PropertyInfo.objects.get(pk=chosen.property_info)
    driver_updates = normalize_property_cost_driver(driver, selected_property)
    for field_name, value in driver_updates.items():
        setattr(driver, field_name, value)
    driver.save(update_fields=[*driver_updates.keys(), "updated_at"])
    return driver

688 

689 

def _registry_payload_for_property(driver: CostDriver, property_info: PropertyInfo) -> dict[str, str]:
    """Look up registry metadata for a selected property.

    Rules compatible with the driver's simulation object type are scanned in
    registry order. The first preferred property (unit-level or input-stream)
    matching the property's key and unit type yields the rule's equipment
    category and the curve input variable. An empty dict is returned when
    nothing matches or the driver has no simulation object.
    """
    simulation_object = driver.costable_item.simulation_object
    if simulation_object is None:
        return {}
    for rule in COST_DRIVER_RULES:
        if simulation_object.objectType in rule.compatible_object_types:
            candidates = rule.preferred_properties + rule.preferred_input_stream_properties
            for candidate in candidates:
                if (
                    property_info.key == candidate.key
                    and property_info.unitType == candidate.unit_type
                ):
                    return {
                        "equipment_category": rule.equipment_category,
                        "curve_input_variable": candidate.curve_input_variable or rule.curve_input_variable,
                    }
    return {}

706 

707 

def _connected_streams(simulation_object, *, direction: str):
    """Return the streams attached to the object's ports in ``direction``.

    Ports are visited in ``index`` then primary-key order; ports without a
    stream are skipped.
    """
    matching_ports = (
        simulation_object.ports.filter(direction=direction)
        .select_related("stream")
        .order_by("index", "pk")
    )
    streams = []
    for port in matching_ports:
        if port.stream is not None:
            streams.append(port.stream)
    return streams

711 

712 

def _property_options_for_object(
    driver: CostDriver,
    simulation_object,
    *,
    scope: str,
    equipment_category: str,
) -> list[CostDriverPropertyOption]:
    """Build options for the numeric scalar properties of one simulation object.

    Properties are ordered by display name, key and primary key. Non-scalar
    properties and the driver's manual-override property are left out. An
    object without a property set yields an empty list.
    """
    property_set = getattr(simulation_object, "properties", None)
    if property_set is None:
        return []

    numeric_properties = (
        property_set.containedProperties.filter(type="numeric")
        .prefetch_related("values")
        .order_by("displayName", "key", "pk")
    )
    options: list[CostDriverPropertyOption] = []
    for property_info in numeric_properties:
        if not _is_scalar(property_info):
            continue
        if _is_manual_override_property(driver, property_info):
            continue
        options.append(
            _option(
                driver,
                simulation_object,
                scope=scope,
                property_info=property_info,
                equipment_category=equipment_category,
            )
        )
    return options

739 

740 

def _option(
    driver: CostDriver,
    simulation_object,
    *,
    scope: str,
    property_info: PropertyInfo,
    equipment_category: str,
) -> CostDriverPropertyOption:
    """Describe ``property_info`` on ``simulation_object`` as a selectable option.

    The option counts as recommended exactly when a recommendation label is
    produced for it.
    """
    current_value = property_info.get_value()
    label = _recommendation_label(
        driver=driver,
        scope=scope,
        property_info=property_info,
        equipment_category=equipment_category,
    )
    # A missing or empty value is previewed as an empty string.
    preview = "" if current_value in (None, "") else str(current_value)
    owner_name = simulation_object.componentName or f"Unit {simulation_object.pk}"
    return CostDriverPropertyOption(
        property_info=property_info.pk,
        scope=scope,
        object_id=simulation_object.pk,
        object_name=owner_name,
        object_type=simulation_object.objectType,
        property_key=property_info.key,
        display_name=property_info.displayName,
        unit=property_info.unit or "",
        unit_type=property_info.unitType or "",
        value_preview=preview,
        has_value=property_info.has_value(),
        recommended=bool(label),
        recommendation_label=label,
    )

771 

772 

def _is_scalar(property_info: PropertyInfo) -> bool:
    """Return True when the property holds at most one value row."""
    value_rows = list(property_info.values.all())
    return len(value_rows) < 2

775 

776 

def _is_manual_override_property(driver: CostDriver, property_info: PropertyInfo) -> bool:
    """Return True when ``property_info`` is the driver's manual property."""
    manual_property_id = driver.manual_property_info_id
    return manual_property_id == property_info.pk

779 

780 

def manual_property_for_costable_item(costable_item) -> PropertyInfo | None:
    """Find the manual-override property for a costable item's simulation object.

    The first registry rule compatible with the object's type decides which
    property to look for (key, unit type, canonical unit, index 0) in the
    object's property set. ``None`` is returned when the item has no
    simulation object or property set, when no rule is compatible, or when
    the property does not exist.
    """
    simulation_object = getattr(costable_item, "simulation_object", None)
    try:
        property_set = getattr(simulation_object, "properties", None)
    except ObjectDoesNotExist:
        property_set = None
    if simulation_object is None or property_set is None:
        return None

    matching_rule = next(
        (
            rule
            for rule in COST_DRIVER_RULES
            if simulation_object.objectType in rule.compatible_object_types
        ),
        None,
    )
    if matching_rule is None:
        return None

    manual_spec = matching_rule.manual_property
    return PropertyInfo.objects.filter(
        set=property_set,
        key=manual_spec.key,
        unitType=manual_spec.unit_type,
        unit=manual_spec.canonical_unit,
        index=0,
    ).first()

800 

801 

def _driver_equipment_category(driver: CostDriver) -> str:
    """Return the equipment category that applies to ``driver``.

    The category on the costable item's equipment mapping is used when set.
    Otherwise the category of the registry rule matching the driver is used,
    or "" when no rule matches.
    """
    try:
        mapped_category = driver.costable_item.equipment_mapping.equipment_category
    except EquipmentMapping.DoesNotExist:
        mapped_category = ""
    if mapped_category:
        return mapped_category

    rule = _rule_for_driver(driver)
    if rule is None:
        return ""
    return rule.equipment_category

811 

812 

def _recommendation_label(
    *,
    driver: CostDriver,
    scope: str,
    property_info: PropertyInfo,
    equipment_category: str,
) -> str:
    """Return the recommendation text for a property, or "" when not recommended.

    Unit-scoped properties are checked against the rule's preferred
    properties, and output-stream properties are never recommended. Every
    other scope is checked against the preferred input-stream properties.
    """
    rule = _rule_for_driver(driver)
    if rule is None:
        return ""

    if scope == "unit":
        candidates = rule.preferred_properties
    elif scope == "output_stream":
        candidates = ()
    else:
        candidates = rule.preferred_input_stream_properties

    is_recommended = any(
        preferred_property_matches_property_info(
            preferred_property=candidate,
            property_info=property_info,
            rule=rule,
            equipment_category=equipment_category,
        )
        for candidate in candidates
    )
    if not is_recommended:
        return ""

    category_label = _category_label(equipment_category)
    if category_label:
        return f"Recommended for {category_label} sizing"
    return "Recommended sizing property"

841 

842 

def preferred_property_matches_property_info(
    *,
    preferred_property: PreferredProperty,
    property_info: PropertyInfo,
    rule: CostDriverRule,
    equipment_category: str,
) -> bool:
    """Tell whether a registry preferred property applies to ``property_info``.

    The key and unit type must match. The equipment category must then be one
    of the preferred property's recommended categories, which default to the
    rule's own category when none are listed. With no categories at all, any
    equipment category matches.
    """
    same_property = (
        property_info.key == preferred_property.key
        and property_info.unitType == preferred_property.unit_type
    )
    if not same_property:
        return False

    allowed_categories = preferred_property.recommended_equipment_categories
    if not allowed_categories and rule.equipment_category:
        allowed_categories = (rule.equipment_category,)
    if not allowed_categories:
        return True
    return equipment_category in allowed_categories

856 

857 

def _rule_for_driver(driver: CostDriver) -> CostDriverRule | None:
    """Return the first registry rule compatible with the driver's object type.

    ``None`` is returned when the driver has no simulation object or no rule
    lists its object type.
    """
    simulation_object = driver.costable_item.simulation_object
    if simulation_object is None:
        return None
    return next(
        (
            rule
            for rule in COST_DRIVER_RULES
            if simulation_object.objectType in rule.compatible_object_types
        ),
        None,
    )

866 

867 

def _category_label(value: str) -> str:
    """Turn a category identifier into display text by spacing out underscores.

    A falsy value yields an empty string.
    """
    if not value:
        return ""
    return value.replace("_", " ")