Coverage for backend/django/idaes_factory/idaes_factory.py: 97%
167 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-03-26 20:57 +0000
1import traceback
3from opentelemetry import trace
4from CoreRoot import settings
5from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema
6import dotenv
7from typing import Any
8from django.db import transaction
9from ahuora_builder_types.scenario_schema import UnfixedVariableSchema, OptimizationSchema
10from core.auxiliary.models.Scenario import Scenario, OptimizationDegreesOfFreedom
11from core.exceptions import DetailedException
12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
13from ahuora_builder_types import FlowsheetSchema
14from core.auxiliary.models.PropertyInfo import PropertyInfo
15from core.auxiliary.models.PropertyValue import PropertyValue
16from core.auxiliary.models.Solution import Solution
17from core.auxiliary.enums.unitsLibrary import units_library
18from .adapters import arc_adapter
19from .adapters.convert_expression import convert_expression
20from .idaes_factory_context import IdaesFactoryContext, LiveSolveParams
21from .queryset_lookup import get_value_object
22from .unit_conversion import convert_value
23from idaes_factory.unit_conversion.unit_conversion import can_convert
24from idaes_factory.adapters.adapter_library import adapters
25from core.auxiliary.models.Scenario import Scenario, SolverOptionEnum
# Load environment variables from a local .env file at import time so that
# configuration is available before any factory code runs.
dotenv.load_dotenv()
# Todo: replace these with literal types from the Compounds/PP library
Compound = str
PropertyPackage = str
class IdaesFactoryBuildException(DetailedException):
    """Raised when serialising the flowsheet for the IDAES solver fails."""
    pass
# Module-level OpenTelemetry tracer used to instrument flowsheet builds.
tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)
class IdaesFactory:
    """
    The IdaesFactory class is the core class for building
    a flowsheet (JSON schema) that can be sent to the IDAES
    solver, and for storing the results back in the database.
    """

    def __init__(
        self,
        group_id: int,
        scenario: Scenario | None = None,
        require_variables_fixed: bool = True,
        solve_index: int | None = None,
    ) -> None:
        """Prepare a factory capable of serialising the requested flowsheet.

        Args:
            group_id: Identifier of the flowsheet to serialise.
            scenario: Optional scenario providing solve configuration settings.
            require_variables_fixed: Whether adapters should enforce fixed variables.
            solve_index: Optional multi-steady-state index to bind to the context.
        """
        self.solve_index = solve_index
        self.scenario = scenario

        if scenario is not None:
            is_dynamic = scenario.enable_dynamics
            step_size = scenario.simulation_length / float(scenario.num_time_steps)
            enable_rating = scenario.enable_rating

            if scenario.enable_optimization:
                # If we are doing optimization, we solve from the root
                group_id = scenario.flowsheet.rootGrouping.id
        else:
            is_dynamic = False
            step_size = 1  # Just need a placeholder value.
            enable_rating = False

        # Dynamic solves use evenly spaced time steps; steady-state solves use
        # a single point at t=0.
        time_steps = [int(i) * step_size for i in range(0, scenario.num_time_steps)] if is_dynamic else [0]

        self.flowsheet = FlowsheetSchema(
            group_id=group_id,
            dynamic=is_dynamic,
            time_set=time_steps,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=enable_rating,
            disable_initialization=getattr(scenario, "disable_initialization", False),
            solver_option=getattr(scenario, "solver_option", "ipopt"),
        )

        # factory context
        self.context = IdaesFactoryContext(
            group_id,
            require_variables_fixed=require_variables_fixed,
            solve_index=solve_index,
            time_steps=time_steps,
            time_step_size=step_size,
            scenario=scenario,
        )

    # Updates the context to use a different solve index.
    # build() should be called after this to update the extracted flowsheet data.
    def use_with_solve_index(self, solve_index: int) -> None:
        """Rebind the factory to a different multi steady-state solve index.

        Args:
            solve_index: Index of the solve configuration within the scenario.
        """
        self.solve_index = solve_index
        self.context.update_solve_index(self.solve_index)

    @tracer.start_as_current_span("build_flowsheet")
    def build(self):
        """Populate the flowsheet schema with units, arcs, expressions, and metadata.

        Raises:
            IdaesFactoryBuildException: If any adapter fails during serialisation.
        """
        try:
            self.setup_unit_models()
            self.create_arcs()
            self.add_property_packages()
            self.add_expressions()
            self.add_optimizations()
            self.check_dependencies()
        except Exception as e:
            raise IdaesFactoryBuildException(e, "idaes_factory_build") from e

    def clear_flowsheet(self) -> None:
        """Reset the in-memory flowsheet while preserving configuration metadata."""
        self.flowsheet = FlowsheetSchema(
            group_id=self.flowsheet.group_id,
            dynamic=self.flowsheet.dynamic,
            time_set=self.flowsheet.time_set,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=self.flowsheet.is_rating_mode,
            disable_initialization=self.flowsheet.disable_initialization,
            solver_option=self.flowsheet.solver_option,
        )

    def add_property_packages(self) -> None:
        """Attach any property packages collected during context loading."""
        self.flowsheet.property_packages = self.context.property_packages

    def setup_unit_models(self):
        """Serialise all unit operations."""
        # add all unit models; stream-like and grouping objects are handled elsewhere
        exclude = {"stream", "group", "recycle", "specificationBlock", "energy_stream", "ac_stream", "humid_air_stream", "transformer_stream"}
        for unit_model in self.context.exclude_object_type(exclude):
            self.add_unit_model(unit_model)

    def add_unit_model(self, unit_model: SimulationObject) -> None:
        """Serialise and append a unit model using its registered adapter.

        Args:
            unit_model: Simulation object to convert into IDAES schema.

        Raises:
            Exception: If the adapter fails to serialise the unit model.
        """
        try:
            adapter = adapters[unit_model.objectType]
            schema = adapter.serialise(self.context, unit_model)
            self.flowsheet.unit_models.append(schema)
        except Exception as e:
            # Chain the original exception so the adapter's traceback is preserved.
            raise Exception(
                f"Error adding unit model {unit_model.componentName} to the flowsheet: {e}"
            ) from e

    def add_expressions(self) -> None:
        """Collect custom property expressions and expose them on the flowsheet."""
        # expressions are stored in the property set of a group
        # eg. the global base flowsheet object
        simulation_object: SimulationObject
        for simulation_object in self.context.exclude_object_type({"machineLearningBlock"}):
            # skip machine learning blocks, their properties are handled differently. We still need to support them in future.
            properties = simulation_object.properties
            prop: PropertyInfo
            for prop in properties.ContainedProperties.all():
                if prop.key in simulation_object.schema.properties:
                    # This is a default property, we have already processed it.
                    # We only want to capture custom properties
                    continue
                property_value = get_value_object(prop)
                self.context.add_property_value_dependency(property_value)
                self.flowsheet.expressions.append(
                    {
                        "id": property_value.id,
                        "name": prop.displayName,
                        "expression": convert_expression(property_value.formula),
                    }
                )

    def add_optimizations(self) -> None:
        """Serialise scenario-level optimisation settings onto the flowsheet."""
        # This method was originally written to return multiple optimisations.
        # this doesn't make sense, but idaes_service hasn't been updated to only expect one.
        # so for now, it sets optimisations to an array with one item
        optimization = self.context.scenario
        if optimization is None or optimization.enable_optimization is False:
            # no optimization to add
            return
        sense = "minimize" if optimization.minimize else "maximize"
        if optimization.objective is None:
            raise ValueError("Please set an objective for the optimization to minimize or maximize.")
        objective = get_value_object(optimization.objective)

        degrees_of_freedom = []
        degree_of_freedom: OptimizationDegreesOfFreedom
        for degree_of_freedom in optimization.degreesOfFreedom.all():
            property_value_id = degree_of_freedom.propertyValue_id

            dof_schema = UnfixedVariableSchema(
                id=property_value_id,
                lower_bound=degree_of_freedom.lower_bound,
                upper_bound=degree_of_freedom.upper_bound,
            )
            degrees_of_freedom.append(dof_schema)

        self.flowsheet.optimizations.append(OptimizationSchema(
            objective=objective.id,
            sense=sense,
            unfixed_variables=degrees_of_freedom,
        ))

    def check_dependencies(self) -> None:
        """Verify that all property value dependencies are serialised"""
        if self.scenario is not None and self.scenario.enable_optimization is True:
            # No need to check since we are serialising everything
            return
        serialised_property_values = self.context.serialised_property_values
        for dependency, prop_values in self.context.property_value_dependencies.items():
            if dependency not in serialised_property_values:
                dependency_prop_value = PropertyValue.objects.get(id=dependency)
                prop_info_list_str = ", ".join([f"{prop_value.get_simulation_object().componentName}/{prop_value.property.displayName}" for prop_value in prop_values])
                raise Exception(f"Dependency property {dependency_prop_value.get_simulation_object().componentName}/{dependency_prop_value.property.displayName} is not serialised, but is required by properties {prop_info_list_str}.")

    def create_arcs(self):
        """Serialise stream-like objects into arc connections for the flowsheet."""
        streams = self.context.filter_object_type({"stream", "energy_stream", "ac_stream", "humid_air_stream"})
        for stream in streams:
            arc_schema = arc_adapter.create_arc(self.context, stream)

            # Some streams (e.g. dangling ones) produce no arc.
            if arc_schema is not None:
                self.flowsheet.arcs.append(arc_schema)
259# noinspection PyUnreachableCode
def store_properties_schema(
    properties_schema: list[SolvedPropertyValueSchema],
    flowsheet_id: int,
    scenario_id: int | None = None,
    solve_index: int | None = None
) -> None:
    """Persist solved property values and dynamic results to the database.

    Args:
        properties_schema: Collection of property payloads returned by IDAES.
        flowsheet_id: Identifier of the flowsheet whose properties were solved.
        scenario_id: Optional scenario identifier associated with the solve.
        solve_index: Multi-steady-state index for the stored values, if any.
    """
    # Accept both raw dict payloads and already-parsed schema objects.
    properties_schema = [
        SolvedPropertyValueSchema(**prop) if isinstance(prop, dict) else prop
        for prop in properties_schema
    ]
    # create a id->property map
    ids = [prop.id for prop in properties_schema]
    stored_property_values = PropertyValue.objects.filter(id__in=ids).select_related(
        "property"
    )
    prop_map = {prop.id: prop for prop in stored_property_values}

    # Accumulators for the bulk writes performed at the end.
    property_values = []
    property_infos = []
    dynamic_results = []

    for prop_schema in properties_schema:
        prop = prop_map.get(prop_schema.id, None)
        if prop is None:
            raise Exception(f"Property {prop_schema.id} not found in the database.")

        property_info: PropertyInfo = prop.property
        updated_value = prop_schema.value

        from_unit = prop_schema.unit

        is_multi_steady_state = solve_index is not None
        is_dynamics = scenario_id is not None and isinstance(updated_value, list) and len(updated_value) > 1

        # If we're doing MSS or dynamics, we need to create associated dynamic results.
        if is_multi_steady_state or is_dynamics:
            dynamic_result = Solution(
                property=prop,
                flowsheet_id=flowsheet_id,
                solve_index=solve_index,
                scenario_id=scenario_id
            )

            # If we're dealing with dynamics, the updated_value is a list of values.
            # Otherwise, it's a single scalar value.
            dynamic_result.values = (updated_value if isinstance(updated_value, list)
                                     else [updated_value])
            dynamic_results.append(dynamic_result)
            continue

        # else we can continue as normal
        if prop_schema.unknown_units and not can_convert(
            from_unit, property_info.unit
        ):
            # we don't know the category of unit_type this unit is in!
            # default to "unknown" with a custom unit
            property_info.unitType = "unknown"
            property_info.unit = from_unit
            to_unit = from_unit
            # try to find the unit_type by looping through all
            # the units library and checking the first unit in the unit_type
            # to see if it can be converted
            for unit_type in units_library.keys():
                default_unit = units_library[unit_type][0]["value"]
                if can_convert(from_unit, default_unit):
                    # update the unitType
                    property_info.unitType = unit_type
                    property_info.unit = default_unit
                    to_unit = default_unit
                    break
            property_infos.append(property_info)
        else:
            to_unit = property_info.unit

        # TODO: better handling of multi-dimensional indexed properties.
        if isinstance(updated_value, list):
            val = updated_value[0]
        else:
            val = updated_value

        new_value = convert_value(val, from_unit=from_unit, to_unit=to_unit)
        prop.value = new_value
        property_values.append(prop)

    with transaction.atomic():
        # save the property values
        PropertyValue.objects.bulk_update(property_values, ["value"])
        Solution.objects.bulk_create(
            dynamic_results,
            update_conflicts=True,
            update_fields=["values"],
            unique_fields=["pk"],
        )

        # save the property infos
        PropertyInfo.objects.bulk_update(property_infos, ["unitType", "unit"])
def save_all_initial_values(unit_models: dict[str, Any]) -> None:
    """Persist initial values returned from IDAES for each unit model.

    Args:
        unit_models: Mapping of unit model ids to serialised initial value payloads.
    """
    # Fetch only the columns we touch, keyed by primary key for fast lookup.
    matching_objects = (
        SimulationObject.objects
        .filter(id__in=unit_models.keys())
        .only("id", "initial_values")
    )
    objects_by_id = {obj.id: obj for obj in matching_objects}

    # Keys arrive as strings from the payload; the database keys are ints.
    for model_id, payload in unit_models.items():
        objects_by_id[int(model_id)].initial_values = payload

    SimulationObject.objects.bulk_update(objects_by_id.values(), ["initial_values"])