Coverage for backend/django/flowsheetInternals/unitops/models/SimulationObject.py: 87%
546 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from django.db import models
2from django.db.models import QuerySet
3from core.managers import SoftDeleteManager
4from core.auxiliary.enums.unitOpData import SimulationObjectClass
5from core.auxiliary.models.Flowsheet import Flowsheet
6from core.auxiliary.models.PropertySet import PropertySet
7from core.auxiliary.models.PropertyInfo import PropertyInfo
8from core.auxiliary.enums.generalEnums import PropertyType as PropertyTypeChoices
10from typing import TYPE_CHECKING
11from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
12from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices
13from typing import Set
15from flowsheetInternals.unitops.models.Port import Port
16from typing import Iterable
17from flowsheetInternals.unitops.config.config_methods import *
18from common.config_types import *
19import itertools
21from .compound_propogation import update_compounds_on_set, update_compounds_on_merge, _get_compound_keys, \
22 update_decision_node_and_propagate, update_compounds_on_add_stream
23from typing import Optional, List
24from ..methods.add_expression import add_expression as _add_expression
26if TYPE_CHECKING:
27 from core.auxiliary.models.PropertySet import PropertySet
28 from core.auxiliary.models.PropertyInfo import PropertyInfo
29 from core.auxiliary.models.RecycleData import RecycleData
30 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
31 from flowsheetInternals.graphicData.models.groupingModel import Grouping
32 from flowsheetInternals.unitops.models.Port import Port
33 from core.auxiliary.models.CustomPropertyPackage import CustomPropertyPackage
36class SimulationObject(models.Model):
37 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE, related_name="flowsheetObjects")
38 componentName = models.CharField(max_length=64)
39 objectType = models.CharField(choices=SimulationObjectClass.choices)
41 created_at = models.DateTimeField(auto_now_add=True)
42 is_deleted = models.BooleanField(default=False)
43 initial_values = models.JSONField(null=True, blank=True)
44 propertyPackageType = models.CharField(max_length=64, default="helmholtz")
45 customPackage = models.ForeignKey["CustomPropertyPackage"](
46 "core_auxiliary.CustomPropertyPackage",
47 on_delete=models.SET_NULL,
48 null=True, blank=True,
49 related_name="simulationObjects")
51 # add a soft delete manager
52 objects = SoftDeleteManager()
53 add_expression = _add_expression
54 # runtime-accessed attributes
55 properties: "PropertySet"
56 graphicObject: "GraphicObject"
57 ports: QuerySet["Port"]
58 connectedPorts: QuerySet["Port"]
59 recycleConnection: "Optional[RecycleData]"
60 recycleData: "Optional[RecycleData]"
62 @property
63 def schema(self) -> ObjectType:
64 return get_object_schema(self)
66 @property
67 def has_recycle_connection(self) -> bool:
68 return hasattr(self, "recycleConnection")
70 def is_stream(self) -> bool:
71 return self.schema.is_stream
73 def get_stream(self, key: str, index: int = 0) -> "SimulationObject":
74 """
75 Returns the stream attached to the port with the given key
76 """
77 port = self.get_port(key, index)
78 stream = port.stream
79 return stream
81 def get_group(self) -> "Grouping":
82 """
83 Returns the group that this object belongs to
84 """
85 if not self.is_stream(): 85 ↛ 90line 85 didn't jump to line 90 because the condition on line 85 was always true
86 # There is only one graphic object, so we can just return the group
87 return self.graphicObject.last().group
88 else:
89 # Streams arent' really in a group, throw an error
90 raise ValueError("Streams do not belong to a group")
92 def get_groups(self) -> Iterable["Grouping"]:
93 """
94 Returns an iterable of groups that this object belongs to. Mostly needed for streams that have many groups.
95 """
96 return (graphic.group for graphic in self.graphicObject.all())
98 def get_parent_groups(self) -> List["Grouping"]:
99 """
100 Returns an iterable of parent groups that this object belongs to.
101 """
102 parent_groups = []
103 current_group = self.get_group()
104 while current_group:
105 parent_groups.append(current_group)
106 simulationObject = current_group.simulationObject
107 if simulationObject is None: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true
108 break
109 current_group = simulationObject.get_group()
110 return parent_groups
112 def get_property_package(self, name: str | None = None):
113 """
114 Returns the property package slot with the given name
115 """
116 return self.propertyPackageType
118 def set_property_package(self, property_package, name: str | None = None) -> None:
119 """
120 Sets the property package for this object
121 """
122 self.propertyPackageType = property_package
123 self.save()
125 def get_port(self, key: str, index: int = 0) -> Port:
126 """
127 Returns the port with the given key
128 """
129 try:
131 port: Port = self.ports.get(key=key, index=index)
132 return port
133 except Port.DoesNotExist:
134 raise ValueError(f"Port with key {key} does not exist on object {self.componentName}")
136 def reorder_object_ports(self):
137 """
138 Reorders port mappings by connected unit operation y position
139 :return: None
140 """
141 inlet_connections = [port for port in self.ports.filter(direction=ConType.Inlet).all()]
142 outlet_connections = [port for port in self.ports.filter(direction=ConType.Outlet).all()]
143 inlet_connections.sort(
144 key=lambda port: port.stream.connectedPorts.get(direction=ConType.Outlet).unitOp.graphicObject.last().y)
145 outlet_connections.sort(
146 key=lambda port: port.stream.connectedPorts.get(direction=ConType.Inlet).unitOp.graphicObject.last().y)
147 for i, port in enumerate(inlet_connections):
148 port.index = i
149 port.save()
150 for i, port in enumerate(outlet_connections):
151 port.index = i
152 port.save()
153 self.save()
155 def horizontally_center_graphic(self) -> None:
156 """
157 Horizontally centers GraphicObject for a stream or unit operation
158 Currently uni-directional - left to right connections
159 Centers based on inlet and outlet graphics.
160 :return: None
161 """
162 # TODO: Make this work as appropriate for streams with multiple graphic objects
163 # Is an Intermediate Stream
164 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1: 164 ↛ 177line 164 didn't jump to line 177 because the condition on line 164 was always true
165 inlet, outlet = self.connectedPorts.filter(direction=ConType.Outlet).first(), self.connectedPorts.filter(
166 direction=ConType.Inlet).first()
167 inlet_x, outlet_x = inlet.unitOp.graphicObject.last().x, outlet.unitOp.graphicObject.last().x
168 if inlet_x <= outlet_x:
169 self.graphicObject.last().x = abs(
170 (inlet_x + (inlet.unitOp.graphicObject.last().width)) + (outlet_x)) / 2
171 else:
172 self.graphicObject.last().x = abs(inlet_x + (outlet_x + outlet.unitOp.graphicObject.last().width)) / 2
173 # Flip Graphic Object horizontally
174 self.graphicObject.last().save()
175 self.save()
176 else:
177 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), self.ports.filter(
178 direction=ConType.Outlet).first()
179 self.graphicObject.last().x = abs(
180 (inlet.stream.graphicObject.last().x + outlet.stream.graphicObject.last().x) / 2)
181 self.graphicObject.last().save()
182 self.save()
183 return
185 def vertically_center_graphic(self) -> None:
186 """
187 Vertically centers GraphicObject for a stream or unit operation
188 :return: None
189 """
190 # TODO: Make this work as appropriate for streams with multiple graphic objects
191 # Is an intermediate stream
192 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1:
193 inlet, outlet = self.connectedPorts.filter(direction=ConType.Inlet).first(), self.connectedPorts.filter(
194 direction=ConType.Outlet).first()
195 self.graphicObject.last().y = abs(
196 (inlet.unitOp.graphicObject.last().y + outlet.unitOp.graphicObject.last().y) / 2)
197 self.graphicObject.last().save()
198 self.save()
199 else:
200 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), \
201 self.ports.filter(direction=ConType.Outlet).all()[1]
202 self.graphicObject.last().y = abs(
203 (inlet.stream.graphicObject.last().y + outlet.stream.graphicObject.last().y) / 2)
204 self.graphicObject.last().save()
205 self.save()
206 return
208 def split_stream(self) -> "SimulationObject":
209 """
210 Splits a stream into two separate streams (one inlet and one outlet - disconnected).
211 :return: New Stream Object (outlet stream) if object is a stream.
212 """
213 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
214 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
216 new_stream = None
217 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream: 217 ↛ 286line 217 didn't jump to line 286 because the condition on line 217 was always true
218 connectedPorts = self.connectedPorts.all()
219 inlet_port: Port = connectedPorts.get(direction=ConType.Inlet)
220 outlet_port: Port = connectedPorts.get(direction=ConType.Outlet)
221 new_stream = SimulationObjectFactory.create_stream_at_port(inlet_port)
222 new_stream.save()
223 # reset the stream to default position
224 coordinates = SimulationObjectFactory.default_stream_position(outlet_port.unitOp, outlet_port)
225 stream_graphic_object = self.graphicObject.last()
226 stream_graphic_object.x = coordinates['x'] - stream_graphic_object.width / 2
227 stream_graphic_object.y = coordinates['y'] - stream_graphic_object.height / 2
228 stream_graphic_object.save()
230 """
231 If a stream connects two groups together, it will have graphic objects in each of those groups.
232 This next section figures out which graphic object to keep, and which to move to the new stream.
233 """
234 all_graphic_objects = self.graphicObject.all()
235 default_graphic_object = new_stream.graphicObject.last() # a default graphic object is created by create_stream_at_port
236 # The graphic objects in these groups should be kept
237 groups_to_keep: List[int] = [g.id for g in outlet_port.unitOp.get_parent_groups()]
238 groups_to_move: List[int] = [g.id for g in inlet_port.unitOp.get_parent_groups()]
240 for gobj in all_graphic_objects:
241 if gobj.group.id in groups_to_keep:
242 # keep it here
243 if gobj.group.id in groups_to_move:
244 # move the default graphic object into this group, so both the inlet and outlet show in this group
245 default_graphic_object.group = gobj.group
246 default_graphic_object.save()
247 else:
248 if gobj.group.id not in groups_to_move: 248 ↛ 249line 248 didn't jump to line 249 because the condition on line 248 was never true
249 print("Error: this should be in either ggroups to move or the other", gobj.group)
250 # must be in groups_to_move
251 # move graphic object to this stream
252 gobj.simulationObject = new_stream
253 gobj.save()
255 # make sure both the original and new streams are shown in all relevant groups
256 for graphic_object in self.graphicObject.all():
257 current_group = graphic_object.group
258 while current_group:
259 parent_group = current_group.get_parent_group()
260 if parent_group:
261 # make sure the original stream is shown in the parent group
262 if not self.graphicObject.filter(group=parent_group).exists(): 262 ↛ 263line 262 didn't jump to line 263 because the condition on line 262 was never true
263 GraphicObject.objects.create(
264 flowsheet=self.flowsheet,
265 simulationObject=self,
266 width=graphic_object.width,
267 height=graphic_object.height,
268 x=graphic_object.x,
269 y=graphic_object.y,
270 group=parent_group,
271 )
273 # make sure the new stream is shown in the parent group
274 if not new_stream.graphicObject.filter(group=parent_group).exists(): 274 ↛ 275line 274 didn't jump to line 275 because the condition on line 274 was never true
275 GraphicObject.objects.create(
276 flowsheet=self.flowsheet,
277 simulationObject=new_stream,
278 width=graphic_object.width,
279 height=graphic_object.height,
280 x=graphic_object.x,
281 y=graphic_object.y,
282 group=parent_group
283 )
284 current_group = parent_group
286 return new_stream
288 def merge_parallel_streams(self, connected_stream: "SimulationObject", decision_node=None,
289 coordinates=None) -> "SimulationObject":
290 """
291 Merges two streams that have the same direction (Both Inlet or Outlet)
292 :param connected_stream: Stream to connect
293 :param decision_node: Decision Node object to merges streams into (In the case of n-inlet / n-outlet)
294 :return: Decision Node Object
295 """
296 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
297 is_inlet = self.connectedPorts.first().direction == ConType.Inlet
298 direction = ConType.Outlet if is_inlet else ConType.Inlet
299 if decision_node is None:
300 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
301 if (is_inlet): 301 ↛ 305line 301 didn't jump to line 305 because the condition on line 301 was always true
302 modified_schema.ports["outlet"].default = 2
303 modified_schema.ports["inlet"].default = 2
304 else:
305 modified_schema.ports["inlet"].default = 2
306 modified_schema.ports["outlet"].default = 2
307 decision_node = SimulationObjectFactory.create_simulation_object(
308 coordinates={'x': self.graphicObject.last().x,
309 'y': self.graphicObject.last().y} if coordinates is None else coordinates,
310 objectType="decisionNode",
311 schema=modified_schema,
312 flowsheet=self.flowsheet,
313 create_attached_streams=False,
314 )
315 decision_node.graphicObject.last().rotation = self.connectedPorts.first().unitOp.graphicObject.last().rotation
316 decision_node.graphicObject.last().save()
317 ports = decision_node.ports.filter(direction=direction).all()
318 port1 = ports[0]
319 port2 = ports[1]
320 port1.stream = self
321 port2.stream = connected_stream
322 port1.save()
323 port2.save()
325 decision_node.save()
327 # Center GraphicObjects
328 self.horizontally_center_graphic()
329 connected_stream.horizontally_center_graphic()
330 self.save()
331 connected_stream.save()
333 return decision_node
335 def merge_stream(self, connectStream: "SimulationObject") -> "SimulationObject":
336 """
337 Connects this object (a stream) to another stream
338 :param connectedStream: Stream to connect to this object
339 :return: Decision Node Object
340 """
341 material_stream_1 = self
342 material_stream_2 = connectStream
344 material_stream_1_port = material_stream_1.connectedPorts.first()
345 material_stream_2_port = material_stream_2.connectedPorts.first()
347 # Case 1: Intermediate to Intermediate
348 if material_stream_1.connectedPorts.count() > 1 and material_stream_2.connectedPorts.count() > 1:
349 coordinates = {'x': material_stream_2.graphicObject.last().x, 'y': material_stream_2.graphicObject.last().y}
350 outlet_stream_1 = material_stream_1.split_stream()
351 outlet_stream_2 = material_stream_2.split_stream()
353 decision_node = outlet_stream_2.merge_parallel_streams(outlet_stream_1, coordinates=coordinates)
354 decision_node = material_stream_2.merge_parallel_streams(material_stream_1, decision_node)
356 update_decision_node_and_propagate(decision_node)
357 decision_node.save()
358 return decision_node
360 # Case 2: Connecting intermediate stream with product or feed
361 if material_stream_1.connectedPorts.count() > 1 or material_stream_2.connectedPorts.count() > 1:
362 inter_stream, connected_stream = (
363 (material_stream_1,
364 material_stream_2) if material_stream_1.connectedPorts.count() > material_stream_2.connectedPorts.count()
365 else (material_stream_2, material_stream_1)
366 )
367 connected_stream_port = connected_stream.connectedPorts.first()
368 is_inlet = (connected_stream_port.direction == ConType.Inlet)
370 if is_inlet:
371 # Feed -> Intermediate
372 decision_node = inter_stream.make_decision_node(num_inlets=1, num_outlets=2)
373 empty_port = decision_node.ports.filter(direction=ConType.Outlet, stream=None).first()
374 empty_port.stream = connected_stream
375 empty_port.save()
377 self.delete_empty_node(connected_stream, connected_stream_port)
379 # Center graphics
380 self.horizontally_center_graphic()
381 connected_stream.horizontally_center_graphic()
382 self.save()
383 connected_stream.save()
385 # Update compounds - First update decision node, then propagate
386 update_decision_node_and_propagate(decision_node)
387 return decision_node
388 else:
389 # Product -> Intermediate
390 decision_node = inter_stream.make_decision_node(num_inlets=2, num_outlets=1)
391 empty_port = decision_node.ports.filter(direction=ConType.Inlet, stream=None).first()
392 empty_port.stream = connected_stream
393 empty_port.save()
395 self.delete_empty_node(connected_stream, connected_stream_port)
397 # Center graphics
398 self.horizontally_center_graphic()
399 connected_stream.horizontally_center_graphic()
400 self.save()
401 connected_stream.save()
403 # Update compounds - First update decision node, then propagate
404 update_decision_node_and_propagate(decision_node)
405 return decision_node
407 # Case 3: Feed to feed or product to product
408 if material_stream_1_port.direction == material_stream_2_port.direction: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true
409 result = material_stream_2.merge_parallel_streams(material_stream_1)
410 update_decision_node_and_propagate(result)
411 return result
413 # Case 4: Feed to product or product to feed
414 if material_stream_1_port.direction == "inlet":
415 inlet_stream, outlet_stream = material_stream_1, material_stream_2
416 inlet_port, outlet_port = material_stream_1_port, material_stream_2_port
417 else:
418 inlet_stream, outlet_stream = material_stream_2, material_stream_1
419 inlet_port, outlet_port = material_stream_2_port, material_stream_1_port
421 # Get all the graphic objects to preserve
423 # Always preserve the graphic of the stream that is being connected to
424 preserve_graphic = material_stream_2.graphicObject.last()
425 # Update compounds
426 if inlet_stream.objectType == SimulationObjectClass.EnergyStream: 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true
427 pass
428 elif inlet_stream.objectType != SimulationObjectClass.acStream: 428 ↛ 431line 428 didn't jump to line 431 because the condition on line 428 was always true
429 update_compounds_on_merge(inlet_stream, outlet_stream)
431 def update_graphic_object_on_merge(preserve_stream, delete_stream) -> None:
432 # Preserve one graphic object for each group either stream is connected to
433 # If the stream is connected to multiple groups, it will be shown in both groups
434 preserve_stream_groups = list(preserve_stream.get_groups())
435 preserve_stream_gobjs = preserve_stream.graphicObject.all()
436 merged_gobjs = []
437 for gobj in delete_stream.graphicObject.all():
438 # Check if there's a preserved stream in this group
439 preserved_gobj = next((g for g in preserve_stream_gobjs if g.group == gobj.group), None)
440 if preserved_gobj is not None:
441 # Keep this if
442 if preserve_stream == material_stream_2:
443 # Keep this graphic object's position
444 gobj.delete()
445 else:
446 # Keep the position of the stream to be deleted
447 preserved_gobj.copy_position_from(gobj)
448 gobj.delete()
449 # If both streams are shown in multiple parent groups, we only want to show the
450 # stream at the lowest level. make a list of these merged graphicsObjects so we can remove
451 # the extras later.
452 merged_gobjs.append(preserved_gobj)
453 else:
454 # connect this graphic object to the preserved stream
455 gobj.simulationObject = preserve_stream
456 gobj.save()
458 # Of the merged graphics objects, only keep the one in the lowest group
459 parent_groups: list[int] = []
461 for gobj in merged_gobjs:
462 parent_group = gobj.group.get_parent_group()
463 if parent_group is not None:
464 parent_groups.append(parent_group.pk)
466 for gobj in merged_gobjs:
467 if gobj.group.pk in parent_groups:
468 gobj.delete()
470 def _collect_unique_groups(*group_iters) -> set["Grouping"]:
471 """Return a set of unique Grouping objects from one or more iterables.
473 This preserves iteration order only in the sense that groups seen earlier
474 are added first, but the return value is a set because callers expect
475 membership semantics rather than ordering.
476 """
477 unique = set()
478 for iterable in group_iters:
479 for g in iterable:
480 if g not in unique:
481 unique.add(g)
482 return unique
484 # get all groups that are groups of either stream
485 inlet_stream_groups = list(inlet_stream.get_groups())
486 outlet_stream_groups = list(outlet_stream.get_groups())
487 intermediate_groups: set["Grouping"] = _collect_unique_groups(inlet_stream_groups, outlet_stream_groups)
489 from flowsheetInternals.unitops.models.delete_factory import DeleteFactory
490 if not outlet_stream.has_recycle_connection and inlet_stream.has_path_to(outlet_stream):
492 # preserve the outlet stream
493 preserve_stream = outlet_stream
494 inlet_port.stream = preserve_stream
495 inlet_port.save()
496 DeleteFactory.delete_object(inlet_stream)
498 update_graphic_object_on_merge(preserve_stream, inlet_stream)
500 # attach recycle block to the inlet stream
501 # this also handles updating property access
502 outlet_stream.attach_recycle(intermediate_groups)
503 inlet_stream.delete_control_values()
505 outlet_stream.reevaluate_properties_enabled()
506 else:
507 # Preserve the outlet stream
508 preserve_stream = outlet_stream
509 inlet_port.stream = preserve_stream
510 inlet_port.save()
511 DeleteFactory.delete_object(inlet_stream)
512 # Update graphic object
513 update_graphic_object_on_merge(preserve_stream, inlet_stream)
515 return preserve_stream
517 def delete_control_values(self) -> None:
518 """
519 Deletes all control values connected to properties in this object
520 """
521 property_set: PropertySet = self.properties
522 for prop in property_set.ContainedProperties.all():
523 for value in prop.values.all():
524 if value.is_control_set_point():
525 value.controlSetPoint.delete()
527 def attach_recycle(self, intermediate_groups: set["Grouping"] | None = None) -> None:
528 """
529 Attaches a new recycle block to the stream
530 """
531 if self.has_recycle_connection: 531 ↛ 532line 531 didn't jump to line 532 because the condition on line 531 was never true
532 return # already has a recycle connection
533 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
534 from flowsheetInternals.graphicData.models.groupingModel import Grouping
535 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
537 recycle = SimulationObjectFactory.create_simulation_object(
538 objectType="recycle",
539 flowsheet=self.flowsheet,
540 coordinates={
541 'x': self.graphicObject.last().x + self.graphicObject.last().width / 2,
542 'y': self.graphicObject.last().y + self.graphicObject.last().height / 2 + 100
543 },
544 )
546 # Set graphic objects for the recycle block in the same groups as the stream
547 default_graphic = recycle.graphicObject.last()
548 default_width = default_graphic.width if default_graphic else 32
549 default_height = default_graphic.height if default_graphic else 32
551 # Remove all graphic objects associated with the recycle so we can re-add them to proper groups
552 recycle.graphicObject.all().delete()
554 # If caller didn't provide groups, use this stream's groups
555 if intermediate_groups is None:
556 intermediate_groups = set(self.get_groups())
557 else:
558 # ensure we have a set (in case a generator or queryset was passed)
559 intermediate_groups = set(intermediate_groups)
561 normalized_groups: set[Grouping] = set()
562 seen_group_ids: set[int] = set()
564 # Normalise groups to Grouping objects and remove duplicates
565 for group in intermediate_groups:
566 group_obj = None
567 if isinstance(group, Grouping): 567 ↛ 569line 567 didn't jump to line 569 because the condition on line 567 was always true
568 group_obj = group
569 elif group is not None:
570 group_obj = Grouping.objects.filter(pk=group).first()
572 if group_obj and group_obj.pk not in seen_group_ids: 572 ↛ 565line 572 didn't jump to line 565 because the condition on line 572 was always true
573 normalized_groups.add(group_obj)
574 seen_group_ids.add(group_obj.pk)
576 # Create graphic objects for each group
577 for group in normalized_groups:
578 stream_graphic = self.graphicObject.filter(group=group).last()
579 if stream_graphic is None:
580 continue
582 GraphicObject.objects.create(
583 flowsheet=self.flowsheet,
584 simulationObject=recycle,
585 width=default_width,
586 height=default_height,
587 x=stream_graphic.x + (stream_graphic.width - default_width) / 2,
588 y=stream_graphic.y + stream_graphic.height + 30,
589 group=group,
590 )
592 recycle.recycleData.update(self)
593 recycle.save()
595 def has_path_to(self, end_stream: "SimulationObject", check_recycles=True) -> bool:
596 """
597 Checks if there is a path in the flowsheet from the start stream (self) to the end stream
598 Can be used to check for loops in the flowsheet if these two streams are being merged
600 - param end_stream: The stream to check if there is a path to (from self)
601 - param check_recycles: If True, will skip the path if a stream has a recycle connection
602 """
603 remaining_streams = [self]
604 while remaining_streams:
605 current_stream = remaining_streams.pop()
606 if current_stream == end_stream:
607 # loop detected
608 return True
609 unit_op_port = current_stream.connectedPorts.filter(direction=ConType.Inlet).first()
610 if not unit_op_port:
611 continue
612 unit_op = unit_op_port.unitOp
613 # get the outlet ports of the unitop
614 connected_port_keys = get_connected_port_keys(unit_op_port.key, unit_op.schema)
615 outlet_ports = unit_op.ports.filter(
616 direction=ConType.Outlet,
617 key__in=connected_port_keys
618 )
619 for outlet_port in outlet_ports:
620 outlet_stream = outlet_port.stream
621 if outlet_stream is not None: 621 ↛ 619line 621 didn't jump to line 619 because the condition on line 621 was always true
622 if check_recycles and outlet_stream.has_recycle_connection: 622 ↛ 623line 622 didn't jump to line 623 because the condition on line 622 was never true
623 continue
624 remaining_streams.append(outlet_stream)
625 return False
627 def make_decision_node(self, num_inlets, num_outlets):
628 """
629 Turns a stream into a decision node with n inlet and m outlet ports
630 :param num_inlets: number of inlet ports to create
631 :param num_outlets: number of outlet ports to create
632 :return: Decision Node object
633 """
634 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
635 if self.objectType == SimulationObjectClass.Stream: 635 ↛ exitline 635 didn't return from function 'make_decision_node' because the condition on line 635 was always true
637 # Create Decision Node
638 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
639 modified_schema.ports["inlet"].default = num_inlets
640 modified_schema.ports["outlet"].default = num_outlets
641 graphicObject = self.graphicObject.last() # For now, we are assuming this only has one graphicObject.
642 parentGroup = self.graphicObject.last().group.id
643 decision_node = SimulationObjectFactory.create_simulation_object(
644 coordinates={'x': graphicObject.x, 'y': graphicObject.y},
645 objectType="decisionNode",
646 schema=modified_schema,
647 flowsheet=self.flowsheet,
648 create_attached_streams=False,
649 parentGroup=parentGroup,
650 )
652 if self.connectedPorts.count() > 1:
653 # Connect both streams to Decision Node
654 ms_outlet_port = self.connectedPorts.filter(direction=ConType.Inlet).first()
655 dn_outlet_port = decision_node.ports.filter(direction=ConType.Outlet).first()
656 dn_inlet_port = decision_node.ports.filter(direction=ConType.Inlet).first()
657 new_stream = SimulationObjectFactory.create_stream_at_port(port=dn_outlet_port)
658 ms_outlet_port.stream = new_stream
659 dn_inlet_port.stream = self
661 # Save Objects
662 ms_outlet_port.save()
663 dn_outlet_port.save()
664 dn_inlet_port.save()
665 decision_node.save()
667 # Center GraphicObjects
668 self.horizontally_center_graphic()
669 new_stream.horizontally_center_graphic()
670 self.save()
671 new_stream.save()
672 else:
673 # For now, only dealing with the case that a regular stream is initialized with one port
674 port = decision_node.ports.first()
675 port.stream = self
676 port.save()
677 self.horizontally_center_graphic()
678 self.save()
679 self.save()
680 decision_node.save()
681 # handle compounds for decision node
682 update_decision_node_and_propagate(decision_node, updated_via_right_click=True)
683 return decision_node
685 def update_compounds(self, compounds: list[str]) -> None:
686 """
687 Updates the compounds for this stream
688 """
689 update_compounds_on_set(self, compounds)
691 def add_port(self, key: str, existing_stream: "SimulationObject | None" = None) -> Port:
692 """
693 Adds a port to this object and adds a a new stream if none is provided
694 """
695 from flowsheetInternals.graphicData.logic.make_group import propagate_streams
696 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
697 # Create ports
698 ports_schema = self.schema.ports
700 # replace any many=True ports with multiple ports
701 port_dict = ports_schema[key]
702 # get the next index for the port (looks at all ports with the same key and gets the length of the l
703 next_index = self.ports.filter(key=key).count()
704 new_port = Port(
705 key=key,
706 index=next_index,
707 direction=port_dict.type,
708 displayName=port_dict.displayName + f" {next_index + 1}",
709 unitOp=self,
710 flowsheet=self.flowsheet
711 )
712 new_port.save()
714 self.update_height()
716 if existing_stream:
717 # connect existing stream to the new port
718 new_port.stream = existing_stream
719 new_port.save() # might have to remove this trying to figure out why other streams are being removed
721 if self.objectType == "decisionNode": 721 ↛ 795line 721 didn't jump to line 795 because the condition on line 721 was always true
722 # Use existing function to update decision node compounds
723 update_decision_node_and_propagate(self)
724 else:
725 # create a new strea at the port using the factory
726 new_stream = SimulationObjectFactory.create_stream_at_port(new_port)
727 update_compounds_on_add_stream(new_port, new_stream)
728 outlet_name = self.schema.splitter_fraction_name
729 for property_key, property in self.schema.properties.items():
730 if property.indexSets is not None and (
731 'splitter_fraction' in property.indexSets) and new_port.direction == ConType.Outlet:
732 property_infos = self.properties.ContainedProperties.all()
733 property_info = property_infos.first()
734 new_indexed_item = IndexedItem.objects.create(
735 owner=self,
736 key="outlet_" + f"{next_index + 1}",
737 displayName=outlet_name + f" {next_index + 1}",
738 type="splitter_fraction",
739 flowsheet=self.flowsheet,
740 )
742 # get indexed set for split_fraction or priorities:
743 index_sets = property.indexSets
745 # figure out which other indexed items this should link to
746 indexed_item_sets: List[List[IndexedItem]] = []
747 for index in index_sets:
748 if index == IndexChoices.SplitterFraction:
749 continue # We don't need this, it's the one we're editing
750 indexed_items: List[IndexedItem] = self.get_indexed_items(
751 index) # e.g get_indexed_items("phase"), "compound", etc
752 indexed_item_sets.append(indexed_items)
754 # create a property value for each combination of indexed items, for the new outlet
755 combinations = list(itertools.product(*indexed_item_sets))
757 # Bulk update existing property values to be enabled
758 property_status = False
759 if self.objectType == "header" or self.objectType == "simple_header":
760 property_status = True
761 else:
762 property_info.values.update(enabled=True)
764 # bulk create indexed items (all disabled as it's the last outlet)
765 index_links_to_create: List[PropertyValueIntermediate] = []
766 for combination in combinations:
767 # Create a propertyValue for this combination
768 combo_property_value = PropertyValue.objects.create(property=property_info,
769 enabled=property_status,
770 flowsheet=self.flowsheet)
771 # link it up to the new outlet
772 index_links_to_create.append(
773 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
774 )
776 # Also link it up to all the other sets
777 for indexed_item in combination:
778 index_links_to_create.append(
779 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=indexed_item)
780 )
782 # Bulk create the links
783 PropertyValueIntermediate.objects.bulk_create(index_links_to_create)
785 new_stream.save()
786 # Rotate the new stream to match the rotation of the unitop it is attached to
787 self.update_stream_rotation(new_stream, self)
789 # Ensure the new stream appears in all parent groups
790 if new_port.direction == ConType.Inlet:
791 propagate_streams([new_stream], ConType.Inlet)
792 elif new_port.direction == ConType.Outlet: 792 ↛ 795line 792 didn't jump to line 795 because the condition on line 792 was always true
793 propagate_streams([new_stream], ConType.Outlet)
795 return new_port
797 def update_height(self):
798 """
799 Updates the height of the graphic object based on the number of ports
800 """
801 if self.schema.graphicObject.autoHeight: # e.g header has auto height calculation
802 OFFSET_VALUE = 200
804 max_ports = max(self.ports.filter(direction=ConType.Inlet).count(),
805 self.ports.filter(direction=ConType.Outlet).count())
806 rotation = self.graphicObject.last().rotation
808 graphic_object: GraphicObject = self.graphicObject.last()
809 new_length = max_ports * OFFSET_VALUE
810 obj_width = graphic_object.width
811 obj_height = graphic_object.height
813 # Update graphic object position to keep the other streams centered
814 if rotation == 90:
815 if new_length > obj_width:
816 self.graphicObject.update(x=graphic_object.x - OFFSET_VALUE)
817 elif new_length < obj_width:
818 self.graphicObject.update(x=graphic_object.x + OFFSET_VALUE)
819 elif rotation == 180:
820 if new_length > obj_height:
821 self.graphicObject.update(y=graphic_object.y - OFFSET_VALUE)
822 elif new_length < obj_height:
823 self.graphicObject.update(y=graphic_object.y + OFFSET_VALUE)
825 # Update the correct dimension based on rotation
826 if rotation in [90, 270]:
827 self.graphicObject.update(width=new_length)
828 else:
829 self.graphicObject.update(height=new_length)
831 def update_stream_rotation(self, stream: "SimulationObject", unitop: "SimulationObject") -> None:
832 """
833 Updates the rotation of the stream based on the rotation of the unitop it is attached to.
834 """
835 graphic_object = stream.graphicObject.last()
837 if graphic_object is None: 837 ↛ 838line 837 didn't jump to line 838 because the condition on line 837 was never true
838 return
840 # Update the rotation of the stream to match the rotation of the unitop
841 graphic_object.rotation = unitop.graphicObject.last().rotation
842 graphic_object.flipped = unitop.graphicObject.last().flipped
843 graphic_object.save()
845 def get_indexed_items(self, index_set_type: IndexChoices) -> List[IndexedItem]:
846 match index_set_type:
847 case IndexChoices.Phase:
848 # get all the indexedItems that are type=phase
849 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Phase).all()
850 return items
851 case IndexChoices.Compound: 851 ↛ 854line 851 didn't jump to line 854 because the pattern on line 851 always matched
852 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Compound).all()
853 return items
854 case _:
855 raise ValueError("Get_indexed_items didn't expect this index set type")
857 def merge_decision_nodes(self, decision_node_active: "SimulationObject", decision_node_over: "SimulationObject") -> \
858 Optional["SimulationObject"]:
859 """
860 Merges this decision node with the over decision node and handles graphics positioning
861 """
862 import flowsheetInternals.unitops.models.delete_factory as DeleteFactory
864 # Get all streams from active node
865 inlet_streams = decision_node_active.ports.filter(direction="inlet").all()
866 outlet_streams = decision_node_active.ports.filter(direction="outlet").all()
868 # Transfer and reposition inlet streams
869 for inlet_port in inlet_streams:
870 stream = inlet_port.stream
871 # Add stream to new decision node
872 decision_node_over.add_port("inlet", stream)
874 # Update compounds and propagate
875 update_decision_node_and_propagate(decision_node_over)
877 # Transfer and reposition outlet streams
878 for outlet_port in outlet_streams:
879 stream = outlet_port.stream
880 # Add stream to new decision node
881 decision_node_over.add_port("outlet", stream)
883 # Delete the active node and its graphic object
884 DeleteFactory.delete_object(decision_node_active)
885 decision_node_over.save()
886 return decision_node_over
888 def reevaluate_properties_enabled(self) -> None:
889 """
890 Reevaluates property access for all properties in this object
892 This should only really be called when connections are changed,
893 otherwise the property enabling should be handled by adding
894 or removing control values.
895 """
896 if (self.objectType == SimulationObjectClass.MachineLearningBlock):
897 return # We don't want to change from the defaults
898 properties: list[PropertyInfo] = self.properties.ContainedProperties.all()
899 config = self.schema
900 config_properties = config.properties
901 config_groups = config.propertySetGroups
903 def _eval_enabled(prop: PropertyInfo, config_group) -> bool:
904 if self.is_stream():
905 # disable outlet/intermediate stream properties
906 ports = self.connectedPorts.all()
907 if len(ports) == 2 or (len(ports) == 1 and ports[0].direction == "outlet"):
908 return False
909 #inlet stream properties should be enabled
910 if config_group.type == "stateVars":
911 state_vars = getattr(config_group, "stateVars") or ()
912 return prop.key in state_vars
913 return True # eg. All, default to enabled
915 list_prop_val = []
918 for prop in properties:
919 config_prop = config_properties.get(prop.key)
920 #ignore custom properties
921 if config_prop:
922 group = config_prop.propertySetGroup
923 config_group = config_groups.get(group, None)
924 res = _eval_enabled(prop, config_group)
925 list_prop_val.extend(prop.enable(res))
929 PropertyValue.objects.bulk_update(list_prop_val, ["enabled"])
931 def get_unspecified_properties(self) -> list:
932 if not hasattr(self, "properties") or not self.properties: 932 ↛ 933line 932 didn't jump to line 933 because the condition on line 932 was never true
933 return []
935 contained_properties: models.QuerySet[PropertyInfo] = self.properties.ContainedProperties.all()
936 is_splitter = getattr(self.schema, "displayType", "").lower() == "splitter"
938 # use schema stateVars to find required properties
939 required_properties: Set[str] = set()
940 for group in self.schema.propertySetGroups.values():
941 if group.toggle and not self.properties.get_property(group.toggle).get_value():
942 # The group is toggled off, so we don't care that the properties are not specified
943 continue
944 if group.type == "stateVars" or group.type == "composition" or group.type == "exceptLast":
945 required_properties.update(group.stateVars)
947 # use pre fetched data to avoid additional queries
948 unspecified_properties = []
949 property_info: PropertyInfo
950 for property_info in contained_properties:
951 # check if the property has no values or if all values are invalid
952 has_valid_value = property_info.isSpecified()
953 if not has_valid_value and property_info.key in required_properties:
954 unspecified_properties.append(property_info.key)
956 # check if mole_frac_comp sums to 1
957 mole_frac_props = [x for x in contained_properties if x.key == "mole_frac_comp"]
959 for prop in mole_frac_props:
960 if prop.has_value_bulk():
961 total = 0.0
962 for value in prop.values.all():
963 try:
964 val = float(value.value)
965 total += val
966 except (ValueError, TypeError):
967 continue
968 if abs(total - 1.0) > 0.001 and "mole_frac_comp" not in unspecified_properties:
969 unspecified_properties.append("mole_frac_comp")
970 else:
971 if "mole_frac_comp" not in unspecified_properties:
972 unspecified_properties.append("mole_frac_comp")
974 return unspecified_properties
976 def delete(self, *args, **kwargs):
977 raise NotImplementedError("Use delete_object method from DeleteFactory")
979 def permanently_delete(self, *args, **kwargs):
980 """
981 Permanently deletes the object from the database.
982 This should only be used in tests or when you are sure you want to delete the object.
983 """
984 super().delete(*args, **kwargs)
986 def delete_empty_node(self, connected_stream: "SimulationObject", connected_stream_port: "Port"):
987 """
988 Deletes the empty_port stream from parent groups when creating a decision node inside a group.
989 :param connected_stream: Stream connected to the empty port
990 :param connected_stream_port: Port of the connected stream
991 """
992 simulation_object_id = connected_stream.id
993 current_group = connected_stream_port.unitOp.get_group()
994 parent_groups = connected_stream_port.unitOp.get_parent_groups()
996 for parent_group in parent_groups:
997 if parent_group != current_group:
998 gobjs_in_group = connected_stream.graphicObject.filter(group=parent_group)
999 for graphic_object in gobjs_in_group:
1000 if graphic_object.simulationObject.id == simulation_object_id: 1000 ↛ 999line 1000 didn't jump to line 999 because the condition on line 1000 was always true
1001 graphic_object.delete()