Coverage for backend/django/Economics/costing/cost_curves/evaluation.py: 83%

321 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Evaluate Economics cost curves through the declared driver-spec contract.""" 

2 

3from __future__ import annotations 

4 

5from decimal import Decimal, DecimalException, InvalidOperation 

6from typing import TYPE_CHECKING, Any, Mapping, Sequence, TypeAlias, TypedDict 

7 

8from Economics.shared.choices import CostBasis, CostCurveEvaluationKind 

9 

10from Economics.costing.models import CostCurve 

11from Economics.formulas.engine.core import EconomicsFormula, FormulaError, FormulaEvaluation 

12from idaes_factory.unit_conversion.unit_conversion import can_convert, convert_value 

13from pint.errors import PintError 

14from pydantic import BaseModel, ConfigDict 

15 

16if TYPE_CHECKING: 

17 from Economics.costing.cost_curves.driver_specs import CostCurveDiscreteVariant, CostCurveDriverSpec 

18 

class NormalizedCurveInputValue(TypedDict):
    """Minimal normalized driver input: only the numeric value is required."""

    # Numeric driver value; ``_normalize_inputs`` stores it after conversion
    # to the driver spec's unit.
    value: Decimal

21 

22 

class NormalizedCurveInput(NormalizedCurveInputValue):
    """Full normalized driver input record as built by ``_normalize_inputs``."""

    unit: str  # driver spec unit (whitespace-trimmed) the value was converted to
    raw_value: str  # caller-supplied value, stringified before any conversion
    raw_unit: str  # caller-supplied unit after boundary whitespace cleanup
    role: str  # copied from the driver spec's ``role``
    variable_symbol: str  # copied from the driver spec's ``variable_symbol``

29 

30 

class SerializedNormalizedCurveInput(TypedDict):
    """String-valued normalized input stored on ``CostCurveEvaluationResult``.

    ``evaluate_cost_curve`` builds one entry per normalized input, keeping the
    unit, role and symbol and stringifying the Decimal value.
    """

    value: str  # ``str()`` of the normalized Decimal value
    unit: str
    role: str
    variable_symbol: str

36 

37 

# Inputs keyed by driver-spec key, for consumers that only read the normalized ``value``.
NormalizedCurveInputs: TypeAlias = Mapping[str, NormalizedCurveInputValue]
# Inputs keyed by driver-spec key carrying the full record (units, raw values, role, symbol).
FullNormalizedCurveInputs: TypeAlias = Mapping[str, NormalizedCurveInput]

40 

41 

class SelectedDiscreteVariantPayload(TypedDict):
    """Plain-dict snapshot of the discrete variant chosen for an evaluation.

    ``_selected_variant_payload`` copies each field from the attribute of the
    same name on the selected variant.
    """

    key: str
    label: str
    selector_values: dict[str, str]  # selector input key -> variant's selector value
    expression_text: str
    valid_min: str  # blank means "no lower bound" (see ``_optional_decimal``)
    valid_max: str  # blank means "no upper bound" (see ``_optional_decimal``)
    valid_range_note: str
    source_reference: str
    notes: str

52 

53 

class SelectorDiagnostic(TypedDict):
    """Audit record comparing an entered selector value with the variant family.

    Built by ``_selector_diagnostic``; all numeric values are stringified Decimals.
    """

    curve_key: str
    input_key: str  # key of the selector driver spec
    role: str
    label: str
    entered_value: str  # normalized selector value supplied by the caller
    entered_unit: str  # unit of the normalized selector value
    selected_discrete_value: str  # selector value of the chosen variant
    selector_min: str  # smallest selector value across all variants
    selector_max: str  # largest selector value across all variants
    range_direction: str | None  # "below" / "above" when outside [min, max], else None
    range_bound: str | None  # the violated bound, or None when inside the range
    absolute_difference: str  # distance from the violated bound ("0" when inside)
    percentage_difference: str  # difference as a percent of the bound (or of the entered value if the bound is 0)
    soft_maximum_adjustment_percent: str  # spec threshold; "10" when the spec leaves it blank

69 

70 

class DiscreteFamilyEvaluation(TypedDict):
    """Result bundle returned by ``_evaluate_discrete_family``."""

    amount: Decimal  # evaluated cost of the selected variant's formula
    formula: EconomicsFormula  # formula built for the selected variant
    bindings: dict[str, Decimal]  # variable symbol -> normalized formula-input value
    selected_variant: SelectedDiscreteVariantPayload
    selector_diagnostics: list[SelectorDiagnostic]  # one entry per selector spec
    # Selector-adjustment warnings followed by selected-variant range warnings.
    # ``CostCurveWarning`` is defined later in the module; the annotation is
    # resolved lazily thanks to ``from __future__ import annotations``.
    warnings: list[CostCurveWarning]

78 

79 

class EconomicsContract(BaseModel):
    # Shared base for the result/warning models in this module.
    # ``frozen=True`` asks pydantic to reject attribute assignment on instances.
    model_config = ConfigDict(frozen=True)

82 

83 

class CostCurveWarning(EconomicsContract):
    # Non-fatal finding attached to an evaluation result.
    code: str  # machine-readable identifier, e.g. "input_above_valid_range"
    severity: str  # this module emits "warning" and "error"
    message: str  # human-readable explanation
    context: dict[str, Any]  # supporting details (bounds, units, keys, ...)

89 

90 

class CostCurveEvaluationResult(EconomicsContract):
    # Immutable record returned by ``evaluate_cost_curve``. The notes below
    # describe how that function populates each field.
    curve_id: int | None  # ``curve.pk``
    curve_key: str
    cost_basis: str
    evaluation_kind: str
    # ``input_*`` and ``canonical_*`` are both filled from the primary formula
    # input after normalization to the spec unit, so they carry the same data.
    input_value: Decimal | None
    input_unit: str
    canonical_value: Decimal | None
    canonical_unit: str
    amount: Decimal  # evaluated cost
    output_unit: str  # ``curve.output_unit``
    # Per-input raw and coerced value/unit, plus the "apply_installation_factor" flag.
    raw_inputs: dict[str, Any]
    normalized_inputs: dict[str, SerializedNormalizedCurveInput]
    selected_variant: SelectedDiscreteVariantPayload | None  # None for expression curves
    selector_diagnostics: tuple[SelectorDiagnostic, ...]  # empty for expression curves
    formula_audit: dict[str, Any]  # output of ``formula.audit_payload(...)``
    warnings: tuple[CostCurveWarning, ...]
    source_metadata: dict[str, Any]  # provenance fields, see ``_source_metadata``

    def warnings_payload(self) -> list[dict[str, Any]]:
        """Return JSON-ready warning records for result rows, UI, and exports."""
        return [warning.model_dump(mode="json") for warning in self.warnings]

113 

114 

115class CostCurveEvaluationError(ValueError): 

116 def __init__(self, code: str, message: str, *, context: dict[str, Any] | None = None): 

117 super().__init__(message) 

118 self.code = code 

119 self.message = message 

120 self.context = context or {} 

121 

122 

def evaluate_cost_curve(
    curve: CostCurve,
    *,
    inputs_by_key: Mapping[str, Mapping[str, Any]],
    apply_installation_factor: bool = False,
) -> CostCurveEvaluationResult:
    """Evaluate a curve from explicit, per-spec driver inputs.

    The evaluator intentionally has no single-variable fallback. Every caller
    must resolve values into ``inputs_by_key`` using the selected curve's
    ``required_driver_specs`` so expression and discrete-family curves share one
    runtime contract.

    Args:
        curve: Cost curve whose driver specs, expression/variants and source
            metadata drive the evaluation.
        inputs_by_key: Raw driver inputs keyed by driver-spec key; each entry
            provides ``"value"`` and ``"unit"``.
        apply_installation_factor: Whether the caller intends to apply an
            installation factor; only used for double-counting warnings and
            echoed in ``raw_inputs``.

    Returns:
        The evaluated amount with normalized inputs, audit payloads, warnings
        and source metadata.

    Raises:
        CostCurveEvaluationError: For invalid driver specs or inputs, an
            unsupported ``evaluation_kind``, missing selectors, or formula
            failures.
    """
    specs = _curve_required_driver_specs(curve)
    formula_specs = tuple(spec for spec in specs if spec.role == "formula_input")
    selector_specs = tuple(spec for spec in specs if spec.role == "discrete_selector")
    warnings: list[CostCurveWarning] = []
    normalized_inputs = _normalize_inputs(curve=curve, specs=specs, inputs_by_key=inputs_by_key)
    # _normalize_inputs leaves out optional specs that received no input, so
    # only specs with a normalized record can be range-checked, bound or audited.
    supplied_specs = tuple(spec for spec in specs if spec.key in normalized_inputs)

    for spec in supplied_specs:
        _append_spec_range_warnings(
            curve=curve,
            spec=spec,
            input_value=normalized_inputs[spec.key]["value"],
            warnings=warnings,
        )
    _append_cost_basis_warnings(
        curve=curve,
        apply_installation_factor=apply_installation_factor,
        warnings=warnings,
    )

    if curve.evaluation_kind == CostCurveEvaluationKind.EXPRESSION:
        formula = _build_curve_formula(curve)
        bindings = _formula_bindings(
            formula_specs=tuple(spec for spec in supplied_specs if spec.role == "formula_input"),
            normalized_inputs=normalized_inputs,
        )
        selected_variant = None
        selector_diagnostics: tuple[SelectorDiagnostic, ...] = ()
        amount = _evaluate_formula(curve=curve, formula=formula, bindings=bindings)
    elif curve.evaluation_kind == CostCurveEvaluationKind.DISCRETE_FAMILY:
        if not selector_specs:
            raise CostCurveEvaluationError(
                "missing_discrete_selectors",
                "Discrete-family curves require at least one discrete selector input.",
                context={"curve_key": curve.curve_key},
            )
        selected = _evaluate_discrete_family(
            curve=curve,
            formula_specs=formula_specs,
            selector_specs=selector_specs,
            normalized_inputs=normalized_inputs,
        )
        formula = selected["formula"]
        bindings = selected["bindings"]
        amount = selected["amount"]
        selected_variant = selected["selected_variant"]
        selector_diagnostics = tuple(selected["selector_diagnostics"])
        warnings.extend(selected["warnings"])
    else:
        raise CostCurveEvaluationError(
            "unsupported_cost_curve_evaluation_kind",
            "Cost curve evaluation_kind is not supported.",
            context={"curve_key": curve.curve_key, "evaluation_kind": curve.evaluation_kind},
        )

    # Both branches obtain ``amount`` through _evaluate_formula, which already
    # rejects blocked (None) and non-finite results, so no re-check is needed.
    conversion_diagnostics = _conversion_diagnostics(specs=supplied_specs, normalized_inputs=normalized_inputs)
    if amount < 0:
        warnings.append(
            CostCurveWarning(
                code="negative_cost_output",
                severity="error",
                message="Cost curve evaluated to a negative cost.",
                context={"amount": str(amount), "output_unit": curve.output_unit},
            )
        )

    # Prefer the spec flagged as primary, then the first formula input. A curve
    # may declare no formula inputs, or leave the primary one unset when it is
    # optional; the result model allows a missing primary value in that case.
    primary_spec = next((spec for spec in formula_specs if spec.primary), None)
    if primary_spec is None and formula_specs:
        primary_spec = formula_specs[0]
    primary_input = None if primary_spec is None else normalized_inputs.get(primary_spec.key)
    primary_value = None if primary_input is None else primary_input["value"]
    primary_unit = "" if primary_input is None else primary_input["unit"]

    return CostCurveEvaluationResult(
        curve_id=curve.pk,
        curve_key=curve.curve_key,
        cost_basis=curve.cost_basis,
        evaluation_kind=curve.evaluation_kind,
        input_value=primary_value,
        input_unit=primary_unit,
        canonical_value=primary_value,
        canonical_unit=primary_unit,
        amount=amount,
        output_unit=curve.output_unit,
        raw_inputs={
            key: {
                "input_value": value["raw_value"],
                "input_unit": value["raw_unit"],
                "coerced_input_value": str(value["value"]),
                "coerced_input_unit": value["unit"],
            }
            for key, value in normalized_inputs.items()
        } | {
            "apply_installation_factor": apply_installation_factor,
        },
        normalized_inputs={
            key: {
                "value": str(value["value"]),
                "unit": value["unit"],
                "role": value["role"],
                "variable_symbol": value["variable_symbol"],
            }
            for key, value in normalized_inputs.items()
        },
        selected_variant=selected_variant,
        selector_diagnostics=selector_diagnostics,
        formula_audit=formula.audit_payload(
            FormulaEvaluation(
                value=amount,
                bindings=bindings,
                conversion_diagnostics=conversion_diagnostics,
            )
        ),
        warnings=tuple(warnings),
        source_metadata=_source_metadata(curve),
    )

256 

257 

def cost_curve_units_compatible(provided_unit: str | None, curve_unit: str | None) -> bool:
    """Report whether ``provided_unit`` is convertible to ``curve_unit``.

    Blank units and identical units count as compatible without consulting
    the converter. Converter failures are reported as "not compatible".
    """
    source_unit = normalize_economics_unit_notation(provided_unit)
    destination_unit = normalize_economics_unit_notation(curve_unit)
    if source_unit and destination_unit and source_unit != destination_unit:
        try:
            return can_convert(source_unit, destination_unit)
        except (PintError, ValueError, AttributeError):
            return False
    return True

268 

269 

270def normalize_economics_unit_notation(unit: str | None) -> str: 

271 """Return the stored economics unit text after boundary whitespace cleanup.""" 

272 if unit is None: 272 ↛ 273line 272 didn't jump to line 273 because the condition on line 272 was never true

273 return "" 

274 return unit.strip() 

275 

276 

def _build_curve_formula(curve: CostCurve):
    """Build the expression formula for ``curve``.

    A ``FormulaError`` from the builder is re-raised as a
    ``CostCurveEvaluationError`` with the same code and message, and with the
    curve key added to the context.
    """
    try:
        # Imported lazily, inside the guarded region, exactly as before.
        from Economics.formulas.builders.capital import build_cost_curve_formula

        built_formula = build_cost_curve_formula(curve)
    except FormulaError as formula_error:
        merged_context = {**formula_error.context, "curve_key": curve.curve_key}
        raise CostCurveEvaluationError(
            formula_error.code,
            formula_error.message,
            context=merged_context,
        ) from formula_error
    return built_formula

288 

289 

def _append_spec_range_warnings(
    *,
    curve: CostCurve,
    spec: CostCurveDriverSpec,
    input_value: Decimal,
    warnings: list[CostCurveWarning],
) -> None:
    """Append a warning to ``warnings`` for each spec bound that ``input_value`` violates.

    Blank bounds on the spec are treated as "no bound". ``curve`` is accepted
    for signature parity with the other warning helpers but is not read.
    """
    lower_bound = _optional_decimal(spec.valid_min)
    upper_bound = _optional_decimal(spec.valid_max)
    checks = (
        (
            lower_bound is not None and input_value < lower_bound,
            "input_below_valid_range",
            "Cost driver value is below the input's stated valid range.",
        ),
        (
            upper_bound is not None and input_value > upper_bound,
            "input_above_valid_range",
            "Cost driver value is above the input's stated valid range.",
        ),
    )
    for violated, warning_code, warning_message in checks:
        if violated:
            warnings.append(
                CostCurveWarning(
                    code=warning_code,
                    severity="warning",
                    message=warning_message,
                    context=_range_context(spec),
                )
            )

317 

def _append_cost_basis_warnings(
    *,
    curve: CostCurve,
    apply_installation_factor: bool,
    warnings: list[CostCurveWarning],
) -> None:
    """Flag installed-cost curves, and installation factors applied on top of them.

    Nothing is appended unless the curve's cost basis is ``CostBasis.INSTALLED``.
    """
    if not (curve.cost_basis == CostBasis.INSTALLED):
        return

    pending = [
        ("installed_cost_basis", "warning", "Cost curve already represents installed cost."),
    ]
    if apply_installation_factor:
        # Applying a factor to an installed-cost curve would count installation twice.
        pending.append(
            (
                "installed_cost_factor_double_counting",
                "error",
                "Do not apply a default Lang factor to an installed-cost curve.",
            )
        )
    for warning_code, warning_severity, warning_message in pending:
        warnings.append(
            CostCurveWarning(
                code=warning_code,
                severity=warning_severity,
                message=warning_message,
                context={"cost_basis": curve.cost_basis},
            )
        )

342 

343 

def _source_metadata(curve: CostCurve) -> dict[str, Any]:
    """Collect the curve's basis and source-provenance fields into a plain dict.

    The basis date is ISO-formatted (``None`` when unset) and the basis index
    value is stringified; all other fields are copied through unchanged.
    """
    basis_date = curve.basis_date
    metadata: dict[str, Any] = {
        "basis_date": basis_date.isoformat() if basis_date else None,
        "basis_index_name": curve.basis_index_name,
        "basis_index_value": _decimal_to_string(curve.basis_index_value),
        "currency": curve.currency,
    }
    passthrough_fields = (
        "source_document_title",
        "source_page",
        "source_figure",
        "source_data_origin",
        "source_range_precision",
        "source_license_status",
        "source_reference",
        "source_note",
    )
    for field_name in passthrough_fields:
        metadata[field_name] = getattr(curve, field_name)
    return metadata

359 

360 

361def _coerce_curve_input_value( 

362 *, 

363 value: Decimal, 

364 provided_unit: str, 

365 target_unit: str, 

366 curve_key: str, 

367 input_key: str, 

368) -> Decimal: 

369 if provided_unit == target_unit: 

370 return value 

371 if not cost_curve_units_compatible(provided_unit, target_unit): 

372 raise CostCurveEvaluationError( 

373 "invalid_input_unit", 

374 "Cost curve input unit is not compatible with the declared driver spec unit.", 

375 context={ 

376 "curve_key": curve_key, 

377 "input_key": input_key, 

378 "provided_unit": provided_unit, 

379 "expected_unit": target_unit, 

380 }, 

381 ) 

382 try: 

383 return Decimal(str(convert_value(value, from_unit=provided_unit, to_unit=target_unit))) 

384 except (ValueError, DecimalException, PintError) as exc: 

385 raise CostCurveEvaluationError( 

386 "invalid_input_unit", 

387 "Cost curve input unit could not be converted to the declared driver spec unit.", 

388 context={ 

389 "curve_key": curve_key, 

390 "input_key": input_key, 

391 "provided_unit": provided_unit, 

392 "expected_unit": target_unit, 

393 "error": str(exc), 

394 }, 

395 ) from exc 

396 

397 

398def _range_context(spec: CostCurveDriverSpec) -> dict[str, Any]: 

399 return { 

400 "input_key": spec.key, 

401 "valid_min": spec.valid_min or None, 

402 "valid_max": spec.valid_max or None, 

403 "input_unit": spec.unit, 

404 "valid_range_note": spec.valid_range_note, 

405 } 

406 

407 

408def _to_decimal(value: Decimal | int | float | str, *, code: str) -> Decimal: 

409 try: 

410 decimal_value = Decimal(str(value)) 

411 except (InvalidOperation, TypeError, ValueError) as exc: 

412 raise CostCurveEvaluationError( 

413 code, 

414 "Cost curve evaluator received a non-numeric value.", 

415 context={"value": str(value)}, 

416 ) from exc 

417 if not decimal_value.is_finite(): 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true

418 raise CostCurveEvaluationError( 

419 code, 

420 "Cost curve evaluator received a non-finite numeric value.", 

421 context={"value": str(value)}, 

422 ) 

423 return decimal_value 

424 

425 

426def _ensure_finite_decimal( 

427 value: Decimal, 

428 *, 

429 code: str, 

430 message: str, 

431 context: dict[str, Any], 

432) -> Decimal: 

433 if not value.is_finite(): 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true

434 raise CostCurveEvaluationError(code, message, context={**context, "value": str(value)}) 

435 return value 

436 

437 

438def _decimal_to_string(value: Decimal | None) -> str | None: 

439 return None if value is None else str(value) 

440 

441 

def _curve_required_driver_specs(curve: CostCurve) -> tuple[CostCurveDriverSpec, ...]:
    """Parse ``curve.required_driver_specs`` into typed driver specs.

    A ``ValueError`` from the parser is re-raised as
    ``CostCurveEvaluationError`` with code ``invalid_cost_curve_driver_specs``.
    """
    from Economics.costing.cost_curves.driver_specs import parse_required_driver_specs

    try:
        parsed_specs = parse_required_driver_specs(curve.required_driver_specs)
    except ValueError as parse_error:
        raise CostCurveEvaluationError(
            "invalid_cost_curve_driver_specs",
            str(parse_error),
            context={"curve_key": curve.curve_key},
        ) from parse_error
    return parsed_specs

454 

455 

def _curve_discrete_variants(curve: CostCurve) -> tuple[CostCurveDiscreteVariant, ...]:
    """Parse ``curve.discrete_variants`` and require at least one variant.

    Parser ``ValueError``s become ``invalid_discrete_variants`` errors; an
    empty result raises ``missing_discrete_variants``.
    """
    from Economics.costing.cost_curves.driver_specs import parse_discrete_variants

    try:
        parsed_variants = parse_discrete_variants(curve.discrete_variants)
    except ValueError as parse_error:
        raise CostCurveEvaluationError(
            "invalid_discrete_variants",
            str(parse_error),
            context={"curve_key": curve.curve_key},
        ) from parse_error
    if parsed_variants:
        return parsed_variants
    raise CostCurveEvaluationError(
        "missing_discrete_variants",
        "Discrete-family curves require at least one variant.",
        context={"curve_key": curve.curve_key},
    )

475 

476 

477def _normalize_inputs( 

478 *, 

479 curve: CostCurve, 

480 specs: Sequence[CostCurveDriverSpec], 

481 inputs_by_key: Mapping[str, Mapping[str, Any]], 

482) -> dict[str, NormalizedCurveInput]: 

483 """Normalize raw driver inputs to spec units and reject unknown keys.""" 

484 spec_keys = {spec.key for spec in specs} 

485 unknown_keys = sorted(set(inputs_by_key) - spec_keys) 

486 if unknown_keys: 486 ↛ 487line 486 didn't jump to line 487 because the condition on line 486 was never true

487 raise CostCurveEvaluationError( 

488 "unknown_cost_curve_inputs", 

489 "Cost curve evaluation received inputs that are not declared by the selected curve.", 

490 context={"curve_key": curve.curve_key, "input_keys": unknown_keys}, 

491 ) 

492 

493 normalized: dict[str, NormalizedCurveInput] = {} 

494 for spec in specs: 

495 raw_input = inputs_by_key.get(spec.key) 

496 if raw_input is None: 496 ↛ 497line 496 didn't jump to line 497 because the condition on line 496 was never true

497 if spec.required: 

498 raise CostCurveEvaluationError( 

499 "missing_cost_curve_input", 

500 "Cost curve evaluation is missing a required input.", 

501 context={"curve_key": curve.curve_key, "input_key": spec.key}, 

502 ) 

503 continue 

504 raw_value = raw_input.get("value") 

505 raw_unit = normalize_economics_unit_notation(str(raw_input.get("unit") or "")) 

506 if raw_value in (None, ""): 506 ↛ 507line 506 didn't jump to line 507 because the condition on line 506 was never true

507 raise CostCurveEvaluationError( 

508 "invalid_input_value", 

509 "Cost curve evaluator received a blank input value.", 

510 context={"curve_key": curve.curve_key, "input_key": spec.key}, 

511 ) 

512 if not raw_unit: 512 ↛ 513line 512 didn't jump to line 513 because the condition on line 512 was never true

513 raise CostCurveEvaluationError( 

514 "invalid_input_unit", 

515 "Cost curve evaluator received a blank input unit.", 

516 context={"curve_key": curve.curve_key, "input_key": spec.key}, 

517 ) 

518 input_value = _to_decimal(raw_value, code="invalid_input_value") 

519 target_unit = normalize_economics_unit_notation(spec.unit) 

520 normalized_value = _coerce_curve_input_value( 

521 value=input_value, 

522 provided_unit=raw_unit, 

523 target_unit=target_unit, 

524 curve_key=curve.curve_key, 

525 input_key=spec.key, 

526 ) 

527 normalized[spec.key] = { 

528 "value": normalized_value, 

529 "unit": target_unit, 

530 "raw_value": str(raw_value), 

531 "raw_unit": raw_unit, 

532 "role": spec.role, 

533 "variable_symbol": spec.variable_symbol, 

534 } 

535 return normalized 

536 

537 

538def _formula_bindings( 

539 *, 

540 formula_specs: Sequence[CostCurveDriverSpec], 

541 normalized_inputs: NormalizedCurveInputs, 

542) -> dict[str, Decimal]: 

543 """Bind declared expression symbols to normalized formula-input values.""" 

544 return { 

545 spec.variable_symbol: normalized_inputs[spec.key]["value"] 

546 for spec in formula_specs 

547 } 

548 

549 

550def _conversion_diagnostics( 

551 *, 

552 specs: Sequence[CostCurveDriverSpec], 

553 normalized_inputs: FullNormalizedCurveInputs, 

554) -> tuple[dict[str, str], ...]: 

555 """Return source-to-target unit conversion records for audit payloads.""" 

556 diagnostics = [] 

557 for spec in specs: 

558 normalized = normalized_inputs[spec.key] 

559 diagnostics.append( 

560 { 

561 "input": spec.key, 

562 "variable_symbol": spec.variable_symbol, 

563 "source_value": normalized["raw_value"], 

564 "source_unit": normalized["raw_unit"], 

565 "target_value": str(normalized["value"]), 

566 "target_unit": normalized["unit"], 

567 } 

568 ) 

569 return tuple(diagnostics) 

570 

571 

def _evaluate_formula(*, curve: CostCurve, formula, bindings: Mapping[str, Decimal]) -> Decimal:
    """Evaluate ``formula`` with ``bindings`` and return a finite amount.

    ``FormulaError`` is translated to ``CostCurveEvaluationError``. A ``None``
    result is reported as a blocked formula, and a non-finite result is rejected.
    """
    try:
        result = formula.evaluate(bindings)
    except FormulaError as formula_error:
        merged_context = {**formula_error.context, "curve_key": curve.curve_key}
        raise CostCurveEvaluationError(
            formula_error.code,
            formula_error.message,
            context=merged_context,
        ) from formula_error

    if result is not None:
        return _ensure_finite_decimal(
            result,
            code="non_finite_cost_output",
            message="Cost curve evaluated to a non-finite cost.",
            context={"curve_key": curve.curve_key, "output_unit": curve.output_unit},
        )
    raise CostCurveEvaluationError(
        "blocked_cost_curve_formula",
        formula.blocked_reason or "Cost curve formula is blocked.",
        context={"curve_key": curve.curve_key},
    )

595 

596 

def _evaluate_discrete_family(
    *,
    curve: CostCurve,
    formula_specs: Sequence[CostCurveDriverSpec],
    selector_specs: Sequence[CostCurveDriverSpec],
    normalized_inputs: FullNormalizedCurveInputs,
) -> DiscreteFamilyEvaluation:
    """Select a capacity-covering variant and evaluate that option.

    Discrete selectors represent design-capacity thresholds. The selected
    variant is the smallest published capacity that covers the requested
    selector value, so the evaluator cannot under-size equipment by picking a
    cheaper lower-capacity variant.

    Args:
        curve: Discrete-family curve providing the variant rows.
        formula_specs: Driver specs bound into the variant formula.
        selector_specs: Driver specs used to pick the variant.
        normalized_inputs: Full normalized inputs keyed by spec key.

    Returns:
        Amount, formula, bindings, selected-variant payload, selector
        diagnostics and warnings for the chosen variant.

    Raises:
        CostCurveEvaluationError: For invalid variants or selectors, when no
            variant covers the requested capacity, or when building or
            evaluating the variant formula fails.
    """
    from Economics.formulas.builders.capital import build_cost_curve_variant_formula

    variants = _curve_discrete_variants(curve)
    selector_keys = {spec.key for spec in selector_specs}
    validate_discrete_variant_selectors(curve=curve, variants=variants, selector_keys=selector_keys)
    variant = select_discrete_capacity_variant(
        curve=curve,
        variants=variants,
        selector_specs=selector_specs,
        normalized_inputs=normalized_inputs,
    )
    bindings = _formula_bindings(formula_specs=formula_specs, normalized_inputs=normalized_inputs)
    try:
        formula = build_cost_curve_variant_formula(curve, variant, formula_specs=formula_specs)
    except FormulaError as exc:
        # Match _build_curve_formula: builder failures surface as service
        # errors rather than leaking a raw FormulaError to callers.
        raise CostCurveEvaluationError(
            exc.code,
            exc.message,
            context={**exc.context, "curve_key": curve.curve_key, "variant_key": variant.key},
        ) from exc
    amount = _evaluate_formula(curve=curve, formula=formula, bindings=bindings)
    selector_diagnostics = [
        _selector_diagnostic(
            curve=curve,
            spec=spec,
            variant=variant,
            variants=variants,
            normalized_inputs=normalized_inputs,
        )
        for spec in selector_specs
    ]
    warnings = [
        warning
        for diagnostic in selector_diagnostics
        for warning in _selector_adjustment_warnings(diagnostic)
    ]
    warnings.extend(
        _selected_variant_range_warnings(
            curve=curve,
            variant=variant,
            formula_specs=formula_specs,
            normalized_inputs=normalized_inputs,
        )
    )
    return {
        "amount": amount,
        "formula": formula,
        "bindings": bindings,
        "selected_variant": _selected_variant_payload(variant),
        "selector_diagnostics": selector_diagnostics,
        "warnings": warnings,
    }

656 

657 

def validate_discrete_variant_selectors(
    *,
    curve: CostCurve,
    variants: Sequence[CostCurveDiscreteVariant],
    selector_keys: set[str],
) -> None:
    """Ensure every variant's selector keys equal ``selector_keys``.

    Raises ``CostCurveEvaluationError`` for the first variant that is missing
    a selector key or declares one the curve does not have.
    """
    for variant in variants:
        declared_keys = set(variant.selector_values)
        missing_selectors = sorted(selector_keys - declared_keys)
        extra_selectors = sorted(declared_keys - selector_keys)
        if not (missing_selectors or extra_selectors):
            continue
        raise CostCurveEvaluationError(
            "invalid_discrete_variant_selectors",
            "Discrete variant selector values must exactly match the curve's selector inputs.",
            context={
                "curve_key": curve.curve_key,
                "variant_key": variant.key,
                "missing_selectors": missing_selectors,
                "extra_selectors": extra_selectors,
            },
        )

679 

680 

def select_discrete_capacity_variant(
    *,
    curve: CostCurve,
    variants: Sequence[CostCurveDiscreteVariant],
    selector_specs: Sequence[CostCurveDriverSpec],
    normalized_inputs: NormalizedCurveInputs,
) -> CostCurveDiscreteVariant:
    """Return the smallest discrete variant whose design capacity covers the inputs.

    Args:
        curve: Curve the variants belong to (used for error context).
        variants: Candidate variants; each declares a value for the selector key.
        selector_specs: Selector driver specs; exactly one is supported.
        normalized_inputs: Normalized inputs keyed by spec key.

    Returns:
        The variant with the lowest selector value that is >= the requested
        value; ties are broken by variant key.

    Raises:
        CostCurveEvaluationError: When there is no selector spec, more than
            one selector spec, no normalized value for the selector, an
            unparseable variant selector value, or no covering variant.
    """
    if not selector_specs:
        raise CostCurveEvaluationError(
            "missing_discrete_selectors",
            "Discrete-family curves require at least one discrete selector input.",
            context={"curve_key": curve.curve_key},
        )
    if len(selector_specs) > 1:
        raise CostCurveEvaluationError(
            "unsupported_multi_selector_discrete_family",
            "Discrete-family curves currently support exactly one capacity selector.",
            context={
                "curve_key": curve.curve_key,
                "selector_keys": [spec.key for spec in selector_specs],
            },
        )

    selector_key = selector_specs[0].key
    selector_input = normalized_inputs.get(selector_key)
    if selector_input is None:
        # An optional selector that received no input has no normalized record;
        # report it as a service error instead of leaking a KeyError.
        raise CostCurveEvaluationError(
            "missing_cost_curve_input",
            "Discrete-family curves require a value for the capacity selector input.",
            context={"curve_key": curve.curve_key, "input_key": selector_key},
        )
    requested_capacity = selector_input["value"]

    # Parse each variant's capacity once; reused for eligibility and error context.
    capacities = [
        (
            _to_decimal(variant.selector_values[selector_key], code="invalid_discrete_selector_value"),
            variant,
        )
        for variant in variants
    ]
    eligible = [
        (capacity, variant.key, variant)
        for capacity, variant in capacities
        if capacity >= requested_capacity
    ]
    if not eligible:
        # ``default=None`` keeps an empty variant list from raising a bare
        # ValueError out of max() while the error context is being built.
        largest_capacity = max((capacity for capacity, _ in capacities), default=None)
        raise CostCurveEvaluationError(
            "discrete_capacity_exceeded",
            "No discrete cost-curve variant can cover the selected design capacity.",
            context={
                "curve_key": curve.curve_key,
                "selected_capacities": {selector_key: str(requested_capacity)},
                "maximum_capacities": {
                    selector_key: None if largest_capacity is None else str(largest_capacity),
                },
            },
        )
    return min(eligible, key=lambda candidate: (candidate[0], candidate[1]))[2]

734 

735 

736def _selected_variant_payload(variant: CostCurveDiscreteVariant) -> SelectedDiscreteVariantPayload: 

737 return { 

738 "key": variant.key, 

739 "label": variant.label, 

740 "selector_values": variant.selector_values, 

741 "expression_text": variant.expression_text, 

742 "valid_min": variant.valid_min, 

743 "valid_max": variant.valid_max, 

744 "valid_range_note": variant.valid_range_note, 

745 "source_reference": variant.source_reference, 

746 "notes": variant.notes, 

747 } 

748 

749 

def _selector_diagnostic(
    *,
    curve: CostCurve,
    spec: CostCurveDriverSpec,
    variant: CostCurveDiscreteVariant,
    variants: Sequence[CostCurveDiscreteVariant],
    normalized_inputs: FullNormalizedCurveInputs,
) -> SelectorDiagnostic:
    """Compare the entered selector value with the family's selector range.

    Reports the chosen variant's selector value, the family min/max, and, when
    the entered value lies outside that range, the direction, the violated
    bound and the absolute and percentage distance from it.
    """
    selector_key = spec.key
    chosen_value = _to_decimal(variant.selector_values[selector_key], code="invalid_discrete_selector_value")
    family = [
        _to_decimal(candidate.selector_values[selector_key], code="invalid_discrete_selector_value")
        for candidate in variants
    ]
    lowest, highest = min(family), max(family)
    entered = normalized_inputs[selector_key]["value"]

    if entered < lowest:
        direction, bound, gap = "below", lowest, lowest - entered
    elif entered > highest:
        direction, bound, gap = "above", highest, entered - highest
    else:
        direction, bound, gap = None, None, Decimal("0")

    percent = Decimal("0")
    if bound is not None:
        # Measure relative to the violated bound; fall back to the entered
        # value when the bound is zero, and leave 0 when both are zero.
        base = abs(bound) if bound != 0 else abs(entered)
        if base != 0:
            percent = (gap / base) * Decimal("100")

    return {
        "curve_key": curve.curve_key,
        "input_key": selector_key,
        "role": spec.role,
        "label": spec.label,
        "entered_value": str(entered),
        "entered_unit": normalized_inputs[selector_key]["unit"],
        "selected_discrete_value": str(chosen_value),
        "selector_min": str(lowest),
        "selector_max": str(highest),
        "range_direction": direction,
        "range_bound": str(bound) if bound is not None else None,
        "absolute_difference": str(gap),
        "percentage_difference": str(percent),
        "soft_maximum_adjustment_percent": spec.soft_maximum_adjustment_percent or "10",
    }

799 

800 

801def _selector_adjustment_warnings(diagnostic: SelectorDiagnostic) -> tuple[CostCurveWarning, ...]: 

802 """Warn when a selector input is significantly outside the family range.""" 

803 if diagnostic["range_direction"] is None: 803 ↛ 805line 803 didn't jump to line 805 because the condition on line 803 was always true

804 return () 

805 percentage = Decimal(diagnostic["percentage_difference"]) 

806 threshold = Decimal(diagnostic["soft_maximum_adjustment_percent"]) 

807 if percentage <= threshold: 

808 return () 

809 return ( 

810 CostCurveWarning( 

811 code="discrete_selector_adjustment", 

812 severity="warning", 

813 message="Cost curve selector input is outside the supported range.", 

814 context=diagnostic, 

815 ), 

816 ) 

817 

818 

819def _selected_variant_range_warnings( 

820 *, 

821 curve: CostCurve, 

822 variant: CostCurveDiscreteVariant, 

823 formula_specs: Sequence[CostCurveDriverSpec], 

824 normalized_inputs: FullNormalizedCurveInputs, 

825) -> list[CostCurveWarning]: 

826 """Warn when the winning variant is used outside its supported input range.""" 

827 primary_spec = next((spec for spec in formula_specs if spec.primary), None) 

828 if primary_spec is None: 828 ↛ 829line 828 didn't jump to line 829 because the condition on line 828 was never true

829 return [] 

830 input_value = normalized_inputs[primary_spec.key]["value"] 

831 valid_min = _optional_decimal(variant.valid_min) 

832 valid_max = _optional_decimal(variant.valid_max) 

833 context = { 

834 "curve_key": curve.curve_key, 

835 "variant_key": variant.key, 

836 "variant_label": variant.label, 

837 "input_key": primary_spec.key, 

838 "input_label": primary_spec.label, 

839 "input_value": str(input_value), 

840 "input_unit": normalized_inputs[primary_spec.key]["unit"], 

841 "valid_min": variant.valid_min or None, 

842 "valid_max": variant.valid_max or None, 

843 "valid_range_note": variant.valid_range_note, 

844 } 

845 warnings = [] 

846 if valid_min is not None and input_value < valid_min: 846 ↛ 847line 846 didn't jump to line 847 because the condition on line 846 was never true

847 warnings.append( 

848 CostCurveWarning( 

849 code="selected_variant_input_below_valid_range", 

850 severity="warning", 

851 message="Selected curve option is below its stated input range.", 

852 context=context, 

853 ) 

854 ) 

855 if valid_max is not None and input_value > valid_max: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true

856 warnings.append( 

857 CostCurveWarning( 

858 code="selected_variant_input_above_valid_range", 

859 severity="warning", 

860 message="Selected curve option is above its stated input range.", 

861 context=context, 

862 ) 

863 ) 

864 return warnings 

865 

866 

867def _optional_decimal(value: Any) -> Decimal | None: 

868 """Parse optional decimal metadata while preserving blank values as unset.""" 

869 if value in (None, ""): 

870 return None 

871 return _to_decimal(value, code="invalid_decimal")