Coverage for backend/django/Economics/formulas/engine/core.py: 90%
192 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from __future__ import annotations
3from dataclasses import dataclass
4from decimal import Decimal, InvalidOperation, localcontext
5from typing import Mapping
7import sympy
9from core.auxiliary.formula_limits import validate_formula_length
12DEFAULT_EVAL_PRECISION = 50
13MAX_RESULT_ADJUSTED_EXPONENT = 30
14MAX_FORMULA_OPERATIONS = 500
15FORMULA_AUDIT_SCHEMA_VERSION = 1
16_PRECEDENCE_ADD = 1
17_PRECEDENCE_MUL = 2
18_PRECEDENCE_POW = 3
21class FormulaError(ValueError):
22 """Raised when a canonical economics formula cannot be built or evaluated."""
24 def __init__(self, code: str, message: str, *, context: dict | None = None):
25 super().__init__(message)
26 self.code = code
27 self.message = message
28 self.context = context or {}
31@dataclass(frozen=True)
32class FormulaInput:
33 key: str
34 label: str
35 unit: str = ""
36 source_property_info_id: int | None = None
39@dataclass(frozen=True)
40class FormulaStep:
41 kind: str
42 label: str
43 expression: str = ""
44 amount: Decimal | None = None
45 unit: str = ""
48@dataclass(frozen=True)
49class FormulaEvaluation:
50 value: Decimal | None
51 blocked_reason: str = ""
52 bindings: Mapping[str, Decimal | int | str] | None = None
53 conversion_diagnostics: tuple[dict[str, str], ...] = ()
56@dataclass(frozen=True)
57class EconomicsFormula:
58 key: str
59 expression: sympy.Expr
60 unit: str
61 inputs: tuple[FormulaInput, ...]
62 steps: tuple[FormulaStep, ...] = ()
63 missing_child_policy: str = ""
64 blocked_children: tuple[dict[str, str], ...] = ()
65 solve_visible: bool = True
66 blocked_reason: str = ""
68 def evaluate(self, bindings: Mapping[str, Decimal | int | str]) -> Decimal | None:
69 if self.blocked_reason:
70 return None
71 validate_formula_operation_count(self.expression)
72 substitutions = {}
73 for formula_input in self.inputs:
74 if formula_input.key not in bindings: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true
75 raise FormulaError(
76 "missing_formula_input",
77 "Formula evaluation is missing an input value.",
78 context={"formula_key": self.key, "input": formula_input.key},
79 )
80 substitutions[sympy.Symbol(formula_input.key)] = decimal_to_sympy(bindings[formula_input.key])
81 reject_non_real_powers(self.expression, substitutions=substitutions)
82 value = self.expression.subs(substitutions)
83 validate_formula_operation_count(value)
84 reject_non_real_powers(value)
85 return sympy_to_decimal(value)
87 def render_property_formula(self, render_bindings: Mapping[str, str]) -> str:
88 if self.blocked_reason: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true
89 raise FormulaError(
90 "blocked_formula",
91 self.blocked_reason,
92 context={"formula_key": self.key},
93 )
94 return validate_formula_length(render_sympy_expression(self.expression, render_bindings))
96 def audit_payload(self, evaluation: FormulaEvaluation | None = None) -> dict:
97 bindings = evaluation.bindings if evaluation and evaluation.bindings is not None else {}
98 return {
99 "kind": audit_kind_for_key(self.key),
100 "schema_version": FORMULA_AUDIT_SCHEMA_VERSION,
101 "formula_key": self.key,
102 "formula": render_sympy_expression(self.expression, {}),
103 "unit": self.unit,
104 "inputs": [
105 {
106 "key": formula_input.key,
107 "label": formula_input.label,
108 "value": str(bindings[formula_input.key]) if formula_input.key in bindings else None,
109 "unit": formula_input.unit,
110 "source_property_info_id": formula_input.source_property_info_id,
111 }
112 for formula_input in self.inputs
113 ],
114 "steps": [
115 {
116 "kind": step.kind,
117 "label": step.label,
118 "expression": step.expression,
119 "amount": str(step.amount) if step.amount is not None else None,
120 "unit": step.unit,
121 }
122 for step in self.steps
123 ],
124 "missing_child_policy": self.missing_child_policy,
125 "blocked_children": list(self.blocked_children),
126 "conversion_diagnostics": list(evaluation.conversion_diagnostics) if evaluation else [],
127 "value": str(evaluation.value) if evaluation and evaluation.value is not None else None,
128 "blocked_reason": evaluation.blocked_reason if evaluation else self.blocked_reason,
129 "warnings": [],
130 }
133def audit_kind_for_key(formula_key: str) -> str:
134 """Return the stable audit-envelope discriminator for a formula key."""
135 if formula_key.startswith("generated_capital_line:"): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 return "generated_capital_line"
137 if formula_key.startswith("custom_capital_line:"):
138 return "custom_capital_line"
139 if formula_key.startswith("operating_line:"):
140 return "operating_line_annualization"
141 if formula_key.startswith("process_energy_contribution:"): 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true
142 return "process_energy_contribution"
143 if formula_key.startswith("metric:cash_flow") or formula_key.startswith("metric:discounted_cash_flow"): 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true
144 return "cash_flow_row"
145 if formula_key.startswith("metric:"):
146 return "financial_metric"
147 if formula_key.startswith("default_rate:"):
148 return "default_rate"
149 if formula_key == "electrical_upgrade_capex":
150 return "electrical_upgrade"
151 return formula_key.split(":", 1)[0].replace("-", "_")
154def validate_formula_operation_count(expression: sympy.Expr) -> None:
155 operation_count = int(sympy.count_ops(expression, visual=False))
156 if operation_count > MAX_FORMULA_OPERATIONS:
157 raise FormulaError(
158 "formula_too_large",
159 "Formula contains too many operations to evaluate safely.",
160 context={"operation_count": operation_count, "max_operations": MAX_FORMULA_OPERATIONS},
161 )
164def decimal_to_sympy(value: Decimal | int | str) -> sympy.Rational:
165 try:
166 decimal_value = Decimal(str(value))
167 except (InvalidOperation, TypeError, ValueError) as exc:
168 raise FormulaError(
169 "invalid_formula_number",
170 "Formula received a non-numeric value.",
171 context={"value": str(value)},
172 ) from exc
173 if not decimal_value.is_finite(): 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true
174 raise FormulaError(
175 "invalid_formula_number",
176 "Formula received a non-finite value.",
177 context={"value": str(value)},
178 )
179 return sympy.Rational(str(decimal_value))
182def sympy_to_decimal(value: sympy.Expr) -> Decimal:
183 if value.has(sympy.I): 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true
184 raise FormulaError("complex_formula_result", "Formula evaluated to a complex value.")
185 if value.free_symbols: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true
186 raise FormulaError(
187 "unbound_formula_symbols",
188 "Formula still contains symbols after substitution.",
189 context={"symbols": sorted(str(symbol) for symbol in value.free_symbols)},
190 )
191 if value.is_real is False:
192 raise FormulaError("non_real_formula_result", "Formula evaluated to a non-real value.")
194 with localcontext() as context:
195 context.prec = DEFAULT_EVAL_PRECISION
196 if value.is_Rational:
197 decimal_value = Decimal(int(value.p)) / Decimal(int(value.q))
198 else:
199 decimal_value = Decimal(str(sympy.N(value, DEFAULT_EVAL_PRECISION)))
201 if not decimal_value.is_finite(): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true
202 raise FormulaError("non_finite_formula_result", "Formula evaluated to a non-finite value.")
203 if abs(decimal_value.adjusted()) > MAX_RESULT_ADJUSTED_EXPONENT:
204 raise FormulaError(
205 "formula_result_too_large",
206 "Formula evaluated to a value outside the supported numeric range.",
207 context={"value": str(decimal_value)},
208 )
209 return decimal_value
212def render_sympy_expression(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str:
213 return _render_expression(expression, render_bindings, parent_precedence=0)
216def _render_expression(
217 expression: sympy.Expr,
218 render_bindings: Mapping[str, str],
219 *,
220 parent_precedence: int,
221) -> str:
222 if expression.is_Symbol:
223 return render_bindings.get(str(expression), str(expression))
224 if expression.is_Number:
225 return _number_literal(expression)
226 if expression.is_Add:
227 rendered = _render_add(expression, render_bindings)
228 return _parenthesize_if_needed(rendered, _PRECEDENCE_ADD, parent_precedence)
229 if expression.is_Mul:
230 rendered = _render_mul(expression, render_bindings)
231 return _parenthesize_if_needed(rendered, _PRECEDENCE_MUL, parent_precedence)
232 if expression.is_Pow: 232 ↛ 239line 232 didn't jump to line 239 because the condition on line 232 was always true
233 base, exponent = expression.args
234 rendered = (
235 f"{_render_power_base(base, render_bindings)}"
236 f" ** {_render_expression(exponent, render_bindings, parent_precedence=_PRECEDENCE_POW)}"
237 )
238 return _parenthesize_if_needed(rendered, _PRECEDENCE_POW, parent_precedence)
239 raise FormulaError(
240 "unsupported_formula_node",
241 "Formula contains unsupported symbolic structure.",
242 context={"node": type(expression).__name__},
243 )
246def reject_non_real_powers(
247 expression: sympy.Expr,
248 *,
249 substitutions: Mapping[sympy.Symbol, sympy.Expr] | None = None,
250) -> None:
251 for power in expression.atoms(sympy.Pow):
252 base, exponent = power.args
253 if substitutions:
254 base = base.subs(substitutions)
255 exponent = exponent.subs(substitutions)
256 if not base.is_number or not exponent.is_number: 256 ↛ 257line 256 didn't jump to line 257 because the condition on line 256 was never true
257 continue
258 if base.is_real is False or exponent.is_real is False: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true
259 raise FormulaError("non_real_power", "Formula contains a non-real power operation.")
260 if base.is_real and exponent.is_real and base < 0 and not exponent.is_integer:
261 raise FormulaError(
262 "invalid_fractional_power",
263 "Fractional powers require a non-negative base.",
264 context={"base": str(base), "exponent": str(exponent)},
265 )
268def _number_literal(value: sympy.Expr) -> str:
269 if value.is_Integer:
270 return str(int(value))
271 if value.is_Rational: 271 ↛ 274line 271 didn't jump to line 274 because the condition on line 271 was always true
272 decimal_value = sympy_to_decimal(value)
273 return format(decimal_value, "f").rstrip("0").rstrip(".") or "0"
274 return str(value)
277def _render_add(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str:
278 terms: list[str] = []
279 for arg in expression.args:
280 positive_arg = _positive_if_negative(arg)
281 if positive_arg is None:
282 rendered_arg = _render_expression(arg, render_bindings, parent_precedence=_PRECEDENCE_ADD)
283 terms.append(rendered_arg if not terms else f" + {rendered_arg}")
284 continue
286 rendered_arg = _render_expression(positive_arg, render_bindings, parent_precedence=_PRECEDENCE_ADD)
287 terms.append(f"-{rendered_arg}" if not terms else f" - {rendered_arg}")
288 return "".join(terms)
291def _render_mul(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str:
292 positive_expression = _positive_if_negative(expression)
293 if positive_expression is not None:
294 return f"-{_render_expression(positive_expression, render_bindings, parent_precedence=_PRECEDENCE_MUL)}"
295 return " * ".join(
296 _render_expression(arg, render_bindings, parent_precedence=_PRECEDENCE_MUL)
297 for arg in expression.args
298 )
301def _render_power_base(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str:
302 rendered = _render_expression(expression, render_bindings, parent_precedence=_PRECEDENCE_POW)
303 if expression.is_Pow:
304 return f"({rendered})"
305 return rendered
308def _positive_if_negative(expression: sympy.Expr) -> sympy.Expr | None:
309 if expression.is_Number:
310 if expression < 0:
311 return -expression
312 return None
314 if not expression.is_Mul:
315 return None
317 coefficient, remainder = expression.as_coeff_Mul()
318 if coefficient >= 0:
319 return None
320 if coefficient == -1:
321 return remainder
322 return sympy.Mul(-coefficient, remainder, evaluate=False)
325def _parenthesize_if_needed(rendered: str, precedence: int, parent_precedence: int) -> str:
326 if precedence < parent_precedence:
327 return f"({rendered})"
328 return rendered