Coverage for backend/idaes_factory/idaes_factory.py: 88%
154 statements
coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
import traceback

from opentelemetry import trace
from CoreRoot import settings
from common.models.idaes.unit_model_schema import SolvedPropertyValueSchema
import dotenv
from typing import Any
from django.db import transaction
from common.models.idaes.scenario_schema import UnfixedVariableSchema, OptimizationSchema
from core.auxiliary.models.Scenario import (
    Scenario,
    OptimizationDegreesOfFreedom,
    SolverOptionEnum,
)
from core.exceptions import DetailedException
from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
from common.models.idaes import FlowsheetSchema
from core.auxiliary.models.PropertyInfo import PropertyInfo
from core.auxiliary.models.PropertyValue import PropertyValue
from core.auxiliary.models.Solution import Solution
from core.auxiliary.enums.unitsLibrary import units_library
from .adapters import arc_adapter
from .adapters.convert_expression import convert_expression
from .idaes_factory_context import IdaesFactoryContext, LiveSolveParams
from .queryset_lookup import get_value_object
from .unit_conversion import convert_value
from idaes_factory.unit_conversion.unit_conversion import can_convert
from idaes_factory.adapters.adapter_library import adapters

dotenv.load_dotenv()

# TODO: replace these with literal types from the Compounds/PP library
Compound = str
PropertyPackage = str


class IdaesFactoryBuildException(DetailedException):
    pass


tracer = trace.get_tracer(settings.OPEN_TELEMETRY_TRACER_NAME)


class IdaesFactory:
    """
    The IdaesFactory class is the core class for building
    a flowsheet (JSON schema) that can be sent to the IDAES
    solver, and for storing the results back in the database.
    """

    def __init__(
        self,
        flowsheet_id: int,
        scenario: Scenario | None = None,
        require_variables_fixed: bool = True,
        solve_index: int | None = None,
    ) -> None:
        """Prepare a factory capable of serialising the requested flowsheet.

        Args:
            flowsheet_id: Identifier of the flowsheet to serialise.
            scenario: Optional scenario providing solve configuration settings.
            require_variables_fixed: Whether adapters should enforce fixed variables.
            solve_index: Optional multi-steady-state index to bind to the context.
        """
        self.solve_index = solve_index
        self.scenario = scenario

        if scenario is not None:
            is_dynamic = scenario.enable_dynamics
            step_size = scenario.simulation_length / float(scenario.num_time_steps)
            enable_rating = scenario.enable_rating
        else:
            is_dynamic = False
            step_size = 1  # Just a placeholder value; unused when dynamics are disabled.
            enable_rating = False

        time_steps = [i * step_size for i in range(scenario.num_time_steps)] if is_dynamic else [0]

        self.flowsheet = FlowsheetSchema(
            id=flowsheet_id,
            dynamic=is_dynamic,
            time_set=time_steps,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=enable_rating,
            disable_initialization=getattr(scenario, "disable_initialization", False),
            solver_option=getattr(scenario, "solver_option", "ipopt"),
        )

        # factory context
        self.context = IdaesFactoryContext(
            flowsheet_id,
            require_variables_fixed=require_variables_fixed,
            solve_index=solve_index,
            time_steps=time_steps,
            time_step_size=step_size,
            scenario=scenario,
        )

    # Updates the context to use a different solve index.
    # build() should be called after this to update the extracted flowsheet data.
    def use_with_solve_index(self, solve_index: int) -> None:
        """Rebind the factory to a different multi-steady-state solve index.

        Args:
            solve_index: Index of the solve configuration within the scenario.
        """
        self.solve_index = solve_index
        self.context.update_solve_index(self.solve_index)

    @tracer.start_as_current_span("build_flowsheet")
    def build(self):
        """Populate the flowsheet schema with units, arcs, expressions, and metadata.

        Raises:
            IdaesFactoryBuildException: If any adapter fails during serialisation.
        """
        try:
            self.setup_unit_models()
            self.create_arcs()
            self.add_property_packages()
            self.add_expressions()
            self.add_optimizations()
        except Exception as e:
            raise IdaesFactoryBuildException(e, "idaes_factory_build") from e
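
    # Typical usage (a minimal sketch; the flowsheet id, scenario object, and the way
    # the resulting schema is handed to the IDAES service are illustrative assumptions,
    # not something this module defines):
    #
    #   factory = IdaesFactory(flowsheet_id=42, scenario=my_scenario, solve_index=0)
    #   factory.build()
    #   payload = factory.flowsheet  # FlowsheetSchema ready to send to the solver
    #
    #   # For a multi-steady-state run, rebind the index, reset, and rebuild:
    #   factory.use_with_solve_index(1)
    #   factory.clear_flowsheet()
    #   factory.build()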

    def clear_flowsheet(self) -> None:
        """Reset the in-memory flowsheet while preserving configuration metadata."""
        self.flowsheet = FlowsheetSchema(
            id=self.flowsheet.id,
            dynamic=self.flowsheet.dynamic,
            time_set=self.flowsheet.time_set,
            property_packages=[],
            unit_models=[],
            arcs=[],
            expressions=[],
            optimizations=[],
            is_rating_mode=self.flowsheet.is_rating_mode,
            disable_initialization=self.flowsheet.disable_initialization,
            solver_option=self.flowsheet.solver_option,
        )

    def add_property_packages(self) -> None:
        """Attach any property packages collected during context loading."""
        self.flowsheet.property_packages = self.context.property_packages

    def setup_unit_models(self):
        """Serialise all unit operations."""
        # Add all unit models; stream-like and grouping objects are not unit operations.
        exclude = {
            "stream", "group", "recycle", "specificationBlock",
            "energy_stream", "ac_stream", "humid_air_stream", "transformer_stream",
        }
        for unit_model in self.context.exclude_object_type(exclude):
            self.add_unit_model(unit_model)

    def add_unit_model(self, unit_model: SimulationObject) -> None:
        """Serialise and append a unit model using its registered adapter.

        Args:
            unit_model: Simulation object to convert into IDAES schema.

        Raises:
            Exception: If the adapter fails to serialise the unit model.
        """
        try:
            adapter = adapters[unit_model.objectType]
            schema = adapter.serialise(self.context, unit_model)
            self.flowsheet.unit_models.append(schema)
        except Exception as e:
            raise Exception(
                f"Error adding unit model {unit_model.componentName} to the flowsheet: {e}"
            ) from e

    def add_expressions(self) -> None:
        """Collect custom property expressions and expose them on the flowsheet."""
        # Expressions are stored in the property set of a group,
        # e.g. the global base flowsheet object.
        # Machine learning blocks are skipped because their properties are handled
        # differently; we still need to support them in future.
        simulation_object: SimulationObject
        for simulation_object in self.context.exclude_object_type({"machineLearningBlock"}):
            properties = simulation_object.properties
            prop: PropertyInfo
            for prop in properties.ContainedProperties.all():
                if prop.key in simulation_object.schema.properties:
                    # This is a default property that has already been processed;
                    # we only want to capture custom properties here.
                    continue
                property_value = get_value_object(prop)
                self.flowsheet.expressions.append(
                    {
                        "id": property_value.id,
                        "name": prop.displayName,
                        "expression": convert_expression(property_value.formula),
                    }
                )

    def add_optimizations(self) -> None:
        """Serialise scenario-level optimisation settings onto the flowsheet."""
        # This method was originally written to return multiple optimisations.
        # That doesn't make sense, but idaes_service hasn't been updated to expect only
        # one, so for now this sets optimizations to an array with a single item.
        optimization = self.context.scenario
        if optimization is None or optimization.enable_optimization is False:
            # no optimization to add
            return

        sense = "minimize" if optimization.minimize else "maximize"
        if optimization.objective is None:
            raise ValueError("Please set an objective for the optimization to minimize or maximize.")
        objective = get_value_object(optimization.objective)

        degrees_of_freedom = []
        degree_of_freedom: OptimizationDegreesOfFreedom
        for degree_of_freedom in optimization.degreesOfFreedom.all():
            property_value_id = degree_of_freedom.propertyValue_id
            dof_schema = UnfixedVariableSchema(
                id=property_value_id,
                lower_bound=degree_of_freedom.lower_bound,
                upper_bound=degree_of_freedom.upper_bound,
            )
            degrees_of_freedom.append(dof_schema)

        self.flowsheet.optimizations.append(OptimizationSchema(
            objective=objective.id,
            sense=sense,
            unfixed_variables=degrees_of_freedom,
        ))

    def create_arcs(self):
        """Serialise stream-like objects into arc connections for the flowsheet."""
        streams = self.context.filter_object_type({"stream", "energy_stream", "ac_stream", "humid_air_stream"})
        for stream in streams:
            arc_schema = arc_adapter.create_arc(self.context, stream)
            if arc_schema is not None:
                self.flowsheet.arcs.append(arc_schema)


# noinspection PyUnreachableCode
def store_properties_schema(
    properties_schema: list[SolvedPropertyValueSchema],
    flowsheet_id: int,
    scenario_id: int | None = None,
    solve_index: int | None = None,
) -> None:
    """Persist solved property values and dynamic results to the database.

    Args:
        properties_schema: Collection of property payloads returned by IDAES.
        flowsheet_id: Identifier of the flowsheet whose properties were solved.
        scenario_id: Optional scenario identifier associated with the solve.
        solve_index: Multi-steady-state index for the stored values, if any.
    """
    properties_schema = [
        SolvedPropertyValueSchema(**prop) if isinstance(prop, dict) else prop
        for prop in properties_schema
    ]
    # create an id -> property map
    ids = [prop.id for prop in properties_schema]
    property_values = PropertyValue.objects.filter(id__in=ids).select_related(
        "property"
    )
    prop_map = {prop.id: prop for prop in property_values}

    property_values = []
    property_infos = []
    dynamic_results = []

    for prop_schema in properties_schema:
        prop = prop_map.get(prop_schema.id, None)
        if prop is None:
            raise Exception(f"Property {prop_schema.id} not found in the database.")

        property_info: PropertyInfo = prop.property
        updated_value = prop_schema.value
        from_unit = prop_schema.unit

        is_multi_steady_state = solve_index is not None
        is_dynamics = scenario_id is not None and isinstance(updated_value, list) and len(updated_value) > 1

        # If we're doing MSS or dynamics, we need to create associated dynamic results.
        if is_multi_steady_state or is_dynamics:
            dynamic_result = Solution(
                property=prop,
                flowsheet_id=flowsheet_id,
                solve_index=solve_index,
                scenario_id=scenario_id,
            )
            # If we're dealing with dynamics, updated_value is a list of values;
            # otherwise it's a single scalar value.
            dynamic_result.values = (updated_value if isinstance(updated_value, list)
                                     else [updated_value])
            dynamic_results.append(dynamic_result)
            continue

        # else we can continue as normal
        if prop_schema.unknown_units and not can_convert(from_unit, property_info.unit):
            # We don't know which unit_type category this unit belongs to,
            # so default to "unknown" with a custom unit.
            property_info.unitType = "unknown"
            property_info.unit = from_unit
            to_unit = from_unit
            # Try to find the unit_type by looping through the units library and
            # checking whether the first unit in each unit_type can be converted to.
            for unit_type in units_library.keys():
                default_unit = units_library[unit_type][0]["value"]
                if can_convert(from_unit, default_unit):
                    # update the unitType
                    property_info.unitType = unit_type
                    property_info.unit = default_unit
                    to_unit = default_unit
                    break
            property_infos.append(property_info)
        else:
            to_unit = property_info.unit

        # TODO: better handling of multi-dimensional indexed properties.
        if isinstance(updated_value, list):
            val = updated_value[0]
        else:
            val = updated_value

        new_value = convert_value(val, from_unit=from_unit, to_unit=to_unit)
        prop.value = new_value
        property_values.append(prop)

    with transaction.atomic():
        # save the property values
        PropertyValue.objects.bulk_update(property_values, ["value"])
        Solution.objects.bulk_create(
            dynamic_results,
            update_conflicts=True,
            update_fields=["values"],
            unique_fields=["pk"],
        )

        # save the property infos
        PropertyInfo.objects.bulk_update(property_infos, ["unitType", "unit"])
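
# Usage sketch for store_properties_schema (illustrative only: the dict entries assume
# the SolvedPropertyValueSchema fields read above (id, value, unit, unknown_units),
# and the ids and scenario values are made up):
#
#   solved = [
#       {"id": 101, "value": 350.0, "unit": "K", "unknown_units": False},
#       {"id": 102, "value": [1.0, 1.2, 1.4], "unit": "mol/s", "unknown_units": False},
#   ]
#   store_properties_schema(solved, flowsheet_id=42, scenario_id=7)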


def save_all_initial_values(unit_models: dict[str, Any]) -> None:
    """Persist initial values returned from IDAES for each unit model.

    Args:
        unit_models: Mapping of unit model ids to serialised initial value payloads.
    """
    simulation_objects = {
        unit_op.id: unit_op
        for unit_op in SimulationObject.objects.filter(id__in=unit_models.keys()).only(
            "id", "initial_values"
        )
    }

    for unit_model_id, initial_values in unit_models.items():
        simulation_object = simulation_objects[int(unit_model_id)]
        simulation_object.initial_values = initial_values

    SimulationObject.objects.bulk_update(simulation_objects.values(), ["initial_values"])
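
# Usage sketch for save_all_initial_values (illustrative; keys are unit model ids as
# serialised in the solver response, typically strings, and the payload structure is
# an assumption, not a schema defined in this module):
#
#   save_all_initial_values({
#       "17": {"inlet": {"flow_mol": 1.0}, "outlet": {"flow_mol": 1.0}},
#   })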