Coverage for backend/ahuora-builder/src/ahuora_builder/methods/expression_parsing.py: 91%
174 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1"""Parse factory-emitted expression strings into Pyomo expressions.
3Supported expression-language functions are intentionally narrow. The Django
4factory emits lowercase ``min(arg, arg, ...)`` and ``max(arg, arg, ...)`` for
5property-key aggregates, plus ``convert(expr, unit)`` when generated formulas
6need an explicit unit basis. Uppercase or mixed-case variants remain invalid so
7that user-facing aggregate names are resolved before expressions reach the
8builder.
9"""
11import re
12from typing import Any
13from pyomo.environ import Expr_if, Expression, Param
14from pyomo.core.base.expression import ExpressionData
15from pyomo.core.base.units_container import units as pyomo_units, _PyomoUnit
16from idaes.core.util.math import smooth_min
17from sympy import Symbol
18from sympy.parsing.sympy_parser import parse_expr
19from pyomo.core.expr.sympy_tools import sympy2pyomo_expression, PyomoSympyBimap
20from pyomo.core.base.indexed_component import IndexedComponent
21from ahuora_builder.properties_manager import PropertiesManager
22from .slice_manipulation import is_scalar_reference
23class ExpressionParsingError(Exception):
24 """Custom exception for errors during expression parsing."""
25 pass
27# add extra units to pyomo's units library
28# could probably make this on-demand
29ureg = pyomo_units._pint_registry
30ureg.define("dollar = [currency]")
31ureg.define("NZD = dollar")
32FIXED_FX_RATES_PER_NZD = {
33 "USD": 0.5849305057151759,
34 "AUD": 0.8203239576440228,
35 "EUR": 0.5039856740506515,
36 "GBP": 0.4352380389574088,
37}
38for currency_code, units_per_nzd in FIXED_FX_RATES_PER_NZD.items():
39 ureg.define(f"{currency_code} = {1 / units_per_nzd} * NZD")
41# Builder function grammar: lowercase special function identifiers followed by
42# a parenthesized comma-separated argument list. Uppercase or mixed-case calls
43# are rejected explicitly instead of being handed to SymPy.
44SPECIAL_FUNCTION_NAMES = {"min", "max", "convert"}
45SPECIAL_FUNCTION_PATTERN = re.compile(r"\b(min|max|convert)\s*\(")
46SPECIAL_FUNCTION_CASE_VARIANT_PATTERN = re.compile(r"\b([A-Za-z_][A-Za-z0-9_]*)\s*\(")
47SMOOTH_MIN_EPSILON = 1e-6
50def handle_special_chars(expr: str) -> str:
51 # replace special characters so they can be parsed
52 expr = expr.replace("^", "**")
53 expr = expr.replace("$", "dollar")
55 return expr
58def get_property_from_id(fs, property_id, time_index):
59 """Return the Pyomo property data object referenced by a property id."""
61 properties_map : PropertiesManager = fs.properties_map
62 pyomo_object: IndexedComponent = properties_map.get_component(property_id)
64 if pyomo_object is None: 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true
65 raise ValueError(f"Symbol with id {property_id} not found in model")
66 # check if this is a time-indexed var, and if so get the value at the given time index
67 if is_scalar_reference(pyomo_object):
68 # reference with index None
69 return pyomo_object[None]
70 elif pyomo_object.index_set() == fs.time: 70 ↛ 73line 70 didn't jump to line 73 because the condition on line 70 was always true
71 return pyomo_object[time_index]
72 else:
73 raise NotImplementedError("Only 0D and 1D time-indexed properties are supported in expressions")
76def get_property_expression_for_symbol(fs, property_id, time_index):
77 """Return the value expression parsing should substitute for an ``id_`` symbol."""
79 value = get_property_from_id(fs, property_id, time_index)
80 if isinstance(value, ExpressionData):
81 return value.expr
82 return value
85def evaluate_symbol(fs, symbol: str,time_index) -> Any:
86 if symbol.lower() == "time" or symbol.lower() == "t": 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true
87 return float(time_index)
88 if symbol.startswith("id_"):
89 # get the property from flowsheet properties_map
90 id = int(symbol[3:])
91 return get_property_expression_for_symbol(fs, id, time_index)
92 else:
93 # assume its a unit, eg. "m" or "kg"
94 # get the unit from pint, pyomo's units library
95 ureg = pyomo_units._pint_registry
96 pint_unit = getattr(ureg, symbol)
97 pyomo_unit = _PyomoUnit(pint_unit, ureg)
98 # We want people to write expressions such as (10 * W + 5 * kW). Pyomo doesn't natively support this,
99 # so we can always convert to base units.
100 if symbol == "delta_degC" or symbol == "delta_degF":
101 # special case, because degC is not a base unit
102 return 1 * pyomo_unit
103 #return _PyomoUnit(ureg.delta_degC)
104 elif symbol == "degC" or symbol == "degF": 104 ↛ 107line 104 didn't jump to line 107 because the condition on line 104 was never true
105 # throw an error (we do not support this, as it is unclear what to do)
106 # https://pyomo.readthedocs.io/en/6.8.1/explanation/modeling/units.html
107 raise ValueError(f"Use relative temperature units (delta_degC, delta_degF) or absolute temperature units (K, degF). Cannot use {symbol} as addition and multiplication is inconsistent on non-absolute units")
108 scale_factor, base_units = ureg.get_base_units(pint_unit, check_nonmult=True) # TODO: handle degC etc.
109 base_pyomo_unit = _PyomoUnit(base_units, ureg)
110 return pyomo_units.convert( 1 * pyomo_unit, to_units=base_pyomo_unit)
113def split_function_args(arguments: str) -> list[str]:
114 """Split function arguments without treating nested commas as separators."""
116 args = []
117 start = 0
118 depth = 0
119 for index, char in enumerate(arguments):
120 if char == "(":
121 depth += 1
122 elif char == ")":
123 depth -= 1
124 if depth < 0: 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true
125 raise ExpressionParsingError(
126 f"Unexpected closing parenthesis in function arguments '{arguments}'"
127 )
128 elif char == "," and depth == 0:
129 args.append(arguments[start:index].strip())
130 start = index + 1
131 args.append(arguments[start:].strip())
132 if depth != 0: 132 ↛ 133line 132 didn't jump to line 133 because the condition on line 132 was never true
133 raise ExpressionParsingError(
134 f"Unbalanced parentheses in function arguments '{arguments}'"
135 )
136 return args
139def find_matching_paren(expression: str, open_index: int) -> int:
140 """Find the closing parenthesis matching the opening parenthesis at open_index."""
142 depth = 0
143 for index in range(open_index, len(expression)):
144 if expression[index] == "(":
145 depth += 1
146 elif expression[index] == ")":
147 depth -= 1
148 if depth == 0:
149 return index
150 raise ExpressionParsingError(f"Unclosed function call in expression '{expression}'")
153def _smooth_min_epsilon(model, expression: Any) -> Any:
154 """Return a small smoothing term compatible with IDAES ``smooth_min``.
156 IDAES accepts a numeric epsilon for unitless expressions, but unitful
157 expressions need a Pyomo ``Param`` carrying the same units so Pyomo can
158 prove ``sqrt((a - b)^2 + eps^2)`` is dimensionally consistent.
159 """
161 expression_units = pyomo_units.get_units(expression)
162 if expression_units is None: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true
163 return SMOOTH_MIN_EPSILON
165 unit_key = str(expression_units)
166 epsilon_cache = getattr(model, "_ahuora_smooth_min_epsilon_by_unit", None)
167 if epsilon_cache is None:
168 epsilon_cache = {}
169 setattr(model, "_ahuora_smooth_min_epsilon_by_unit", epsilon_cache)
170 if unit_key in epsilon_cache:
171 return epsilon_cache[unit_key]
173 component_index = len(epsilon_cache)
174 while True:
175 component_name = f"_ahuora_smooth_min_epsilon_{component_index}"
176 if not hasattr(model, component_name): 176 ↛ 178line 176 didn't jump to line 178 because the condition on line 176 was always true
177 break
178 component_index += 1
180 epsilon = Param(initialize=SMOOTH_MIN_EPSILON, units=expression_units)
181 model.add_component(component_name, epsilon)
182 epsilon_cache[unit_key] = epsilon
183 return epsilon
186def _pairwise_min_max(model, function_name: str, args: list[Any]) -> Any:
187 """Build a left-folded expression-language min/max expression.
189 ``min`` lowers to IDAES ``smooth_min`` so objective functions remain
190 differentiable. ``max`` keeps the existing exact conditional semantics
191 until there is a product requirement to smooth it too.
192 """
194 expression = args[0]
195 for arg in args[1:]:
196 expression_units = pyomo_units.get_units(expression)
197 arg_units = pyomo_units.get_units(arg)
198 if expression_units is not None and arg_units is not None: 198 ↛ 200line 198 didn't jump to line 200 because the condition on line 198 was always true
199 arg = pyomo_units.convert(arg, to_units=expression_units)
200 if function_name == "min":
201 expression = smooth_min(expression, arg, eps=_smooth_min_epsilon(model, expression))
202 else:
203 expression = Expr_if(IF_=expression >= arg, THEN_=expression, ELSE_=arg)
204 return expression
207def _convert_to_units(expression: Any, target_quantity: Any) -> Any:
208 """Convert an expression to the units carried by a parsed unit quantity."""
210 target_units = pyomo_units.get_units(target_quantity)
211 if target_units is None: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 raise ExpressionParsingError("convert target must include units")
213 return pyomo_units.convert(expression, to_units=target_units)
216def replace_special_functions(expression: str, model, time_index) -> tuple[str, dict[str, Any]]:
217 """Lower expression-language special calls before handing the rest to SymPy.
219 SymPy can parse the names ``min`` and ``max``, but the result is not a Pyomo
220 expression. Django emits lowercase builder expression-language calls for
221 aggregate ``MIN``/``MAX`` and generated unit coercions; this pass lowers
222 those calls into Pyomo placeholders whose expressions are returned by the
223 bimap unchanged.
224 """
226 special_symbols = {}
228 while True:
229 case_variant = next(
230 (
231 match
232 for match in SPECIAL_FUNCTION_CASE_VARIANT_PATTERN.finditer(expression)
233 if match.group(1).lower() in SPECIAL_FUNCTION_NAMES
234 and match.group(1) != match.group(1).lower()
235 ),
236 None,
237 )
238 if case_variant is not None:
239 raise ExpressionParsingError(
240 f"{case_variant.group(1)} must be written as lowercase "
241 f"{case_variant.group(1).lower()}"
242 )
244 match = SPECIAL_FUNCTION_PATTERN.search(expression)
245 if match is None:
246 return expression, special_symbols
248 open_index = match.end() - 1
249 close_index = find_matching_paren(expression, open_index)
250 function_name = match.group(1)
251 args = split_function_args(expression[open_index + 1:close_index])
252 if any(arg == "" for arg in args):
253 raise ExpressionParsingError(f"{function_name} arguments cannot be empty")
254 if function_name in {"min", "max"} and len(args) < 2:
255 raise ExpressionParsingError(
256 f"{function_name} expects at least 2 arguments, got {len(args)}"
257 )
258 if function_name == "convert" and len(args) != 2:
259 raise ExpressionParsingError(
260 f"convert expects exactly 2 arguments, got {len(args)}"
261 )
262 parsed_args = [parse_expression(arg, model, time_index) for arg in args]
263 placeholder = f"__ahuora_special_{len(special_symbols)}"
264 if function_name == "convert":
265 special_symbols[placeholder] = _convert_to_units(parsed_args[0], parsed_args[1])
266 else:
267 special_symbols[placeholder] = _pairwise_min_max(model, function_name, parsed_args)
268 expression = expression[:match.start()] + placeholder + expression[close_index + 1:]
271class PyomoSympyMap(PyomoSympyBimap):
272 """Map expression symbols to Pyomo objects during SymPy-to-Pyomo conversion."""
274 def __init__(self, model,time_index, special_symbols: dict[str, Any] | None = None):
275 self.model = model
276 self.time_index = time_index
277 self.special_symbols = special_symbols or {}
279 def getPyomoSymbol(self, sympy_object: Symbol, default=None):
280 if not isinstance(sympy_object, Symbol):
281 return None # It's not in pyomo, e.g a number or something
282 if sympy_object.name in self.special_symbols:
283 return self.special_symbols[sympy_object.name]
284 return evaluate_symbol(self.model, sympy_object.name, self.time_index)
286 def getSympySymbol(self, pyomo_object, default=None):
287 raise NotImplementedError(
288 "getSympySymbol not implemented, because it shouldn't be needed"
289 )
290 # we don't care, it only needs to go one way
292 def sympyVars(self):
293 raise NotImplementedError(
294 "sympyVars not implemented, because it shouldn't be needed"
295 )
298def parse_expression(expression, model,time_index) -> Expression:
299 """Parse an Ahuora expression string into a Pyomo expression."""
301 # use the bimap to get the correct pyomo object for each symbol
302 try:
303 expression = handle_special_chars(expression)
304 expression, special_symbols = replace_special_functions(expression, model, time_index)
305 bimap = PyomoSympyMap(model,time_index, special_symbols)
306 sympy_expr = parse_expr(expression)
307 pyomo_expr = sympy2pyomo_expression(sympy_expr, bimap)
308 except Exception as e:
309 raise ExpressionParsingError(f"{expression}: error: {e}")
310 return pyomo_expr