Coverage for backend/django/flowsheetInternals/unitops/models/SimulationObject.py: 87%
548 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from django.db import models
2from django.db.models import QuerySet
3from core.managers import SoftDeleteManager
4from core.auxiliary.enums.unitOpData import SimulationObjectClass
5from core.auxiliary.models.Flowsheet import Flowsheet
6from core.auxiliary.models.PropertySet import PropertySet
7from core.auxiliary.models.PropertyInfo import PropertyInfo
8from typing import TYPE_CHECKING
9from core.auxiliary.models.PropertyValue import PropertyValue, PropertyValueIntermediate
10from core.auxiliary.models.IndexedItem import IndexedItem, IndexChoices
11from typing import Set
13from flowsheetInternals.unitops.models.Port import Port
14from typing import Iterable
15from flowsheetInternals.unitops.config.config_methods import *
16from common.config_types import *
17import itertools
19from .compound_propogation import update_compounds_on_set, update_compounds_on_merge, _get_compound_keys, \
20 update_decision_node_and_propagate, update_compounds_on_add_stream
21from typing import Optional, List
22from ..methods.add_expression import add_expression as _add_expression
24if TYPE_CHECKING:
25 from core.auxiliary.models.PropertySet import PropertySet
26 from core.auxiliary.models.PropertyInfo import PropertyInfo
27 from core.auxiliary.models.RecycleData import RecycleData
28 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
29 from flowsheetInternals.graphicData.models.groupingModel import Grouping
30 from flowsheetInternals.unitops.models.Port import Port
31 from core.auxiliary.models.CustomPropertyPackage import CustomPropertyPackage
34class SimulationObject(models.Model):
35 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE, related_name="flowsheetObjects")
36 componentName = models.CharField(max_length=64)
37 objectType = models.CharField(choices=SimulationObjectClass.choices)
39 created_at = models.DateTimeField(auto_now_add=True)
40 is_deleted = models.BooleanField(default=False)
41 initial_values = models.JSONField(null=True, blank=True)
42 propertyPackageType = models.CharField(max_length=64, default="helmholtz")
43 customPackage = models.ForeignKey["CustomPropertyPackage"]("core_auxiliary.CustomPropertyPackage", on_delete=models.SET_NULL, null=True, blank=True, related_name="simulationObjects")
45 # add a soft delete manager
46 objects = SoftDeleteManager()
47 add_expression = _add_expression
48 # runtime-accessed attributes
49 properties: "PropertySet"
50 graphicObject: "GraphicObject"
51 ports: QuerySet["Port"]
52 connectedPorts: QuerySet["Port"]
53 recycleConnection: "Optional[RecycleData]"
54 recycleData: "Optional[RecycleData]"
56 @property
57 def schema(self) -> ObjectType:
58 return get_object_schema(self)
60 @property
61 def has_recycle_connection(self) -> bool:
62 return hasattr(self, "recycleConnection")
64 def is_stream(self) -> bool:
65 return self.schema.is_stream
67 def get_stream(self, key: str, index: int = 0) -> "SimulationObject":
68 """
69 Returns the stream attached to the port with the given key
70 """
71 port = self.get_port(key, index)
72 stream = port.stream
73 return stream
75 def get_group(self) -> "Grouping":
76 """
77 Returns the group that this object belongs to
78 """
79 if not self.is_stream(): 79 ↛ 84line 79 didn't jump to line 84 because the condition on line 79 was always true
80 # There is only one graphic object, so we can just return the group
81 return self.graphicObject.last().group
82 else:
83 # Streams arent' really in a group, throw an error
84 raise ValueError("Streams do not belong to a group")
86 def get_groups(self) -> Iterable["Grouping"]:
87 """
88 Returns an iterable of groups that this object belongs to. Mostly needed for streams that have many groups.
89 """
90 return (graphic.group for graphic in self.graphicObject.all())
92 def get_parent_groups(self) -> List["Grouping"]:
93 """
94 Returns an iterable of parent groups that this object belongs to.
95 """
96 parent_groups = []
97 current_group = self.get_group()
98 while current_group:
99 parent_groups.append(current_group)
100 simulationObject = current_group.simulationObject
101 if simulationObject is None: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true
102 break
103 current_group = simulationObject.get_group()
104 return parent_groups
106 def get_property_package(self, name: str | None = None):
107 """
108 Returns the property package slot with the given name
109 """
110 return self.propertyPackageType
112 def set_property_package(self, property_package, name: str | None = None) -> None:
113 """
114 Sets the property package for this object
115 """
116 self.propertyPackageType = property_package
117 self.save()
119 def get_port(self, key: str, index: int = 0) -> Port:
120 """
121 Returns the port with the given key
122 """
123 try:
125 port: Port = self.ports.get(key=key, index=index)
126 return port
127 except Port.DoesNotExist:
128 raise ValueError(f"Port with key {key} does not exist on object {self.componentName}")
130 def reorder_object_ports(self):
131 """
132 Reorders port mappings by connected unit operation y position
133 :return: None
134 """
135 inlet_connections = [port for port in self.ports.filter(direction=ConType.Inlet).all()]
136 outlet_connections = [port for port in self.ports.filter(direction=ConType.Outlet).all()]
137 inlet_connections.sort(
138 key=lambda port: port.stream.connectedPorts.get(direction=ConType.Outlet).unitOp.graphicObject.last().y)
139 outlet_connections.sort(
140 key=lambda port: port.stream.connectedPorts.get(direction=ConType.Inlet).unitOp.graphicObject.last().y)
141 for i, port in enumerate(inlet_connections):
142 port.index = i
143 port.save()
144 for i, port in enumerate(outlet_connections):
145 port.index = i
146 port.save()
147 self.save()
149 def horizontally_center_graphic(self) -> None:
150 """
151 Horizontally centers GraphicObject for a stream or unit operation
152 Currently uni-directional - left to right connections
153 Centers based on inlet and outlet graphics.
154 :return: None
155 """
156 # TODO: Make this work as appropriate for streams with multiple graphic objects
157 # Is an Intermediate Stream
158 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1: 158 ↛ 171line 158 didn't jump to line 171 because the condition on line 158 was always true
159 inlet, outlet = self.connectedPorts.filter(direction=ConType.Outlet).first(), self.connectedPorts.filter(
160 direction=ConType.Inlet).first()
161 inlet_x, outlet_x = inlet.unitOp.graphicObject.last().x, outlet.unitOp.graphicObject.last().x
162 if inlet_x <= outlet_x:
163 self.graphicObject.last().x = abs(
164 (inlet_x + (inlet.unitOp.graphicObject.last().width)) + (outlet_x)) / 2
165 else:
166 self.graphicObject.last().x = abs(inlet_x + (outlet_x + outlet.unitOp.graphicObject.last().width)) / 2
167 # Flip Graphic Object horizontally
168 self.graphicObject.last().save()
169 self.save()
170 else:
171 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), self.ports.filter(
172 direction=ConType.Outlet).first()
173 self.graphicObject.last().x = abs(
174 (inlet.stream.graphicObject.last().x + outlet.stream.graphicObject.last().x) / 2)
175 self.graphicObject.last().save()
176 self.save()
177 return
179 def vertically_center_graphic(self) -> None:
180 """
181 Vertically centers GraphicObject for a stream or unit operation
182 :return: None
183 """
184 # TODO: Make this work as appropriate for streams with multiple graphic objects
185 # Is an intermediate stream
186 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream and self.connectedPorts.count() > 1:
187 inlet, outlet = self.connectedPorts.filter(direction=ConType.Inlet).first(), self.connectedPorts.filter(
188 direction=ConType.Outlet).first()
189 self.graphicObject.last().y = abs(
190 (inlet.unitOp.graphicObject.last().y + outlet.unitOp.graphicObject.last().y) / 2)
191 self.graphicObject.last().save()
192 self.save()
193 else:
194 inlet, outlet = self.ports.filter(direction=ConType.Inlet).first(), \
195 self.ports.filter(direction=ConType.Outlet).all()[1]
196 self.graphicObject.last().y = abs(
197 (inlet.stream.graphicObject.last().y + outlet.stream.graphicObject.last().y) / 2)
198 self.graphicObject.last().save()
199 self.save()
200 return
202 def split_stream(self) -> "SimulationObject":
203 """
204 Splits a stream into two separate streams (one inlet and one outlet - disconnected).
205 :return: New Stream Object (outlet stream) if object is a stream.
206 """
207 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
208 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
210 new_stream = None
211 if self.objectType == SimulationObjectClass.Stream or SimulationObjectClass.EnergyStream or SimulationObjectClass.acStream: 211 ↛ 280line 211 didn't jump to line 280 because the condition on line 211 was always true
212 connectedPorts = self.connectedPorts.all()
213 inlet_port: Port = connectedPorts.get(direction=ConType.Inlet)
214 outlet_port: Port = connectedPorts.get(direction=ConType.Outlet)
215 new_stream = SimulationObjectFactory.create_stream_at_port(inlet_port)
216 new_stream.save()
217 # reset the stream to default position
218 coordinates = SimulationObjectFactory.default_stream_position(outlet_port.unitOp, outlet_port)
219 stream_graphic_object = self.graphicObject.last()
220 stream_graphic_object.x = coordinates['x'] - stream_graphic_object.width / 2
221 stream_graphic_object.y = coordinates['y'] - stream_graphic_object.height / 2
222 stream_graphic_object.save()
224 """
225 If a stream connects two groups together, it will have graphic objects in each of those groups.
226 This next section figures out which graphic object to keep, and which to move to the new stream.
227 """
228 all_graphic_objects = self.graphicObject.all()
229 default_graphic_object = new_stream.graphicObject.last() # a default graphic object is created by create_stream_at_port
230 # The graphic objects in these groups should be kept
231 groups_to_keep: List[int] = [g.id for g in outlet_port.unitOp.get_parent_groups()]
232 groups_to_move: List[int] = [g.id for g in inlet_port.unitOp.get_parent_groups()]
234 for gobj in all_graphic_objects:
235 if gobj.group.id in groups_to_keep:
236 # keep it here
237 if gobj.group.id in groups_to_move:
238 # move the default graphic object into this group, so both the inlet and outlet show in this group
239 default_graphic_object.group = gobj.group
240 default_graphic_object.save()
241 else:
242 if gobj.group.id not in groups_to_move: 242 ↛ 243line 242 didn't jump to line 243 because the condition on line 242 was never true
243 print("Error: this should be in either ggroups to move or the other", gobj.group)
244 # must be in groups_to_move
245 # move graphic object to this stream
246 gobj.simulationObject = new_stream
247 gobj.save()
249 # make sure both the original and new streams are shown in all relevant groups
250 for graphic_object in self.graphicObject.all():
251 current_group = graphic_object.group
252 while current_group:
253 parent_group = current_group.get_parent_group()
254 if parent_group:
255 # make sure the original stream is shown in the parent group
256 if not self.graphicObject.filter(group=parent_group).exists(): 256 ↛ 257line 256 didn't jump to line 257 because the condition on line 256 was never true
257 GraphicObject.objects.create(
258 flowsheet=self.flowsheet,
259 simulationObject=self,
260 width=graphic_object.width,
261 height=graphic_object.height,
262 x=graphic_object.x,
263 y=graphic_object.y,
264 group=parent_group,
265 )
267 # make sure the new stream is shown in the parent group
268 if not new_stream.graphicObject.filter(group=parent_group).exists(): 268 ↛ 269line 268 didn't jump to line 269 because the condition on line 268 was never true
269 GraphicObject.objects.create(
270 flowsheet=self.flowsheet,
271 simulationObject=new_stream,
272 width=graphic_object.width,
273 height=graphic_object.height,
274 x=graphic_object.x,
275 y=graphic_object.y,
276 group=parent_group
277 )
278 current_group = parent_group
280 return new_stream
282 def merge_parallel_streams(self, connected_stream: "SimulationObject", decision_node=None,
283 coordinates=None) -> "SimulationObject":
284 """
285 Merges two streams that have the same direction (Both Inlet or Outlet)
286 :param connected_stream: Stream to connect
287 :param decision_node: Decision Node object to merges streams into (In the case of n-inlet / n-outlet)
288 :return: Decision Node Object
289 """
290 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
291 is_inlet = self.connectedPorts.first().direction == ConType.Inlet
292 direction = ConType.Outlet if is_inlet else ConType.Inlet
293 if decision_node is None:
294 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
295 if (is_inlet): 295 ↛ 299line 295 didn't jump to line 299 because the condition on line 295 was always true
296 modified_schema.ports["outlet"].default = 2
297 modified_schema.ports["inlet"].default = 2
298 else:
299 modified_schema.ports["inlet"].default = 2
300 modified_schema.ports["outlet"].default = 2
301 decision_node = SimulationObjectFactory.create_simulation_object(
302 coordinates={'x': self.graphicObject.last().x,
303 'y': self.graphicObject.last().y} if coordinates is None else coordinates,
304 objectType="decisionNode",
305 schema=modified_schema,
306 flowsheet=self.flowsheet,
307 create_attached_streams=False,
308 )
309 decision_node.graphicObject.last().rotation = self.connectedPorts.first().unitOp.graphicObject.last().rotation
310 decision_node.graphicObject.last().save()
311 ports = decision_node.ports.filter(direction=direction).all()
312 port1 = ports[0]
313 port2 = ports[1]
314 port1.stream = self
315 port2.stream = connected_stream
316 port1.save()
317 port2.save()
319 decision_node.save()
321 # Center GraphicObjects
322 self.horizontally_center_graphic()
323 connected_stream.horizontally_center_graphic()
324 self.save()
325 connected_stream.save()
327 return decision_node
329 def merge_stream(self, connectStream: "SimulationObject") -> "SimulationObject":
330 """
331 Connects this object (a stream) to another stream
332 :param connectedStream: Stream to connect to this object
333 :return: Decision Node Object
334 """
335 material_stream_1 = self
336 material_stream_2 = connectStream
338 material_stream_1_port = material_stream_1.connectedPorts.first()
339 material_stream_2_port = material_stream_2.connectedPorts.first()
341 # Case 1: Intermediate to Intermediate
342 if material_stream_1.connectedPorts.count() > 1 and material_stream_2.connectedPorts.count() > 1:
343 coordinates = {'x': material_stream_2.graphicObject.last().x, 'y': material_stream_2.graphicObject.last().y}
344 outlet_stream_1 = material_stream_1.split_stream()
345 outlet_stream_2 = material_stream_2.split_stream()
347 decision_node = outlet_stream_2.merge_parallel_streams(outlet_stream_1, coordinates=coordinates)
348 decision_node = material_stream_2.merge_parallel_streams(material_stream_1, decision_node)
350 update_decision_node_and_propagate(decision_node)
351 decision_node.save()
352 return decision_node
354 # Case 2: Connecting intermediate stream with product or feed
355 if material_stream_1.connectedPorts.count() > 1 or material_stream_2.connectedPorts.count() > 1:
356 inter_stream, connected_stream = (
357 (material_stream_1,
358 material_stream_2) if material_stream_1.connectedPorts.count() > material_stream_2.connectedPorts.count()
359 else (material_stream_2, material_stream_1)
360 )
361 connected_stream_port = connected_stream.connectedPorts.first()
362 is_inlet = (connected_stream_port.direction == ConType.Inlet)
364 if is_inlet:
365 # Feed -> Intermediate
366 decision_node = inter_stream.make_decision_node(num_inlets=1, num_outlets=2)
367 empty_port = decision_node.ports.filter(direction=ConType.Outlet, stream=None).first()
368 empty_port.stream = connected_stream
369 empty_port.save()
371 self.delete_empty_node(connected_stream, connected_stream_port)
373 # Center graphics
374 self.horizontally_center_graphic()
375 connected_stream.horizontally_center_graphic()
376 self.save()
377 connected_stream.save()
379 # Update compounds - First update decision node, then propagate
380 update_decision_node_and_propagate(decision_node)
381 return decision_node
382 else:
383 # Product -> Intermediate
384 decision_node = inter_stream.make_decision_node(num_inlets=2, num_outlets=1)
385 empty_port = decision_node.ports.filter(direction=ConType.Inlet, stream=None).first()
386 empty_port.stream = connected_stream
387 empty_port.save()
389 self.delete_empty_node(connected_stream, connected_stream_port)
391 # Center graphics
392 self.horizontally_center_graphic()
393 connected_stream.horizontally_center_graphic()
394 self.save()
395 connected_stream.save()
397 # Update compounds - First update decision node, then propagate
398 update_decision_node_and_propagate(decision_node)
399 return decision_node
401 # Case 3: Feed to feed or product to product
402 if material_stream_1_port.direction == material_stream_2_port.direction: 402 ↛ 403line 402 didn't jump to line 403 because the condition on line 402 was never true
403 result = material_stream_2.merge_parallel_streams(material_stream_1)
404 update_decision_node_and_propagate(result)
405 return result
407 # Case 4: Feed to product or product to feed
408 if material_stream_1_port.direction == "inlet":
409 inlet_stream, outlet_stream = material_stream_1, material_stream_2
410 inlet_port, outlet_port = material_stream_1_port, material_stream_2_port
411 else:
412 inlet_stream, outlet_stream = material_stream_2, material_stream_1
413 inlet_port, outlet_port = material_stream_2_port, material_stream_1_port
415 # Get all the graphic objects to preserve
417 # Always preserve the graphic of the stream that is being connected to
418 preserve_graphic = material_stream_2.graphicObject.last()
419 # Update compounds
420 if inlet_stream.objectType == SimulationObjectClass.EnergyStream: 420 ↛ 421line 420 didn't jump to line 421 because the condition on line 420 was never true
421 pass
422 elif inlet_stream.objectType != SimulationObjectClass.acStream: 422 ↛ 425line 422 didn't jump to line 425 because the condition on line 422 was always true
423 update_compounds_on_merge(inlet_stream, outlet_stream)
425 def update_graphic_object_on_merge(preserve_stream, delete_stream) -> None:
426 # Preserve one graphic object for each group either stream is connected to
427 # If the stream is connected to multiple groups, it will be shown in both groups
428 preserve_stream_groups = list(preserve_stream.get_groups())
429 preserve_stream_gobjs = preserve_stream.graphicObject.all()
430 merged_gobjs = []
431 for gobj in delete_stream.graphicObject.all():
432 # Check if there's a preserved stream in this group
433 preserved_gobj = next((g for g in preserve_stream_gobjs if g.group == gobj.group), None)
434 if preserved_gobj is not None:
435 # Keep this if
436 if preserve_stream == material_stream_2:
437 # Keep this graphic object's position
438 gobj.delete()
439 else:
440 # Keep the position of the stream to be deleted
441 preserved_gobj.copy_position_from(gobj)
442 gobj.delete()
443 # If both streams are shown in multiple parent groups, we only want to show the
444 # stream at the lowest level. make a list of these merged graphicsObjects so we can remove
445 # the extras later.
446 merged_gobjs.append(preserved_gobj)
447 else:
448 # connect this graphic object to the preserved stream
449 gobj.simulationObject = preserve_stream
450 gobj.save()
452 # Of the merged graphics objects, only keep the one in the lowest group
453 parent_groups: list[int] = []
455 for gobj in merged_gobjs:
456 parent_group = gobj.group.get_parent_group()
457 if parent_group is not None:
458 parent_groups.append(parent_group.pk)
460 for gobj in merged_gobjs:
461 if gobj.group.pk in parent_groups:
462 gobj.delete()
464 def _collect_unique_groups(*group_iters) -> set["Grouping"]:
465 """Return a set of unique Grouping objects from one or more iterables.
467 This preserves iteration order only in the sense that groups seen earlier
468 are added first, but the return value is a set because callers expect
469 membership semantics rather than ordering.
470 """
471 unique = set()
472 for iterable in group_iters:
473 for g in iterable:
474 if g not in unique:
475 unique.add(g)
476 return unique
478 # get all groups that are groups of either stream
479 inlet_stream_groups = list(inlet_stream.get_groups())
480 outlet_stream_groups = list(outlet_stream.get_groups())
481 intermediate_groups: set["Grouping"] = _collect_unique_groups(inlet_stream_groups, outlet_stream_groups)
483 from flowsheetInternals.unitops.models.delete_factory import DeleteFactory
484 if not outlet_stream.has_recycle_connection and inlet_stream.has_path_to(outlet_stream):
486 # preserve the outlet stream
487 preserve_stream = outlet_stream
488 inlet_port.stream = preserve_stream
489 inlet_port.save()
490 DeleteFactory.delete_object(inlet_stream)
492 update_graphic_object_on_merge(preserve_stream, inlet_stream)
494 # attach recycle block to the inlet stream
495 # this also handles updating property access
496 outlet_stream.attach_recycle(intermediate_groups)
497 inlet_stream.delete_control_values()
499 outlet_stream.reevaluate_properties_enabled()
500 else:
501 # Preserve the outlet stream
502 preserve_stream = outlet_stream
503 inlet_port.stream = preserve_stream
504 inlet_port.save()
505 DeleteFactory.delete_object(inlet_stream)
506 # Update graphic object
507 update_graphic_object_on_merge(preserve_stream, inlet_stream)
509 return preserve_stream
511 def delete_control_values(self) -> None:
512 """
513 Deletes all control values connected to properties in this object
514 """
515 property_set: PropertySet = self.properties
516 for prop in property_set.ContainedProperties.all():
517 for value in prop.values.all():
518 if value.is_control_set_point():
519 value.controlSetPoint.delete()
521 def attach_recycle(self, intermediate_groups: set["Grouping"] | None = None) -> None:
522 """
523 Attaches a new recycle block to the stream
524 """
525 if self.has_recycle_connection: 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true
526 return # already has a recycle connection
527 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
528 from flowsheetInternals.graphicData.models.groupingModel import Grouping
529 from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject
531 recycle = SimulationObjectFactory.create_simulation_object(
532 objectType="recycle",
533 flowsheet=self.flowsheet,
534 coordinates={
535 'x': self.graphicObject.last().x + self.graphicObject.last().width / 2,
536 'y': self.graphicObject.last().y + self.graphicObject.last().height / 2 + 100
537 },
538 )
540 # Set graphic objects for the recycle block in the same groups as the stream
541 default_graphic = recycle.graphicObject.last()
542 default_width = default_graphic.width if default_graphic else 32
543 default_height = default_graphic.height if default_graphic else 32
545 # Remove all graphic objects associated with the recycle so we can re-add them to proper groups
546 recycle.graphicObject.all().delete()
548 # If caller didn't provide groups, use this stream's groups
549 if intermediate_groups is None:
550 intermediate_groups = set(self.get_groups())
551 else:
552 # ensure we have a set (in case a generator or queryset was passed)
553 intermediate_groups = set(intermediate_groups)
555 normalized_groups: set[Grouping] = set()
556 seen_group_ids: set[int] = set()
558 # Normalise groups to Grouping objects and remove duplicates
559 for group in intermediate_groups:
560 group_obj = None
561 if isinstance(group, Grouping): 561 ↛ 563line 561 didn't jump to line 563 because the condition on line 561 was always true
562 group_obj = group
563 elif group is not None:
564 group_obj = Grouping.objects.filter(pk=group).first()
566 if group_obj and group_obj.pk not in seen_group_ids: 566 ↛ 559line 566 didn't jump to line 559 because the condition on line 566 was always true
567 normalized_groups.add(group_obj)
568 seen_group_ids.add(group_obj.pk)
570 # Create graphic objects for each group
571 for group in normalized_groups:
572 stream_graphic = self.graphicObject.filter(group=group).last()
573 if stream_graphic is None:
574 continue
576 GraphicObject.objects.create(
577 flowsheet=self.flowsheet,
578 simulationObject=recycle,
579 width=default_width,
580 height=default_height,
581 x=stream_graphic.x + (stream_graphic.width - default_width) / 2,
582 y=stream_graphic.y + stream_graphic.height + 30,
583 group=group,
584 )
586 recycle.recycleData.update(self)
587 recycle.save()
589 def has_path_to(self, end_stream: "SimulationObject", check_recycles=True) -> bool:
590 """
591 Checks if there is a path in the flowsheet from the start stream (self) to the end stream
592 Can be used to check for loops in the flowsheet if these two streams are being merged
594 - param end_stream: The stream to check if there is a path to (from self)
595 - param check_recycles: If True, will skip the path if a stream has a recycle connection
596 """
597 remaining_streams = [self]
598 while remaining_streams:
599 current_stream = remaining_streams.pop()
600 if current_stream == end_stream:
601 # loop detected
602 return True
603 unit_op_port = current_stream.connectedPorts.filter(direction=ConType.Inlet).first()
604 if not unit_op_port:
605 continue
606 unit_op = unit_op_port.unitOp
607 # get the outlet ports of the unitop
608 connected_port_keys = get_connected_port_keys(unit_op_port.key, unit_op.schema)
609 outlet_ports = unit_op.ports.filter(
610 direction=ConType.Outlet,
611 key__in=connected_port_keys
612 )
613 for outlet_port in outlet_ports:
614 outlet_stream = outlet_port.stream
615 if outlet_stream is not None: 615 ↛ 613line 615 didn't jump to line 613 because the condition on line 615 was always true
616 if check_recycles and outlet_stream.has_recycle_connection: 616 ↛ 617line 616 didn't jump to line 617 because the condition on line 616 was never true
617 continue
618 remaining_streams.append(outlet_stream)
619 return False
621 def make_decision_node(self, num_inlets, num_outlets):
622 """
623 Turns a stream into a decision node with n inlet and m outlet ports
624 :param num_inlets: number of inlet ports to create
625 :param num_outlets: number of outlet ports to create
626 :return: Decision Node object
627 """
628 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
629 if self.objectType == SimulationObjectClass.Stream: 629 ↛ exitline 629 didn't return from function 'make_decision_node' because the condition on line 629 was always true
631 # Create Decision Node
632 modified_schema: ObjectType = configuration["decisionNode"].model_copy(deep=True)
633 modified_schema.ports["inlet"].default = num_inlets
634 modified_schema.ports["outlet"].default = num_outlets
635 graphicObject = self.graphicObject.last() # For now, we are assuming this only has one graphicObject.
636 parentGroup = self.graphicObject.last().group.id
637 decision_node = SimulationObjectFactory.create_simulation_object(
638 coordinates={'x': graphicObject.x, 'y': graphicObject.y},
639 objectType="decisionNode",
640 schema=modified_schema,
641 flowsheet=self.flowsheet,
642 create_attached_streams=False,
643 parentGroup=parentGroup,
644 )
646 if self.connectedPorts.count() > 1:
647 # Connect both streams to Decision Node
648 ms_outlet_port = self.connectedPorts.filter(direction=ConType.Inlet).first()
649 dn_outlet_port = decision_node.ports.filter(direction=ConType.Outlet).first()
650 dn_inlet_port = decision_node.ports.filter(direction=ConType.Inlet).first()
651 new_stream = SimulationObjectFactory.create_stream_at_port(port=dn_outlet_port)
652 ms_outlet_port.stream = new_stream
653 dn_inlet_port.stream = self
655 # Save Objects
656 ms_outlet_port.save()
657 dn_outlet_port.save()
658 dn_inlet_port.save()
659 decision_node.save()
661 # Center GraphicObjects
662 self.horizontally_center_graphic()
663 new_stream.horizontally_center_graphic()
664 self.save()
665 new_stream.save()
666 else:
667 # For now, only dealing with the case that a regular stream is initialized with one port
668 port = decision_node.ports.first()
669 port.stream = self
670 port.save()
671 self.horizontally_center_graphic()
672 self.save()
673 self.save()
674 decision_node.save()
675 # handle compounds for decision node
676 update_decision_node_and_propagate(decision_node, updated_via_right_click=True)
677 return decision_node
679 def update_compounds(self, compounds: list[str]) -> None:
680 """
681 Updates the compounds for this stream
682 """
683 update_compounds_on_set(self, compounds)
685 def add_port(self, key: str, existing_stream: "SimulationObject | None" = None) -> Port:
686 """
687 Adds a port to this object and adds a a new stream if none is provided
688 """
689 from flowsheetInternals.graphicData.logic.make_group import propagate_streams
690 from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory
691 # Create ports
692 ports_schema = self.schema.ports
694 # replace any many=True ports with multiple ports
695 port_dict = ports_schema[key]
696 # get the next index for the port (looks at all ports with the same key and gets the length of the l
697 next_index = self.ports.filter(key=key).count()
698 new_port = Port(
699 key=key,
700 index=next_index,
701 direction=port_dict.type,
702 displayName=port_dict.displayName + f" {next_index + 1}",
703 unitOp=self,
704 flowsheet=self.flowsheet
705 )
706 new_port.save()
708 self.update_height()
710 if existing_stream:
711 # connect existing stream to the new port
712 new_port.stream = existing_stream
713 new_port.save() # might have to remove this trying to figure out why other streams are being removed
715 if self.objectType == "decisionNode": 715 ↛ 789line 715 didn't jump to line 789 because the condition on line 715 was always true
716 # Use existing function to update decision node compounds
717 update_decision_node_and_propagate(self)
718 else:
719 # create a new strea at the port using the factory
720 new_stream = SimulationObjectFactory.create_stream_at_port(new_port)
721 update_compounds_on_add_stream(new_port, new_stream)
722 outlet_name = self.schema.splitter_fraction_name
723 for property_key, property in self.schema.properties.items():
724 if property.indexSets is not None and (
725 'splitter_fraction' in property.indexSets) and new_port.direction == ConType.Outlet:
726 property_infos = self.properties.ContainedProperties.all()
727 property_info = property_infos.first()
728 new_indexed_item = IndexedItem.objects.create(
729 owner=self,
730 key="outlet_" + f"{next_index + 1}",
731 displayName=outlet_name + f" {next_index + 1}",
732 type="splitter_fraction",
733 flowsheet=self.flowsheet,
734 )
736 # get indexed set for split_fraction or priorities:
737 index_sets = property.indexSets
739 # figure out which other indexed items this should link to
740 indexed_item_sets: List[List[IndexedItem]] = []
741 for index in index_sets:
742 if index == IndexChoices.SplitterFraction:
743 continue # We don't need this, it's the one we're editing
744 indexed_items: List[IndexedItem] = self.get_indexed_items(
745 index) # e.g get_indexed_items("phase"), "compound", etc
746 indexed_item_sets.append(indexed_items)
748 # create a property value for each combination of indexed items, for the new outlet
749 combinations = list(itertools.product(*indexed_item_sets))
751 # Bulk update existing property values to be enabled
752 property_status = False
753 if self.objectType == "header" or self.objectType == "simple_header":
754 property_status = True
755 else:
756 property_info.values.update(enabled=True)
758 # bulk create indexed items (all disabled as it's the last outlet)
759 index_links_to_create: List[PropertyValueIntermediate] = []
760 for combination in combinations:
761 # Create a propertyValue for this combination
762 combo_property_value = PropertyValue.objects.create(property=property_info,
763 enabled=property_status,
764 flowsheet=self.flowsheet)
765 # link it up to the new outlet
766 index_links_to_create.append(
767 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=new_indexed_item)
768 )
770 # Also link it up to all the other sets
771 for indexed_item in combination:
772 index_links_to_create.append(
773 PropertyValueIntermediate(propertyvalue=combo_property_value, indexeditem=indexed_item)
774 )
776 # Bulk create the links
777 PropertyValueIntermediate.objects.bulk_create(index_links_to_create)
779 new_stream.save()
780 # Rotate the new stream to match the rotation of the unitop it is attached to
781 self.update_stream_rotation(new_stream, self)
783 # Ensure the new stream appears in all parent groups
784 if new_port.direction == ConType.Inlet:
785 propagate_streams([new_stream], ConType.Inlet)
786 elif new_port.direction == ConType.Outlet: 786 ↛ 789line 786 didn't jump to line 789 because the condition on line 786 was always true
787 propagate_streams([new_stream], ConType.Outlet)
789 return new_port
791 def update_height(self):
792 """
793 Updates the height of the graphic object based on the number of ports
794 """
795 if self.schema.graphicObject.autoHeight: # e.g header has auto height calculation
796 OFFSET_VALUE = 200
798 max_ports = max(self.ports.filter(direction=ConType.Inlet).count(),
799 self.ports.filter(direction=ConType.Outlet).count())
800 rotation = self.graphicObject.last().rotation
802 graphic_object: GraphicObject = self.graphicObject.last()
803 new_length = max_ports * OFFSET_VALUE
804 obj_width = graphic_object.width
805 obj_height = graphic_object.height
807 # Update graphic object position to keep the other streams centered
808 if rotation == 90:
809 if new_length > obj_width:
810 self.graphicObject.update(x=graphic_object.x - OFFSET_VALUE)
811 elif new_length < obj_width:
812 self.graphicObject.update(x=graphic_object.x + OFFSET_VALUE)
813 elif rotation == 180:
814 if new_length > obj_height:
815 self.graphicObject.update(y=graphic_object.y - OFFSET_VALUE)
816 elif new_length < obj_height:
817 self.graphicObject.update(y=graphic_object.y + OFFSET_VALUE)
819 # Update the correct dimension based on rotation
820 if rotation in [90, 270]:
821 self.graphicObject.update(width=new_length)
822 else:
823 self.graphicObject.update(height=new_length)
825 def update_stream_rotation(self, stream: "SimulationObject", unitop: "SimulationObject") -> None:
826 """
827 Updates the rotation of the stream based on the rotation of the unitop it is attached to.
828 """
829 graphic_object = stream.graphicObject.last()
831 if graphic_object is None: 831 ↛ 832line 831 didn't jump to line 832 because the condition on line 831 was never true
832 return
834 # Update the rotation of the stream to match the rotation of the unitop
835 graphic_object.rotation = unitop.graphicObject.last().rotation
836 graphic_object.flipped = unitop.graphicObject.last().flipped
837 graphic_object.save()
839 def get_indexed_items(self, index_set_type: IndexChoices) -> List[IndexedItem]:
840 match index_set_type:
841 case IndexChoices.Phase:
842 # get all the indexedItems that are type=phase
843 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Phase).all()
844 return items
845 case IndexChoices.Compound: 845 ↛ 848line 845 didn't jump to line 848 because the pattern on line 845 always matched
846 items = IndexedItem.objects.filter(owner=self, type=IndexChoices.Compound).all()
847 return items
848 case _:
849 raise ValueError("Get_indexed_items didn't expect this index set type")
851 def merge_decision_nodes(self, decision_node_active: "SimulationObject", decision_node_over: "SimulationObject") -> \
852 Optional["SimulationObject"]:
853 """
854 Merges this decision node with the over decision node and handles graphics positioning
855 """
856 import flowsheetInternals.unitops.models.delete_factory as DeleteFactory
858 # Get all streams from active node
859 inlet_streams = decision_node_active.ports.filter(direction="inlet").all()
860 outlet_streams = decision_node_active.ports.filter(direction="outlet").all()
862 # Transfer and reposition inlet streams
863 for inlet_port in inlet_streams:
864 stream = inlet_port.stream
865 # Add stream to new decision node
866 decision_node_over.add_port("inlet", stream)
868 # Update compounds and propagate
869 update_decision_node_and_propagate(decision_node_over)
871 # Transfer and reposition outlet streams
872 for outlet_port in outlet_streams:
873 stream = outlet_port.stream
874 # Add stream to new decision node
875 decision_node_over.add_port("outlet", stream)
877 # Delete the active node and its graphic object
878 DeleteFactory.delete_object(decision_node_active)
879 decision_node_over.save()
880 return decision_node_over
882 def reevaluate_properties_enabled(self) -> None:
883 """
884 Reevaluates property access for all properties in this object
886 This should only really be called when connections are changed,
887 otherwise the property enabling should be handled by adding
888 or removing control values.
889 """
890 if (self.objectType == SimulationObjectClass.MachineLearningBlock):
891 return # We don't want to change from the defaults
892 properties: list[PropertyInfo] = self.properties.ContainedProperties.all()
893 config = self.schema
894 config_properties = config.properties
895 config_groups = config.propertySetGroups
897 def _eval_enabled(prop: PropertyInfo, config_group) -> bool:
898 if self.is_stream():
899 # disable outlet/intermediate stream properties
900 ports = self.connectedPorts.all()
901 if len(ports) == 2 or (len(ports) == 1 and ports[0].direction == "outlet"):
902 return False
903 if config_group is None: 903 ↛ 904line 903 didn't jump to line 904 because the condition on line 903 was never true
904 return True # default to enabled, e.g for custom properties
905 if config_group.type == "stateVars":
906 state_vars = getattr(config_group, "stateVars") or ()
907 return prop.key in state_vars
908 return True # eg. All, default to enabled
910 list_prop_val = []
912 for prop in properties:
913 config_prop = config_properties.get(prop.key)
914 if config_prop:
915 group = config_prop.propertySetGroup
916 else:
917 group = "default"
918 config_group = config_groups.get(group, None)
919 res = _eval_enabled(prop, config_group)
920 list_prop_val.extend(prop.enable(res))
922 PropertyValue.objects.bulk_update(list_prop_val, ["enabled"])
924 def get_unspecified_properties(self) -> list:
925 if not hasattr(self, "properties") or not self.properties: 925 ↛ 926line 925 didn't jump to line 926 because the condition on line 925 was never true
926 return []
928 contained_properties: models.QuerySet[PropertyInfo] = self.properties.ContainedProperties.all()
929 is_splitter = getattr(self.schema, "displayType", "").lower() == "splitter"
931 # use schema stateVars to find required properties
932 required_properties: Set[str] = set()
933 for group in self.schema.propertySetGroups.values():
934 if group.toggle and not self.properties.get_property(group.toggle).get_value():
935 # The group is toggled off, so we don't care that the properties are not specified
936 continue
937 if group.type == "stateVars" or group.type == "composition" or group.type == "exceptLast":
938 required_properties.update(group.stateVars)
940 # use pre fetched data to avoid additional queries
941 unspecified_properties = []
942 property_info: PropertyInfo
943 for property_info in contained_properties:
944 # check if the property has no values or if all values are invalid
945 has_valid_value = property_info.isSpecified()
946 if not has_valid_value and property_info.key in required_properties:
947 unspecified_properties.append(property_info.key)
949 # check if mole_frac_comp sums to 1
950 mole_frac_props = [x for x in contained_properties if x.key == "mole_frac_comp"]
952 for prop in mole_frac_props:
953 if prop.has_value_bulk():
954 total = 0.0
955 for value in prop.values.all():
956 try:
957 val = float(value.value)
958 total += val
959 except (ValueError, TypeError):
960 continue
961 if abs(total - 1.0) > 0.001 and "mole_frac_comp" not in unspecified_properties:
962 unspecified_properties.append("mole_frac_comp")
963 else:
964 if "mole_frac_comp" not in unspecified_properties:
965 unspecified_properties.append("mole_frac_comp")
967 return unspecified_properties
969 def delete(self, *args, **kwargs):
970 raise NotImplementedError("Use delete_object method from DeleteFactory")
972 def permanently_delete(self, *args, **kwargs):
973 """
974 Permanently deletes the object from the database.
975 This should only be used in tests or when you are sure you want to delete the object.
976 """
977 super().delete(*args, **kwargs)
979 def delete_empty_node(self, connected_stream: "SimulationObject", connected_stream_port: "Port"):
980 """
981 Deletes the empty_port stream from parent groups when creating a decision node inside a group.
982 :param connected_stream: Stream connected to the empty port
983 :param connected_stream_port: Port of the connected stream
984 """
985 simulation_object_id = connected_stream.id
986 current_group = connected_stream_port.unitOp.get_group()
987 parent_groups = connected_stream_port.unitOp.get_parent_groups()
989 for parent_group in parent_groups:
990 if parent_group != current_group:
991 gobjs_in_group = connected_stream.graphicObject.filter(group=parent_group)
992 for graphic_object in gobjs_in_group:
993 if graphic_object.simulationObject.id == simulation_object_id: 993 ↛ 992line 993 didn't jump to line 992 because the condition on line 993 was always true
994 graphic_object.delete()