Coverage for backend/django/Economics/costing/capital/generated_lines.py: 86%
278 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1"""Generated capital-line synchronization for capital costing.
3This module materializes generated ``CapitalCostLine`` rows before
4fingerprinting and result calculation. Rows are created as soon as an equipment
5mapping exists so work-capable units can expose editable peak demand even before
6the user has selected a cost curve.
7"""
9from __future__ import annotations
11from decimal import Decimal, DecimalException
12from typing import Any, TypedDict
14from core.auxiliary.enums import ConType
15from Economics.costing.models import CapitalCostLine, CostCurve, CostDriver, EquipmentMapping
17from Economics.shared.choices import CostBasis, CostDriverSource
19from Economics.studies.models import EconomicsStudy
20from Economics.costing.capital.capital_line_sources import GENERATED_CAPITAL_LINE_SOURCE
21from Economics.costing.cost_curves.driver_specs import (
22 CapitalCostDriverInput,
23 CapitalCostDriverInputsPayload,
24 CostCurveDriverSpec,
25 CostCurveDriverSpecPayload,
26 default_driver_inputs_payload,
27 driver_specs_payload,
28 normalize_capital_cost_driver_inputs,
29 parse_required_driver_specs,
30)
31from Economics.costing.cost_curves.evaluation import (
32 CostCurveEvaluationError,
33 cost_curve_units_compatible,
34 evaluate_cost_curve,
35 normalize_economics_unit_notation,
36)
37from Economics.costing.capital.electrical_upgrade import unit_work_peak_demand_kw
38from Economics.costing.cost_curves.driver_properties import validate_cost_driver_property
39from Economics.formulas.builders.capital import build_generated_capital_line_formula
40from Economics.formulas.engine.core import FormulaError
41from Economics.shared.payloads import json_ready, result_amount, warning_record
42from idaes_factory.unit_conversion.unit_conversion import convert_value
43from pint.errors import PintError
class ResolvedDriverInput(TypedDict):
    """Evaluator-ready value/unit pair resolved for one cost-curve driver input."""

    # Magnitude taken from a flowsheet property or from a manual entry.
    value: Decimal | float | int | str
    # Unit string that accompanies ``value``.
    unit: str
def _sync_generated_capital_lines(study: EconomicsStudy) -> bool:
    """Bring every generated capital line of ``study`` up to date.

    Result fingerprints include capital line rows, so the generated rows must
    be refreshed before fingerprinting; otherwise the dependency comparison
    would describe stale generated costs.

    Returns ``True`` when any row was created, updated or deleted.
    """
    mapping_queryset = (
        EquipmentMapping.objects.filter(
            flowsheet=study.flowsheet,
            costable_item__study=study,
            costable_item__simulation_object__is_deleted=False,
        )
        .select_related(
            "costable_item",
            "costable_item__cost_driver",
            "costable_item__cost_driver__property_info",
            "cost_curve",
        )
        .order_by("pk")
    )

    any_line_changed = False
    mapped_item_ids = set()
    for equipment_mapping in mapping_queryset:
        mapped_item_ids.add(equipment_mapping.costable_item_id)
        if _sync_generated_capital_line(equipment_mapping):
            any_line_changed = True

    # Generated rows whose costable item no longer has an active mapping are
    # removed; with no active mapping at all, every generated row is stale.
    orphaned_lines = CapitalCostLine.objects.filter(
        flowsheet=study.flowsheet,
        study=study,
        source=GENERATED_CAPITAL_LINE_SOURCE,
    )
    if mapped_item_ids:
        orphaned_lines = orphaned_lines.exclude(costable_item_id__in=mapped_item_ids)
    removed_total = orphaned_lines.delete()[0]
    return any_line_changed or removed_total > 0
def _sync_generated_capital_line(mapping: EquipmentMapping) -> bool:
    """Create or refresh the one generated capital line belonging to ``mapping``.

    Extra generated rows for the same costable item are deleted, keeping the
    row with the lowest primary key. Returns ``True`` when anything was
    created, deleted or saved.
    """
    costable_item = mapping.costable_item
    driver = getattr(costable_item, "cost_driver", None)
    existing_rows = list(
        CapitalCostLine.objects.filter(
            flowsheet=mapping.flowsheet,
            study=mapping.costable_item.study,
            costable_item=mapping.costable_item,
            source=GENERATED_CAPITAL_LINE_SOURCE,
        ).order_by("pk")
    )
    line = existing_rows[0] if existing_rows else None
    changed = False
    for surplus_row in existing_rows[1:]:
        surplus_row.delete()
        changed = True

    fields = _generated_capital_line_fields(mapping=mapping, driver=driver, existing_line=line)

    if line is None:
        # A brand-new row starts with the computed minimum as its selection.
        fields["peak_demand_kw"] = fields.get("minimum_peak_demand_kw")
        CapitalCostLine.objects.create(
            flowsheet=mapping.flowsheet,
            study=mapping.costable_item.study,
            costable_item=mapping.costable_item,
            **fields,
        )
        return True

    new_minimum = fields.get("minimum_peak_demand_kw")
    current_selection = line.peak_demand_kw
    # A selection equal to the previously stored minimum is treated as
    # "automatic" and follows the new minimum.
    follows_minimum = current_selection == line.minimum_peak_demand_kw
    if new_minimum is None:
        fields["peak_demand_kw"] = None
    elif current_selection is None or follows_minimum or current_selection < new_minimum:
        fields["peak_demand_kw"] = new_minimum

    dirty_field_names = []
    for name, new_value in fields.items():
        if getattr(line, name) != new_value:
            setattr(line, name, new_value)
            dirty_field_names.append(name)
    if dirty_field_names:
        line.save(update_fields=[*dirty_field_names, "updated_at"])
        changed = True
    return changed
def sync_generated_capital_lines_for_property(property_info) -> list[EconomicsStudy]:
    """Re-sync generated capital lines of every study touched by ``property_info``.

    A study is considered affected when it has a costable item on the
    property's simulation object, a costable item on a unit connected to that
    object, or a generated capital line whose driver inputs reference the
    property. Returns the studies whose generated lines actually changed,
    ordered by primary key.
    """
    owning_set = getattr(property_info, "set", None)
    simulation_object = getattr(owning_set, "simulationObject", None)
    flowsheet_studies = EconomicsStudy.objects.filter(flowsheet=property_info.flowsheet)

    affected_ids: set[int] = set()
    if simulation_object is not None:
        direct_ids = flowsheet_studies.filter(
            costable_items__simulation_object=simulation_object,
        ).values_list("pk", flat=True)
        affected_ids.update(direct_ids)

        connected_unit_ids = simulation_object.connectedPorts.filter(
            unitOp__isnull=False,
        ).values_list("unitOp_id", flat=True)
        neighbour_ids = flowsheet_studies.filter(
            costable_items__simulation_object_id__in=connected_unit_ids,
        ).values_list("pk", flat=True)
        affected_ids.update(neighbour_ids)

    generated_lines = CapitalCostLine.objects.filter(
        flowsheet=property_info.flowsheet,
        source=GENERATED_CAPITAL_LINE_SOURCE,
    ).only("study_id", "driver_inputs")
    referencing_study_ids = {
        generated_line.study_id
        for generated_line in generated_lines
        if _driver_inputs_reference_property(generated_line.driver_inputs, property_info.pk)
    }
    referencing_studies = EconomicsStudy.objects.filter(
        pk__in=referencing_study_ids,
        flowsheet=property_info.flowsheet,
    )
    affected_ids.update(referencing_studies.values_list("pk", flat=True))

    return [
        study
        for study in flowsheet_studies.filter(pk__in=affected_ids).order_by("pk")
        if _sync_generated_capital_lines(study)
    ]
def _generated_capital_line_fields(
    *,
    mapping: EquipmentMapping,
    driver: CostDriver | None,
    existing_line: CapitalCostLine | None = None,
) -> dict[str, Any]:
    """Return persisted fields for the generated capital line tied to one mapping.

    When the mapping has no cost curve yet, the "pending" field set is
    returned. Otherwise the curve is evaluated and the index, Lang-factor and
    contingency steps are applied in that order; each step is recorded under
    ``warning_payload["capital_factors"]``.

    Args:
        mapping: Equipment mapping whose costable item and curve are costed.
        driver: Cost driver of the costable item, if any; forwarded to the
            formula builder and to error-warning construction.
        existing_line: The currently stored generated line, if one exists.
            Its ``driver_inputs`` are preserved where they still match the
            curve's driver specs.

    Returns:
        A dict of ``CapitalCostLine`` field values. ``driver_inputs`` is only
        included when there is no existing line or the reconciled inputs
        differ from the stored ones.

    Raises:
        Anything raised while reconciling driver inputs propagates. Evaluation
        failures of type ``CostCurveEvaluationError``, ``FormulaError`` or
        ``ValueError`` are converted to a warning and a ``"blocked"``
        confidence instead.
    """
    curve = mapping.cost_curve
    minimum_peak_demand_kw = unit_work_peak_demand_kw(mapping.costable_item)
    if curve is None:
        return _pending_generated_capital_line_fields(
            mapping=mapping,
            minimum_peak_demand_kw=minimum_peak_demand_kw,
        )

    warning_payload: dict[str, Any] = {
        "calculation_method": "cost_curve",
        "cost_curve_id": curve.pk,
        "cost_curve_key": curve.curve_key,
        "cost_basis": curve.cost_basis,
        "required_driver_specs": _curve_required_driver_specs_payload(curve),
        "minimum_peak_demand_kw": None if minimum_peak_demand_kw is None else str(minimum_peak_demand_kw),
        "warnings": [],
    }

    # Reconcile the stored driver inputs against the curve exactly once. The
    # result feeds both the evaluation below and the persisted
    # ``driver_inputs`` field. This used to be computed twice with identical
    # arguments (inside and after the ``try``); the unguarded second call
    # re-raised any failure anyway, so doing it up front keeps the same
    # outcome without the duplicate work.
    driver_inputs = _reconciled_driver_inputs(
        curve=curve,
        existing_inputs=(
            normalize_capital_cost_driver_inputs(existing_line.driver_inputs)
            if existing_line is not None and existing_line.driver_inputs
            else {}
        ),
    )

    amount = None
    confidence = "calculated"
    try:
        resolved_inputs = _resolved_driver_inputs(
            curve=curve,
            driver_inputs=driver_inputs,
            mapping=mapping,
        )
        generated_formula = build_generated_capital_line_formula(
            mapping,
            driver=driver,
            existing_line=existing_line,
        )
        evaluation = evaluate_cost_curve(
            curve,
            inputs_by_key=resolved_inputs,
            apply_installation_factor=generated_formula.applies_lang_factor,
        )
        base_amount = evaluation.amount
        if base_amount < 0:
            raise CostCurveEvaluationError(
                "negative_cost_output",
                "Cost curve evaluated to a negative cost.",
                context={"curve_key": curve.curve_key, "amount": str(base_amount), "output_unit": curve.output_unit},
            )

        # Step 1: raw curve output.
        factor_rows = [
            _capital_factor_row(
                kind="base_curve_cost",
                label="Curve base cost",
                amount=base_amount,
                detail=_base_curve_detail(evaluation),
            )
        ]

        # Step 2: index adjustment.
        indexed_amount = base_amount * generated_formula.index_adjustment.factor
        purchase_basis_amount = indexed_amount if curve.cost_basis == CostBasis.PURCHASE else Decimal("0")
        factor_rows.append(
            _capital_factor_row(
                kind="index_adjustment",
                label="CPI/index adjustment",
                amount=indexed_amount,
                factor=generated_formula.index_adjustment.factor,
                detail=generated_formula.index_adjustment.detail,
            )
        )

        # Step 3: optional Lang factor uplift.
        installed_basis_amount = indexed_amount if curve.cost_basis == CostBasis.INSTALLED else Decimal("0")
        uplift_base_amount = indexed_amount
        if generated_formula.applies_lang_factor and generated_formula.lang_factor is not None:
            uplift_base_amount = indexed_amount * generated_formula.lang_factor
            installed_basis_amount = uplift_base_amount
            factor_rows.append(
                _capital_factor_row(
                    kind="lang_factor",
                    label="Lang factor",
                    amount=uplift_base_amount,
                    factor=generated_formula.lang_factor,
                    detail=generated_formula.lang_factor_source,
                )
            )

        # Step 4: contingency.
        contingency_percent = generated_formula.contingency_percent
        contingency_amount = uplift_base_amount * (contingency_percent / Decimal("100"))
        amount = uplift_base_amount * generated_formula.contingency_factor
        factor_rows.append(
            _capital_factor_row(
                kind="contingency",
                label="Contingency",
                amount=amount,
                factor=generated_formula.contingency_factor,
                percent=contingency_percent,
            )
        )
        amount = result_amount(amount)

        warning_payload.update(
            {
                "input_value": None if evaluation.input_value is None else str(evaluation.input_value),
                "input_unit": evaluation.input_unit,
                "normalized_inputs": evaluation.normalized_inputs,
                "selected_variant": evaluation.selected_variant,
                "selector_diagnostics": list(evaluation.selector_diagnostics),
                "base_amount": str(result_amount(evaluation.amount)),
                "index_factor": str(generated_formula.index_adjustment.factor),
                "lang_factor": (
                    None
                    if generated_formula.lang_factor is None
                    else str(generated_formula.lang_factor)
                ),
                "lang_factor_source": generated_formula.lang_factor_source,
                "purchase_basis_amount": str(result_amount(purchase_basis_amount)),
                "installed_basis_amount": str(result_amount(installed_basis_amount)),
                "contingency_percent": str(contingency_percent),
                "contingency_amount": str(result_amount(contingency_amount)),
                "amount": None if amount is None else str(amount),
                "output_unit": evaluation.output_unit,
                "capital_factors": factor_rows,
            }
        )
        warning_payload["warnings"].extend(evaluation.warnings_payload())
        warning_payload["warnings"].extend(
            _flow_capacity_warnings(
                mapping=mapping,
                curve=curve,
                driver_inputs=driver_inputs,
                resolved_inputs=resolved_inputs,
            )
        )
    except (CostCurveEvaluationError, FormulaError, ValueError) as exc:
        warning_payload["warnings"].append(_cost_curve_error_warning(exc, mapping=mapping, driver=driver))
        confidence = "blocked"

    fields: dict[str, Any] = {
        "cost_curve": curve,
        "label": f"{mapping.costable_item.name} capital cost",
        "line_type": "equipment_capital",
        "amount": amount,
        "currency": curve.currency or curve.output_unit or "NZD",
        "included": mapping.costable_item.included,
        "manual": False,
        "source": GENERATED_CAPITAL_LINE_SOURCE,
        "confidence": confidence,
        "minimum_peak_demand_kw": minimum_peak_demand_kw,
        "warning_payload": json_ready(warning_payload),
    }
    # Driver inputs are user-editable on generated capital lines. Populate the
    # declarative defaults only for a new or still-empty row so recalculation
    # cannot erase the user's property/manual selections.
    if existing_line is None or existing_line.driver_inputs != driver_inputs:
        fields["driver_inputs"] = driver_inputs
    return fields
def _pending_generated_capital_line_fields(
    *,
    mapping: EquipmentMapping,
    minimum_peak_demand_kw: Decimal | None,
) -> dict[str, Any]:
    """Build the field set for a generated line that has no cost curve yet.

    The row carries no amount and is marked ``"blocked"``, but still exposes
    the minimum peak demand so it can be configured before a curve is chosen.
    """
    peak_demand_text = None if minimum_peak_demand_kw is None else str(minimum_peak_demand_kw)
    payload = {
        "calculation_method": "cost_curve",
        "cost_curve_id": None,
        "cost_curve_key": "",
        "cost_basis": mapping.cost_basis,
        "minimum_peak_demand_kw": peak_demand_text,
        "warnings": [],
    }
    pending_fields: dict[str, Any] = {
        "cost_curve": None,
        "label": f"{mapping.costable_item.name} capital cost",
        "line_type": "equipment_capital",
        "amount": None,
        "currency": "NZD",
        "included": mapping.costable_item.included,
        "manual": False,
        "source": GENERATED_CAPITAL_LINE_SOURCE,
        "driver_inputs": {},
        "confidence": "blocked",
        "minimum_peak_demand_kw": minimum_peak_demand_kw,
    }
    pending_fields["warning_payload"] = json_ready(payload)
    return pending_fields
def _curve_required_driver_specs(curve: CostCurve) -> tuple[CostCurveDriverSpec, ...]:
    """Parse ``curve.required_driver_specs`` into driver spec models."""
    raw_specs = curve.required_driver_specs
    return parse_required_driver_specs(raw_specs)
def _curve_required_driver_specs_payload(curve: CostCurve) -> list[CostCurveDriverSpecPayload]:
    """Serialize the curve's parsed driver specs via ``driver_specs_payload``."""
    parsed_specs = _curve_required_driver_specs(curve)
    return driver_specs_payload(parsed_specs)
def _default_driver_inputs(curve: CostCurve) -> CapitalCostDriverInputsPayload:
    """Build the default per-spec driver-input rows for the curve's driver specs."""
    parsed_specs = _curve_required_driver_specs(curve)
    return default_driver_inputs_payload(parsed_specs)
def _reconciled_driver_inputs(
    *,
    curve: CostCurve,
    existing_inputs: CapitalCostDriverInputsPayload,
) -> CapitalCostDriverInputsPayload:
    """Merge stored driver inputs with the curve's defaults.

    The result has exactly the keys of the curve's default inputs. A stored
    input is kept when it still matches its spec; otherwise the default for
    that key is used. Stored keys the curve does not declare are dropped.
    """
    fallback_inputs = _default_driver_inputs(curve)
    spec_lookup = {spec.key: spec for spec in _curve_required_driver_specs(curve)}
    merged: CapitalCostDriverInputsPayload = {}
    for input_key, fallback in fallback_inputs.items():
        candidate = existing_inputs.get(input_key)
        if _driver_input_matches_spec(candidate, spec_lookup[input_key]):
            merged[input_key] = candidate
        else:
            merged[input_key] = fallback
    return merged
def _driver_input_matches_spec(driver_input: Any, spec: CostCurveDriverSpec) -> bool:
    """Return whether a stored driver input is still usable for ``spec``.

    The input must be a dict that validates as ``CapitalCostDriverInput``,
    any non-empty source must be one of the spec's ``source_options``, and
    its unit must be compatible with the spec's unit.
    """
    if not isinstance(driver_input, dict):
        return False
    try:
        candidate = CapitalCostDriverInput.model_validate(driver_input)
    except ValueError:
        return False
    source_is_acceptable = not candidate.source or candidate.source in spec.source_options
    if not source_is_acceptable:
        return False
    return cost_curve_units_compatible(candidate.unit, spec.unit)
def _driver_inputs_reference_property(driver_inputs: Any, property_info_id: int) -> bool:
    """Tell whether any entry of a spec-keyed driver-input payload uses the property.

    An entry counts when it is a dict whose ``source`` is ``"property"`` and
    whose ``property_info`` equals ``property_info_id``. Non-dict payloads and
    non-dict entries never match.
    """
    if not isinstance(driver_inputs, dict):
        return False
    for entry in driver_inputs.values():
        if not isinstance(entry, dict):
            continue
        if entry.get("source") == "property" and entry.get("property_info") == property_info_id:
            return True
    return False
def _capital_factor_row(
    *,
    kind: str,
    label: str,
    amount: Decimal,
    factor: Decimal | None = None,
    percent: Decimal | None = None,
    detail: str = "",
) -> dict[str, Any]:
    """Build the audit-payload entry describing one capital factor step.

    ``factor`` and ``percent`` are stringified when given and left as ``None``
    otherwise; ``amount`` is passed through ``result_amount`` before being
    stringified.
    """
    factor_text = str(factor) if factor is not None else None
    percent_text = str(percent) if percent is not None else None
    amount_text = str(result_amount(amount))
    return {
        "kind": kind,
        "label": label,
        "factor": factor_text,
        "percent": percent_text,
        "amount": amount_text,
        "detail": detail,
    }
def _resolved_driver_inputs(
    *,
    curve: CostCurve,
    driver_inputs: CapitalCostDriverInputsPayload,
    mapping: EquipmentMapping,
) -> dict[str, ResolvedDriverInput]:
    """Resolve persisted driver-input selections into evaluator-ready values.

    For every driver spec of ``curve`` that has an entry in ``driver_inputs``:

    * ``source == "property"``: the selected property's current value is used,
      with the unit taken from the property, then the input, then the spec.
    * ``source == "manual"``: the stored manual value is used, with the unit
      taken from the input, then the spec.
    * any other source: the spec is skipped unless it is required.

    Specs without an entry in ``driver_inputs`` are skipped.

    Raises:
        CostCurveEvaluationError: with code ``"missing_cost_curve_input"``
            when a property input has no selected property, the selected
            property has no value, a manual input has no value, or a required
            spec has no source. Errors from ``spec_property_info`` propagate.
    """
    resolved: dict[str, ResolvedDriverInput] = {}
    for spec in _curve_required_driver_specs(curve):
        driver_input_payload = driver_inputs.get(spec.key)
        if driver_input_payload is None:
            continue
        driver_input = CapitalCostDriverInput.model_validate(driver_input_payload)
        if driver_input.source == "property":
            property_info_id = driver_input.property_info
            if property_info_id is None:
                raise CostCurveEvaluationError(
                    "missing_cost_curve_input",
                    "Property-backed cost curve input has no selected property.",
                    context={"curve_key": curve.curve_key, "input_key": spec.key},
                )
            property_info = spec_property_info(mapping=mapping, property_info_id=property_info_id)
            value = property_info.get_value()
            if value in (None, ""):
                raise CostCurveEvaluationError(
                    "missing_cost_curve_input",
                    "Selected cost curve input property has no value.",
                    context={
                        "curve_key": curve.curve_key,
                        "input_key": spec.key,
                        "property_info_id": property_info_id,
                    },
                )
            resolved[spec.key] = {
                "value": value,
                "unit": property_info.unit or driver_input.unit or spec.unit,
            }
        elif driver_input.source == "manual":
            # Treat a null manual value as missing too, mirroring the property
            # branch above; otherwise ``None`` would be handed to the
            # evaluator as if it were a real value.
            if driver_input.manual_value in (None, ""):
                raise CostCurveEvaluationError(
                    "missing_cost_curve_input",
                    "Manual cost curve input has no value.",
                    context={"curve_key": curve.curve_key, "input_key": spec.key},
                )
            resolved[spec.key] = {
                "value": driver_input.manual_value,
                "unit": driver_input.unit or spec.unit,
            }
        elif spec.required:
            raise CostCurveEvaluationError(
                "missing_cost_curve_input",
                "Cost curve input has no selected source.",
                context={"curve_key": curve.curve_key, "input_key": spec.key},
            )
    return resolved
def spec_property_info(*, mapping: EquipmentMapping, property_info_id: int):
    """Load and validate the property selected as a cost-curve input.

    The property must exist on the mapping's flowsheet and, when the costable
    item has a cost driver, must pass ``validate_cost_driver_property``.

    Raises:
        CostCurveEvaluationError: with code
            ``"invalid_cost_curve_input_property"`` when the lookup or the
            driver validation fails.
    """
    from core.auxiliary.models import PropertyInfo

    try:
        selected_property = PropertyInfo.objects.get(pk=property_info_id, flowsheet=mapping.flowsheet)
    except PropertyInfo.DoesNotExist as lookup_error:
        raise CostCurveEvaluationError(
            "invalid_cost_curve_input_property",
            "Selected cost curve input property does not belong to this flowsheet.",
            context={"property_info_id": property_info_id},
        ) from lookup_error

    item_driver = getattr(mapping.costable_item, "cost_driver", None)
    if item_driver is None:
        return selected_property

    try:
        validate_cost_driver_property(item_driver, selected_property)
    except ValueError as validation_error:
        raise CostCurveEvaluationError(
            "invalid_cost_curve_input_property",
            str(validation_error),
            context={"property_info_id": property_info_id, "costable_item_id": mapping.costable_item_id},
        ) from validation_error
    return selected_property
def _flow_capacity_warnings(
    *,
    mapping: EquipmentMapping,
    curve: CostCurve,
    driver_inputs: CapitalCostDriverInputsPayload,
    resolved_inputs: dict[str, ResolvedDriverInput],
) -> list[dict[str, Any]]:
    """Collect warnings for manual flow capacities below the combined inlet flow.

    Only specs that map to a flow property key, have a manual driver input and
    have a resolved numeric value are checked. One
    ``cost_curve_flow_capacity_exceeded`` warning is produced per spec whose
    capacity is exceeded.
    """
    collected: list[dict[str, Any]] = []
    for spec in _curve_required_driver_specs(curve):
        stream_property_key = _flow_capacity_property_key(spec)
        if stream_property_key is None:
            continue

        raw_input = driver_inputs.get(spec.key)
        if not isinstance(raw_input, dict):
            continue
        if CapitalCostDriverInput.model_validate(raw_input).source != "manual":
            continue

        resolved_entry = resolved_inputs.get(spec.key)
        if resolved_entry is None:
            continue
        selected_capacity = _decimal_or_none(resolved_entry.get("value"))
        if selected_capacity is None:
            continue
        selected_unit = str(resolved_entry.get("unit") or spec.unit)

        overage = _inlet_flow_capacity_overage(
            mapping=mapping,
            property_key=stream_property_key,
            capacity=selected_capacity,
            capacity_unit=selected_unit,
        )
        if overage is None:
            continue

        warning_message = (
            "Combined input stream flow exceeds the selected "
            f"{spec.label.lower()} for this cost curve."
        )
        warning_context = {
            "curve_key": curve.curve_key,
            "input_key": spec.key,
            "label": spec.label,
            "capacity_value": str(selected_capacity),
            "capacity_unit": selected_unit,
            **overage,
        }
        collected.append(
            warning_record(
                code="cost_curve_flow_capacity_exceeded",
                severity="warning",
                message=warning_message,
                context=warning_context,
            )
        )
    return collected
def _flow_capacity_property_key(spec: CostCurveDriverSpec) -> str | None:
    """Map a discrete-selector flow spec to the stream property key it is compared with.

    Returns ``"flow_vol"`` for the ``volumetric_flow`` spec, ``"flow_mass"``
    for the ``mass_flow`` spec, and ``None`` for any other key or role.
    """
    if spec.role == "discrete_selector":
        property_key_by_spec_key = {
            "volumetric_flow": "flow_vol",
            "mass_flow": "flow_mass",
        }
        return property_key_by_spec_key.get(spec.key)
    return None
def _inlet_flow_capacity_overage(
    *,
    mapping: EquipmentMapping,
    property_key: str,
    capacity: Decimal,
    capacity_unit: str,
) -> dict[str, Any] | None:
    """Sum the inlet stream flows of the mapped unit and compare with ``capacity``.

    Each inlet port's stream contributes the numeric property named
    ``property_key``, converted to ``capacity_unit``. Streams without that
    property, without a usable value, or with an incompatible or
    unconvertible unit are skipped.

    Returns ``None`` when there is no simulation object, nothing contributed,
    or the total does not exceed ``capacity``; otherwise a dict with the
    total, its unit and the per-stream breakdown.
    """
    simulation_object = mapping.costable_item.simulation_object
    if simulation_object is None:
        return None

    inlet_ports = (
        simulation_object.ports.filter(direction=ConType.Inlet)
        .select_related("stream")
        .order_by("index", "pk")
    )
    running_total = Decimal("0")
    breakdown: list[dict[str, str]] = []
    for port in inlet_ports:
        stream = port.stream
        if stream is None or getattr(stream, "properties", None) is None:
            continue

        flow_property = (
            stream.properties.containedProperties.filter(key=property_key, type="numeric")
            .prefetch_related("values")
            .first()
        )
        if flow_property is None:
            continue

        raw_flow = _decimal_or_none(flow_property.get_value())
        if raw_flow is None:
            continue

        # A property without its own unit is taken to be in the capacity unit.
        flow_unit = flow_property.unit or capacity_unit
        if not cost_curve_units_compatible(flow_unit, capacity_unit):
            continue

        flow_in_capacity_unit = _convert_decimal_value(
            value=raw_flow,
            source_unit=flow_unit,
            target_unit=capacity_unit,
        )
        if flow_in_capacity_unit is None:
            continue

        running_total += flow_in_capacity_unit
        breakdown.append(
            {
                "stream_id": str(stream.pk),
                "stream_name": stream.componentName or f"Stream {stream.pk}",
                "port_name": port.displayName,
                "property_id": str(flow_property.pk),
                "property_key": flow_property.key,
                "value": str(flow_in_capacity_unit),
                "unit": normalize_economics_unit_notation(capacity_unit),
                "selected_capacity": str(capacity),
            }
        )

    if breakdown and running_total > capacity:
        return {
            "total_value": str(running_total),
            "total_unit": normalize_economics_unit_notation(capacity_unit),
            "streams": breakdown,
        }
    return None
def _convert_decimal_value(*, value: Decimal, source_unit: str, target_unit: str) -> Decimal | None:
    """Convert ``value`` between units, returning ``None`` when conversion fails.

    Both units are normalized first; when they normalize to the same string
    the value is returned unchanged without calling the converter.
    """
    normalized_source = normalize_economics_unit_notation(source_unit)
    normalized_target = normalize_economics_unit_notation(target_unit)
    if normalized_source != normalized_target:
        try:
            converted = convert_value(value, from_unit=normalized_source, to_unit=normalized_target)
            return Decimal(str(converted))
        except (ValueError, DecimalException, PintError):
            return None
    return value
def _decimal_or_none(value: Any) -> Decimal | None:
    """Parse ``value`` into a finite ``Decimal``, or return ``None``.

    ``None``, the empty string, anything whose ``str()`` is not a valid
    decimal literal, and non-finite results (NaN, +/-Infinity) all yield
    ``None``.
    """
    if value in (None, ""):
        return None
    try:
        parsed = Decimal(str(value))
    except (DecimalException, ValueError):
        return None
    # "nan" / "inf" parse successfully, but ordering comparisons against a NaN
    # Decimal raise InvalidOperation, and neither is a usable flow quantity,
    # so they are reported as "no value" like any other unusable input.
    if not parsed.is_finite():
        return None
    return parsed
def _base_curve_detail(evaluation) -> str:
    """Describe where the base curve cost came from.

    Names the selected variant's label when one was selected; otherwise shows
    the evaluated input value and unit.
    """
    variant = evaluation.selected_variant
    if not variant:
        return f"{evaluation.input_value} {evaluation.input_unit}"
    return f"{variant['label']} selected from lowest-cost candidate"
def _cost_curve_error_warning(exc: Exception, *, mapping: EquipmentMapping, driver: CostDriver | None) -> dict[str, Any]:
    """Turn a curve or driver failure into an error-severity warning record.

    ``CostCurveEvaluationError`` and ``FormulaError`` supply their own code,
    message and context. Any other exception is reported as
    ``invalid_cost_driver_value`` with the costable item and driver ids as
    context.
    """
    if not isinstance(exc, (CostCurveEvaluationError, FormulaError)):
        driver_id = None if driver is None else driver.pk
        driver_property_id = None if driver is None else driver.property_info_id
        return warning_record(
            code="invalid_cost_driver_value",
            severity="error",
            message=str(exc),
            context={
                "costable_item_id": mapping.costable_item_id,
                "cost_driver_id": driver_id,
                "property_info_id": driver_property_id,
            },
        )
    return warning_record(
        code=exc.code,
        severity="error",
        message=exc.message,
        context=exc.context,
    )