Coverage for backend/django/idaes_factory/adapters/convert_expression.py: 90%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1"""Convert user-authored formulas into builder-readable expression strings. 

2 

3Expressions are authored in the frontend with human-friendly mentions such as 

4``@[Pressure](prop123)``. The IDAES factory is the right layer to resolve those 

5mentions, plus property-key aggregates such as ``SUM(@work_mechanical)``, because 

6it has the current serialized group scope and property metadata. The builder 

7receives only ordinary ``id_<PropertyValueId>`` symbols and expression-language 

8``min``/``max`` calls. 

9""" 

10 

11import re 

12from typing import TYPE_CHECKING 

13 

14if TYPE_CHECKING: 

15 from core.auxiliary.models.PropertyInfo import PropertyInfo 

16 from core.auxiliary.models.PropertyValue import PropertyValue 

17 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

18 from idaes_factory.idaes_factory_context import IdaesFactoryContext 

19 

20from .serialisation_rules import is_group_enabled 

21 

22 

23# Aggregate calls accepted by this adapter: 

24# FUNCTION(@property_key) 

25# 

26# FUNCTION is one of SUM/AVG/MIN/MAX/COUNT, matched case-insensitively. 

27# property_key is a stable object-schema property key, not a display name or 

28# property-value ID. Keys may include dots for nested schema names such as 

29# ``sink.heat``. 

# Aggregate function names recognised by this adapter.
SUPPORTED_AGGREGATE_FUNCTIONS = ("SUM", "AVG", "MIN", "MAX", "COUNT")
# Regex source for a property key: a letter or underscore followed by
# letters, digits, underscores or dots.
AGGREGATE_KEY_PATTERN_SOURCE = r"[A-Za-z_][A-Za-z0-9_.]*"
# Matches ``FUNCTION(@property_key)`` case-insensitively.
# Group 1 is the function name, group 2 is the property key.
AGGREGATE_PATTERN = re.compile(
    r"\b("
    + "|".join(SUPPORTED_AGGREGATE_FUNCTIONS)
    + r")\s*\(\s*@("
    + AGGREGATE_KEY_PATTERN_SOURCE
    + r")\s*\)",
    flags=re.IGNORECASE,
)
# Mention markup of the form ``@[display](stored_id)``; group 1 is the stored ID.
MENTION_PATTERN = r"@\[[^\]]+\]\(([^)]+)\)"

37 

38 

class AggregateExpressionError(ValueError):
    """Signal that a property-key aggregate could not be resolved safely.

    Subclasses ``ValueError`` so callers that already handle ``ValueError``
    also handle aggregate resolution failures.
    """

41 

42 

def _get_serializable_numeric_property(
    ctx: "IdaesFactoryContext",
    simulation_object: "SimulationObject",
    property_key: str,
) -> "PropertyInfo | None":
    """Look up ``property_key`` on one object if it is eligible for aggregation.

    Returns:
        The result of ``ctx.get_property`` for the object's properties, or
        ``None`` when the object's schema has no entry for the key, the entry
        is not of type ``"numeric"``, or ``is_group_enabled`` reports the
        property's group as disabled.

    Raises:
        AggregateExpressionError: If the schema entry declares index sets.
    """

    schema_entry = simulation_object.schema.properties.get(property_key)
    is_numeric = schema_entry is not None and schema_entry.type == "numeric"
    if not is_numeric:
        return None

    # Indexed properties are rejected before the group-enabled check runs.
    if schema_entry.indexSets:
        raise AggregateExpressionError(
            f"Aggregate property `{property_key}` is indexed on "
            f"{simulation_object.componentName}; indexed aggregates are not supported."
        )

    if is_group_enabled(simulation_object, ctx, property_key):
        return ctx.get_property(simulation_object.properties, property_key)
    return None

61 

62 

def _get_single_property_value(
    property_info: "PropertyInfo",
    property_key: str,
) -> "PropertyValue":
    """Return the only value attached to ``property_info``.

    Raises:
        AggregateExpressionError: If ``property_info.values.all()`` does not
            yield exactly one value.
    """

    candidates = list(property_info.values.all())
    if len(candidates) == 1:
        return candidates[0]

    # Zero or several values: name the owning object in the error.
    simulation_object = property_info.set.simulationObject
    raise AggregateExpressionError(
        f"Aggregate property `{property_key}` has {len(candidates)} values on "
        f"{simulation_object.componentName}; only one scalar value per object is supported."
    )

77 

78 

def _should_include_aggregate_value(
    property_value: "PropertyValue",
    property_key: str,
    owner_property_value: "PropertyValue | None",
) -> bool:
    """Decide whether ``property_value`` takes part in the aggregate.

    Returns:
        ``False`` when the value is neither enabled nor control-manipulated,
        ``True`` otherwise.

    Raises:
        AggregateExpressionError: If the value has the same ``id`` as
            ``owner_property_value``, or if it participates but
            ``has_value()`` is false.
    """

    # The self-reference check runs first, before the enabled check.
    references_owner = (
        owner_property_value is not None
        and property_value.id == owner_property_value.id
    )
    if references_owner:
        raise AggregateExpressionError(
            f"Aggregate `@{property_key}` would include the property value it is defined on."
        )

    if not (property_value.is_enabled() or property_value.is_control_manipulated()):
        return False

    if property_value.has_value():
        return True

    raise AggregateExpressionError(
        f"Aggregate property `{property_key}` is missing a value on "
        f"{property_value.get_simulation_object().componentName}."
    )

98 

99 

100def _ensure_compatible_unit_type( 

101 property_info: "PropertyInfo", 

102 property_key: str, 

103 expected_unit_type: str | None, 

104) -> str: 

105 """Keep aggregate inputs within one physical unit category.""" 

106 

107 if expected_unit_type is None: 

108 return property_info.unitType 

109 if property_info.unitType != expected_unit_type: 

110 raise AggregateExpressionError( 

111 f"Aggregate property `{property_key}` has incompatible unit types: " 

112 f"`{expected_unit_type}` and `{property_info.unitType}`." 

113 ) 

114 return expected_unit_type 

115 

116 

def _get_scalar_aggregate_values(
    ctx: "IdaesFactoryContext",
    property_key: str,
    owner_property_value: "PropertyValue | None" = None,
) -> list["PropertyValue"]:
    """Collect the values an aggregate over ``property_key`` refers to.

    Every object in ``ctx.simulation_objects`` is checked in turn. Objects
    without an eligible property, and values that do not participate, are
    skipped. Each remaining value must be the single value of its property
    and share one unit type with the others.

    Returns:
        The participating values, ordered by ``id``.

    Raises:
        AggregateExpressionError: If any per-object check fails, or if no
            value participates at all.
    """

    collected = []
    shared_unit_type = None

    for candidate in ctx.simulation_objects:
        info = _get_serializable_numeric_property(ctx, candidate, property_key)
        if info is None:
            continue

        value = _get_single_property_value(info, property_key)
        if _should_include_aggregate_value(value, property_key, owner_property_value):
            # Unit types are compared only for values that participate.
            shared_unit_type = _ensure_compatible_unit_type(
                info,
                property_key,
                shared_unit_type,
            )
            collected.append(value)

    if collected:
        collected.sort(key=lambda item: item.id)
        return collected

    raise AggregateExpressionError(
        f"Aggregate property `@{property_key}` did not match any serializable values."
    )

163 

164 

def _pairwise_function(function_name: str, symbols: list[str]) -> str:
    """Build a left-folded chain of two-argument calls over ``symbols``.

    ``["a", "b", "c"]`` with ``"min"`` becomes ``min(min(a, b), c)``. A single
    symbol is returned unchanged.

    Args:
        function_name: Name written in front of each call.
        symbols: Operands, folded from left to right.

    Returns:
        The nested call expression as a string.

    Raises:
        ValueError: If ``symbols`` is empty, since there is nothing to fold.
    """

    # Fail with a descriptive error instead of a bare IndexError.
    if not symbols:
        raise ValueError(
            f"Cannot build a `{function_name}` expression from zero symbols."
        )

    first, *rest = symbols
    expression = first
    for symbol in rest:
        expression = f"{function_name}({expression}, {symbol})"
    return expression

172 

173 

174def _expand_aggregate( 

175 function_name: str, 

176 property_key: str, 

177 ctx: "IdaesFactoryContext | None", 

178 owner_property_value: "PropertyValue | None", 

179) -> str: 

180 """Expand ``AGG(@key)`` into the expression language consumed by the builder.""" 

181 

182 if ctx is None: 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true

183 raise AggregateExpressionError( 

184 f"Aggregate function `{function_name}` requires an IDAES factory context." 

185 ) 

186 

187 values = _get_scalar_aggregate_values(ctx, property_key, owner_property_value) 

188 symbols = [f"id_{value.id}" for value in values] 

189 aggregate_name = function_name.upper() 

190 

191 if aggregate_name == "COUNT": 

192 return str(len(symbols)) 

193 if aggregate_name == "SUM": 

194 return "(" + " + ".join(symbols) + ")" 

195 if aggregate_name == "AVG": 

196 return "((" + " + ".join(symbols) + f") / {len(symbols)})" 

197 if aggregate_name == "MIN": 

198 return _pairwise_function("min", symbols) 

199 if aggregate_name == "MAX": 199 ↛ 202line 199 didn't jump to line 202 because the condition on line 199 was always true

200 return _pairwise_function("max", symbols) 

201 

202 raise AggregateExpressionError(f"Unsupported aggregate function `{function_name}`.") 

203 

204 

def _expand_aggregates(
    expression: str,
    ctx: "IdaesFactoryContext | None",
    owner_property_value: "PropertyValue | None",
) -> str:
    """Replace every ``AGG(@key)`` call in ``expression`` with its expansion.

    Text that does not match ``AGGREGATE_PATTERN`` is left untouched.
    """

    def expand_call(call: re.Match[str]) -> str:
        # Group 1 is the function name, group 2 the property key.
        function_name = call.group(1)
        property_key = call.group(2)
        return _expand_aggregate(
            function_name,
            property_key,
            ctx,
            owner_property_value,
        )

    return AGGREGATE_PATTERN.sub(expand_call, expression)

221 

222 

def _mention_id(match: re.Match[str]) -> str:
    """Extract the first capture group (the stored ID) from a mention match."""

    stored_id = match[1]
    return stored_id

227 

228 

def _replace_property_mentions(expression: str) -> str:
    """Rewrite mention markup in ``expression`` as ``id_*`` symbols.

    A mention whose stored ID starts with ``prop`` becomes ``id_`` followed by
    the rest of that ID. Any other mention is replaced with an empty string.
    """

    def to_symbol(found: re.Match[str]) -> str:
        stored_id = _mention_id(found)
        if not stored_id.startswith("prop"):
            return ""
        return "id_" + stored_id.removeprefix("prop")

    return re.sub(MENTION_PATTERN, to_symbol, expression)

239 

240 

def _collect_property_mention_dependencies(expression: str) -> set[int]:
    """Gather the numeric IDs of all ``prop``-prefixed mentions.

    Mentions whose stored ID does not start with ``prop`` are ignored. The
    text after the prefix is passed to ``int``, so a non-numeric suffix
    raises ``ValueError``.
    """

    stored_ids = (
        _mention_id(found) for found in re.finditer(MENTION_PATTERN, expression)
    )
    return {
        int(stored_id.removeprefix("prop"))
        for stored_id in stored_ids
        if stored_id.startswith("prop")
    }

250 

251 

def _collect_aggregate_dependencies(
    expression: str,
    ctx: "IdaesFactoryContext | None",
    owner_property_value: "PropertyValue | None",
) -> set[int]:
    """Gather the value IDs behind every aggregate call in ``expression``.

    This reads the aggregate calls themselves rather than their expansion,
    because ``COUNT(@key)`` expands to a bare number that no longer names
    the values it counted.

    Raises:
        AggregateExpressionError: If an aggregate call is present while
            ``ctx`` is ``None``, or if an aggregate cannot be resolved.
    """

    found_ids: set[int] = set()
    for call in AGGREGATE_PATTERN.finditer(expression):
        # A missing context is only an error once an aggregate is found.
        if ctx is None:
            raise AggregateExpressionError(
                f"Aggregate function `{call.group(1)}` requires an IDAES factory context."
            )
        resolved = _get_scalar_aggregate_values(
            ctx,
            call.group(2),
            owner_property_value,
        )
        found_ids.update(value.id for value in resolved)
    return found_ids

277 

278 

279def convert_expression( 

280 expression: str | None, 

281 ctx: "IdaesFactoryContext | None" = None, 

282 owner_property_value: "PropertyValue | None" = None, 

283) -> str: 

284 """Convert frontend formula syntax into ID symbols understood by the builder.""" 

285 

286 if expression is None: 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true

287 raise ValueError("Unexpected null expression") 

288 

289 expression = _expand_aggregates(expression, ctx, owner_property_value) 

290 

291 # Clean up any extra spaces or operators left behind 

292 return _replace_property_mentions(expression) 

293 

294 

295def get_expression_dependencies( 

296 expression: str | None, 

297 ctx: "IdaesFactoryContext | None" = None, 

298 owner_property_value: "PropertyValue | None" = None, 

299) -> set[int]: 

300 """Return every property value ID referenced directly or through aggregates.""" 

301 

302 if expression is None: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true

303 raise ValueError("Unexpected null expression") 

304 

305 dependencies = _collect_aggregate_dependencies( 

306 expression, 

307 ctx, 

308 owner_property_value, 

309 ) 

310 expression = _expand_aggregates(expression, ctx, owner_property_value) 

311 

312 dependencies.update(_collect_property_mention_dependencies(expression)) 

313 for dependency in re.findall(r"\bid_(\d+)\b", expression): 

314 dependencies.add(int(dependency)) 

315 

316 return dependencies