Coverage for backend/django/Economics/formulas/engine/core.py: 90%

192 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from __future__ import annotations 

2 

3from dataclasses import dataclass 

4from decimal import Decimal, InvalidOperation, localcontext 

5from typing import Mapping 

6 

7import sympy 

8 

9from core.auxiliary.formula_limits import validate_formula_length 

10 

11 

12DEFAULT_EVAL_PRECISION = 50 

13MAX_RESULT_ADJUSTED_EXPONENT = 30 

14MAX_FORMULA_OPERATIONS = 500 

15FORMULA_AUDIT_SCHEMA_VERSION = 1 

16_PRECEDENCE_ADD = 1 

17_PRECEDENCE_MUL = 2 

18_PRECEDENCE_POW = 3 

19 

20 

21class FormulaError(ValueError): 

22 """Raised when a canonical economics formula cannot be built or evaluated.""" 

23 

24 def __init__(self, code: str, message: str, *, context: dict | None = None): 

25 super().__init__(message) 

26 self.code = code 

27 self.message = message 

28 self.context = context or {} 

29 

30 

31@dataclass(frozen=True) 

32class FormulaInput: 

33 key: str 

34 label: str 

35 unit: str = "" 

36 source_property_info_id: int | None = None 

37 

38 

39@dataclass(frozen=True) 

40class FormulaStep: 

41 kind: str 

42 label: str 

43 expression: str = "" 

44 amount: Decimal | None = None 

45 unit: str = "" 

46 

47 

48@dataclass(frozen=True) 

49class FormulaEvaluation: 

50 value: Decimal | None 

51 blocked_reason: str = "" 

52 bindings: Mapping[str, Decimal | int | str] | None = None 

53 conversion_diagnostics: tuple[dict[str, str], ...] = () 

54 

55 

56@dataclass(frozen=True) 

57class EconomicsFormula: 

58 key: str 

59 expression: sympy.Expr 

60 unit: str 

61 inputs: tuple[FormulaInput, ...] 

62 steps: tuple[FormulaStep, ...] = () 

63 missing_child_policy: str = "" 

64 blocked_children: tuple[dict[str, str], ...] = () 

65 solve_visible: bool = True 

66 blocked_reason: str = "" 

67 

68 def evaluate(self, bindings: Mapping[str, Decimal | int | str]) -> Decimal | None: 

69 if self.blocked_reason: 

70 return None 

71 validate_formula_operation_count(self.expression) 

72 substitutions = {} 

73 for formula_input in self.inputs: 

74 if formula_input.key not in bindings: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true

75 raise FormulaError( 

76 "missing_formula_input", 

77 "Formula evaluation is missing an input value.", 

78 context={"formula_key": self.key, "input": formula_input.key}, 

79 ) 

80 substitutions[sympy.Symbol(formula_input.key)] = decimal_to_sympy(bindings[formula_input.key]) 

81 reject_non_real_powers(self.expression, substitutions=substitutions) 

82 value = self.expression.subs(substitutions) 

83 validate_formula_operation_count(value) 

84 reject_non_real_powers(value) 

85 return sympy_to_decimal(value) 

86 

87 def render_property_formula(self, render_bindings: Mapping[str, str]) -> str: 

88 if self.blocked_reason: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 raise FormulaError( 

90 "blocked_formula", 

91 self.blocked_reason, 

92 context={"formula_key": self.key}, 

93 ) 

94 return validate_formula_length(render_sympy_expression(self.expression, render_bindings)) 

95 

96 def audit_payload(self, evaluation: FormulaEvaluation | None = None) -> dict: 

97 bindings = evaluation.bindings if evaluation and evaluation.bindings is not None else {} 

98 return { 

99 "kind": audit_kind_for_key(self.key), 

100 "schema_version": FORMULA_AUDIT_SCHEMA_VERSION, 

101 "formula_key": self.key, 

102 "formula": render_sympy_expression(self.expression, {}), 

103 "unit": self.unit, 

104 "inputs": [ 

105 { 

106 "key": formula_input.key, 

107 "label": formula_input.label, 

108 "value": str(bindings[formula_input.key]) if formula_input.key in bindings else None, 

109 "unit": formula_input.unit, 

110 "source_property_info_id": formula_input.source_property_info_id, 

111 } 

112 for formula_input in self.inputs 

113 ], 

114 "steps": [ 

115 { 

116 "kind": step.kind, 

117 "label": step.label, 

118 "expression": step.expression, 

119 "amount": str(step.amount) if step.amount is not None else None, 

120 "unit": step.unit, 

121 } 

122 for step in self.steps 

123 ], 

124 "missing_child_policy": self.missing_child_policy, 

125 "blocked_children": list(self.blocked_children), 

126 "conversion_diagnostics": list(evaluation.conversion_diagnostics) if evaluation else [], 

127 "value": str(evaluation.value) if evaluation and evaluation.value is not None else None, 

128 "blocked_reason": evaluation.blocked_reason if evaluation else self.blocked_reason, 

129 "warnings": [], 

130 } 

131 

132 

133def audit_kind_for_key(formula_key: str) -> str: 

134 """Return the stable audit-envelope discriminator for a formula key.""" 

135 if formula_key.startswith("generated_capital_line:"): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 return "generated_capital_line" 

137 if formula_key.startswith("custom_capital_line:"): 

138 return "custom_capital_line" 

139 if formula_key.startswith("operating_line:"): 

140 return "operating_line_annualization" 

141 if formula_key.startswith("process_energy_contribution:"): 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true

142 return "process_energy_contribution" 

143 if formula_key.startswith("metric:cash_flow") or formula_key.startswith("metric:discounted_cash_flow"): 143 ↛ 144line 143 didn't jump to line 144 because the condition on line 143 was never true

144 return "cash_flow_row" 

145 if formula_key.startswith("metric:"): 

146 return "financial_metric" 

147 if formula_key.startswith("default_rate:"): 

148 return "default_rate" 

149 if formula_key == "electrical_upgrade_capex": 

150 return "electrical_upgrade" 

151 return formula_key.split(":", 1)[0].replace("-", "_") 

152 

153 

154def validate_formula_operation_count(expression: sympy.Expr) -> None: 

155 operation_count = int(sympy.count_ops(expression, visual=False)) 

156 if operation_count > MAX_FORMULA_OPERATIONS: 

157 raise FormulaError( 

158 "formula_too_large", 

159 "Formula contains too many operations to evaluate safely.", 

160 context={"operation_count": operation_count, "max_operations": MAX_FORMULA_OPERATIONS}, 

161 ) 

162 

163 

164def decimal_to_sympy(value: Decimal | int | str) -> sympy.Rational: 

165 try: 

166 decimal_value = Decimal(str(value)) 

167 except (InvalidOperation, TypeError, ValueError) as exc: 

168 raise FormulaError( 

169 "invalid_formula_number", 

170 "Formula received a non-numeric value.", 

171 context={"value": str(value)}, 

172 ) from exc 

173 if not decimal_value.is_finite(): 173 ↛ 174line 173 didn't jump to line 174 because the condition on line 173 was never true

174 raise FormulaError( 

175 "invalid_formula_number", 

176 "Formula received a non-finite value.", 

177 context={"value": str(value)}, 

178 ) 

179 return sympy.Rational(str(decimal_value)) 

180 

181 

182def sympy_to_decimal(value: sympy.Expr) -> Decimal: 

183 if value.has(sympy.I): 183 ↛ 184line 183 didn't jump to line 184 because the condition on line 183 was never true

184 raise FormulaError("complex_formula_result", "Formula evaluated to a complex value.") 

185 if value.free_symbols: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 raise FormulaError( 

187 "unbound_formula_symbols", 

188 "Formula still contains symbols after substitution.", 

189 context={"symbols": sorted(str(symbol) for symbol in value.free_symbols)}, 

190 ) 

191 if value.is_real is False: 

192 raise FormulaError("non_real_formula_result", "Formula evaluated to a non-real value.") 

193 

194 with localcontext() as context: 

195 context.prec = DEFAULT_EVAL_PRECISION 

196 if value.is_Rational: 

197 decimal_value = Decimal(int(value.p)) / Decimal(int(value.q)) 

198 else: 

199 decimal_value = Decimal(str(sympy.N(value, DEFAULT_EVAL_PRECISION))) 

200 

201 if not decimal_value.is_finite(): 201 ↛ 202line 201 didn't jump to line 202 because the condition on line 201 was never true

202 raise FormulaError("non_finite_formula_result", "Formula evaluated to a non-finite value.") 

203 if abs(decimal_value.adjusted()) > MAX_RESULT_ADJUSTED_EXPONENT: 

204 raise FormulaError( 

205 "formula_result_too_large", 

206 "Formula evaluated to a value outside the supported numeric range.", 

207 context={"value": str(decimal_value)}, 

208 ) 

209 return decimal_value 

210 

211 

212def render_sympy_expression(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str: 

213 return _render_expression(expression, render_bindings, parent_precedence=0) 

214 

215 

216def _render_expression( 

217 expression: sympy.Expr, 

218 render_bindings: Mapping[str, str], 

219 *, 

220 parent_precedence: int, 

221) -> str: 

222 if expression.is_Symbol: 

223 return render_bindings.get(str(expression), str(expression)) 

224 if expression.is_Number: 

225 return _number_literal(expression) 

226 if expression.is_Add: 

227 rendered = _render_add(expression, render_bindings) 

228 return _parenthesize_if_needed(rendered, _PRECEDENCE_ADD, parent_precedence) 

229 if expression.is_Mul: 

230 rendered = _render_mul(expression, render_bindings) 

231 return _parenthesize_if_needed(rendered, _PRECEDENCE_MUL, parent_precedence) 

232 if expression.is_Pow: 232 ↛ 239line 232 didn't jump to line 239 because the condition on line 232 was always true

233 base, exponent = expression.args 

234 rendered = ( 

235 f"{_render_power_base(base, render_bindings)}" 

236 f" ** {_render_expression(exponent, render_bindings, parent_precedence=_PRECEDENCE_POW)}" 

237 ) 

238 return _parenthesize_if_needed(rendered, _PRECEDENCE_POW, parent_precedence) 

239 raise FormulaError( 

240 "unsupported_formula_node", 

241 "Formula contains unsupported symbolic structure.", 

242 context={"node": type(expression).__name__}, 

243 ) 

244 

245 

246def reject_non_real_powers( 

247 expression: sympy.Expr, 

248 *, 

249 substitutions: Mapping[sympy.Symbol, sympy.Expr] | None = None, 

250) -> None: 

251 for power in expression.atoms(sympy.Pow): 

252 base, exponent = power.args 

253 if substitutions: 

254 base = base.subs(substitutions) 

255 exponent = exponent.subs(substitutions) 

256 if not base.is_number or not exponent.is_number: 256 ↛ 257line 256 didn't jump to line 257 because the condition on line 256 was never true

257 continue 

258 if base.is_real is False or exponent.is_real is False: 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true

259 raise FormulaError("non_real_power", "Formula contains a non-real power operation.") 

260 if base.is_real and exponent.is_real and base < 0 and not exponent.is_integer: 

261 raise FormulaError( 

262 "invalid_fractional_power", 

263 "Fractional powers require a non-negative base.", 

264 context={"base": str(base), "exponent": str(exponent)}, 

265 ) 

266 

267 

268def _number_literal(value: sympy.Expr) -> str: 

269 if value.is_Integer: 

270 return str(int(value)) 

271 if value.is_Rational: 271 ↛ 274line 271 didn't jump to line 274 because the condition on line 271 was always true

272 decimal_value = sympy_to_decimal(value) 

273 return format(decimal_value, "f").rstrip("0").rstrip(".") or "0" 

274 return str(value) 

275 

276 

277def _render_add(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str: 

278 terms: list[str] = [] 

279 for arg in expression.args: 

280 positive_arg = _positive_if_negative(arg) 

281 if positive_arg is None: 

282 rendered_arg = _render_expression(arg, render_bindings, parent_precedence=_PRECEDENCE_ADD) 

283 terms.append(rendered_arg if not terms else f" + {rendered_arg}") 

284 continue 

285 

286 rendered_arg = _render_expression(positive_arg, render_bindings, parent_precedence=_PRECEDENCE_ADD) 

287 terms.append(f"-{rendered_arg}" if not terms else f" - {rendered_arg}") 

288 return "".join(terms) 

289 

290 

291def _render_mul(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str: 

292 positive_expression = _positive_if_negative(expression) 

293 if positive_expression is not None: 

294 return f"-{_render_expression(positive_expression, render_bindings, parent_precedence=_PRECEDENCE_MUL)}" 

295 return " * ".join( 

296 _render_expression(arg, render_bindings, parent_precedence=_PRECEDENCE_MUL) 

297 for arg in expression.args 

298 ) 

299 

300 

301def _render_power_base(expression: sympy.Expr, render_bindings: Mapping[str, str]) -> str: 

302 rendered = _render_expression(expression, render_bindings, parent_precedence=_PRECEDENCE_POW) 

303 if expression.is_Pow: 

304 return f"({rendered})" 

305 return rendered 

306 

307 

308def _positive_if_negative(expression: sympy.Expr) -> sympy.Expr | None: 

309 if expression.is_Number: 

310 if expression < 0: 

311 return -expression 

312 return None 

313 

314 if not expression.is_Mul: 

315 return None 

316 

317 coefficient, remainder = expression.as_coeff_Mul() 

318 if coefficient >= 0: 

319 return None 

320 if coefficient == -1: 

321 return remainder 

322 return sympy.Mul(-coefficient, remainder, evaluate=False) 

323 

324 

325def _parenthesize_if_needed(rendered: str, precedence: int, parent_precedence: int) -> str: 

326 if precedence < parent_precedence: 

327 return f"({rendered})" 

328 return rendered