Coverage for backend/django/flowsheetInternals/unitops/models/simulation_object_factory.py: 94%
270 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import itertools
2from typing import List, Tuple
4from core.auxiliary.models.Flowsheet import Flowsheet
5from core.auxiliary.models.IndexedItem import IndexedItem
6from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
7from core.auxiliary.models.PropertyInfo import PropertyInfo
8from core.auxiliary.models.RecycleData import RecycleData
9from core.auxiliary.models.PropertySet import PropertySet
10from core.auxiliary.enums import ConType, heat_exchange_ops
12from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
13from flowsheetInternals.unitops.models.Port import Port
14from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
15from flowsheetInternals.graphicData.models.groupingModel import Grouping
16from flowsheetInternals.unitops.models.compound_propogation import update_compounds_on_add_stream
18from flowsheetInternals.unitops.config.config_methods import *
19from flowsheetInternals.unitops.config.config_base import configuration
20from common.config_types import *
21from core.auxiliary.models.ObjectTypeCounter import ObjectTypeCounter
22from core.auxiliary.views.ExtractSegmentDataFromFS import create_he_streams
25class SimulationObjectFactory:
26 def __init__(self) -> None:
27 self._flowsheet: Flowsheet | None = None
29 self.simulation_objects: list[SimulationObject] = []
30 self.graphic_objects: list[GraphicObject] = []
31 self.property_sets: list[PropertySet] = []
32 self.property_infos: list[PropertyInfo] = []
33 self.property_values: list[PropertyValue] = []
34 self.ports: list[Port] = []
35 self.index_items: list[IndexedItem] = []
36 self.property_value_indexed_items: List[PropertyValueIntermediate] = []
38 # conditionally created objects
39 self.recycle_data: list[RecycleData] = []
41 self._unitop: SimulationObject = None # the unitop of the object being created
42 self._current_port: Port = None # the current port for the stream being created
44 self._idx_map: dict[int, SimulationObject] = {} # map of id to simulation object
45 self._property_set_map: dict[int, list[PropertySet]] = {} # map of simulation object to property sets for the many-to-many relationship
46 self._index_items_map: dict[int, dict[str, list[IndexedItem]]] = {} # map of simulation object to indexed items for the many-to-many relationship
48 # For storing old property data when replacing the gut
49 self.old_properties: dict[str, any] = {}
51 @classmethod
52 def create_simulation_object(cls, coordinates: dict[str, float] | None = None, createPropertySet: bool = True, flowsheet=None, parentGroup=None, **kwargs) -> SimulationObject:
53 """
54 Creates a new unitop with attached ports, streams etc
55 """
56 factory = SimulationObjectFactory()
58 # Create unitop
59 # if parentGroup is provided, fetch it, else fallback to flowsheet.rootGrouping.
60 if parentGroup:
61 group = Grouping.objects.get(pk=parentGroup)
62 else:
63 if flowsheet: 63 ↛ 66line 63 didn't jump to line 66 because the condition on line 63 was always true
64 group = flowsheet.rootGrouping
65 else:
66 group = None # or raise an error if flowsheet is required
67 factory._flowsheet = flowsheet
68 object_type = kwargs.pop("objectType")
69 if kwargs.get("schema") is not None:
70 object_schema = kwargs.pop("schema")
71 else:
72 object_schema: ObjectType = configuration[object_type]
75 if coordinates is None:
76 coordinates = {"x": 0, "y": 0}
78 unitop = factory.create(
79 object_type=object_type,
80 object_schema=object_schema,
81 coordinates=coordinates,
82 flowsheet=flowsheet,
83 createPropertySet=createPropertySet,
84 parentGroup= group,
85 )
87 factory._unitop = unitop
88 graphic_object = factory.graphic_objects[-1]
90 # count the number of inlet and outlet ports
91 num_inlets = len([port for port in factory.ports if port.direction == ConType.Inlet])
93 num_outlets = len([port for port in factory.ports
94 if ((port.direction == ConType.Outlet) and
95 (factory._unitop.schema.ports[port.key].makeStream==True))])
100 inlet_index = 0
101 outlet_index = 0
103 if kwargs.get("create_attached_streams", True):
104 for port in factory.ports:
105 factory._current_port = port
106 port_schema = factory._unitop.schema.ports[port.key]
107 if port_schema.makeStream is False:
108 continue
109 stream_type = port_schema.streamType
111 stream_schema = configuration[stream_type]
112 # evenly space the ports along the sides of the unitop
113 if port.direction == ConType.Inlet:
114 coordinates = port.default_stream_position(factory._unitop, graphic_object, inlet_index, num_inlets)
115 inlet_index += 1
116 else:
117 coordinates = port.default_stream_position(factory._unitop, graphic_object, outlet_index, num_outlets)
118 outlet_index += 1
120 stream_name = port.default_stream_name(factory._unitop)
121 stream = factory.create(
122 object_type=stream_type,
123 object_schema=stream_schema,
124 coordinates=coordinates,
125 flowsheet=flowsheet,
126 componentName=stream_name,
127 parentGroup=group,
128 )
130 port.stream = stream
132 # save all created objects
133 factory.perform_bulk_create()
135 if unitop.objectType in heat_exchange_ops:
136 create_he_streams(unitop, group)
138 # Propagate streams to other groups for the newly created unitop
139 inlet_streams = []
140 outlet_streams = []
141 for port in unitop.ports.all():
142 if port.stream:
143 if port.direction == ConType.Inlet:
144 inlet_streams.append(port.stream)
145 else:
146 outlet_streams.append(port.stream)
147 from flowsheetInternals.graphicData.logic.make_group import propagate_streams
148 propagate_streams(inlet_streams, ConType.Inlet)
149 propagate_streams(outlet_streams, ConType.Outlet)
151 return unitop
154 @classmethod
155 def create_stream_at_port(cls, port: Port) -> SimulationObject:
156 """
157 Creates a new stream attached to the specified port.
158 """
159 factory = SimulationObjectFactory()
160 factory._current_port = port
161 factory._unitop = port.unitOp
162 factory._flowsheet = factory._unitop.flowsheet
164 # Figure out what type of stream to create based on port config
165 # E.g energy_stream, stream, etc
166 object_type = factory._unitop.schema.ports[port.key].streamType
167 object_schema: ObjectType = configuration[object_type]
169 # get the coordinates for the stream
170 coordinates = cls.default_stream_position(factory._unitop, port)
172 #get the group of the unitop that the stream is attached to
173 group = factory._unitop.graphicObject.last().group
175 # create the stream
176 stream = factory.create(
177 object_type=object_type,
178 object_schema=object_schema,
179 coordinates=coordinates,
180 flowsheet=factory._unitop.flowsheet,
181 parentGroup=group,
182 )
183 factory.perform_bulk_create()
185 # attach the stream to the port
186 port.stream = stream
187 port.save()
189 #populate the stream with compounds
190 update_compounds_on_add_stream(port, stream)
192 return stream
195 @classmethod
196 def default_stream_position(cls, unitop: SimulationObject, port: Port) -> dict[str, float]:
197 """
198 Returns the default position for a stream attached to the specified port.
199 """
200 graphic_object = unitop.graphicObject.last() # the unit op only has one graphic object so this is okay.
201 attached_ports = unitop.ports.filter(direction=port.direction)
202 port_index = list(attached_ports).index(port)
203 return port.default_stream_position(unitop, graphic_object, port_index, attached_ports.count())
206 def perform_bulk_create(self) -> None:
207 """
208 Saves all created objects to the database.
209 """
210 SimulationObject.objects.bulk_create(self.simulation_objects)
211 GraphicObject.objects.bulk_create(self.graphic_objects)
212 PropertySet.objects.bulk_create(self.property_sets)
213 PropertyInfo.objects.bulk_create(self.property_infos)
214 PropertyValue.objects.bulk_create(self.property_values)
215 Port.objects.bulk_create(self.ports)
216 RecycleData.objects.bulk_create(self.recycle_data)
217 IndexedItem.objects.bulk_create(self.index_items)
218 PropertyValueIntermediate.objects.bulk_create(self.property_value_indexed_items)
220 def create(
221 self, object_type: str, object_schema: ObjectType, coordinates: dict[str, float], flowsheet: Flowsheet | None = None, parentGroup: Grouping | None = None, componentName: str | None = None, createPropertySet: bool = True
222 ) -> SimulationObject:
223 """
224 Creates a new simulation object
225 does not save the instance to the database.
227 @param object_type: The type of object to create
228 @param object_schema: The schema of the object
229 @param coordinates: The coordinates of the object
230 @param flowsheet: The flowsheet owner of the object
232 @return: The created simulation object, graphic object, property sets, property infos, property packages, and ports.
233 this is so that the caller can create multiple objects at once and do a bulk_create
234 """
235 if flowsheet is not None: 235 ↛ 238line 235 didn't jump to line 238 because the condition on line 235 was always true
236 idx_for_type = ObjectTypeCounter.next_for(flowsheet, object_type)
237 else:
238 last_sim_obj = SimulationObject.objects.last()
239 if last_sim_obj is None:
240 id = 1
241 else:
242 last_sim_obj = SimulationObject.objects.last()
243 idx_for_type = (last_sim_obj.id + 1) if last_sim_obj else 1
245 if componentName is None:
246 componentName = f"{object_schema.displayType}{idx_for_type}"
248 elif "stream" not in object_type and componentName != object_schema.displayName:
249 componentName = componentName
250 else:
251 componentName = f"{componentName}{idx_for_type}"
254 # create simulation object
255 fields = {
256 "objectType": object_type,
257 "componentName": componentName,
258 }
259 instance = SimulationObject(**fields)
260 self.simulation_objects.append(instance)
261 idx = len(self._idx_map)
262 self._idx_map[idx] = instance
263 self._property_set_map[idx] = []
265 # create index items
266 self._index_items_map[idx] = {}
267 for index_set in object_schema.indexSets:
268 self.create_indexed_items(instance, index_set, idx)
270 graphic_object_schema = object_schema.graphicObject
272 graphicObject = GraphicObject(
273 simulationObject=instance,
274 x=coordinates["x"] - graphic_object_schema.width / 2, # center the object horizontally
275 y=coordinates["y"] - graphic_object_schema.height / 2, # center the object vertically
276 width = graphic_object_schema.width,
277 height = graphic_object_schema.height,
278 visible=True,
279 group=parentGroup,
280 flowsheet=flowsheet
281 )
282 self.graphic_objects.append(graphicObject)
284 # If flowsheetKey is given, set flowsheet
285 if flowsheet is not None: 285 ↛ 290line 285 didn't jump to line 290 because the condition on line 285 was always true
286 instance.flowsheet = flowsheet
289 # Create property sets
290 if createPropertySet: 290 ↛ 294line 290 didn't jump to line 294 because the condition on line 290 was always true
291 self.create_property_set(object_schema, idx)
293 # Create ports
294 ports_schema = object_schema.ports or {}
295 # replace any many=True ports with multiple ports
296 for key, port_dict in ports_schema.items():
297 if port_dict.many:
298 # replace this key value pair with a list of ports up to the default provided
299 for i in range(port_dict.default):
300 self.ports.append(Port(
301 key=key,
302 index=i,
303 direction=port_dict.type,
304 displayName=port_dict.displayName + f" {i+1}",
305 unitOp=instance,
306 flowsheet=flowsheet
307 ))
308 else:
309 self.ports.append(Port(
310 key=key,
311 direction=port_dict.type,
312 displayName=port_dict.displayName,
313 unitOp=instance,
314 flowsheet=flowsheet
315 ))
317 # conditionally create attached objects based on object type
318 if object_type == "recycle":
319 self.recycle_data.append(RecycleData(
320 simulationObject=instance,
321 flowsheet=flowsheet
322 ))
324 return instance
326 def store_old_properties(self, instance: SimulationObject):
327 """
328 Stores the old properties of the stream.
329 Used when switching stream types to keep properties that exist in both schema.
330 Prevents losing user input in those properties.
331 e.g. If Molar Flow has a value, switching to Humid Air won't remove that value.
332 """
333 self.old_properties = {}
334 if instance.properties: 334 ↛ exitline 334 didn't return from function 'store_old_properties' because the condition on line 334 was always true
335 for prop_info in instance.properties.containedProperties.all():
336 try:
337 self.old_properties[prop_info.key] = prop_info.get_value_bulk()
338 except ValueError:
339 self.old_properties[prop_info.key] = None
341 def replace_the_gut(self, instance: SimulationObject):
342 self.simulation_objects.append(instance)
343 self._unitop = instance
344 self._current_port = instance.connectedPorts.first()
346 object_schema = configuration[instance.objectType]
347 idx = len(self._idx_map)
348 self._idx_map[idx] = instance
349 self._property_set_map[idx] = []
351 # create index items
352 self._index_items_map[idx] = {}
353 for index_set in object_schema.indexSets:
354 self.create_indexed_items(instance, index_set, idx)
356 self.create_property_set(object_schema, idx)
359 def create_property_set(self, object_schema: ObjectType, idx: int) -> PropertySet:
360 """
361 Creates a new SimulationObjectPropertySet instance with the specified schema.
362 """
363 defaults = {
364 'compoundMode': "",
365 "simulationObject": self._idx_map[idx],
366 }
367 for schema in object_schema.propertySetGroups.values():
368 if schema.type == "composition":
369 defaults['compoundMode'] = "MassFraction"
371 # Create SimulationObjectPropertySet
372 instance = PropertySet(**defaults, flowsheet=self._flowsheet)
374 if object_schema.properties != {}: 374 ↛ 385line 374 didn't jump to line 385 because the condition on line 374 was always true
375 # Create PropertyInfo objects
376 property_pairs = self.create_property_infos(instance, object_schema.properties, idx)
378 # set access based on the schema
379 self.set_properties_access(
380 config=object_schema,
381 properties=property_pairs,
382 idx=idx,
383 )
385 self.property_sets.append(instance)
386 self._property_set_map[idx].append(instance)
388 return instance
391 def create_property_infos(self, property_set: PropertySet, schema: PropertiesType, idx) -> List[Tuple[PropertyInfo, List[PropertyValue]]]:
392 """
393 Creates PropertyInfo objects based on the specified schema.
394 """
395 res: List[Tuple[PropertyInfo, List[PropertyValue]]] = []
397 # if schema.type == "composition":
398 # # the composition property set has no property infos, since these are compounds selected by the user
399 # # TODO: Initialise the properties based on the compounds upstream (if any)
400 # flowsheet = self._flowsheet
401 for key, prop in schema.items():
402 fields = get_property_fields(key, prop, property_set)
404 if self.old_properties:
405 # If property in the schema is also in the old properties, keep the value
406 if key in self.old_properties.keys():
407 fields["value"] = self.old_properties[key]
409 new_property_info, new_property_values = self.create_property_info(idx, prop.indexSets, **fields)
410 res.append((new_property_info, new_property_values))
412 return res
415 def create_property_info(self, idx, index_sets=None, **fields):
416 value = fields.pop("value")
417 property_info = PropertyInfo(**fields, flowsheet=self._flowsheet)
418 self.property_infos.append(property_info)
419 # Create a property value object with this value
420 if index_sets == None:
421 property_value = PropertyValue(value=value, property=property_info, flowsheet=self._flowsheet)
422 self.property_values.append(property_value)
423 return property_info, [property_value]
424 else:
425 combinations = self.get_combinations(self._index_items_map[idx], index_sets)
426 property_values = []
427 for indexes in combinations:
428 property_value = PropertyValue(value=value, property=property_info, flowsheet=self._flowsheet)
429 for indexed_item in indexes:
430 self.property_value_indexed_items.append(
431 PropertyValueIntermediate(propertyvalue=property_value, indexeditem=indexed_item)
432 )
433 property_values.append(property_value)
434 self.property_values.extend(property_values)
435 return property_info, property_values
438 def get_combinations(self, index_items_map: dict[str, list[IndexedItem]], index_sets=[]) -> list[list[IndexedItem]]:
439 """
440 Returns all possible combinations of indexed items for the specified index sets
441 (taking one item from each index set).
442 """
443 if index_items_map == {}: 443 ↛ 444line 443 didn't jump to line 444 because the condition on line 443 was never true
444 return []
445 indexes = [value for key, value in index_items_map.items() if key in index_sets]
446 return list(itertools.product(*indexes))
449 def set_properties_access(
450 self,
451 config: ObjectType,
452 properties: List[Tuple[PropertyInfo, List[PropertyValue]]],
453 idx: int,
454 ) -> None:
455 if (
456 self.simulation_objects[-1].is_stream()
457 and self._unitop.objectType != "recycle"
458 and self._current_port and self._current_port.direction == ConType.Outlet
459 ):
460 # disable outlet/intermediate stream properties
461 for propertyInfo, propVals in properties:
462 for property_value in propVals:
463 property_value.enabled = False
464 else:
465 index_map_items = self._index_items_map[idx]
466 # TODO: Refactor this.
467 # not disabled if it is a state variable
468 for prop, propVals in properties:
469 config_prop = config.properties.get(prop.key)
470 # # figure out the number of items in the first index set
471 if config_prop.indexSets:
472 first_index_type = config_prop.indexSets[0]
473 if first_index_type in index_map_items: 473 ↛ 480line 473 didn't jump to line 480 because the condition on line 473 was always true
474 if len(index_map_items[first_index_type]) == 0:
475 last_item = None
476 else:
477 last_item = index_map_items[first_index_type][-1]
478 # last_item is only needed on index sets.
479 # it should be fine if it's undefined otherwise.
480 for index, property_value in enumerate(propVals):
481 if config_prop: 481 ↛ 484line 481 didn't jump to line 484 because the condition on line 481 was always true
482 group = config_prop.propertySetGroup
483 else:
484 group = "default"
486 config_group = config.propertySetGroups.get(group, None)
487 if config_group is None: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true
488 property_value.enabled = True
489 elif config_group.type == "exceptLast" or config_prop.sumToOne:
490 # TODO: Deprecate exceptLast in favor of sumToOne. See phase_seperator_config
491 # Instead of using len_last_item, just use the last item in the index set
492 indexed_item_links = [item for item in self.property_value_indexed_items
493 if item.propertyvalue == property_value]
494 indexed_items = [item.indexeditem for item in indexed_item_links]
495 is_last_item = last_item in indexed_items
497 if is_last_item:
498 property_value.enabled = False
499 else:
500 property_value.enabled = True
501 elif config_group.type == "stateVars": 501 ↛ 505line 501 didn't jump to line 505 because the condition on line 501 was always true
502 state_vars = getattr(config_group, "stateVars") or ()
503 property_value.enabled = prop.key in state_vars
504 else: # eg. All
505 property_value.enabled = True
508 def create_indexed_items(self, instance: SimulationObject, index_set: str, idx: int) -> None:
509 """
510 Creates IndexedItem instances for the specified index set.
511 """
512 items = []
514 outlet_name = instance.schema.splitter_fraction_name
515 match index_set:
516 case "splitter_fraction":
517 items = []
518 # create indexed items for the splitter_fraction index set
519 items.append(IndexedItem(
520 owner=instance,
521 key="outlet_1",
522 displayName= outlet_name + " 1",
523 type=index_set,
524 flowsheet=self._flowsheet
525 ))
526 items.append(IndexedItem(
527 owner=instance,
528 key="outlet_2",
529 displayName= outlet_name + " 2",
530 type=index_set,
531 flowsheet=self._flowsheet
532 ))
534 case "phase":
535 # create indexed items for the phase index set
536 items.append(IndexedItem(
537 owner=instance,
538 key="Liq",
539 displayName="Liquid",
540 type=index_set,
541 flowsheet=self._flowsheet
542 ))
543 items.append(IndexedItem(
544 owner=instance,
545 key="Vap",
546 displayName="Vapor",
547 type=index_set,
548 flowsheet=self._flowsheet
549 ))
551 case "compound":
552 pass # have to wait for user to select compounds
553 self._index_items_map[idx][index_set] = items
554 self.index_items.extend(items)