Coverage for backend/django/idaes_factory/adapters/convert_expression.py: 90%
114 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1"""Convert user-authored formulas into builder-readable expression strings.
Expressions are authored in the frontend with human-friendly mentions such as
``@[Pressure](prop123)``. The IDAES factory is the right layer to resolve those
mentions, plus property-key aggregates such as ``SUM(@work_mechanical)``, because
it has the current serialized group scope and property metadata. The builder
receives only ordinary ``id_<PropertyValueId>`` symbols and expression-language
``min``/``max`` calls.
9"""
11import re
12from typing import TYPE_CHECKING
14if TYPE_CHECKING:
15 from core.auxiliary.models.PropertyInfo import PropertyInfo
16 from core.auxiliary.models.PropertyValue import PropertyValue
17 from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
18 from idaes_factory.idaes_factory_context import IdaesFactoryContext
20from .serialisation_rules import is_group_enabled
# Property-key aggregate syntax recognised by this adapter:
#     FUNCTION(@property_key)
#
# FUNCTION is any of SUM/AVG/MIN/MAX/COUNT; the match ignores case.
# property_key is a stable object-schema property key, not a display name and
# not a property-value ID. Dots are allowed so that nested schema names such
# as ``sink.heat`` can be addressed.
SUPPORTED_AGGREGATE_FUNCTIONS = ("SUM", "AVG", "MIN", "MAX", "COUNT")
AGGREGATE_KEY_PATTERN_SOURCE = r"[A-Za-z_][A-Za-z0-9_.]*"
# Group 1 captures the function name, group 2 captures the property key.
AGGREGATE_PATTERN = re.compile(
    rf"\b({'|'.join(SUPPORTED_AGGREGATE_FUNCTIONS)})\s*\(\s*@({AGGREGATE_KEY_PATTERN_SOURCE})\s*\)",
    re.IGNORECASE,
)
# Mention markup of the form ``@[label](id)``; group 1 captures the stored ID.
MENTION_PATTERN = r"@\[[^\]]+\]\(([^)]+)\)"
class AggregateExpressionError(ValueError):
    """Signal that a property-key aggregate could not be resolved safely.

    Subclasses ``ValueError`` so callers that already handle invalid
    expressions through ``ValueError`` also catch aggregate failures.
    """
43def _get_serializable_numeric_property(
44 ctx: "IdaesFactoryContext",
45 simulation_object: "SimulationObject",
46 property_key: str,
47) -> "PropertyInfo | None":
48 """Return a matching scalar numeric property if it belongs in this build."""
50 prop_schema = simulation_object.schema.properties.get(property_key)
51 if prop_schema is None or prop_schema.type != "numeric":
52 return None
53 if prop_schema.indexSets:
54 raise AggregateExpressionError(
55 f"Aggregate property `{property_key}` is indexed on "
56 f"{simulation_object.componentName}; indexed aggregates are not supported."
57 )
58 if not is_group_enabled(simulation_object, ctx, property_key):
59 return None
60 return ctx.get_property(simulation_object.properties, property_key)
63def _get_single_property_value(
64 property_info: "PropertyInfo",
65 property_key: str,
66) -> "PropertyValue":
67 """Return the single scalar value attached to an aggregate property."""
69 property_values = list(property_info.values.all())
70 if len(property_values) != 1: 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true
71 simulation_object = property_info.set.simulationObject
72 raise AggregateExpressionError(
73 f"Aggregate property `{property_key}` has {len(property_values)} values on "
74 f"{simulation_object.componentName}; only one scalar value per object is supported."
75 )
76 return property_values[0]
79def _should_include_aggregate_value(
80 property_value: "PropertyValue",
81 property_key: str,
82 owner_property_value: "PropertyValue | None",
83) -> bool:
84 """Return whether a value participates, raising for invalid matches."""
86 if owner_property_value is not None and property_value.id == owner_property_value.id:
87 raise AggregateExpressionError(
88 f"Aggregate `@{property_key}` would include the property value it is defined on."
89 )
90 if not property_value.is_enabled() and not property_value.is_control_manipulated(): 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true
91 return False
92 if not property_value.has_value():
93 raise AggregateExpressionError(
94 f"Aggregate property `{property_key}` is missing a value on "
95 f"{property_value.get_simulation_object().componentName}."
96 )
97 return True
100def _ensure_compatible_unit_type(
101 property_info: "PropertyInfo",
102 property_key: str,
103 expected_unit_type: str | None,
104) -> str:
105 """Keep aggregate inputs within one physical unit category."""
107 if expected_unit_type is None:
108 return property_info.unitType
109 if property_info.unitType != expected_unit_type:
110 raise AggregateExpressionError(
111 f"Aggregate property `{property_key}` has incompatible unit types: "
112 f"`{expected_unit_type}` and `{property_info.unitType}`."
113 )
114 return expected_unit_type
def _get_scalar_aggregate_values(
    ctx: "IdaesFactoryContext",
    property_key: str,
    owner_property_value: "PropertyValue | None" = None,
) -> list["PropertyValue"]:
    """Collect one scalar numeric value per matching object in the factory scope.

    Walks ``ctx.simulation_objects`` and keeps the value of ``property_key``
    from every object where it is a serializable numeric property. Values
    that do not participate (see ``_should_include_aggregate_value``) are
    skipped. Every value that is kept must be scalar, populated, and of the
    same unit type as the others, so the generated expression has a clear
    physical meaning.

    Returns:
        The participating values ordered by ascending ``id``.

    Raises:
        AggregateExpressionError: a matching property is indexed, has more
            or fewer than one value, is the owner value itself, is missing a
            value, has a different unit type, or nothing matched at all.
    """

    collected = []
    shared_unit_type = None

    for candidate_object in ctx.simulation_objects:
        info = _get_serializable_numeric_property(ctx, candidate_object, property_key)
        if info is None:
            continue

        value = _get_single_property_value(info, property_key)
        participates = _should_include_aggregate_value(
            value,
            property_key,
            owner_property_value,
        )
        if not participates:
            continue

        # Only participating values constrain the unit type.
        shared_unit_type = _ensure_compatible_unit_type(
            info,
            property_key,
            shared_unit_type,
        )
        collected.append(value)

    if not collected:
        raise AggregateExpressionError(
            f"Aggregate property `@{property_key}` did not match any serializable values."
        )

    # Order by ID so the result does not depend on object iteration order.
    collected.sort(key=lambda value: value.id)
    return collected
165def _pairwise_function(function_name: str, symbols: list[str]) -> str:
166 """Build a left-folded function call for an arbitrary number of values."""
168 expression = symbols[0]
169 for symbol in symbols[1:]:
170 expression = f"{function_name}({expression}, {symbol})"
171 return expression
174def _expand_aggregate(
175 function_name: str,
176 property_key: str,
177 ctx: "IdaesFactoryContext | None",
178 owner_property_value: "PropertyValue | None",
179) -> str:
180 """Expand ``AGG(@key)`` into the expression language consumed by the builder."""
182 if ctx is None: 182 ↛ 183line 182 didn't jump to line 183 because the condition on line 182 was never true
183 raise AggregateExpressionError(
184 f"Aggregate function `{function_name}` requires an IDAES factory context."
185 )
187 values = _get_scalar_aggregate_values(ctx, property_key, owner_property_value)
188 symbols = [f"id_{value.id}" for value in values]
189 aggregate_name = function_name.upper()
191 if aggregate_name == "COUNT":
192 return str(len(symbols))
193 if aggregate_name == "SUM":
194 return "(" + " + ".join(symbols) + ")"
195 if aggregate_name == "AVG":
196 return "((" + " + ".join(symbols) + f") / {len(symbols)})"
197 if aggregate_name == "MIN":
198 return _pairwise_function("min", symbols)
199 if aggregate_name == "MAX": 199 ↛ 202line 199 didn't jump to line 202 because the condition on line 199 was always true
200 return _pairwise_function("max", symbols)
202 raise AggregateExpressionError(f"Unsupported aggregate function `{function_name}`.")
def _expand_aggregates(
    expression: str,
    ctx: "IdaesFactoryContext | None",
    owner_property_value: "PropertyValue | None",
) -> str:
    """Replace every property-key aggregate call in ``expression`` with its expansion."""

    def _expand_match(found: re.Match[str]) -> str:
        # Group 1 is the function name, group 2 the property key.
        function_name, property_key = found.group(1), found.group(2)
        return _expand_aggregate(function_name, property_key, ctx, owner_property_value)

    return AGGREGATE_PATTERN.sub(_expand_match, expression)
223def _mention_id(match: re.Match[str]) -> str:
224 """Return the stored ID from one react-mentions markup match."""
226 return match.group(1)
def _replace_property_mentions(expression: str) -> str:
    """Rewrite mention markup into builder ``id_*`` symbols.

    A mention whose stored ID starts with ``prop`` becomes ``id_`` followed
    by the remainder of that ID. Any other mention is replaced with an empty
    string.
    """

    def _to_symbol(found: re.Match[str]) -> str:
        stored_id = _mention_id(found)
        # Drop the 4-character "prop" prefix; other mentions are removed.
        return "id_" + stored_id[4:] if stored_id.startswith("prop") else ""

    return re.sub(MENTION_PATTERN, _to_symbol, expression)
def _collect_property_mention_dependencies(expression: str) -> set[int]:
    """Return the property value IDs referenced via mention markup.

    Only mentions whose stored ID starts with ``prop`` are considered; the
    text after that prefix is parsed with ``int``.
    """

    return {
        int(stored_id[4:])
        for found in re.finditer(MENTION_PATTERN, expression)
        if (stored_id := _mention_id(found)).startswith("prop")
    }
def _collect_aggregate_dependencies(
    expression: str,
    ctx: "IdaesFactoryContext | None",
    owner_property_value: "PropertyValue | None",
) -> set[int]:
    """Gather the IDs of every value feeding an aggregate call in ``expression``.

    This reads the aggregate calls themselves rather than their expanded
    text, because ``COUNT(@key)`` expands to a plain number and would
    otherwise leave no trace of the values it counted.

    Raises:
        AggregateExpressionError: an aggregate call is present but ``ctx``
            is ``None``, or an aggregate cannot be resolved.
    """

    found_ids: set[int] = set()
    for call in AGGREGATE_PATTERN.finditer(expression):
        function_name, property_key = call.group(1), call.group(2)
        if ctx is None:
            raise AggregateExpressionError(
                f"Aggregate function `{function_name}` requires an IDAES factory context."
            )
        inputs = _get_scalar_aggregate_values(ctx, property_key, owner_property_value)
        found_ids.update(value.id for value in inputs)
    return found_ids
279def convert_expression(
280 expression: str | None,
281 ctx: "IdaesFactoryContext | None" = None,
282 owner_property_value: "PropertyValue | None" = None,
283) -> str:
284 """Convert frontend formula syntax into ID symbols understood by the builder."""
286 if expression is None: 286 ↛ 287line 286 didn't jump to line 287 because the condition on line 286 was never true
287 raise ValueError("Unexpected null expression")
289 expression = _expand_aggregates(expression, ctx, owner_property_value)
291 # Clean up any extra spaces or operators left behind
292 return _replace_property_mentions(expression)
295def get_expression_dependencies(
296 expression: str | None,
297 ctx: "IdaesFactoryContext | None" = None,
298 owner_property_value: "PropertyValue | None" = None,
299) -> set[int]:
300 """Return every property value ID referenced directly or through aggregates."""
302 if expression is None: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true
303 raise ValueError("Unexpected null expression")
305 dependencies = _collect_aggregate_dependencies(
306 expression,
307 ctx,
308 owner_property_value,
309 )
310 expression = _expand_aggregates(expression, ctx, owner_property_value)
312 dependencies.update(_collect_property_mention_dependencies(expression))
313 for dependency in re.findall(r"\bid_(\d+)\b", expression):
314 dependencies.add(int(dependency))
316 return dependencies