Coverage for backend/django/Economics/formulas/builders/operating.py: 77%
250 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from __future__ import annotations
3from dataclasses import dataclass
4from decimal import Decimal, InvalidOperation
6import sympy
8from Economics.shared.choices import (
10 DefaultRateReviewStatus,
12 DefaultRateType,
14 DefaultRateValueKind,
16 OperatingLineCategory,
18 OperatingLineEconomicEffect,
20)
22from Economics.reference_data.models import EconomicsDefaultRate
24from Economics.studies.models import EconomicsStudy
26from Economics.costing.models import OperatingCostLine
27from Economics.costing.operating.line_calculation import (
28 AnnualBasisQuantity,
29 annualization_requires_operating_hours,
30 annualized_basis_quantity,
31 convert_quantity,
32 strip_single_time_denominator,
33)
34from Economics.settings_profiles.services.settings_profiles import get_settings_profile
36from Economics.formulas.engine.core import EconomicsFormula, FormulaError, FormulaInput, FormulaStep, decimal_to_sympy
37from core.auxiliary.formula_units import formula_unit_expression
# Calculation methods that build_operating_line_formula accepts.
SUPPORTED_OPERATING_METHODS = frozenset(("rate_times_quantity", "work_to_cost"))
# Unit in which process-energy contributions are expressed.
PROCESS_ENERGY_QUANTITY_UNIT = "MWh"
# Quantum applied when a derived default rate is quantized.
OPERATING_LINE_RATE_QUANTUM = Decimal("0.00000001")
# Lower-cased rate denominators that trigger the steam energy-to-mass path.
STEAM_MASS_RATE_DENOMINATORS = frozenset(("t",))
# Intermediate energy unit used when annualizing a steam line on a mass basis.
STEAM_ENERGY_BASIS_UNIT = "GJ"
@dataclass(frozen=True)
class OperatingLineFormula:
    """Annual-amount formula for one operating line, with the values used to build it."""

    formula: EconomicsFormula
    annual_basis: AnnualBasisQuantity
    basis_quantity: Decimal
    basis_unit: str
    target_basis_unit: str
    annualization_factor: Decimal
    rate_amount: Decimal
    rate_unit: str

    def evaluate(self) -> Decimal | None:
        """Evaluate the formula with the stored basis quantity bound to its first input."""
        basis_input = self.formula.inputs[0]
        bindings = {basis_input.key: self.basis_quantity}
        return self.formula.evaluate(bindings)
@dataclass(frozen=True)
class OperatingAggregateFormula:
    """Aggregate operating formula paired with the bindings needed to evaluate it."""

    formula: EconomicsFormula
    bindings: dict[str, Decimal]

    def evaluate(self) -> Decimal | None:
        """Evaluate the aggregate formula against the stored bindings."""
        stored_bindings = self.bindings
        return self.formula.evaluate(stored_bindings)
@dataclass(frozen=True)
class ProcessEnergyContributionFormula:
    """Process-energy contribution formula with the source quantity it was built from."""

    formula: EconomicsFormula
    source_quantity: Decimal
    source_unit: str

    def evaluate(self) -> Decimal | None:
        """Evaluate the formula; it is evaluated with no input bindings."""
        no_bindings: dict[str, Decimal] = {}
        return self.formula.evaluate(no_bindings)
@dataclass(frozen=True)
class DefaultRateFormula:
    """Default-rate formula paired with the bindings needed to evaluate it."""

    formula: EconomicsFormula
    bindings: dict[str, Decimal]

    def evaluate(self) -> Decimal | None:
        """Evaluate the formula and quantize the result to the operating-line rate quantum.

        Returns ``None`` when the underlying formula yields ``None``.
        """
        raw = self.formula.evaluate(self.bindings)
        return None if raw is None else raw.quantize(OPERATING_LINE_RATE_QUANTUM)
def build_operating_line_formula(line: OperatingCostLine, *, study: EconomicsStudy) -> OperatingLineFormula:
    """Build the canonical annual amount formula for one operating line.

    The expression is ``basis * annualization_factor * rate``; the basis
    quantity is the only symbolic input.

    Raises:
        FormulaError: with code ``unsupported_operating_line_method``,
            ``missing_operating_line_basis``, ``missing_operating_line_rate``,
            ``missing_annual_operating_hours`` or
            ``unsupported_operating_line_unit_conversion``.
    """
    line_id = line.pk
    method = line.calculation_method
    if method not in SUPPORTED_OPERATING_METHODS:
        raise FormulaError(
            "unsupported_operating_line_method",
            "uses an unsupported calculation method.",
            context={"operating_line_id": line_id, "calculation_method": method},
        )

    quantity = line.basis_quantity
    if quantity is None:
        raise FormulaError(
            "missing_operating_line_basis",
            "has no basis quantity.",
            context={"operating_line_id": line_id},
        )

    rate_amount = _effective_rate_amount(line)
    if rate_amount is None:
        raise FormulaError(
            "missing_operating_line_rate",
            "has no rate.",
            context={"operating_line_id": line_id},
        )

    rate_unit = _effective_rate_unit(line)
    # Prefer the denominator implied by the rate; otherwise fall back to the
    # line's own basis unit with its time denominator stripped.
    target_unit = rate_denominator_unit(
        rate_unit=rate_unit,
        source_basis_unit=line.basis_unit,
    ) or strip_single_time_denominator(line.basis_unit)

    factor = _annualization_factor_for_line(line, study=study, target_basis_unit=target_unit)
    if factor is None:
        hours_missing = _missing_annual_operating_hours_for_annualization(
            line,
            study=study,
            target_basis_unit=target_unit,
        )
        if hours_missing:
            code = "missing_annual_operating_hours"
            message = _missing_annual_operating_hours_message(line, rate_unit)
        else:
            code = "unsupported_operating_line_unit_conversion"
            message = _unsupported_annual_cost_conversion_message(line, rate_unit)
        raise FormulaError(
            code,
            message,
            context={
                "operating_line_id": line_id,
                "source_unit": line.basis_unit,
                "target_unit": target_unit,
            },
        )

    annual_quantity = quantity * factor
    basis_symbol = sympy.Symbol(_basis_input_key(line))

    basis_input = FormulaInput(
        key=str(basis_symbol),
        label="Basis quantity",
        unit=line.basis_unit,
        source_property_info_id=line.source_property_info_id,
    )
    factor_step = FormulaStep(
        kind="annualization_factor",
        label="Annualization factor",
        expression=str(factor),
        amount=factor,
        unit=target_unit,
    )
    rate_step = FormulaStep(
        kind="rate",
        label="Rate",
        expression=str(rate_amount),
        amount=rate_amount,
        unit=rate_unit,
    )
    formula = EconomicsFormula(
        key=f"operating_line:{line_id or 'unsaved'}",
        expression=_multiply_terms(basis_symbol, factor, rate_amount),
        unit=f"{line.currency}/year",
        inputs=(basis_input,),
        steps=(factor_step, rate_step),
    )
    return OperatingLineFormula(
        formula=formula,
        annual_basis=AnnualBasisQuantity(quantity=annual_quantity, unit=target_unit),
        basis_quantity=quantity,
        basis_unit=line.basis_unit,
        target_basis_unit=target_unit,
        annualization_factor=factor,
        rate_amount=rate_amount,
        rate_unit=rate_unit,
    )
def build_derived_steam_rate_formula(
    default_rate: EconomicsDefaultRate,
    *,
    override: dict | None = None,
) -> DefaultRateFormula:
    """Build the derived steam default-rate formula from fuel, steam energy, and efficiency.

    The expression is ``fuel_price * steam_energy / (efficiency_percent / 100)``.
    Steam energy and boiler efficiency may come from ``override``; the fuel
    price is always read from the rate's metadata.

    Raises:
        FormulaError: ``unsupported_default_rate_formula`` when the rate is not
            a derived steam template; ``blocked_derived_steam_rate`` when an
            input is missing or the efficiency is not positive.
    """
    is_derived_steam = (
        default_rate.rate_type == DefaultRateType.STEAM
        and default_rate.value_kind == DefaultRateValueKind.DERIVED_TEMPLATE
    )
    if not is_derived_steam:
        raise FormulaError(
            "unsupported_default_rate_formula",
            "Only derived steam templates expose a default-rate formula.",
            context={"default_rate_id": default_rate.pk},
        )

    raw_metadata = default_rate.metadata
    metadata = raw_metadata if isinstance(raw_metadata, dict) else {}

    fuel_price = _metadata_decimal(metadata.get("fuel_price_nzd_per_gj"))
    steam_energy = _override_or_metadata_decimal(
        override,
        "steam_energy_gj_per_t",
        metadata.get("steam_energy_gj_per_t"),
    )
    # The override key and the metadata key for efficiency are named differently.
    efficiency_percent = _override_or_metadata_decimal(
        override,
        "boiler_efficiency_percent",
        metadata.get("default_boiler_efficiency_percent"),
    )

    inputs_missing = any(value is None for value in (fuel_price, steam_energy, efficiency_percent))
    if inputs_missing or efficiency_percent <= 0:
        raise FormulaError(
            "blocked_derived_steam_rate",
            "Derived steam rate requires fuel price, steam energy, and positive boiler efficiency.",
            context={"default_rate_id": default_rate.pk},
        )

    fuel_symbol = sympy.Symbol("fuel_price_nzd_per_gj")
    energy_symbol = sympy.Symbol("steam_energy_gj_per_t")
    efficiency_symbol = sympy.Symbol("boiler_efficiency_percent")
    expression = fuel_symbol * energy_symbol / (efficiency_symbol / decimal_to_sympy(Decimal("100")))

    formula = EconomicsFormula(
        key=f"default_rate:derived_steam:{default_rate.pk or default_rate.key}",
        expression=expression,
        unit=default_rate.display_unit,
        inputs=(
            FormulaInput("fuel_price_nzd_per_gj", "Fuel price", "NZD/GJ"),
            FormulaInput("steam_energy_gj_per_t", "Steam energy basis", "GJ/t"),
            FormulaInput("boiler_efficiency_percent", "Boiler efficiency", "percent"),
        ),
    )
    bindings = {
        "fuel_price_nzd_per_gj": fuel_price,
        "steam_energy_gj_per_t": steam_energy,
        "boiler_efficiency_percent": efficiency_percent,
    }
    return DefaultRateFormula(formula=formula, bindings=bindings)
def build_process_energy_contribution_formula(
    line: OperatingCostLine,
    *,
    study: EconomicsStudy,
) -> ProcessEnergyContributionFormula:
    """Build the target-process-energy contribution formula for one operating line.

    First tries to annualize the line's basis directly into the process-energy
    unit. If that is not possible, the annual basis from the operating-line
    formula is converted instead.

    Raises:
        FormulaError: ``missing_process_energy_basis`` when the line has no
            basis quantity; ``unsupported_process_energy_unit`` when the annual
            basis cannot be converted to the process-energy unit. Errors from
            ``build_operating_line_formula`` propagate on the fallback path.
    """
    quantity = line.basis_quantity
    if quantity is None:
        raise FormulaError(
            "missing_process_energy_basis",
            "Process-energy contribution requires a basis quantity.",
            context={"operating_line_id": line.pk},
        )

    direct_energy = annualized_basis_quantity(
        quantity,
        source_unit=line.basis_unit,
        target_unit=PROCESS_ENERGY_QUANTITY_UNIT,
        study=study,
    )
    if direct_energy is not None:
        return _process_energy_formula(
            line=line,
            target_quantity=direct_energy,
            source_quantity=quantity,
            source_unit=line.basis_unit,
        )

    # Fallback: go through the line's annual basis.
    annual_basis = build_operating_line_formula(line, study=study).annual_basis
    energy_source_unit = strip_single_time_denominator(annual_basis.unit)
    converted = None
    if energy_source_unit:
        converted = convert_quantity(
            value=annual_basis.quantity,
            source_unit=energy_source_unit,
            target_unit=PROCESS_ENERGY_QUANTITY_UNIT,
        )
    if converted is None:
        raise FormulaError(
            "unsupported_process_energy_unit",
            "Process-energy contribution requires an energy-compatible annual basis.",
            context={"operating_line_id": line.pk, "annual_basis_unit": annual_basis.unit},
        )
    return _process_energy_formula(
        line=line,
        target_quantity=converted,
        source_quantity=annual_basis.quantity,
        source_unit=annual_basis.unit,
    )
def build_annual_operating_expense_formula(study: EconomicsStudy) -> OperatingAggregateFormula:
    """Build annual OPEX from included non-revenue operating line formulas."""
    expense_options = {
        "key": "annual_operating_expense",
        "label": "Annual operating expense",
        "include_revenue": False,
    }
    return _build_operating_total_formula(study, **expense_options)
def build_annual_operating_revenue_formula(study: EconomicsStudy) -> OperatingAggregateFormula:
    """Build annual revenue from included output-revenue operating line formulas."""
    revenue_options = {
        "key": "annual_operating_revenue",
        "label": "Annual operating revenue",
        "include_revenue": True,
    }
    return _build_operating_total_formula(study, **revenue_options)
def render_operating_line_property_formula(
    operating_formula: OperatingLineFormula,
    *,
    source_property_formula: str | None,
) -> str:
    """Render an operating-line formula as an IDAES-visible annual-cost formula.

    With no ``source_property_formula`` the stored basis quantity is rendered
    as a literal. Otherwise the source formula is wrapped in ``convert(...)``
    to the line's basis unit and used as the basis input.

    Raises:
        FormulaError: ``unsupported_operating_line_unit_rendering`` when the
            rate, target-basis or basis unit has no formula unit expression.
    """
    rate_expr = formula_unit_expression(operating_formula.rate_unit)
    target_expr = formula_unit_expression(operating_formula.target_basis_unit)
    if rate_expr is None or target_expr is None:
        raise FormulaError(
            "unsupported_operating_line_unit_rendering",
            "uses a unit conversion that is not solve-visible yet.",
        )

    formula = operating_formula.formula
    input_key = formula.inputs[0].key

    if source_property_formula is None:
        literal = _decimal_literal(operating_formula.basis_quantity)
        rendered = formula.render_property_formula({input_key: literal})
        return f"({rendered} * ({target_expr}) * ({rate_expr}) / year)"

    basis_expr = formula_unit_expression(operating_formula.basis_unit)
    if basis_expr is None:
        raise FormulaError(
            "unsupported_operating_line_unit_rendering",
            "uses a unit conversion that is not solve-visible yet.",
        )
    converted_source = f"convert({source_property_formula}, {basis_expr})"
    rendered = formula.render_property_formula({input_key: converted_source})
    return f"({rendered} * ({rate_expr}) * ({target_expr}) / ({basis_expr}) / year)"
def rate_denominator_unit(*, rate_unit: str, source_basis_unit: str) -> str:
    """Return the physical basis denominator implied by a rate unit.

    Returns ``""`` when the rate unit has no ``/``. When the denominator equals
    the stripped source basis unit, its time denominator is stripped as well.
    """
    _numerator, slash, remainder = rate_unit.partition("/")
    if not slash:
        return ""
    denominator = remainder.strip()
    if denominator != source_basis_unit.strip():
        return denominator
    return strip_single_time_denominator(denominator)
def _annualization_factor_for_line(
    line: OperatingCostLine,
    *,
    study: EconomicsStudy,
    target_basis_unit: str,
) -> Decimal | None:
    """Return the factor turning one unit of the line's basis into annual target units.

    Tries the direct unit annualization first and falls back to the steam
    energy-to-mass path. Returns ``None`` when neither applies.
    """
    direct_factor = annualized_basis_quantity(
        Decimal("1"),
        source_unit=line.basis_unit,
        target_unit=target_basis_unit,
        study=study,
    )
    if direct_factor is None:
        return _steam_mass_annualization_factor(
            line,
            study=study,
            target_basis_unit=target_basis_unit,
        )
    return direct_factor
def _missing_annual_operating_hours_for_annualization(
    line: OperatingCostLine,
    *,
    study: EconomicsStudy,
    target_basis_unit: str,
) -> bool:
    """Return whether a failed annualization is explained by missing annual operating hours.

    Returns ``False`` as soon as the study's settings profile has annual
    operating hours set. Otherwise asks whether the relevant unit conversion
    requires operating hours. For a steam line priced per mass unit with a
    usable steam energy basis, the relevant conversion targets the steam
    energy unit rather than the mass unit.
    """
    settings_profile = get_settings_profile(study)
    if settings_profile is not None and settings_profile.annual_operating_hours is not None:
        return False

    conversion_target = target_basis_unit
    is_steam_mass_rate = (
        target_basis_unit.strip().lower() in STEAM_MASS_RATE_DENOMINATORS
        and line.rate_type == DefaultRateType.STEAM
    )
    if is_steam_mass_rate:
        steam_energy_basis = _steam_energy_basis_gj_per_t(line)
        # Use the same validity test as _steam_mass_annualization_factor: a
        # zero or negative basis cannot be fixed by setting operating hours,
        # so it must not route through the steam energy conversion.
        if steam_energy_basis is not None and steam_energy_basis > 0:
            conversion_target = STEAM_ENERGY_BASIS_UNIT

    return annualization_requires_operating_hours(
        source_unit=line.basis_unit,
        target_unit=conversion_target,
    )
def _steam_mass_annualization_factor(
    line: OperatingCostLine,
    *,
    study: EconomicsStudy,
    target_basis_unit: str,
) -> Decimal | None:
    """Return the annualization factor for a steam line priced per mass unit.

    The line's basis is annualized into the steam energy unit and divided by
    the steam energy basis taken from the source default rate's metadata.
    Returns ``None`` when the target is not a steam mass denominator, the line
    is not a steam rate, the energy basis is missing or non-positive, or the
    basis cannot be annualized into the energy unit.
    """
    normalized_target = target_basis_unit.strip().lower()
    if normalized_target not in STEAM_MASS_RATE_DENOMINATORS or line.rate_type != DefaultRateType.STEAM:
        return None

    energy_per_mass = _steam_energy_basis_gj_per_t(line)
    if energy_per_mass is None or energy_per_mass <= 0:
        return None

    annual_energy = annualized_basis_quantity(
        Decimal("1"),
        source_unit=line.basis_unit,
        target_unit=STEAM_ENERGY_BASIS_UNIT,
        study=study,
    )
    return None if annual_energy is None else annual_energy / energy_per_mass
439def _steam_energy_basis_gj_per_t(line: OperatingCostLine) -> Decimal | None:
440 if not line.source_default_rate_id: 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true
441 return None
442 metadata = line.source_default_rate.metadata if isinstance(line.source_default_rate.metadata, dict) else {}
443 return _metadata_decimal(metadata.get("steam_energy_gj_per_t"))
def _build_operating_total_formula(
    study: EconomicsStudy,
    *,
    key: str,
    label: str,
    include_revenue: bool,
) -> OperatingAggregateFormula:
    """Build the sum formula over the study's included operating lines.

    Only lines whose revenue classification equals ``include_revenue`` are
    considered. Each line that evaluates to an amount becomes one symbolic
    input bound to that amount. Lines whose formula raises ``FormulaError``
    are recorded as blocked children instead of aborting the total.

    Args:
        study: Study whose included operating lines are summed.
        key: Formula key, also used as the kind of the single formula step.
        label: Human-readable label for the step and the blocked reason.
        include_revenue: ``True`` to sum revenue lines, ``False`` for the rest.
    """
    terms: list[sympy.Expr] = []
    bindings: dict[str, Decimal] = {}
    inputs: list[FormulaInput] = []
    blocked_children: list[dict[str, str]] = []

    lines = study.operating_lines.filter(included=True).select_related("source_default_rate").order_by("pk")
    for line in lines:
        if operating_line_is_revenue(line) != include_revenue:
            continue
        try:
            amount = build_operating_line_formula(line, study=study).evaluate()
        except FormulaError as exc:
            blocked_children.append(
                {
                    "key": f"operating_line:{line.pk}",
                    "code": exc.code,
                    "reason": exc.message,
                }
            )
            continue
        if amount is None:
            # Nothing to add; the line is not reported as blocked either.
            continue
        symbol_key = operating_line_input_key(line.pk)
        terms.append(sympy.Symbol(symbol_key))
        bindings[symbol_key] = amount
        inputs.append(
            FormulaInput(
                key=symbol_key,
                label=line.label,
                unit=f"{line.currency}/year",
            )
        )

    # Resolve the study currency once; formula and step share the same unit.
    total_unit = f"{_study_currency(study)}/year"
    formula = EconomicsFormula(
        key=key,
        expression=_sum_expressions(terms),
        unit=total_unit,
        inputs=tuple(inputs),
        steps=(
            FormulaStep(
                kind=key,
                label=label,
                expression="sum(included operating line formulas)",
                unit=total_unit,
            ),
        ),
        missing_child_policy="strict_after_revenue_classification",
        blocked_children=tuple(blocked_children),
        blocked_reason=_operating_total_blocked_reason(label, blocked_children),
    )
    return OperatingAggregateFormula(formula=formula, bindings=bindings)
504def _operating_total_blocked_reason(label: str, blocked_children: list[dict[str, str]]) -> str:
505 if not blocked_children:
506 return ""
507 if all(child.get("code") == "missing_annual_operating_hours" for child in blocked_children):
508 return "Set annual operating hours in Settings, then recalculate Project overview."
509 return f"{label} has blocked operating lines."
def _missing_annual_operating_hours_message(line: OperatingCostLine, rate_unit: str) -> str:
    """Return the missing-operating-hours message, prefixed with the line's units when known."""
    described_units = _operating_line_unit_pair(line, rate_unit)
    if not described_units:
        return "Needs annual operating hours in Settings."
    return f"{described_units} needs annual operating hours in Settings."
def _unsupported_annual_cost_conversion_message(line: OperatingCostLine, rate_unit: str) -> str:
    """Return the unsupported-conversion message, prefixed with the line's units when known."""
    described_units = _operating_line_unit_pair(line, rate_unit)
    if not described_units:
        return "This line's basis and rate units cannot be converted to an annual cost."
    return f"{described_units} cannot be converted to an annual cost. Check this line's basis and rate units."
526def _operating_line_unit_pair(line: OperatingCostLine, rate_unit: str) -> str:
527 basis_unit = line.basis_unit.strip()
528 rate_unit = rate_unit.strip()
529 if basis_unit and rate_unit: 529 ↛ 531line 529 didn't jump to line 531 because the condition on line 529 was always true
530 return f"{basis_unit} with {rate_unit}"
531 if basis_unit:
532 return f"{basis_unit} basis"
533 if rate_unit:
534 return f"{rate_unit} rate"
535 return ""
def operating_line_is_revenue(line: OperatingCostLine) -> bool:
    """Return whether an operating line is a credit against operating cost.

    True when the economic effect is revenue or the category is output revenue.
    """
    if line.economic_effect == OperatingLineEconomicEffect.REVENUE:
        return True
    return line.category == OperatingLineCategory.OUTPUT_REVENUE
def operating_line_input_key(line_id: int) -> str:
    """Return the symbol key used for an operating line in aggregate formulas."""
    return "operating_line_{}".format(line_id)
def _process_energy_formula(
    *,
    line: OperatingCostLine,
    target_quantity: Decimal,
    source_quantity: Decimal,
    source_unit: str,
) -> ProcessEnergyContributionFormula:
    """Wrap an already-converted process-energy quantity as a constant formula.

    The formula has no inputs; its expression is the target quantity itself,
    and a single step records the source quantity and unit it came from.
    """
    conversion_step = FormulaStep(
        kind="process_energy_contribution",
        label="Process-energy contribution",
        expression=f"{source_quantity} {source_unit} -> {PROCESS_ENERGY_QUANTITY_UNIT}",
        amount=target_quantity,
        unit=PROCESS_ENERGY_QUANTITY_UNIT,
    )
    constant_formula = EconomicsFormula(
        key=f"process_energy_contribution:{line.pk or 'unsaved'}",
        expression=decimal_to_sympy(target_quantity),
        unit=PROCESS_ENERGY_QUANTITY_UNIT,
        inputs=(),
        steps=(conversion_step,),
    )
    return ProcessEnergyContributionFormula(
        formula=constant_formula,
        source_quantity=source_quantity,
        source_unit=source_unit,
    )
def _multiply_terms(basis_symbol: sympy.Symbol, annualization_factor: Decimal, rate_amount: Decimal) -> sympy.Expr:
    """Return the unevaluated product ``basis * factor * rate``.

    The annualization factor is left out of the product when it equals 1.
    """
    if annualization_factor == Decimal("1"):
        factor_terms = ()
    else:
        factor_terms = (decimal_to_sympy(annualization_factor),)
    return sympy.Mul(basis_symbol, *factor_terms, decimal_to_sympy(rate_amount), evaluate=False)
586def _sum_expressions(terms: list[sympy.Expr]) -> sympy.Expr:
587 if not terms:
588 return decimal_to_sympy(Decimal("0"))
589 if len(terms) == 1:
590 return terms[0]
591 return sympy.Add(*terms, evaluate=False)
def _study_currency(study: EconomicsStudy) -> str:
    """Return the currency from the study's settings profile, defaulting to ``"NZD"``."""
    profile = get_settings_profile(study)
    configured = None if profile is None else profile.currency
    return configured or "NZD"
601def _effective_rate_amount(line: OperatingCostLine) -> Decimal | None:
602 if line.rate_amount is not None: 602 ↛ 604line 602 didn't jump to line 604 because the condition on line 602 was always true
603 return line.rate_amount
604 if (
605 line.source_default_rate_id
606 and line.source_default_rate.value is not None
607 and line.source_default_rate.review_status == DefaultRateReviewStatus.REVIEWED
608 and line.source_default_rate.value_kind == DefaultRateValueKind.REVIEWED_DEFAULT
609 ):
610 return line.source_default_rate.value
611 return None
614def _effective_rate_unit(line: OperatingCostLine) -> str:
615 if line.rate_unit: 615 ↛ 617line 615 didn't jump to line 617 because the condition on line 615 was always true
616 return line.rate_unit
617 if line.source_default_rate_id:
618 return line.source_default_rate.display_unit
619 return ""
622def _basis_input_key(line: OperatingCostLine) -> str:
623 return f"basis_{line.pk or 'unsaved'}"
626def _metadata_decimal(value) -> Decimal | None:
627 if value in (None, ""): 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true
628 return None
629 try:
630 amount = Decimal(str(value))
631 except (InvalidOperation, ValueError, TypeError):
632 return None
633 return amount if amount.is_finite() else None
def _override_or_metadata_decimal(override: dict | None, key: str, metadata_value) -> Decimal | None:
    """Parse the override value for ``key`` if one is given, else the metadata value.

    An override counts as given only when it is present and neither ``None``
    nor the empty string. A given override that fails to parse yields ``None``;
    it does not fall back to the metadata value.
    """
    override_value = override.get(key) if override else None
    chosen = metadata_value if override_value in (None, "") else override_value
    return _metadata_decimal(chosen)
642def _decimal_literal(value: Decimal) -> str:
643 return format(value.normalize(), "f")