Coverage for backend/django/Economics/costing/cost_curves/evaluation.py: 83%
321 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1"""Evaluate Economics cost curves through the declared driver-spec contract."""
3from __future__ import annotations
5from decimal import Decimal, DecimalException, InvalidOperation
6from typing import TYPE_CHECKING, Any, Mapping, Sequence, TypeAlias, TypedDict
8from Economics.shared.choices import CostBasis, CostCurveEvaluationKind
10from Economics.costing.models import CostCurve
11from Economics.formulas.engine.core import EconomicsFormula, FormulaError, FormulaEvaluation
12from idaes_factory.unit_conversion.unit_conversion import can_convert, convert_value
13from pint.errors import PintError
14from pydantic import BaseModel, ConfigDict
16if TYPE_CHECKING:
17 from Economics.costing.cost_curves.driver_specs import CostCurveDiscreteVariant, CostCurveDriverSpec
class NormalizedCurveInputValue(TypedDict):
    """Minimal shape of a normalized driver input: just its decimal value."""

    # Numeric driver value as a Decimal.
    value: Decimal
class NormalizedCurveInput(NormalizedCurveInputValue):
    """Full normalized driver input, keeping the raw entry alongside the coerced one."""

    # Unit the value is expressed in after coercion (the driver spec's unit).
    unit: str
    # Caller-supplied value, stringified as entered.
    raw_value: str
    # Caller-supplied unit after boundary whitespace cleanup.
    raw_unit: str
    # Role copied from the driver spec (e.g. "formula_input", "discrete_selector").
    role: str
    # Expression symbol copied from the driver spec.
    variable_symbol: str
class SerializedNormalizedCurveInput(TypedDict):
    """String-only view of a normalized input as stored on the evaluation result."""

    # Normalized decimal value rendered with ``str``.
    value: str
    unit: str
    role: str
    variable_symbol: str
# Driver-spec key -> record that is only guaranteed to carry the normalized value.
NormalizedCurveInputs: TypeAlias = Mapping[str, NormalizedCurveInputValue]
# Driver-spec key -> full normalized record, including the raw audit fields.
FullNormalizedCurveInputs: TypeAlias = Mapping[str, NormalizedCurveInput]
class SelectedDiscreteVariantPayload(TypedDict):
    """Attributes of the chosen discrete variant, copied onto the evaluation result."""

    key: str
    label: str
    # Selector input key -> the variant's published selector value.
    selector_values: dict[str, str]
    expression_text: str
    # Range bounds are kept as text; a blank string means the bound is unset.
    valid_min: str
    valid_max: str
    valid_range_note: str
    source_reference: str
    notes: str
class SelectorDiagnostic(TypedDict):
    """Audit record comparing an entered selector value with the variant family range."""

    curve_key: str
    input_key: str
    role: str
    label: str
    # Normalized selector value and its unit, as strings.
    entered_value: str
    entered_unit: str
    # Selector value of the variant that was chosen.
    selected_discrete_value: str
    # Smallest and largest selector values across all variants of the family.
    selector_min: str
    selector_max: str
    # "below" / "above" when the entered value is outside the family range, else None.
    range_direction: str | None
    # The family bound that was crossed, or None when the value is in range.
    range_bound: str | None
    # Distance from the crossed bound, absolute and as a percentage ("0" when in range).
    absolute_difference: str
    percentage_difference: str
    # Percentage threshold above which an out-of-range selector produces a warning.
    soft_maximum_adjustment_percent: str
class DiscreteFamilyEvaluation(TypedDict):
    """Intermediate result returned by the discrete-family evaluation path."""

    # Evaluated cost of the selected variant.
    amount: Decimal
    # Formula built for the selected variant.
    formula: EconomicsFormula
    # Expression symbol -> normalized value used for the evaluation.
    bindings: dict[str, Decimal]
    selected_variant: SelectedDiscreteVariantPayload
    # One diagnostic per selector spec.
    selector_diagnostics: list[SelectorDiagnostic]
    # Selector-adjustment and variant-range warnings gathered during evaluation.
    warnings: list[CostCurveWarning]
class EconomicsContract(BaseModel):
    """Base pydantic model for the contracts in this module; instances are frozen."""

    # frozen=True makes attribute assignment on instances an error.
    model_config = ConfigDict(frozen=True)
class CostCurveWarning(EconomicsContract):
    """Structured warning attached to a cost-curve evaluation result."""

    # Machine-readable identifier, e.g. "input_above_valid_range".
    code: str
    # This module emits "warning" and "error".
    severity: str
    # Human-readable explanation.
    message: str
    # Free-form details describing what triggered the warning.
    context: dict[str, Any]
class CostCurveEvaluationResult(EconomicsContract):
    """Immutable record of one cost-curve evaluation, including its audit trail."""

    # Identity and classification of the evaluated curve.
    curve_id: int | None
    curve_key: str
    cost_basis: str
    evaluation_kind: str
    # Primary formula input; the evaluator fills both pairs with the normalized value/unit.
    input_value: Decimal | None
    input_unit: str
    canonical_value: Decimal | None
    canonical_unit: str
    # Evaluated cost and the unit it is expressed in.
    amount: Decimal
    output_unit: str
    # Per-input raw/coerced values plus the apply_installation_factor flag.
    raw_inputs: dict[str, Any]
    normalized_inputs: dict[str, SerializedNormalizedCurveInput]
    # Only populated for discrete-family curves.
    selected_variant: SelectedDiscreteVariantPayload | None
    selector_diagnostics: tuple[SelectorDiagnostic, ...]
    formula_audit: dict[str, Any]
    warnings: tuple[CostCurveWarning, ...]
    source_metadata: dict[str, Any]

    def warnings_payload(self) -> list[dict[str, Any]]:
        """Return JSON-ready warning records for result rows, UI, and exports."""
        payload: list[dict[str, Any]] = []
        for warning in self.warnings:
            payload.append(warning.model_dump(mode="json"))
        return payload
class CostCurveEvaluationError(ValueError):
    """Evaluation failure carrying a machine-readable code and structured context.

    ``str(error)`` yields ``message``; ``context`` is an empty dict when none
    (or an empty one) is supplied.
    """

    def __init__(self, code: str, message: str, *, context: dict[str, Any] | None = None):
        super().__init__(message)
        self.code, self.message = code, message
        self.context = context if context else {}
def evaluate_cost_curve(
    curve: CostCurve,
    *,
    inputs_by_key: Mapping[str, Mapping[str, Any]],
    apply_installation_factor: bool = False,
) -> CostCurveEvaluationResult:
    """Evaluate a curve from explicit, per-spec driver inputs.

    The evaluator intentionally has no single-variable fallback. Every caller
    must resolve values into ``inputs_by_key`` using the selected curve's
    ``required_driver_specs`` so expression and discrete-family curves share one
    runtime contract.

    Optional driver specs whose input was omitted are excluded from range
    checks, formula bindings and conversion diagnostics instead of failing with
    a ``KeyError``. When the curve has no supplied formula input, the result's
    primary ``input_value`` / ``canonical_value`` are ``None`` and the matching
    units are empty strings.

    Args:
        curve: The cost curve to evaluate.
        inputs_by_key: Raw ``{"value": ..., "unit": ...}`` records keyed by
            driver-spec key.
        apply_installation_factor: Whether the caller intends to apply an
            installation factor; only used to emit a double-counting warning
            and recorded in ``raw_inputs``.

    Returns:
        The evaluation result with amount, normalized inputs, warnings and
        audit payloads.

    Raises:
        CostCurveEvaluationError: For invalid specs or inputs, unsupported
            evaluation kinds, blocked formulas, or non-finite results.
    """
    declared_specs = _curve_required_driver_specs(curve)
    normalized_inputs = _normalize_inputs(curve=curve, specs=declared_specs, inputs_by_key=inputs_by_key)
    # _normalize_inputs skips optional specs that have no supplied input, so
    # only specs with a normalized record may be indexed from here on.
    specs = tuple(spec for spec in declared_specs if spec.key in normalized_inputs)
    formula_specs = tuple(spec for spec in specs if spec.role == "formula_input")
    selector_specs = tuple(spec for spec in specs if spec.role == "discrete_selector")
    warnings: list[CostCurveWarning] = []

    for spec in specs:
        _append_spec_range_warnings(
            curve=curve,
            spec=spec,
            input_value=normalized_inputs[spec.key]["value"],
            warnings=warnings,
        )
    _append_cost_basis_warnings(
        curve=curve,
        apply_installation_factor=apply_installation_factor,
        warnings=warnings,
    )
    # Identical for both evaluation kinds, so it is computed once up front.
    conversion_diagnostics = _conversion_diagnostics(specs=specs, normalized_inputs=normalized_inputs)

    selected_variant: SelectedDiscreteVariantPayload | None = None
    selector_diagnostics: tuple[SelectorDiagnostic, ...] = ()
    if curve.evaluation_kind == CostCurveEvaluationKind.EXPRESSION:
        formula = _build_curve_formula(curve)
        bindings = _formula_bindings(formula_specs=formula_specs, normalized_inputs=normalized_inputs)
        amount = _evaluate_formula(curve=curve, formula=formula, bindings=bindings)
    elif curve.evaluation_kind == CostCurveEvaluationKind.DISCRETE_FAMILY:
        if not selector_specs:
            raise CostCurveEvaluationError(
                "missing_discrete_selectors",
                "Discrete-family curves require at least one discrete selector input.",
                context={"curve_key": curve.curve_key},
            )
        selected = _evaluate_discrete_family(
            curve=curve,
            formula_specs=formula_specs,
            selector_specs=selector_specs,
            normalized_inputs=normalized_inputs,
        )
        formula = selected["formula"]
        bindings = selected["bindings"]
        amount = selected["amount"]
        selected_variant = selected["selected_variant"]
        selector_diagnostics = tuple(selected["selector_diagnostics"])
        warnings.extend(selected["warnings"])
    else:
        raise CostCurveEvaluationError(
            "unsupported_cost_curve_evaluation_kind",
            "Cost curve evaluation_kind is not supported.",
            context={"curve_key": curve.curve_key, "evaluation_kind": curve.evaluation_kind},
        )

    # Defensive re-checks: both evaluation paths already reject these cases.
    if amount is None:
        raise CostCurveEvaluationError(
            "blocked_cost_curve_formula",
            formula.blocked_reason or "Cost curve formula is blocked.",
            context={"curve_key": curve.curve_key},
        )
    _ensure_finite_decimal(
        amount,
        code="non_finite_cost_output",
        message="Cost curve evaluated to a non-finite cost.",
        context={"curve_key": curve.curve_key, "output_unit": curve.output_unit},
    )
    if amount < 0:
        # A negative cost is reported rather than raised so callers still get the audit trail.
        warnings.append(
            CostCurveWarning(
                code="negative_cost_output",
                severity="error",
                message="Cost curve evaluated to a negative cost.",
                context={"amount": str(amount), "output_unit": curve.output_unit},
            )
        )

    # Prefer the spec flagged as primary, fall back to the first formula input,
    # and tolerate curves that have no supplied formula input at all.
    primary_spec = next((spec for spec in formula_specs if spec.primary), None)
    if primary_spec is None and formula_specs:
        primary_spec = formula_specs[0]
    if primary_spec is None:
        primary_value: Decimal | None = None
        primary_unit = ""
    else:
        primary_input = normalized_inputs[primary_spec.key]
        primary_value = primary_input["value"]
        primary_unit = primary_input["unit"]

    return CostCurveEvaluationResult(
        curve_id=curve.pk,
        curve_key=curve.curve_key,
        cost_basis=curve.cost_basis,
        evaluation_kind=curve.evaluation_kind,
        input_value=primary_value,
        input_unit=primary_unit,
        canonical_value=primary_value,
        canonical_unit=primary_unit,
        amount=amount,
        output_unit=curve.output_unit,
        raw_inputs={
            key: {
                "input_value": value["raw_value"],
                "input_unit": value["raw_unit"],
                "coerced_input_value": str(value["value"]),
                "coerced_input_unit": value["unit"],
            }
            for key, value in normalized_inputs.items()
        } | {
            "apply_installation_factor": apply_installation_factor,
        },
        normalized_inputs={
            key: {
                "value": str(value["value"]),
                "unit": value["unit"],
                "role": value["role"],
                "variable_symbol": value["variable_symbol"],
            }
            for key, value in normalized_inputs.items()
        },
        selected_variant=selected_variant,
        selector_diagnostics=selector_diagnostics,
        formula_audit=formula.audit_payload(
            FormulaEvaluation(
                value=amount,
                bindings=bindings,
                conversion_diagnostics=conversion_diagnostics,
            )
        ),
        warnings=tuple(warnings),
        source_metadata=_source_metadata(curve),
    )
def cost_curve_units_compatible(provided_unit: str | None, curve_unit: str | None) -> bool:
    """Return whether a driver unit can be converted to a curve input unit.

    Blank or identical units (after whitespace cleanup) count as compatible.
    Converter failures are reported as "not compatible" rather than raised.
    """
    source = normalize_economics_unit_notation(provided_unit)
    target = normalize_economics_unit_notation(curve_unit)
    if source and target and source != target:
        try:
            return can_convert(source, target)
        except (PintError, ValueError, AttributeError):
            return False
    return True
def normalize_economics_unit_notation(unit: str | None) -> str:
    """Return the unit text with surrounding whitespace removed; ``None`` becomes ``""``."""
    return "" if unit is None else unit.strip()
def _build_curve_formula(curve: CostCurve):
    """Build the curve's formula, re-raising formula errors as evaluation errors.

    The builder is imported inside the function rather than at module level.
    """
    from Economics.formulas.builders.capital import build_cost_curve_formula

    try:
        built = build_cost_curve_formula(curve)
    except FormulaError as exc:
        error_context = {**exc.context, "curve_key": curve.curve_key}
        raise CostCurveEvaluationError(exc.code, exc.message, context=error_context) from exc
    return built
def _append_spec_range_warnings(
    *,
    curve: CostCurve,
    spec: CostCurveDriverSpec,
    input_value: Decimal,
    warnings: list[CostCurveWarning],
) -> None:
    """Append a warning for each spec range bound that ``input_value`` violates.

    Both bounds are parsed before any comparison; blank bounds are ignored.
    ``curve`` is accepted for signature symmetry and is not read here.
    """
    lower_bound = _optional_decimal(spec.valid_min)
    upper_bound = _optional_decimal(spec.valid_max)
    checks = (
        (
            lower_bound,
            lambda entered, bound: entered < bound,
            "input_below_valid_range",
            "Cost driver value is below the input's stated valid range.",
        ),
        (
            upper_bound,
            lambda entered, bound: entered > bound,
            "input_above_valid_range",
            "Cost driver value is above the input's stated valid range.",
        ),
    )
    for bound, is_violated, code, message in checks:
        if bound is None or not is_violated(input_value, bound):
            continue
        warnings.append(
            CostCurveWarning(
                code=code,
                severity="warning",
                message=message,
                context=_range_context(spec),
            )
        )
def _append_cost_basis_warnings(
    *,
    curve: CostCurve,
    apply_installation_factor: bool,
    warnings: list[CostCurveWarning],
) -> None:
    """Flag installed-cost curves, and flag double counting when a factor is also requested."""
    if curve.cost_basis != CostBasis.INSTALLED:
        return
    warnings.append(
        CostCurveWarning(
            code="installed_cost_basis",
            severity="warning",
            message="Cost curve already represents installed cost.",
            context={"cost_basis": curve.cost_basis},
        )
    )
    if not apply_installation_factor:
        return
    # Applying an installation factor on top of an installed-cost curve counts installation twice.
    warnings.append(
        CostCurveWarning(
            code="installed_cost_factor_double_counting",
            severity="error",
            message="Do not apply a default Lang factor to an installed-cost curve.",
            context={"cost_basis": curve.cost_basis},
        )
    )
344def _source_metadata(curve: CostCurve) -> dict[str, Any]:
345 return {
346 "basis_date": curve.basis_date.isoformat() if curve.basis_date else None,
347 "basis_index_name": curve.basis_index_name,
348 "basis_index_value": _decimal_to_string(curve.basis_index_value),
349 "currency": curve.currency,
350 "source_document_title": curve.source_document_title,
351 "source_page": curve.source_page,
352 "source_figure": curve.source_figure,
353 "source_data_origin": curve.source_data_origin,
354 "source_range_precision": curve.source_range_precision,
355 "source_license_status": curve.source_license_status,
356 "source_reference": curve.source_reference,
357 "source_note": curve.source_note,
358 }
361def _coerce_curve_input_value(
362 *,
363 value: Decimal,
364 provided_unit: str,
365 target_unit: str,
366 curve_key: str,
367 input_key: str,
368) -> Decimal:
369 if provided_unit == target_unit:
370 return value
371 if not cost_curve_units_compatible(provided_unit, target_unit):
372 raise CostCurveEvaluationError(
373 "invalid_input_unit",
374 "Cost curve input unit is not compatible with the declared driver spec unit.",
375 context={
376 "curve_key": curve_key,
377 "input_key": input_key,
378 "provided_unit": provided_unit,
379 "expected_unit": target_unit,
380 },
381 )
382 try:
383 return Decimal(str(convert_value(value, from_unit=provided_unit, to_unit=target_unit)))
384 except (ValueError, DecimalException, PintError) as exc:
385 raise CostCurveEvaluationError(
386 "invalid_input_unit",
387 "Cost curve input unit could not be converted to the declared driver spec unit.",
388 context={
389 "curve_key": curve_key,
390 "input_key": input_key,
391 "provided_unit": provided_unit,
392 "expected_unit": target_unit,
393 "error": str(exc),
394 },
395 ) from exc
398def _range_context(spec: CostCurveDriverSpec) -> dict[str, Any]:
399 return {
400 "input_key": spec.key,
401 "valid_min": spec.valid_min or None,
402 "valid_max": spec.valid_max or None,
403 "input_unit": spec.unit,
404 "valid_range_note": spec.valid_range_note,
405 }
408def _to_decimal(value: Decimal | int | float | str, *, code: str) -> Decimal:
409 try:
410 decimal_value = Decimal(str(value))
411 except (InvalidOperation, TypeError, ValueError) as exc:
412 raise CostCurveEvaluationError(
413 code,
414 "Cost curve evaluator received a non-numeric value.",
415 context={"value": str(value)},
416 ) from exc
417 if not decimal_value.is_finite(): 417 ↛ 418line 417 didn't jump to line 418 because the condition on line 417 was never true
418 raise CostCurveEvaluationError(
419 code,
420 "Cost curve evaluator received a non-finite numeric value.",
421 context={"value": str(value)},
422 )
423 return decimal_value
426def _ensure_finite_decimal(
427 value: Decimal,
428 *,
429 code: str,
430 message: str,
431 context: dict[str, Any],
432) -> Decimal:
433 if not value.is_finite(): 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true
434 raise CostCurveEvaluationError(code, message, context={**context, "value": str(value)})
435 return value
438def _decimal_to_string(value: Decimal | None) -> str | None:
439 return None if value is None else str(value)
def _curve_required_driver_specs(curve: CostCurve) -> tuple[CostCurveDriverSpec, ...]:
    """Parse the curve's declared input contract into typed driver specs.

    Raises:
        CostCurveEvaluationError: ``invalid_cost_curve_driver_specs`` when the
            parser rejects the stored specs with a ``ValueError``.
    """
    from Economics.costing.cost_curves.driver_specs import parse_required_driver_specs

    try:
        parsed_specs = parse_required_driver_specs(curve.required_driver_specs)
    except ValueError as exc:
        raise CostCurveEvaluationError(
            "invalid_cost_curve_driver_specs",
            str(exc),
            context={"curve_key": curve.curve_key},
        ) from exc
    return parsed_specs
def _curve_discrete_variants(curve: CostCurve) -> tuple[CostCurveDiscreteVariant, ...]:
    """Parse the curve's stored discrete variants and require at least one.

    Raises:
        CostCurveEvaluationError: ``invalid_discrete_variants`` when parsing
            fails, ``missing_discrete_variants`` when the result is empty.
    """
    from Economics.costing.cost_curves.driver_specs import parse_discrete_variants

    try:
        parsed_variants = parse_discrete_variants(curve.discrete_variants)
    except ValueError as exc:
        raise CostCurveEvaluationError(
            "invalid_discrete_variants",
            str(exc),
            context={"curve_key": curve.curve_key},
        ) from exc
    if parsed_variants:
        return parsed_variants
    raise CostCurveEvaluationError(
        "missing_discrete_variants",
        "Discrete-family curves require at least one variant.",
        context={"curve_key": curve.curve_key},
    )
def _normalize_inputs(
    *,
    curve: CostCurve,
    specs: Sequence[CostCurveDriverSpec],
    inputs_by_key: Mapping[str, Mapping[str, Any]],
) -> dict[str, NormalizedCurveInput]:
    """Normalize raw driver inputs to spec units and reject unknown keys.

    Optional specs without a supplied input are skipped, so the returned dict
    may hold fewer keys than ``specs``.

    Raises:
        CostCurveEvaluationError: For undeclared keys, missing required
            inputs, blank values or units, non-numeric values, or units that
            cannot be converted to the spec unit.
    """
    declared_keys = {spec.key for spec in specs}
    undeclared_keys = sorted(set(inputs_by_key).difference(declared_keys))
    if undeclared_keys:
        raise CostCurveEvaluationError(
            "unknown_cost_curve_inputs",
            "Cost curve evaluation received inputs that are not declared by the selected curve.",
            context={"curve_key": curve.curve_key, "input_keys": undeclared_keys},
        )

    result: dict[str, NormalizedCurveInput] = {}
    for spec in specs:
        supplied = inputs_by_key.get(spec.key)
        if supplied is None:
            if not spec.required:
                # Optional input left out by the caller: nothing to normalize.
                continue
            raise CostCurveEvaluationError(
                "missing_cost_curve_input",
                "Cost curve evaluation is missing a required input.",
                context={"curve_key": curve.curve_key, "input_key": spec.key},
            )

        entered_value = supplied.get("value")
        entered_unit = normalize_economics_unit_notation(str(supplied.get("unit") or ""))
        if entered_value in (None, ""):
            raise CostCurveEvaluationError(
                "invalid_input_value",
                "Cost curve evaluator received a blank input value.",
                context={"curve_key": curve.curve_key, "input_key": spec.key},
            )
        if not entered_unit:
            raise CostCurveEvaluationError(
                "invalid_input_unit",
                "Cost curve evaluator received a blank input unit.",
                context={"curve_key": curve.curve_key, "input_key": spec.key},
            )

        parsed_value = _to_decimal(entered_value, code="invalid_input_value")
        spec_unit = normalize_economics_unit_notation(spec.unit)
        coerced_value = _coerce_curve_input_value(
            value=parsed_value,
            provided_unit=entered_unit,
            target_unit=spec_unit,
            curve_key=curve.curve_key,
            input_key=spec.key,
        )
        result[spec.key] = {
            "value": coerced_value,
            "unit": spec_unit,
            "raw_value": str(entered_value),
            "raw_unit": entered_unit,
            "role": spec.role,
            "variable_symbol": spec.variable_symbol,
        }
    return result
538def _formula_bindings(
539 *,
540 formula_specs: Sequence[CostCurveDriverSpec],
541 normalized_inputs: NormalizedCurveInputs,
542) -> dict[str, Decimal]:
543 """Bind declared expression symbols to normalized formula-input values."""
544 return {
545 spec.variable_symbol: normalized_inputs[spec.key]["value"]
546 for spec in formula_specs
547 }
550def _conversion_diagnostics(
551 *,
552 specs: Sequence[CostCurveDriverSpec],
553 normalized_inputs: FullNormalizedCurveInputs,
554) -> tuple[dict[str, str], ...]:
555 """Return source-to-target unit conversion records for audit payloads."""
556 diagnostics = []
557 for spec in specs:
558 normalized = normalized_inputs[spec.key]
559 diagnostics.append(
560 {
561 "input": spec.key,
562 "variable_symbol": spec.variable_symbol,
563 "source_value": normalized["raw_value"],
564 "source_unit": normalized["raw_unit"],
565 "target_value": str(normalized["value"]),
566 "target_unit": normalized["unit"],
567 }
568 )
569 return tuple(diagnostics)
572def _evaluate_formula(*, curve: CostCurve, formula, bindings: Mapping[str, Decimal]) -> Decimal:
573 """Evaluate a parsed formula and normalize parser failures to service errors."""
574 try:
575 amount = formula.evaluate(bindings)
576 except FormulaError as exc:
577 raise CostCurveEvaluationError(
578 exc.code,
579 exc.message,
580 context={**exc.context, "curve_key": curve.curve_key},
581 ) from exc
582 if amount is None: 582 ↛ 583line 582 didn't jump to line 583 because the condition on line 582 was never true
583 raise CostCurveEvaluationError(
584 "blocked_cost_curve_formula",
585 formula.blocked_reason or "Cost curve formula is blocked.",
586 context={"curve_key": curve.curve_key},
587 )
588 _ensure_finite_decimal(
589 amount,
590 code="non_finite_cost_output",
591 message="Cost curve evaluated to a non-finite cost.",
592 context={"curve_key": curve.curve_key, "output_unit": curve.output_unit},
593 )
594 return amount
def _evaluate_discrete_family(
    *,
    curve: CostCurve,
    formula_specs: Sequence[CostCurveDriverSpec],
    selector_specs: Sequence[CostCurveDriverSpec],
    normalized_inputs: FullNormalizedCurveInputs,
) -> DiscreteFamilyEvaluation:
    """Pick the capacity-covering variant of a discrete family and evaluate it.

    Selector inputs act as design-capacity thresholds: the chosen variant is
    the smallest published capacity that still covers the requested selector
    value, so a cheaper, lower-capacity variant can never be picked and
    under-size the equipment.
    """
    from Economics.formulas.builders.capital import build_cost_curve_variant_formula

    family = _curve_discrete_variants(curve)
    validate_discrete_variant_selectors(
        curve=curve,
        variants=family,
        selector_keys={spec.key for spec in selector_specs},
    )
    chosen = select_discrete_capacity_variant(
        curve=curve,
        variants=family,
        selector_specs=selector_specs,
        normalized_inputs=normalized_inputs,
    )
    symbol_values = _formula_bindings(formula_specs=formula_specs, normalized_inputs=normalized_inputs)
    variant_formula = build_cost_curve_variant_formula(curve, chosen, formula_specs=formula_specs)
    cost = _evaluate_formula(curve=curve, formula=variant_formula, bindings=symbol_values)

    # Build every diagnostic first, then derive warnings from them.
    diagnostics: list[SelectorDiagnostic] = []
    for spec in selector_specs:
        diagnostics.append(
            _selector_diagnostic(
                curve=curve,
                spec=spec,
                variant=chosen,
                variants=family,
                normalized_inputs=normalized_inputs,
            )
        )
    collected: list[CostCurveWarning] = []
    for diagnostic in diagnostics:
        collected.extend(_selector_adjustment_warnings(diagnostic))
    collected.extend(
        _selected_variant_range_warnings(
            curve=curve,
            variant=chosen,
            formula_specs=formula_specs,
            normalized_inputs=normalized_inputs,
        )
    )

    return {
        "amount": cost,
        "formula": variant_formula,
        "bindings": symbol_values,
        "selected_variant": _selected_variant_payload(chosen),
        "selector_diagnostics": diagnostics,
        "warnings": collected,
    }
def validate_discrete_variant_selectors(
    *,
    curve: CostCurve,
    variants: Sequence[CostCurveDiscreteVariant],
    selector_keys: set[str],
) -> None:
    """Require every variant's selector keys to equal ``selector_keys`` exactly.

    Raises:
        CostCurveEvaluationError: ``invalid_discrete_variant_selectors`` for
            the first variant that misses a selector or declares an extra one.
    """
    for variant in variants:
        declared = set(variant.selector_values)
        missing = sorted(selector_keys - declared)
        extra = sorted(declared - selector_keys)
        if not (missing or extra):
            continue
        raise CostCurveEvaluationError(
            "invalid_discrete_variant_selectors",
            "Discrete variant selector values must exactly match the curve's selector inputs.",
            context={
                "curve_key": curve.curve_key,
                "variant_key": variant.key,
                "missing_selectors": missing,
                "extra_selectors": extra,
            },
        )
def select_discrete_capacity_variant(
    *,
    curve: CostCurve,
    variants: Sequence[CostCurveDiscreteVariant],
    selector_specs: Sequence[CostCurveDriverSpec],
    normalized_inputs: NormalizedCurveInputs,
) -> CostCurveDiscreteVariant:
    """Return the smallest discrete variant whose design capacity covers the inputs.

    Args:
        curve: Curve being evaluated; only ``curve_key`` is read, for error context.
        variants: Candidate variants, each with a selector value per selector key.
        selector_specs: Selector driver specs; exactly one is supported.
        normalized_inputs: Normalized inputs keyed by driver-spec key.

    Returns:
        The variant with the lowest selector value that is >= the requested
        value; ties are broken by variant key.

    Raises:
        CostCurveEvaluationError: When there is no selector spec, more than
            one selector spec, no normalized input for the selector, an
            unparsable variant selector value, or no variant (including an
            empty ``variants``) that covers the requested capacity.
    """
    if not selector_specs:
        raise CostCurveEvaluationError(
            "missing_discrete_selectors",
            "Discrete-family curves require at least one discrete selector input.",
            context={"curve_key": curve.curve_key},
        )
    if len(selector_specs) > 1:
        raise CostCurveEvaluationError(
            "unsupported_multi_selector_discrete_family",
            "Discrete-family curves currently support exactly one capacity selector.",
            context={
                "curve_key": curve.curve_key,
                "selector_keys": [spec.key for spec in selector_specs],
            },
        )

    selector_key = selector_specs[0].key
    if selector_key not in normalized_inputs:
        # An omitted selector would otherwise surface as a bare KeyError.
        raise CostCurveEvaluationError(
            "missing_cost_curve_input",
            "Discrete-family evaluation is missing its capacity selector input.",
            context={"curve_key": curve.curve_key, "input_key": selector_key},
        )
    requested = normalized_inputs[selector_key]["value"]

    # Parse each variant's capacity once; reused for eligibility and error context.
    capacities: list[tuple[Decimal, str, CostCurveDiscreteVariant]] = [
        (
            _to_decimal(variant.selector_values[selector_key], code="invalid_discrete_selector_value"),
            variant.key,
            variant,
        )
        for variant in variants
    ]
    eligible = [candidate for candidate in capacities if candidate[0] >= requested]
    if not eligible:
        # default=None keeps an empty variant list from raising ValueError in max().
        largest = max((candidate[0] for candidate in capacities), default=None)
        raise CostCurveEvaluationError(
            "discrete_capacity_exceeded",
            "No discrete cost-curve variant can cover the selected design capacity.",
            context={
                "curve_key": curve.curve_key,
                "selected_capacities": {selector_key: str(requested)},
                "maximum_capacities": {selector_key: None if largest is None else str(largest)},
            },
        )
    # Smallest covering capacity wins; the variant key makes ties deterministic.
    return min(eligible, key=lambda candidate: (candidate[0], candidate[1]))[2]
736def _selected_variant_payload(variant: CostCurveDiscreteVariant) -> SelectedDiscreteVariantPayload:
737 return {
738 "key": variant.key,
739 "label": variant.label,
740 "selector_values": variant.selector_values,
741 "expression_text": variant.expression_text,
742 "valid_min": variant.valid_min,
743 "valid_max": variant.valid_max,
744 "valid_range_note": variant.valid_range_note,
745 "source_reference": variant.source_reference,
746 "notes": variant.notes,
747 }
def _selector_diagnostic(
    *,
    curve: CostCurve,
    spec: CostCurveDriverSpec,
    variant: CostCurveDiscreteVariant,
    variants: Sequence[CostCurveDiscreteVariant],
    normalized_inputs: FullNormalizedCurveInputs,
) -> SelectorDiagnostic:
    """Report where the entered selector value sits relative to the family's range.

    The range is the min/max selector value across ``variants``. When the
    entered value is outside it, the record holds the crossed bound, the
    absolute gap, and the gap as a percentage of that bound (or of the entered
    value when the bound is zero).
    """
    key = spec.key
    chosen_value = _to_decimal(variant.selector_values[key], code="invalid_discrete_selector_value")
    family = [
        _to_decimal(candidate.selector_values[key], code="invalid_discrete_selector_value")
        for candidate in variants
    ]
    lowest, highest = min(family), max(family)
    entered = normalized_inputs[key]["value"]

    direction: str | None = None
    bound: Decimal | None = None
    gap = Decimal("0")
    gap_percent = Decimal("0")
    if entered < lowest:
        direction, bound, gap = "below", lowest, lowest - entered
    elif entered > highest:
        direction, bound, gap = "above", highest, entered - highest

    if bound is not None:
        # Avoid dividing by a zero bound; fall back to the entered magnitude.
        base = abs(bound) if bound != 0 else abs(entered)
        if base != 0:
            gap_percent = (gap / base) * Decimal("100")

    return {
        "curve_key": curve.curve_key,
        "input_key": key,
        "role": spec.role,
        "label": spec.label,
        "entered_value": str(entered),
        "entered_unit": normalized_inputs[key]["unit"],
        "selected_discrete_value": str(chosen_value),
        "selector_min": str(lowest),
        "selector_max": str(highest),
        "range_direction": direction,
        "range_bound": None if bound is None else str(bound),
        "absolute_difference": str(gap),
        "percentage_difference": str(gap_percent),
        "soft_maximum_adjustment_percent": spec.soft_maximum_adjustment_percent or "10",
    }
801def _selector_adjustment_warnings(diagnostic: SelectorDiagnostic) -> tuple[CostCurveWarning, ...]:
802 """Warn when a selector input is significantly outside the family range."""
803 if diagnostic["range_direction"] is None: 803 ↛ 805line 803 didn't jump to line 805 because the condition on line 803 was always true
804 return ()
805 percentage = Decimal(diagnostic["percentage_difference"])
806 threshold = Decimal(diagnostic["soft_maximum_adjustment_percent"])
807 if percentage <= threshold:
808 return ()
809 return (
810 CostCurveWarning(
811 code="discrete_selector_adjustment",
812 severity="warning",
813 message="Cost curve selector input is outside the supported range.",
814 context=diagnostic,
815 ),
816 )
819def _selected_variant_range_warnings(
820 *,
821 curve: CostCurve,
822 variant: CostCurveDiscreteVariant,
823 formula_specs: Sequence[CostCurveDriverSpec],
824 normalized_inputs: FullNormalizedCurveInputs,
825) -> list[CostCurveWarning]:
826 """Warn when the winning variant is used outside its supported input range."""
827 primary_spec = next((spec for spec in formula_specs if spec.primary), None)
828 if primary_spec is None: 828 ↛ 829line 828 didn't jump to line 829 because the condition on line 828 was never true
829 return []
830 input_value = normalized_inputs[primary_spec.key]["value"]
831 valid_min = _optional_decimal(variant.valid_min)
832 valid_max = _optional_decimal(variant.valid_max)
833 context = {
834 "curve_key": curve.curve_key,
835 "variant_key": variant.key,
836 "variant_label": variant.label,
837 "input_key": primary_spec.key,
838 "input_label": primary_spec.label,
839 "input_value": str(input_value),
840 "input_unit": normalized_inputs[primary_spec.key]["unit"],
841 "valid_min": variant.valid_min or None,
842 "valid_max": variant.valid_max or None,
843 "valid_range_note": variant.valid_range_note,
844 }
845 warnings = []
846 if valid_min is not None and input_value < valid_min: 846 ↛ 847line 846 didn't jump to line 847 because the condition on line 846 was never true
847 warnings.append(
848 CostCurveWarning(
849 code="selected_variant_input_below_valid_range",
850 severity="warning",
851 message="Selected curve option is below its stated input range.",
852 context=context,
853 )
854 )
855 if valid_max is not None and input_value > valid_max: 855 ↛ 856line 855 didn't jump to line 856 because the condition on line 855 was never true
856 warnings.append(
857 CostCurveWarning(
858 code="selected_variant_input_above_valid_range",
859 severity="warning",
860 message="Selected curve option is above its stated input range.",
861 context=context,
862 )
863 )
864 return warnings
867def _optional_decimal(value: Any) -> Decimal | None:
868 """Parse optional decimal metadata while preserving blank values as unset."""
869 if value in (None, ""):
870 return None
871 return _to_decimal(value, code="invalid_decimal")