Coverage for backend/ahuora-builder/src/ahuora_builder/methods/expression_parsing.py: 91%

174 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Parse factory-emitted expression strings into Pyomo expressions. 

2 

3Supported expression-language functions are intentionally narrow. The Django 

4factory emits lowercase ``min(arg, arg, ...)`` and ``max(arg, arg, ...)`` for 

5property-key aggregates, plus ``convert(expr, unit)`` when generated formulas 

6need an explicit unit basis. Uppercase or mixed-case variants remain invalid so 

7that user-facing aggregate names are resolved before expressions reach the 

8builder. 

9""" 

10 

11import re 

12from typing import Any 

13from pyomo.environ import Expr_if, Expression, Param 

14from pyomo.core.base.expression import ExpressionData 

15from pyomo.core.base.units_container import units as pyomo_units, _PyomoUnit 

16from idaes.core.util.math import smooth_min 

17from sympy import Symbol 

18from sympy.parsing.sympy_parser import parse_expr 

19from pyomo.core.expr.sympy_tools import sympy2pyomo_expression, PyomoSympyBimap 

20from pyomo.core.base.indexed_component import IndexedComponent 

21from ahuora_builder.properties_manager import PropertiesManager 

22from .slice_manipulation import is_scalar_reference 

23class ExpressionParsingError(Exception): 

24 """Custom exception for errors during expression parsing.""" 

25 pass 

26 

27# add extra units to pyomo's units library 

28# could probably make this on-demand 

29ureg = pyomo_units._pint_registry 

30ureg.define("dollar = [currency]") 

31ureg.define("NZD = dollar") 

32FIXED_FX_RATES_PER_NZD = { 

33 "USD": 0.5849305057151759, 

34 "AUD": 0.8203239576440228, 

35 "EUR": 0.5039856740506515, 

36 "GBP": 0.4352380389574088, 

37} 

38for currency_code, units_per_nzd in FIXED_FX_RATES_PER_NZD.items(): 

39 ureg.define(f"{currency_code} = {1 / units_per_nzd} * NZD") 

40 

41# Builder function grammar: lowercase special function identifiers followed by 

42# a parenthesized comma-separated argument list. Uppercase or mixed-case calls 

43# are rejected explicitly instead of being handed to SymPy. 

44SPECIAL_FUNCTION_NAMES = {"min", "max", "convert"} 

45SPECIAL_FUNCTION_PATTERN = re.compile(r"\b(min|max|convert)\s*\(") 

46SPECIAL_FUNCTION_CASE_VARIANT_PATTERN = re.compile(r"\b([A-Za-z_][A-Za-z0-9_]*)\s*\(") 

47SMOOTH_MIN_EPSILON = 1e-6 

48 

49 

50def handle_special_chars(expr: str) -> str: 

51 # replace special characters so they can be parsed 

52 expr = expr.replace("^", "**") 

53 expr = expr.replace("$", "dollar") 

54 

55 return expr 

56 

57 

58def get_property_from_id(fs, property_id, time_index): 

59 """Return the Pyomo property data object referenced by a property id.""" 

60 

61 properties_map : PropertiesManager = fs.properties_map 

62 pyomo_object: IndexedComponent = properties_map.get_component(property_id) 

63 

64 if pyomo_object is None: 64 ↛ 65line 64 didn't jump to line 65 because the condition on line 64 was never true

65 raise ValueError(f"Symbol with id {property_id} not found in model") 

66 # check if this is a time-indexed var, and if so get the value at the given time index 

67 if is_scalar_reference(pyomo_object): 

68 # reference with index None 

69 return pyomo_object[None] 

70 elif pyomo_object.index_set() == fs.time: 70 ↛ 73line 70 didn't jump to line 73 because the condition on line 70 was always true

71 return pyomo_object[time_index] 

72 else: 

73 raise NotImplementedError("Only 0D and 1D time-indexed properties are supported in expressions") 

74 

75 

76def get_property_expression_for_symbol(fs, property_id, time_index): 

77 """Return the value expression parsing should substitute for an ``id_`` symbol.""" 

78 

79 value = get_property_from_id(fs, property_id, time_index) 

80 if isinstance(value, ExpressionData): 

81 return value.expr 

82 return value 

83 

84 

85def evaluate_symbol(fs, symbol: str,time_index) -> Any: 

86 if symbol.lower() == "time" or symbol.lower() == "t": 86 ↛ 87line 86 didn't jump to line 87 because the condition on line 86 was never true

87 return float(time_index) 

88 if symbol.startswith("id_"): 

89 # get the property from flowsheet properties_map 

90 id = int(symbol[3:]) 

91 return get_property_expression_for_symbol(fs, id, time_index) 

92 else: 

93 # assume its a unit, eg. "m" or "kg" 

94 # get the unit from pint, pyomo's units library 

95 ureg = pyomo_units._pint_registry 

96 pint_unit = getattr(ureg, symbol) 

97 pyomo_unit = _PyomoUnit(pint_unit, ureg) 

98 # We want people to write expressions such as (10 * W + 5 * kW). Pyomo doesn't natively support this, 

99 # so we can always convert to base units. 

100 if symbol == "delta_degC" or symbol == "delta_degF": 

101 # special case, because degC is not a base unit 

102 return 1 * pyomo_unit 

103 #return _PyomoUnit(ureg.delta_degC) 

104 elif symbol == "degC" or symbol == "degF": 104 ↛ 107line 104 didn't jump to line 107 because the condition on line 104 was never true

105 # throw an error (we do not support this, as it is unclear what to do) 

106 # https://pyomo.readthedocs.io/en/6.8.1/explanation/modeling/units.html 

107 raise ValueError(f"Use relative temperature units (delta_degC, delta_degF) or absolute temperature units (K, degF). Cannot use {symbol} as addition and multiplication is inconsistent on non-absolute units") 

108 scale_factor, base_units = ureg.get_base_units(pint_unit, check_nonmult=True) # TODO: handle degC etc. 

109 base_pyomo_unit = _PyomoUnit(base_units, ureg) 

110 return pyomo_units.convert( 1 * pyomo_unit, to_units=base_pyomo_unit) 

111 

112 

113def split_function_args(arguments: str) -> list[str]: 

114 """Split function arguments without treating nested commas as separators.""" 

115 

116 args = [] 

117 start = 0 

118 depth = 0 

119 for index, char in enumerate(arguments): 

120 if char == "(": 

121 depth += 1 

122 elif char == ")": 

123 depth -= 1 

124 if depth < 0: 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true

125 raise ExpressionParsingError( 

126 f"Unexpected closing parenthesis in function arguments '{arguments}'" 

127 ) 

128 elif char == "," and depth == 0: 

129 args.append(arguments[start:index].strip()) 

130 start = index + 1 

131 args.append(arguments[start:].strip()) 

132 if depth != 0: 132 ↛ 133line 132 didn't jump to line 133 because the condition on line 132 was never true

133 raise ExpressionParsingError( 

134 f"Unbalanced parentheses in function arguments '{arguments}'" 

135 ) 

136 return args 

137 

138 

139def find_matching_paren(expression: str, open_index: int) -> int: 

140 """Find the closing parenthesis matching the opening parenthesis at open_index.""" 

141 

142 depth = 0 

143 for index in range(open_index, len(expression)): 

144 if expression[index] == "(": 

145 depth += 1 

146 elif expression[index] == ")": 

147 depth -= 1 

148 if depth == 0: 

149 return index 

150 raise ExpressionParsingError(f"Unclosed function call in expression '{expression}'") 

151 

152 

153def _smooth_min_epsilon(model, expression: Any) -> Any: 

154 """Return a small smoothing term compatible with IDAES ``smooth_min``. 

155 

156 IDAES accepts a numeric epsilon for unitless expressions, but unitful 

157 expressions need a Pyomo ``Param`` carrying the same units so Pyomo can 

158 prove ``sqrt((a - b)^2 + eps^2)`` is dimensionally consistent. 

159 """ 

160 

161 expression_units = pyomo_units.get_units(expression) 

162 if expression_units is None: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true

163 return SMOOTH_MIN_EPSILON 

164 

165 unit_key = str(expression_units) 

166 epsilon_cache = getattr(model, "_ahuora_smooth_min_epsilon_by_unit", None) 

167 if epsilon_cache is None: 

168 epsilon_cache = {} 

169 setattr(model, "_ahuora_smooth_min_epsilon_by_unit", epsilon_cache) 

170 if unit_key in epsilon_cache: 

171 return epsilon_cache[unit_key] 

172 

173 component_index = len(epsilon_cache) 

174 while True: 

175 component_name = f"_ahuora_smooth_min_epsilon_{component_index}" 

176 if not hasattr(model, component_name): 176 ↛ 178line 176 didn't jump to line 178 because the condition on line 176 was always true

177 break 

178 component_index += 1 

179 

180 epsilon = Param(initialize=SMOOTH_MIN_EPSILON, units=expression_units) 

181 model.add_component(component_name, epsilon) 

182 epsilon_cache[unit_key] = epsilon 

183 return epsilon 

184 

185 

186def _pairwise_min_max(model, function_name: str, args: list[Any]) -> Any: 

187 """Build a left-folded expression-language min/max expression. 

188 

189 ``min`` lowers to IDAES ``smooth_min`` so objective functions remain 

190 differentiable. ``max`` keeps the existing exact conditional semantics 

191 until there is a product requirement to smooth it too. 

192 """ 

193 

194 expression = args[0] 

195 for arg in args[1:]: 

196 expression_units = pyomo_units.get_units(expression) 

197 arg_units = pyomo_units.get_units(arg) 

198 if expression_units is not None and arg_units is not None: 198 ↛ 200line 198 didn't jump to line 200 because the condition on line 198 was always true

199 arg = pyomo_units.convert(arg, to_units=expression_units) 

200 if function_name == "min": 

201 expression = smooth_min(expression, arg, eps=_smooth_min_epsilon(model, expression)) 

202 else: 

203 expression = Expr_if(IF_=expression >= arg, THEN_=expression, ELSE_=arg) 

204 return expression 

205 

206 

207def _convert_to_units(expression: Any, target_quantity: Any) -> Any: 

208 """Convert an expression to the units carried by a parsed unit quantity.""" 

209 

210 target_units = pyomo_units.get_units(target_quantity) 

211 if target_units is None: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 raise ExpressionParsingError("convert target must include units") 

213 return pyomo_units.convert(expression, to_units=target_units) 

214 

215 

216def replace_special_functions(expression: str, model, time_index) -> tuple[str, dict[str, Any]]: 

217 """Lower expression-language special calls before handing the rest to SymPy. 

218 

219 SymPy can parse the names ``min`` and ``max``, but the result is not a Pyomo 

220 expression. Django emits lowercase builder expression-language calls for 

221 aggregate ``MIN``/``MAX`` and generated unit coercions; this pass lowers 

222 those calls into Pyomo placeholders whose expressions are returned by the 

223 bimap unchanged. 

224 """ 

225 

226 special_symbols = {} 

227 

228 while True: 

229 case_variant = next( 

230 ( 

231 match 

232 for match in SPECIAL_FUNCTION_CASE_VARIANT_PATTERN.finditer(expression) 

233 if match.group(1).lower() in SPECIAL_FUNCTION_NAMES 

234 and match.group(1) != match.group(1).lower() 

235 ), 

236 None, 

237 ) 

238 if case_variant is not None: 

239 raise ExpressionParsingError( 

240 f"{case_variant.group(1)} must be written as lowercase " 

241 f"{case_variant.group(1).lower()}" 

242 ) 

243 

244 match = SPECIAL_FUNCTION_PATTERN.search(expression) 

245 if match is None: 

246 return expression, special_symbols 

247 

248 open_index = match.end() - 1 

249 close_index = find_matching_paren(expression, open_index) 

250 function_name = match.group(1) 

251 args = split_function_args(expression[open_index + 1:close_index]) 

252 if any(arg == "" for arg in args): 

253 raise ExpressionParsingError(f"{function_name} arguments cannot be empty") 

254 if function_name in {"min", "max"} and len(args) < 2: 

255 raise ExpressionParsingError( 

256 f"{function_name} expects at least 2 arguments, got {len(args)}" 

257 ) 

258 if function_name == "convert" and len(args) != 2: 

259 raise ExpressionParsingError( 

260 f"convert expects exactly 2 arguments, got {len(args)}" 

261 ) 

262 parsed_args = [parse_expression(arg, model, time_index) for arg in args] 

263 placeholder = f"__ahuora_special_{len(special_symbols)}" 

264 if function_name == "convert": 

265 special_symbols[placeholder] = _convert_to_units(parsed_args[0], parsed_args[1]) 

266 else: 

267 special_symbols[placeholder] = _pairwise_min_max(model, function_name, parsed_args) 

268 expression = expression[:match.start()] + placeholder + expression[close_index + 1:] 

269 

270 

271class PyomoSympyMap(PyomoSympyBimap): 

272 """Map expression symbols to Pyomo objects during SymPy-to-Pyomo conversion.""" 

273 

274 def __init__(self, model,time_index, special_symbols: dict[str, Any] | None = None): 

275 self.model = model 

276 self.time_index = time_index 

277 self.special_symbols = special_symbols or {} 

278 

279 def getPyomoSymbol(self, sympy_object: Symbol, default=None): 

280 if not isinstance(sympy_object, Symbol): 

281 return None # It's not in pyomo, e.g a number or something 

282 if sympy_object.name in self.special_symbols: 

283 return self.special_symbols[sympy_object.name] 

284 return evaluate_symbol(self.model, sympy_object.name, self.time_index) 

285 

286 def getSympySymbol(self, pyomo_object, default=None): 

287 raise NotImplementedError( 

288 "getSympySymbol not implemented, because it shouldn't be needed" 

289 ) 

290 # we don't care, it only needs to go one way 

291 

292 def sympyVars(self): 

293 raise NotImplementedError( 

294 "sympyVars not implemented, because it shouldn't be needed" 

295 ) 

296 

297 

298def parse_expression(expression, model,time_index) -> Expression: 

299 """Parse an Ahuora expression string into a Pyomo expression.""" 

300 

301 # use the bimap to get the correct pyomo object for each symbol 

302 try: 

303 expression = handle_special_chars(expression) 

304 expression, special_symbols = replace_special_functions(expression, model, time_index) 

305 bimap = PyomoSympyMap(model,time_index, special_symbols) 

306 sympy_expr = parse_expr(expression) 

307 pyomo_expr = sympy2pyomo_expression(sympy_expr, bimap) 

308 except Exception as e: 

309 raise ExpressionParsingError(f"{expression}: error: {e}") 

310 return pyomo_expr