Coverage for backend/django/idaes_factory/idaes_factory.py: 97%
170 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import traceback
3from opentelemetry import trace
4from CoreRoot import settings
5from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema
6import dotenv
7from typing import Any
8from django.db import transaction
9from ahuora_builder_types.scenario_schema import UnfixedVariableSchema, OptimizationSchema
10from core.auxiliary.models.Scenario import Scenario, OptimizationDegreesOfFreedom
11from core.exceptions import DetailedException
12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
13from ahuora_builder_types import FlowsheetSchema
14from core.auxiliary.models.PropertyInfo import PropertyInfo
15from core.auxiliary.models.PropertyValue import PropertyValue
16from core.auxiliary.models.Solution import Solution
17from core.auxiliary.enums.unitsLibrary import units_library
18from .adapters import arc_adapter
19from .adapters.convert_expression import convert_expression
20from .idaes_factory_context import IdaesFactoryContext, LiveSolveParams
21from .queryset_lookup import get_value_object
22from .unit_conversion import convert_value
23from idaes_factory.unit_conversion.unit_conversion import can_convert
24from core.auxiliary.models.Scenario import Scenario, SolverOptionEnum
# Load environment variables from a local .env file, if present.
dotenv.load_dotenv()

# Todo: replace these with literal types from the Compounds/PP library
Compound = str
PropertyPackage = str
class IdaesFactoryBuildException(DetailedException):
    """Raised when the factory fails to build/serialise a flowsheet for IDAES."""
    pass
# Module-level OpenTelemetry tracer used to instrument flowsheet builds.
tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)
class IdaesFactory:
    """
    The IdaesFactory class is the core class for building
    a flowsheet (JSON schema) that can be sent to the IDAES
    solver, and for storing the results back in the database.
    """

    def __init__(
        self,
        group_id: int,
        scenario: Scenario | None = None,
        require_variables_fixed: bool = True,
        solve_index: int | None = None,
    ) -> None:
        """Prepare a factory capable of serialising the requested flowsheet.

        Args:
            group_id: Identifier of the flowsheet to serialise.
            scenario: Optional scenario providing solve configuration settings.
            require_variables_fixed: Whether adapters should enforce fixed variables.
            solve_index: Optional multi-steady-state index to bind to the context.
        """
        self.solve_index = solve_index
        self.scenario = scenario

        if scenario is not None:
            is_dynamic = scenario.enable_dynamics
            step_size = scenario.simulation_length / float(scenario.num_time_steps)
            enable_rating = scenario.enable_rating
            if scenario.enable_optimization:
                # If we are doing optimization, we solve from the root
                group_id = scenario.flowsheet.rootGrouping.id
        else:
            is_dynamic = False
            step_size = 1  # Just need a placeholder value.
            enable_rating = False

        # The comprehension is only evaluated when is_dynamic is True,
        # so scenario is guaranteed to be non-None inside it.
        time_steps = (
            [i * step_size for i in range(scenario.num_time_steps)]
            if is_dynamic else [0]
        )

        self.flowsheet = FlowsheetSchema(
            group_id=group_id,
            dynamic=is_dynamic,
            time_set=time_steps,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=enable_rating,
            disable_initialization=getattr(
                scenario, "disable_initialization", False),
            skip_initialization_for_units_with_initial_values=getattr(
                scenario,
                "skip_initialization_for_units_with_initial_values",
                False,
            ),
            solver_option=getattr(scenario, "solver_option", "ipopt"),
        )

        # factory context
        self.context = IdaesFactoryContext(
            group_id,
            require_variables_fixed=require_variables_fixed,
            solve_index=solve_index,
            time_steps=time_steps,
            time_step_size=step_size,
            scenario=scenario,
        )

    # Updates the context to use a different solve index.
    # build() should be called after this to update the extracted flowsheet data.
    def use_with_solve_index(self, solve_index: int) -> None:
        """Rebind the factory to a different multi steady-state solve index.

        Args:
            solve_index: Index of the solve configuration within the scenario.
        """
        self.solve_index = solve_index
        self.context.update_solve_index(self.solve_index)

    @tracer.start_as_current_span("build_flowsheet")
    def build(self):
        """Populate the flowsheet schema with units, arcs, expressions, and metadata.

        Raises:
            IdaesFactoryBuildException: If any adapter fails during serialisation.
        """
        try:
            self.setup_unit_models()
            self.create_arcs()
            self.add_property_packages()
            self.add_expressions()
            self.add_optimizations()
            self.check_dependencies()
        except Exception as e:
            raise IdaesFactoryBuildException(e, "idaes_factory_build") from e

    def clear_flowsheet(self) -> None:
        """Reset the in-memory flowsheet while preserving configuration metadata."""
        self.flowsheet = FlowsheetSchema(
            group_id=self.flowsheet.group_id,
            dynamic=self.flowsheet.dynamic,
            time_set=self.flowsheet.time_set,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=self.flowsheet.is_rating_mode,
            disable_initialization=self.flowsheet.disable_initialization,
            skip_initialization_for_units_with_initial_values=(
                self.flowsheet.skip_initialization_for_units_with_initial_values
            ),
            solver_option=self.flowsheet.solver_option,
        )

    def add_property_packages(self) -> None:
        """Attach any property packages collected during context loading."""
        self.flowsheet.property_packages = self.context.property_packages

    def setup_unit_models(self):
        """Serialise all unit operations."""
        # Stream-like and grouping objects are not unit models; they are
        # handled elsewhere (arcs) or not serialised at all.
        exclude = {"stream", "group", "recycle", "specificationBlock",
                   "energy_stream", "ac_stream", "humid_air_stream", "transformer_stream"}
        for unit_model in self.context.exclude_object_type(exclude):
            self.add_unit_model(unit_model)

    def add_unit_model(self, unit_model: SimulationObject) -> None:
        """Serialise and append a unit model using its registered adapter.

        Args:
            unit_model: Simulation object to convert into IDAES schema.

        Raises:
            Exception: If the adapter fails to serialise the unit model.
        """
        try:
            adapter = unit_model.schema.idaes_adapter
            if adapter is None:
                raise ValueError(
                    f"No IDAES adapter registered for object type {unit_model.objectType}"
                )
            schema = adapter.serialise(self.context, unit_model)
            self.flowsheet.unit_models.append(schema)
        except Exception as e:
            # Chain explicitly so the adapter failure is preserved as __cause__.
            raise Exception(
                f"Error adding unit model {unit_model.componentName} to the flowsheet: {e}"
            ) from e

    def add_expressions(self) -> None:
        """Collect custom property expressions and expose them on the flowsheet."""
        # expressions are stored in the property set of a group
        # eg. the global base flowsheet object
        simulation_object: SimulationObject
        for simulation_object in self.context.exclude_object_type({"machineLearningBlock"}):
            # skip machine learning blocks, their properties are handled differently. We still need to support them in future.
            properties = simulation_object.properties
            prop: PropertyInfo
            for prop in properties.ContainedProperties.all():
                if prop.key in simulation_object.schema.properties:
                    # This is a default property, we have already processed it.
                    # We only want to capture custom properties
                    continue
                property_value = get_value_object(prop)
                self.context.add_property_value_dependency(property_value)
                self.flowsheet.expressions.append(
                    {
                        "id": property_value.id,
                        "name": prop.displayName,
                        "expression": convert_expression(property_value.formula),
                    }
                )

    def add_optimizations(self) -> None:
        """Serialise scenario-level optimisation settings onto the flowsheet.

        Raises:
            ValueError: If optimisation is enabled but no objective is set.
        """
        # This method was originally written to return multiple optimisations.
        # this doesn't make sense, but idaes_service hasn't been updated to only expect one.
        # so for now, it sets optimisations to an array with one item
        optimization = self.context.scenario
        if optimization is None or optimization.enable_optimization is False:
            # no optimization to add
            return
        sense = "minimize" if optimization.minimize else "maximize"
        if optimization.objective is None:
            raise ValueError(
                "Please set an objective for the optimization to minimize or maximize.")
        objective = get_value_object(optimization.objective)

        degrees_of_freedom = []
        degree_of_freedom: OptimizationDegreesOfFreedom
        for degree_of_freedom in optimization.degreesOfFreedom.all():
            property_value_id = degree_of_freedom.propertyValue_id

            dof_schema = UnfixedVariableSchema(
                id=property_value_id,
                lower_bound=degree_of_freedom.lower_bound,
                upper_bound=degree_of_freedom.upper_bound,
            )
            degrees_of_freedom.append(dof_schema)

        self.flowsheet.optimizations.append(OptimizationSchema(
            objective=objective.id,
            sense=sense,
            unfixed_variables=degrees_of_freedom,
        ))

    def check_dependencies(self) -> None:
        """Verify that all property value dependencies are serialised.

        Raises:
            Exception: If a dependency property is required but not serialised.
        """
        if self.scenario is not None and self.scenario.enable_optimization is True:
            # No need to check since we are serialising everything
            return
        serialised_property_values = self.context.serialised_property_values
        for dependency, prop_values in self.context.property_value_dependencies.items():
            if dependency in serialised_property_values:
                continue
            dependency_prop_value = PropertyValue.objects.get(id=dependency)
            prop_info_list_str = ", ".join(
                f"{prop_value.get_simulation_object().componentName}/"
                f"{prop_value.property.displayName}"
                for prop_value in prop_values
            )
            raise Exception(
                f"Dependency property "
                f"{dependency_prop_value.get_simulation_object().componentName}/"
                f"{dependency_prop_value.property.displayName} is not serialised, "
                f"but is required by properties {prop_info_list_str}."
            )

    def create_arcs(self):
        """Serialise stream-like objects into arc connections for the flowsheet."""
        streams = self.context.filter_object_type(
            {"stream", "energy_stream", "ac_stream", "humid_air_stream"})
        for stream in streams:
            arc_schema = arc_adapter.create_arc(self.context, stream)
            # Some streams (e.g. dangling ones) do not produce an arc.
            if arc_schema is not None:
                self.flowsheet.arcs.append(arc_schema)
# noinspection PyUnreachableCode
def store_properties_schema(
    properties_schema: list[SolvedPropertyValueSchema] | None,
    flowsheet_id: int,
    scenario_id: int | None = None,
    solve_index: int | None = None
) -> None:
    """Persist solved property values and dynamic results to the database.

    Args:
        properties_schema: Collection of property payloads returned by IDAES.
        flowsheet_id: Identifier of the flowsheet whose properties were solved.
        scenario_id: Optional scenario identifier associated with the solve.
        solve_index: Multi-steady-state index for the stored values, if any.

    Raises:
        Exception: If a returned property id has no matching PropertyValue row.
    """
    if not properties_schema:
        return
    # Payloads may arrive as raw dicts; normalise them into schema objects.
    properties_schema = [
        SolvedPropertyValueSchema(**prop) if isinstance(prop, dict) else prop
        for prop in properties_schema
    ]
    # create a id->property map
    ids = [prop.id for prop in properties_schema]
    prop_map = {
        prop.id: prop
        for prop in PropertyValue.objects.filter(id__in=ids).select_related("property")
    }

    property_values = []
    property_infos = []
    dynamic_results = []

    for prop_schema in properties_schema:
        prop = prop_map.get(prop_schema.id)
        if prop is None:
            raise Exception(
                f"Property {prop_schema.id} not found in the database.")

        property_info: PropertyInfo = prop.property
        updated_value = prop_schema.value
        from_unit = prop_schema.unit

        is_multi_steady_state = solve_index is not None
        is_dynamics = scenario_id is not None and isinstance(
            updated_value, list) and len(updated_value) > 1

        # If we're doing MSS or dynamics, we need to create associated dynamic results.
        if is_multi_steady_state or is_dynamics:
            dynamic_result = Solution(
                property=prop,
                flowsheet_id=flowsheet_id,
                solve_index=solve_index,
                scenario_id=scenario_id,
            )
            # If we're dealing with dynamics, the updated_value is a list of values.
            # Otherwise, it's a single scalar value.
            dynamic_result.values = (updated_value if isinstance(updated_value, list)
                                     else [updated_value])
            dynamic_results.append(dynamic_result)
            continue

        # else we can continue as normal
        if prop_schema.unknown_units and not can_convert(
            from_unit, property_info.unit
        ):
            # we don't know the category of unit_type this unit is in!
            # default to "unknown" with a custom unit
            property_info.unitType = "unknown"
            property_info.unit = from_unit
            to_unit = from_unit
            # try to find the unit_type by looping through all
            # the units library and checking the first unit in the unit_type
            # to see if it can be converted
            for unit_type, unit_entries in units_library.items():
                default_unit = unit_entries[0]["value"]
                if can_convert(from_unit, default_unit):
                    # update the unitType
                    property_info.unitType = unit_type
                    property_info.unit = default_unit
                    to_unit = default_unit
                    break
            property_infos.append(property_info)
        else:
            to_unit = property_info.unit

        # TODO: better handling of multi-dimensional indexed properties.
        val = updated_value[0] if isinstance(updated_value, list) else updated_value

        prop.value = convert_value(val, from_unit=from_unit, to_unit=to_unit)
        property_values.append(prop)

    with transaction.atomic():
        # save the property values
        PropertyValue.objects.bulk_update(property_values, ["value"])
        Solution.objects.bulk_create(
            dynamic_results,
            update_conflicts=True,
            update_fields=["values"],
            unique_fields=["pk"],
        )

        # save the property infos
        PropertyInfo.objects.bulk_update(property_infos, ["unitType", "unit"])
def save_all_initial_values(unit_models: dict[str, Any]) -> None:
    """Persist initial values returned from IDAES for each unit model.

    Args:
        unit_models: Mapping of unit model ids to serialised initial value payloads.
    """
    # Fetch only the columns we need, keyed by primary key for O(1) lookup.
    queryset = (
        SimulationObject.objects
        .filter(id__in=unit_models.keys())
        .only("id", "initial_values")
    )
    objects_by_id = {obj.id: obj for obj in queryset}

    # Keys arrive as strings from the JSON payload; DB ids are integers.
    for model_id, payload in unit_models.items():
        objects_by_id[int(model_id)].initial_values = payload

    SimulationObject.objects.bulk_update(
        objects_by_id.values(), ["initial_values"])